北京智能网站建设系统加盟,北京seo优化诊断,青岛网站建设 推荐青岛博采网络,广西seo排名尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】视频地址#xff1a;尚硅谷大数据Flink1.17实战教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据Flink1.17实战教程-笔记01【Flink 概述、Flink 快速上手】尚硅谷大数据Flink1.17实战教程-笔记02【Flink 部署】尚硅… 尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】视频地址尚硅谷大数据Flink1.17实战教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据Flink1.17实战教程-笔记01【Flink 概述、Flink 快速上手】尚硅谷大数据Flink1.17实战教程-笔记02【Flink 部署】尚硅谷大数据Flink1.17实战教程-笔记03【Flink 运行时架构】尚硅谷大数据Flink1.17实战教程-笔记04【Flink DataStream API】尚硅谷大数据Flink1.17实战教程-笔记05【】尚硅谷大数据Flink1.17实战教程-笔记06【】尚硅谷大数据Flink1.17实战教程-笔记07【】尚硅谷大数据Flink1.17实战教程-笔记08【】 目录
基础篇
第05章-DataStream API
P033【033_DataStreamAPI_执行环境】24:22
P034【034_DataStreamAPI_源算子_准备工作】06:36
P035【035_DataStreamAPI_源算子_集合文件socket】14:40
P036【036_DataStreamAPI_源算子_从Kafka读取】19:50
P037【037_DataStreamAPI_源算子_数据生成器】14:09
P038【038_DataStreamAPI_Flink支持的数据类型】08:49
P039【039_DataStreamAPI_基本转换算子_map】11:48
P040【040_DataStreamAPI_基本转换算子_filterflatmap】12:45
P041【041_DataStreamAPI_聚合算子_keyby】18:00
P042【042_DataStreamAPI_聚合算子_简单聚合算子】11:53
P043【043_DataStreamAPI_聚合算子_规约聚合reduce】09:34
P044【44_DataStreamAPI_用户自定义函数】24:24
P045【45_DataStreamAPI_分区算子分区器】25:08
P046【46_DataStreamAPI_分区算子_自定义分区】06:41
P047【47_DataStreamAPI_分流_使用FIlter简单实现】08:50
P048【48_DataStreamAPI_分流_使用侧输出流】26:33
P049【49_DataStreamAPI_合流_union】06:37
P050【50_DataStreamAPI_合流_connect】15:44
P051【51_DataSrreamAPI_合流_connect案例】12:02 基础篇
第05章-DataStream API
P033【033_DataStreamAPI_执行环境】24:22 第5章 DataStream API DataStream API是Flink的核心层API。一个Flink程序其实就是对DataStream的各种转换。具体来说代码基本上都由以下几部分构成 5.1 执行环境Execution Environment Flink程序可以在各种上下文环境中运行我们可以在本地JVM中执行程序也可以提交到远程集群上运行。 不同的环境代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时首先必须获取当前Flink的运行环境从而建立起与Flink框架之间的联系。 5.1.1 创建执行环境 1getExecutionEnvironment 最简单的方式就是直接调用getExecutionEnvironment方法。它会根据当前运行的上下文直接得到正确的结果如果程序是独立运行的就返回一个本地执行环境如果是创建了jar包然后从命令行调用它并提交到集群执行那么就返回集群的执行环境。也就是说这个方法会根据当前运行的方式自行决定该返回什么样的运行环境。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); 这种方式用起来简单高效是最常用的一种创建执行环境的方式。 2createLocalEnvironment 这个方法返回一个本地执行环境。可以在调用时传入一个参数指定默认的并行度如果不传入则默认并行度就是本地的CPU核心数。 StreamExecutionEnvironment localEnv StreamExecutionEnvironment.createLocalEnvironment(); 3createRemoteEnvironment 这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号并指定要在集群中运行的Jar包。 StreamExecutionEnvironment remoteEnv StreamExecutionEnvironment .createRemoteEnvironment( host, // JobManager主机名 1234, // JobManager进程端口号 path/to/jarFile.jar // 提交给JobManager的JAR包 ); 在获取到程序执行环境后我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链还可以定义程序的时间语义、配置容错机制。 package com.atguigu.env;import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** TODO** author* version 1.0*/
public class EnvDemo {public static void main(String[] args) throws Exception {Configuration conf new Configuration();conf.set(RestOptions.BIND_PORT, 8082);StreamExecutionEnvironment env StreamExecutionEnvironment//.getExecutionEnvironment(); // 自动识别是 远程集群 还是idea本地环境.getExecutionEnvironment(conf); // conf对象可以去修改一些参数//.createLocalEnvironment()//.createRemoteEnvironment(hadoop102, 8081,/xxx)// 流批一体代码api是同一套可以指定为 批也可以指定为 流// 默认 STREAMING// 一般不在代码写死提交时 参数指定-Dexecution.runtime-modeBATCHenv.setRuntimeMode(RuntimeExecutionMode.BATCH);env//.socketTextStream(hadoop102, 7777).readTextFile(input/word.txt).flatMap((String value, CollectorTuple2String, Integer out) - {String[] words value.split( );for (String word : words) {out.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value - value.f0).sum(1).print();env.execute();//env.executeAsync();/** TODO 关于execute总结(了解)* 1、默认 env.execute()触发一个flink job* 一个main方法可以调用多个execute但是没意义指定到第一个就会阻塞住* 2、env.executeAsync()异步触发不阻塞* 一个main方法里 executeAsync()个数 生成的flink job数* 3、思考* yarn-application 集群提交一次集群里会有几个flink job* 》 取决于 调用了n个 executeAsync()* 》 对应 application集群里会有n个job* 》 对应 Jobmanager当中会有 n个 JobMaster*/}
}
P034【034_DataStreamAPI_源算子_准备工作】06:36 5.2 源算子Source 5.2.1 准备工作 package com.atguigu.bean;import java.util.Objects;/*** TODO** author* version 1.0*/
public class WaterSensor {public String id;public Long ts;public Integer vc;// 一定要提供一个 空参 的构造器public WaterSensor() {}public WaterSensor(String id, Long ts, Integer vc) {this.id id;this.ts ts;this.vc vc;}public String getId() {return id;}public void setId(String id) {this.id id;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts ts;}public Integer getVc() {return vc;}public void setVc(Integer vc) {this.vc vc;}Overridepublic String toString() {return WaterSensor{ id id \ , ts ts , vc vc };}Overridepublic boolean equals(Object o) {if (this o) {return true;}if (o null || getClass() ! o.getClass()) {return false;}WaterSensor that (WaterSensor) o;return Objects.equals(id, that.id) Objects.equals(ts, that.ts) Objects.equals(vc, that.vc);}Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}
}
P035【035_DataStreamAPI_源算子_集合文件socket】14:40 5.2.2 从集合中读取数据 package com.atguigu.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;/*** TODO** author* version 1.0*/
public class CollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// TODO 从集合读取数据DataStreamSourceInteger source env.fromElements(1, 2, 33); // 从元素读//.fromCollection(Arrays.asList(1, 22, 3)); // 从集合读source.print();env.execute();}
}
package com.atguigu.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** TODO** author* version 1.0*/
public class FileSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODO 从文件读 新Source架构FileSourceString fileSource FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path(input/word.txt)).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), filesource).print();env.execute();}
}
/*** 新的Source写法* env.fromSource(Source的实现类Watermark名字)*/
P036【036_DataStreamAPI_源算子_从Kafka读取】19:50 5.2.5 从Kafka读取数据 package com.atguigu.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;/*** TODO** author* version 1.0*/
public class KafkaSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODO 从Kafka读新Source架构KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092) // 指定kafka节点的地址和端口.setGroupId(atguigu) // 指定消费者组的id.setTopics(topic_1) // 指定消费的 Topic.setValueOnlyDeserializer(new SimpleStringSchema()) // 指定 反序列化器这个是反序列化value.setStartingOffsets(OffsetsInitializer.latest()) // flink消费kafka的策略.build();env//.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), kafkasource).fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), kafkasource).print();env.execute();}
}
/*** kafka消费者的参数* auto.reset.offsets* earliest: 如果有offset从offset继续消费; 如果没有offset从 最早 消费* latest : 如果有offset从offset继续消费; 如果没有offset从 最新 消费** flink的kafkasourceoffset消费策略OffsetsInitializer默认是 earliest* earliest: 一定从 最早 消费* latest : 一定从 最新 消费*/
P037【037_DataStreamAPI_源算子_数据生成器】14:09 5.2.6 从数据生成器读取数据 package com.atguigu.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** TODO** author* version 1.0*/
public class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 如果有n个并行度最大值设为a// 将数值 均分成 n份a/n ,比如最大100并行度2每个并行度生成50个// 其中一个是 0-49另一个50-99env.setParallelism(2);/*** 数据生成器Source四个参数* 第一个GeneratorFunction接口需要实现重写map方法输入类型固定是Long* 第二个long类型自动生成的数字序列从0自增的最大值(小于)达到这个值就停止了* 第三个限速策略比如 每秒生成几条数据* 第四个返回的类型*/DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number: value;}},100,RateLimiterStrategy.perSecond(1),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), data-generator).print();env.execute();}
}
P038【038_DataStreamAPI_Flink支持的数据类型】08:49 P039【039_DataStreamAPI_基本转换算子_map】11:48 P040【040_DataStreamAPI_基本转换算子_filterflatmap】12:45 P041【041_DataStreamAPI_聚合算子_keyby】18:00 P042【042_DataStreamAPI_聚合算子_简单聚合算子】11:53 P043【043_DataStreamAPI_聚合算子_规约聚合reduce】09:34 P044【44_DataStreamAPI_用户自定义函数】24:24 P045【45_DataStreamAPI_分区算子分区器】25:08 P046【46_DataStreamAPI_分区算子_自定义分区】06:41 P047【47_DataStreamAPI_分流_使用FIlter简单实现】08:50 P048【48_DataStreamAPI_分流_使用侧输出流】26:33 P049【49_DataStreamAPI_合流_union】06:37 P050【50_DataStreamAPI_合流_connect】15:44 P051【51_DataSrreamAPI_合流_connect案例】12:02