
Multi-Stream Joining on a Data Lake with Hudi: Code Demo

Contents

1. Background
2. Code Demo
   (1) The multi-writer problem
   (2) What if two streams need to write the same table?
   (3) Test results
3. Postscript

1. Background

The goal is to join two real-time streams on top of a data lake (for example, front-end tracking events + server-side tracking events, or a log stream + an order stream). For the underlying concepts, see the previous article: 基于数据湖的多流拼接方案-HUDI概念篇_Leonardo_KY的博客-CSDN博客.

2. Code Demo

All demos below use the datagen connector to generate mock data for testing; in production, simply switch the source to Kafka or another connector.
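For illustration, a Kafka-backed version of sourceA might look like the sketch below. The topic, broker address, and consumer group are placeholders, not values from the original setup (note the plain kafka connector does not take a PRIMARY KEY clause):

```sql
-- Hypothetical Kafka replacement for the datagen source (names are assumptions).
CREATE TABLE sourceA (
  uuid BIGINT,
  name VARCHAR(3),
  _ts1 TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'stream_a',                            -- assumed topic name
  'properties.bootstrap.servers' = 'broker:9092',  -- assumed broker address
  'properties.group.id' = 'hudi-join-demo',        -- assumed consumer group
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
```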
The first job writes stream A into the Hudi table:

```java
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(minPauseBetweenCP);      // 1 s
env.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout);               // 2 min
env.getCheckpointConfig().setMaxConcurrentCheckpoints(maxConcurrentCheckpoints);
// env.getCheckpointConfig().setCheckpointStorage("file:///D:/Users/yakang.lu/tmp/checkpoints/");
TableEnvironment tableEnv = StreamTableEnvironment.create(env);

// datagen source
tableEnv.executeSql("CREATE TABLE sourceA (\n"
        + " uuid bigint PRIMARY KEY NOT ENFORCED,\n"
        + " name VARCHAR(3),\n"
        + " _ts1 TIMESTAMP(3)\n"
        + ") WITH (\n"
        + " 'connector' = 'datagen',\n"
        + " 'fields.uuid.kind' = 'sequence',\n"
        + " 'fields.uuid.start' = '0',\n"
        + " 'fields.uuid.end' = '1000000',\n"
        + " 'rows-per-second' = '1'\n"
        + ")");

// hudi sink
tableEnv.executeSql("create table hudi_tableA(\n"
        + " uuid bigint PRIMARY KEY NOT ENFORCED,\n"
        + " age int,\n"
        + " name VARCHAR(3),\n"
        + " _ts1 TIMESTAMP(3),\n"
        + " _ts2 TIMESTAMP(3),\n"
        + " d VARCHAR(10)\n"
        + ")\n"
        + "PARTITIONED BY (d)\n"
        + "with (\n"
        + " 'connector' = 'hudi',\n"
        + " 'path' = 'hdfs://ns/user/hive/warehouse/ctripdi_prodb.db/hudi_mor_mutil_source_test',\n" // hdfs path
        + " 'table.type' = 'MERGE_ON_READ',\n"
        + " 'write.bucket_assign.tasks' = '10',\n"
        + " 'write.tasks' = '10',\n"
        + " 'write.partition.format' = 'yyyyMMddHH',\n"
        + " 'write.partition.timestamp.type' = 'EPOCHMILLISECONDS',\n"
        + " 'hoodie.bucket.index.num.buckets' = '2',\n"
        + " 'changelog.enabled' = 'true',\n"
        + " 'index.type' = 'BUCKET',\n"
        + " 'hoodie.bucket.index.num.buckets' = '2',\n"
        + String.format(" '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), "_ts1")
        + " 'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "',\n"
        + " 'hoodie.write.log.suffix' = 'job1',\n"
        + " 'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',\n"
        + " 'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',\n"
        + " 'hoodie.cleaner.policy.failed.writes' = 'LAZY',\n"
        + " 'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',\n"
        + " 'hoodie.consistency.check.enabled' = 'false',\n"
        // + " 'hoodie.write.lock.early.conflict.detection.enable' = 'true',\n"  // todo
        // + " 'hoodie.write.lock.early.conflict.detection.strategy' = '"
        //         + SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName() + "',\n"
        // + " 'hoodie.keep.min.commits' = '1440',\n"
        + " 'hoodie.keep.max.commits' = '2880',\n"
        + " 'compaction.schedule.enabled' = 'false',\n"
        + " 'compaction.async.enabled' = 'false',\n"
        + " 'compaction.trigger.strategy' = 'num_or_time',\n"
        + " 'compaction.delta_commits' = '3',\n"
        + " 'compaction.delta_seconds' = '60',\n"
        + " 'compaction.max_memory' = '3096',\n"
        + " 'clean.async.enabled' = 'false',\n"
        + " 'hive_sync.enable' = 'false'\n"
        // + " 'hive_sync.mode' = 'hms',\n"
        // + " 'hive_sync.db' = '%s',\n"
        // + " 'hive_sync.table' = '%s',\n"
        // + " 'hive_sync.metastore.uris' = '%s'\n"
        + ")");

// insert
StatementSet statementSet = tableEnv.createStatementSet();
String sqlString = "insert into hudi_tableA(uuid, name, _ts1, d) select * from "
        + "(select *, date_format(CURRENT_TIMESTAMP, 'yyyyMMdd') AS d from sourceA) view1";
statementSet.addInsertSql(sqlString);
statementSet.execute();
```

The second job writes stream B into the same Hudi table (same path, different log suffix):

```java
StreamExecutionEnvironment env = manager.getEnv();
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(minPauseBetweenCP);      // 1 s
env.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout);               // 2 min
env.getCheckpointConfig().setMaxConcurrentCheckpoints(maxConcurrentCheckpoints);
// env.getCheckpointConfig().setCheckpointStorage("file:///D:/Users/yakang.lu/tmp/checkpoints/");
TableEnvironment tableEnv = StreamTableEnvironment.create(env);

// datagen source
tableEnv.executeSql("CREATE TABLE sourceB (\n"
        + " uuid bigint PRIMARY KEY NOT ENFORCED,\n"
        + " age int,\n"
        + " _ts2 TIMESTAMP(3)\n"
        + ") WITH (\n"
        + " 'connector' = 'datagen',\n"
        + " 'fields.uuid.kind' = 'sequence',\n"
        + " 'fields.uuid.start' = '0',\n"
        + " 'fields.uuid.end' = '1000000',\n"
        + " 'rows-per-second' = '1'\n"
        + ")");

// hudi sink
tableEnv.executeSql("create table hudi_tableB(\n"
        + " uuid bigint PRIMARY KEY NOT ENFORCED,\n"
        + " age int,\n"
        + " name VARCHAR(3),\n"
        + " _ts1 TIMESTAMP(3),\n"
        + " _ts2 TIMESTAMP(3),\n"
        + " d VARCHAR(10)\n"
        + ")\n"
        + "PARTITIONED BY (d)\n"
        + "with (\n"
        + " 'connector' = 'hudi',\n"
        + " 'path' = 'hdfs://ns/user/hive/warehouse/ctripdi_prodb.db/hudi_mor_mutil_source_test',\n" // hdfs path
        + " 'table.type' = 'MERGE_ON_READ',\n"
        + " 'write.bucket_assign.tasks' = '10',\n"
        + " 'write.tasks' = '10',\n"
        + " 'write.partition.format' = 'yyyyMMddHH',\n"
        + " 'hoodie.bucket.index.num.buckets' = '2',\n"
        + " 'changelog.enabled' = 'true',\n"
        + " 'index.type' = 'BUCKET',\n"
        + " 'hoodie.bucket.index.num.buckets' = '2',\n"
        + String.format(" '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), "_ts1")
        + " 'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "',\n"
        + " 'hoodie.write.log.suffix' = 'job2',\n"
        + " 'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',\n"
        + " 'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',\n"
        + " 'hoodie.cleaner.policy.failed.writes' = 'LAZY',\n"
        + " 'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',\n"
        + " 'hoodie.consistency.check.enabled' = 'false',\n"
        // + " 'hoodie.write.lock.early.conflict.detection.enable' = 'true',\n"  // todo
        // + " 'hoodie.write.lock.early.conflict.detection.strategy' = '"
        //         + SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName() + "',\n"
        + " 'hoodie.keep.min.commits' = '1440',\n"
        + " 'hoodie.keep.max.commits' = '2880',\n"
        + " 'compaction.schedule.enabled' = 'true',\n"
        + " 'compaction.async.enabled' = 'true',\n"
        + " 'compaction.trigger.strategy' = 'num_or_time',\n"
        + " 'compaction.delta_commits' = '2',\n"
        + " 'compaction.delta_seconds' = '90',\n"
        + " 'compaction.max_memory' = '3096',\n"
        + " 'clean.async.enabled' = 'false'\n"
        // + " 'hive_sync.mode' = 'hms',\n"
        // + " 'hive_sync.db' = '%s',\n"
        // + " 'hive_sync.table' = '%s',\n"
        // + " 'hive_sync.metastore.uris' = '%s'\n"
        + ")");

// insert
StatementSet statementSet = tableEnv.createStatementSet();
String sqlString = "insert into hudi_tableB(uuid, age, _ts1, _ts2, d) select * from "
        + "(select *, _ts2 as ts1, date_format(CURRENT_TIMESTAMP, 'yyyyMMdd') AS d from sourceB) view2";
// statementSet.addInsertSql("insert into hudi_tableB(uuid, age, _ts2) select * from sourceB");
statementSet.addInsertSql(sqlString);
statementSet.execute();
```

Alternatively, the two writers can be placed in a single application, using one StatementSet:
```java
import java.time.ZoneOffset;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.configuration.FlinkOptions;
// import org.apache.hudi.table.marker.SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy;

public class Test00 {

    public static void main(String[] args) {
        Configuration configuration = TableConfig.getDefault().getConfiguration();
        configuration.setString(TableConfigOptions.LOCAL_TIME_ZONE, ZoneOffset.ofHours(8).toString()); // UTC+8
        // configuration.setInteger("rest.port", 8086);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(configuration);
        env.setParallelism(1);
        env.enableCheckpointing(12000L);
        // env.getCheckpointConfig().setCheckpointStorage("file:///Users/laifei/tmp/checkpoints/");
        TableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // datagen sources
        tableEnv.executeSql("CREATE TABLE sourceA (\n"
                + " uuid bigint PRIMARY KEY NOT ENFORCED,\n"
                + " name VARCHAR(3),\n"
                + " _ts1 TIMESTAMP(3)\n"
                + ") WITH (\n"
                + " 'connector' = 'datagen',\n"
                + " 'fields.uuid.kind' = 'sequence',\n"
                + " 'fields.uuid.start' = '0',\n"
                + " 'fields.uuid.end' = '1000000',\n"
                + " 'rows-per-second' = '1'\n"
                + ")");
        tableEnv.executeSql("CREATE TABLE sourceB (\n"
                + " uuid bigint PRIMARY KEY NOT ENFORCED,\n"
                + " age int,\n"
                + " _ts2 TIMESTAMP(3)\n"
                + ") WITH (\n"
                + " 'connector' = 'datagen',\n"
                + " 'fields.uuid.kind' = 'sequence',\n"
                + " 'fields.uuid.start' = '0',\n"
                + " 'fields.uuid.end' = '1000000',\n"
                + " 'rows-per-second' = '1'\n"
                + ")");

        // hudi sinks
        tableEnv.executeSql("create table hudi_tableA(\n"
                + " uuid bigint PRIMARY KEY NOT ENFORCED,\n"
                + " name VARCHAR(3),\n"
                + " age int,\n"
                + " _ts1 TIMESTAMP(3),\n"
                + " _ts2 TIMESTAMP(3)\n"
                + ")\n"
                + "PARTITIONED BY (_ts1)\n"
                + "with (\n"
                + " 'connector' = 'hudi',\n"
                + " 'path' = 'file:///D:/Ctrip/dataWork/tmp',\n" // local path
                + " 'table.type' = 'MERGE_ON_READ',\n"
                + " 'write.bucket_assign.tasks' = '2',\n"
                + " 'write.tasks' = '2',\n"
                + " 'write.partition.format' = 'yyyyMMddHH',\n"
                + " 'write.partition.timestamp.type' = 'EPOCHMILLISECONDS',\n"
                + " 'hoodie.bucket.index.num.buckets' = '2',\n"
                + " 'changelog.enabled' = 'true',\n"
                + " 'index.type' = 'BUCKET',\n"
                + " 'hoodie.bucket.index.num.buckets' = '2',\n"
                // + String.format(" '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), "_ts1:name|_ts2:age")
                // + " 'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "',\n"
                + " 'hoodie.write.log.suffix' = 'job1',\n"
                + " 'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',\n"
                + " 'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',\n"
                + " 'hoodie.cleaner.policy.failed.writes' = 'LAZY',\n"
                + " 'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',\n"
                + " 'hoodie.consistency.check.enabled' = 'false',\n"
                + " 'hoodie.write.lock.early.conflict.detection.enable' = 'true',\n"
                // + " 'hoodie.write.lock.early.conflict.detection.strategy' = '"
                //         + SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName() + "',\n"
                // + " 'hoodie.keep.min.commits' = '1440',\n"
                + " 'hoodie.keep.max.commits' = '2880',\n"
                + " 'compaction.schedule.enabled' = 'false',\n"
                + " 'compaction.async.enabled' = 'false',\n"
                + " 'compaction.trigger.strategy' = 'num_or_time',\n"
                + " 'compaction.delta_commits' = '3',\n"
                + " 'compaction.delta_seconds' = '60',\n"
                + " 'compaction.max_memory' = '3096',\n"
                + " 'clean.async.enabled' = 'false',\n"
                + " 'hive_sync.enable' = 'false'\n"
                // + " 'hive_sync.mode' = 'hms',\n"
                // + " 'hive_sync.db' = '%s',\n"
                // + " 'hive_sync.table' = '%s',\n"
                // + " 'hive_sync.metastore.uris' = '%s'\n"
                + ")");
        tableEnv.executeSql("create table hudi_tableB(\n"
                + " uuid bigint PRIMARY KEY NOT ENFORCED,\n"
                + " name VARCHAR(3),\n"
                + " age int,\n"
                + " _ts1 TIMESTAMP(3),\n"
                + " _ts2 TIMESTAMP(3)\n"
                + ")\n"
                + "PARTITIONED BY (_ts2)\n"
                + "with (\n"
                + " 'connector' = 'hudi',\n"
                + " 'path' = '/Users/laifei/tmp/hudi/local.db/mutiwrite1',\n" // local path
                + " 'table.type' = 'MERGE_ON_READ',\n"
                + " 'write.bucket_assign.tasks' = '2',\n"
                + " 'write.tasks' = '2',\n"
                + " 'write.partition.format' = 'yyyyMMddHH',\n"
                + " 'hoodie.bucket.index.num.buckets' = '2',\n"
                + " 'changelog.enabled' = 'true',\n"
                + " 'index.type' = 'BUCKET',\n"
                + " 'hoodie.bucket.index.num.buckets' = '2',\n"
                // + String.format(" '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), "_ts1:name|_ts2:age")
                // + " 'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "',\n"
                + " 'hoodie.write.log.suffix' = 'job2',\n"
                + " 'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',\n"
                + " 'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',\n"
                + " 'hoodie.cleaner.policy.failed.writes' = 'LAZY',\n"
                + " 'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',\n"
                + " 'hoodie.consistency.check.enabled' = 'false',\n"
                + " 'hoodie.write.lock.early.conflict.detection.enable' = 'true',\n"
                // + " 'hoodie.write.lock.early.conflict.detection.strategy' = '"
                //         + SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName() + "',\n"
                + " 'hoodie.keep.min.commits' = '1440',\n"
                + " 'hoodie.keep.max.commits' = '2880',\n"
                + " 'compaction.schedule.enabled' = 'true',\n"
                + " 'compaction.async.enabled' = 'true',\n"
                + " 'compaction.trigger.strategy' = 'num_or_time',\n"
                + " 'compaction.delta_commits' = '2',\n"
                + " 'compaction.delta_seconds' = '90',\n"
                + " 'compaction.max_memory' = '3096',\n"
                + " 'clean.async.enabled' = 'false'\n"
                // + " 'hive_sync.mode' = 'hms',\n"
                // + " 'hive_sync.db' = '%s',\n"
                // + " 'hive_sync.table' = '%s',\n"
                // + " 'hive_sync.metastore.uris' = '%s'\n"
                + ")");

        // insert both streams from one StatementSet
        StatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsertSql("insert into hudi_tableA(uuid, name, _ts1) select * from sourceA");
        statementSet.addInsertSql("insert into hudi_tableB(uuid, age, _ts2) select * from sourceB");
        statementSet.execute();
    }
}
```

(1) The multi-writer problem

The jars built from the official Hudi code do not support "multi-writer", so the tests here use a build of Tencent's modified code. With the official build, pointing multiple writers at the same Hudi table fails with a write-conflict exception (shown as a screenshot in the original post, not reproduced here).

Furthermore, Hudi uses a preCombineField, and only one field can be designated as the preCombineField when the table is created. With the official build, two streams writing the same Hudi table hit one of two problems:

1. One stream writes the preCombineField and the other does not; the latter fails with an "ordering value cannot be null" error.
2. Both streams write the field, and a field-conflict exception is thrown.

(2) What if two streams need to write the same table?

Local testing shows:

- hudi0.12-multiWrite (Tencent's modified version) supports multiple precombine fields. With this build, multi-writing works as long as the fields other than the primary key and the partition field do not conflict across the streams. The commented-out `_ts1:name|_ts2:age` option in the demo above hints at the per-column spec; a sketch follows.
- hudi 0.13 does not support this, and still exhibits the problems described above.
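A minimal sketch of what that spec looks like in the DDL, based on the commented-out option in the demo above. The `_ts1:name|_ts2:age` mapping assigns each ordering field to the columns it governs; this syntax belongs to the Tencent fork, not vanilla Hudi, and `'precombine.field'` is the key that `FlinkOptions.PRECOMBINE_FIELD.key()` resolves to in the demo:

```sql
-- Sketch for the Tencent multiWrite build only: one ordering field per column group.
-- '_ts1' orders updates to 'name' (stream A); '_ts2' orders updates to 'age' (stream B).
create table hudi_merged (
  uuid bigint PRIMARY KEY NOT ENFORCED,
  name VARCHAR(3),
  age int,
  _ts1 TIMESTAMP(3),
  _ts2 TIMESTAMP(3)
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://ns/path/to/table',                -- table path (placeholder)
  'table.type' = 'MERGE_ON_READ',
  'precombine.field' = '_ts1:name|_ts2:age',         -- fork-specific multi-field spec
  'write.payload.class' = 'org.apache.hudi.common.model.PartialUpdateAvroPayload'
);
```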
(3) Test results

Tencent's article: https://cloud.tencent.com/developer/article/2189914
GitHub: GitHub - XuQianJin-Stars/hudi at multiwrite-master-7

Building Hudi is quite painful; if there is interest, I will upload the pre-built jars later.

3. Postscript

With the code above, there appears to be some data loss under high traffic: while one stream runs compaction, the other stream loses a certain amount of data. Two things are worth trying:

1. UNION the two streams into one first, then sink the unioned stream into the Hudi table; this also avoids write conflicts. A sketch follows.
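A minimal sketch of the union approach under the demo's schema. The NULL padding aligns the two sources to the full table schema, and the `_ts2 as _ts1` trick mirrors the view used in the second job so the ordering value is never null; the column-list INSERT assumes a reasonably recent Flink:

```sql
-- Sketch: align both streams to the full schema, union them, write once.
insert into hudi_tableA (uuid, name, age, _ts1, _ts2, d)
select uuid, name, cast(null as int) as age, _ts1,
       cast(null as timestamp(3)) as _ts2,
       date_format(CURRENT_TIMESTAMP, 'yyyyMMdd') as d
from sourceA
union all
select uuid, cast(null as varchar(3)) as name, age, _ts2 as _ts1, _ts2,
       date_format(CURRENT_TIMESTAMP, 'yyyyMMdd') as d
from sourceB;
```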
2. Use another data-lake tool, such as Apache Paimon; see 新一代数据湖存储技术Apache Paimon入门Demo_Leonardo_KY的博客-CSDN博客. A sketch of the same merge with Paimon follows.
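For comparison only (this is not from the original post): Paimon's partial-update merge engine with sequence groups covers this two-writer pattern natively. A minimal sketch, assuming Paimon's Flink DDL and the demo's schema:

```sql
-- Sketch: Paimon partial-update table; each sequence-group orders its own columns.
CREATE TABLE paimon_merged (
  uuid BIGINT,
  name VARCHAR(3),
  age INT,
  _ts1 TIMESTAMP(3),
  _ts2 TIMESTAMP(3),
  PRIMARY KEY (uuid) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update',
  'fields._ts1.sequence-group' = 'name',  -- _ts1 orders updates to name (stream A)
  'fields._ts2.sequence-group' = 'age'    -- _ts2 orders updates to age (stream B)
);
```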