wordpress 标签云页面,模板建网站怎么做seo,企业网站建设常见问题,广州互联网广告推广前言
spark操作iceberg之前先要配置spark catalogs#xff0c;详情参考Iceberg与Spark整合环境配置。
有些操作需要在spark3中开启iceberg sql扩展。
Iceberg使用Apache Spark的DataSourceV2 API来实现数据源和catalog。Spark DSv2是一个不断发展的API#xff0c;在Spark版…前言
spark操作iceberg之前先要配置spark catalogs详情参考Iceberg与Spark整合环境配置。
有些操作需要在spark3中开启iceberg sql扩展。
Iceberg使用Apache Spark的DataSourceV2 API来实现数据源和catalog。Spark DSv2是一个不断发展的API在Spark版本中具有不同级别的支持: Spark 3支持SQL INSERT INTO、MERGE INTO和INSERT OVERWRITE以及新的DataFrameWriterV2 API来进行iceberg表的写操作接下来我们进行详细讲解。
INSERT INTO
insert into是往iceberg表中插入新数据主要有两种语法
INSERT INTO prod.db.table VALUES (1, a), (2, b)INSERT INTO prod.db.table SELECT ...这两种语法和其它组件如hive等没有太多区别比较容易掌握。
MERGE INTO
Iceberg merge into语法可以对表数据进行行级更新或删除在Spark3.x版本之后支持其原理是重写包含需要删除和更新行数据所在的data files。merge into可以使用一个查询结果数据来更新目标表的数据其语法通过类似join关联方式根据指定的匹配条件对匹配的行数据进行相应操作。
语法
MERGE INTO tbl t -- 目标表
USING (SELECT ...) s -- 数据源表也就是用数据源表查出的数据来更新或删除目标表
ON t.id s.id -- 关联条件类似join的on条件
WHEN MATCHED AND ... THEN DELETE -- 删除直接用delete命令
WHEN MATCHED AND ... THEN UPDATE SET ... --更新用upate set
WHEN MATCHED AND ... AND ... THEN UPDATE SET ... --多条件更新
WHEN NOT MATCHED ADN ... THEN INSERT (col1,col2...) VALUES(s.col1,s.col2 ...) --匹配不上向目标表插入数据示例
创建两张表a和b
create table hadoop_prod.default.a (id int,name string,age int) using iceberg;create table hadoop_prod.default.b (id int,name string,age int,tp string) using iceberg插入数据
insert into hadoop_prod.default.a values (1,zs,18),(2,ls,19),(3,ww,20)insert into hadoop_prod.default.b values (1,zs,30,delete),(2,李四,31,update),(4,王五,32,add)使用MERGE INTO 语法向目标表更新、删除、新增数据 这里我们计划将b表与a表匹配id如果b表中tp字段是delete那么a表中对应的id数据删除如果b表中tp字段是update那么a表中对应的id数据其他字段进行更新如果a表与b表id匹配不上那么将b表中的数据插入到a表中具体操作如下
merge into hadoop_prod.default.a t1 -- 目标表a
using (select id,name ,age,tp from hadoop_prod.default.b) t2 -- 数据源表b
on t1.id t2.id -- 关联条件为id
when matched and t2.tp delete then delete -- 如果数据源表中tp字段为delete则对目标表关联d对应的数据进行删除操作
when matched and t2.tp update then update set t1.name t2.name,t1.age t2.age -- 如果数据源表tp字段为update则对目标表关联id对应数据用数据源表中name和age更新目标表对应字段
when not matched then insert (id,name,age) values (t2.id,t2.name,t2.age) -- 如果id关联不上则直接把数据源表对应id这条数据插入到目标表中注意我们很多数据库都没有类似merge into的操作为了便于初学者理解每一行操作都有详细的注释。 结果 id1可以匹配上但数据源表tp为delete因此会把目标表id1对应的行删除 id2可以匹配上但数据源表tp为update因此会把目标表id2对应的name和age用数据源表name和age进行更新 id3没有匹配上需要把数据源表对应的这条数据插入到目标表但是由于数据源中没有id3的数据因此没有插入数据此时保留数据源表中id3对应的数据 id4没有匹配上需要把数据源表对应的这条数据插入到目标表 注意更新数据时在查询的数据中只能有一条匹配的数据更新到目标表否则将报错。 INSERT OVERWRITE
insert overwrite可以覆盖Iceberg表中的数据这种操作会将表中全部数据替换掉建议如果有部分数据替换操作可以使用merge into操作。
对于Iceberg分区表使用insert overwrite操作时有两种情况第一种是“动态覆盖”第二种是“静态覆盖”。 动态分区覆盖 动态覆盖会全量将原有数据覆盖并将新插入的数据根据Iceberg表分区规则自动分区类似Hive中的动态分区。 静态分区覆盖 静态覆盖需要在向Iceberg中插入数据时需要手动指定分区如果当前Iceberg表存在这个分区那么只有这个分区的数据会被覆盖其他分区数据不受影响如果Iceberg表不存在这个分区那么相当于给Iceberg表增加了个一个分区。 示例
创建三张表并插入数据 创建test1分区表、test2普通表、test3普通表三张表并插入数据每张表字段相同但是插入数据不同。
-- test1为分区表
create table hadoop_prod.default.test1 (id int,name string,loc string)
using iceberg
partitioned by (loc);-- 插入数据
insert into hadoop_prod.default.test1 values (1,zs,beijing),(2,ls,shanghai);-- test2为普通无分区表
create table hadoop_prod.default.test2 (id int,name string,loc string)
using iceberg;
-- 插入数据
insert into hadoop_prod.default.test2 values (10,x1,shandong),(11,x2,hunan);-- test3为普通无分区表
create table hadoop_prod.default.test3 (id int,name string,loc string)
using iceberg;
-- 插入数据
insert into hadoop_prod.default.test3 values (3,ww,beijing),(4,ml,shanghai),(5,tq,guangzhou);使用insert overwrite 读取test3表中的数据覆盖到test2表中
-- 使用insert overwrite 读取test3 表中的数据覆盖到test2 普通表中
insert overwrite hadoop_prod.default.test2 select id,name,loc from hadoop_prod.default.test3;
-- 查询test2表数据
select * from hadoop_prod.default.test2;此时test2表中的结果如下 说明此时insert overwrite操作是把test2表的数据全部删除然后把test3表的所有数据插入到test2表。
使用insert overwrite 读取test3表数据动态分区方式覆盖到表test1
-- 使用insert overwrite 读取test3表数据 动态分区方式覆盖到表 test1
insert overwrite hadoop_prod.default.test1 select id,name,loc from hadoop_prod.default.test3;
-- 查询 test1 表数据
select * from hadoop_prod.default.test1;此时test1表中的数据如下 说明此时insert overwrite操作是把test1表的数据全部删除然后把test3表的所有数据插入到test1表并且分区字段loc按照动态分区的方式进行分区。
静态分区方式将iceberg表test3的数据覆盖到Iceberg表test1中 这里可以将test1表删除然后重新创建加载数据也可以直接读取test3中的数据静态分区方式更新到test1。另外使用insert overwrite 语法覆盖静态分区方式时查询的语句中就不要再次写入分区列否则会重复。
-- 删除表test1,重新创建表test1 分区表并插入数据
drop table hadoop_prod.default.test1;
-- 重建test1分区表
create table hadoop_prod.default.test1 (id int,name string,loc string) using iceberg partitioned by (loc);
-- 插入数据
insert into hadoop_prod.default.test1 values (1,zs,beijing),(2,ls,shanghai);
-- 查询test1表数据
select * from hadoop_prod.default.test1;-- 注意指定静态分区jiangsu,静态分区下就不要在查询 “loc 列了否则重复
insert overwrite hadoop_prod.default.test1 partition (loc jiangsu) select id,name from hadoop_prod.default.test3;
-- 查询 test1 表数据
select * from hadoop_prod.default.test1;此时test1表的数据如下 我们可以看到test1表原来没有jiangsu分区采用静态分区指定jiangsu分区的时候并不影响非jiangsu的数据只是从test3中读取所有数据并存放到locjiangsu这个分区目录下。 注意使用insert overwrite 读取test3表数据 静态分区方式覆盖到表 test1表中其他分区数据不受影响只会覆盖指定的静态分区数据。 至此我相信我们已经完全掌握了merge into的用法。
DELETE FROM
Spark3.x版本之后支持Delete from可以根据指定的where条件来删除表中数据。如果where条件匹配Iceberg表一个分区的数据Iceberg仅会修改元数据如果where条件匹配的表的单个行则Iceberg会只重写受影响行所在的data files。
-- 创建表 delete_tbl ,并加载数据
create table hadoop_prod.default.delete_tbl (id int,name string,age int) using iceberg;
insert into hadoop_prod.default.delete_tbl values (1,zs,18),(2,ls,19),(3,ww,20),(4,ml,21),(5,tq,22),(6,gb,23);
-- 根据条件范围删除表 delete_tbl 中的数据
delete from hadoop_prod.default.delete_tbl where id 3 and id 6;
-- 查询数据
select * from hadoop_prod.default.delete_tbl;删除了id大于3和小于6之间的所有数据
-- 根据条件删除表 delete_tbl 中的一条数据
delete from hadoop_prod.default.delete_tbl where id 2;
-- 查询数据
select * from hadoop_prod.default.delete_tbl删除了id2的数据
删除操作和其它数据库完全一样操作很简单但是得理解底层删除数据的原理。
UPDATE
Spark3.x版本支持了update更新数据操作可以根据匹配的条件进行数据更新操作。
-- 创建表 update_tbl ,并加载数据
create table hadoop_prod.default.update_tbl (id int,name string,age int) using iceberg;
-- 插入数据
insert into hadoop_prod.default.update_tbl values (1,zs,18),(2,ls,19),(3,ww,20),(4,ml,21),(5,tq,22),(6,gb,23);insert into hadoop_prod.default.update_tbl values (1,“zs”,18),(2,“ls”,19),(3,“ww”,20),(4,“ml”,21),(5,“tq”,22),(6,“gb”,23)操作如下
-- 更新 delete_tbl 表
update hadoop_prod.default.update_tbl set name zhangsan ,age 30 where id 3;
-- 查询数据
select * from hadoop_prod.default.update_tbl;把id小于等于3的name全部改成zhangshanage全部改成30 update操作和其它数据库一模一样非常简单。 注意UPDATE 更加专注于单一记录的修改而 MERGE INTO 则是一个更全面的操作可以同时处理多个数据状态的变化。因此一些复杂的操作直接用MERGE INTO比如 同步外部数据源如果你有一个外部数据库系统你可能希望定期将更改包括插入、更新和删除同步到你的数据湖中的表。MERGE INTO 可以用来比较两个表并根据匹配条件执行更新对于没有匹配记录的新数据则执行插入。数据集成当需要合并多个来源的数据到一个目标表中时MERGE INTO 可以有效地处理这种情况。它可以检查数据是否已经存在并决定是更新还是添加新的记录。高效的数据处理在处理大量数据时MERGE INTO 可以减少数据处理的时间因为它只需要一次操作就可以完成更新和插入。 参考文献
Spark Write https://bbs.huaweicloud.com/blogs/364273 文章转载自: http://www.morning.xyhql.cn.gov.cn.xyhql.cn http://www.morning.pnfwd.cn.gov.cn.pnfwd.cn http://www.morning.fhsgw.cn.gov.cn.fhsgw.cn http://www.morning.rzsxb.cn.gov.cn.rzsxb.cn http://www.morning.hpcpp.cn.gov.cn.hpcpp.cn http://www.morning.hyhqd.cn.gov.cn.hyhqd.cn http://www.morning.fhsgw.cn.gov.cn.fhsgw.cn http://www.morning.yrdt.cn.gov.cn.yrdt.cn http://www.morning.jkszt.cn.gov.cn.jkszt.cn http://www.morning.qiyelm.com.gov.cn.qiyelm.com http://www.morning.jfcbs.cn.gov.cn.jfcbs.cn http://www.morning.lnbcx.cn.gov.cn.lnbcx.cn http://www.morning.iknty.cn.gov.cn.iknty.cn http://www.morning.wjjxr.cn.gov.cn.wjjxr.cn http://www.morning.kwcnf.cn.gov.cn.kwcnf.cn http://www.morning.brmbm.cn.gov.cn.brmbm.cn http://www.morning.fjfjm.cn.gov.cn.fjfjm.cn http://www.morning.syznh.cn.gov.cn.syznh.cn http://www.morning.rmtmk.cn.gov.cn.rmtmk.cn http://www.morning.nrbqf.cn.gov.cn.nrbqf.cn http://www.morning.kgnnc.cn.gov.cn.kgnnc.cn http://www.morning.yhwmg.cn.gov.cn.yhwmg.cn http://www.morning.dtpqw.cn.gov.cn.dtpqw.cn http://www.morning.kyctc.cn.gov.cn.kyctc.cn http://www.morning.rzcfg.cn.gov.cn.rzcfg.cn http://www.morning.wctqc.cn.gov.cn.wctqc.cn http://www.morning.clgbb.cn.gov.cn.clgbb.cn http://www.morning.zhoer.com.gov.cn.zhoer.com http://www.morning.wylpy.cn.gov.cn.wylpy.cn http://www.morning.khdw.cn.gov.cn.khdw.cn http://www.morning.myxps.cn.gov.cn.myxps.cn http://www.morning.clpkp.cn.gov.cn.clpkp.cn http://www.morning.lqchz.cn.gov.cn.lqchz.cn http://www.morning.qyxwy.cn.gov.cn.qyxwy.cn http://www.morning.zpfqh.cn.gov.cn.zpfqh.cn http://www.morning.ryfpx.cn.gov.cn.ryfpx.cn http://www.morning.mrbzq.cn.gov.cn.mrbzq.cn http://www.morning.lmrjn.cn.gov.cn.lmrjn.cn http://www.morning.kpnpd.cn.gov.cn.kpnpd.cn http://www.morning.qlwfz.cn.gov.cn.qlwfz.cn http://www.morning.ydtdn.cn.gov.cn.ydtdn.cn http://www.morning.ntffl.cn.gov.cn.ntffl.cn http://www.morning.kpfds.cn.gov.cn.kpfds.cn http://www.morning.qhnmj.cn.gov.cn.qhnmj.cn http://www.morning.zmpqt.cn.gov.cn.zmpqt.cn http://www.morning.snmth.cn.gov.cn.snmth.cn http://www.morning.ypdhl.cn.gov.cn.ypdhl.cn http://www.morning.mtbth.cn.gov.cn.mtbth.cn http://www.morning.qnbgk.cn.gov.cn.qnbgk.cn http://www.morning.grqlc.cn.gov.cn.grqlc.cn http://www.morning.gsrh.cn.gov.cn.gsrh.cn http://www.morning.rtlrz.cn.gov.cn.rtlrz.cn http://www.morning.sdhmn.cn.gov.cn.sdhmn.cn http://www.morning.tgts.cn.gov.cn.tgts.cn http://www.morning.ckdgj.cn.gov.cn.ckdgj.cn http://www.morning.mwcqz.cn.gov.cn.mwcqz.cn http://www.morning.pkdng.cn.gov.cn.pkdng.cn http://www.morning.kdrly.cn.gov.cn.kdrly.cn http://www.morning.smhtg.cn.gov.cn.smhtg.cn http://www.morning.rknhd.cn.gov.cn.rknhd.cn http://www.morning.gmyhq.cn.gov.cn.gmyhq.cn http://www.morning.fldk.cn.gov.cn.fldk.cn http://www.morning.kwz6232.cn.gov.cn.kwz6232.cn http://www.morning.rrqbm.cn.gov.cn.rrqbm.cn http://www.morning.mjzgg.cn.gov.cn.mjzgg.cn http://www.morning.smrkf.cn.gov.cn.smrkf.cn http://www.morning.ldwxj.cn.gov.cn.ldwxj.cn http://www.morning.tturfsoc.com.gov.cn.tturfsoc.com http://www.morning.rtlg.cn.gov.cn.rtlg.cn http://www.morning.jbpdk.cn.gov.cn.jbpdk.cn http://www.morning.dnjwm.cn.gov.cn.dnjwm.cn http://www.morning.xbptx.cn.gov.cn.xbptx.cn http://www.morning.zwxfj.cn.gov.cn.zwxfj.cn http://www.morning.tjndb.cn.gov.cn.tjndb.cn http://www.morning.ltzkk.cn.gov.cn.ltzkk.cn http://www.morning.xltdh.cn.gov.cn.xltdh.cn http://www.morning.wrlxt.cn.gov.cn.wrlxt.cn http://www.morning.jqbmj.cn.gov.cn.jqbmj.cn http://www.morning.lsqmb.cn.gov.cn.lsqmb.cn http://www.morning.rhnn.cn.gov.cn.rhnn.cn