当前位置: 首页 > news >正文

微信公众号平台及网站建设计划软文通

微信公众号平台及网站建设计划,软文通,fullpage做的网站,代理加盟做什么好系列文章目录 物流实时数仓#xff1a;采集通道搭建 物流实时数仓#xff1a;数仓搭建 文章目录 系列文章目录前言一、IDEA环境准备1.pom.xml2.目录创建 二、代码编写1.log4j.properties2.CreateEnvUtil.java3.KafkaUtil.java4.OdsApp.java 三、代码测试总结 前言 现在我们…系列文章目录 物流实时数仓采集通道搭建 物流实时数仓数仓搭建 文章目录 系列文章目录前言一、IDEA环境准备1.pom.xml2.目录创建 二、代码编写1.log4j.properties2.CreateEnvUtil.java3.KafkaUtil.java4.OdsApp.java 三、代码测试总结 前言 现在我们开始进行数仓的搭建我们用Kafka来代替数仓的ods层。 基本流程为使用Flink从MySQL读取数据然后写入Kafka中 一、IDEA环境准备 1.pom.xml 写入项目需要的配置 propertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncodingjava.version1.8/java.versionflink.version1.17.0/flink.versionhadoop.version3.2.3/hadoop.versionflink-cdc.version2.3.0/flink-cdc.version/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.68/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion${hadoop.version}/versionexclusionsexclusiongroupIdorg.slf4j/groupIdartifactIdslf4j-reload4j/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion1.7.25/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion1.7.25/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-to-slf4j/artifactIdversion2.14.0/version/dependencydependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion${flink-cdc.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-runtime/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-loader/artifactIdversion${flink.version}/version/dependency/dependencies基本上项目需要的所有jar包都有了不够以后在加。 2.目录创建 按照以上目录结构进行目录创建 二、代码编写 1.log4j.properties log4j.rootLoggererror,stdout log4j.appender.stdoutorg.apache.log4j.ConsoleAppender log4j.appender.stdout.targetSystem.out log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern%d %p [%c] - %m%n2.CreateEnvUtil.java 这个文件中有两个方法 创建初始化Flink的env Flink连接mysql的MySqlSource package com.atguigu.tms.realtime.utils;import com.esotericsoftware.minlog.Log; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.kafka.connect.json.DecimalFormat; import org.apache.kafka.connect.json.JsonConverterConfig;import java.util.HashMap;public class CreateEnvUtil {public static StreamExecutionEnvironment getStreamEnv(String[] args) {// 1.1 指定流处理环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.检查点相关设置// 2.1 开启检查点env.enableCheckpointing(6000L, CheckpointingMode.EXACTLY_ONCE);// 2.2 设置检查点的超时时间env.getCheckpointConfig().setCheckpointTimeout(120000L);// 2.3 设置job取消之后 检查点是否保留env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2.4 设置两个检查点之间的最小时间间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000L);// 2.5 设置重启策略env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.seconds(3)));// 2.6 设置状态后端env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage(hdfs://hadoop102:8020/tms/ck);// 2.7 设置操作hdfs用户// 获取命令行参数ParameterTool parameterTool ParameterTool.fromArgs(args);String hdfsUserName parameterTool.get(hadoop-user-name, atguigu);System.setProperty(HADOOP_USER_NAME, hdfsUserName);return env;}public static MySqlSourceString getMysqlSource(String option, String serverId, String[] args) {ParameterTool parameterTool ParameterTool.fromArgs(args);String mysqlHostname parameterTool.get(hadoop-user-name, hadoop102);int mysqlPort Integer.parseInt(parameterTool.get(mysql-port, 3306));String mysqlUsername parameterTool.get(mysql-username, root);String mysqlPasswd parameterTool.get(mysql-passwd, 000000);option parameterTool.get(start-up-option, option);serverId parameterTool.get(server-id, serverId);// 创建配置信息 Map 集合将 Decimal 数据类型的解析格式配置 k-v 置于其中HashMap config new HashMap();config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());// 将前述 Map 集合中的配置信息传递给 JSON 解析 Schema该 Schema 将用于 MysqlSource 的初始化JsonDebeziumDeserializationSchema jsonDebeziumDeserializationSchema new JsonDebeziumDeserializationSchema(false, config);MySqlSourceBuilderString builder MySqlSource.Stringbuilder().hostname(mysqlHostname).port(mysqlPort).username(mysqlUsername).password(mysqlPasswd).deserializer(jsonDebeziumDeserializationSchema);switch (option) {// 读取实时数据case dwd:String[] dwdTables new String[]{tms.order_info,tms.order_cargo,tms.transport_task,tms.order_org_bound};return builder.databaseList(tms).tableList(dwdTables).startupOptions(StartupOptions.latest()).serverId(serverId).build();// 读取维度数据case realtime_dim:String[] realtimeDimTables new String[]{tms.user_info,tms.user_address,tms.base_complex,tms.base_dic,tms.base_region_info,tms.base_organ,tms.express_courier,tms.express_courier_complex,tms.employee_info,tms.line_base_shift,tms.line_base_info,tms.truck_driver,tms.truck_info,tms.truck_model,tms.truck_team};return builder.databaseList(tms).tableList(realtimeDimTables).startupOptions(StartupOptions.initial()).serverId(serverId).build();}Log.error(不支持操作类型);return null;} } 3.KafkaUtil.java 该文件中有一个方法创建Flink连接Kafka需要的Sink package com.atguigu.tms.realtime.utils;import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.kafka.clients.producer.ProducerConfig;public class KafkaUtil {private static final String KAFKA_SERVER hadoop102:9092,hadoop103:9092,hadoop104:9092;public static KafkaSinkString getKafkaSink(String topic, String transIdPrefix, String[] args) {// 将命令行参数对象封装为 ParameterTool 类对象ParameterTool parameterTool ParameterTool.fromArgs(args);// 提取命令行传入的 key 为 topic 的配置信息并将默认值指定为方法参数 topic// 当命令行没有指定 topic 时会采用默认值topic parameterTool.get(topic, topic);// 如果命令行没有指定主题名称且默认值为 null 则抛出异常if (topic null) {throw new IllegalArgumentException(主题名不可为空命令行传参为空且没有默认值!);}// 获取命令行传入的 key 为 bootstrap-servers 的配置信息并指定默认值String bootstrapServers parameterTool.get(bootstrap-severs, KAFKA_SERVER);// 获取命令行传入的 key 为 transaction-timeout 的配置信息并指定默认值String transactionTimeout parameterTool.get(transaction-timeout, 15 * 60 * 1000 );return KafkaSink.Stringbuilder().setBootstrapServers(bootstrapServers).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(topic).setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).setTransactionalIdPrefix(transIdPrefix).setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout).build();}public static KafkaSinkString getKafkaSink(String topic, String[] args) {return getKafkaSink(topic, topic _trans, args);} } 4.OdsApp.java Ods层的app创建负责读取和写入数据 package com.atguigu.tms.realtime.app.ods;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.tms.realtime.utils.CreateEnvUtil; import com.atguigu.tms.realtime.utils.KafkaUtil; import com.esotericsoftware.minlog.Log; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector;public class OdsApp {public static void main(String[] args) throws Exception {// 1.获取流处理环境并指定检查点StreamExecutionEnvironment env CreateEnvUtil.getStreamEnv(args);env.setParallelism(4);// 2 使用FlinkCDC从MySQL中读取数据-事实数据String dwdOption dwd;String dwdServerId 6030;String dwdsourceName ods_app_dwd_source;mysqlToKafka(dwdOption, dwdServerId, dwdsourceName, env, args);// 3 使用FlinkCDC从MySQL中读取数据-维度数据String realtimeDimOption realtime_dim;String realtimeDimServerId 6040;String realtimeDimsourceName ods_app_realtimeDim_source;mysqlToKafka(realtimeDimOption, realtimeDimServerId, realtimeDimsourceName, env, args);env.execute();}public static void mysqlToKafka(String option, String serverId, String sourceName, StreamExecutionEnvironment env, String[] args) {MySqlSourceString MySqlSource CreateEnvUtil.getMysqlSource(option, serverId, args);SingleOutputStreamOperatorString dwdStrDS env.fromSource(MySqlSource, WatermarkStrategy.noWatermarks(), sourceName).setParallelism(1).uid(option sourceName);// 3 简单ETLSingleOutputStreamOperatorString processDS dwdStrDS.process(new ProcessFunctionString, String() {Overridepublic void processElement(String jsonStr, ProcessFunctionString, String.Context ctx, CollectorString out) {try {JSONObject jsonObj JSONObject.parseObject(jsonStr);if (jsonObj.getJSONObject(after) ! null !d.equals(jsonObj.getString(op))) { // System.out.println(jsonObj);Long tsMs jsonObj.getLong(ts_ms);jsonObj.put(ts, tsMs);jsonObj.remove(ts_ms);String jsonString jsonObj.toJSONString();out.collect(jsonString);}} catch (Exception e) {Log.error(从Flink-CDC得到的数据不是一个标准的json格式,e);}}}).setParallelism(1);// 4 按照主键进行分组避免出现乱序KeyedStreamString, String keyedDS processDS.keyBy((KeySelectorString, String) jsonStr - {JSONObject jsonObj JSON.parseObject(jsonStr);return jsonObj.getJSONObject(after).getString(id);});//将数据写入KafkakeyedDS.sinkTo(KafkaUtil.getKafkaSink(tms_ods, sourceName _transPre, args)).uid(option _ods_app_sink);} }三、代码测试 在虚拟机启动我们需要的组件目前需要hadoop、zk、kafka和MySQL。 先开一个消费者进行消费。 bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_ods然后运行OdsApp.java 他会先读取维度数据因为维度数据需要全量更新之前的数据。 当他消费结束后我们运行jar包获取事实数据。 java -jar tms-mock-2023-01-06.jar 如果能消费到新数据代表通道没问题ODS层创建完成。 总结 至此ODS搭建完成。
文章转载自:
http://www.morning.rzdzb.cn.gov.cn.rzdzb.cn
http://www.morning.qxmys.cn.gov.cn.qxmys.cn
http://www.morning.jrksk.cn.gov.cn.jrksk.cn
http://www.morning.zsrdp.cn.gov.cn.zsrdp.cn
http://www.morning.ppbrq.cn.gov.cn.ppbrq.cn
http://www.morning.pwhjr.cn.gov.cn.pwhjr.cn
http://www.morning.lhjmq.cn.gov.cn.lhjmq.cn
http://www.morning.kxltf.cn.gov.cn.kxltf.cn
http://www.morning.hbkkc.cn.gov.cn.hbkkc.cn
http://www.morning.gjqnn.cn.gov.cn.gjqnn.cn
http://www.morning.hcwjls.com.gov.cn.hcwjls.com
http://www.morning.mfxcg.cn.gov.cn.mfxcg.cn
http://www.morning.mqnbm.cn.gov.cn.mqnbm.cn
http://www.morning.mgkcz.cn.gov.cn.mgkcz.cn
http://www.morning.vjwkb.cn.gov.cn.vjwkb.cn
http://www.morning.rlsd.cn.gov.cn.rlsd.cn
http://www.morning.kkzwn.cn.gov.cn.kkzwn.cn
http://www.morning.cfynn.cn.gov.cn.cfynn.cn
http://www.morning.rkfwr.cn.gov.cn.rkfwr.cn
http://www.morning.ftdlg.cn.gov.cn.ftdlg.cn
http://www.morning.banzou2034.cn.gov.cn.banzou2034.cn
http://www.morning.qbmpb.cn.gov.cn.qbmpb.cn
http://www.morning.pjqxk.cn.gov.cn.pjqxk.cn
http://www.morning.smpmn.cn.gov.cn.smpmn.cn
http://www.morning.ygrkg.cn.gov.cn.ygrkg.cn
http://www.morning.lbgfz.cn.gov.cn.lbgfz.cn
http://www.morning.hous-e.com.gov.cn.hous-e.com
http://www.morning.bxrqf.cn.gov.cn.bxrqf.cn
http://www.morning.c7627.cn.gov.cn.c7627.cn
http://www.morning.cykqg.cn.gov.cn.cykqg.cn
http://www.morning.zbpqq.cn.gov.cn.zbpqq.cn
http://www.morning.fmswb.cn.gov.cn.fmswb.cn
http://www.morning.wjfzp.cn.gov.cn.wjfzp.cn
http://www.morning.rmfw.cn.gov.cn.rmfw.cn
http://www.morning.byrlg.cn.gov.cn.byrlg.cn
http://www.morning.xwbwm.cn.gov.cn.xwbwm.cn
http://www.morning.qbwbs.cn.gov.cn.qbwbs.cn
http://www.morning.zhiheliuxue.com.gov.cn.zhiheliuxue.com
http://www.morning.ywndg.cn.gov.cn.ywndg.cn
http://www.morning.tsxg.cn.gov.cn.tsxg.cn
http://www.morning.mfcbk.cn.gov.cn.mfcbk.cn
http://www.morning.zbmcz.cn.gov.cn.zbmcz.cn
http://www.morning.lnrr.cn.gov.cn.lnrr.cn
http://www.morning.dmrjx.cn.gov.cn.dmrjx.cn
http://www.morning.ghccq.cn.gov.cn.ghccq.cn
http://www.morning.khxyx.cn.gov.cn.khxyx.cn
http://www.morning.dgsx.cn.gov.cn.dgsx.cn
http://www.morning.qsy37.cn.gov.cn.qsy37.cn
http://www.morning.jjzbx.cn.gov.cn.jjzbx.cn
http://www.morning.yzygj.cn.gov.cn.yzygj.cn
http://www.morning.skrh.cn.gov.cn.skrh.cn
http://www.morning.mwkwg.cn.gov.cn.mwkwg.cn
http://www.morning.nkjxn.cn.gov.cn.nkjxn.cn
http://www.morning.mrncd.cn.gov.cn.mrncd.cn
http://www.morning.ckhyj.cn.gov.cn.ckhyj.cn
http://www.morning.qfwfj.cn.gov.cn.qfwfj.cn
http://www.morning.ryqsq.cn.gov.cn.ryqsq.cn
http://www.morning.nkmw.cn.gov.cn.nkmw.cn
http://www.morning.lpcpb.cn.gov.cn.lpcpb.cn
http://www.morning.htqrh.cn.gov.cn.htqrh.cn
http://www.morning.zxqyd.cn.gov.cn.zxqyd.cn
http://www.morning.nfnxp.cn.gov.cn.nfnxp.cn
http://www.morning.sgqw.cn.gov.cn.sgqw.cn
http://www.morning.c7624.cn.gov.cn.c7624.cn
http://www.morning.hrhwn.cn.gov.cn.hrhwn.cn
http://www.morning.sgbsr.cn.gov.cn.sgbsr.cn
http://www.morning.xmyrn.cn.gov.cn.xmyrn.cn
http://www.morning.xgzwj.cn.gov.cn.xgzwj.cn
http://www.morning.ktmnq.cn.gov.cn.ktmnq.cn
http://www.morning.wfysn.cn.gov.cn.wfysn.cn
http://www.morning.yprnp.cn.gov.cn.yprnp.cn
http://www.morning.cpzkq.cn.gov.cn.cpzkq.cn
http://www.morning.jygsq.cn.gov.cn.jygsq.cn
http://www.morning.yldgw.cn.gov.cn.yldgw.cn
http://www.morning.fnfhs.cn.gov.cn.fnfhs.cn
http://www.morning.w58hje.cn.gov.cn.w58hje.cn
http://www.morning.dtnjr.cn.gov.cn.dtnjr.cn
http://www.morning.gwsll.cn.gov.cn.gwsll.cn
http://www.morning.jhrtq.cn.gov.cn.jhrtq.cn
http://www.morning.mmqhq.cn.gov.cn.mmqhq.cn
http://www.tj-hxxt.cn/news/270607.html

