网站app怎么做,深圳关键词优化,app开发一般收费,网站建设重点目录
前提条件
基本准备
批处理API实现WordCount
流处理API实现WordCount
数据源是文件
数据源是socket文本流
打包
提交到集群运行
命令行提交作业
Web UI提交作业
上传代码到gitee 前提条件
Windows安装好jdk8、Maven3、IDEA
Linux安装好Flink集群#xff0c;可…目录
前提条件
基本准备
批处理API实现WordCount
流处理API实现WordCount
数据源是文件
数据源是socket文本流
打包
提交到集群运行
命令行提交作业
Web UI提交作业
上传代码到gitee 前提条件
Windows安装好jdk8、Maven3、IDEA
Linux安装好Flink集群可参考CentOS7安装flink1.17完全分布式
基本准备
创建项目
使用IDEA创建一个新的Maven项目项目名称例如flinkdemo 添加依赖
在项目的pom.xml文件中添加Flink的依赖。 propertiesflink.version1.17.1/flink.version/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependency/dependencies 刷新依赖 刷新依赖后能看到相关依赖如下 刷新依赖过程需要等待一些时间来下载相关依赖。
如果依赖下载慢可以设置阿里云仓库镜像 1.设置maven的settings.xml 在/mirrors上面一行添加阿里云仓库镜像 mirroridalimaven/idnamealiyun maven/nameurlhttp://maven.aliyun.com/nexus/content/groups/public//urlmirrorOfcentral/mirrorOf /mirror 2.IDEA设置maven 数据准备
在工程的根目录下新建一个data文件夹 并在data文件夹下创建文本文件words.txt 内容如下
hello world
hello java
hello flink 新建包
右键src/main下的java新建Package 填写包名org.example包名与groupId的内容一致。 批处理API实现WordCount
在org.exmaple下新建wc包及BatchWordCount类 填写wc.BatchWordCount 效果如下 BatchWordCount.java代码如下
package org.example.wc;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据 按行读取DataSourceString lineDS env.readTextFile(data/words.txt);// 3. 转换数据格式FlatMapOperatorString, Tuple2String, Long wordAndOne lineDS.flatMap(new FlatMapFunctionString, Tuple2String, Long() {Overridepublic void flatMap(String line, CollectorTuple2String, Long out) throws Exception {String[] words line.split( );for (String word : words) {out.collect(Tuple2.of(word,1L));}}});// 4. 按照 word 进行分组UnsortedGroupingTuple2String, Long wordAndOneUG wordAndOne.groupBy(0);// 5. 分组内聚合统计AggregateOperatorTuple2String, Long sum wordAndOneUG.sum(1);// 6. 打印结果sum.print();}
}运行程序查看结果 注意以上代码的实现方式是基于DataSet API的是批处理API。而Flink本身是流批统一的处理架构批量的数据集本质上也是流没有必要用两套不同的API来实现。从Flink 1.12开始官方推荐直接使用DataStream API在提交任务时通过将执行模式设为BATCH来进行批处理
$ flink run -Dexecution.runtime-modeBATCH BatchWordCount.jar 流处理API实现WordCount
数据源是文件
在org.example.wc包下新建Java类StreamWordCount代码如下
package org.example.wc;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文件DataStreamSourceString lineStream env.readTextFile(input/words.txt);// 3. 转换、分组、求和得到统计结果SingleOutputStreamOperatorTuple2String, Long sum lineStream.flatMap(new FlatMapFunctionString, Tuple2String, Long() {Overridepublic void flatMap(String line, CollectorTuple2String, Long out) throws Exception {String[] words line.split( );for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data - data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}运行结果 与批处理程序BatchWordCount的区别 创建执行环境的不同流处理程序使用的是StreamExecutionEnvironment。 转换处理之后得到的数据对象类型不同。 分组操作调用的是keyBy方法可以传入一个匿名函数作为键选择器KeySelector指定当前分组的key是什么。 代码末尾需要调用env的execute方法开始执行任务。 数据源是socket文本流
流处理的输入数据通常是流数据将StreamWordCount代码中读取文件数据的readTextFile方法替换成读取socket文本流的方法socketTextStream。
在org.example.wc包下新建Java类SocketStreamWordCount代码如下
package org.example.wc;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class SocketStreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文本流node2表示发送端主机名根据实际情况修改、7777表示端口号DataStreamSourceString lineStream env.socketTextStream(node2, 7777);// 3. 转换、分组、求和得到统计结果SingleOutputStreamOperatorTuple2String, Long sum lineStream.flatMap((String line, CollectorTuple2String, Long out) - {String[] words line.split( );for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG)).keyBy(data - data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
} 进入node2终端如果没有nc命令需要先安装nc命令安装nc命令如下
[hadoopnode2 ~]$ sudo yum install nc -y 开启nc监听
[hadoopnode2 ~]$ nc -lk 7777 IDEA中运行SocketStreamWordCount程序。 往7777端口发送数据例如发送hello world 控制台输出 继续往7777端口发送数据例如发送hello flink 控制台输出 停止SocketStreamWordCount程序。 按Ctrlc停止nc命令。 打包
这里的打包是将写好的程序打成jar包。
点击IDEA右侧的Maven按住Ctrl键同时选中clean和package第一次打包可以只选中package点击执行打包。 打包成功后看到如下输出信息生成的jar包在项目的target目录下 提交到集群运行
把jar包提交到flink集群运行有两种方式
1.通过命令行提交作业
2.通过Web UI提交作业 命令行提交作业
将jar包上传Linux 启动flink集群
[hadoopnode2 ~]$ start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node2.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.
Starting taskexecutor daemon on host node4.
开启nc监听
[hadoopnode2 ~]$ nc -lk 7777
命令提交作业
开启另一个node2终端使用flink run命令提交作业到flink集群
[hadoopnode2 ~]$ flink run -m node2:8081 -c org.example.wc.SocketStreamWordCount flinkdemo-1.0-SNAPSHOT.jar -m指定提交到的JobManager-c指定程序入口类。 发送测试数据
在nc监听终端往7777端口发送数据 查看结果
Web UI查看结果
浏览器访问
node2:8081
看到正在运行的作业如下 查看结果 继续发送测试数据
在nc终端继续发送数据 Web UI刷新结果 命令行查看结果
打开新的node2终端查看结果
[hadoopnode2 ~]$ cd $FLINK_HOME/log
[hadoopnode2 log]$ ls
flink-hadoop-client-node2.log flink-hadoop-standalonesession-0-node2.out
flink-hadoop-standalonesession-0-node2.log flink-hadoop-taskexecutor-0-node2.log
flink-hadoop-standalonesession-0-node2.log.1 flink-hadoop-taskexecutor-0-node2.log.1
flink-hadoop-standalonesession-0-node2.log.2 flink-hadoop-taskexecutor-0-node2.log.2
flink-hadoop-standalonesession-0-node2.log.3 flink-hadoop-taskexecutor-0-node2.log.3
flink-hadoop-standalonesession-0-node2.log.4 flink-hadoop-taskexecutor-0-node2.log.4
flink-hadoop-standalonesession-0-node2.log.5 flink-hadoop-taskexecutor-0-node2.out
[hadoopnode2 log]$ cat flink-hadoop-taskexecutor-0-node2.out
(hello,1)
(flink,1)
(hello,2)
(world,1)
取消flink作业 点击Cancel Job取消作业 停止nc监听
按Ctrlc停止nc命令 Web UI提交作业 开启nc监听
开启nc监听发送数据
[hadoopnode2 ~]$ nc -lk 7777 Web UI提交作业
浏览器访问
node2:8081 点击Submit New Job 点击Add New 选择flink作业jar包所在路径 点击jar包名称 填写相关内容点击Submit提交作业
Entry Class填写运行的主类例如org.example.wc.SocketStreamWordCount
Parallesim填写作业的并行度例如1 提交后在Running Jobs里看到运行的作业 发送测试数据
往7777端口发送数据 查看结果 继续发送测试数据 刷新结果 取消作业 停止nc监听
按住Ctrlc停止nc命令 关闭flink集群
[hadoopnode2 ~]$ stop-cluster.sh
Stopping taskexecutor daemon (pid: 2283) on host node2.
Stopping taskexecutor daemon (pid: 1827) on host node3.
Stopping taskexecutor daemon (pid: 1829) on host node4.
Stopping standalonesession daemon (pid: 1929) on host node2. 上传代码到gitee 登录gitee
https://gitee.com/
注意如果还没有gitee账号需要先注册如果之前没有设置过SSH公钥需要先设置SSH公钥。 创建仓库 提交代码
使用IDEA提交代码 提示有警告忽略警告继续提交 提交成功后IDEA显示如下 刷新浏览器查看gitee界面看到代码已上传成功 完成enjoy it!