票务网站开发端口,房地产销售工作总结,众筹网站搭建,网站开发 票种TDengine Kafka Connector 包含 TDengine Source Connector 和 TDengine Sink Connector 两个插件。用户只需提供简单的配置文件#xff0c;就可以将 Kafka 中指定 topic 的数据#xff08;批量或实时#xff09;同步到 TDengine#xff0c;或将 TDengine 中指定数据库的数…
TDengine Kafka Connector 包含 TDengine Source Connector 和 TDengine Sink Connector 两个插件。用户只需提供简单的配置文件就可以将 Kafka 中指定 topic 的数据批量或实时同步到 TDengine或将 TDengine 中指定数据库的数据批量或实时同步到 Kafka。
什么是 Kafka Connect
Kafka Connect 是 Apache Kafka 的一个组件用于使其它系统比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka BrokerSource Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。 TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送给 Kafka Connect。TDengine Sink Connector 用于 从 Kafka Connect 接收数据并写入 TDengine。 前置条件
运行本教程中示例的前提条件。
Linux 操作系统已安装 Java 8 和 Maven已安装 Git、curl、vi已安装并启动 TDengine。如果还没有可参考 安装和卸载
安装 Kafka 在任意目录下执行 curl -O https://dlcdn.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz
tar xzf kafka_2.13-3.4.0.tgz -C /opt/
ln -s /opt/kafka_2.13-3.4.0 /opt/kafka然后需要把 $KAFKA_HOME/bin 目录加入 PATH。 export KAFKA_HOME/opt/kafka
export PATH$PATH:$KAFKA_HOME/bin以上脚本可以追加到当前用户的 profile 文件~/.profile 或 ~/.bash_profile
安装 TDengine Connector 插件
编译插件
git clone --branch 3.0 https://github.com/taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package -Dmaven.test.skiptrue
unzip -d $KAFKA_HOME/components/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip以上脚本先 clone 项目源码然后用 Maven 编译打包。打包完成后在 target/components/packages/ 目录生成了插件的 zip 包。把这个 zip 包解压到安装插件的路径即可。上面的示例中使用了内置的插件安装路径 $KAFKA_HOME/components/。
配置插件
将 kafka-connect-tdengine 插件加入 $KAFKA_HOME/config/connect-distributed.properties 配置文件 plugin.path 中
plugin.path/usr/share/java,/opt/kafka/components启动 Kafka
zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.propertieskafka-server-start.sh -daemon $KAFKA_HOME/config/server.propertiesconnect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties验证 kafka Connect 是否启动成功
输入命令
curl http://localhost:8083/connectors如果各组件都启动成功会得到如下输出
[]TDengine Sink Connector 的使用
TDengine Sink Connector 的作用是同步指定 topic 的数据到 TDengine。用户无需提前创建数据库和超级表。可手动指定目标数据库的名字见配置参数 connection.database也可按一定规则生成 (见配置参数 connection.database.prefix)。
TDengine Sink Connector 内部使用 TDengine 无模式写入接口 写数据到 TDengine目前支持三种格式的数据InfluxDB 行协议格式OpenTSDB Telnet 协议格式和 OpenTSDB JSON 协议格式。
下面的示例将主题 meters 的数据同步到目标数据库 power。数据格式为 InfluxDB Line 协议格式。
添加 Sink Connector 配置文件
mkdir ~/test
cd ~/test
vi sink-demo.jsonsink-demo.json 内容如下
{name: TDengineSinkConnector,config: {connector.class:com.taosdata.kafka.connect.sink.TDengineSinkConnector,tasks.max: 1,topics: meters,connection.url: jdbc:TAOS://127.0.0.1:6030,connection.user: root,connection.password: taosdata,connection.database: power,db.schemaless: line,data.precision: ns,key.converter: org.apache.kafka.connect.storage.StringConverter,value.converter: org.apache.kafka.connect.storage.StringConverter,errors.tolerance: all,errors.deadletterqueue.topic.name: dead_letter_topic,errors.deadletterqueue.topic.replication.factor: 1}
}关键配置说明
topics: meters 和 connection.database: power, 表示订阅主题 meters 的数据并写入数据库 power。db.schemaless: line, 表示使用 InfluxDB Line 协议格式的数据。
创建 Sink Connector 实例
curl -X POST -d sink-demo.json http://localhost:8083/connectors -H Content-Type: application/json若以上命令执行成功则有如下输出
{name: TDengineSinkConnector,config: {connection.database: power,connection.password: taosdata,connection.url: jdbc:TAOS://127.0.0.1:6030,connection.user: root,connector.class: com.taosdata.kafka.connect.sink.TDengineSinkConnector,data.precision: ns,db.schemaless: line,key.converter: org.apache.kafka.connect.storage.StringConverter,tasks.max: 1,topics: meters,value.converter: org.apache.kafka.connect.storage.StringConverter,name: TDengineSinkConnector,errors.tolerance: all,errors.deadletterqueue.topic.name: dead_letter_topic,errors.deadletterqueue.topic.replication.factor: 1, },tasks: [],type: sink
}写入测试数据
准备测试数据的文本文件内容如下
meters,locationCalifornia.LosAngeles,groupid2 current11.8,voltage221,phase0.28 1648432611249000000
meters,locationCalifornia.LosAngeles,groupid2 current13.4,voltage223,phase0.29 1648432611250000000
meters,locationCalifornia.LosAngeles,groupid3 current10.8,voltage223,phase0.29 1648432611249000000
meters,locationCalifornia.LosAngeles,groupid3 current11.3,voltage221,phase0.35 1648432611250000000使用 kafka-console-producer 向主题 meters 添加测试数据。
cat test-data.txt | kafka-console-producer.sh --broker-list localhost:9092 --topic meters:::note 如果目标数据库 power 不存在那么 TDengine Sink Connector 会自动创建数据库。自动创建数据库使用的时间精度为纳秒这就要求写入数据的时间戳精度也是纳秒。如果写入数据的时间戳精度不是纳秒将会抛异常。 :::
验证同步是否成功
使用 TDengine CLI 验证同步是否成功。
taos use power;
Database changed.taos select * from meters;_ts | current | voltage | phase | groupid | location |
2022-03-28 09:56:51.249000000 | 11.800000000 | 221.000000000 | 0.280000000 | 2 | California.LosAngeles |2022-03-28 09:56:51.250000000 | 13.400000000 | 223.000000000 | 0.290000000 | 2 | California.LosAngeles |2022-03-28 09:56:51.249000000 | 10.800000000 | 223.000000000 | 0.290000000 | 3 | California.LosAngeles |2022-03-28 09:56:51.250000000 | 11.300000000 | 221.000000000 | 0.350000000 | 3 | California.LosAngeles |
Query OK, 4 row(s) in set (0.004208s)若看到了以上数据则说明同步成功。若没有请检查 Kafka Connect 的日志。配置参数的详细说明见 “配置参考” 一节。
TDengine Source Connector 的使用
TDengine Source Connector 的作用是将 TDengine 某个数据库某一时刻之后的数据全部推送到 Kafka。TDengine Source Connector 的实现原理是先分批拉取历史数据再用定时查询的策略同步增量数据。同时会监控表的变化可以自动同步新增的表。如果重启 Kafka Connect, 会从上次中断的位置继续同步。
TDengine Source Connector 会将 TDengine 数据表中的数据转换成 InfluxDB Line 协议格式 或 OpenTSDB JSON 协议格式然后写入 Kafka。
下面的示例程序同步数据库 test 中的数据到主题 tdengine-test-meters。
添加 Source Connector 配置文件
vi source-demo.json输入以下内容
{name:TDengineSourceConnector,config:{connector.class: com.taosdata.kafka.connect.source.TDengineSourceConnector,tasks.max: 1,subscription.group.id: source-demo,connection.url: jdbc:TAOS://127.0.0.1:6030,connection.user: root,connection.password: taosdata,connection.database: test,connection.attempts: 3,connection.backoff.ms: 5000,topic.prefix: tdengine,topic.delimiter: -,poll.interval.ms: 1000,fetch.max.rows: 100,topic.per.stable: true,topic.ignore.db: false,out.format: line,data.precision: ms,key.converter: org.apache.kafka.connect.storage.StringConverter,value.converter: org.apache.kafka.connect.storage.StringConverter}
}准备测试数据
准备生成测试数据的 SQL 文件。
DROP DATABASE IF EXISTS test;
CREATE DATABASE test;
USE test;
CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT);INSERT INTO d1001 USING meters TAGS(California.SanFrancisco, 2) VALUES(2018-10-03 14:38:05.000,10.30000,219,0.31000) \d1001 USING meters TAGS(California.SanFrancisco, 2) VALUES(2018-10-03 14:38:15.000,12.60000,218,0.33000) \d1001 USING meters TAGS(California.SanFrancisco, 2) VALUES(2018-10-03 14:38:16.800,12.30000,221,0.31000) \d1002 USING meters TAGS(California.SanFrancisco, 3) VALUES(2018-10-03 14:38:16.650,10.30000,218,0.25000) \d1003 USING meters TAGS(California.LosAngeles, 2) VALUES(2018-10-03 14:38:05.500,11.80000,221,0.28000) \d1003 USING meters TAGS(California.LosAngeles, 2) VALUES(2018-10-03 14:38:16.600,13.40000,223,0.29000) \d1004 USING meters TAGS(California.LosAngeles, 3) VALUES(2018-10-03 14:38:05.000,10.80000,223,0.29000) \d1004 USING meters TAGS(California.LosAngeles, 3) VALUES(2018-10-03 14:38:06.500,11.50000,221,0.35000);使用 TDengine CLI, 执行 SQL 文件。
taos -f prepare-source-data.sql创建 Source Connector 实例
curl -X POST -d source-demo.json http://localhost:8083/connectors -H Content-Type: application/json查看 topic 数据
使用 kafka-console-consumer 命令行工具监控主题 tdengine-test-meters 中的数据。一开始会输出所有历史数据往 TDengine 插入两条新的数据之后kafka-console-consumer 也立即输出了新增的两条数据。输出数据 InfluxDB line protocol 的格式。
kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic tdengine-test-meters输出
......
meters,locationCalifornia.SanFrancisco,groupid2i32 current10.3f32,voltage219i32,phase0.31f32 1538548685000000000
meters,locationCalifornia.SanFrancisco,groupid2i32 current12.6f32,voltage218i32,phase0.33f32 1538548695000000000
......此时会显示所有历史数据。切换到 TDengine CLI插入两条新的数据
USE test;
INSERT INTO d1001 VALUES (now, 13.3, 229, 0.38);
INSERT INTO d1002 VALUES (now, 16.3, 233, 0.22);再切换回 kafka-console-consumer此时命令行窗口已经打印出刚插入的 2 条数据。
unload 插件
测试完毕之后用 unload 命令停止已加载的 connector。
查看当前活跃的 connector
curl http://localhost:8083/connectors如果按照前述操作此时应有两个活跃的 connector。使用下面的命令 unload
curl -X DELETE http://localhost:8083/connectors/TDengineSinkConnector
curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector性能调优
如果在从 TDengine 同步数据到 Kafka 的过程中发现性能不达预期可以尝试使用如下参数提升 Kafka 的写入吞吐量。
打开 KAFKA_HOME/config/producer.properties 配置文件。参数说明及配置建议如下 参数参数说明设置建议producer.type此参数用于设置消息的发送方式默认值为 sync 表示同步发送async 表示异步发送。采用异步发送能够提升消息发送的吞吐量。asyncrequest.required.acks参数用于配置生产者发送消息后需要等待的确认数量。当设置为 1 时表示只要领导者副本成功写入消息就会给生产者发送确认而无需等待集群中的其他副本写入成功。这种设置可以在一定程度上保证消息的可靠性同时也能保证一定的吞吐量。因为不需要等待所有副本都写入成功所以可以减少生产者的等待时间提高发送消息的效率。1max.request.size该参数决定了生产者在一次请求中可以发送的最大数据量。其默认值为 1048576也就是 1M。如果设置得太小可能会导致频繁的网络请求降低吞吐量。如果设置得太大可能会导致内存占用过高或者在网络状况不佳时增加请求失败的概率。建议设置为 100M。104857600batch.size此参数用于设定 batch 的大小默认值为 16384即 16KB。在消息发送过程中发送到 Kafka 缓冲区中的消息会被划分成一个个的 batch。故而减小 batch 大小有助于降低消息延迟而增大 batch 大小则有利于提升吞吐量可根据实际的数据量大小进行合理配置。可根据实际情况进行调整建议设置为 512K。524288buffer.memory此参数用于设置生产者缓冲待发送消息的内存总量。较大的缓冲区可以允许生产者积累更多的消息后批量发送提高吞吐量但也会增加延迟和内存使用。可根据机器资源来配置建议配置为 1G。1073741824
配置参考
通用配置
以下配置项对 TDengine Sink Connector 和 TDengine Source Connector 均适用。
nameconnector 名称。connector.classconnector 的完整类名例如如 com.taosdata.kafka.connect.sink.TDengineSinkConnector。tasks.max最大任务数默认 1。topics需要同步的 topic 列表多个用逗号分隔如 topic1,topic2。connection.urlTDengine JDBC 连接字符串如 jdbc:TAOS://127.0.0.1:6030。connection.userTDengine 用户名默认 root。connection.passwordTDengine 用户密码默认 taosdata。connection.attempts最大尝试连接次数。默认 3。connection.backoff.ms创建连接失败重试时间隔时间单位为 ms。默认 5000。data.precision使用 InfluxDB 行协议格式时时间戳的精度。可选值为 ms表示毫秒us表示微秒ns表示纳秒
TDengine Sink Connector 特有的配置
connection.database目标数据库名。如果指定的数据库不存在会则自动创建。自动建库使用的时间精度为纳秒。默认值为 null。为 null 时目标数据库命名规则参考 connection.database.prefix 参数的说明connection.database.prefix当 connection.database 为 null 时目标数据库的前缀。可以包含占位符 KaTeX parse error: Expected EOF, got } at position 8: \{topic}̲。比如 kafka_{topic}, 对于主题 ‘orders’ 将写入数据库 ‘kafka_orders’。默认 null。当为 null 时目标数据库的名字和主题的名字是一致的。batch.size分批写入每批记录数。当 Sink Connector 一次接收到的数据大于这个值时将分批写入。max.retries发生错误时的最大重试次数。默认为 1。retry.backoff.ms发送错误时重试的时间间隔。单位毫秒默认为 3000。db.schemaless数据格式可选值为 line代表 InfluxDB 行协议格式json代表 OpenTSDB JSON 格式telnet代表 OpenTSDB Telnet 行协议格式
TDengine Source Connector 特有的配置
connection.database源数据库名称无缺省值。topic.prefix数据导入 kafka 时使用的 topic 名称的前缀。默认为空字符串 “”。timestamp.initial数据同步起始时间。格式为’yyyy-MM-dd HH:mm:ss’若未指定则从指定 DB 中最早的一条记录开始。poll.interval.ms检查是否有新建或删除的表的时间间隔单位为 ms。默认为 1000。fetch.max.rows检索数据库时最大检索条数。默认为 100。query.interval.ms从 TDengine 一次读取数据的时间跨度需要根据表中的数据特征合理配置避免一次查询的数据量过大或过小在具体的环境中建议通过测试设置一个较优值默认值为 0即获取到当前最新时间的所有数据。out.format结果集输出格式。line 表示输出格式为 InfluxDB Line 协议格式json 表示输出格式是 json。默认为 line。topic.per.stable如果设置为 true表示一个超级表对应一个 Kafka topictopic 的命名规则 topic.prefixtopic.delimiterconnection.databasetopic.delimiterstable.name如果设置为 false则指定的 DB 中的所有数据进入一个 Kafka topictopic 的命名规则为 topic.prefixtopic.delimiterconnection.databasetopic.ignore.dbtopic 命名规则是否包含 database 名称true 表示规则为 topic.prefixtopic.delimiterstable.namefalse 表示规则为 topic.prefixtopic.delimiterconnection.databasetopic.delimiterstable.name默认 false。此配置项在 topic.per.stable 设置为 false 时不生效。topic.delimitertopic 名称分割符默认为 -。read.method从 TDengine 读取数据方式query 或是 subscription。默认为 subscription。subscription.group.id指定 TDengine 数据订阅的组 id当 read.method 为 subscription 时此项为必填项。subscription.from指定 TDengine 数据订阅起始位置latest 或是 earliest。默认为 latest。
其他说明
关于如何在独立安装的 Kafka 环境使用 Kafka Connect 插件请参考官方文档https://kafka.apache.org/documentation/#connect。
问题反馈
无论遇到任何问题都欢迎在本项目的 Github 仓库反馈https://github.com/taosdata/kafka-connect-tdengine/issues。
参考
https://kafka.apache.org/documentation/