中国行业网站大全,私密浏览器免费,网站开发的现状分析,wordpress嵌入优酷视频Flink-Doris-Connector 1.4.0 允许用户一步将包含数千个表的整个数据库#xff08;MySQL或Oracle #xff09;摄取到Apache Doris#xff08;一种实时分析数据库#xff09;中。
通过内置的Flink CDC#xff0c;连接器可以直接将上游源的表模式和数据同步到Apache DorisMySQL或Oracle 摄取到Apache Doris一种实时分析数据库中。
通过内置的Flink CDC连接器可以直接将上游源的表模式和数据同步到Apache Doris这意味着用户不再需要编写DataStream程序或在Doris中预先创建映射表。
当 Flink 作业启动时Connector 会自动检查源数据库和 Apache Doris 之间的数据等效性。如果数据源包含 Doris 中不存在的表Connector 会自动在 Doris 中创建相同的表并利用 Flink 的侧输出来方便一次摄取多个表如果源中发生架构更改它将自动获取 DDL 语句并在 Doris 中进行相同的架构更改。
一、快速开始 对于MySQL
下载 JAR 文件https://github.com/apache/doris-flink-connector/releases/tag/1.4.0 行家
dependencygroupIdorg.apache.doris/groupIdartifactIdflink-doris-connector-1.15/artifactId!--artifactIdflink-doris-connector-1.16/artifactId--!--artifactIdflink-doris-connector-1.17/artifactId--version1.4.0/version
/dependency 对于Oracle
下载 JAR 文件Flink 1.15http://justtmp-bj-1308700295.cos.ap-beijing.myqcloud.com/oracle/flink-doris-connector-1.15-1.5.0-SNAPSHOT.jarFlink 1.16http://justtmp-bj-1308700295.cos.ap-beijing.myqcloud.com/oracle/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jarFlink 1.17http://justtmp-bj-1308700295.cos.ap-beijing.myqcloud.com/oracle/flink-doris-connector-1.17-1.5.0-SNAPSHOT.jar
如何使用它
例如要将整个 MySQL 数据库引入mysql_dbDorisMySQL 表名以tbl或test开头只需执行以下命令无需提前在Doris 中创建表
FLINK_HOME/bin/flink run \-Dexecution.checkpointing.interval10s \-Dparallelism.default1 \-c org.apache.doris.flink.tools.cdc.CdcTools \lib/flink-doris-connector-1.16-1.4.0.jar \mysql-sync-database \--database test_db \--mysql-conf hostname127.0.0.1 \--mysql-conf usernameroot \--mysql-conf password123456 \--mysql-conf database-namemysql_db \--including-tables tbl|test.* \--sink-conf fenodes127.0.0.1:8030 \--sink-conf usernameroot \--sink-conf password123456 \--sink-conf jdbc-urljdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefixlabel1 \--table-conf replication_num1
摄取Oracle数据库请参考示例代码https://github.com/apache/doris-flink-connector/pull/156。
表现如何
当涉及到同步整个数据库包含数百甚至数千个活动或不活动的表时大多数用户希望在几秒钟内完成。因此我们测试了连接器看看它是否符合要求 1000 个 MySQL 表每个表有 100 个字段。所有表都是活动的这意味着它们不断更新每次数据写入涉及一百多行 Flink作业检查点10s
经过压力测试系统表现出较高的稳定性主要指标如下 根据早期采用者的反馈该Connector在生产环境中的万表数据库同步中也提供了高性能和系统稳定性。这证明Apache Doris和Flink CDC的结合能够高效可靠地进行大规模数据同步。 二、它如何使数据工程师受益
工程师不再需要担心表创建或表模式维护从而节省了数天繁琐且容易出错的工作。之前在Flink CDC中需要为每个表创建一个Flink作业并在源端建立日志解析链路但现在通过全库摄取源数据库的资源消耗大大减少。也是增量更新和全量更新的统一解决方案。
其他特性
1、连接维度表和事实表
常见的做法是将维度表放在Doris中通过Flink的实时流进行Join查询。Flink-Doris-Connector 1.4.0基于Flink 的 Async I/O实现了异步 Lookup Join因此 Flink 实时流不会因为查询而阻塞。此外连接器还允许您将多个查询合并为一个大查询并将其立即发送给 Doris 进行处理。这提高了此类连接查询的效率和吞吐量。
2、节俭 SDK
我们在 Connector 中引入了 Thrift-Service SDK用户不再需要使用 Thrift 插件或在编译时配置 Thrift 环境。这使得编译过程变得更加简单。
3、按需流加载
数据同步过程中当没有新的数据摄入时不会发出Stream Load请求。这样可以避免不必要的集群资源消耗。
4、后端节点轮询
对于数据摄取Doris 调用前端节点获取后端节点列表并随机选择一个发起摄取请求。该后端节点将是协调器。Flink-Doris-Connector 1.4.0 允许用户启用轮询机制即在每个Flink 检查点都有不同的后端节点作为 Coordinator以避免单个后端节点长期承受过大的压力。
5、支持更多数据类型
除了常见的数据类型外Flink-Doris-Connector 1.4.0 还支持 Doris 中的 DecimalV3/DateV2/DateTimev2/Array/JSON。
三、用法示例
可以通过DataStream或FlinkSQL有界流从Doris读取数据。支持谓词下推。
CREATE TABLE flink_doris_source (name STRING,age INT,score DECIMAL(5,2)) WITH (connector doris,fenodes 127.0.0.1:8030,table.identifier database.table,username root,password password,doris.filter.query age18
);
SELECT * FROM flink_doris_source; 连接维度表和事实表
CREATE TABLE fact_table (id BIGINT,name STRING,city STRING,process_time as proctime()
) WITH (connector kafka,
...
);
create table dim_city(city STRING,level INT ,province STRING,country STRING
) WITH (connector doris,fenodes 127.0.0.1:8030,jdbc-url jdbc:mysql://127.0.0.1:9030,lookup.jdbc.async true,table.identifier dim.dim_city,username root,password
);
SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city c.city 写入Apache Doris
CREATE TABLE doris_sink (name STRING,age INT,score DECIMAL(5,2)) WITH (connector doris,fenodes 127.0.0.1:8030,table.identifier database.table,username root,password ,sink.label-prefix doris_label,//json write insink.properties.format json,sink.properties.read_json_by_line true
);