做一小说网站要花多钱,苏州一建建筑集团有限公司,盐城网络,淘宝网站开始怎么做的What’s Flink-cdc? Flink CDC 是基于Apache Flink的一种数据变更捕获技术#xff0c;用于从数据源#xff08;如数据库#xff09;中捕获和处理数据的变更事件。CDC技术允许实时地捕获数据库中的增、删、改操作#xff0c;将这些变更事件转化为流式数据#xff0c;并能够…What’s Flink-cdc? Flink CDC 是基于Apache Flink的一种数据变更捕获技术用于从数据源如数据库中捕获和处理数据的变更事件。CDC技术允许实时地捕获数据库中的增、删、改操作将这些变更事件转化为流式数据并能够对这些事件进行实时处理和分析。
Flink CDC提供了与各种数据源集成的功能包括常见的关系型数据库如MySQL、PostgreSQL、Oracle等以及NoSQL数据库如MongoDB、HBase等。它通过监控数据库的日志或轮询方式来捕获数据变更并将变更事件作为数据流发送到Flink的任务中进行处理。
Flink CDC 深度集成并由 Apache Flink 驱动提供以下核心功能
✅ 端到端的数据集成框架 ✅ 为数据集成的用户提供了易于构建作业的 API ✅ 支持在 Source 和 Sink 中处理多个表 ✅ 整库同步 ✅具备表结构变更自动同步的能力Schema Evolution
在使用者的角度就是Flink-cdc可以简化流处理的流程: 引入Flink-cdc之前流处理流程 引入Flink-cdc之后后流处理流程
如上所示在flink-cdc被引入后大大简化了流处理流程
Flink-cdc支持的链接及对应的版本
Pipeline Connectors Source Connectors 截止目前2024-05-23
Flink-cdc与Flink对应对影版本的关系
截止目前2024-05-23
flink-connector-mysql-cdc 实例分析
示例代码
demo代码
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MySqlSourceDemo {public static void main(String[] args) throws Exception {MySqlSourceString mySqlSource MySqlSource.Stringbuilder().hostname(mysql-server-host).port(3306).databaseList(mydb) // 设置捕获的数据库.tableList(mydb.products) // 设置捕获的表,如果需要同步整个数据库请将 tableList 设置为 .*.
// .tableList(.*) // 捕获整个数据库的表
// .tableList(^(?!mysql|information_schema|performance_schema).*) // 设置捕获的表排除系统库
// .tableList(mydb.(?!products|orders).*) // 同步排除products和orders表之外的整个my_db库.username(flink-cdc).password(xxx).serverId(5400-5405).deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串.serverTimeZone(Asia/Shanghai) // 设置时区.startupOptions(StartupOptions.initial()).scanNewlyAddedTableEnabled(true) // 启用扫描新添加的表功能
// .includeSchemaChanges(true) // 包括 schema 变更.build();org.apache.flink.configuration.Configuration config new org.apache.flink.configuration.Configuration();config.setString(rest.port, 8081);
// StreamExecutionEnvironment env StreamExecutionEnvironment.createLocalEnvironment(config); //本地环境调试用StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 设置 3s 的 checkpoint 间隔env.enableCheckpointing(3000);env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage(file:///tmp/ck);//本地文件系统
// env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 1.14.0 版本开始支持env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), MySQL Source)// 设置 source 节点的并行度为 4.setParallelism(5).print().setParallelism(1); // 设置 sink 节点并行度为 1env.execute(Print MySQL Snapshot Binlog);}
}
maven依赖
propertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncodingflink.version1.14.5/flink.versionscala.binary.version2.12/scala.binary.version/propertiesdependenciesdependencygroupIdjunit/groupIdartifactIdjunit/artifactIdscopetest/scope/dependency!-- 将 Apache Flink 的 Web 运行时模块添加到项目中 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-runtime-web_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope !--provided生命周期在test模式才可以运行在main模式会找不到包--/dependencydependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion2.3.0/versionscopecompile/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-common/artifactIdversion${flink.version}/versionscopecompile/scope/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion1.7.25/versionscopeprovided/scope/dependency/dependencies日志配置文件 log4j.properties
log4j.rootCategoryerror,stdout
log4j.appender.stdoutorg.apache.log4j.ConsoleAppender
log4j.appender.stdout.targetSystem.out
log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern%d{yyyy-MM-dd HH:mm:ss} %p %c{1}:%L - %m%n
启动standalone Flink级群
# jobmanager
docker run -d \
--name flink-jm \
--hostname flink-jm \
-p 8082:8081 \
--env FLINK_PROPERTIESjobmanager.rpc.address: flink-jm \
--network flink-network-standalone \
ponylee/flink:1.15.0-java8 \
jobmanager# taskmanager
docker run -d \
--name flink-tm \
--hostname flink-tm \
--env FLINK_PROPERTIESjobmanager.rpc.address: flink-jm \
--network flink-network-standalone \
ponylee/flink:1.15.0-java8 \
taskmanager \
-Dtaskmanager.memory.process.size1024m \
-Dtaskmanager.numberOfTaskSlots5 \
-Drest.flamegraph.enabledtrue
分析说明
为每个 Reader 设置不同的 Server id
每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 id称为 Server id。 MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。 因此如果不同的作业共享相同的 Server id 则可能导致从错误的 binlog 位置读取数据。 因此建议通过为每个 Reader 设置不同的 Server id , 假设 Source 并行度为 4server id 配置必须serverId(“5400-5405”)5405-54005 4。来为 4 个 Source readers 中的每一个分配唯一的 Server id。
查看mysql链接发现 select * from information_schema.processlist where user ‘flink-cdc’; Flink-cdc对mysql的影响 正常情况下Flink-cdc是No-lock Read主库可以继续处理事务和查询而不会导致主库进程阻塞不会对主库产生直接影响。但是在某些情况下数据同步的过程中可能会对主库产生一些间接影响比如网络、IO、CPU负载以及mysql的并发连接数等资源消耗。但这些对主库的开销影响相对较小全量同步阶段可能比较耗能但时间相对比较短。 从上图mysql资源使用情况来看flink-cdc对内存和CPU负载影响微乎极微是No-lock Read主要表现对网络和IO的资源消耗。
断点续传
通过从checkpoint/savepoint 恢复flink-cdc可以保证断点续传。 从checkpoint/savepoint恢复缩小同步范围例如从tableList(“mydb.products,mydb.orders”)或tableList(“.*”) 缩小到 tableList(“mydb.products”)应用更新生效。 应用从checkpoint/savepoint恢复扩大同步范围的部分不会生效例如从tableList(“mydb.products”) 到 tableList(“mydb.products,mydb.orders”)或tableList(“.*”)应用更新不生效生效。若想使动态加表生效可以显示制定scanNewlyAddedTableEnabled(true) 来启用扫描新添加的表功能。如没有特殊情况建议在开发环境开启此配置。
flink-cdc包名变更
Flink CDC 项目 从 2.0.0 版本将 group id 从com.alibaba.ververica 改成 com.ververica, 自 3.1 版本从将 group id 从 com.ververica 改成 org.apache.flink。 这是为了让项目更加社区中立让各个公司的开发者共建时更方便。所以在maven仓库找 2.x 的包时路径是 /com/ververica找3.1及以上版本的包时路径是/org/apache/flink
参考 flink-cdc flink-cdc docs 文章转载自: http://www.morning.skcmt.cn.gov.cn.skcmt.cn http://www.morning.gnyhc.cn.gov.cn.gnyhc.cn http://www.morning.tqjks.cn.gov.cn.tqjks.cn http://www.morning.yrycb.cn.gov.cn.yrycb.cn http://www.morning.rfbpq.cn.gov.cn.rfbpq.cn http://www.morning.qhkx.cn.gov.cn.qhkx.cn http://www.morning.csnch.cn.gov.cn.csnch.cn http://www.morning.zxdhp.cn.gov.cn.zxdhp.cn http://www.morning.jqjnl.cn.gov.cn.jqjnl.cn http://www.morning.twdwy.cn.gov.cn.twdwy.cn http://www.morning.rydbs.cn.gov.cn.rydbs.cn http://www.morning.clpdm.cn.gov.cn.clpdm.cn http://www.morning.bhdtx.cn.gov.cn.bhdtx.cn http://www.morning.lkbkd.cn.gov.cn.lkbkd.cn http://www.morning.ndrzq.cn.gov.cn.ndrzq.cn http://www.morning.yrck.cn.gov.cn.yrck.cn http://www.morning.yxbrn.cn.gov.cn.yxbrn.cn http://www.morning.pnntx.cn.gov.cn.pnntx.cn http://www.morning.rkfxc.cn.gov.cn.rkfxc.cn http://www.morning.mbprq.cn.gov.cn.mbprq.cn http://www.morning.cykqb.cn.gov.cn.cykqb.cn http://www.morning.qinhuangdjy.cn.gov.cn.qinhuangdjy.cn http://www.morning.smj78.cn.gov.cn.smj78.cn http://www.morning.nmkbl.cn.gov.cn.nmkbl.cn http://www.morning.yxbrn.cn.gov.cn.yxbrn.cn http://www.morning.ktfbl.cn.gov.cn.ktfbl.cn http://www.morning.kbdrq.cn.gov.cn.kbdrq.cn http://www.morning.qdmdp.cn.gov.cn.qdmdp.cn http://www.morning.qwbht.cn.gov.cn.qwbht.cn http://www.morning.wxfjx.cn.gov.cn.wxfjx.cn http://www.morning.hyryq.cn.gov.cn.hyryq.cn http://www.morning.gwkwt.cn.gov.cn.gwkwt.cn http://www.morning.ntyanze.com.gov.cn.ntyanze.com http://www.morning.bpmth.cn.gov.cn.bpmth.cn http://www.morning.pwsnr.cn.gov.cn.pwsnr.cn http://www.morning.xknmn.cn.gov.cn.xknmn.cn http://www.morning.xkzr.cn.gov.cn.xkzr.cn http://www.morning.lrylj.cn.gov.cn.lrylj.cn http://www.morning.homayy.com.gov.cn.homayy.com http://www.morning.nnttr.cn.gov.cn.nnttr.cn http://www.morning.jtnph.cn.gov.cn.jtnph.cn http://www.morning.ylsxk.cn.gov.cn.ylsxk.cn http://www.morning.hilmwmu.cn.gov.cn.hilmwmu.cn http://www.morning.xkgyh.cn.gov.cn.xkgyh.cn http://www.morning.gkpgj.cn.gov.cn.gkpgj.cn http://www.morning.ntwxt.cn.gov.cn.ntwxt.cn http://www.morning.phjny.cn.gov.cn.phjny.cn http://www.morning.dhnqt.cn.gov.cn.dhnqt.cn http://www.morning.rzdpd.cn.gov.cn.rzdpd.cn http://www.morning.rtlrz.cn.gov.cn.rtlrz.cn http://www.morning.nnjq.cn.gov.cn.nnjq.cn http://www.morning.zczkm.cn.gov.cn.zczkm.cn http://www.morning.kkysz.cn.gov.cn.kkysz.cn http://www.morning.rwjh.cn.gov.cn.rwjh.cn http://www.morning.wqrdx.cn.gov.cn.wqrdx.cn http://www.morning.kfhm.cn.gov.cn.kfhm.cn http://www.morning.rjrlx.cn.gov.cn.rjrlx.cn http://www.morning.wpsfc.cn.gov.cn.wpsfc.cn http://www.morning.gfhng.cn.gov.cn.gfhng.cn http://www.morning.pdgqf.cn.gov.cn.pdgqf.cn http://www.morning.enjoinfo.cn.gov.cn.enjoinfo.cn http://www.morning.pzjfz.cn.gov.cn.pzjfz.cn http://www.morning.mydgr.cn.gov.cn.mydgr.cn http://www.morning.ydhmt.cn.gov.cn.ydhmt.cn http://www.morning.fbbmg.cn.gov.cn.fbbmg.cn http://www.morning.njqpg.cn.gov.cn.njqpg.cn http://www.morning.yntsr.cn.gov.cn.yntsr.cn http://www.morning.zyndj.cn.gov.cn.zyndj.cn http://www.morning.rwtlj.cn.gov.cn.rwtlj.cn http://www.morning.tfcwj.cn.gov.cn.tfcwj.cn http://www.morning.ymrq.cn.gov.cn.ymrq.cn http://www.morning.qmbpy.cn.gov.cn.qmbpy.cn http://www.morning.qgtfl.cn.gov.cn.qgtfl.cn http://www.morning.fsfz.cn.gov.cn.fsfz.cn http://www.morning.qqrqb.cn.gov.cn.qqrqb.cn http://www.morning.qckwj.cn.gov.cn.qckwj.cn http://www.morning.pwmm.cn.gov.cn.pwmm.cn http://www.morning.zsleyuan.cn.gov.cn.zsleyuan.cn http://www.morning.xnflx.cn.gov.cn.xnflx.cn http://www.morning.crfyr.cn.gov.cn.crfyr.cn