网站设计学校,网站优化的关键词,微信app下载安卓版官方下载,中国计算机技术职业资格网Flink的执行模式有以下三种:
前提是我们已经开启了yarnsession的进程#xff0c;在下图中可以看到启动的id也就是后续任务需要通过此id进行认证#xff0c;以及任务分配的master主机。
这里启动时候会报错一个ERROR#xff1a;org.apache.flink.shaded.curator.org.apache…Flink的执行模式有以下三种:
前提是我们已经开启了yarnsession的进程在下图中可以看到启动的id也就是后续任务需要通过此id进行认证以及任务分配的master主机。
这里启动时候会报错一个ERRORorg.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
查阅资料得知
该错误是因为kerberos认证失败cdh6并没有启动kerberos。所以该错误可以忽略。但是如果已经开启动了kerberos这个问题就要解决了。
我们这里没有开启Kerberos所以这个报错我么可以不管。 Session Mode会话模式
会话模式需要先启动一个集群保持一个会话在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定所有提交的作业会竞争集群中的资源。适合任务规模小执行时间短的大量作业。
Flink的作业执行环境会一直保留在集群上直到会话被显式终止。这样可以提交多个作业它们可以共享相同的集群资源和状态从而实现更高的效率和资源利用。
bin/flink run -yid application_1723708102500_0009 examples/batch/WordCount.jar
重要的是要添加 -yid 这个参数不添加这个参数会执行不成功会报错找不到执任务的cluster。
脚本执行参数
-n(--container)TaskManager的数量。(1.10 已经废弃)
-s(--slots)每个TaskManager的slot数量默认一个slot一个core默认每个taskmanager的slot的个数为1有时可以多一些taskmanager做冗余。
-jmJobManager的内存单位MB)。
-q显示可用的YARN资源内存内核;
-tm每个TaskManager容器的内存默认值MB
-nmyarn 的appName(现在yarn的ui上的名字)。
-d后台执行。
提交flink任务
bin/flink run examples/batch/WordCount.jar Per-Job Mode单作业模式我们也是更多的使用这种模式这个模式会将我们的资源更合理的规划使用。
每个Flink应用程序作为一个独立的作业被提交和执行。
每次提交的Flink应用程序都会创建一个独立的作业执行环境该作业执行环境仅用于执行该特定的作业。
作业完成后作业执行环境会被释放集群关闭资源释放
bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
常用参数
--p 程序默认并行度
下面的参数仅可用于 -m yarn-cluster 模式
--yjm JobManager可用内存单位兆
--ynm YARN程序的名称
--yq 查询YARN可用的资源
--yqu 指定YARN队列是哪一个
--ys 每个TM会有多少个Slot
--ytm 每个TM所在的Container可申请多少内存单位兆
--yD 动态指定Flink参数
-yd 分离模式后台运行不指定-yd, 终端会卡在提交的页面上 Application Mode应用模式
应用模式算是前2种模式的升级前2种模式中Flink程序代码是在客户端执行然后客户端提交给JobManager客户端需要占用大量网络带宽。
应用模式需要为每一个提交的应用单独启动一个JobManager应用程序在JobManager执行也就是创建一个集群。这个JobManager只为执行这一个应用而存在执行结束之后JobManager关闭。
application 模式使用 bin/flink run-application 提交作业通过 -t 指定部署环境目前 application 模式支持部署在 yarn 上(-t yarn-application) 和 k8s 上(-t kubernetes-application并支持通过 -D 参数指定通用的 运行配置比如 jobmanager/taskmanager 内存、checkpoint 时间间隔等。
带有 JM 和 TM 内存设置的命令提交这种方式提交之后会带对应服务器的HDFS的WebUI页面多出一个wordcount_01的文件该文件记录了程序运行的结果
./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size1024m \
-Dtaskmanager.memory.process.size1024m \
-Dyarn.application.nameMyFlinkWordCount \
./examples/batch/WordCount.jar --output hdfs://ddp54:8020/wordcount_01
在上面例子 的基础上自己设置 TaskManager slots 个数为3以及指定并发数为3
./bin/flink run-application -t yarn-application -p 3 \
-Djobmanager.memory.process.size1024m \
-Dtaskmanager.memory.process.size1024m \
-Dyarn.application.nameMyFlinkWordCount \
-Dtaskmanager.numberOfTaskSlots3 \
./examples/batch/WordCount.jar --output hdfs://node1:8020/wordcount/output_52
指定并发还可以使用 -Dparallelism.default3而且社区目前倾向使用 -D通用配置代替客户端命令参数(比如 -p)。所以这样写更符合规范
./bin/flink run-application -t yarn-application \
-Dparallelism.default3 \
-Djobmanager.memory.process.size1024m \
-Dtaskmanager.memory.process.size1024m \
-Dyarn.application.nameMyFlinkWordCount \
-Dtaskmanager.numberOfTaskSlots3 \
./examples/batch/WordCount.jar --output hdfs://node1:8020/wordcount/output_53
以上三种模式就先简述这些其实还有很多参数没有用到我们更多的只需要用到第二种pre-job的模式即可。
Yarn-session模式开启成功后我们进入SQL-Client界面在这个界面我们可以写SQL来实现系统之间的交互我接下来以MySQL与Kafka的交互为例
首先是要在MySQL数据库创建一些库和表当作source数据源 CREATE TABLE src_mysql_order(order_id BIGINT,store_id BIGINT,sales_amt double,PRIMARY KEY (order_id)
);CREATE TABLE src_mysql_order_detail(order_id BIGINT,store_id BIGINT,goods_id BIGINT,sales_amt double,PRIMARY KEY (order_id,store_id,goods_id)
);CREATE TABLE dim_store(store_id BIGINT,store_name varchar(100),PRIMARY KEY (store_id)
);CREATE TABLE dim_goods(goods_id BIGINT,goods_name varchar(100),PRIMARY KEY (goods_id)
);CREATE TABLE dwa_mysql_order_analysis (store_id BIGINT,store_name varchar(100),sales_goods_distinct_nums bigint,sales_amt double,order_nums bigint,PRIMARY KEY (store_id,store_name)
); Source在MySQL中创建完成之后我们要在SQL client界面进行映射在这里以src_mysql_order表为例执行成功如以下界面 CREATE TABLE src_mysql_order( order_id BIGINT, store_id BIGINT, sales_amt double, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( connector mysql-cdc, hostname xxx, port 3306, username xxx, password xxx, database-name xxx, table-name xxx, scan.incremental.snapshot.enabled false ); Sink对MySQL做完source映射之后我们要将MySQL的数据导入到Kafka因此我们也要做一些Kafka表的映射执行成功界面如下 CREATE TABLE ods_kafka_order ( order_id BIGINT, store_id BIGINT, sales_amt double, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( connector upsert-kafka, topic Kafka主题, properties.bootstrap.servers Kafka集群的IP端口号, key.format json, value.format json ); 两张表都映射完成之后我们先在MySQL添加一些测试用例
insert into src_mysql_order values
(20221210001,10000,50),
(20221210002,10000,20),
(20221210003,10001,10); 接下来就将MySQL与Kafka实现交互即将MySQL数据插入到Kafka作业中
insert into ods_kafka_order_2 select * from src_mysql_order; 在这个过程中有可能会报错 这个报错是找不到表的元数据信息我这里是将表名写错了这个是比较庆幸的但是还有一种原因就是没有MySQLCDC或者Kafka的依赖导致连接的元数据信息无法保存到catalog中因此我们就需要添加MySQLCDC和Kafka的连接依赖
进入到Flink安装路径的lib目录下使用 rz 指令将依赖jar包上传上传完毕之后使用 scp 指令远程复制给集群的其它机器我们的是ddp54、ddp55
scp -r lib/flink-sql-connector-kafka-1.16.2.jar rootddp54:$PWD/lib
scp -r lib/flink-sql-connector-kafka-1.16.2.jar rootddp55:$PWD/lib Jar包上传完之后我们在基础平台将Flink集群重启
集群重启之后我们重新开启一个yarnsession进程来执行后续提交的任务。
进入yarn的web页面来查看进程启动的状况。 接下来我们重走一遍MySQL的source和Kafka的sink流程走完之后进入SQL client界面执行交互指令即MySQL数据插入到Kafka执行完成之后没有报错但是查看flink的web页面发现并没有作业在执行或执行完成于是查看日志得知问题是MySQL的系统时间跟所在地区时间不匹配导致的我们可以在命令行进行时区的设置也可以在配置文件中进行时区的设置我选择了在my.cnf配置文件中进行时区的更改在[mysqld]下添加默认时区设置即可与此同时MySQL也要开启binlog日志可以保障数据一致性主要用于复制和数据恢复。配置完成之后重启MySQL服务。
开启binlog日志 # 服务IDserver-id1# binlog 配置 只要配置了log_bin地址 就会开启log_bin /var/lib/mysql/mysql_bin# 日志存储天数 默认0 永久保存# 如果数据库会定期归档建议设置一个存储时间不需要一直存储binlog日志理论上只需要存储归档之后的日志expire_logs_days 30# binlog最大值max_binlog_size 1024M# 规定binlog的格式binlog有三种格式statement、row、mixad默认使用statement建议使用row格式binlog_format ROW# 在提交n次事务后进行binlog的落盘0为不进行强行的刷新操作而是由文件系统控制刷新日志文件如果是在线交易和账有关的数据建议设置成1如果是其他数据可以保持为0即可sync_binlog 1 查看日志得知是MySQL的时区问题导致任务提交不成功 在 my.cnf 对时区和binlog日志进行修改 上边的MySQL配置完成之后需要重启MySQL服务 docker restart mysql 接下来在SQL client界面再次执行指令
insert into ods_kafka_order select * from src_mysql_order;
打开Flink的web界面发现Flink的作业任务正在执行 我们在SQL client界面查询MySQL的数据表信息
SET sql-client.execution.result-modetableau;
select * from src_mysql_order;
可以查看插入到MySQL的数据信息和数据的更新信息[Flink中 I 代表插入数据 U 代表更新数据 -U代表撤回数据] 与此同时我们去Kafka查看数据是否到来通过Kafka Tool查看到数据已经成功到Kafka。 至此我们实现了MySQL到Kafka的实时数据的接入以及在这个过程中遇到的一些问题以及解决办法。