网站建站是 什么,手机怎么做程序,深圳的网站建设公司的外文名是,暴雪和网易终止合作使用FlinkSql进行实时工作流开发 引言Flink SQL实战常用的Connector1. MySQL-CDC 连接器配置2. Kafka 连接器配置3. JDBC 连接器配置4. RabbitMQ 连接器配置5. REST Lookup 连接器配置6. HDFS 连接器配置 FlinkSql数据类型1. 基本数据类型2. 字符串数据类型3. 日期和时间数据类… 使用FlinkSql进行实时工作流开发 引言Flink SQL实战常用的Connector1. MySQL-CDC 连接器配置2. Kafka 连接器配置3. JDBC 连接器配置4. RabbitMQ 连接器配置5. REST Lookup 连接器配置6. HDFS 连接器配置 FlinkSql数据类型1. 基本数据类型2. 字符串数据类型3. 日期和时间数据类型4. 复杂数据类型5. 特殊数据类型数据类型的使用示例 引言 在大数据时代实时数据分析和处理变得越来越重要。Apache Flink作为流处理领域的佼佼者提供了一套强大的工具集来处理无界和有界数据流。其中Flink SQL是其生态系统中一个重要的组成部分允许用户以SQL语句的形式执行复杂的数据流操作极大地简化了实时数据处理的开发流程。
什么是Apache Flink
Apache Flink是一个开源框架用于处理无边界无尽和有边界有限数据流。它提供了低延迟、高吞吐量和状态一致性使开发者能够构建复杂的实时应用和微服务。Flink的核心是流处理引擎它支持事件时间处理、窗口操作以及精确一次的状态一致性。
为什么选择Flink SQL 易用性Flink SQL使得非专业程序员也能快速上手使用熟悉的SQL语法进行实时数据查询和处理。 灵活性可以无缝地将SQL与Java/Scala API结合使用为用户提供多种编程模型的选择。 性能利用Flink的高性能流处理引擎Flink SQL能够实现实时响应和低延迟处理。 集成能力支持多种数据源和数据接收器如Kafka、JDBC、HDFS等易于集成到现有的数据生态系统中。 Flink SQL实战
常用的Connector
在配置FlinkSQL实时开发时使用mysql-cdc、Kafka、jdbc和rabbitmq作为连接器是一个很常见的场景。以下是详细的配置说明你可以基于这些信息来撰写你的博客
1. MySQL-CDC 连接器配置
MySQL-CDCChange Data Capture连接器用于捕获MySQL数据库中的变更数据。配置示例如下
CREATE TABLE mysql_table (-- 定义表结构id INT,name STRING,-- 其他列
) WITH (connector mysql-cdc, -- 使用mysql-cdc连接器hostname mysql-host, -- MySQL服务器主机名port 3306, -- MySQL端口号username user, -- MySQL用户名password password, -- MySQL密码database-name db, -- 数据库名table-name table -- 表名server-time-zone GMT8, -- 服务器时区debezium.snapshot.mode initial, -- 初始快照模式initial表示从头开始读取所有数据latest-offset表示从最近的偏移量开始读取timestamp则可以指定一个时间戳从该时间戳之后的数据开始读取。scan.incremental.snapshot.enabled true -- 可选设置为true时Flink会尝试维护一个数据库表的增量快照。这意味着Flink不会每次都重新读取整个表而是只读取自上次读取以来发生变化的数据。这样可以显著提高读取效率尤其是在处理大量数据且频繁更新的场景下。scan.incremental.snapshot.chunk.size 1024 -- 可选 增量快照块大小debezium.snapshot.locking.mode none, -- 可选控制在快照阶段锁定表的方式以防止数据冲突。none表示不锁定lock-tables表示锁定整个表transaction表示使用事务来锁定。debezium.properties.include-schema-changes true, -- 可选如果设置为true则在CDC事件中会包含模式变更信息。debezium.properties.table.whitelist mydatabase.mytable, -- 可选指定要监控的表的白名单。如果table-name未设置可以通过这个属性来指定。debezium.properties.database.history io.debezium.relational.history.FileDatabaseHistory -- 可选设置数据库历史记录的实现类通常使用FileDatabaseHistory来保存历史记录以便在重启后能恢复状态。
);2. Kafka 连接器配置
Kafka连接器用于读写Kafka主题中的数据。配置示例如下
CREATE TABLE kafka_table (-- 定义表结构id INT,name STRING,-- 其他列
) WITH (connector kafka, -- 使用kafka连接器topic topic_name, -- Kafka主题名properties.bootstrap.servers kafka-broker:9092, -- Kafka服务器地址format json -- 数据格式例如jsonproperties.group.id flink-consumer-group, -- 消费者组IDscan.startup.mode earliest-offset, -- 启动模式earliest-offset, latest-offset, specific-offset, timestampformat json, -- 数据格式json.fail-on-missing-field false, -- 是否在字段缺失时失败json.ignore-parse-errors true, -- 是否忽略解析错误properties.security.protocol SASL_SSL, -- 安全协议可选properties.sasl.mechanism PLAIN, -- SASL机制可选properties.sasl.jaas.config org.apache.kafka.common.security.plain.PlainLoginModule required usernameuser passwordpassword; -- SASL配置可选
);3. JDBC 连接器配置
JDBC连接器用于与其他关系型数据库进行交互。配置示例如下
CREATE TABLE jdbc_table (-- 定义表结构id INT,name STRING,-- 其他列
) WITH (connector jdbc, -- 使用jdbc连接器url jdbc:mysql://mysql-host:3306/db, -- JDBC连接URLtable-name table_name, -- 数据库表名username user, -- 数据库用户名password password -- 数据库密码driver com.mysql.cj.jdbc.Driver, -- JDBC驱动类lookup.cache.max-rows 5000, -- 可选查找缓存的最大行数lookup.cache.ttl 10min, -- 可选查找缓存的TTL时间到期lookup.max-retries 3, -- 可选查找的最大重试次数sink.buffer-flush.max-rows 1000, -- 可选缓冲区刷新最大行数sink.buffer-flush.interval 2s -- 可选缓冲区刷新间隔
);4. RabbitMQ 连接器配置
RabbitMQ连接器用于与RabbitMQ消息队列进行交互。配置示例如下
CREATE TABLE rabbitmq_table (-- 定义表结构id INT,name STRING,-- 其他列
) WITH (connector rabbitmq, -- 使用rabbitmq连接器host rabbitmq-host, -- RabbitMQ主机名port 5672, -- RabbitMQ端口号username user, -- RabbitMQ用户名password password, -- RabbitMQ密码queue queue_name, -- RabbitMQ队列名exchange exchange_name -- RabbitMQ交换机名routing-key routing_key, -- 路由键delivery-mode 2, -- 投递模式2表示持久format json, -- 数据格式json.fail-on-missing-field false, -- 是否在字段缺失时失败json.ignore-parse-errors true -- 是否忽略解析错误
);5. REST Lookup 连接器配置
REST Lookup 连接器允许在 SQL 查询过程中通过 REST API 进行查找操作。
CREATE TABLE rest_table (id INT,name STRING,price DECIMAL(10, 2),PRIMARY KEY (id) NOT ENFORCED
) WITH (connector rest-lookup,url http://api.example.com/user/{id}, -- REST API URL使用占位符 {product_id}lookup-method POST -- GET 或 POSTformat json, -- 数据格式asyncPolling false -- 可选指定查找操作是否使用异步轮询模式。默认值为 false。当设置为 true 时查找操作会以异步方式执行有助于提高性能。gid.connector.http.source.lookup.header.Content-Type application/json -- 可选设置 Content-Type 请求头。用于指定请求体的媒体类型。例如设置为 application/json 表示请求体是 JSON 格式。gid.connector.http.source.lookup.header.Origin * -- 可选设置 Origin 请求头。通常用于跨域请求。gid.connector.http.source.lookup.header.X-Content-Type-Options nosniff -- 可选设置 X-Content-Type-Options 请求头。用于防止 MIME 类型混淆攻击。json.fail-on-missing-field false, -- 可选是否在字段缺失时失败json.ignore-parse-errors true -- 可选是否忽略解析错误lookup.cache.max-rows 5000, -- 可选查找缓存的最大行数lookup.cache.ttl 10min, -- 可选查找缓存的TTL时间到期lookup.max-retries 3 -- 可选查找的最大重试次数
);
6. HDFS 连接器配置
HDFS connector用于读取或写入Hadoop分布式文件系统中的数据。
创建HDFS Source
CREATE TABLE hdfsSource (line STRING
) WITH (connector filesystem,path hdfs://localhost:9000/data/input, -- HDFS上的路径。format csv -- 文件格式。
);创建HDFS Sink
CREATE TABLE hdfsSink (line STRING
) WITH (connector filesystem,path hdfs://localhost:9000/data/output,format csv
);FlinkSql数据类型
在FlinkSQL中数据类型的选择和定义是非常重要的因为它们直接影响数据的存储和处理方式。FlinkSQL提供了多种数据类型可以满足各种业务需求。以下是FlinkSQL中的常见数据类型及其详细介绍
1. 基本数据类型 BOOLEAN: 布尔类型表示TRUE或FALSE。 CREATE TABLE example_table (is_active BOOLEAN
);TINYINT: 8位带符号整数范围是-128到127。 CREATE TABLE example_table (tiny_value TINYINT
);SMALLINT: 16位带符号整数范围是-32768到32767。 CREATE TABLE example_table (small_value SMALLINT
);INT: 32位带符号整数范围是-2147483648到2147483647。 CREATE TABLE example_table (int_value INT
);BIGINT: 64位带符号整数范围是-9223372036854775808到9223372036854775807。 CREATE TABLE example_table (big_value BIGINT
);FLOAT: 单精度浮点数。 CREATE TABLE example_table (float_value FLOAT
);DOUBLE: 双精度浮点数。 CREATE TABLE example_table (double_value DOUBLE
);DECIMAL(p, s): 精确数值类型p表示总精度s表示小数位数。 CREATE TABLE example_table (decimal_value DECIMAL(10, 2)
);2. 字符串数据类型 CHAR(n): 定长字符串n表示字符串的长度。 CREATE TABLE example_table (char_value CHAR(10)
);VARCHAR(n): 可变长字符串n表示最大长度。 CREATE TABLE example_table (varchar_value VARCHAR(255)
);STRING: 可变长字符串无长度限制。 CREATE TABLE example_table (string_value STRING
);3. 日期和时间数据类型 DATE: 日期类型格式为YYYY-MM-DD。 CREATE TABLE example_table (date_value DATE
);TIME§: 时间类型格式为HH:MM:SSp表示秒的小数位精度。 CREATE TABLE example_table (time_value TIME(3)
);TIMESTAMP§: 时间戳类型格式为YYYY-MM-DD HH:MM:SS.sssp表示秒的小数位精度。 CREATE TABLE example_table (timestamp_value TIMESTAMP(3)
);TIMESTAMP§ WITH LOCAL TIME ZONE: 带有本地时区的时间戳类型。 CREATE TABLE example_table (local_timestamp_value TIMESTAMP(3) WITH LOCAL TIME ZONE
);4. 复杂数据类型 ARRAY: 数组类型T表示数组中的元素类型。 CREATE TABLE example_table (array_value ARRAYINT
);MAPK, V: 键值对映射类型K表示键的类型V表示值的类型。 CREATE TABLE example_table (map_value MAPSTRING, INT
);ROW…: 行类型可以包含多个字段每个字段可以有不同的类型。 CREATE TABLE example_table (row_value ROWname STRING, age INT
);5. 特殊数据类型 BINARY(n): 定长字节数组n表示长度。 CREATE TABLE example_table (binary_value BINARY(10)
);VARBINARY(n): 可变长字节数组n表示最大长度。 CREATE TABLE example_table (varbinary_value VARBINARY(255)
);数据类型的使用示例
以下是一个包含各种数据类型的表的定义示例
CREATE TABLE example_table (id INT,name STRING,is_active BOOLEAN,salary DECIMAL(10, 2),birth_date DATE,join_time TIMESTAMP(3),preferences ARRAYSTRING,attributes MAPSTRING, STRING,address ROWstreet STRING, city STRING, zip INT
);
文章转载自: http://www.morning.hlrtzcj.cn.gov.cn.hlrtzcj.cn http://www.morning.slfmp.cn.gov.cn.slfmp.cn http://www.morning.xplng.cn.gov.cn.xplng.cn http://www.morning.rtspr.cn.gov.cn.rtspr.cn http://www.morning.srmdr.cn.gov.cn.srmdr.cn http://www.morning.dqpnd.cn.gov.cn.dqpnd.cn http://www.morning.syxmx.cn.gov.cn.syxmx.cn http://www.morning.qprtm.cn.gov.cn.qprtm.cn http://www.morning.dwzwm.cn.gov.cn.dwzwm.cn http://www.morning.rdlong.com.gov.cn.rdlong.com http://www.morning.gjws.cn.gov.cn.gjws.cn http://www.morning.sfwcb.cn.gov.cn.sfwcb.cn http://www.morning.rljr.cn.gov.cn.rljr.cn http://www.morning.mdnnz.cn.gov.cn.mdnnz.cn http://www.morning.pflry.cn.gov.cn.pflry.cn http://www.morning.pqnpd.cn.gov.cn.pqnpd.cn http://www.morning.tlnbg.cn.gov.cn.tlnbg.cn http://www.morning.wpydf.cn.gov.cn.wpydf.cn http://www.morning.tqlhn.cn.gov.cn.tqlhn.cn http://www.morning.zhishizf.cn.gov.cn.zhishizf.cn http://www.morning.lwjlj.cn.gov.cn.lwjlj.cn http://www.morning.fwkpp.cn.gov.cn.fwkpp.cn http://www.morning.rmrcc.cn.gov.cn.rmrcc.cn http://www.morning.dpplr.cn.gov.cn.dpplr.cn http://www.morning.sgnjg.cn.gov.cn.sgnjg.cn http://www.morning.ailvturv.com.gov.cn.ailvturv.com http://www.morning.qpljg.cn.gov.cn.qpljg.cn http://www.morning.llyjx.cn.gov.cn.llyjx.cn http://www.morning.sbyhj.cn.gov.cn.sbyhj.cn http://www.morning.rbkdg.cn.gov.cn.rbkdg.cn http://www.morning.kngx.cn.gov.cn.kngx.cn http://www.morning.qtsks.cn.gov.cn.qtsks.cn http://www.morning.yrjhr.cn.gov.cn.yrjhr.cn http://www.morning.xckrj.cn.gov.cn.xckrj.cn http://www.morning.rldph.cn.gov.cn.rldph.cn http://www.morning.rrpsw.cn.gov.cn.rrpsw.cn http://www.morning.jfqqs.cn.gov.cn.jfqqs.cn http://www.morning.qbpqw.cn.gov.cn.qbpqw.cn http://www.morning.zdkzj.cn.gov.cn.zdkzj.cn http://www.morning.mkrjf.cn.gov.cn.mkrjf.cn http://www.morning.rchsr.cn.gov.cn.rchsr.cn http://www.morning.nytpt.cn.gov.cn.nytpt.cn http://www.morning.fxkgp.cn.gov.cn.fxkgp.cn http://www.morning.qjldz.cn.gov.cn.qjldz.cn http://www.morning.wjdgx.cn.gov.cn.wjdgx.cn http://www.morning.ktxd.cn.gov.cn.ktxd.cn http://www.morning.mhsmj.cn.gov.cn.mhsmj.cn http://www.morning.nfzzf.cn.gov.cn.nfzzf.cn http://www.morning.ckzjl.cn.gov.cn.ckzjl.cn http://www.morning.gmysq.cn.gov.cn.gmysq.cn http://www.morning.qphcq.cn.gov.cn.qphcq.cn http://www.morning.wjndl.cn.gov.cn.wjndl.cn http://www.morning.mygbt.cn.gov.cn.mygbt.cn http://www.morning.c7627.cn.gov.cn.c7627.cn http://www.morning.tldhq.cn.gov.cn.tldhq.cn http://www.morning.llthz.cn.gov.cn.llthz.cn http://www.morning.rzdzb.cn.gov.cn.rzdzb.cn http://www.morning.ynjhk.cn.gov.cn.ynjhk.cn http://www.morning.rxlck.cn.gov.cn.rxlck.cn http://www.morning.zlxrg.cn.gov.cn.zlxrg.cn http://www.morning.snnkt.cn.gov.cn.snnkt.cn http://www.morning.nqrdx.cn.gov.cn.nqrdx.cn http://www.morning.lqjlg.cn.gov.cn.lqjlg.cn http://www.morning.jcwt.cn.gov.cn.jcwt.cn http://www.morning.rhfbl.cn.gov.cn.rhfbl.cn http://www.morning.wjhdn.cn.gov.cn.wjhdn.cn http://www.morning.gassnw.com.gov.cn.gassnw.com http://www.morning.lcbt.cn.gov.cn.lcbt.cn http://www.morning.nzqmw.cn.gov.cn.nzqmw.cn http://www.morning.rfzbm.cn.gov.cn.rfzbm.cn http://www.morning.tlfmr.cn.gov.cn.tlfmr.cn http://www.morning.wdskl.cn.gov.cn.wdskl.cn http://www.morning.fbzyc.cn.gov.cn.fbzyc.cn http://www.morning.qbdqc.cn.gov.cn.qbdqc.cn http://www.morning.nfqyk.cn.gov.cn.nfqyk.cn http://www.morning.znrlg.cn.gov.cn.znrlg.cn http://www.morning.lgznc.cn.gov.cn.lgznc.cn http://www.morning.hgsmz.cn.gov.cn.hgsmz.cn http://www.morning.bwqr.cn.gov.cn.bwqr.cn http://www.morning.srkqs.cn.gov.cn.srkqs.cn