Series Table of Contents

Logistics Real-Time Data Warehouse: Building the Collection Channel
Logistics Real-Time Data Warehouse: Building the Warehouse

Table of Contents

Series Table of Contents
Preface
I. Preparing the IDEA Environment
    1. pom.xml
    2. Directory creation
II. Writing the Code
    1. log4j.properties
    2. CreateEnvUtil.java
    3. KafkaUtil.java
    4. OdsApp.java
III. Testing the Code
Summary

Preface
Now we begin building the data warehouse. We will use Kafka in place of the warehouse's ODS layer. The basic flow: use Flink to read data from MySQL, then write it into Kafka.

I. Preparing the IDEA Environment
1. pom.xml
Add the configuration the project needs:
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <flink.version>1.17.0</flink.version>
    <hadoop.version>3.2.3</hadoop.version>
    <flink-cdc.version>2.3.0</flink-cdc.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-reload4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>${flink-cdc.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

This covers essentially every jar the project needs; if anything is missing, we can add it later.
2. Directory creation
Create the directories following this structure (the screenshot from the original post is not reproduced here; the layout is inferred from the code in the next section):

src/main/java/com/atguigu/tms/realtime/app/ods/OdsApp.java
src/main/java/com/atguigu/tms/realtime/utils/CreateEnvUtil.java
src/main/java/com/atguigu/tms/realtime/utils/KafkaUtil.java
src/main/resources/log4j.properties
II. Writing the Code
1. log4j.properties
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

2. CreateEnvUtil.java
This class contains two methods: one creates and initializes the Flink execution environment (env), the other builds the MySqlSource Flink CDC uses to connect to MySQL.
package com.atguigu.tms.realtime.utils;

import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverterConfig;

import java.util.HashMap;

public class CreateEnvUtil {
    public static StreamExecutionEnvironment getStreamEnv(String[] args) {
        // 1. Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Checkpoint settings
        // 2.1 Enable checkpointing
        env.enableCheckpointing(6000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 Checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(120000L);
        // 2.3 Retain checkpoints after the job is cancelled
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 Minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000L);
        // 2.5 Restart strategy
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.seconds(3)));
        // 2.6 State backend and checkpoint storage
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/tms/ck");
        // 2.7 Set the user for HDFS access, read from the command-line arguments
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hdfsUserName = parameterTool.get("hadoop-user-name", "atguigu");
        System.setProperty("HADOOP_USER_NAME", hdfsUserName);

        return env;
    }

    public static MySqlSource<String> getMysqlSource(String option, String serverId, String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String mysqlHostname = parameterTool.get("mysql-hostname", "hadoop102");
        int mysqlPort = Integer.parseInt(parameterTool.get("mysql-port", "3306"));
        String mysqlUsername = parameterTool.get("mysql-username", "root");
        String mysqlPasswd = parameterTool.get("mysql-passwd", "000000");
        option = parameterTool.get("start-up-option", option);
        serverId = parameterTool.get("server-id", serverId);

        // Build a config map so Decimal columns are parsed as numeric values
        HashMap<String, Object> config = new HashMap<>();
        config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
        // Pass the config to the JSON deserialization schema used to initialize the MySqlSource
        JsonDebeziumDeserializationSchema jsonDebeziumDeserializationSchema =
                new JsonDebeziumDeserializationSchema(false, config);

        MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
                .hostname(mysqlHostname)
                .port(mysqlPort)
                .username(mysqlUsername)
                .password(mysqlPasswd)
                .deserializer(jsonDebeziumDeserializationSchema);

        switch (option) {
            // Fact tables: read only new changes
            case "dwd":
                String[] dwdTables = new String[]{
                        "tms.order_info",
                        "tms.order_cargo",
                        "tms.transport_task",
                        "tms.order_org_bound"};
                return builder
                        .databaseList("tms")
                        .tableList(dwdTables)
                        .startupOptions(StartupOptions.latest())
                        .serverId(serverId)
                        .build();
            // Dimension tables: take a full snapshot first, then read changes
            case "realtime_dim":
                String[] realtimeDimTables = new String[]{
                        "tms.user_info",
                        "tms.user_address",
                        "tms.base_complex",
                        "tms.base_dic",
                        "tms.base_region_info",
                        "tms.base_organ",
                        "tms.express_courier",
                        "tms.express_courier_complex",
                        "tms.employee_info",
                        "tms.line_base_shift",
                        "tms.line_base_info",
                        "tms.truck_driver",
                        "tms.truck_info",
                        "tms.truck_model",
                        "tms.truck_team"};
                return builder
                        .databaseList("tms")
                        .tableList(realtimeDimTables)
                        .startupOptions(StartupOptions.initial())
                        .serverId(serverId)
                        .build();
        }
        Log.error("Unsupported operation type");
        return null;
    }
}
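Both methods resolve their settings through Flink's ParameterTool, so every default above can be overridden at submit time without recompiling. A minimal, self-contained sketch of that resolution logic (the class name and argument values here are hypothetical, for illustration only):

package com.atguigu.tms.realtime.utils;

import org.apache.flink.api.java.utils.ParameterTool;

public class ParameterToolDemo {
    public static void main(String[] args) {
        // Simulate submitting the job with: --mysql-hostname hadoop103 --mysql-port 3307
        String[] demoArgs = {"--mysql-hostname", "hadoop103", "--mysql-port", "3307"};
        ParameterTool parameterTool = ParameterTool.fromArgs(demoArgs);

        // get(key, default): the command-line value wins when the key is present...
        String hostname = parameterTool.get("mysql-hostname", "hadoop102");   // -> "hadoop103"
        int port = Integer.parseInt(parameterTool.get("mysql-port", "3306")); // -> 3307
        // ...and the default is returned when the key is absent.
        String username = parameterTool.get("mysql-username", "root");        // -> "root"

        System.out.println(hostname + ":" + port + " as " + username);
    }
}

The same mechanism lets start-up-option and server-id be switched per deployment.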
3. KafkaUtil.java
This class contains one method (plus an overload) that creates the Sink Flink needs to write to Kafka.
package com.atguigu.tms.realtime.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerConfig;

public class KafkaUtil {
    private static final String KAFKA_SERVER = "hadoop102:9092,hadoop103:9092,hadoop104:9092";

    public static KafkaSink<String> getKafkaSink(String topic, String transIdPrefix, String[] args) {
        // Wrap the command-line arguments in a ParameterTool
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Read the "topic" argument, using the method parameter as the default;
        // when the command line does not specify a topic, the default is used
        topic = parameterTool.get("topic", topic);
        // If no topic was passed on the command line and the default is null, fail fast
        if (topic == null) {
            throw new IllegalArgumentException("Topic name must not be null: no command-line argument and no default value!");
        }
        // Read the "bootstrap-servers" argument, with the cluster list as the default
        String bootstrapServers = parameterTool.get("bootstrap-servers", KAFKA_SERVER);
        // Read the "transaction-timeout" argument, defaulting to 15 minutes
        String transactionTimeout = parameterTool.get("transaction-timeout", 15 * 60 * 1000 + "");

        return KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setTransactionalIdPrefix(transIdPrefix)
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout)
                .build();
    }

    public static KafkaSink<String> getKafkaSink(String topic, String[] args) {
        return getKafkaSink(topic, topic + "_trans", args);
    }
}
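As a usage sketch (the class name, topic name, and records below are hypothetical, not part of the project), wiring the two-argument overload into a job looks like this. Note that with AT_LEAST_ONCE delivery, downstream consumers may see duplicate records after a failure recovery:

package com.atguigu.tms.realtime.utils;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hypothetical demo records; the overload derives the transactional-id
        // prefix from the topic name ("demo_topic_trans").
        env.fromElements("record-1", "record-2")
                .sinkTo(KafkaUtil.getKafkaSink("demo_topic", args));
        env.execute("kafka-sink-demo");
    }
}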
4. OdsApp.java
Create the ODS-layer application, which is responsible for reading the data and writing it out.
package com.atguigu.tms.realtime.app.ods;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class OdsApp {
    public static void main(String[] args) throws Exception {
        // 1. Get the stream environment (checkpointing is configured inside)
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // 2. Use Flink CDC to read fact data from MySQL
        String dwdOption = "dwd";
        String dwdServerId = "6030";
        String dwdSourceName = "ods_app_dwd_source";
        mysqlToKafka(dwdOption, dwdServerId, dwdSourceName, env, args);

        // 3. Use Flink CDC to read dimension data from MySQL
        String realtimeDimOption = "realtime_dim";
        String realtimeDimServerId = "6040";
        String realtimeDimSourceName = "ods_app_realtimeDim_source";
        mysqlToKafka(realtimeDimOption, realtimeDimServerId, realtimeDimSourceName, env, args);

        env.execute();
    }

    public static void mysqlToKafka(String option, String serverId, String sourceName,
                                    StreamExecutionEnvironment env, String[] args) {
        // 1. Build the MySqlSource and read from it
        MySqlSource<String> mySqlSource = CreateEnvUtil.getMysqlSource(option, serverId, args);
        SingleOutputStreamOperator<String> dwdStrDS = env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), sourceName)
                .setParallelism(1)
                .uid(option + sourceName);

        // 2. Simple ETL: keep only well-formed, non-delete change records
        SingleOutputStreamOperator<String> processDS = dwdStrDS.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String jsonStr, ProcessFunction<String, String>.Context ctx, Collector<String> out) {
                try {
                    JSONObject jsonObj = JSONObject.parseObject(jsonStr);
                    if (jsonObj.getJSONObject("after") != null && !"d".equals(jsonObj.getString("op"))) {
                        // System.out.println(jsonObj);
                        // Rename ts_ms to ts
                        Long tsMs = jsonObj.getLong("ts_ms");
                        jsonObj.put("ts", tsMs);
                        jsonObj.remove("ts_ms");
                        String jsonString = jsonObj.toJSONString();
                        out.collect(jsonString);
                    }
                } catch (Exception e) {
                    Log.error("Record from Flink CDC is not valid JSON", e);
                }
            }
        }).setParallelism(1);

        // 3. Key by primary key to avoid out-of-order records for the same row
        KeyedStream<String, String> keyedDS = processDS.keyBy((KeySelector<String, String>) jsonStr -> {
            JSONObject jsonObj = JSON.parseObject(jsonStr);
            return jsonObj.getJSONObject("after").getString("id");
        });

        // 4. Write the data to Kafka
        keyedDS.sinkTo(KafkaUtil.getKafkaSink("tms_ods", sourceName + "_transPre", args))
                .uid(option + "_ods_app_sink");
    }
}
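For reference, this is roughly what a record looks like before and after the ETL step in mysqlToKafka. The example is hypothetical: the field values are made up and the source block is abridged. Records with op "d" or a null after block are dropped; otherwise only ts_ms is renamed to ts:

Input from Flink CDC (hypothetical insert into tms.order_info):
{"before":null,"after":{"id":"1001","status":"60010"},"source":{"db":"tms","table":"order_info"},"op":"c","ts_ms":1672988400000}

Output written to the tms_ods topic:
{"before":null,"after":{"id":"1001","status":"60010"},"source":{"db":"tms","table":"order_info"},"op":"c","ts":1672988400000}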
III. Testing the Code
Start the required components on the virtual machines; for now we need Hadoop, ZooKeeper, Kafka, and MySQL. First, start a console consumer:
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_ods

Then run OdsApp.java. It reads the dimension data first, because the dimension tables need a full snapshot of their historical data. Once that consumption finishes, run the mock-data jar to generate fact data:
java -jar tms-mock-2023-01-06.jar

If the consumer receives the new data, the channel works and the ODS layer is complete.

Summary
With that, the ODS build is complete.