蓝色 宽屏 网站 模板下载,深圳电器公司邮编,如何建论坛网站,番禺人才招聘网官网基于开源云原生数据仓库 ByConity 体验多种数据分析场景 业务背景什么是 ByConity上手实测环境要求测试操作远程登录 ECS 服务器windows10 自带连接工具 执行查询 ByConity 相对于 ELT 能力的优化提升并行度任务级重试并行写入简化数据链路 业务背景
大家都知道#xff0c;在… 基于开源云原生数据仓库 ByConity 体验多种数据分析场景 业务背景什么是 ByConity上手实测环境要求测试操作远程登录 ECS 服务器windows10 自带连接工具 执行查询 ByConity 相对于 ELT 能力的优化提升并行度任务级重试并行写入简化数据链路 业务背景
大家都知道在现在这个数据量飞速增长的数据为王时代任何一家企业只有对自身的企业数据有精准的分析和研判那么才能掌握数据密码从而掌握这个时代盈利的秘诀。因此高效的数据处理和分析能力已经成为企业竞争力的关键。
在实际业务中用户会基于不同的产品分别构建实时数仓和离线数仓。其中实时数仓强调数据能够快速入库且在入库的第一时间就可以进行分析低时延的返回分析结果。而离线数仓强调复杂任务能够稳定的执行完需要更好的内存管理。
什么是 ByConity
我们今天的主角是 ByConity那么什么是 ByConity
ByConity 是字节跳动开源的云原生数据仓库可以满足用户的多种数据分析场景。并且ByConity 增加了 bsp 模式可以进行 task 级别的容错更细粒度的调度基于资源感知的调度。希望通过 bsp 能力把数据加工T的过程转移到 ByConity 内部能够一站式完成数据接入、加工和分析。
上手实测
在对当前数字驱动时代下的企业处境有了一个基础的了解后我们又了解了开源的云原生数据仓库 ByConity。下面就正式开始今天的上手实测吧。
为了实际上手感受 bsp 模式带来的效果官方为我们提供了使用 TPC-DS 1TB 的数据测试活动让我们可以亲自验证 ByConity 在 ELT 方面的实际感受。
环境要求
由于要测试的是大数据量的数据加工和分析因此对于环境的要求也相对比较高官方为我们提供了一个测试集群下面是集群规格
测试操作
远程登录 ECS 服务器
开始操作之前首先需要登录 ECS 服务器我们可以通过本地 SSH 工具连接到 ECS 服务器打开本地远程连接工具 Xshell点击【新建会话】输入 ECS 服务器 IP 地址选择端口 23 继续点击【用户身份验证】选择密码登录方式输入用户名、密码点击【确定】 这里需要说明一下我用我本地自带的 Xshell 5 远程连接服务器工具没有成功提示如下【找不到匹配的 host key 算法。】我理解应该是我本地的连接远程工具太老了很久没更新导致的。那么下面我采用第二种链接方法采用 windows10 自带的远程连接工具。
windows10 自带连接工具
这里在本地磁盘新建一个文件夹在文件夹内点击 ctrlshift鼠标右键 选中【在此处打开 Powershell 窗口(S)】打开自带的命令行工具 在命令行工具输入以下命令并回车确认
ssh -p 23 提供的用户名ECS服务器IP地址
# ssh -p 23 root14.103.145.182这时会出现一句提示我们输入 yes 点击回车
Are you sure you want to continue connecting (yes/no/[fingerprint])? yes随后会提示让我们输入密码我们输入密码后点击回车即可登录 ECS 服务器。这里需要注意的是密码一定要手输入粘贴的密码这里不能识别 为了避免连接超时导致断开连接频繁输入密码耽误时间我们可以执行命令
tmux new -s $user_id
# tmux new -s $user_2024创建一个新的 tmux 会话其中$user_id是可以自定义的会话名称。后续重新登录时使用 tmux a -t $user_id。
继续输入命令执行进入客户端
clickhouse client --port 9010执行查询
首先切换到测试用数据库 test_elt
use test_eltTPC-DS 定义的查询语法为标准 SQL因此设置数据库会话的方言类型为 ANSI
set dialect_type ANSI执行后结果如图 选择 TPC-DS 的 99 个查询中你希望的执行因为 q78 查询会因为内存限制而执行失败所以我们优先执行 q78 查询语句打开 q78 sql 文件复制 sql 执行
with ws as(select d_year AS ws_sold_year, ws_item_sk,ws_bill_customer_sk ws_customer_sk,sum(ws_quantity) ws_qty,sum(ws_wholesale_cost) ws_wc,sum(ws_sales_price) ws_spfrom web_salesleft join web_returns on wr_order_numberws_order_number and ws_item_skwr_item_skjoin date_dim on ws_sold_date_sk d_date_skwhere wr_order_number is nullgroup by d_year, ws_item_sk, ws_bill_customer_sk),cs as(select d_year AS cs_sold_year, cs_item_sk,cs_bill_customer_sk cs_customer_sk,sum(cs_quantity) cs_qty,sum(cs_wholesale_cost) cs_wc,sum(cs_sales_price) cs_spfrom catalog_salesleft join catalog_returns on cr_order_numbercs_order_number and cs_item_skcr_item_skjoin date_dim on cs_sold_date_sk d_date_skwhere cr_order_number is nullgroup by d_year, cs_item_sk, cs_bill_customer_sk),ss as(select d_year AS ss_sold_year, ss_item_sk,ss_customer_sk,sum(ss_quantity) ss_qty,sum(ss_wholesale_cost) ss_wc,sum(ss_sales_price) ss_spfrom store_salesleft join store_returns on sr_ticket_numberss_ticket_number and ss_item_sksr_item_skjoin date_dim on ss_sold_date_sk d_date_skwhere sr_ticket_number is nullgroup by d_year, ss_item_sk, ss_customer_sk)selectss_sold_year, ss_item_sk, ss_customer_sk,round(ss_qty/(coalesce(ws_qty,0)coalesce(cs_qty,0)),2) ratio,ss_qty store_qty, ss_wc store_wholesale_cost, ss_sp store_sales_price,coalesce(ws_qty,0)coalesce(cs_qty,0) other_chan_qty,coalesce(ws_wc,0)coalesce(cs_wc,0) other_chan_wholesale_cost,coalesce(ws_sp,0)coalesce(cs_sp,0) other_chan_sales_pricefrom ssleft join ws on (ws_sold_yearss_sold_year and ws_item_skss_item_sk and ws_customer_skss_customer_sk)left join cs on (cs_sold_yearss_sold_year and cs_item_skss_item_sk and cs_customer_skss_customer_sk)where (coalesce(ws_qty,0)0 or coalesce(cs_qty, 0)0) and ss_sold_year2000order byss_sold_year, ss_item_sk, ss_customer_sk,ss_qty desc, ss_wc desc, ss_sp desc,other_chan_qty,other_chan_wholesale_cost,other_chan_sales_price,ratioLIMIT 100;复制 sql 到命令框后点击回车确定这里可以看到执行失败 这时在失败的 sql 后加上一下语句再执行尝试,其中参数distributed_max_parallel_size可以设置为 4 的其他整数倍因为 Worker 的数量为 4
SETTINGS bsp_mode 1,distributed_max_parallel_size 12;更改后的 q78 sql 文件内容如下 with ws as(select d_year AS ws_sold_year, ws_item_sk,ws_bill_customer_sk ws_customer_sk,sum(ws_quantity) ws_qty,sum(ws_wholesale_cost) ws_wc,sum(ws_sales_price) ws_spfrom web_salesleft join web_returns on wr_order_numberws_order_number and ws_item_skwr_item_skjoin date_dim on ws_sold_date_sk d_date_skwhere wr_order_number is nullgroup by d_year, ws_item_sk, ws_bill_customer_sk),cs as(select d_year AS cs_sold_year, cs_item_sk,cs_bill_customer_sk cs_customer_sk,sum(cs_quantity) cs_qty,sum(cs_wholesale_cost) cs_wc,sum(cs_sales_price) cs_spfrom catalog_salesleft join catalog_returns on cr_order_numbercs_order_number and cs_item_skcr_item_skjoin date_dim on cs_sold_date_sk d_date_skwhere cr_order_number is nullgroup by d_year, cs_item_sk, cs_bill_customer_sk),ss as(select d_year AS ss_sold_year, ss_item_sk,ss_customer_sk,sum(ss_quantity) ss_qty,sum(ss_wholesale_cost) ss_wc,sum(ss_sales_price) ss_spfrom store_salesleft join store_returns on sr_ticket_numberss_ticket_number and ss_item_sksr_item_skjoin date_dim on ss_sold_date_sk d_date_skwhere sr_ticket_number is nullgroup by d_year, ss_item_sk, ss_customer_sk)selectss_sold_year, ss_item_sk, ss_customer_sk,round(ss_qty/(coalesce(ws_qty,0)coalesce(cs_qty,0)),2) ratio,ss_qty store_qty, ss_wc store_wholesale_cost, ss_sp store_sales_price,coalesce(ws_qty,0)coalesce(cs_qty,0) other_chan_qty,coalesce(ws_wc,0)coalesce(cs_wc,0) other_chan_wholesale_cost,coalesce(ws_sp,0)coalesce(cs_sp,0) other_chan_sales_pricefrom ssleft join ws on (ws_sold_yearss_sold_year and ws_item_skss_item_sk and ws_customer_skss_customer_sk)left join cs on (cs_sold_yearss_sold_year and cs_item_skss_item_sk and cs_customer_skss_customer_sk)where (coalesce(ws_qty,0)0 or coalesce(cs_qty, 0)0) and ss_sold_year2000order byss_sold_year, ss_item_sk, ss_customer_sk,ss_qty desc, ss_wc desc, ss_sp desc,other_chan_qty,other_chan_wholesale_cost,other_chan_sales_price,ratioLIMIT 100SETTINGS bsp_mode 1,distributed_max_parallel_size 12;本次执行成功了返回 100 条数据耗时 60.510 秒。处理了 5.0421 亿行6.05 GB833 万行/秒99.96 MB/秒
100 rows in set. Elapsed: 60.510 sec. Processed 504.21 million rows, 6.05 GB (8.33 million rows/s., 99.96 MB/s.)我再换一条 sql 执行操作选择 q99 sql 命令复制并执行
select substr(w_warehouse_name,1,20),sm_type,cc_name,sum(case when (cs_ship_date_sk - cs_sold_date_sk 30 ) then 1 else 0 end) as 30 days ,sum(case when (cs_ship_date_sk - cs_sold_date_sk 30) and (cs_ship_date_sk - cs_sold_date_sk 60) then 1 else 0 end ) as 31-60 days ,sum(case when (cs_ship_date_sk - cs_sold_date_sk 60) and (cs_ship_date_sk - cs_sold_date_sk 90) then 1 else 0 end) as 61-90 days ,sum(case when (cs_ship_date_sk - cs_sold_date_sk 90) and(cs_ship_date_sk - cs_sold_date_sk 120) then 1 else 0 end) as 91-120 days ,sum(case when (cs_ship_date_sk - cs_sold_date_sk 120) then 1 else 0 end) as 120 days
fromcatalog_sales,warehouse,ship_mode,call_center,date_dim
whered_month_seq between 1200 and 1200 11
and cs_ship_date_sk d_date_sk
and cs_warehouse_sk w_warehouse_sk
and cs_ship_mode_sk sm_ship_mode_sk
and cs_call_center_sk cc_call_center_sk
group bysubstr(w_warehouse_name,1,20),sm_type,cc_name
order by substr(w_warehouse_name,1,20),sm_type,cc_name
limit 100;执行结果返回 100 条数据耗时 9.086 秒。处理了 7313 千行197.11 KB8.05 千行/秒21.69 KB/秒
100 rows in set. Elapsed: 9.086 sec. Processed 73.13 thousand rows, 197.11 KB (8.05 thousand rows/s., 21.69 KB/s.)添加参数限制查询的最大内存使用量引发 oom在 q99 sql 命令后增加如下命令
-- 单位为 B当前约合 37.25 GB
SETTINGS max_memory_usage40000000000;更改后的 sql 命令依然执行成功这个时候我不断调整按照每次下调资源总数 70%的方式不断缩小 max_memory_usage 的大小进行尝试
SETTINGS max_memory_usage93052205;经过多次的尝试设置为 SETTINGS max_memory_usage93052205; 后出现了内存错误信息 此时我们再为 q99 sql 增加参数 distributed_max_parallel_size 来进行尝试在上述 sql 后继续增加以下命令信息
SETTINGS bsp_mode 1,distributed_max_parallel_size 12;增加参数 分布式最大并行大小 参数 distributed_max_parallel_size 后的 sql 从最大并行大小 4 调整到了 76进行尝试但是还是没有成功
select substr(w_warehouse_name,1,20),sm_type,cc_name,sum(case when (cs_ship_date_sk - cs_sold_date_sk 30 ) then 1 else 0 end) as 30 days ,sum(case when (cs_ship_date_sk - cs_sold_date_sk 30) and (cs_ship_date_sk - cs_sold_date_sk 60) then 1 else 0 end ) as 31-60 days ,sum(case when (cs_ship_date_sk - cs_sold_date_sk 60) and (cs_ship_date_sk - cs_sold_date_sk 90) then 1 else 0 end) as 61-90 days ,sum(case when (cs_ship_date_sk - cs_sold_date_sk 90) and(cs_ship_date_sk - cs_sold_date_sk 120) then 1 else 0 end) as 91-120 days ,sum(case when (cs_ship_date_sk - cs_sold_date_sk 120) then 1 else 0 end) as 120 days
fromcatalog_sales,warehouse,ship_mode,call_center,date_dim
whered_month_seq between 1200 and 1200 11
and cs_ship_date_sk d_date_sk
and cs_warehouse_sk w_warehouse_sk
and cs_ship_mode_sk sm_ship_mode_sk
and cs_call_center_sk cc_call_center_sk
group bysubstr(w_warehouse_name,1,20),sm_type,cc_name
order by substr(w_warehouse_name,1,20),sm_type,cc_name
limit 100
SETTINGS max_memory_usage93052205,bsp_mode 1,distributed_max_parallel_size 76;并且我发现随着分布式最大并行大小 参数 distributed_max_parallel_size 的 不断升高执行 sql 等待的时间也越来越长我这边调整参数 distributed_max_parallel_size 从 4 一直按照 4 的倍数尝试到 76 执行时间从最初的 10s 以内到现在的近 180s 还是没成功 下面我准备再体验其他 sql 语句的查询效果。复制 q01 的 sql 到命令框这时可以看到 sql 文件内容本身有问题并不能执行而是被截成了两断 其实上面的 sql 直接从 sql 文件复制出来执行也是会报错的我这边都是经过根据错误提示修改后执行成功的对于 q01 按照错误提示修改后 sql 如下
with customer_total_return as
(selectsr_customer_sk as ctr_customer_sk,sr_store_sk as ctr_store_sk,sum(sr_return_amt) as ctr_total_returnfrom store_returns, date_dimwhere sr_returned_date_sk d_date_sk and d_year 2000 group by sr_customer_sk,sr_store_sk)
select c_customer_id
from customer_total_return ctr1, store, customer
where ctr1.ctr_total_return (select avg(ctr_total_return) *1.2from customer_total_return ctr2where ctr1.ctr_store_sk ctr2.ctr_store_sk
)
and s_store_sk ctr1.ctr_store_sk
and s_state TN
and ctr1.ctr_customer_sk c_customer_sk
order by c_customer_id
limit 100;执行后可以看到返回信息返回 100 条数据执行耗时 6.275 秒。处理了 147.1 万行394.73 KB234.4 万行/秒62.90 KB/秒
100 rows in set. Elapsed: 6.275 sec. Processed 147.10 thousand rows, 394.73 KB (23.44 thousand rows/s., 62.90 KB/s.)执行结果过如图所示 当然TPC-DS 测试集均为模拟实际场景的查询如果你希望了解具体查询的含义可以将 SQL 复制到大模型产品如豆包中让其进行解释能帮助您更好的理解查询对应的实际场景。豆包大模型地址https://www.doubao.com/chat/365034912382466这里我输入 q01 sql 文件内容进行解释得到如下内容
以下是对这段 SQL 代码的分析
代码整体功能概述
这段 SQL 代码的目的是从数据库中查询出满足特定条件的客户 IDc_customer_id。它首先通过一个公共表达式CTE即 customer_total_return计算出每个客户在特定商店基于 sr_customer_sk 和 sr_store_sk 分组在 2000 年的总退货金额ctr_total_return。然后在主查询中筛选出那些在所在商店的总退货金额大于该商店平均总退货金额 1.2 倍并且对应的商店位于 TN田纳西州推测这里 s_state 代表商店所在州的客户最后按照客户 ID 排序并限制返回结果为 100 条记录。
具体各部分分析
公共表达式CTE部分customer_total_return
逻辑说明
通过关联 store_returns 表可能存储商店退货相关信息和 date_dim 表可能存储日期维度相关数据以 sr_returned_date_sk 和 d_date_sk 作为关联条件筛选出退货日期在 2000 年通过 d_year 2000 条件的数据。
然后按照客户编号sr_customer_sk和商店编号sr_store_sk进行分组并对每个分组内的退货金额sr_return_amt求和得到每个客户在每个商店在 2000 年的总退货金额将其命名为 ctr_total_return同时将 sr_customer_sk 重命名为 ctr_customer_sksr_store_sk 重命名为 ctr_store_sk。
代码示例
selectsr_customer_sk as ctr_customer_sk,sr_store_sk as ctr_store_sk,sum(sr_return_amt) as ctr_total_return
from store_returns, date_dim
where sr_returned_date_sk d_date_sk and d_year 2000
group by sr_customer_sk,sr_store_sk
主查询部分
筛选条件分析
首先在 FROM 子句中关联了 customer_total_return通过别名 ctr1 使用、store 表和 customer 表。关联条件分别是 s_store_sk ctr1.ctr_store_sk将商店表和前面计算总退货的 CTE 按商店编号关联以及 ctr1.ctr_customer_sk c_customer_sk将客户表和 CTE 按客户编号关联。
在 WHERE 子句中有多个筛选条件
ctr1.ctr_total_return (select avg(ctr_total_return) * 1.2 from customer_total_return ctr2 where ctr1.ctr_store_sk ctr2.ctr_store_sk)这是一个比较关键的筛选条件它使用了子查询。对于每一个 ctr1 中的记录即每个客户在每个商店的情况会在子查询中计算出与当前记录所在商店相同通过 ctr1.ctr_store_sk ctr2.ctr_store_sk 关联条件的所有客户的平均总退货金额然后要求当前客户在该商店的总退货金额大于这个平均总退货金额的 1.2 倍。
s_state TN筛选出所在商店位于田纳西州的情况这里是基于 store 表中的 s_state 字段进行筛选。
排序和限制结果
最后使用 ORDER BY c_customer_id 按照客户 ID 进行排序并通过 LIMIT 100 限制只返回前 100 条记录。
代码示例
select c_customer_id
from customer_total_return ctr1, store, customer
where ctr1.ctr_total_return (select avg(ctr_total_return) *1.2from customer_total_return ctr2where ctr1.ctr_store_sk ctr2.ctr_store_sk
)
and s_store_sk ctr1.ctr_store_sk
and s_state TN
and ctr1.ctr_customer_sk c_customer_sk
order by c_customer_id
limit 100;
潜在优化点或注意事项
关联方式
当前代码中在 CTE 和主查询里使用的表连接方式是隐式连接使用逗号分隔表在 WHERE 子句中写关联条件建议可以改为显式的 JOIN 语法这样代码的可读性会更好。例如在 CTE 中可以写成
selectsr_customer_sk as ctr_customer_sk,sr_store_sk as ctr_store_sk,sum(sr_return_amt) as ctr_total_return
from store_returns
JOIN date_dim ON sr_returned_date_sk d_date_sk
where d_year 2000
group by sr_customer_sk,sr_store_sk
在主查询中类似地可以将连接改为显式的 JOIN 语句来提升可读性。
性能方面
如果数据量较大涉及到的子查询计算同商店平均退货金额的子查询可能会影响性能因为对于每一条外部查询的记录都要执行一次子查询。可以考虑使用窗口函数等方式来预先计算平均退货金额并关联到原始数据上以优化性能不过这需要根据具体数据库的支持情况来调整代码实现。
索引使用
考虑在关联字段如 sr_customer_sk、sr_store_sk、d_date_sk、s_store_sk、c_customer_sk 等以及用于筛选和排序的字段如 d_year、s_state、c_customer_id上根据数据库的查询特点创建合适的索引有助于提升查询性能。
总的来说这段代码实现了特定业务需求下筛选客户的功能但在代码风格和性能方面有一定的优化空间可以根据实际使用的数据库环境做进一步调整。
豆包界面如图所示简洁明了唯一亮点是竟然不用登录就可以使用这个就省去了每次登录的麻烦很好 后面可以测试的 sql 还有很多这里受限于文章篇幅不再依依在这里描述了大家可以在官方提供的测试服务器进行操作不过根据我操作了这么多 sql 的经验很少有粘贴出来 sql 就可以执行的经常会遇到 sql 换行引起的 sql 查询断行的情况就像这样 到这里关于实验操作操作的部分就已经告一段落后续对于不同场景的业务 sql 的执行这里也就不再一一列举截图了。那么相对于传统 ELTByConity 有哪些优化的地方。 下面再来聊一下 ByConity 相比于 ELT 优化的地方这里需要声明一点哈对于 ByConity 相比于 ELT 优化的地方 的内容摘自于其他文章内容也是本次活动页地址https://xie.infoq.cn/article/dd49679fc505dc3249692cf74
以下内容基本来自活动页内容搬过来的为了方便大家在实验操作之后回想实验中的操作体验可以结合传统 ELT 能力做个对比对比的内容我搬来放在下面不用大家再去别的地方搜索增加学习负担了。 ByConity 相对于 ELT 能力的优化
提升并行度
在传统结构中为什么要分别建设离线数仓和实时数仓。是因为常见的 OLAP 产品不擅长处理大量的复杂查询很容易把内容打满任务中断甚至造成宕机。
ByteHouse 具备 BSP 模式支持将查询切分为不同的 stage每个 stage 独立运行。在此基础上stage 内的数据也可以进行切分并行化不再受节点数量限制理论上可以无限扩展从而大幅度降低峰值内存。正如我们在测试 q78 时当遇到内存不足情况时可以通过 bsp 模式来设置并行参数 distributed_max_parallel_size 来提升并行度保障系统稳定性。
任务级重试
对于离线加工任务来说离线加工任务的另外一个特点就是链路比较长并且任务间有依赖关系。当某一个任务失败会导致整个链路失败 比如task4 依赖 task1、task2 的完成。如果 task1 失败发起重试会显示为整个链路执行失败。ByteHouse 增加了任务级重试能力在 ByteHouse 中只有运行失败的 task 需要重试。
并行写入
实时数仓存在频繁更新的特点使用重叠窗口进行批量 ETL 操作时会带来大量的数据更新。在这种场景下ByteHouse 做了大量的优化。 经过持续优化将最耗时的数据写入部分单独并行化并且在写入 part 文件时标记是否需要进行后续的 dedup 作业。在所有数据写入完毕后由 server 指定一个 worker 进行 dedup 和最后的事务提交如上图最右。
简化数据链路
ByteHouse 在传统的 MPP 链路基础上增加了对复杂查询的支持这使得 join 等操作可以有效地得到执行。
BSP 模式使用 barrier 将各个 stage 进行隔离每个 stage 独立运行stage 之内的 task 也相互独立。即便机器环境发生变化对查询的影响被限定在 task 级别。且每个 task 运行完毕后会及时释放计算资源对资源的使用更加充分。
在这个基础上BSP 的这种设计更利于重试的设计。任务失败后只需要重新拉起时读取它所依赖的任务的 shuffle 数据即可而无需考虑任务状态。