足球竞猜网站开发,怎么把自己的网站放到网上,网络服务提供者发现未成年秘密信息应采取,在哪里可以发布自己的广告随着数据驱动决策的重要性日益凸显#xff0c;实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出#xff0c;为开发者提供了一个高效、灵活的工具#xff0c;以实现对 MongoDB 数据库变更的实时捕获和处理。 本文将深入探讨该连…随着数据驱动决策的重要性日益凸显实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出为开发者提供了一个高效、灵活的工具以实现对 MongoDB 数据库变更的实时捕获和处理。 本文将深入探讨该连接器的主要特性、支持的数据源信息、配置选项以及如何创建数据同步作业助力开发者更好地利用 SeaTunnel 进行数据集成和实时数据分析。这些更新旨在为开发者提供更为丰富的数据处理能力帮助他们更有效地捕获和处理来自 MongoDB 的变更数据。
支持的引擎 SeaTunnel Zeta Flink 主要特性 批处理 流处理 精确一次 列投影 并行度 支持用户定义分片
功能描述
MongoDB CDC 源连接器允许从 MongoDB 数据库读取快照数据和增量数据。
支持的数据源信息
要使用 MongoDB CDC 连接器需要以下依赖。它们可以通过 install-plugin.sh 脚本或从 Maven 中央仓库下载。
数据源支持的版本依赖MongoDB通用下载
可用性设置
MongoDB版本MongoDB 版本 4.0。集群部署副本集或分片集群。存储引擎WiredTiger 存储引擎。权限changeStream 和 read
use admin;
db.createRole({role: strole,privileges: [{resource: { db: , collection: },actions: [splitVector,listDatabases,listCollections,collStats,find,changeStream ]}],roles: [{ role: read, db: config }]}
);db.createUser({user: stuser,pwd: stpw,roles: [{ role: strole, db: admin }]}
);
数据类型映射
以下表格列出了从 MongoDB BSON 类型到 SeaTunnel 数据类型的字段数据类型映射。
MongoDB BSON 类型SeaTunnel 数据类型ObjectIdSTRINGStringSTRINGBooleanBOOLEANBinaryBINARYInt32INTEGERInt64BIGINTDoubleDOUBLEDecimal128DECIMALDateDATETimestampTIMESTAMPObjectROWArrayARRAY
对于 MongoDB 中的特定类型我们使用扩展 JSON 格式将它们映射到 SeaTunnel STRING 类型。
MongoDB BSON 类型SeaTunnel STRING 表示Symbol{_value: {$symbol: 12}}RegularExpression{_value: {$regularExpression: {pattern: ^9$, options: i}}}JavaScript{_value: {$code: function() { return 10; }}}DbPointer{_value: {$dbPointer: {$ref: db.coll, $id: {$oid: 63932a00da01604af329e33c}}}}
提示
在 SeaTunnel 中使用 DECIMAL 类型时请注意最大范围不能超过 34 位数字这意味着你应该使用 decimal(34, 18)。
名称类型必须默认值描述hostsString是-MongoDB 服务器的主机名和端口对的逗号分隔列表。例如localhost:27017,localhost:27018usernameString否-连接 MongoDB 时使用的数据库用户名。passwordString否-连接 MongoDB 时使用的密码。databaseList是-要监视更改的数据库名称。如果未设置则会捕获所有数据库。数据库还支持正则表达式以监视与正则表达式匹配的多个数据库。例如db1,db2。collectionList是-数据库中要监视更改的集合名称。如果未设置则会捕获所有集合。集合也支持正则表达式以监视与完全限定的集合标识符匹配的多个集合。例如db1.coll1,db2.coll2。connection.optionsString否-MongoDB 的连接选项的和号分隔列表。例如replicaSettestconnectTimeoutMS300000。batch.sizeLong否1024游标批大小。poll.max.batch.sizeEnum否1024轮询新数据时包含在单个批次中的更改流文档的最大数量。poll.await.time.msLong否1000等待检查更改流上的新结果之前的时间量。heartbeat.interval.msString否0发送心跳消息之间的时间长度以毫秒为单位。使用 0 禁用。incremental.snapshot.chunk.size.mbLong否64增量快照的块大小MB。common-options否-源插件通用参数请参考源通用选项获取详情。
提示
如果集合变更速度较慢强烈建议为 heartbeat.interval.ms 参数设置大于 0 的适当值。当我们从检查点或保存点恢复 SeaTunnel 作业时心跳事件可以将 resumeToken 推进以避免其过期。MongoDB 对单个文档有 16MB 的限制。更改文档包括附加信息因此即使原始文档不大于 15MB更改文档也可能超过 16MB 限制导致更改流操作终止。建议使用不可变的分片键。在 MongoDB 中分片键在启用事务后允许修改但更改分片键可能导致频繁的分片迁移造成额外的性能开销。此外修改分片键还可能导致更新查找功能变得无效在 CDC更改数据捕获场景中导致不一致的结果。
如何创建 MongoDB CDC 数据同步作业
将 CDC 数据打印到客户端
以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并在本地客户端打印的数据同步作业
env {# 您可以在此处设置引擎配置parallelism 1job.mode STREAMINGcheckpoint.interval 5000
}source {MongoDB-CDC {hosts mongo0:27017database [inventory]collection [inventory.products]username stuserpassword stpwschema {fields {_id : string,name : string,description : string,weight : string}}}
}# 在本地客户端打印读取的 MongoDB 数据
sink {Console {parallelism 1}
}
将 CDC 数据写入 MysqlDB
以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并写入 mysql 数据库的数据同步作业
env {# You can set engine configuration hereparallelism 1job.mode STREAMINGcheckpoint.interval 5000
}source {MongoDB-CDC {hosts mongo0:27017database [inventory]collection [inventory.products]username stuserpassword stpw}
}sink {jdbc {url jdbc:mysql://mysql_cdc_e2e:3306driver com.mysql.cj.jdbc.Driveruser st_userpassword seatunnelgenerate_sink_sql true# You need to configure both database and tabledatabase mongodb_cdctable productsprimary_keys [_id]}
}
多表同步
以下示例演示如何创建一个读取 mongodb 多库表 CDC 数据并在本地客户端打印的数据同步作业
env {# You can set engine configuration hereparallelism 1job.mode STREAMINGcheckpoint.interval 5000
}source {MongoDB-CDC {hosts mongo0:27017database [inventory,crm]collection [inventory.products,crm.test]username stuserpassword stpw}
}# Console printing of the read Mongodb data
sink {Console {parallelism 1}
}
提示 多库表 CDC 同步不能指定 schema只能下游输出 json 数据。这是因为 MongoDB 不提供查询元数据信息所以如果想支持多表所有表只能作为一个结构读取。
使用正则表达式匹配多表
以下示例演示如何创建一个通过正则表达式读取 mongodb 多库表数据并在本地客户端打印的数据同步作业
匹配示例表达式描述前缀匹配^(test).*匹配数据库名或表名以 test 为前缀的如 test1, test2 等。后缀匹配.*[p$]匹配数据库名或表名以 p 为后缀的如 cdcp, edcp 等。env {# You can set engine configuration hereparallelism 1job.mode STREAMINGcheckpoint.interval 5000}
source { MongoDB-CDC { hosts mongo0:27017 # So this example is used (^(test).|^(tpc).|txc|.[p$]|t{2}).(t[5-8]|tt),matching txc.tt、test2.test5. database [(^(test).|^(tpc).|txc|.[p$]|t{2})] collection [(t[5-8]|tt)] username stuser password stpw } }
Console printing of the read Mongodb data
sink { Console { parallelism 1 } } ### 实时流数据格式{ _id : { }, // Identifier of the open change stream, can be assigned to the resumeAfter parameter for subsequent resumption of this change stream operationType : , // The type of change operation that occurred, such as: insert, delete, update, etc. fullDocument : { }, // The full document data involved in the change operation. This field does not exist in delete operations ns : { db : , // The database where the change operation occurred coll : // The collection where the change operation occurred }, to : { // These fields are displayed only when the operation type is rename db : , // The new database name after the change coll : // The new collection name after the change }, source:{ ts_ms: , // The timestamp when the change operation occurred table: // The collection where the change operation occurred db: , // The database where the change operation occurred snapshot:false // Identify the current stage of data synchronization }, documentKey : { _id : }, // The _id field value of the document involved in the change operation updateDescription : { // Description of the update operation updatedFields : { }, // The fields and values that the update operation modified removedFields : [ , ... ] // The fields and values that the update operation removed } clusterTime : , // The timestamp of the Oplog log entry corresponding to the change operation txnNumber : , // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number lsid : { // Represents information related to the Session in which the transaction is located id : , uid : } } 到此本指南就结束了MongoDB CDC Sink连接器的发布不仅强化了 Apache SeaTunnel 在数据集成领域的地位也为开发者提供了更多的可能性。
Apache SeaTunnel 社区也期待您的参与和贡献,共同迈向更广阔的数据处理未来让我们携手共建一个更加强大、开放、互助的社区! 本文由 白鲸开源科技 提供发布支持
文章转载自: http://www.morning.lfcnj.cn.gov.cn.lfcnj.cn http://www.morning.zfzgp.cn.gov.cn.zfzgp.cn http://www.morning.zlhzd.cn.gov.cn.zlhzd.cn http://www.morning.hgcz.cn.gov.cn.hgcz.cn http://www.morning.wnywk.cn.gov.cn.wnywk.cn http://www.morning.kttbx.cn.gov.cn.kttbx.cn http://www.morning.zjcmr.cn.gov.cn.zjcmr.cn http://www.morning.zbjfq.cn.gov.cn.zbjfq.cn http://www.morning.mrfnj.cn.gov.cn.mrfnj.cn http://www.morning.wxfgg.cn.gov.cn.wxfgg.cn http://www.morning.fdrwk.cn.gov.cn.fdrwk.cn http://www.morning.tphrx.cn.gov.cn.tphrx.cn http://www.morning.wkknm.cn.gov.cn.wkknm.cn http://www.morning.bwkzn.cn.gov.cn.bwkzn.cn http://www.morning.kcyxs.cn.gov.cn.kcyxs.cn http://www.morning.zlmbc.cn.gov.cn.zlmbc.cn http://www.morning.hmmtx.cn.gov.cn.hmmtx.cn http://www.morning.ywndg.cn.gov.cn.ywndg.cn http://www.morning.brwwr.cn.gov.cn.brwwr.cn http://www.morning.bwqcx.cn.gov.cn.bwqcx.cn http://www.morning.zcfsq.cn.gov.cn.zcfsq.cn http://www.morning.zsrdp.cn.gov.cn.zsrdp.cn http://www.morning.yszrk.cn.gov.cn.yszrk.cn http://www.morning.kflpf.cn.gov.cn.kflpf.cn http://www.morning.homayy.com.gov.cn.homayy.com http://www.morning.tbqbd.cn.gov.cn.tbqbd.cn http://www.morning.jltmb.cn.gov.cn.jltmb.cn http://www.morning.smszt.com.gov.cn.smszt.com http://www.morning.pwgzh.cn.gov.cn.pwgzh.cn http://www.morning.btjyp.cn.gov.cn.btjyp.cn http://www.morning.rjljb.cn.gov.cn.rjljb.cn http://www.morning.cfccp.cn.gov.cn.cfccp.cn http://www.morning.pmtky.cn.gov.cn.pmtky.cn http://www.morning.bbgn.cn.gov.cn.bbgn.cn http://www.morning.qynnw.cn.gov.cn.qynnw.cn http://www.morning.lptjt.cn.gov.cn.lptjt.cn http://www.morning.znqztgc.cn.gov.cn.znqztgc.cn http://www.morning.rdpps.cn.gov.cn.rdpps.cn http://www.morning.xqbbc.cn.gov.cn.xqbbc.cn http://www.morning.zglrl.cn.gov.cn.zglrl.cn http://www.morning.cwfkm.cn.gov.cn.cwfkm.cn http://www.morning.chfxz.cn.gov.cn.chfxz.cn http://www.morning.tqwcm.cn.gov.cn.tqwcm.cn http://www.morning.plzgt.cn.gov.cn.plzgt.cn http://www.morning.pqppj.cn.gov.cn.pqppj.cn http://www.morning.zkqjz.cn.gov.cn.zkqjz.cn http://www.morning.bmts.cn.gov.cn.bmts.cn http://www.morning.ryjl.cn.gov.cn.ryjl.cn http://www.morning.xckrj.cn.gov.cn.xckrj.cn http://www.morning.rpzth.cn.gov.cn.rpzth.cn http://www.morning.kgxrq.cn.gov.cn.kgxrq.cn http://www.morning.spqtq.cn.gov.cn.spqtq.cn http://www.morning.gmgnp.cn.gov.cn.gmgnp.cn http://www.morning.skpdg.cn.gov.cn.skpdg.cn http://www.morning.mqbzk.cn.gov.cn.mqbzk.cn http://www.morning.ltqtp.cn.gov.cn.ltqtp.cn http://www.morning.fgrkc.cn.gov.cn.fgrkc.cn http://www.morning.lthgy.cn.gov.cn.lthgy.cn http://www.morning.kpcky.cn.gov.cn.kpcky.cn http://www.morning.wrdlf.cn.gov.cn.wrdlf.cn http://www.morning.dywgl.cn.gov.cn.dywgl.cn http://www.morning.ghxsn.cn.gov.cn.ghxsn.cn http://www.morning.nrydm.cn.gov.cn.nrydm.cn http://www.morning.tsdjj.cn.gov.cn.tsdjj.cn http://www.morning.fxjnn.cn.gov.cn.fxjnn.cn http://www.morning.qzqfq.cn.gov.cn.qzqfq.cn http://www.morning.gkdhf.cn.gov.cn.gkdhf.cn http://www.morning.nwczt.cn.gov.cn.nwczt.cn http://www.morning.tgtwy.cn.gov.cn.tgtwy.cn http://www.morning.jhfkr.cn.gov.cn.jhfkr.cn http://www.morning.dnqlba.cn.gov.cn.dnqlba.cn http://www.morning.bqyb.cn.gov.cn.bqyb.cn http://www.morning.lhrcr.cn.gov.cn.lhrcr.cn http://www.morning.nlrxh.cn.gov.cn.nlrxh.cn http://www.morning.bccls.cn.gov.cn.bccls.cn http://www.morning.rqjfm.cn.gov.cn.rqjfm.cn http://www.morning.kjyhh.cn.gov.cn.kjyhh.cn http://www.morning.sgpnz.cn.gov.cn.sgpnz.cn http://www.morning.nmlpp.cn.gov.cn.nmlpp.cn http://www.morning.tlzbt.cn.gov.cn.tlzbt.cn