当前位置: 首页 > news >正文

网站建设规划方案论文未来5年网络规划设计师

网站建设规划方案论文,未来5年网络规划设计师,网站建设项目延期验收申请报告,百度联盟 网站备案第 1 章 介绍 Apache Iceberg 是一种用于大型分析数据集的开放表格#xff0c;Iceberge 向 Trino 和 Spark 添加了使用高性能格式的表#xff0c;就像 Sql 表一样。 Iceberg 为了避免出现不变要的一些意外#xff0c;表结构和组织并不会实际删除#xff0c;用户也不需要特…第 1 章 介绍 Apache Iceberg 是一种用于大型分析数据集的开放表格Iceberge 向 Trino 和 Spark 添加了使用高性能格式的表就像 Sql 表一样。 Iceberg 为了避免出现不变要的一些意外表结构和组织并不会实际删除用户也不需要特意了解分区便可进行快速查询。 Iceberg 的表支持快速添加、删除、更新或重命名操作将分区列进行隐藏避免用户错误的使用分区和进行极慢的查询。分区列也会随着表数据量或查询模式的变化而自动更新。表可以根据时间进行表快照方便用户根据时间进行检查更改。提供版本回滚方便用户纠错数据。Iceberg 是为大表而建的Iceberg 用于生产中其中单表数据量可包含 10pb 左右数据 甚至可以在没有分布式 SQL 引擎的情况下读取这些巨量数据。 查询计划非常迅速不需要分布式 SQL 引擎来读取数据高级过滤可以使用分区和列来过滤查询这些数据可适用于任何云存储表的任何操作都是原子性的用户不会看到部分或未提交的内容。使用多个并发器进行写入并使用乐观锁重试的机制来解决兼容性问题第 2 章 构建 Iceberg 构建 Iceberge 需要 Grade 5.41 和 java8 或 java11 的环境 2.1 构建 Iceberg 1.上传 iceberg-apache-iceberg-0.11.1.zip并进行解压 [roothadoop103 software]# unzip iceberg-apache-iceberg-0.11.1.zip -d /opt/module/ [roothadoop103 software]# cd /opt/module/iceberg-apache-iceberg-0.11.1/ 2.修改对应版本 [roothadoop103 iceberg-apache-iceberg-0.11.1]# vim versions.props org.apache.flink:* 1.11.0 org.apache.hadoop:* 3.1.3 org.apache.hive:hive-metastore 2.3.7org.apache.hive:hive-serde 2.3.7 org.apache.spark:spark-hive_2.12 3.0.1 org.apache.hive:hive-exec 2.3.7 org.apache.hive:hive-service 2.3.7 3.修改国内镜像 [roothadoop103 iceberg-apache-iceberg-0.11.1]# vim build.gradle buildscript { repositories { jcenter() gradlePluginPortal() maven{ url http://maven.aliyun.com/nexus/content/groups/public/ } maven{ url http://maven.aliyun.com/nexus/content/repositories/jcenter} maven { url http://palantir.bintray.com/releases } maven { url https://plugins.gradle.org/m2/ } }allprojects { group org.apache.iceberg version getProjectVersion() repositories { maven{ url http://maven.aliyun.com/nexus/content/groups/public/} maven{ url http://maven.aliyun.com/nexus/content/repositories/jcenter} maven { url http://palantir.bintray.com/releases } mavenCentral() mavenLocal() } } 4.构建项目 [roothadoop103 iceberg-apache-iceberg-0.11.1]# ./gradlew build -x test 第 3 章 Spark 操作 3.1.配置参数和 jar 包 1.将构建好的 Iceberg 的 spark 模块 jar 包复制到 spark jars 下 [roothadoop103]/opt/module/iceberg-apache-iceberg-0.11.1/spark3-extensions/build/libs/ [roothadoop103 libs]# cp *.jar /opt/module/spark-3.0.1-bin-hadoop2.7/jars/ [roothadoop103 libs]# cd /opt/module/iceberg-apache-iceberg-0.11.1/spark3-runtime/build/libs/ [roothadoop103 libs]# cp *.jar /opt/module/spark-3.0.1-bin-hadoop2.7/jars/ 2.配置 spark 参数配置 Spark Sql Catlog可以用两种方式基于 hive 和基于 hadoop,这里先选择基于 hadoop。 [roothadoop103 libs]# cd /opt/module/spark-3.0.1-bin-hadoop2.7/conf/ [roothadoop103 conf]# vim spark-defaults.conf spark.sql.catalog.hive_prod org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.hive_prod.type hive spark.sql.catalog.hive_prod.uri thrift://hadoop101:9083spark.sql.catalog.hadoop_prod org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.hadoop_prod.type hadoopspark.sql.catalog.hadoop_prod.warehouse hdfs://mycluster/spark/warehousespark.sql.catalog.catalog-name.type hadoop spark.sql.catalog.catalog-name.default-namespace db spark.sql.catalog.catalog-name.uri thrift://hadoop101:9083 spark.sql.catalog.catalog-name.warehouse hdfs://mycluster/spark/warehouse 3.2 Spark sql 操作 正在上传…重新上传取消使用 spark sql 创建 iceberg 表,配置完毕后会多出一个 hadoop_prod.db 数据库但是注意这个数据库通过 show tables 是看不到的 [roothadoop103 ~]# spark-sql spark-sql (default) use hadoop_prod.db; create table testA( id bigint, name string, age int, dt string) USING iceberg PARTITIONED by(dt); 2.插入数据 spark-sql (default) insert into testA values(1,张三,18,2021-06-21); 3.查询 spark-sql (default) select *from testA; 3.2.1over write 操作 1覆盖操作与 hive 一样会将原始数据重新刷新 spark-sql (default) insert overwrite testA values(2,李四,20,2021-06-21); spark-sql (default) select *from testA; 3.2.2动态覆盖 1.Spark 的默认覆盖模式是静态的但在写入 iceberg 时建议使用动态覆盖模式。静态覆盖模式需要制定分区列动态覆盖模式不需要。 spark-sql (default) insert overwrite testA values(2,李四,20,2021-06-21); spark-sql (default) select *from testA; 2.设置动态覆盖模式修改 spark-default.conf添加对应参数 [roothadoop103 conf]# vim spark-defaults.conf spark.sql.sources.partitionOverwriteModedynamic 3.创建一张表结构与 testA 完全一致的表 testB create table hadoop_prod.db.testB( id bigint, name string, age int, dt string) USING iceberg PARTITIONED by(dt); 4.向 testA 表中再插入一条数据 spark-sql (default) use hadoop_prod.db;spark-sql (default) insert into testA values(1,张三,18,2021-06-22); 5.查询 testA 表此时 testA 表中有两条记录 spark-sql (default) select *from testA; 6.通过动态覆盖模式将 A 表插入到 B 表中 spark-sql (default) insert overwrite testB select *from testA; 7.查询 testB 表可以看到效果与 hive 中的动态分区一样自动根据列的顺序进行匹配插入无须手动指定分区。 spark-sql (default) select *from testB; 3.2.3静态覆盖 1.静态覆盖则跟 hive 插入时手动指定分区一致需要手动指定分区列的值 spark-sql (default) insert overwrite testB Partition(dt2021-06-26) select id,name,age from testA; 2.查询表数据 spark-sql (default) select *from testB; 3.2.4删除数据 1.iceberg 并不会物理删除数据下面演示 delete 操作根据分区列进行删除 testB 表数据 ​​​​​​​spark-sql (default) delete from testB where dt 2021-06-21 and dt 2021-06-26; 2.提示删除成功再次查询数据。发现表中已无数据但是存在 hdfs 上的物理并没有实际删除 3.查看 hdfs 数据仍然存在。 ​​​​​​​3.2.5历史快照 1.每张表都拥有一张历史表历史表的表名为当前表加上后缀.history注意查询历史表的时候必须是表的全称不可用先切到 hadoop.db 库中再查 testB spark-sql (default) select *from hadoop_prod.db.testB.history; 2.可以查看到每次操作后的对应的快照记录也可以查询对应快照表快照表的表名在  原表基础上加上.snapshots也是一样必须是表的全称不能简写 spark-sql (default) select *from hadoop_prod.db.testB.snapshots; 3.可以在看到 commit 的时间snapshot 快照的 idparent_id 父节点operation 操作类型 已经 summary 概要summary 概要字段中可以看到数据量大小总条数路径等信息。 两张表也可以根据 snapshot_id 快照 id 进行 join 关联查询。 spark-sql (default) select *from hadoop_prod.db.testB.history a join hadoop_prod.db.testB.snapshots b on a.snapshot_idb.snapshot_id ; 4.知道了快照表和快照信息后可以根据快照 id 来查询具体的历史信息发进行检测是否误操作如果是误操作则可通过 spark 重新刷新数据。查询方式如下 scala spark.read.option(snapshot-id,5549650043576786799).format(iceberg).load(/hive/w arehouse/db/testB).show 3.2.6隐藏分区有 bug 时区不对 1.days 函数 1上面演示了创建分区表 接下来演示创建隐藏分区表。隐藏分区支持的函数有 years,months,days,hours,bucket,truncate。比如接下来创建一张 testC 表表中有id,name 和 ts时间戳。 create table hadoop_prod.db.testC( id bigint, name string, ts timestamp) using iceberg partitioned by (days(ts)); 2创建成功分别往里面插入不同天时间戳的数据 spark-sql (default) insert into hadoop_prod.db.testC values(1,张三,cast(1624773600 as timestamp)),(2,李四,cast(1624860000 as timestamp)); 3插入成功之后再来查询表数据。 spark-sql (default) select *from hadoop_prod.db.testC; 4可以看到有两条数据并且日期也不是同一天查看 hdfs 上对应的分区。已经自动按天进行了分区。 2.years 函数 1删除 testC 表重新建表字段还是不变分区字段使用 years 函数 spark-sql (default) drop table hadoop_prod.db.testC; create table hadoop_prod.db.testC( id bigint, name string, ts timestamp) using iceberg partitioned by (years(ts)); 2同样插入两条不同年时间戳的数据进行查询对比 spark-sql (default) insert into hadoop_prod.db.testC values(1,张三,cast(1624860000 as timestamp)),(2,李四,cast(1593324000 as timestamp)); 3查询数据 spark-sql (default) select *from hadoop_prod.db.testC; 4再查看 hdfs 对应的地址已经按年建好分区 3.month 函数 1删除 testC 表重新建表字段不变 使用 month 函数进行分区 spark-sql (default) drop table hadoop_prod.db.testC; create table hadoop_prod.db.testC( id bigint, name string, ts timestamp) using iceberg partitioned by (months(ts)); 2同样插入不同月份时间戳的两条记录 spark-sql (default) insert into hadoop_prod.db.testC values(1,张三,cast(1624860000 as timestamp)),(2,李四,cast(1622181600 as timestamp)); 3查询数据和 hdfs 对应地址 spark-sql (default) select *from hadoop_prod.db.testC; 4.hours 函数 1删除 testC 表重新建表字段不变使用 hours 函数 spark-sql (default) drop table hadoop_prod.db.testC; create table hadoop_prod.db.testC( id bigint, name string, ts timestamp) using iceberg partitioned by (hours(ts)); 2插入两条不同小时的时间戳数据 spark-sql (default) insert into hadoop_prod.db.testC values(1,张三,cast(1622181600 as timestamp)),(2,李四,cast(1622178000 as timestamp)); 3查询数据和 hdfs 地址 发现时区不对修改对应参数 再次启动 spark sql 插入数据查看 hdfs 路径,还是错误分区目录bugbucket 函数有 bug正在上传…重新上传取消正在上传…重新上传取消正在上传…重新上传取消删除 testC 表重新创建表字段不变使用 bucket 函数。分桶 hash 算法采用 Murmur3 hash官网介绍 https://iceberg.apache.org/spec/#partition-transforms插入一批测试数据,为什么分多批插入有 bug:如果一批数据中有数据被分到同一个桶里会报错(1002,张 10,cast(1622152800 as timestamp)),(1004,李 10,cast(1622178000 as timestamp)); 查看表数据和 hdfs 路径 spark-sql (default) select *from hadoop_prod.db.testC; truncate 函数删除表重新建表字段不变使用 truncate 函数截取长度来进行分区 插入一批测试数据 正在上传…重新上传取消正在上传…重新上传取消正在上传…重新上传取消 正在上传…重新上传取消正在上传…重新上传取消正在上传…重新上传取消 查询表数据和 hdfs 地址,分区目录为 id 数/4 得到的值计算方式是 /不是%。 spark-sql (default) select *from hadoop_prod.db.testC; 第 4 章 DataFrame 操作 配置 Resources1将自己 hadoop 集群的客户端配置文件复制到 resource 下方便 local 模式调试 配置 pom.xml1配置相关依赖 读取表 读取快照写入表写入数据并创建表编写代码执行验证进入 spark sql 窗口查看表结构和表数据spark-sql (default) desc test1; spark-sql (default) select *from test1; 查看 hdfs是否按 dt 进行分区 写数据Append编写代码执行 执行完毕后进行测试注意小 bug,执行完代码后如果 spark sql 黑窗口不重新打开是不会刷新数据的只有把 spark sql 窗口界面重新打开才会刷新数据。如果使用代码查询能看到最新数据关闭再次进入查询可以查询到数据OverWrite编写代码测试 查询 显示手动指定覆盖分区查询,2021-06-30 分区的数据已经被覆盖走模拟数仓表模型1表模型底下 6 张基础表合成一张宽表再基于宽表统计指标 建表语句1建表语句 建表语句.txt 测试数据1测试数据上传到 hadoop作为第一层 ods 编写代码dwd 层创建目录划分层级 编写所需实体类 编写 DwdIcebergService package com.atguigu.iceberg.warehouse.service import java.sql.Timestamp import java.time.LocalDate import java.time.format.DateTimeFormatter import com.atguigu.iceberg.warehouse.bean.{BaseWebsite, MemberRegType, VipLevel} import org.apache.spark.sql.SparkSession object DwdIcebergService { def readOdsData(sparkSession: SparkSession) { import org.apache.spark.sql.functions._ import sparkSession.implicits._ sparkSession.read.json(/ods/baseadlog.log) .withColumn(adid, col(adid).cast(Int)) .writeTo(hadoop_prod.db.dwd_base_ad).overwritePartitions() sparkSession.read.json(/ods/baswewebsite.log).map(item { val createtime item.getAs[String](createtime) val str LocalDate.parse(createtime, DateTimeFormatter.ofPattern(yyyy-MM-dd)).atStartOfDay(). format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss)) BaseWebsite(item.getAs[String](siteid).toInt, item.getAs[String](sitename), item.getAs[String](siteurl), item.getAs[String](delete).toInt, Timestamp.valueOf(str), item.getAs[String](creator), item.getAs[String](dn)) }).writeTo(hadoop_prod.db.dwd_base_website).overwritePartitions() sparkSession.read.json(/ods/member.log).drop(dn) .withColumn(uid, col(uid).cast(int)) .withColumn(ad_id, col(ad_id).cast(int)) .writeTo(hadoop_prod.db.dwd_member).overwritePartitions() sparkSession.read.json(/ods/memberRegtype.log).drop(domain).drop(dn) .withColumn(regsourcename, col(regsource)) .map(item { val createtime item.getAs[String](createtime) val str LocalDate.parse(createtime, DateTimeFormatter.ofPattern(yyyy-MM-dd)).atStartOfDay(). format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss)) MemberRegType(item.getAs[String](uid).toInt, item.getAs[String](appkey), item.getAs[String](appregurl), item.getAs[String](bdp_uuid), Timestamp.valueOf(str), item.getAs[String](isranreg), item.getAs[String](regsource), item.getAs[String](regsourcename), item.getAs[String](websiteid).toInt, item.getAs[String](dt)) }).writeTo(hadoop_prod.db.dwd_member_regtype).overwritePartitions() sparkSession.read.json(/ods/pcenterMemViplevel.log).drop(discountval) .map(item { val startTime item.getAs[String](start_time) val endTime item.getAs[String](end_time) val last_modify_time item.getAs[String](last_modify_time) val startTimeStr LocalDate.parse(startTime, DateTimeFormatter.ofPattern(yyyy-MM-dd)).atStartOfDay(). format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss)) val endTimeStr LocalDate.parse(endTime, DateTimeFormatter.ofPattern(yyyy-MM-dd)).atStartOfDay(). format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss)) val last_modify_timeStr LocalDate.parse(last_modify_time, DateTimeFormatter.ofPattern(yyyy-MM-dd)).atStartOfDay(). format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss)) VipLevel(item.getAs[String](vip_id).toInt, item.getAs[String](vip_level), Timestamp.valueOf(startTimeStr), Timestamp.valueOf(endTimeStr), Timestamp.valueOf(last_modify_timeStr), item.getAs[String](max_free), item.getAs[String](min_free), item.getAs[String](next_level), item.getAs[String](operator), item.getAs[String](dn)) }).writeTo(hadoop_prod.db.dwd_vip_level).overwritePartitions() sparkSession.read.json(/ods/pcentermempaymoney.log) .withColumn(uid, col(uid).cast(int)) .withColumn(siteid, col(siteid).cast(int)) .withColumn(vip_id, col(vip_id).cast(int)) .writeTo(hadoop_prod.db.dwd_pcentermempaymoney).overwritePartitions() } } 转存失败重新上传取消正在上传…重新上传取消正在上传…重新上传取消 编写 DwdIcebergController dws 层表指定多个分区列会有 bug创建 case class 创建 DwdIcebergDao 操作六张基础表 编写 DwsIcebergService处理业务 package com.atguigu.iceberg.warehouse.service import java.sql.Timestamp import java.time.LocalDateTime import java.time.format.DateTimeFormatter import com.atguigu.iceberg.warehouse.bean.{DwsMember, DwsMember_Result} import com.atguigu.iceberg.warehouse.dao.DwDIcebergDao import org.apache.spark.sql.SparkSession object DwsIcebergService { def getDwsMemberData(sparkSession: SparkSession, dt: String) { import sparkSession.implicits._ val dwdPcentermempaymoney DwDIcebergDao.getDwdPcentermempaymoney(sparkSession).where($dt  dt) val dwdVipLevel DwDIcebergDao.getDwdVipLevel(sparkSession) val dwdMember DwDIcebergDao.getDwdMember(sparkSession).where($dt dt) val dwdBaseWebsite DwDIcebergDao.getDwdBaseWebsite(sparkSession) val dwdMemberRegtype DwDIcebergDao.getDwdMemberRegtyp(sparkSession).where($dt dt) val dwdBaseAd DwDIcebergDao.getDwdBaseAd(sparkSession) val result dwdMember.join(dwdMemberRegtype.drop(dt), Seq(uid), left) .join(dwdPcentermempaymoney.drop(dt), Seq(uid), left) .join(dwdBaseAd, Seq(ad_id, dn), left) .join(dwdBaseWebsite, Seq(siteid, dn), left) .join(dwdVipLevel, Seq(vip_id, dn), left_outer) .select(uid, ad_id, fullname, iconurl, lastlogin, mailaddr, memberlevel, password , paymoney, phone, qq, register, regupdatetime, unitname, userip, zipcode, appkey , appregurl, bdp_uuid, reg_createtime, isranreg, regsource, regsourcename, adname , siteid, sitename, siteurl, site_delete, site_createtime, site_creator, vip_id, vip_level, vip_start_time, vip_end_time, vip_last_modify_time, vip_max_free, vip_min_free, vip_next_level , vip_operator, dt, dn).as[DwsMember] val resultData result.groupByKey(item item.uid _ item.dn) .mapGroups { case (key, iters) val keys key.split(_) val uid Integer.parseInt(keys(0)) val dn keys(1) val dwsMembers iters.toList val paymoney dwsMembers.filter(_.paymoney ! null).map(itemBigDecimal.apply(item.paymoney)).reduceOption(_ _).getOrElse(BigDecimal.apply(0.00)).toString val ad_id dwsMembers.map(_.ad_id).head val fullname dwsMembers.map(_.fullname).head val icounurl dwsMembers.map(_.iconurl).head val lastlogin dwsMembers.map(_.lastlogin).head val mailaddr dwsMembers.map(_.mailaddr).head val memberlevel dwsMembers.map(_.memberlevel).head val password dwsMembers.map(_.password).head val phone dwsMembers.map(_.phone).head val qq dwsMembers.map(_.qq).head val register dwsMembers.map(_.register).head val regupdatetime dwsMembers.map(_.regupdatetime).head val unitname dwsMembers.map(_.unitname).head val userip dwsMembers.map(_.userip).head val zipcode dwsMembers.map(_.zipcode).head val appkey dwsMembers.map(_.appkey).head val appregurl dwsMembers.map(_.appregurl).head val bdp_uuid dwsMembers.map(_.bdp_uuid).head val reg_createtime if (dwsMembers.map(_.reg_createtime).head ! null) dwsMembers.map(_.reg_createtime).head else 1970-01-01 00:00:00 val isranreg dwsMembers.map(_.isranreg).head val regsource dwsMembers.map(_.regsource).head val regsourcename dwsMembers.map(_.regsourcename).head val adname dwsMembers.map(_.adname).head val siteid if (dwsMembers.map(_.siteid).head ! null) dwsMembers.map(_.siteid).head else 0 val sitename dwsMembers.map(_.sitename).head val siteurl dwsMembers.map(_.siteurl).head val site_delete dwsMembers.map(_.site_delete).head val site_createtime dwsMembers.map(_.site_createtime).head val site_creator dwsMembers.map(_.site_creator).head val vip_id if (dwsMembers.map(_.vip_id).head ! null) dwsMembers.map(_.vip_id).head else 0 val vip_level dwsMembers.map(_.vip_level).max val vip_start_time if (dwsMembers.map(_.vip_start_time).min ! null) dwsMembers.map(_.vip_start_time).min else 1970-01-01 00:00:00 val vip_end_time if (dwsMembers.map(_.vip_end_time).max ! null) dwsMembers.map(_.vip_end_time).max else 1970-01-01 00:00:00 val vip_last_modify_time  if (dwsMembers.map(_.vip_last_modify_time).max ! null) dwsMembers.map(_.vip_last_modify_time).max else 1970-01-01 00:00:00 val vip_max_free  dwsMembers.map(_.vip_max_free).head val vip_min_free  dwsMembers.map(_.vip_min_free).head val vip_next_level dwsMembers.map(_.vip_next_level).head val vip_operator  dwsMembers.map(_.vip_operator).head val formatter DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss) val reg_createtimeStr LocalDateTime.parse(reg_createtime, formatter); val vip_start_timeStr LocalDateTime.parse(vip_start_time, formatter) val vip_end_timeStr LocalDateTime.parse(vip_end_time, formatter) val vip_last_modify_timeStr  LocalDateTime.parse(vip_last_modify_time, formatter) DwsMember_Result(uid, ad_id, fullname, icounurl, lastlogin, mailaddr, memberlevel, password, paymoney, phone, qq, register, regupdatetime, unitname, userip, zipcode, appkey, appregurl, bdp_uuid, Timestamp.valueOf(reg_createtimeStr), isranreg, regsource, regsourcename, adname, siteid.toInt, sitename, siteurl, site_delete, site_createtime, site_creator, vip_id.toInt, vip_level, Timestamp.valueOf(vip_start_timeStr), Timestamp.valueOf(vip_end_timeStr), Timestamp.valueOf(vip_last_modify_timeStr), vip_max_free, vip_min_free, vip_next_level, vip_operator, dt, dn) 编写 DwsIcebergController,进行运行测试发生报错和上面在 spark sql 黑窗口测试的错误一致当有批量数据插入分区时提示分区已关闭无法插入 重新建表分区列去掉 dn,只用 dtbug不能指定多个分区只能指定一个分区列 建完表后重新测试插入数据成功 ads 层编写所需 case class 编写 DwsIcebergDao查询宽表 编写 AdsIcebergService,统计指标 package com.atguigu.iceberg.warehouse.service import com.atguigu.iceberg.warehouse.bean.QueryResult import com.atguigu.iceberg.warehouse.dao.DwsIcebergDao import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.{SaveMode, SparkSession} object AdsIcebergService { def queryDetails(sparkSession: SparkSession, dt: String) { import sparkSession.implicits._ val result DwsIcebergDao.queryDwsMemberData(sparkSession).as[QueryResult].where(sdt${dt}) result.cache() //统计根据 url 统计人数 wordcount result.mapPartitions(partition  { partition.map(item (item.appregurl _ item.dn _ item.dt, 1)) }).groupByKey(_._1) .mapValues(item item._2).reduceGroups(_ _) .map(item { val keys item._1.split(_) val appregurl keys(0) val dn keys(1) val dt keys(2) (appregurl, item._2, dt, dn) }).toDF(appregurl, num, dt, dn).writeTo(hadoop_prod.db.ads_register_appregurlnum).overwritePartitions() //统计各 memberlevel 等级 支付金额前三的用户import org.apache.spark.sql.functions._ result.withColumn(rownum, row_number().over(Window.partitionBy(memberlevel).orderBy(desc(paymoney)))) .where(rownum4).orderBy(memberlevel, rownum) 编写 AdsIcebergController,进行本地测试 查询验证结果yarn 测试local 模式测试完毕后将代码打成 jar 包提交到集群上进行测试那么插入模式当前都是为 overwrite 模式所以在 yarn 上测试的时候也无需删除历史数据打 jar 包之前注意将代码中 setMast(local[*]) 注释了把集群上有的依赖也可用scopeprovided/scope剔除了打一个瘦包 打成 jar 包提交到集群运行 spark-submit 命令运行 yarn 模式。第 5 章 Structured Streaming 操作 5.1 基于 Structured Streaming 落明细数据 创建测试 topic启动 kafka,创建测试用的 topic 导入依赖 编写 producer 往 topic 里发送测试数据 创建测试表 partitioned by(days(ts)); 编写代码 基于 test1 的测试数据编写结构化流代码进行测试 package com.atguigu.iceberg.spark.structuredstreaming import java.sql.Timestamp import org.apache.spark.SparkConf import org.apache.spark.sql.{Dataset, SparkSession} object TestTopicOperators { def main(args: Array[String]): Unit { val sparkConf new SparkConf() .set(spark.sql.catalog.hadoop_prod, org.apache.iceberg.spark.SparkCatalog) .set(spark.sql.catalog.hadoop_prod.type, hadoop) .set(spark.sql.catalog.hadoop_prod.warehouse, hdfs://mycluster/hive/warehouse) .set(spark.sql.catalog.catalog-name.type, hadoop) .set(spark.sql.catalog.catalog-name.default-namespace, default) .set(spark.sql.sources.partitionOverwriteMode, dynamic) .set(spark.sql.session.timeZone, GMT8) .set(spark.sql.shuffle.partitions, 12) //.setMaster(local[*]) .setAppName(test_topic) val sparkSession SparkSession.builder().config(sparkConf).getOrCreate() val df sparkSession.readStream.format(kafka) .option(kafka.bootstrap.servers, hadoop101:9092,hadoop102:9092,hadoop103:9092) .option(subscribe, test1) .option(startingOffsets, earliest) .option(maxOffsetsPerTrigger, 10000) .load() import sparkSession.implicits._ val query df.selectExpr(cast (value as string)).as[String] .map(item { val array item.split(\t) val uid array(0) val courseid array(1) val deviceid array(2) val ts array(3) Test1(uid.toLong, courseid.toInt, deviceid.toInt, new Timestamp(ts.toLong)) }).writeStream.foreachBatch { (batchDF: Dataset[Test1], batchid: Long) batchDF.writeTo(hadoop_prod.db.test_topic).overwritePartitions() }.option(checkpointLocation, /ss/checkpoint) .start() query.awaitTermination() } case class Test1(uid: BigInt, courseid: Int, deviceid: Int, ts: Timestamp) } 提交 yarn 测试速度打成 jar 包上传到集群运行代码跑 yarn 模式 让 vcore 个数和 shuffle 分区数保持1:1 最高效运行 运行起来后查看 Spark Web UI 界面监控速度。趋于稳定后可以看到速度能到每秒10200,条左右已经达到了我参数所设置的上限。当然分区数kafka 分区和 shuffle 分区 和 vcore 越多实时性也会越高目前测试是 12 分区。实时性没问题但是有一个缺点没有像 hudi 一样解决小文件问题。解决过多文件数可以更改 trigger 触发时间但也会影响实时效率两者中和考虑使用。 最后是花了 18 分钟跑完 1000 万条数据查询表数据观察是否有数据丢失。数据没有丢失。第 6 章存在的问题和缺点 问题时区无法设置Spark Sql 黑窗口缓存无法更新修改表数据后得需要关了黑窗口再重新打开,查询才是更新后的数据表分区如果指定多个分区或分桶那么插入批量数据时如果这一批数据有多条数据在同一个分区会报错缺点与 hudi 相比没有解决小文件问题与 hudi 相比缺少行级更新只能对表的数据按分区进行 overwrite 全量覆盖第 7 章 Flink 操作 配置参数和 jar 包Flink1.11 开始就不在提供 flink-shaded-hadoop-2-uber 的支持所以如果需要 flink 支持hadoop 得配置环境变量 HADOOP_CLASSPATH 目前 Iceberg 只支持 flink1.11.x 的版本所以我这使用 flink1.11.0,将构建好的 Iceberg 的 jar 包复制到 flink 下 Flink SQL Client在 hadoop 环境下启动一个单独的 flink 集群 启动 flin sql client使用 Catalogs 创建目录flink 可以通过 sql client 来创建 catalogs 目录 支持的方式有 hive catalog,hadoop catalog,custom catlog。我这里采用 hadoop catlog。使用当前 catalog 创建 sql-client-defaults.yaml方便以后启动 flink-sql 客户端走 iceberg 目录Flink Sql 操作建库再次启动 Flink Sql 客户端 可以使用默认数据库也可以创建数据库 使用 iceberg 数据库 建表flink 不支持隐藏分区1建表我这里直接创建分区表了使用 flink 对接 iceberg 不能使用 iceberg 的隐藏分区这一特性目前还不支持。 like 建表insert into使用 insert into 插入数据 查询 任务监控1 可 查 看 hadoop103 默 认 端 口 8081 查 看 standlone 模 式 任 务 是 否 成 功 插入数据后同样 hdfs 路径上也是有对应目录和数据块 insert overwrite 使用 overwrite 插入 flink 默认使用流的方式插入数据这个时候流的插入是不支持 overwrite 操作的需要将插入模式进行修改改成批的插入方式,再次使用 overwrite 插入数据。如需要改回流式操作参数设置为 SET execution.type streaming ;查询结果已经将结果根据分区进行覆盖操作第 8 章 Flink API 操作 配置 pom.xml 1配置相关依赖 !-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common -- dependency groupIdorg.apache.flink/groupId artifactIdflink-table-common/artifactId version${flink.version}/version /dependency !-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -- dependency groupIdorg.apache.flink/groupId artifactIdflink-table-api-java/artifactId version${flink.version}/version /dependency !-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -- dependency groupIdorg.apache.flink/groupId artifactIdflink-table-api-java-bridge_${scala.binary.version}/artifactId version${flink.version}/version /dependency !-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -- dependency groupIdorg.apache.flink/groupId artifactIdflink-table-planner_${scala.binary.version}/artifactId version${flink.version}/version /dependency !-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-blink -- dependency groupIdorg.apache.flink/groupId artifactIdflink-table-planner-blink_${scala.binary.version}/artifactId version${flink.version}/version /dependency dependency groupIdorg.apache.iceberg/groupId artifactIdiceberg-flink-runtime/artifactId version0.11.1/version /dependency !-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -- dependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-client/artifactId version3.1.3/version /dependency !-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -- dependency groupIdorg.apache.flink/groupId artifactIdflink-clients_${scala.binary.version}/artifactId version${flink.version}/version /dependency /dependencies /project 读取表数据batch read 1通过 batch 的方式去读取数据 streaming read 通过 streaming 的方式去读取数据 启动之后程序不会立马停止 因为是流处理这个时候手动往表中追加一条数据Flink SQL insert into iceberg.testA values(3,哈哈哈,18,2021-07-01); 可以看到控制台实时打印出了数据 写数据Appending Data使用上面 create table testB like testA 的 testB 表读取 A 表数据插入到 B 表数据 采用的是 batch 批处理代码执行两次并查询查看 append 效果 Overwrite Data编写代码将 overwrite 设置为 true 查询 testB 表查看 overwrite 效果根据分区将数据进行了覆盖操作模拟数仓建表语句1还是根据上述 spark 表模型进行建表。建表语句 flink数仓建表语句.txt 编写代码dwd 层同样创建目录划分层级 添加 fastjson 依赖编写 service 层,读取 ods 层数据插入到 iceberg 表中 public class DwdIcebergSerivce { public void readOdsData(StreamExecutionEnvironment env) { DataStreamString baseadDS env.readTextFile(hdfs://mycluster/ods/baseadlog.log); DataStreamRowData baseadInput baseadDS.map(item - { JSONObject jsonObject JSONObject.parseObject(item); GenericRowData rowData new GenericRowData(3); rowData.setField(0, jsonObject.getIntValue(adid)); rowData.setField(1, StringData.fromString(jsonObject.getString(adname))); rowData.setField(2, StringData.fromString(jsonObject.getString(dn))); return rowData; }); TableLoader dwdbaseadTable TableLoader.fromHadoopTable(hdfs://mycluster/flink/warehouse/iceberg/dwd_base_ad); FlinkSink.forRowData(baseadInput).tableLoader(dwdbaseadTable).overwrite(true).build(); DataStreamString basewebsiteDS env.readTextFile(hdfs://mycluster/ods/baswewebsite.log); DataStreamRowData basewebsiteInput basewebsiteDS.map(item - { JSONObject jsonObject JSONObject.parseObject(item); GenericRowData rowData new GenericRowData(7); rowData.setField(0, jsonObject.getIntValue(siteid)); rowData.setField(1, StringData.fromString(jsonObject.getString(sitename))); rowData.setField(2, StringData.fromString(jsonObject.getString(siteurl))); rowData.setField(3, jsonObject.getIntValue(delete)); LocalDateTime localDateTime LocalDate.parse(jsonObject.getString(createtime), DateTimeFormatter.ofPattern(yyyy-MM-dd)) .atStartOfDay(); rowData.setField(4, TimestampData.fromLocalDateTime(localDateTime)); rowData.setField(5, StringData.fromString(jsonObject.getString(creator))); rowData.setField(6, StringData.fromString(jsonObject.getString(dn))); return rowData; }); TableLoader dwdbasewebsiteTable TableLoader.fromHadoopTable(hdfs://mycluster/flink/warehouse/iceberg/dwd_base_website ); FlinkSink.forRowData(basewebsiteInput).tableLoader(dwdbasewebsiteTable).overwrite(true) .build(); DataStreamString memberDS  env.readTextFile(hdfs://mycluster/ods/member.log); DataStreamRowData memberInput memberDS.map(item - { JSONObject jsonObject JSONObject.parseObject(item); GenericRowData rowData new GenericRowData(19); rowData.setField(0, jsonObject.getIntValue(uid)); rowData.setField(1, jsonObject.getIntValue(ad_id)); rowData.setField(2, StringData.fromString(jsonObject.getString(birthday))); rowData.setField(3, StringData.fromString(jsonObject.getString(email))); rowData.setField(4, StringData.fromString(jsonObject.getString(fullname))); rowData.setField(5, StringData.fromString(jsonObject.getString(iconurl))); rowData.setField(6, StringData.fromString(jsonObject.getString(lastlogin))); rowData.setField(7, StringData.fromString(jsonObject.getString(mailaddr))); rowData.setField(8, StringData.fromString(jsonObject.getString(memberlevel))); rowData.setField(9, StringData.fromString(jsonObject.getString(password))); rowData.setField(10, StringData.fromString(jsonObject.getString(paymoney))); rowData.setField(11, StringData.fromString(jsonObject.getString(phone))); rowData.setField(12, StringData.fromString(jsonObject.getString(qq))); rowData.setField(13, StringData.fromString(jsonObject.getString(register))); rowData.setField(14, StringData.fromString(jsonObject.getString(regupdatetime))); rowData.setField(15, StringData.fromString(jsonObject.getString(unitname))); rowData.setField(16, StringData.fromString(jsonObject.getString(userip))); rowData.setField(17, StringData.fromString(jsonObject.getString(zipcode))); rowData.setField(18, StringData.fromString(jsonObject.getString(dt))); return rowData; }); TableLoader dwdmemberTable TableLoader.fromHadoopTable(hdfs://mycluster/flink/warehouse/iceberg/dwd_member); FlinkSink.forRowData(memberInput).tableLoader(dwdmemberTable).overwrite(true).build(); DataStreamString memberregtypDS env.readTextFile(hdfs://mycluster/ods/memberRegtype.log); DataStreamRowData memberregtypInput memberregtypDS.map(item - { JSONObject jsonObject JSONObject.parseObject(item); GenericRowData rowData new GenericRowData(10); rowData.setField(0, jsonObject.getIntValue(uid)); rowData.setField(1, StringData.fromString(jsonObject.getString(appkey))); rowData.setField(2, StringData.fromString(jsonObject.getString(appregurl))); rowData.setField(3, StringData.fromString(jsonObject.getString(bdp_uuid))); LocalDateTime localDateTime LocalDate.parse(jsonObject.getString(createtime), DateTimeFormatter.ofPattern(yyyy-MM-dd)) .atStartOfDay(); rowData.setField(4, TimestampData.fromLocalDateTime(localDateTime)); rowData.setField(5, StringData.fromString(jsonObject.getString(isranreg))); rowData.setField(6, StringData.fromString(jsonObject.getString(regsource))); rowData.setField(7, StringData.fromString(jsonObject.getString(regsource))); rowData.setField(8, jsonObject.getIntValue(websiteid)); rowData.setField(9, StringData.fromString(jsonObject.getString(dt))); return rowData; }); TableLoader dwdmemberregtypeTable TableLoader.fromHadoopTable(hdfs://mycluster/flink/warehouse/iceberg/dwd_member_regty pe); FlinkSink.forRowData(memberregtypInput).tableLoader(dwdmemberregtypeTable).overwrite(t rue).build(); DataStreamString memviplevelDS env.readTextFile(hdfs://mycluster/ods/pcenterMemViplevel.log); DataStreamRowData memviplevelInput memviplevelDS.map(item - { JSONObject jsonObject JSONObject.parseObject(item); GenericRowData rowData new GenericRowData(10); rowData.setField(0, jsonObject.getIntValue(vip_id)); rowData.setField(1, StringData.fromString(jsonObject.getString(vip_level))); LocalDateTime start_timeDate LocalDate.parse(jsonObject.getString(start_time), DateTimeFormatter.ofPattern(yyyy-MM-dd)) .atStartOfDay(); LocalDateTime end_timeDate  LocalDate.parse(jsonObject.getString(end_time), DateTimeFormatter.ofPattern(yyyy-MM-dd)) .atStartOfDay(); LocalDateTime last_modify_timeDate LocalDate.parse(jsonObject.getString(last_modify_time), DateTimeFormatter.ofPattern(yyyy-MM-dd)) .atStartOfDay(); rowData.setField(2, TimestampData.fromLocalDateTime(start_timeDate)); rowData.setField(3, TimestampData.fromLocalDateTime(end_timeDate)); rowData.setField(4, TimestampData.fromLocalDateTime(last_modify_timeDate)); rowData.setField(5, StringData.fromString(jsonObject.getString(max_free))); rowData.setField(6, StringData.fromString(jsonObject.getString(min_free))); rowData.setField(7, StringData.fromString(jsonObject.getString(next_level))); rowData.setField(8, StringData.fromString(jsonObject.getString(operator))); rowData.setField(9, StringData.fromString(jsonObject.getString(dn))); return rowData; }); TableLoader dwdviplevelTable TableLoader.fromHadoopTable(hdfs://mycluster/flink/warehouse/iceberg/dwd_vip_level); FlinkSink.forRowData(memviplevelInput).tableLoader(dwdviplevelTable).overwrite(true).b uild(); DataStreamString mempaymoneyDS env.readTextFile(hdfs://mycluster/ods/pcentermempaymoney.log); DataStreamRowData mempaymoneyInput mempaymoneyDS.map(item - { JSONObject jsonObject JSONObject.parseObject(item); GenericRowData rowData new GenericRowData(6); rowData.setField(0, jsonObject.getIntValue(uid)); rowData.setField(1, StringData.fromString(jsonObject.getString(paymoney))); rowData.setField(2, jsonObject.getIntValue(siteid)); rowData.setField(3, jsonObject.getIntValue(vip_id)); rowData.setField(4, StringData.fromString(jsonObject.getString(dt))); rowData.setField(5, StringData.fromString(jsonObject.getString(dn))); return rowData; }); TableLoader dwdmempaymoneyTable TableLoader.fromHadoopTable(hdfs://mycluster/flink/warehouse/iceberg/dwd_pcentermempa ymoney); FlinkSink.forRowData(mempaymoneyInput).tableLoader(dwdmempaymoneyTable).overwrite(true) .build(); } } 编写 dwd 层 controllerlocal 模式执行注意当一次性插入数据比较多是flink sink 最后可能会提示 writer 关 闭失败推测是 iceberg 并发写入的问题。 排查是否丢数据查询插入中逻辑最后的表 count 个数与原始日志数据的 count 个数做对比。 虽然发生 writer 关闭失败的错但是数据并没有问题。dws 层1编写 DwdIcebergDao查询 dwd 层基础表数据 public Table getDwdPcentermempaymoney(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) { TableLoader tableLoader TableLoader.fromHadoopTable(hdfs://mycluster/flink/warehouse/iceberg/dwd_pcentermempa ymoney); DataStreamRowData result FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); Table table tableEnv.fromDataStream(result); return table; } public Table getDwdVipLevel(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) { TableLoader tableLoader TableLoader.fromHadoopTable(hdfs://mycluster/flink/warehouse/iceberg/dwd_vip_level); DataStreamRowData result FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); Table table tableEnv.fromDataStream(result).renameColumns($(start_time).as(vip_start_time)) .renameColumns($(end_time).as(vip_end_time)).renameColumns($(last_mo dify_time).as(vip_last_modify_time)) .renameColumns($(max_free).as(vip_max_free)).renameColumns($(min_fre e).as(vip_min_free)) .renameColumns($(next_level).as(vip_next_level)).renameColumns($(ope rator).as(vip_operator)); return table; } public Table getDwdBaseWebsite(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) { TableLoader tableLoader TableLoader.fromHadoopTable(hdfs://mycluster/flink/warehouse/iceberg/dwd_base_website ); DataStreamRowData result FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); Table table tableEnv.fromDataStream(result).renameColumns($(delete).as(site_delete)) .renameColumns($(createtime).as(site_createtime)).renameColumns($(cr eator).as(site_creator)); return table; } public Table getDwdMemberRegtyp(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) { TableLoader tableLoader TableLoader.fromHadoopTable(hdfs://mycluster/flink/warehouse/iceberg/dwd_member_regty pe); DataStreamRowData result FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); Table table tableEnv.fromDataStream(result).renameColumns($(createtime).as(reg_createtime)); return table; } public Table getDwdBaseAd(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) { TableLoader tableLoader TableLoader.fromHadoopTable(hdfs://mycluster/flink/warehouse/iceberg/dwd_base_ad); DataStreamRowData result FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); Table table tableEnv.fromDataStream(result); return table; 编写 DwsIcebergService进行 join 形成宽表 package com.atguigu.iceberg.warhouse.service; import com.atguigu.iceberg.warhouse.dao.DwdIcebergDao; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.RowData; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.sink.FlinkSink; import static org.apache.flink.table.api.Expressions.$; public class DwsIcebergService { DwdIcebergDao dwdIcebergDao; public void getDwsMemberData(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String dt) { dwdIcebergDao new DwdIcebergDao(); Table dwdPcentermempaymoney dwdIcebergDao.getDwdPcentermempaymoney(env, tableEnv).where($(dt).isEqual(dt)); Table dwdVipLevel dwdIcebergDao.getDwdVipLevel(env, tableEnv); Table dwdMember dwdIcebergDao.getDwdMember(env, tableEnv).where($(dt).isEqual(dt)); Table dwdBaseWebsite dwdIcebergDao.getDwdBaseWebsite(env, tableEnv); Table dwdMemberRegtype dwdIcebergDao.getDwdMemberRegtyp(env, tableEnv).where($(dt).isEqual(dt)); Table dwdBaseAd dwdIcebergDao.getDwdBaseAd(env, tableEnv); Table result dwdMember.dropColumns($(paymoney)).leftOuterJoin(dwdMemberRegtype.renameColumns($(u id).as(reg_uid)).dropColumns($(dt)), $(uid).isEqual($(reg_uid))) .leftOuterJoin(dwdPcentermempaymoney.renameColumns($(uid).as(pcen_uid) ).dropColumns($(dt)), $(uid).isEqual($(pcen_uid))) .leftOuterJoin(dwdBaseAd.renameColumns($(dn).as(basead_dn)), $(ad_id).isEqual($(adid)).and($(dn).isEqual($(basead_dn)))) .leftOuterJoin(dwdBaseWebsite.renameColumns($(siteid).as(web_siteid)). renameColumns($(dn).as(web_dn)), $(siteid).isEqual($(web_siteid)).and($(dn).isEqual(web_dn))) .leftOuterJoin(dwdVipLevel.renameColumns($(vip_id).as(v_vip_id)).rena meColumns($(dn).as(vip_dn)), $(vip_id).isEqual($(v_vip_id)).and($(dn).isEqual($(vip_dn)))) .groupBy($(uid)) .select($(uid), $(ad_id).min(), $(fullname).min(), $(iconurl).min(), $(lastlogin).min(), $(mailaddr).min(), $(memberlevel).min(), $(password).min() , $(paymoney).cast(DataTypes.DECIMAL(10, 4)).sum().cast(DataTypes.STRING()), $(phone).min(), $(qq).min(), $(register).min(), $(regupdatetime).min(), $(unitname).min(), $(userip).min(), $(zipcode).min(), $(appkey).min() , $(appregurl).min(), $(bdp_uuid).min(), $(reg_createtime).min().cast(DataTypes.STRING()), $(isranreg).min(), 编写 DwsIcebergController,调用 serviceads 层创建所需 bean private BigDecimal paymoney; private String dt; private String dn; public Integer getUid() { return uid; } public void setUid(Integer uid) { this.uid uid; } public Integer getAd_id() { return ad_id; } public void setAd_id(Integer ad_id) { this.ad_id  ad_id; } public String getMemberlevel() { return memberlevel; } public void setMemberlevel(String memberlevel) { this.memberlevel memberlevel; } public String getRegister() { return register; } public void setRegister(String register) { this.register register; } public String getAppregurl() { return appregurl; } public void setAppregurl(String appregurl) { this.appregurl appregurl; } public String getRegsource() { return regsource; } public void setRegsource(String regsource) { this.regsource regsource; } public String getRegsourcename() { return regsourcename; } public void setRegsourcename(String regsourcename) { this.regsourcename regsourcename; } public String getAdname() { return adname; 创建 DwsIcbergDao 查询宽表统计字段 创建 AdsIcebergService 编写统计逻辑 package com.atguigu.iceberg.warhouse.service; import  com.atguigu.iceberg.warhouse.bean.QueryResult; import  com.atguigu.iceberg.warhouse.dao.DwsIcbergDao; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.RowData; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.sink.FlinkSink; import static org.apache.flink.table.api.Expressions.$; public class AdsIcebergService { DwsIcbergDao dwsIcbergDao new DwsIcbergDao(); public void queryDetails(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String dt) { Table table dwsIcbergDao.queryDwsMemberData(env, tableEnv).where($(dt).isEqual(dt)); DataStreamQueryResult queryResultDataStream tableEnv.toAppendStream(table, QueryResult.class); tableEnv.createTemporaryView(tmpA, queryResultDataStream); String sql select *from(select uid,memberlevel,register,appregurl ,regsourcename,adname,sitename,vip_level,cast(paymoney as decimal(10,4)),row_number() over (partition by memberlevel order by cast(paymoney as decimal(10,4)) desc) as rownum,dn,dt from tmpA where dt dt )  where rownum4 ; Table table1 tableEnv.sqlQuery(sql); DataStreamRowData top3DS tableEnv.toRetractStream(table1, RowData.class).filter(item - item.f0).map(item - item.f1); String sql2 select appregurl,count(uid),dn,dt from tmpA where dt dt group by appregurl,dn,dt; Table table2 tableEnv.sqlQuery(sql2); DataStreamRowData appregurlnumDS tableEnv.toRetractStream(table2, RowData.class).filter(item - item.f0).map(item - item.f1); TableLoader top3Table TableLoader.fromHadoopTable(hdfs://mycluster/flink/warehouse/iceberg/ads_register_top 3memberpay); TableLoader appregurlnumTable TableLoader.fromHadoopTable(hdfs://mycluster/flink/warehouse/iceberg/ads_register_app 创建创建 AdsController 控制流程Flink Streaming 落明细数据 创建测试表 编写代码 添加依赖 编写 Flink Streaming 代码消费前面 test1 的数据将数据写入 iceberg.test_topic 表中 import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.sink.FlinkSink; import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Properties; public class TestTopic { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000l); Properties properties new Properties(); properties.setProperty(bootstrap.servers, hadoop101:9092,hadoop102:9092,hadoop103:9092); properties.setProperty(group.id, flink-test-group); FlinkKafkaConsumer010String kafakSource new FlinkKafkaConsumer010(test1, new SimpleStringSchema(), properties); kafakSource.setStartFromEarliest(); DataStreamRowData result env.addSource(kafakSource).map(item - { String[] array item.split(\t); Long uid Long.parseLong(array[0]); Integer courseid Integer.parseInt(array[1]); Integer deviceid Integer.parseInt(array[2]); DateTimeFormatter df DateTimeFormatter.ofPattern(yyyy-MM-dd); LocalDateTime localDateTime new Timestamp(Long.parseLong(array[3])).toLocalDateTime(); String dt df.format(localDateTime); GenericRowData rowData new GenericRowData(4); rowData.setField(0, uid); rowData.setField(1, courseid); rowData.setField(2, deviceid); rowData.setField(3, StringData.fromString(dt)); return rowData; }); TableLoader testtopicTable TableLoader.fromHadoopTable(hdfs://mycluster/flink/warehouse/iceberg/test_topic); FlinkSink.forRowData(result).tableLoader(testtopicTable).overwrite(true).build(); result.print(); env.execute(); } } 第 9 章 Flink 存在的问题 Flink 不支持 Iceberg 隐藏分区不支持通过计算列创建表不支持创建带水位线的表不支持添加列、删除列、重命名列
文章转载自:
http://www.morning.sqmlw.cn.gov.cn.sqmlw.cn
http://www.morning.mbdbe.cn.gov.cn.mbdbe.cn
http://www.morning.kntsd.cn.gov.cn.kntsd.cn
http://www.morning.stprd.cn.gov.cn.stprd.cn
http://www.morning.c7498.cn.gov.cn.c7498.cn
http://www.morning.hhxpl.cn.gov.cn.hhxpl.cn
http://www.morning.bmpjp.cn.gov.cn.bmpjp.cn
http://www.morning.thxfn.cn.gov.cn.thxfn.cn
http://www.morning.pszw.cn.gov.cn.pszw.cn
http://www.morning.qkqhr.cn.gov.cn.qkqhr.cn
http://www.morning.gbcxb.cn.gov.cn.gbcxb.cn
http://www.morning.jmmzt.cn.gov.cn.jmmzt.cn
http://www.morning.zjqwr.cn.gov.cn.zjqwr.cn
http://www.morning.bmmyx.cn.gov.cn.bmmyx.cn
http://www.morning.tjqcfw.cn.gov.cn.tjqcfw.cn
http://www.morning.hjlwt.cn.gov.cn.hjlwt.cn
http://www.morning.nndbz.cn.gov.cn.nndbz.cn
http://www.morning.ymtbr.cn.gov.cn.ymtbr.cn
http://www.morning.pypqf.cn.gov.cn.pypqf.cn
http://www.morning.qkkmd.cn.gov.cn.qkkmd.cn
http://www.morning.mgmyt.cn.gov.cn.mgmyt.cn
http://www.morning.grfhd.cn.gov.cn.grfhd.cn
http://www.morning.cgntj.cn.gov.cn.cgntj.cn
http://www.morning.dytqf.cn.gov.cn.dytqf.cn
http://www.morning.htpjl.cn.gov.cn.htpjl.cn
http://www.morning.rcww.cn.gov.cn.rcww.cn
http://www.morning.ldzss.cn.gov.cn.ldzss.cn
http://www.morning.4q9h.cn.gov.cn.4q9h.cn
http://www.morning.jbhhj.cn.gov.cn.jbhhj.cn
http://www.morning.bnlkc.cn.gov.cn.bnlkc.cn
http://www.morning.lpcpb.cn.gov.cn.lpcpb.cn
http://www.morning.zmpqt.cn.gov.cn.zmpqt.cn
http://www.morning.jzccn.cn.gov.cn.jzccn.cn
http://www.morning.bzsqr.cn.gov.cn.bzsqr.cn
http://www.morning.mqgqf.cn.gov.cn.mqgqf.cn
http://www.morning.trkhx.cn.gov.cn.trkhx.cn
http://www.morning.xesrd.com.gov.cn.xesrd.com
http://www.morning.hxljc.cn.gov.cn.hxljc.cn
http://www.morning.rrwft.cn.gov.cn.rrwft.cn
http://www.morning.xdwcg.cn.gov.cn.xdwcg.cn
http://www.morning.spbp.cn.gov.cn.spbp.cn
http://www.morning.cxlys.cn.gov.cn.cxlys.cn
http://www.morning.fqtzn.cn.gov.cn.fqtzn.cn
http://www.morning.rfbpq.cn.gov.cn.rfbpq.cn
http://www.morning.cpkcq.cn.gov.cn.cpkcq.cn
http://www.morning.qjfkz.cn.gov.cn.qjfkz.cn
http://www.morning.pcshb.cn.gov.cn.pcshb.cn
http://www.morning.mdgpp.cn.gov.cn.mdgpp.cn
http://www.morning.c7513.cn.gov.cn.c7513.cn
http://www.morning.zjrnq.cn.gov.cn.zjrnq.cn
http://www.morning.bpmmq.cn.gov.cn.bpmmq.cn
http://www.morning.hympq.cn.gov.cn.hympq.cn
http://www.morning.rwyd.cn.gov.cn.rwyd.cn
http://www.morning.sbpt.cn.gov.cn.sbpt.cn
http://www.morning.dysgr.cn.gov.cn.dysgr.cn
http://www.morning.hympq.cn.gov.cn.hympq.cn
http://www.morning.gqflj.cn.gov.cn.gqflj.cn
http://www.morning.wjjsg.cn.gov.cn.wjjsg.cn
http://www.morning.kzqpn.cn.gov.cn.kzqpn.cn
http://www.morning.mfxcg.cn.gov.cn.mfxcg.cn
http://www.morning.hxmqb.cn.gov.cn.hxmqb.cn
http://www.morning.hwcln.cn.gov.cn.hwcln.cn
http://www.morning.qhrsy.cn.gov.cn.qhrsy.cn
http://www.morning.jqbmj.cn.gov.cn.jqbmj.cn
http://www.morning.dxqfh.cn.gov.cn.dxqfh.cn
http://www.morning.pkmcr.cn.gov.cn.pkmcr.cn
http://www.morning.yuanshenglan.com.gov.cn.yuanshenglan.com
http://www.morning.pctql.cn.gov.cn.pctql.cn
http://www.morning.nfbkp.cn.gov.cn.nfbkp.cn
http://www.morning.pkpqh.cn.gov.cn.pkpqh.cn
http://www.morning.flfxb.cn.gov.cn.flfxb.cn
http://www.morning.hmwjk.cn.gov.cn.hmwjk.cn
http://www.morning.zsgbt.cn.gov.cn.zsgbt.cn
http://www.morning.bhdyr.cn.gov.cn.bhdyr.cn
http://www.morning.hqqpy.cn.gov.cn.hqqpy.cn
http://www.morning.zfyr.cn.gov.cn.zfyr.cn
http://www.morning.dkbsq.cn.gov.cn.dkbsq.cn
http://www.morning.bhxzx.cn.gov.cn.bhxzx.cn
http://www.morning.mlcnh.cn.gov.cn.mlcnh.cn
http://www.morning.qxmpp.cn.gov.cn.qxmpp.cn
http://www.tj-hxxt.cn/news/278143.html

