当前位置: 首页 > news >正文

那个网站做图片比较赚钱售后好的品牌策划公司

那个网站做图片比较赚钱,售后好的品牌策划公司,安卓集成wordpress,具有价值的微网站建设目录 一、前言 二、Flink CDC介绍 2.1 什么是Flink CDC 2.2 Flink CDC 特点 2.3 Flink CDC 核心工作原理 2.4 Flink CDC 使用场景 三、常用的数据同步方案对比 3.1 数据同步概述 3.1.1 数据同步来源 3.2 常用的数据同步方案汇总 3.3 为什么推荐Flink CDC 3.4 Flink …目录 一、前言 二、Flink CDC介绍 2.1 什么是Flink CDC 2.2 Flink CDC 特点 2.3 Flink CDC 核心工作原理 2.4 Flink CDC 使用场景 三、常用的数据同步方案对比 3.1 数据同步概述 3.1.1 数据同步来源 3.2 常用的数据同步方案汇总 3.3 为什么推荐Flink CDC 3.4 Flink CDC 适用范围 3.5 Flink CDC不同版本对比 3.5.1 Flink CDC 1.x 3.5.2 Flink CDC 2.x 3.5.3 Flink CDC 3.x 四、Java使用Flink CDC同步mysql数据 4.1 环境准备 4.1.1 组件版本说明 4.1.2 数据库准备 4.1.3 导入相关的依赖 4.2 使用Flink CDC动态监听mysql数据变化 4.2.1 自定义反序列化器 4.2.2 自定义Sink输出 4.2.3 启动任务类 4.2.4 效果测试 4.3 与springboot整合实现过程 4.3.1 补充依赖 4.3.2 启动类改造 4.3.3 效果测试 五、写在文末 一、前言 在微服务系统架构中经常会涉及到跨系统的数据同步比如需要从A系统的mysql数据库同步到B系统的oracle数据库再比如说需要将mysql中的数据同步到es中更有一些场景下涉及到多个系统的异构数据源需要加工处理后将数据推送到kafka中。在这些场景下需求各异需要满足的场景也不尽相同。很难有某一种开源工具可以满足所有的场景接下来本文将介绍使用flinkcdc的方式同步mysql的数据。 二、Flink CDC介绍 2.1 什么是Flink CDC Flink CDCChange Data Capture是一个用于实时捕获数据库变更事件的工具它是基于 Apache Flink 构建的。Flink CDC 可以从关系型数据库中实时捕获表的数据变更事件并将这些事件转化为流式数据以便进行实时处理和分析。这对于实时数据仓库、实时数据分析以及数据同步等场景非常有用。 Flink-CDC 开源地址 Apache/Flink-CDC Flink-CDC 中文文档Apache Flink CDC | Apache Flink CDC Flink CDC connector 可以捕获在一个或多个表中发生的所有变更。该模式通常有一个前记录和一个后记录。Flink CDC connector 可以直接在Flink中以非约束模式流使用而不需要使用类似 kafka 之类的中间件中转数据。 Flink SQL CDC 内置了 Debezium 引擎利用其抽取日志获取变更的能力将 changelog 转换为 Flink SQL 认识的 RowData 数据。 2.2 Flink CDC 特点 Flink CDC 具备如下特点 实时性Flink CDC 支持实时捕获数据库中的数据变更事件这意味着它可以立即响应数据库中的任何更改。 支持多种数据库Flink CDC 支持多种关系型数据库如 MySQL、PostgreSQL、Oracle 等。 易于集成Flink CDC 可以轻松集成到现有的 Flink 应用程序中利用 Flink 强大的流处理能力来处理捕获的数据。 高可用性Flink CDC 设计为高可用的可以在分布式环境中运行并且支持故障恢复和容错机制。 灵活的数据处理捕获的数据可以被转换、过滤、聚合等支持复杂的数据处理逻辑。 2.3 Flink CDC 核心工作原理 Flink CDC 的工作原理主要包括以下几个步骤 连接数据库Flink CDC 通过 JDBC 连接到目标数据库并监听数据库的变更事件。 捕获变更事件通过监听数据库的日志如 MySQL 的 Binlog、PostgreSQL 的 WAL 日志等捕获数据的插入、更新、删除等变更事件。 转换为流数据将捕获的变更事件转换为 Flink 可处理的流数据格式。 处理数据将转换后的流数据送入 Flink 的流处理管道进行进一步的数据处理如清洗、聚合、分析等。 输出结果处理后的数据可以被输出到其他系统如写入另一个数据库、发送到消息队列、写入文件系统等。 2.4 Flink CDC 使用场景 Flink CDCChange Data Capture是一种用于实时捕获数据库变更事件的技术它能够从关系型数据库中捕获表的数据变更并将这些变更事件转化为流式数据以便进行实时处理。基于这个特性在下面这些场景下可以考虑使用Flink CDC处理 实时数据仓库 实时更新数据仓库Flink CDC 可以实时捕获数据库中的变更事件并将这些事件实时地加载到数据仓库中从而实现实时的数据更新和查询。 实时分析通过捕获和处理实时数据企业可以进行实时的业务分析比如实时监控销售数据、实时统计用户行为等。 实时数据同步 数据库间同步Flink CDC 可以将一个数据库中的变更事件实时同步到另一个数据库实现数据的实时复制。这对于需要跨数据中心或多云环境的数据同步场景非常有用。 异构系统间同步可以将传统的关系型数据库的数据实时同步到 NoSQL 数据库、搜索引擎或其他存储系统中实现数据的实时一致性和可用性。 实时数据处理 实时ETLFlink CDC 可以作为 ETLExtract, Transform, Load流程的一部分用于实时提取、转换和加载数据。例如可以从数据库中实时提取数据进行清洗和聚合处理后再加载到数据仓库或其他系统中。 实时统计分析通过对捕获的变更事件进行实时处理可以实现各种实时统计分析如实时流量统计、实时交易统计等。 实时告警和监控 实时监控Flink CDC 可以实时捕获数据库中的变更事件并对这些事件进行实时分析和处理用于实时监控关键指标的变化。 实时告警当检测到异常数据变更时可以实时触发告警通知帮助企业及时响应和处理问题。 实时推荐系统 实时用户行为追踪通过对用户行为数据的实时捕获和处理可以实现实时的个性化推荐。 实时更新推荐模型Flink CDC 可以实时捕获用户的最新行为数据并用于实时更新推荐模型从而提高推荐的准确性和时效性。 实时交易处理 金融交易在金融行业中Flink CDC 可以用于实时捕获和处理交易数据实现高速的交易处理和结算。 欺诈检测通过对实时交易数据的分析可以快速检测出潜在的欺诈行为并及时采取措施。 物联网IoT应用 实时数据采集在 IoT 场景中Flink CDC 可以用于实时采集设备数据并进行实时处理和分析。 实时决策支持通过对 IoT 数据的实时分析可以支持实时的决策制定如实时调整设备的工作状态、优化资源配置等。 实时日志处理 实时日志分析Flink CDC 可以用于实时捕获数据库的日志数据并进行实时处理和分析用于性能监控、错误诊断等。 审计与合规通过实时捕获数据库的操作记录可以用于审计和合规性的检查。 三、常用的数据同步方案对比 3.1 数据同步概述 当系统发展到一定阶段尤其是系统的规模越来越大业务体量也不断扩大的时候一个系统可能会用到多种数据存储中间件而不再是单纯的mysqlpgsql等甚至一个系统中一个或多个微服务无法再满足业务的需求而需要单独做数据存储的服务在类似的场景下很难避免新的服务需要从原有的微服务中抽取数据或定期做数据的同步处理诸如此类的场景还有很多如何处理数据同步的需求呢这就是下面要讨论的问题。 3.1.1 数据同步来源 系统数据同步的需求通常源于多种业务和技术上的考量。以下是一些主要的原因解释为什么需要进行系统数据同步 高可用性和灾难恢复 数据冗余通过将数据同步到多个位置可以增加数据的冗余度减少单点故障的风险。 灾难恢复在发生硬件故障、自然灾害或其他不可预见事件时可以迅速切换到备用数据副本确保业务连续性 负载均衡和性能优化 读写分离将读操作和写操作分开处理通过将读操作分散到多个数据库实例上来降低单一数据库的负载。 性能优化通过将热点数据同步到性能更高的存储设备或系统提升数据访问速度和系统整体性能。 地理分布 全球分布在全球范围内分布数据以减少网络延迟提高用户体验。 多地备份在不同地理位置建立数据备份中心以应对局部故障或自然灾害的影响。 数据整合 数据仓库将来自不同源的数据整合到一个统一的数据仓库中以便进行集中管理和分析。 实时分析通过实时同步数据可以快速获取最新的业务数据进行实时分析和决策支持。 业务扩展 系统迁移当公司需要更换数据库系统或升级现有系统时需要将数据从旧系统同步到新系统。 新系统上线在部署新的应用程序或服务时需要将现有数据同步到新的系统中。 数据共享 多部门协作不同部门或团队需要访问相同的数据集以促进信息共享和协同工作。 合作伙伴集成与外部合作伙伴或第三方系统共享数据实现数据互通和业务合作。 数据一致性 分布式事务在分布式系统中需要保证数据的一致性通过数据同步来协调不同节点之间的状态。 分布式缓存在使用分布式缓存如 Redis Cluster时需要将数据同步到不同的缓存节点保持数据的一致性。 数据迁移 数据转移在进行数据迁移时需要将数据从一个系统或数据库转移到另一个系统或数据库以适应业务发展和技术进步的需要。 3.2 常用的数据同步方案汇总 下图列举了几种主流的用于解决数据同步场景的方案 关于图中几种技术做如下简单的介绍便于做技术选型作为对比 Debezium Debezium是国外⽤户常⽤的CDC组件单机对于分布式来说在数据读取能力的拓展上没有分布式的更具有优势在大数据众多的分布式框架中Hive、Hudi等Flink CDC 的架构能够很好地接入这些框架。 DataX DataX无法支持增量同步。如果一张Mysql表每天增量的数据是不同天的数据并且没有办法确定它的产生时间那么如何将数据同步到数仓是一个值得考虑的问题。DataX支持全表同步也支持sql查询的方式导入导出全量同步一定是不可取的sql查询的方式没有可以确定增量数据的字段的话也不是一个好的增量数据同步方案。 Canal Canal是用java开发的基于数据库增量日志解析提供增量数据订阅消费的中间件。Canal主要支持了MySQL的Binlog解析将增量数据写入中间件中例如kafka,Rocket MQ等但是无法同步历史数据因为无法获取到binlog的变更。 Sqoop Sqoop主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递。Sqoop将导入或导出命令翻译成mapreduce程序来实现这样的弊端就是Sqoop只能做批量导入遵循事务的一致性Mapreduce任务成功则同步成功失败则全部同步失败。 Kettle Kettle是一款开源的数据集成工具主要用于数据抽取、转换和加载ETL。它提供了图形化的界面使得用户可以通过拖拽组件的方式来设计和执行数据集成任务。Kettle 被广泛应用于数据仓库构建、数据迁移、数据清洗等多种场景。 虽然 Kettle 在处理中小型数据集时表现良好但在处理大规模数据集时可能会遇到性能瓶颈尤其是在没有进行优化的情况下。 Kettle 在运行时可能会消耗较多的内存和 CPU 资源特别是当处理复杂的转换任务时。 虽然 Kettle 的基本操作相对简单但对于高级功能和复杂任务的设计用户仍需投入一定的时间和精力来学习和掌握。 Flink CDC Flink CDC 基本都弥补了以上框架的不足将数据库的全量和增量数据一体化地同步到消息队列和数据仓库中也可以用于实时数据集成将数据库数据实时入湖入仓无需像其他的CDC工具一样需要在服务器上进行部署减少了维护成本链路更少完美套接Flink程序CDC获取到的数据流直接对接Flink进行数据加工处理一套代码即可完成对数据的抽取转换和写出既可以使用flink的DataStream API完成编码也可以使用较为上层的FlinkSQL API进行操作。 3.3 为什么推荐Flink CDC 在java生态中以一个使用较多的场景为例说明如下是一个业务系统与大数据系统进行交互时的数据处理流 在上面这个处理流程下业务系统将数据正常写入mysql数据库数据库需要开启binlog然后借助canal监听binlog把日志写入到kafka中而Flink实时消费Kakfa的数据实现mysql数据的同步或其他内容等所以整个过程分为下面几个步骤 mysql开启binlog canal同步binlog数据写入到kafka flink或其他组件读取kakfa中的binlog数据进行相关的业务处理。 不难发现上面的处理流程整体的链路较长需要用到的组件也比较多一旦中间某个组件运行过程中发生问题下游的其他业务均会中断系统的脆弱性风险就增加了而使用了 Flink CDC之后则会变成下面这样了也就是说只需要数据库开启binlog即可而后Flink CDC就可以自动进行数据同步了整个链路就大大缩减了 3.4 Flink CDC 适用范围 Flink CDCChange Data Capture连接器是 Apache Flink 社区为 Flink 提供的一种用于捕获数据库变更事件的工具。它允许用户从关系型数据库中实时捕获表的数据变更并将这些变更事件转化为流式数据以便进行实时处理比如你需要将mysql的数据同步到另一个mysql数据库就需要使用mysql连接器如果需要同步mongodb的数据则需要使用mongodb的连接器。截止到Flink CDC 2.2 为止支持的连接器 支Flink CDC 持的Flink版本在实际使用的时候需要根据版本的对照进行选择 3.5 Flink CDC不同版本对比 Flink CDC技术发展到今天历经了多个版本所以底层在实现数据同步的时候技术也发生了变化在实际开发使用中需要搞清楚这一点为了加深对此处的理解下面列举了几个版本的实现技术对比 3.5.1 Flink CDC 1.x 在 Flink cdc 1.x 版本中底层选用 debezium 作为采集工具Debezium 为保证数据一致性通过对读取的数据库或者表进行加锁加锁是在全量的时候加锁。 下图是开发者社区的一张全局锁和表锁的过程图 FlinkCDC全量同步时会获取全局读锁或者表锁。所谓加锁目的是为了确认Mysql binlog 的起始位置和Mysql 表的Schema获取到锁后Mysql所有的ddldml操作都会处于wait read lock阶段如果锁获取时间超时程序还会抛出异常而增量同步时因为是监控binlog的方式所以对mysql没有影响。 什么是表锁表锁会自动加锁。查询操作SELECT会自动给涉及的所有表加读锁更新操作UPDATE、DELETE、INSERT。如果启动一个CDC任务而另一个CDC程序也处于初始化阶段获取不到全局锁那么那么此程序就会去获取表级锁表及锁锁的时间会更长一般是全局读锁的几十倍时长。 Flink CDC 1.x 可以不加锁能够满足大部分场景但牺牲了一定的数据准确性。Flink CDC 1.x 默认加全局锁虽然能保证数据一致性但存在上述 hang 住数据的风险。由此可以看来FlinkCDC 1.x 存在着一些不足 由于其锁机制全量同步阶段之有一个任务在进行同步不支持并发同步数据传输会比较慢。 锁表时会阻止其他事务提交。 不支持断点续传如果在同步过程中出现mysql连接超时或者flink程序快照中断那么我们无法从断开点开始续传因为目前暂不支持checkpoint。 3.5.2 Flink CDC 2.x Flink 2.x 时代就是说既然大家都觉得这个锁没有必要了那就把Debezium的锁踢开踢开之后我们就引用了Netfix 的DBlog paper 的思想 DBlog paper的思想说白了也是分而治之的思想既然我对全局表加锁会有比较大的影响那么我把数据做切分每一段数据做大数据一致那是不是就可以保证数据的整体一致呢 举例来说假如mysql表里有100万条数据那么我把这些数据切分成1万个chunk(chunk 本意是树干在这里就是一段数据集合的意思每个chunk 里面有100条数据先记录这个chunk里面的最低位和最高位比如是101和200读完之后把这个数据存入一个缓冲区我们叫做buffer如果后期有个binlog的chunk 过来比如是105150和160的数据发生了变化那么我们就会针对相应的值改动并不会改动全部的值改动效率就比较高了这就是无锁的大致思想了 水平扩展说白了就是把原来的这个单线程换成多线程的处理嘛由Flink的source 去实现的做个checkpoint 也是对最新的那些buffer和chunk 做个checkpoint 实现了容错 Flink 2.x不仅引入了增量快照读取机制还带来了一些其他功能的改进。以下是对Flink 2.x的主要功能的介绍 增量快照读取Flink 2.x引入了增量快照读取机制这是一种全新的数据读取方式。该机制支持并发读取和以chunk为粒度进行checkpoint。在增量快照读取过程中Flink首先根据表的主键将其划分为多个块chunk然后将这些块分配给多个读取器并行读取数据。这一机制极大地提高了数据读取的效率。 精确一次性处理Flink 2.x引入了Exactly-Once语义确保数据处理结果的精确一次性。MySQL CDC 连接器是Flink的Source连接器可以利用Flink的checkpoint机制来确保精确一次性处理。 动态加表Flink 2.x支持动态加表通过使用savepoint来复用之前作业的状态解决了动态加表的问题。 无主键表的处理Flink 2.x对无主键表的读取和处理进行了优化。在无主键表中Flink可以通过一些额外的字段来识别数据记录的唯一性从而实现准确的数据读取和处理。 3.5.3 Flink CDC 3.x 虽然 Flink CDC 有很多技术优势社区用户增长很快但随着 Flink CDC 项目用户基数的日益增长以及应用场景的不断扩大很多问题随着用户在社区的反馈也不断冒出因此Flink CDC技术团队也适时推出Flink CDC 3.x。 Flink CDC 3.0 的整体架构自顶而下分为 4 层 Flink CDC API面向终端用户的 API 层用户使用 YAML 格式配置数据同步流水线使用 Flink CDC CLI 提交任务 Flink CDC Connect对接外部系统的连接器层通过对 Flink 与现有 Flink CDC source 进行封装实现对外部系统同步数据的读取和写入 Flink CDC Composer同步任务的构建层将用户的同步任务翻译为 Flink DataStream 作业 Flink CDC Runtime运行时层根据数据同步场景高度定制 Flink 算子实现 schema 变更、路由、变换等高级功能 四、Java使用Flink CDC同步mysql数据 4.1 环境准备 根据flink cdc的版本不同在java或springboot集成的时候选择的jdk版本也不尽相同需要注意这一点否则会在运行过程中出现奇怪的问题。 4.1.1 组件版本说明 本文演示中使用的相关组件版本如下 jdk17 springboot版本2.5.5 mysql8.0.23 Flink CDC 相关核心依赖版本1.13.0 4.1.2 数据库准备 案例要演示的需求场景为使用Flink CDC 监听某个表的数据然后同步到另一张表中因此需要提前准备两张表源表和目标表为了简单起见两个表除了表名不一样其他的均相同建表sql如下 CREATE TABLE tb_role (id varchar(32) NOT NULL COMMENT 主键,role_code varchar(32) NOT NULL COMMENT 版本号,role_name varchar(32) DEFAULT NULL COMMENT 角色名称,PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf8mb3 COMMENTtb角色表; 4.1.3 导入相关的依赖 主要是flink cdc相关的依赖包 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion1.13.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_2.12/artifactIdversion1.13.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-blink_2.12/artifactIdversion1.13.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_2.12/artifactIdversion1.13.0/version/dependencydependencygroupIdcom.alibaba.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion1.4.0/version/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion2.0.35/version/dependency 补充说明单独使用java运行时还需要添加下面两个日志组件依赖 dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion1.7.30/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-to-slf4j/artifactIdversion2.14.0/version/dependency 4.2 使用Flink CDC动态监听mysql数据变化 需求场景通过Flink CDC 监听tb_role表数据变化写入tb_role_copy 4.2.1 自定义反序列化器 反序列化器的目的是为了解析flink cdc监听到mysql表数据变化的日志以json的形式进行解析方便对日志中的关键参数进行处理 package com.congge.flink.blog;import com.alibaba.fastjson.JSONObject; import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; import io.debezium.data.Envelope; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord;import java.util.List;public class CustomerSchema implements DebeziumDeserializationSchemaString {/*** 封装的数据格式* {* database:,* tableName:,* before:{id:,tm_name:....},* after:{id:,tm_name:....},* type:c u d* //ts:156456135615* }*/Overridepublic void deserialize(SourceRecord sourceRecord, CollectorString collector) throws Exception {//1.创建JSON对象用于存储最终数据JSONObject result new JSONObject();//2.获取库名表名String topic sourceRecord.topic();String[] fields topic.split(\\.);String database fields[1];String tableName fields[2];Struct value (Struct) sourceRecord.value();//3.获取before数据Struct before value.getStruct(before);JSONObject beforeJson new JSONObject();if (before ! null) {Schema beforeSchema before.schema();ListField beforeFields beforeSchema.fields();for (Field field : beforeFields) {Object beforeValue before.get(field);beforeJson.put(field.name(), beforeValue);}}//4.获取after数据Struct after value.getStruct(after);JSONObject afterJson new JSONObject();if (after ! null) {Schema afterSchema after.schema();ListField afterFields afterSchema.fields();for (Field field : afterFields) {Object afterValue after.get(field);afterJson.put(field.name(), afterValue);}}//5.获取操作类型 CREATE UPDATE DELETEEnvelope.Operation operation Envelope.operationFor(sourceRecord);String type operation.toString().toLowerCase();if (create.equals(type)) {type insert;}//6.将字段写入JSON对象result.put(database, database);result.put(tableName, tableName);result.put(before, beforeJson);result.put(after, afterJson);result.put(type, type);//7.输出数据collector.collect(result.toJSONString());}Overridepublic TypeInformationString getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;} } 4.2.2 自定义Sink输出 Sink即为Flink CDC的输出连接器即监听到源表数据变化并经过处理后最终写到哪里以mysql为例我们在监听到tb_role表数据变化后同步到tb_role_copy中去 package com.congge.flink.blog;import com.alibaba.fastjson.JSONObject; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement;public class MyJdbcSink extends RichSinkFunctionString {// 提前声明连接和预编译语句private Connection connection null;private PreparedStatement insertStmt null;private PreparedStatement updateStmt null;private PreparedStatement preparedStatement null;Overridepublic void open(Configuration parameters) throws Exception {if (connection null) {Class.forName(com.mysql.cj.jdbc.Driver);//加载数据库驱动connection DriverManager.getConnection(jdbc:mysql://IP:3306/db, root, root);connection.setAutoCommit(false);//关闭自动提交} // connection DriverManager.getConnection(jdbc:mysql://IP:3306/db, root, root);}// 每来一条数据调用连接执行sqlOverridepublic void invoke(String value, Context context) throws Exception {JSONObject jsonObject JSONObject.parseObject(value);String type jsonObject.getString(type);String tableName tb_role_copy;String database jsonObject.getString(database);if(type.equals(insert)){JSONObject after (JSONObject)jsonObject.get(after);Integer id after.getInteger(id);String roleCode after.getString(role_code);String roleName after.getString(role_name);String sql String.format(insert into %s.%s values (?,?,?), database, tableName);insertStmt connection.prepareStatement(sql);insertStmt.setInt(1, Integer.valueOf(id));insertStmt.setString(2, roleCode);insertStmt.setString(3, roleName);insertStmt.execute();connection.commit();} else if(type.equals(update)){JSONObject after jsonObject.getJSONObject(after);Integer id after.getInteger(id);String roleCode after.getString(role_code);String roleName after.getString(role_name);String sql String.format(update %s.%s set role_code ?, role_name ? where id ?, database, tableName);updateStmt connection.prepareStatement(sql);updateStmt.setString(1, roleCode);updateStmt.setString(2, roleName);updateStmt.setInt(3, id);updateStmt.execute();connection.commit();} else if(type.equals(delete)){JSONObject after jsonObject.getJSONObject(before);Integer id after.getInteger(id);String sql String.format(delete from %s.%s where id ?, database, tableName);preparedStatement connection.prepareStatement(sql);preparedStatement.setInt(1, id);preparedStatement.execute();connection.commit();}}Overridepublic void close() throws Exception {if(insertStmt ! null){insertStmt.close();}if(updateStmt ! null){updateStmt.close();}if(preparedStatement ! null){preparedStatement.close();}if(connection ! null){connection.close();}} } 4.2.3 启动任务类 本例先以main程序运行在实际进行线上部署使用时可以打成jar包或整合springboot进行启动即可 package com.congge.flink.blog;import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions; import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkCDC_CustomerSchema {public static void main(String[] args) throws Exception {//1、获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); // env.enableCheckpointing(3000);//2、通过FlinkCDC构建SourceFunction并读取数据DebeziumSourceFunctionString sourceFunction MySQLSource.Stringbuilder().hostname(IP).port(3306).username(root).password(3306).databaseList(db).tableList(db.tb_role)//若不添加该数据则消费指定数据库中所有表的数据如果指定指定方式为db.table.deserializer(new CustomerSchema())//.deserializer(new StringDebeziumDeserializationSchema())//.startupOptions(StartupOptions.initial()).startupOptions(StartupOptions.latest()).build();DataStreamSourceString streamSource env.addSource(sourceFunction);//3、打印数据streamSource.print();streamSource.addSink(new MyJdbcSink());//4、启动任务env.execute(FlinkCDC_CustomerSchema);} } 4.2.4 效果测试 运行上面的代码通过控制台可以看到任务已经运行起来了监听并等待数据源数据变更 测试之前确保两张表数据是一致的 此时为tb_role表增加一条数据很快控制台可以监听并输出相关的日志 而后tb_role_copy表同步新增了一条数据 4.3 与springboot整合实现过程 4.3.1 补充依赖 Flink cdc的依赖保持不变需要单独引入logback的依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-logging/artifactId /dependency 同时在工程的resources目录下添加logback-spring.xml文件 ?xml version1.0 encodingUTF-8? configuration!-- 定义日志的根级别和输出方式 --appender nameSTDOUT classch.qos.logback.core.ConsoleAppenderencoderpattern%d{yyyy-MM-dd HH:mm:ss} - %msg%n/pattern/encoder/appender!-- 日志文件保留策略 --property nameFILE_LOG_POLICY value10 MB /!-- 日志文件最大历史数 --property nameMAX_HISTORY value30 /logger nameorg.apache.flink.connector.mysql.cdc levelerror /!-- 应用程序的根日志级别 --root levelinfoappender-ref refSTDOUT //root /configuration 4.3.2 启动类改造 可以直接基于启动类改造也可以新增一个类实现ApplicationRunner接口重写里面的run方法 不难发现run方法里面的代码逻辑即是从上述main方法运行任务里面拷贝过来的 package com.congge;import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions; import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; import com.congge.flink.blog.CustomerSchema; import com.congge.flink.blog.MyJdbcSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.ServletComponentScan;SpringBootApplication MapperScan(basePackages {com.congge.dao}) ServletComponentScan(com.congge.filter) public class SeekOrderApp implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(SeekOrderApp.class,args);}Overridepublic void run(ApplicationArguments args) throws Exception {//1、获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//env.enableCheckpointing(3000);//2、通过FlinkCDC构建SourceFunction并读取数据DebeziumSourceFunctionString sourceFunction MySQLSource.Stringbuilder().hostname(192.168.1.196).port(13306).username(root).password(Ghtms123).databaseList(erp_cloud).tableList(erp_cloud.tb_role)//若不添加该数据则消费指定数据库中所有表的数据如果指定指定方式为db.table.deserializer(new CustomerSchema())//.startupOptions(StartupOptions.initial()).startupOptions(StartupOptions.latest()).build();DataStreamSourceString streamSource env.addSource(sourceFunction);//3、打印数据//streamSource.print();streamSource.addSink(new MyJdbcSink());//4、启动任务env.execute(FlinkCDC_CustomerSchema);} } 4.3.3 效果测试 启动工程之后相当于是通过flink cdc启动了一个用于监听数据变更的后台进程 然后我们再在数据库tb_role表增加一条数据控制台可以看到输出了相关的日志 此时再检查数据表可以发现tb_role_copy表新增了一条一样的数据 五、写在文末 本文通过较大的篇幅详细介绍了Flink CDC相关的技术最后通过一个实际案例演示了使用Flink CDC同步mysql表数据的示例希望对看到的同学有用本篇到此结束感谢观看。
http://www.tj-hxxt.cn/news/224987.html

相关文章:

  • 黑龙江省住房和建设厅网站首页洛阳理工学院教务管理系统
  • 个人无网站怎样做cps广告电子报刊的传播媒体是什么
  • 什么网站好建设wordpress支付宝当面付插件
  • 昆明网站建设哪家好iis 配置 wordpress
  • 怎么才能让百度收录网站wordpress文章推送公众号
  • 花网站开发背景物业管理系统和物业管理软件
  • 沧州哪家做网站好wordpress 支持rar
  • 企业管理咨询公司注册条件河北百度seo点击软件
  • 山东专业企业网站建设做企业网站报价
  • 网站外链接自己可以怎么做wordpress建站不好用
  • 郑州网站维护电商网站开发ppt
  • 折扣网站怎么做wordpress 显示视频
  • 专业集团门户网站建设wordpress数组转字符串
  • 山东专业网站建设苏州网站
  • 网站获得流量最好的方法是什么 ( )在国外做网站
  • 商贸公司的网站建设凡客家具
  • 南阳网站制作写作网站最大
  • 博物馆门户网站建设目标aws ec2 wordpress
  • 东莞建外贸网站怎么查网站的注册信息
  • 佛山新网站建设案例网站设计制作多少钱
  • 网站ip如何做跳转室内装修设计图片欣赏
  • 好用的建站系统wordpress置顶插件
  • 做网站交易平台做网站基本步骤
  • 专业集团门户网站建设企业应用商店app下载安装
  • 公司网站自己可做吗wordpress 用户导入
  • 哪个网站的理财频道做的比较好厦门市翔安区建设局网站
  • 南宁电商网站建设有哪些是做二手的网站
  • 云空间提供网站成都开发小程序的公司
  • 蓝色网站模版廊坊网站建设廊坊网络公司驻梦
  • 罗湖建设网站全网营销推广服务