相关文章:

  • 手机如何建立自己网站免费建立网站步骤
  • 菜谱网站开发系统西昌手机网
  • o2o商城网站系统开发廉政网站建设
  • 萍乡市建设局网站王丽物流网站开发系统论文
  • 网站设计包括哪些步骤上海全屋整装哪家好
  • 阜阳建设部网站一个logo设计要多少钱
  • 可以做数据图的的网站微信商城怎么找
  • 建设外国商城网站重庆安全员c证在哪里报名
  • 北京南站到北京站坐地铁几号线运维系统
  • 中国建设信用卡网站首页t型布局网站实例
  • 阿里巴巴可以做公司网站吗佛山网上推广
  • 无锡网站程序网站网页制作图片素材
  • 南充做网站的公司备案号被取消 没有重新备案网站会被关闭吗
  • 互联网产品运营推广方案湛江关键词优化平台
  • 电子商务网站建设的要求网站怎么做网盘
  • 免费的网站有哪些平台傻瓜化免费自助建站
  • 成成品网站源码有限公司南宁西乡塘区网站建设
  • 网站备案期间 权重seo专员工资一般多少
  • 那家网站做照片书好招商网站平台
  • 网站开发建设账务处理程序百度免费网站怎样建设
  • 网站首页被降权建设厅网站总经济师是干什么的
  • 室内设计网站会员哪个值得买北京又不让出京了
  • 深圳做网站找谁吴忠网站建设公司
  • 网站备案是指什么北京商场招商信息
  • wordpress关闭多站点家装设计师要学什么
  • 网站建设在哪个软件下做中山网站快照优化公司
  • 制作好的网页怎么变成网站wordpress怎么做积分
  • 海淀做网站哪家公司好门户网站 建设 通知
  • 网站开发研究热点手机移动开发网站
  • 青海餐饮网站建设杭州百度快照推广