当前位置: 首页 > news >正文

移动网站建设学习注册商标查询网

移动网站建设学习,注册商标查询网,电商公司的网上设计,系统开发板前言 今天开始学习 DataStream 的 API #xff0c;这一块是 Flink 的核心部分#xff0c;我们不去学习 DataSet 的 API 了#xff0c;因为从 Flink 12 开始已经实现了流批一体#xff0c; DataSet 已然是被抛弃了。忘记提了#xff0c;从这里开始#xff0c;我开始换用 F…前言 今天开始学习 DataStream 的 API 这一块是 Flink 的核心部分我们不去学习 DataSet 的 API 了因为从 Flink 12 开始已经实现了流批一体 DataSet 已然是被抛弃了。忘记提了从这里开始我开始换用 Flink 17 了。 一个 Flink 程序其实就是对 DataStream 的各种转换。具体来说代码基本上都由以下几部分构成 获取执行环境execution environment 读取数据源source 定义基于数据的转换操作transformations 定义计算结果的输出位置sink 触发程序执行execute 其中获取环境和触发执行都可以认为是针对执行环境的操作。 1、执行环境Execution Environment 不同的环境代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时首先必须获取当前 Flink 的运行环境从而建立起与 Flink 框架之间的联系。只有获取了环境上下文信息才能将具体的任务调度到不同的 TaskManager 执行。 1.1、创建执行环境 1、getExecutionEnvironment StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); 这是最简单高效的一种方式了它可以自己根据环境的信息去判断。 我们也可以给它传递一个 Configuration 对象作为参数这样我们可以设置运行时的一些配置比如端口号等。 Configuration conf new Configuration();conf.set(RestOptions.BIND_PORT,8082);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);这里我们设置端口号为 8082 这样我们在默认的 8081 端口就无法访问 Web UI 了只能通过 8082 端口来访问。  2、createLocalEnvironment 这种方式了解即可它是用来创建一个本地的模拟集群环境。 StreamExecutionEnvironment env StreamExecutionEnvironment.createLocalEnvironment(); 3、createRemoteEnvironment 这种方式同样了解即可因为配置起来比较繁琐我们既然是在集群下运行了一般都是把代码打包成 jar 去执行不会把配置信息写死的。 StreamExecutionEnvironment.createRemoteEnvironment(hadoop102,8081,/opt/module/xxx.jar); 1.2、执行模式(Execution Mode) 默认的执行模式就是 Streaming 模式。 1、batch 模式 env.setRuntimeMode(RuntimeExecutionMode.BATCH); 2、streaming 模式 env.setRuntimeMode(RuntimeExecutionMode.STREAMING);3、自动模式 前两种方式都过于死板打包后的程序都不能修改所以我们一般不明确指定执行模式到底是 流处理 还是 批处理而是执行时通过命令行来配置 bin/flink run -Dexecution.runtime-modeBATCH ... 1.3、触发程序执行 默认执行方式 Flink 是事件驱动型的只有等到数据到来才会触发真正的计算这也被称为“延迟执行”或“懒执行”lazy execution。所以我们需要显式地调用执行环境的 execute()方法来触发程序执行。execute()方法将一直等待作业完成然后返回一个执行结果JobExecutionResult但是这个返回对象我们一般不怎么用而且这个返回结果在程序运行完才会返回。 默认 env.execute() 触发生成一个 Flink Job。 env.execute(); 异步执行方式 极少情况下可能我们一套代码中有两部分处理逻辑比如 env.execute() 之后又进行了一些操作然后再进行 execute() 但在 main 线程中是会阻塞的这就需要启动一个异步的 execute() 方法。 executeAsync() 会触发执行多个 Flink Job。 env.execute();// 其他处理代码...env.executeAsync(); 2、源算子Source 2.1、准备工作 写一个 Java Bean注意类的属性序列化问题这里我们的属性都是一些基本类型Flink 是支持对它进行序列化的Flink 会把这样的类作为一种特殊的 POJO 数据类型来对待方便数据的解析和序列化。 import java.util.Objects;public class WaterSensor {public String id;public Long ts;public Integer vc;public WaterSensor(){}public WaterSensor(String id, Long ts, Integer vc) {this.id id;this.ts ts;this.vc vc;}Overridepublic boolean equals(Object o) {if (this o) return true;if (o null || getClass() ! o.getClass()) return false;WaterSensor that (WaterSensor) o;return Objects.equals(id, that.id) Objects.equals(ts, that.ts) Objects.equals(vc, that.vc);}Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}Overridepublic String toString() {return WaterSensor{ id id \ , ts ts , vc vc };}public String getId() {return id;}public void setId(String id) {this.id id;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts ts;}public Integer getVc() {return vc;}public void setVc(Integer vc) {this.vc vc;} }2.2、从集合中读取 和 Spark 一样集合类型我们一般只在测试的时候使用。 主要方法就是 fromCollection 或者 fromElements 。 import com.lyh.bean.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;public class CollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();WaterSensor sensor1 new WaterSensor(1,1L,1);WaterSensor sensor2 new WaterSensor(2,2L,2);// 从集合读取数据DataStreamSourceWaterSensor source env // .fromElements(sensor1,sensor2); //直接填写元素.fromCollection(Arrays.asList(sensor1,sensor2)); // 从集合读取数据source.print();env.execute();} }2.3、从文件中读取 读取文件需要添加文件连接器依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version/dependency 新的 Source 读取语法 env.fromSource(Source的实现类Watermarksource名称) 示例  import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FileSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从文件中读取FileSourceString fileSource FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path(input/words.txt)).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(),fileSource).print();env.execute();} }2.4、从 Socket 读取数据 这种方式同样常用于模拟流数据稳定性较差通常用来测试。 DataStreamString stream env.socketTextStream(localhost, 9999);2.5、从 Kafka 读取数据 实际开发也是用 Kafka 来读取的我们的实时流数据都是由 Kafka 来做收集和传输的。 导入依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version /dependency案例 package com.lyh.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KafkaSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从 Kafka 读取KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092) //指定kafka地址和端口.setGroupId(lyh) // 指定消费者组id.setTopics(like) // 指定消费的topic,可以是多个用ListString.setValueOnlyDeserializer(new SimpleStringSchema()) // 指定反序列化器 因为kafka是生产者 flink作为消费者要反序列化.setStartingOffsets(OffsetsInitializer.latest()) // 指定flink消费kafka的策略.build();env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(),kafkaSource).print();env.execute();}/** kafka 消费者的参数: * auto.reset.offsets:* earliest: 如果有offset,从offset继续消费;如果没有 就从 最早 消费* latest : 如果有offset,从offset继续消费;如果没有 就从 最新 消费* flink 的 kafkaSource offset消费者策略: offsetsInitializer,默认是 earliest* earliest: 一定从 最早 消费 (不管有没有offset) * latest : 一定从 最新 消费 (不管有没有offset)*/ }启动 Kafka 集群需要先启动 zookeeper 使用命令行生产者生产消息 kafka-console-producer.sh --broker-list hadoop102:9092 --topic like 2.6、从数据生成器读取数据 导入依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-datagen/artifactIdversion${flink.version}/version/dependency 案例 import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;public class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/*** 数据生成器参数说明:* 1. GeneratorFunction接口,需要重写 map 方法,输入类型必须是Long* 2. Long类型, 自动生成的数字序列从0自增的最大值* 3. 限速策略, 比如每秒生成几条数据* 4. 返回的数据类型*/DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return number: value;}},10L,RateLimiterStrategy.perSecond(1),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(),dataGenerator).print();env.execute();} }运行效果 number: 0 number: 1 number: 2 number: 3 number: 4 number: 5 number: 6 number: 7 number: 8 number: 9Process finished with exit code 0如果想达到无界流的效果直接给数据生成器的第二个参数传一个 Long.MAX_VALUE。 假如我们的第二个参数设置为100意味着从0自增到99。如果并行度为3那么第二个线程将从100的1/3处即34开始累加第三个线程将从100的2/3即67开始累加。 Flink 支持的数据类型 这里主要说泛型类型和类型提示别的类型比如我们基本的数据类型及其包装类型和String引用类型、基本类型数组、对象数组、复合数据类型Flink 内置的 Tuple0~Tuple25辅助类型Option、Either、List、Map等还有 POJO 类型Flink 的 TypeInfomation 类型都已经为我们封装好了它为每个数据类型生成了特定的序列化、反序列化器和比较器。 泛型 Flink 支持所有的 Java 类和 Scala 类。但如果没有按照 POJO 类型的要求来定义就会被 Flink 当作泛型类来处理。Flink 会把泛型类型当作黑盒无法获取它们内部的属性它们也不是由 Flink 本身序列化的而是由 Kryo 序列化的。 在这些类型中元组类型和 POJO 类型最为灵活因为它们支持创建复杂类型。而相比之下POJO 还支持在键key的定义中直接使用字段名这会让我们的代码可读性大大增加。所以在项目实践中往往会将流处理程序中的元素类型定为 Flink 的 POJO 类型。 Flink 对 POJO 类型的要求如下 ⚫ 类是公共的public和独立的standalone也就是说没有非静态的内部类 ⚫ 类有一个公共的无参构造方法 ⚫ 类中的所有字段是 public 且非 final 的或者有一个公共的 getter 和 setter 方法这些方法需要符合 Java bean 的命名规范。所以我们上面的 WaterSensor就是我们创建的符合 Flink POJO 定义的数据类型。 类型提示Type Hints Flink 还具有一个类型提取系统可以分析函数的输入和返回类型自动获取类型信息从而获得对应的序列化器和反序列化器。但是由于 Java 中泛型擦除的存在在某些特殊情况下比如 Lambda 表达式中自动提取的信息是不够精细的它是不可靠的这时就需要显式地提供类型信息才能使应用程序正常工作或提高其性能。 为了解决这类问题Java API 提供了专门的“类型提示”type hints。回忆一下之前的 word count 流处理程序我们在将 String 类型的每个词转换成wordcount二元组后就明确地用 returns 指定了返回的类型。因为对于 map 里传入的 Lambda 表达式系统只能推断出返回的是 Tuple2 类型而无法得到 Tuple2String, Long。只有显式地告诉系统当前的返回类型才能正确地解析出完整数据。 下面给出两种写法 DataStreamSourceString lineDS env.socketTextStream(hadoop102,9999);// 3. flatMap 打散数据 返回元组SingleOutputStreamOperatorTuple2String, Long wordAndOne lineDS.flatMap((String line, CollectorTuple2String, Long out) - {String[] words line.split( );for (String word : words) {out.collect(Tuple2.of(word, 1L));}})//.returns(Types.TUPLE(Types.STRING, Types.LONG));.returns(new TypeHintTuple2String, Long() {}); //也可以这样写这是一种比较简单的场景二元组的两个元素都是基本数据类型。那如果元组中的一个元素又有泛型该怎么处理呢 Flink 专门提供了 TypeHint 类它可以捕获泛型的类型信息并且一直记录下来为运行时提供足够的信息。我们同样可以通过.returns()方法明确地指定转换之后的 DataStream 里元素的类型。 returns(new TypeHintTuple2Integer, SomeType(){})
文章转载自:
http://www.morning.nfzzf.cn.gov.cn.nfzzf.cn
http://www.morning.bgqqr.cn.gov.cn.bgqqr.cn
http://www.morning.jmmz.cn.gov.cn.jmmz.cn
http://www.morning.wlnr.cn.gov.cn.wlnr.cn
http://www.morning.qfwfj.cn.gov.cn.qfwfj.cn
http://www.morning.jhqcr.cn.gov.cn.jhqcr.cn
http://www.morning.yaqi6.com.gov.cn.yaqi6.com
http://www.morning.zwgrf.cn.gov.cn.zwgrf.cn
http://www.morning.wdhhz.cn.gov.cn.wdhhz.cn
http://www.morning.mxhys.cn.gov.cn.mxhys.cn
http://www.morning.pwbps.cn.gov.cn.pwbps.cn
http://www.morning.wjrq.cn.gov.cn.wjrq.cn
http://www.morning.ljzqb.cn.gov.cn.ljzqb.cn
http://www.morning.fhyhr.cn.gov.cn.fhyhr.cn
http://www.morning.mwns.cn.gov.cn.mwns.cn
http://www.morning.ftzll.cn.gov.cn.ftzll.cn
http://www.morning.xfyjn.cn.gov.cn.xfyjn.cn
http://www.morning.tnhqr.cn.gov.cn.tnhqr.cn
http://www.morning.dxhdn.cn.gov.cn.dxhdn.cn
http://www.morning.xtkw.cn.gov.cn.xtkw.cn
http://www.morning.lbbrw.cn.gov.cn.lbbrw.cn
http://www.morning.nstml.cn.gov.cn.nstml.cn
http://www.morning.srndk.cn.gov.cn.srndk.cn
http://www.morning.pzrrq.cn.gov.cn.pzrrq.cn
http://www.morning.rxkq.cn.gov.cn.rxkq.cn
http://www.morning.ayftwl.cn.gov.cn.ayftwl.cn
http://www.morning.mlckd.cn.gov.cn.mlckd.cn
http://www.morning.nqbkb.cn.gov.cn.nqbkb.cn
http://www.morning.gdpai.com.cn.gov.cn.gdpai.com.cn
http://www.morning.mstrb.cn.gov.cn.mstrb.cn
http://www.morning.gqmhq.cn.gov.cn.gqmhq.cn
http://www.morning.dzrcj.cn.gov.cn.dzrcj.cn
http://www.morning.lkkgq.cn.gov.cn.lkkgq.cn
http://www.morning.wyjpt.cn.gov.cn.wyjpt.cn
http://www.morning.rzczl.cn.gov.cn.rzczl.cn
http://www.morning.ptqbt.cn.gov.cn.ptqbt.cn
http://www.morning.pxbrg.cn.gov.cn.pxbrg.cn
http://www.morning.wfkbk.cn.gov.cn.wfkbk.cn
http://www.morning.zsyqg.cn.gov.cn.zsyqg.cn
http://www.morning.nzkkh.cn.gov.cn.nzkkh.cn
http://www.morning.zkgpg.cn.gov.cn.zkgpg.cn
http://www.morning.nwrzf.cn.gov.cn.nwrzf.cn
http://www.morning.gygfx.cn.gov.cn.gygfx.cn
http://www.morning.ftcrt.cn.gov.cn.ftcrt.cn
http://www.morning.nzhzt.cn.gov.cn.nzhzt.cn
http://www.morning.gjmll.cn.gov.cn.gjmll.cn
http://www.morning.lffgs.cn.gov.cn.lffgs.cn
http://www.morning.rnxs.cn.gov.cn.rnxs.cn
http://www.morning.lfpzs.cn.gov.cn.lfpzs.cn
http://www.morning.bgxgq.cn.gov.cn.bgxgq.cn
http://www.morning.dhyqg.cn.gov.cn.dhyqg.cn
http://www.morning.wqfj.cn.gov.cn.wqfj.cn
http://www.morning.rqxch.cn.gov.cn.rqxch.cn
http://www.morning.bzbq.cn.gov.cn.bzbq.cn
http://www.morning.czgtt.cn.gov.cn.czgtt.cn
http://www.morning.ltpph.cn.gov.cn.ltpph.cn
http://www.morning.cgbgc.cn.gov.cn.cgbgc.cn
http://www.morning.gtkyr.cn.gov.cn.gtkyr.cn
http://www.morning.mwqbp.cn.gov.cn.mwqbp.cn
http://www.morning.qpntn.cn.gov.cn.qpntn.cn
http://www.morning.qddtd.cn.gov.cn.qddtd.cn
http://www.morning.rnht.cn.gov.cn.rnht.cn
http://www.morning.qrgfw.cn.gov.cn.qrgfw.cn
http://www.morning.ljngm.cn.gov.cn.ljngm.cn
http://www.morning.rnxs.cn.gov.cn.rnxs.cn
http://www.morning.bangaw.cn.gov.cn.bangaw.cn
http://www.morning.kghss.cn.gov.cn.kghss.cn
http://www.morning.sxwfx.cn.gov.cn.sxwfx.cn
http://www.morning.skrww.cn.gov.cn.skrww.cn
http://www.morning.mgtrc.cn.gov.cn.mgtrc.cn
http://www.morning.nlgnk.cn.gov.cn.nlgnk.cn
http://www.morning.ysllp.cn.gov.cn.ysllp.cn
http://www.morning.rxnxl.cn.gov.cn.rxnxl.cn
http://www.morning.nxfwf.cn.gov.cn.nxfwf.cn
http://www.morning.bwqcx.cn.gov.cn.bwqcx.cn
http://www.morning.iuibhkd.cn.gov.cn.iuibhkd.cn
http://www.morning.nbfkk.cn.gov.cn.nbfkk.cn
http://www.morning.kjgrg.cn.gov.cn.kjgrg.cn
http://www.morning.ryqsq.cn.gov.cn.ryqsq.cn
http://www.morning.fosfox.com.gov.cn.fosfox.com
http://www.tj-hxxt.cn/news/262322.html

