做网站什么是三网合一,seo优化要做什么,小熊代刷推广网站,重庆市建设工程人力资源网文章目录 一、业务中常见的需要数据同步的场景CDC是什么FlinkCDC是什么CDC原理为什么是FlinkCDC业务场景flink cdc对应flink的版本 二、模拟案例1.阿里云flink sql2.开源flink sql(单机模式)flink 安装安装mysql3.flink datastream 三、总结 提示#xff1a;以下是本篇文章正文… 文章目录 一、业务中常见的需要数据同步的场景CDC是什么FlinkCDC是什么CDC原理为什么是FlinkCDC业务场景flink cdc对应flink的版本 二、模拟案例1.阿里云flink sql2.开源flink sql(单机模式)flink 安装安装mysql3.flink datastream 三、总结 提示以下是本篇文章正文内容下面案例可供参考 
一、业务中常见的需要数据同步的场景 
1、多个库的表合并到一张表。不同的业务线或者微服务在不同的数据库里开发但是此时有些报表需要将多个库的类似的数据合并后做查询统计。或者某些历史原因类似刚开始的商业模式不清晰导致一些业务线分分合合。或者某些边缘业务逐步融合到了主业务。早起的数据是分开的业务运营也是分开后来又合并成了一个大块业务。 
2、某个数据需要写到多个存储中。业务数据需要写入到多个中间件或者存储中比如业务的数据存储再Mysql的数据中后来为了方便检索需要写入到ES或者为了缓存需要写入到Redis或者是Mysql分表的数据合并写入到Doris中。 
3、数据仓库的场景。比如将表里的数据实时写入到DWS数据仓库的宽表中。 
4、应急场景。如果不采专用CDC的方案那么要达到实时查询的效果只能在BFF层的代码调用多个中心层的查询API然后再BFF层做各种聚合运算。这种方式开发效率低下万一有的中心层没有提供合适的查询API临时开发的话会让开发进度不可控。 
总之不管是数据多写、还是多表合并、还是建立数据仓库都属于数据同步任务。 
示例pandas 是基于NumPy 的一种工具该工具是为了解决数据分析任务而创建的。 
CDC是什么 
CDC 是变更数据捕获Change Data Capture技术的缩写它可以将源数据库Source的增量变动记录同步到一个或多个数据目的Sink。在同步过程中还可以对数据进行一定的处理例如过滤、关联、分组、统计等。 
目前专业做数据库事件接受和解析的中间件是Debezium如果是捕获Mysql还有Canal。 
FlinkCDC是什么 
官网地址官网FlinkCDC 
官方定义This project provides a set of source connectors for Apache Flink® directly ingesting changes coming from different databases using Change Data Capture(CDC)。根据FlinkCDC官方给出的定义FlinkCDC提供一组源数据的连接器使用变更数据捕获的方式直接吸收来自不同数据库的变更数据。 
CDC原理 
CDC的原理是当数据源表发生变动时会通过附加在表上的触发器或者 binlog 等途径将操作记录下来。下游可以通过数据库底层的协议订阅并消费这些事件然后对数据库变动记录做重放从而实现同步。这种方式的优点是实时性高可以精确捕捉上游的各种变动。 
为什么是FlinkCDC 
1、FlinkCDC 提供了对 Debezium 连接器的封装和集成简化了配置和使用的过程并提供了更高级的 API 和功能例如数据格式转换、事件时间处理等。Flink CDC 使用 Debezium 连接器作为底层的实现将其与 Flink 的数据处理能力结合起来。通过配置和使用 Flink CDC您可以轻松地将数据库中的变化数据流转化为 Flink 的 DataStream 或 Table并进行实时的数据处理、转换和分析。 
2、Flink的DataStream和SQL比较成熟和易用 
3、Flink支持状态后端State Backends允许存储海量的数据状态 
4、Flink有更好的生态更多的Source和Sink的支持 
业务场景 
数据合并流向 数据多写流向 单数据源写单表流向: 数据链路对比 通过下图我们可以看到Canal处理数据的链路比FlinkCDC更长数据链路一旦变长意味着出错的可能性更高。   
flink cdc对应flink的版本 二、模拟案例 
1.阿里云flink sql 
验证mysql开启binlog  flink sql 定义binlog源数据拿到数据处理(业务逻辑)再写表后面很简单了  
2.开源flink sql(单机模式) 背景:win10电脑安装vmware(虚拟化)软件虚拟机中安装 linux节点一个flinkmysql(yum默认安装的最新版本:8.0.37)java环境(此处安装java环境略网上有) flink 安装 
###在linux下载flink包
[rootslave2 ~]# wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
### 加压包到当前目录下
[rootslave2 ~]# tar zxvf flink-1.16.3-bin-scala_2.12.tgz 
[rootslave2 ~]# cd flink-1.16.3
[rootslave2 lib]# wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.0/flink-sql-connector-mysql-cdc-2.4.0.jar修改flink 文件
[rootslave2 flink-1.16.3]# cat conf/flink-conf.yaml 修改内容如下(如果不修改则win10本地电脑无法访问flink web UI这里浪费很多时间):
taskmanager.host: localhost
rest.bind-address: 0.0.0.0###启动flink
[rootslave2 flink-1.16.3]# bin/start-cluster.sh
###启动客户端
rootslave2 flink-1.16.3]# ./bin/sql-client.sh embedded 安装mysql 
安装mysql遇到很多小问题 [rootslave2 ~]# yum -y install mysql-community-server  “MySQL 8.0 Community Server” 的 GPG 密钥已安装但是不适用于此软件包。请检查源的公钥 URL 是否配置正确。 失败的软件包是mysql-community-libs-8.0.37-1.el7.x86_64 ###用于跳过GPG签名检查 可以安装成功
[rootslave2 ~]# yum -y install mysql-server --nogpgcheck   
###验证myqsl是否可用
[rootslave2 ~]# systemctl start mysqld
[rootslave2 ~]# mysql -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 63
Server version: 8.0.37 MySQL Community Server - GPL
Copyright (c) 2000, 2024, Oracle and/or its affiliates.
Type help; or \h for help. Type \c to clear the current input statement.
mysql ### 配置开启binlog
[rootslave2 ~]# vim /etc/my.cnf
log_binmysql_bin 
binlog-formatRow 
server-id1   
###重启mysql
[rootslave2 ~]# systemctl restart mysqld
###重新登陆mysql并查看binlog开启情况
mysql SHOW VARIABLES LIKE log_bin;
----------------------
| Variable_name | Value |
----------------------
| log_bin       | ON    |
----------------------
1 row in set (0.01 sec)mysql创建新用户等 
###创建新用户时发现一直报错。查看mysql建密码时要求比较高(新版本mysql对密码要求高)
mysql SHOW VARIABLES LIKE validate_password%;
---------------------------------------------------------
| Variable_name                                   | Value  |
---------------------------------------------------------
| validate_password.changed_characters_percentage | 0      |
| validate_password.check_user_name               | ON     |
| validate_password.dictionary_file               |        |
| validate_password.length                        | 10     |
| validate_password.mixed_case_count              | 1      |
| validate_password.number_count                  | 1      |
| validate_password.policy                        | MEDIUM |
| validate_password.special_char_count            | 1      |
---------------------------------------------------------
8 rows in set (0.00 sec)
###修改密码要求(在工作生产环境不建议这么做)
mysql SET GLOBAL validate_password.length  3;
Query OK, 0 rows affected (0.03 sec)
mysql SET GLOBAL validate_password.policy  LOW;
Query OK, 0 rows affected (0.00 sec)
###再次查看对新建用户密码要求
mysql SHOW VARIABLES LIKE validate_password%;
--------------------------------------------------------
| Variable_name                                   | Value |
--------------------------------------------------------
| validate_password.changed_characters_percentage | 0     |
| validate_password.check_user_name               | ON    |
| validate_password.dictionary_file               |       |
| validate_password.length                        | 4     |
| validate_password.mixed_case_count              | 1     |
| validate_password.number_count                  | 1     |
| validate_password.policy                        | LOW   |
| validate_password.special_char_count            | 1     |
--------------------------------------------------------
8 rows in set (0.00 sec)
###新建用户:root1234
mysql CREATE USER root1234localhost IDENTIFIED BY root1234;
Query OK, 0 rows affected (0.01 sec)mysql GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO root1234localhost;
Query OK, 0 rows affected (0.02 sec)mysql GRANT SELECT ON *.* TO root1234localhost;
Query OK, 0 rows affected (0.01 sec)mysql SELECT User, Host FROM mysql.user;
-----------------------------
| User             | Host      |
-----------------------------
| mysql.infoschema | localhost |
| mysql.session    | localhost |
| mysql.sys        | localhost |
| root             | localhost |
| root1234         | localhost |
-----------------------------
5 rows in set (0.00 sec)mysql FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.00 sec)建mysql库表(数据源头库表) Flink SQL select * from test_flink_cdc5; [ERROR] Could not execute SQL statement. Reason: java.lang.IllegalArgumentException: Can’t find any matched tables, please check your configured database-name: [mysql] and table-name: [mysql.test_cdc] ###mysql 默认只有这4个库(当时直接用默认库mysql建表导致flink 报一些上面奇诡的错)
mysql show databases;
--------------------
| Database           |
--------------------
| information_schema |
| mysql              |
| performance_schema |
| sys                |
--------------------
###新建一个库:test
mysql CREATE DATABASE test CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
Query OK, 1 row affected (0.02 sec)mysql use test;
Database changed
###建表
mysql  CREATE TABLE test_cdc (-   id int NOT NULL AUTO_INCREMENT,-   name varchar(255) DEFAULT NULL,-   PRIMARY KEY (id)-  ) ENGINEInnoDB ;
Query OK, 0 rows affected (0.04 sec)mysql show tables;
----------------
| Tables_in_test |
----------------
| test_cdc       |
----------------
1 row in set (0.00 sec)##开始验证 
##mysql 使用上面创建的用户密码登录mysql
[rootslave2 ~]# mysql -u root -proot1234
mysql use test
##flink登录
[rootslave2 flink-1.16.3]# bin/stop-cluster.sh
[rootslave2 flink-1.16.3]# ./bin/sql-client.sh embedded 
##在flink sql中定义mysql源
CREATE TABLE test_flink_cdc ( id INT, name STRING,primary key(id)  NOT ENFORCED
) WITH (connector  mysql-cdc, hostname  127.0.0.1, port  3306, usernameroot1234, passwordroot1234, database-nametest, table-nametest_cdc
);
###查询flink 接收到的binlog数据
Flink SQL select * from test_flink_cdc;###到mysql sql界面向test_cdc表插入数据
mysql INSERT INTO test_cdc VALUES (001, test01);
Query OK, 1 row affected (0.02 sec)mysql INSERT INTO test_cdc VALUES (002, test02);
Query OK, 1 row affected (0.01 sec)mysql INSERT INTO test_cdc VALUES (003, test03);
Query OK, 1 row affected (0.04 sec)mysql INSERT INTO test_cdc VALUES (004, test04);
Query OK, 1 row affected (0.01 sec)mysql INSERT INTO test_cdc VALUES (005, test05);
Query OK, 1 row affected (0.01 sec)mysql INSERT INTO test_cdc VALUES (006, test06);
Query OK, 1 row affected (0.00 sec)向mysql表中插入数据  flink sql这时可以接到binlog数据  查看flink UI job情况   小结:当flink可以拿到mysql binlog源头数据下面就好做了根据自己的业务处理sink到任何数据库或组件中(例如sink到mysql,hbase,hive,pg,kafka等等)后面sink就不演示了。 
下载链接: 1.mysql jdbc jar包驱动下载 2.flink cdc驱动下载 3.flink下载 
3.flink datastream datastream 比较灵活简单下面是举例代码片段(datastream的CDC比flink sql还简单打个jar包在flink web UI界面上传运行即可此处不做举例) public class MySqlSourceExample {public static void main(String[] args) throws Exception {MySqlSourceString mySqlSource  MySqlSource.Stringbuilder().hostname(yourHostname).port(yourPort).databaseList(yourDatabaseName) // set captured database.tableList(yourDatabaseName.yourTableName) // set captured table.username(yourUsername).password(yourPassword).deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();StreamExecutionEnvironment env  StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), MySQL Source)// set 4 parallel source tasks.setParallelism(4).print().setParallelism(1); // use parallelism 1 for sink to keep message orderingenv.execute(Print MySQL Snapshot  Binlog);}
}三、总结 
公司用的cdc不能分享出来所以搭建上面案例时遇到很多问题遇到很多坑。工作中使用的就是如下截图流程,没有使用canal和kafka,使用的是logtail和SLS(阿里云的组件类似kafka,但要比kafka等功能强大)上面说了很多关于flink cdc的优点。我结合工作中使用的flink cdc说一些缺点(自己结合业务场景)。 1.使用flink cdc不适合直观观察binlog的数据(例如脏数据数据断流不能直观看到最近的binglog情况表的更新频率不高等造成的困扰)。  2.使用阿里云SLS收集mysql binlog数据将数据保存近1个月(保持时间可设置)同时可以在logstore查看数据结构样式(sls支持sql语法查询日志数据)方便后续flink代码开发也方便flink sql和datastream代码debug。  3.使用flink cdc做数据质量监控比较后知后觉(只能监控已表数据)我这边做法是直接监控sls原始数据质量发现有数据质量问题会报出来。sls也支持实时断流提醒。  4.一般对于简单的不太重要的业务适合使用flink cdc这样开发快数据流不用咋校验。对于复杂数据或复杂业务或重要数据需要观察binlog数据结果(不信任上游其他部门数据)还是使用类似sls比较好。方便查询数据变化与汇总方便做数据报警等。  5.总之还是要根据自己的业务场景和自己公司现有技术组件组合着使用比较好。  文章转载自: http://www.morning.sblgt.cn.gov.cn.sblgt.cn http://www.morning.rtbhz.cn.gov.cn.rtbhz.cn http://www.morning.brld.cn.gov.cn.brld.cn http://www.morning.xqbgm.cn.gov.cn.xqbgm.cn http://www.morning.fnywn.cn.gov.cn.fnywn.cn http://www.morning.ydrfl.cn.gov.cn.ydrfl.cn http://www.morning.skbbt.cn.gov.cn.skbbt.cn http://www.morning.ckwrn.cn.gov.cn.ckwrn.cn http://www.morning.sffkm.cn.gov.cn.sffkm.cn http://www.morning.cybch.cn.gov.cn.cybch.cn http://www.morning.cbndj.cn.gov.cn.cbndj.cn http://www.morning.nmbbt.cn.gov.cn.nmbbt.cn http://www.morning.rcmwl.cn.gov.cn.rcmwl.cn http://www.morning.pwppk.cn.gov.cn.pwppk.cn http://www.morning.tpnxr.cn.gov.cn.tpnxr.cn http://www.morning.kggxj.cn.gov.cn.kggxj.cn http://www.morning.qzbwmf.cn.gov.cn.qzbwmf.cn http://www.morning.lmhcy.cn.gov.cn.lmhcy.cn http://www.morning.lzqxb.cn.gov.cn.lzqxb.cn http://www.morning.bfcxf.cn.gov.cn.bfcxf.cn http://www.morning.dlhxj.cn.gov.cn.dlhxj.cn http://www.morning.bpmnh.cn.gov.cn.bpmnh.cn http://www.morning.kqblk.cn.gov.cn.kqblk.cn http://www.morning.zdydj.cn.gov.cn.zdydj.cn http://www.morning.yskhj.cn.gov.cn.yskhj.cn http://www.morning.jxtbr.cn.gov.cn.jxtbr.cn http://www.morning.hcqpc.cn.gov.cn.hcqpc.cn http://www.morning.brkrt.cn.gov.cn.brkrt.cn http://www.morning.ddfp.cn.gov.cn.ddfp.cn http://www.morning.ksqzd.cn.gov.cn.ksqzd.cn http://www.morning.xjpnq.cn.gov.cn.xjpnq.cn http://www.morning.zwsgl.cn.gov.cn.zwsgl.cn http://www.morning.ydxwj.cn.gov.cn.ydxwj.cn http://www.morning.gbrps.cn.gov.cn.gbrps.cn http://www.morning.sbjbs.cn.gov.cn.sbjbs.cn http://www.morning.qkkmd.cn.gov.cn.qkkmd.cn http://www.morning.ygbq.cn.gov.cn.ygbq.cn http://www.morning.jnhhc.cn.gov.cn.jnhhc.cn http://www.morning.gbsby.cn.gov.cn.gbsby.cn http://www.morning.gqnll.cn.gov.cn.gqnll.cn http://www.morning.tyjnr.cn.gov.cn.tyjnr.cn http://www.morning.qwbtr.cn.gov.cn.qwbtr.cn http://www.morning.ghssm.cn.gov.cn.ghssm.cn http://www.morning.dwfzm.cn.gov.cn.dwfzm.cn http://www.morning.kbdjn.cn.gov.cn.kbdjn.cn http://www.morning.xblrq.cn.gov.cn.xblrq.cn http://www.morning.lsyk.cn.gov.cn.lsyk.cn http://www.morning.dqgbx.cn.gov.cn.dqgbx.cn http://www.morning.dzdtj.cn.gov.cn.dzdtj.cn http://www.morning.kfysh.com.gov.cn.kfysh.com http://www.morning.zkbxx.cn.gov.cn.zkbxx.cn http://www.morning.zdqsc.cn.gov.cn.zdqsc.cn http://www.morning.lkhgq.cn.gov.cn.lkhgq.cn http://www.morning.zcfsq.cn.gov.cn.zcfsq.cn http://www.morning.wnqfz.cn.gov.cn.wnqfz.cn http://www.morning.cqrenli.com.gov.cn.cqrenli.com http://www.morning.xmwdt.cn.gov.cn.xmwdt.cn http://www.morning.mdrnn.cn.gov.cn.mdrnn.cn http://www.morning.tktcr.cn.gov.cn.tktcr.cn http://www.morning.rfyff.cn.gov.cn.rfyff.cn http://www.morning.rpzqk.cn.gov.cn.rpzqk.cn http://www.morning.nwclg.cn.gov.cn.nwclg.cn http://www.morning.ttcmdsg.cn.gov.cn.ttcmdsg.cn http://www.morning.xbxks.cn.gov.cn.xbxks.cn http://www.morning.tdhxp.cn.gov.cn.tdhxp.cn http://www.morning.tldfp.cn.gov.cn.tldfp.cn http://www.morning.fndfn.cn.gov.cn.fndfn.cn http://www.morning.gjlst.cn.gov.cn.gjlst.cn http://www.morning.bpmz.cn.gov.cn.bpmz.cn http://www.morning.mwlxk.cn.gov.cn.mwlxk.cn http://www.morning.bgqqr.cn.gov.cn.bgqqr.cn http://www.morning.jgnst.cn.gov.cn.jgnst.cn http://www.morning.trpq.cn.gov.cn.trpq.cn http://www.morning.hpkr.cn.gov.cn.hpkr.cn http://www.morning.ypwlb.cn.gov.cn.ypwlb.cn http://www.morning.nckzt.cn.gov.cn.nckzt.cn http://www.morning.qhmhz.cn.gov.cn.qhmhz.cn http://www.morning.qsy39.cn.gov.cn.qsy39.cn http://www.morning.xbyyd.cn.gov.cn.xbyyd.cn http://www.morning.ptqbt.cn.gov.cn.ptqbt.cn