做美食网站的素材图片,网站设计网站优化公司,做淘宝客网站性质,求职网站网页模板前言 之前对 Spark SQL 的影响一直停留在 DSL 语法上面#xff0c;感觉可以用 SQL 表达的#xff0c;没有必要用 Java/Scala 去写#xff0c;但是面试一段时间后#xff0c;发现不少公司还是在用 SparkSQL 的#xff0c;京东也在使用 Spark On Hive 而不是我以为的 Hive O…前言 之前对 Spark SQL 的影响一直停留在 DSL 语法上面感觉可以用 SQL 表达的没有必要用 Java/Scala 去写但是面试一段时间后发现不少公司还是在用 SparkSQL 的京东也在使用 Spark On Hive 而不是我以为的 Hive On Spark经过一番了解之后发现确实 Spark SQL 要比 HQL 灵活太多了。所以必须学学 SparkSQL我喜欢用 Java 版本和 Scala 执行速度一样只不过代码复杂了点对我来说也没多复杂之后用 SparkSQL 对之前的离线项目实现一遍。
1、Spark SQL 介绍 Hive 是目前事实上离线数仓的标准它的缺点是底层使用的 MR 引擎所以执行稍微复杂点的 SQL 就非常慢不过它支持更换执行引擎换成 Spark/Tez 就会好很多而我们实际开发中也几乎不会有人去用 MR 引擎的 Hive 去跑一般都是 Hive on Spark 或者 Spark on Hive 的方式。
1.1、SparkSQL 的特点 正如官网描述Spark SQL 是 Apache Spark 的一个用于处理结构化数据的模块。而不是非结构化
1.1.1、集成 Spark SQL 将 SQL 和 Spark 程序无缝衔接它允许我们在 Spark 程序中使用 SQL 或者 DataFrame API 来查询结构化数据。
1.1.2、统一的数据访问 这也是 Spark SQL 优于 Hive 的一大原因它支持很多的数据源比如 hive、avro、parquet、orc、json、csv、jdbc 等我们可以通过 API 去访问这些数据源并且可以将通过 API 或者 SQL 这些不同的数据源连接在一起。
1.1.3、集成 Hive Spark SQL 可以使用 Hive 的元数据库、SerDes 和 UDFs我们可以在现有的数据仓库上运行 SQL 或 HiveQL 查询。
1.1.4、标准连接 这里说的是 Spark SQL 的服务器模式为商业智能工具比如 BI 工具提供了工业标准的 JDBC/ODBC。
1.2、不同 API 的执行速度 可以看到Python 在操作 RDD 时的速度要比 Java/Scala 慢几乎两倍多。
1.3、数据抽象
Spark SQL提供了两个新的抽象分别是 DataFrame 和 Dataset
Dataset是数据的分布式集合。是Spark 1.6中添加的一个新接口它提供了RDDs的优点(强类型、使用强大lambda函数的能力)以及Spark SQL优化的执行引擎的优点。可以从 JVM 对象构造数据集使用 createDataFrame 方法参数是Java对象集合然后使用函数转换(map、flatMap、filter等)操作数据集。数据集API可以在Scala和Java中使用。Python 和 R 并不支持Dataset API。
DataFrame 是组织成命名列的 Dataset。它在概念上相当于关系数据库中的表或 R/Python中的DataFrame但在底层有更丰富的优化这也是为什么 R/Python 操作 DataFrame 的效率能和 SQL、Java/Scala 差不多的原因。DataFrame 可以从各种各样的数据源构建例如: 结构化或半结构化数据文件json、csv、Hive中的表、外部数据库或现有的 rdd。DataFrame API 可以在Scala、Java、Python和 R 中使用。在Scala API中DataFrame 只是 Dataset[Row] 的类型别名。而在Java API中使用 DatasetRow 来表示DataFrame。
在Spark支持的语言中只有Scala和Java是强类型的。因此Python和R只支持无类型的DataFrame API。
1.3.1、DataFrame
DataFrame 可以比作一个表格或电子表格它有行和列每一列都有一个名称和数据类型。它提供了一种结构化的方式来存储和处理数据。
使用场景DataFrame 非常适合处理结构化数据也就是具有明确定义的模式的数据。它支持各种数据源如 CSV 文件、数据库、JSON 等。DataFrame 提供了丰富的操作如筛选、聚合、连接等使得数据处理变得简单高效。当我们需要执行 SQL 查询或进行统计分析时DataFrame 是首选的数据结构。
1.3.2、DataSet
DataSet 可以比作一个带有标签的盒子。每个数据集都包含一组对象这些对象具有相同的类型并且每个对象都有一组属性或字段。与 DataFrame 不同DataSet 是类型安全的这意味着 JVM 可以在编译时捕获类型错误。
使用场景DataSet 适用于需要类型安全和对象操作的情况。它提供了更强大的类型检查和编译时错误检查以及更丰富的函数式编程接口。当我们需要处理复杂的数据结构、需要执行对象转换或利用 Lambda 表达式等高级功能时DataSet 是更好的选择。但是需要注意的是DataSet 在某些情况下可能比 DataFrame 更复杂并且可能需要更多的内存和处理时间。
1.3.3、DataFrame 和 DataSet 的对比
类型安全性
DataFrame 不是类型安全的。它的每一行是一个Row对象字段的类型是在运行时解析的。因此如果在处理数据时类型不匹配可能会遇到运行时错误。相比之下DataSet 是强类型的分布式集合。当你定义一个 DataSet 时你需要为其提供一个case class使用 Scala API 时这个 case class定义了数据的结构。由于DataSet的每个元素都由这个 case class 的实例表示因此每个字段的类型在编译时就是已知的。这提供了更好的类型安全性允许在编译时捕获许多类型错误。
数据源和兼容性
DataFrame可以很容易地从各种数据源中创建如CSV文件、JSON、数据库等并且它提供了与这些数据源的直接兼容性。DataSet也可以从这些数据源创建但通常需要通过DataFrame作为中间步骤或者需要更多的代码来定义数据的结构。
优化和性能
DataFrame 和 DataSet在处理大量数据时都非常快但有时候DataFrame可能会因为它的结构更简单而得到更多的优化跑得更快一些。但是同样对于 DataSet 因为提前知道了每一列的数据类型所以在某些情况下它也可以进行优化让处理速度更快。
1.4、Spark on Hive / Hive on Spark
1.4.1、Spark on Hive
Spark on Hive是Hive只作为存储角色Spark负责sql解析优化执行。这里可以理解为Spark通过Spark SQL使用Hive语句操作Hive表底层运行的还是Spark RDD。具体步骤如下
通过SparkSQL加载Hive的配置文件获取到Hive的元数据信息获取到Hive的元数据信息之后可以拿到Hive表的数据通过SparkSQL来操作Hive表中的数据。
1.4.2、Hive on Spark
与Spark on Hive不同Hive on Spark则是Hive既作为存储又负责sql的解析优化Spark负责执行。这里Hive的执行引擎变成了Spark不再是MR。实现这个模式比Spark on Hive要麻烦得多需要重新编译Spark和导入jar包。
1.4.3、性能对比
相比之下Spark on Hive 应该是要更好一些毕竟 Spark on Hive 更加 原生底层就是 RDD 计算只有元数据用了Hive对SQL的解析转换优化等都是Spark完成而且 Spark SQL 相比 Hive 在执行计划上做了更多的优化。而 Hive on Spark 只有计算引擎是Spark前期解析转换优化等步骤都是 Hive 完成。
2、Spark SQL 编程
导入依赖 dependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_2.12/artifactIdversion3.3.1/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.22/version/dependency2.1、SparkSession
在 RDD 编程中我们使用的是SparkContext 接口但在 Spark SQL中我们将使用SparkSession接口。Spark2.0 出现的 SparkSession 接口替代了 Spark 1.6 版本中的 SQLContext 和 HiveContext接口来实现对数据的加载、转换、处理等功能。
SparkSession内部封装了SparkContext所以计算实际上是由SparkContext完成的
2.2、Spark SQL 语法
先创建一个 json 文件作为数据源
{name: 李大喜, age: 20, dept: 农民}
{name: 燕双鹰, age: 20, dept: 保安}
{name: 狄仁杰, age: 40, dept: 保安}
{name: 李元芳, age: 40, dept: 保安}
{name: 谢永强, age: 20, dept: 农民}
2.2.1、SQL 语法
package com.lyh;import com.lyh.domain.User;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;import java.util.Arrays;public class Main {public static void main(String[] args) {// 1. 创建配置对象SparkConf conf new SparkConf().setMaster(local[*]).setAppName(test1);// 2. 创建 SparkSessionSparkSession spark SparkSession.builder().config(conf).getOrCreate();spark.sparkContext().setLogLevel(WARN); // 只在 Spark Application 运行时有效// 通过 json 文件创建 DataFrame// 在 Java 的 API 中并没有 DataFrame 这种数据类型, DataSerRow 指的就是 DataFrame DatasetRow lineDF spark.read().json(src/main/resources/json/user.json);lineDF.createOrReplaceTempView(users); // 支持所有的hive sql语法,并且会使用spark的优化器spark.sql(select * from users order by age).show();// 关闭 SparkSessionspark.close();}
}运行结果 2.2.2、DSL 语法
lineDF.select(*).orderBy(age).show();
效果和上面是一样的但是一般能用 SQL 就不用 DSL 。
2.3、自定义函数
2.3.1、UDF
一进一出传一个参数进去返回一个结果
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;import java.util.Locale;import static org.apache.spark.sql.functions.udf;public class MyUdf {public static void main(String[] args) {SparkConf conf new SparkConf().setAppName(udf).setMaster(local[*]);SparkSession spark SparkSession.builder().config(conf).getOrCreate();DatasetRow df spark.read().json(src/main/resources/json/user.json);df.createOrReplaceTempView(users);UserDefinedFunction up udf((UDF1String, String) str - str.toUpperCase(Locale.ROOT), DataTypes.StringType);spark.udf().register(up,up);spark.sql(SELECT up(name),age FROM users).show();spark.close();}}
上面我们定义了一个函数实现把英文全部大写测试 2.3.2、UDAF
输入多行返回一行一般和 groupBy 配合使用其实就是自定义聚合函数。
Spark3.x推荐使用extends Aggregator自定义UDAF属于强类型的Dataset方式。Spark2.x使用extends UserDefinedAggregateFunction属于弱类型的DataFrame
package com.lyh.udf;import lombok.Data;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;import java.io.Serializable;
import java.util.Locale;import static org.apache.spark.sql.functions.udaf;
import static org.apache.spark.sql.functions.udf;public class MyUdf {public static void main(String[] args) {SparkConf conf new SparkConf().setAppName(udaf).setMaster(local[*]);SparkSession spark SparkSession.builder().config(conf).getOrCreate();DatasetRow df spark.read().json(src/main/resources/json/user.json);df.createOrReplaceTempView(users);spark.udf().register(myavg,udaf(new MyAvg(),Encoders.LONG()));spark.sql(SELECT dept,myavg(age) FROM users group by dept).show();spark.close();}Datapublic static class Buffer implements Serializable{private Long sum;private Long count;public Buffer(){}public Buffer(Long sum,Long count){this.sum sum;this.count count;}}public static class MyAvg extends AggregatorLong,Buffer,Double {Overridepublic Buffer zero() {return new Buffer(0L,0L);}Overridepublic Buffer reduce(Buffer buffer, Long num) {buffer.setSum(buffer.getSum() num);buffer.setCount(buffer.getCount()1);return buffer;}Overridepublic Buffer merge(Buffer b1, Buffer b2) {b1.setSum(b1.getSum()b2.getSum());b1.setCount(b1.getCount()b2.getCount());return b1;}Overridepublic Double finish(Buffer reduction) {return reduction.getSum().doubleValue()/reduction.getCount();}// 序列化缓冲区的数据Overridepublic EncoderBuffer bufferEncoder() {// 用kryo进行优化return Encoders.kryo(Buffer.class);}Overridepublic EncoderDouble outputEncoder() {return Encoders.DOUBLE();}}}
Aggregator 有三个泛型参数分别是输入类型缓存类型和输出类型需要重写的方法很好理解其中 bufferEncoder 和 outputEncoder 这两个方法是用来序列化缓冲区和输出端的数据这对于分布式处理环境尤为重要因为数据需要在网络中传输或存储到磁盘上。
运行结果 3、Spark SQL 数据的加载和保存 Spark SQL 会把读取进来的文件封装为一个 DataFrame 对象DataSetRow所以 Spark SQL 加载数据源的过程就是创建 DataFrame 的过程。
3.1、创建 DataFrame
这里省去公共的环境代码
public class Main {public static void main(String[] args) {// 1. 创建配置对象SparkConf conf new SparkConf().setMaster(local[*]).setAppName(Spark Application名称);// 2. 创建 SparkSessionSparkSession spark SparkSession.builder().config(conf).getOrCreate();// 只在提交 Spark Application 时有效spark.sparkContext().setLogLevel(WARN);// 3. 业务代码// 4. 关闭 sparkSessionspark.close();}
}
3.1.1、通过 JVM 对象创建
注意Spark SQL 中用到的 Java Bean 必须提供 getter 和 setter 、无参构造而且所有属性必须为 public 修饰的。 User user1 new User(汤姆, 11L);User user2 new User(李大喜, 18L);User user3 new User(燕双鹰, 18L);User user4 new User(狄仁杰, 11L);DatasetRow df spark.createDataFrame(Arrays.asList(user1, user2, user3, user4), User.class);df.show(); 这里的 df.show 就相当于注册了一张临时表然后 select * from 这张表。
运行结果 3.1.2、csv 文件
注意Spark 读取 csv 文件时读进来的字段都是 String 类型所以如果有需求需要把 csv 中的数据封装转为 Bean 的时候对于任何类型的数据都必须使用 getString 来读取读取进来再做转换。比如下面我们把读取进来的 csv 文件使用 map 函数转为 dataset 再做查询
注意通过 csv 读取进来的 DataFrame 并没有 schema 信息也不能通过 as 方法转为 DataSet 方法因为 DataFrame 的列名和类型都是 _c0 string , _c1 string ... 和 User 的属性名根本匹配不上所以只能通过 map 函数来把 DataFrame 转为 DataSet 这样它才有了类型信息。 // 加载 csv 文件DatasetRow df spark.read().option(sep, ,) // 使用 sep 或者 delimiter 效果一样.option(header, false).csv(src/main/resources/csv/user.csv);// 转为 dataset 展示df.map((MapFunctionRow, User) row - new User(row.getString(0),Long.parseLong(row.getString(1)),row.getString(2)),Encoders.bean(User.class) // 这个 Bean 必须提供 getter 和 setter 方法否则报错).show();
运行结果 将结果写入到 csv 文件中
写入到 csv 文件不能通过 DataFrame 直接写因为现在它连 schema 都没有sql 中的字段它都识别不了。所以必须先转为 DataSet 再去查询出结果写入到文件 // 加载 csv 文件DatasetRow df spark.read().option(seq, ,).option(header, false).csv(src/main/resources/csv/user.csv);df.printSchema();// 不能这么转 因为 DataFrame 没有模式信息 字段名默认是 _c0,_c1 ... 和 User 的属性名完全匹配不上 会报错!// DatasetUser ds df.as(Encoders.bean(User.class));DatasetUser ds df.map((MapFunctionRow, User) row - new User(row.getString(0), Long.parseLong(row.getString(1)), row.getString(2)),Encoders.bean(User.class));ds.printSchema();ds.createOrReplaceTempView(users);spark.sql(SELECT CONCAT(name,大侠) name, age FROM users WHERE age 18).write().option(header,true).option(seq,\t).csv(output);
运行结果 3.1.3、json 文件
注意Spark 在读取 json 文件时默认把 int 类型的值当做 bigint 如果我们使用 row.getInt 去解析时就会直接报错因为是小转大所以我们的 Bean 的整型应该升级为长整型 Long 才不会报错。此外Spark 读取 json 文件后封装成的 Row 对象是以 json 的字段作为索引的是根据索引的 ASCII 码进行排序之后再从 0 开始排的而不是按照 json 文件中的字段顺序这也是一个坑点。 DatasetRow df spark.read().json(src/main/resources/json/user.json);DatasetUser ds df.map((MapFunctionRow, User) row - new User(row.getString(2),row.getLong(0),row.getString(1)),Encoders.bean(User.class));ds.show(); 所以一般不会用上面的这种方式去读取 json因为我们无法自己预估排序后的字段索引值。我们一般直接把 json 转为 DataFrame 之后立即转为 DataSet 进行操作或者直接把 DataFrame 对象注册为临时表然后使用 SQL 进行分析。
将结果写入到 json 文件 下面我们把 json 读取进来解析为 DataFrame 之后直接注册为临时表——用户表然后用 sql 进行分析Spark SQL 支持 HQL 中的所有语法所以这里试用一下窗口函数 DatasetRow df spark.read().json(src/main/resources/json/user.json);df.createOrReplaceTempView(users);spark.sql(SELECT name,ROW_NUMBER() OVER(PARTITION BY dept ORDER BY age) rk FROM users).write().json(users_rk);
这里的 user_rk 是输出文件的目录名最终会生成四个文件两个 CRC 校验文件一个 SUCCESS 和 生成的 json 文件。
运行结果 我们这里直接用 DataFrame 来将分析出结果写入到 json 文件但是上面的 csv 就不可以因为 json 文件自带字段名而字段类型 Spark 是可以识别的。
3.2、与 MySQL 交互
导入 MySQL 依赖 dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.31/version/dependency
public static void main(String[] args) {// 1. 创建配置对象SparkConf conf new SparkConf().setMaster(local[*]).setAppName(read from mysql);// 2. 创建 SparkSessionSparkSession spark SparkSession.builder().config(conf).getOrCreate();DatasetRow df spark.read().format(jdbc).option(url, jdbc:mysql://127.0.0.1:3306/spark).option(user, root).option(password, Yan1029.).option(dbtable, student).load();df.select(*).show();spark.close();}
3.3、与 Hive 交互
导入依赖 dependencygroupIdorg.apache.spark/groupIdartifactIdspark-hive_2.12/artifactIdversion3.3.1/version/dependency
拷贝 hive-site.xml到resources目录如果需要操作Hadoop需要拷贝hdfs-site.xml、core-site.xml、yarn-site.xml然后启动 Hadoop 和 Hive。 public static void main(String[] args) {System.setProperty(HADOOP_USER_NAME,lyh);// 1. 创建配置对象SparkConf conf new SparkConf().setMaster(local[*]).setAppName(spark sql operate hive);// 2. 获取 SparkSessionSparkSession spark SparkSession.builder().enableHiveSupport() // 添加 hive 支持.config(conf).getOrCreate();spark.sql(show tables).show();// 4. 关闭 SparkSessionspark.close();} 运行结果 4、Spark SQL 练习
4.1、统计每个商品的销量最高的日期
从订单明细表order_detail中统计出每种商品销售件数最多的日期及当日销量如果有同一商品多日销量并列的情况取其中的最小日期
public static void main(String[] args) {System.setProperty(HADOOP_USER_NAME,lyh);// 1. 创建配置对象SparkConf conf new SparkConf().setMaster(local[*]).setAppName(spark sql operate hive);// 2. 获取 SparkSessionSparkSession spark SparkSession.builder().enableHiveSupport() // 添加 hive 支持.config(conf).getOrCreate();spark.sql(use db_hive2);// order_detail_id order_id sku_id create_date price sku_num// 每件商品的最高销量spark.sql(SELECT sku_id, create_date, sum_num FROM (SELECT sku_id, create_date, sum_num, ROW_NUMBER() OVER(PARTITION BY sku_id ORDER BY sum_num DESC,create_date ASC) rk FROM (SELECT sku_id, create_date, sum(sku_num) sum_num FROM order_detail GROUP BY sku_id,create_date)t1)t2 WHERE rk 1).show();// 4. 关闭 SparkSessionspark.close();}
上面个的代码就像在写 HQL 一样我们可以把其中的子表提出来创建为临时表
public static void main(String[] args) {System.setProperty(HADOOP_USER_NAME,lyh);// 1. 创建配置对象SparkConf conf new SparkConf().setMaster(local[*]).setAppName(spark sql operate hive);// 2. 获取 SparkSessionSparkSession spark SparkSession.builder().enableHiveSupport() // 添加 hive 支持.config(conf).getOrCreate();spark.sql(use db_hive2);// order_detail_id order_id sku_id create_date price sku_num// 每件商品的最高销量spark.sql(SELECT sku_id, create_date, sum(sku_num) sum_num FROM order_detail GROUP BY sku_id,create_date).createOrReplaceTempView(t1);spark.sql(SELECT sku_id, create_date, sum_num, ROW_NUMBER() OVER(PARTITION BY sku_id ORDER BY sum_num DESC,create_date ASC) rk FROM t1).createOrReplaceTempView(t2);spark.sql(SELECT sku_id, create_date, sum_num FROM t2 WHERE rk 1).show();// 4. 关闭 SparkSessionspark.close();} 没啥难度这就是官网说的使用 Spark SQL 或者 HQL 来操作数仓中的数据之后做个 Spark SQL 项目多练练手就行了。
运行结果