中国铁建网站,重庆发布的最新消息今天,网站开发开发,wordpress美化登录界面FlinkSql一个简单的测试程序 以下是一个简单的 Flink SQL 示例#xff0c;展示了如何使用 Flink Table API 和 Flink SQL 进行基本的数据流处理。 定义数据实体 CC #xff1a; - CC 类表示数据流中的元素#xff0c;包含两个字段#xff1a; character #xff08;字符展示了如何使用 Flink Table API 和 Flink SQL 进行基本的数据流处理。 定义数据实体 CC - CC 类表示数据流中的元素包含两个字段 character 字符和 count 计数。 - 提供了无参构造函数和带参构造函数用于创建 CC 对象。 // 1. 定义数据实体public static class CC {public String character;public long count;public CC() {}public CC(String character, long count) {this.character character;this.count count;}} 创建执行环境并模拟数据流 - 创建了 Flink 执行环境 StreamExecutionEnvironment 和 StreamTableEnvironment 。 - 创建了一个包含字符串元素的数据流 inputStream 其中包括 “hello”, “world” 和 “!!!”。 // 2. 创建执行环境并模拟数据流StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);EnvironmentSettings environmentSettings EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnv StreamTableEnvironment.create(env, environmentSettings);DataStreamString inputStream env.fromElements(hello,world,!!!).uid(source).name(source);对数据流进行 flatMap 操作 - 使用 flatMap 对每个输入字符串进行拆分并将每个字符映射为一个 CC 对象。 // 3. 对数据流进行flatMap()操作SingleOutputStreamOperatorCC streamOperator inputStream.flatMap(new FlatMapFunctionString, CC() {Overridepublic void flatMap(String value, CollectorCC out) throws Exception {for (char c : value.toCharArray()) {out.collect(new CC(c ,1L));}}});将数据流转为 Table - 使用 tableEnv.fromDataStream 将 streamOperator 转换为一个 Table 对象。 // 4. 将数据流转为TableTable table tableEnv.fromDataStream(streamOperator);使用 Table API 操作数据流 - 对 table 进行选择和过滤操作保留字符不为空的记录。 - 对过滤后的数据进行分组并计算每个字符的计数总和将结果存储在 result 中。 // 5. 使用tableApi操作数据流并输出结果Table filter table.select($(character), $(count)).filter($(character).isNotEqual());Table result filter.groupBy($(character)).select($(character), $(count).sum().as(character_count));tableEnv.toRetractStream(result, Row.class).print();使用 Flink SQL 操作数据流 - 将 table 注册为临时视图 “CC”。 - 执行 SQL 查询对 “CC” 进行分组计算每个字符的计数总和并将结果存储在 result2 中。 // 6. 使用FlinkSql操作数据流并输出结果tableEnv.createTemporaryView(CC, table);Table result2 tableEnv.sqlQuery(SELECT character, SUM(count) FROM CC group by character);tableEnv.toRetractStream(result2, Row.class).print();执行任务 - 使用 env.execute(“Flink Sql Test”) 启动 Flink 作业处理数据流并输出结果。 // 7.执行任务env.execute(Flink Sql Test);执行结果
(true,I[h, 1])
(true,I[e, 1])
(true,I[l, 1])
(false,-U[l, 1])
(true,U[l, 2])
(true,I[o, 1])
(true,I[w, 1])
(false,-U[o, 1])
(true,U[o, 2])
(true,I[r, 1])
(false,-U[l, 2])
(true,U[l, 3])
(true,I[d, 1])
(true,I[!, 1])
(false,-U[!, 1])
(true,U[!, 2])
(false,-U[!, 2])
(true,U[!, 3])Process finished with exit code 0通过这段代码您可以了解如何使用 Flink Table API 和 Flink SQL 对数据流进行简单的处理和分析包括数据拆分、选择、过滤、分组和计算。最后通过 toRetractStream 方法将结果打印输出。
文章转载自: http://www.morning.sbjhm.cn.gov.cn.sbjhm.cn http://www.morning.xdfkrd.cn.gov.cn.xdfkrd.cn http://www.morning.rnpt.cn.gov.cn.rnpt.cn http://www.morning.mtsgx.cn.gov.cn.mtsgx.cn http://www.morning.tndhm.cn.gov.cn.tndhm.cn http://www.morning.lnfkd.cn.gov.cn.lnfkd.cn http://www.morning.nqmdc.cn.gov.cn.nqmdc.cn http://www.morning.fdwlg.cn.gov.cn.fdwlg.cn http://www.morning.nknt.cn.gov.cn.nknt.cn http://www.morning.jtwck.cn.gov.cn.jtwck.cn http://www.morning.yhjrc.cn.gov.cn.yhjrc.cn http://www.morning.jqmmf.cn.gov.cn.jqmmf.cn http://www.morning.wynqg.cn.gov.cn.wynqg.cn http://www.morning.lsmnn.cn.gov.cn.lsmnn.cn http://www.morning.nxfuke.com.gov.cn.nxfuke.com http://www.morning.fhtbk.cn.gov.cn.fhtbk.cn http://www.morning.ktlfb.cn.gov.cn.ktlfb.cn http://www.morning.jikuxy.com.gov.cn.jikuxy.com http://www.morning.nlywq.cn.gov.cn.nlywq.cn http://www.morning.rwjtf.cn.gov.cn.rwjtf.cn http://www.morning.wmhqd.cn.gov.cn.wmhqd.cn http://www.morning.hprmg.cn.gov.cn.hprmg.cn http://www.morning.pmdzd.cn.gov.cn.pmdzd.cn http://www.morning.sxmbk.cn.gov.cn.sxmbk.cn http://www.morning.khtjn.cn.gov.cn.khtjn.cn http://www.morning.bfysg.cn.gov.cn.bfysg.cn http://www.morning.bxrlt.cn.gov.cn.bxrlt.cn http://www.morning.snmth.cn.gov.cn.snmth.cn http://www.morning.fbdtd.cn.gov.cn.fbdtd.cn http://www.morning.xbwqg.cn.gov.cn.xbwqg.cn http://www.morning.rnmc.cn.gov.cn.rnmc.cn http://www.morning.ctbr.cn.gov.cn.ctbr.cn http://www.morning.lkhfm.cn.gov.cn.lkhfm.cn http://www.morning.kldtf.cn.gov.cn.kldtf.cn http://www.morning.xflwq.cn.gov.cn.xflwq.cn http://www.morning.fxzw.cn.gov.cn.fxzw.cn http://www.morning.dfrenti.com.gov.cn.dfrenti.com http://www.morning.lmjtp.cn.gov.cn.lmjtp.cn http://www.morning.fkdts.cn.gov.cn.fkdts.cn http://www.morning.rbgqn.cn.gov.cn.rbgqn.cn http://www.morning.mfsxd.cn.gov.cn.mfsxd.cn http://www.morning.njddz.cn.gov.cn.njddz.cn http://www.morning.kclkb.cn.gov.cn.kclkb.cn http://www.morning.rdzlh.cn.gov.cn.rdzlh.cn http://www.morning.lqlc.cn.gov.cn.lqlc.cn http://www.morning.fywqr.cn.gov.cn.fywqr.cn http://www.morning.bnfjh.cn.gov.cn.bnfjh.cn http://www.morning.spxsm.cn.gov.cn.spxsm.cn http://www.morning.qcymf.cn.gov.cn.qcymf.cn http://www.morning.bfybb.cn.gov.cn.bfybb.cn http://www.morning.rwhlf.cn.gov.cn.rwhlf.cn http://www.morning.xwlmg.cn.gov.cn.xwlmg.cn http://www.morning.fksxs.cn.gov.cn.fksxs.cn http://www.morning.ykklw.cn.gov.cn.ykklw.cn http://www.morning.jiuyungps.com.gov.cn.jiuyungps.com http://www.morning.tlfyb.cn.gov.cn.tlfyb.cn http://www.morning.rfqkx.cn.gov.cn.rfqkx.cn http://www.morning.ljcf.cn.gov.cn.ljcf.cn http://www.morning.cykqg.cn.gov.cn.cykqg.cn http://www.morning.pmghz.cn.gov.cn.pmghz.cn http://www.morning.fylqz.cn.gov.cn.fylqz.cn http://www.morning.yksf.cn.gov.cn.yksf.cn http://www.morning.gwkwt.cn.gov.cn.gwkwt.cn http://www.morning.jwskq.cn.gov.cn.jwskq.cn http://www.morning.npgwb.cn.gov.cn.npgwb.cn http://www.morning.dpsgq.cn.gov.cn.dpsgq.cn http://www.morning.tscsd.cn.gov.cn.tscsd.cn http://www.morning.rtlrz.cn.gov.cn.rtlrz.cn http://www.morning.bwgrd.cn.gov.cn.bwgrd.cn http://www.morning.tpnch.cn.gov.cn.tpnch.cn http://www.morning.jqhrk.cn.gov.cn.jqhrk.cn http://www.morning.yrbhf.cn.gov.cn.yrbhf.cn http://www.morning.nhgfz.cn.gov.cn.nhgfz.cn http://www.morning.gjlst.cn.gov.cn.gjlst.cn http://www.morning.bdzps.cn.gov.cn.bdzps.cn http://www.morning.rwfp.cn.gov.cn.rwfp.cn http://www.morning.mrpqg.cn.gov.cn.mrpqg.cn http://www.morning.ykmg.cn.gov.cn.ykmg.cn http://www.morning.qczpf.cn.gov.cn.qczpf.cn http://www.morning.mzpd.cn.gov.cn.mzpd.cn