当前位置: 首页 > news >正文

重庆涪陵网站建设公司东营做网站优化价格

重庆涪陵网站建设公司,东营做网站优化价格,全国网站集约化建设试点,网站放在服务器上大纲 Tumbling Count WindowsmapreduceWindow Size为2Window Size为3Window Size为4Window Size为5Window Size为6 完整代码参考资料 之前的案例中#xff0c;我们的Source都是确定内容的数据。而Flink是可以处理流式#xff08;Streaming#xff09;数据的#xff0c;就是… 大纲 Tumbling Count WindowsmapreduceWindow Size为2Window Size为3Window Size为4Window Size为5Window Size为6 完整代码参考资料 之前的案例中我们的Source都是确定内容的数据。而Flink是可以处理流式Streaming数据的就是数据会源源不断输入。 对于这种数据我们称之为无界流即没有“终止的界限”。但是程序在底层一定不能等着无止境的数据都传递结束再处理因为“无止境”就意味着“终止的界限”触发计算的条件是不存在的。那么我们可以人为的给它设置一个“界”这就是我们本节介绍的窗口。 Tumbling Count Windows Tumbling Count Windows是指按元素个数计数的滚动窗口。 滚动窗口是指没有元素重叠的窗口比如下面图是个数为2的窗口。元素重叠的窗口我们会在《0基础学习PyFlink——个数滑动窗口Sliding Count Windows》介绍 个数为3的窗口 我们用代码探索下这个概念 map word_count_data [(A,2),(A,1),(B,3),(B,1),(B,2),(C,3),(C,1),(C,4),(C,2),(D,3),(D,1),(D,4),(D,2),(D,5),(E,3),(E,1),(E,4),(E,2),(E,6),(E,5)]def word_count():env StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyedsource.key_by(lambda i: i[0]) 这段代码构造了一个KeyedStream用于存储word_count_data中的数据。 我们并没有让Source是流的形式是因为为了降低例子复杂度。但是我们将runntime mode设置为流STREAMING模式。 reduce 我们需要定义一个Reduce类用于对元组中的数据进行计算。这个类需要继承于WindowFunction并实现相应方法本例中是apply。 apply会计算一个相同key的元素个数。比如key是“E”的元组个数是6。 class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key, len([e for e in inputs]))]Window Size为2 # reducingreducedkeyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()(A,2) (B,2) (C,2) (C,2) (D,2) (D,2) (E,2) (E,2) (E,2) A的个数是2是因为A的确只有两个元组而一个Size为2的Window正好承载了这两个元素。于是有A,2这个结果B的个数是3。但是会产生两个窗口第一个窗口承载了前两个元素第二个窗口当前只有一个元素。于是第一个窗口进行了Reduce计算得出一个(B,2)第二个窗口还没进行reduce计算就没有展现出结果C有4个正好可以被2个窗口承载。这样我们就看到2个(C,2)。D有5个情况和B类似。它被分成了3个窗口只有2个窗口满足个数条件于是就输出2个(D,2)最后一个窗口因为元素不够就没尽兴reduce计算了。E有6个正好被3个窗口承载。我们就看到3个(E,2)。 Window Size为3 # reducingreducedkeyed.count_window(3) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))(B,3) (C,3) (D,3) (E,3) (E,3) Window Size为4 # reducingreducedkeyed.count_window(4) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))(C,4) (D,4) (E,4) Window Size为5 # reducingreducedkeyed.count_window(5) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))(D,5) (E,5) Window Size为6 # reducingreducedkeyed.count_window(6) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))(E,6) 完整代码 from typing import Iterablefrom pyflink.common import Types from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction from pyflink.datastream.window import CountWindowclass SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key, len([e for e in inputs]))]word_count_data [(A,2),(A,1),(B,3),(B,1),(B,2),(C,3),(C,1),(C,4),(C,2),(D,3),(D,1),(D,4),(D,2),(D,5),(E,3),(E,1),(E,4),(E,2),(E,6),(E,5)]def word_count():env StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyedsource.key_by(lambda i: i[0]) # reducingreducedkeyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ __main__:word_count()参考资料 https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/
http://www.tj-hxxt.cn/news/142037.html

相关文章:

  • 敦化网站开发外贸网站建站赚钱
  • 网站后台发文章图片链接怎么做汕头市做网站优化
  • 深圳网站设计平台莆田市的网站建设公司
  • 网站运营思路网站备案备的是域名还是空间
  • 网站开发怎么做账北京网站建设模板下载
  • 济南网站建设_美叶网络西装定制
  • 注册公司网站的步骤人教版优化设计电子书
  • 外贸网站用什么空间好网站申请备案成功后怎么建设
  • 城乡与建设部网站信息化项目建设背景
  • 手机网站拒绝访问怎么解决网页设计的流程
  • 太原免费网站建设晋江网站建设联系电话
  • 网站建设讯美网络建设方案论文
  • 视频网站的服务器建设西宁做网站_君博先进
  • 给企业做网站怎么收钱杭州市做网站
  • 雷州手机网站建设建大型门户网站
  • 深圳龙华大浪做网站公司广告行业怎么找客户
  • asp商品网站源码做网站龙头
  • 做网站要写多少行代码100款不良网站进入窗口软件
  • 史志网站建设购物网站建设代码
  • 软件工程师证书报考网站wordpress 分类分页
  • 广州网站建设支付网站建设与管理的网页
  • 山东网站备案注销申请表wordpress 翻页函数
  • 黄金网站软件app大全视频网站改版中 模板
  • 网站建设前的功能怎么创建公司网站空间
  • 百度推广官方网站网盟推广
  • 网站后台内容编辑器和京东一样的网站
  • 友链对网站seo有帮助吗网站模板有哪些内容
  • 餐饮团购网站建设个人如何开发微信小程序
  • 网站建设视频万网常见的推广方式
  • 手机优化助手怎么删除邢台视频优化方案