相关文章:

  • 网站建设公司响应式网站模板下载基于php的图书管理系统论文
  • 宿州市网站建设有哪些公司合肥市住房建设局网站
  • 关于网站建设项目收取费用网络系统设计与管理
  • 授权网站系统wordpress首页404
  • 网站开发找谁广州安全教育平台软件
  • 电影手机网站建设蜘蛛网站长工作职责
  • 电子商务网站建设项目规划书网站地图链接怎么做
  • 做一个公司网站价格重庆黄埔建设集团网站
  • 龙岗网站建设公司效果江西南昌网络公司
  • 外国网站上做雅思考试建筑有限公司
  • 安徽商会网站建设方案58企业网站怎么做
  • 仓库管理erp自学视频太原整站优化
  • 做网站工作的怎么填职务小程序价格表一览表
  • 余杭建设局网站为审核资质帮别人做的网站
  • 应聘网站优化的简历怎么做wordpress企业主题 下载
  • 搭建微信网站怎么做太原建站模板大全
  • 如何用凡科建设手机教学网站门户网站开发需求分析报告
  • 中小企业网站建设客户需求调查问卷凡科做的网站怎么打不开了
  • 制作博客网站深圳定制展会
  • 做k线图网站做网站外包工作怎么样
  • 常州网站制作工具创建公司为什么必须三个人
  • 销售网站的技巧想做网站多少钱
  • 哈尔滨网站建设公司名字番禺网站制作费用
  • 网站板块的策划方案邢台发广告的平台有哪些
  • 书店如何做网站网站使用了seo优化工具怎么检测
  • 专门帮做ppt的网站吗wordpress 架站
  • 静态网站首页更新菏泽做公司简介网站
  • 国内自动化网站建设个人网站免费制作
  • 南京哪家网站建设好苏州知名网站制作
  • 濮阳网站推广3g开发网站