php企业门户网站,佛山顺德容桂网站制作,nat123做网站 查封,中国电力建设股份有限公司官方网站目录 1.维表
2.数据准备
创建源数据
创建维度表
创建Sink表
3.配置任务
Flink SQL创建kafka源表
Flink SQL创建MySQL维表
Flink SQL创建MySQL结果表
编写计算任务
核验数据 1.维表
目前在实时计算的场景中#xff0c;大多数都使用过MySQL、Hbase、redis作为维表引擎…
目录 1.维表
2.数据准备
创建源数据
创建维度表
创建Sink表
3.配置任务
Flink SQL创建kafka源表
Flink SQL创建MySQL维表
Flink SQL创建MySQL结果表
编写计算任务
核验数据 1.维表
目前在实时计算的场景中大多数都使用过MySQL、Hbase、redis作为维表引擎存储一些维度数据然后在DataStream API中调用MySQL、Hbase、redis客户端去获取到维度数据进行维度扩充。
本案例采用MySQL创建维表与创建MySQL sink表语法相同。
2.数据准备
创建源数据
重启kafka创建Topic case_kafka_mysql
写入json格式的数据 {ts: 20201011,id: 8,price_amt:211}
创建维度表
在MySQL中创建名为product_dim的表
CREATE TABLE product_dim (id bigint(11) NOT NULL,coupon_price_amt bigint(11) DEFAULT NULL,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8mb4;
向数据表插入如下数据
INSERT INTO product_dim VALUES (1, 1);
INSERT INTO product_dim VALUES (3, 1);
INSERT INTO product_dim VALUES (8, 1);
创建Sink表
在MySQL中创建名为sync_test_3的表
CREATE TABLE sync_test_3 (id bigint(11) NOT NULL AUTO_INCREMENT,ts varchar(64) DEFAULT NULL,total_gmv bigint(11) DEFAULT NULL,PRIMARY KEY (id),UNIQUE KEY uidx (ts) USING BTREE
) ENGINEInnoDB AUTO_INCREMENT5 DEFAULT CHARSETutf8mb4;
3.配置任务
Flink SQL创建kafka源表
create table flink_test_3 (id BIGINT,ts VARCHAR,price_amt BIGINT,proctime AS PROCTIME ()
)with (connector kafka,topic case_kafka_mysql,properties.bootstrap.servers 127.0.0.1:9092,properties.group.id flink_gp_test3,scan.startup.mode earliest-offset,format json,json.fail-on-missing-field false,json.ignore-parse-errors true,properties.zookeeper.connect 127.0.0.1:2181/kafka);
Flink SQL创建MySQL维表
create table flink_test_3_dim (id BIGINT,coupon_price_amt BIGINT
)
WITH (connector jdbc,url jdbc:mysql://127.0.0.1:3306/db01?characterEncodingUTF-8,table-name product_dim,username root,password Admin,lookup.max-retries 3,lookup.cache.max-rows 1000);
WITH参数 参数 说明 类型 备注 lookup.cache.max-rows 指定缓存的最大行数。如果超过该值则最老的行记录将会过期会被新的记录替换掉。 Integer 默认情况下维表Cache是未开启的。 lookup.cache.ttl 指定缓存中每行记录的最大存活时间。如果某行记录超过该时间则该行记录将会过期。 Duration 默认情况下维表Cache是未开启的。你可以设置lookup.cache.max-rows和 lookup.cache.ttl参数来启用维表Cache。启用缓存时采用的是LRU策略缓存。 lookup.cache.caching-missing-key 是否缓存空的查询结果。 Boolean 参数取值如下 true默认值缓存空的查询结果。 false不缓存空的查询结果。 lookup.max-retries 查询数据库失败的最大重试次数。 Integer 默认值为3。 Flink SQL创建MySQL结果表
CREATE TABLE sync_test_3 (ts string,total_gmv bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH (connector jdbc,url jdbc:mysql://127.0.0.1:3306/db01?characterEncodingUTF-8,table-name sync_test_3,username root,password Admin);
编写计算任务
INSERT INTO sync_test_3
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_3 as aLEFT JOIN flink_test_3_dim FOR SYSTEM_TIME AS OF a.proctime as bON b.id a.id)
GROUP BY ts;
核验数据
SELECT id, ts, total_gmv FROM sync_test_3;