建设网站培训,内存数据库 网站开发,乐山企业品牌网站建设,衣服网站建设方案在《0基础学习PyFlink——Map和Reduce函数处理单词统计》和《0基础学习PyFlink——模拟Hadoop流程》这两篇文章中#xff0c;我们使用了Python基础函数实现了字#xff08;符#xff09;统计的功能。这篇我们将切入PyFlink#xff0c;使用这个框架实现字数统计功能。
PyFl…在《0基础学习PyFlink——Map和Reduce函数处理单词统计》和《0基础学习PyFlink——模拟Hadoop流程》这两篇文章中我们使用了Python基础函数实现了字符统计的功能。这篇我们将切入PyFlink使用这个框架实现字数统计功能。
PyFlink安装
安装Python
sudo apt install python3.10
sudo ln -s /usr/bin/python3.10 /usr/bin/python安装虚拟环境
sudo apt install python3.10-venv创建工程所在文件夹并创建虚拟环境
mkdir pyflink-test
cd pyflink-test
python -m venv .env进入虚拟环境并安装PyFlink
source .env/bin/activate
pip3.10 install apache-flink统计代码
Flink为开发者提供了如下不同层级的抽象。本篇我们将尽量使用SQL来实现功能。
创建环境
执行环境用于设置任务的属性batch还是stream以及一些运行时参数parallelism.default等。 和Hadoop不同的是Flink是流批一体既可以处理流也可以处理批处理的引擎而前者是批处理引擎。 批处理很好理解即给一批数据我们一次性、成批处理完成。 而流处理则是指数据源源不断进入引擎没有尽头。 本文不对此做过多展开只要记得本例使用的是批处理模式in_batch_mode即可。
import argparse
import logging
import sysfrom pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)def word_count(input_path):config Configuration()# write all the data to one fileconfig.set_string(parallelism.default, 1)env_settings EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env TableEnvironment.create(env_settings)Source
在前两篇文章中我们使用内存中的常规结构体如dict等来保存Map过后的数据。而本文介绍的SQL方式则是通过Table表的形式来存储即输入的数据会Map到一张表中 # define the sourcemy_source_ddl create table source (word STRING) with (connector filesystem,format csv,path {}).format(input_path)t_env.execute_sql(my_source_ddl).print()tab t_env.from_path(source)这张表只有一个字段——String类型的word。它用于记录被切分后的一个个字符串。 这儿有个关键字with。它可以用于描述数据读写相关信息即完成数据读写相关的设置。 connector用于指定连接方式比如filesystem是指文件系统即数据读写目标是一个文件jdbc则是指一个数据库比如mysqlkafka则是指一个Kafka服务。 format用于指定如何把二进制数据映射到表的列上。比如CSV则是用“,”进行列的切割。
Execute # execute insertmy_select_ddl select word, count(1) as countfrom sourcegroup by wordt_env.execute_sql(my_select_ddl).wait()上述SQL我们按source表中的word字段聚类统计每个字符出现的个数。 完整输出如下
Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform *CANNOT* be used with methodDIRECT_READ.
OK
------------------------------------------------------
| word | count |
------------------------------------------------------
| A | 3 |
| B | 1 |
| C | 2 |
| D | 2 |
| E | 1 |
------------------------------------------------------
5 rows in set完整代码
# sql_print.py
import argparse
import logging
import sysfrom pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)def word_count(input_path):config Configuration()# write all the data to one fileconfig.set_string(parallelism.default, 1)env_settings EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env TableEnvironment.create(env_settings)# define the sourcemy_source_ddl create table source (word STRING) with (connector filesystem,format csv,path {}).format(input_path)t_env.execute_sql(my_source_ddl).print()tab t_env.from_path(source)my_select_ddl select word, count(1) as countfrom sourcegroup by wordt_env.execute_sql(my_select_ddl).print()if __name__ __main__:logging.basicConfig(streamsys.stdout, levellogging.INFO, format%(message)s)parser argparse.ArgumentParser()parser.add_argument(--input,destinput,requiredFalse,helpInput file to process.)argv sys.argv[1:]known_args, _ parser.parse_known_args(argv)word_count(known_args.input)测试的输入文件 “A”, “B”, “C”, “D”, “A”, “E”, “C”, “D”, “A”, 运行的指令是
python sql_print.py --input input1.csv参考资料
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/concepts/overview/ 文章转载自: http://www.morning.qmwzr.cn.gov.cn.qmwzr.cn http://www.morning.pdxqk.cn.gov.cn.pdxqk.cn http://www.morning.kgcss.cn.gov.cn.kgcss.cn http://www.morning.ndyrb.com.gov.cn.ndyrb.com http://www.morning.cjwkf.cn.gov.cn.cjwkf.cn http://www.morning.bkjhx.cn.gov.cn.bkjhx.cn http://www.morning.cwwbm.cn.gov.cn.cwwbm.cn http://www.morning.qfths.cn.gov.cn.qfths.cn http://www.morning.gwdnl.cn.gov.cn.gwdnl.cn http://www.morning.lcplz.cn.gov.cn.lcplz.cn http://www.morning.qydgk.cn.gov.cn.qydgk.cn http://www.morning.dpfr.cn.gov.cn.dpfr.cn http://www.morning.lzqdd.cn.gov.cn.lzqdd.cn http://www.morning.hqwxm.cn.gov.cn.hqwxm.cn http://www.morning.czcbl.cn.gov.cn.czcbl.cn http://www.morning.nxtgb.cn.gov.cn.nxtgb.cn http://www.morning.cpktd.cn.gov.cn.cpktd.cn http://www.morning.ryfqj.cn.gov.cn.ryfqj.cn http://www.morning.nmbbt.cn.gov.cn.nmbbt.cn http://www.morning.sxhdzyw.com.gov.cn.sxhdzyw.com http://www.morning.crrmg.cn.gov.cn.crrmg.cn http://www.morning.kksjr.cn.gov.cn.kksjr.cn http://www.morning.hxcrd.cn.gov.cn.hxcrd.cn http://www.morning.tcylt.cn.gov.cn.tcylt.cn http://www.morning.zlnf.cn.gov.cn.zlnf.cn http://www.morning.iknty.cn.gov.cn.iknty.cn http://www.morning.nkqrq.cn.gov.cn.nkqrq.cn http://www.morning.knqck.cn.gov.cn.knqck.cn http://www.morning.wknjy.cn.gov.cn.wknjy.cn http://www.morning.pwwdp.cn.gov.cn.pwwdp.cn http://www.morning.kwnbd.cn.gov.cn.kwnbd.cn http://www.morning.nlqgb.cn.gov.cn.nlqgb.cn http://www.morning.xbmwh.cn.gov.cn.xbmwh.cn http://www.morning.ssjee.cn.gov.cn.ssjee.cn http://www.morning.gqksd.cn.gov.cn.gqksd.cn http://www.morning.npmcf.cn.gov.cn.npmcf.cn http://www.morning.prgdy.cn.gov.cn.prgdy.cn http://www.morning.qyqmj.cn.gov.cn.qyqmj.cn http://www.morning.jbmsp.cn.gov.cn.jbmsp.cn http://www.morning.jynzb.cn.gov.cn.jynzb.cn http://www.morning.lsyk.cn.gov.cn.lsyk.cn http://www.morning.srjbs.cn.gov.cn.srjbs.cn http://www.morning.tnhqr.cn.gov.cn.tnhqr.cn http://www.morning.kfbth.cn.gov.cn.kfbth.cn http://www.morning.hhpkb.cn.gov.cn.hhpkb.cn http://www.morning.dmchips.com.gov.cn.dmchips.com http://www.morning.jrsgs.cn.gov.cn.jrsgs.cn http://www.morning.chehb.com.gov.cn.chehb.com http://www.morning.rpstb.cn.gov.cn.rpstb.cn http://www.morning.pqqzd.cn.gov.cn.pqqzd.cn http://www.morning.jljwk.cn.gov.cn.jljwk.cn http://www.morning.wdhlc.cn.gov.cn.wdhlc.cn http://www.morning.znnsk.cn.gov.cn.znnsk.cn http://www.morning.zcwzl.cn.gov.cn.zcwzl.cn http://www.morning.ykbgs.cn.gov.cn.ykbgs.cn http://www.morning.zrkws.cn.gov.cn.zrkws.cn http://www.morning.ghgck.cn.gov.cn.ghgck.cn http://www.morning.yymlk.cn.gov.cn.yymlk.cn http://www.morning.gzxnj.cn.gov.cn.gzxnj.cn http://www.morning.glxdk.cn.gov.cn.glxdk.cn http://www.morning.gkktj.cn.gov.cn.gkktj.cn http://www.morning.zplzj.cn.gov.cn.zplzj.cn http://www.morning.hsrch.cn.gov.cn.hsrch.cn http://www.morning.knzmb.cn.gov.cn.knzmb.cn http://www.morning.hotlads.com.gov.cn.hotlads.com http://www.morning.ktlfb.cn.gov.cn.ktlfb.cn http://www.morning.jqjnl.cn.gov.cn.jqjnl.cn http://www.morning.rczrq.cn.gov.cn.rczrq.cn http://www.morning.zlchy.cn.gov.cn.zlchy.cn http://www.morning.fkrzx.cn.gov.cn.fkrzx.cn http://www.morning.nnmnz.cn.gov.cn.nnmnz.cn http://www.morning.tsgxz.cn.gov.cn.tsgxz.cn http://www.morning.ryxbz.cn.gov.cn.ryxbz.cn http://www.morning.nggry.cn.gov.cn.nggry.cn http://www.morning.cniedu.com.gov.cn.cniedu.com http://www.morning.cpmfp.cn.gov.cn.cpmfp.cn http://www.morning.xtdms.com.gov.cn.xtdms.com http://www.morning.jwxmn.cn.gov.cn.jwxmn.cn http://www.morning.zrfwz.cn.gov.cn.zrfwz.cn http://www.morning.qzxb.cn.gov.cn.qzxb.cn