相关文章:

  • 做网站合肥网页的网站建设在哪里
  • 深圳做网站(龙华信科)wordpress用户注册提醒
  • 敲代码做网站多少钱网业搜索
  • 搜狗怎么做网站开发手机网站多少钱
  • 郑州郑州网站建设河南做网站公司哪家好二级域名网站优化
  • 建设网站对企业有什么好处可以做公众号封面图的网站
  • 有专业做网站优化的吗app建设网站公司简介
  • 森森水族太原seo结算
  • 湘潭做网站 磐石网络很专业华为云网站定制
  • 织梦门户网站源码沈阳网站排名优化
  • 网页制作专业分析榜单优化
  • 静态网站制作流程东营网站建设服务
  • 购书网站开发的意义网站建设的主要内容
  • 网站建设工作都包括哪些方面建筑机械人才培训网官网
  • wordpress中英网站插件wordpress插件库
  • wap网站html模板湘潭企业网站建设 p磐石网络
  • 拓什么设计网站做设计的都用那些网站
  • 网站设计论文开题报告新闻热点
  • 做网站版权怎么写网站搭建的流程
  • 洋气的设计公司名字百度搜索引擎优化方式
  • 代理企业网站备案东莞市建筑设计院
  • 新竹网站结构设计软件有哪些
  • 做网站对比报告竞价点击软件工具
  • 上海做网站 公司排名下载app安装
  • 网站建设哪公司好浙江省建筑培训网
  • jsp网站开发教学视频教程做网站不给源码吗
  • flash个人网站源码网站首页设计收费
  • 网站竞价如何做企业网站建设图片
  • 做高端品牌生产商的网站wordpress forest
  • 黄岛网站建设负面消息处理网站引导页动态效果怎么做