当前位置: 首页 > news >正文

宣传性网站建设策划方案网络营销专业课程

宣传性网站建设策划方案,网络营销专业课程,网站建设制作浩森宇特,新手建站广告联盟赚钱一、SparkSQL Spark SQL和我们之前讲Hive的时候说的hive on spark是不一样的。hive on spark是表示把底层的mapreduce引擎替换为spark引擎。而Spark SQL是Spark自己实现的一套SQL处理引擎。Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核…

一、SparkSQL

        Spark SQL和我们之前讲Hive的时候说的hive on spark是不一样的。hive on spark是表示把底层的mapreduce引擎替换为spark引擎。而Spark SQL是Spark自己实现的一套SQL处理引擎。Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。
        DataFrame=RDD+Schema 。
        它其实和关系型数据库中的表非常类似,RDD可以认为是表中的数据,Schema是表结构信息。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD

注意:
Spark1.3出现的 DataFrame ,Spark1.6出现了 DataSet ,在Spark2.0中两者统一,DataFrame等于DataSet[Row]

二、SparkSession

        要使用Spark SQL,首先需要创建一个SpakSession对象。SparkSession中包含了SparkContext和SqlContext,所以说想通过SparkSession来操作RDD的话需要先通过它来获取SparkContext 这个SqlContext是使用sparkSQL操作hive的时候会用到的。

 SparkSession包含了SparkContext和SqlContext

(1)SparkContext 用于操作RDD

(2)SqlContext 用于操作hive

正常使用SparkSession操作DataFrame就可以了

三、创建DataFrame

        使用SparkSession,可以从RDD、HIve表或者其它数据源创建DataFrame
        那下面我们来使用JSON文件来创建一个DataFrame

1. 引用依赖

        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.4.3</version>
<!--            <scope>provided</scope>--></dependency>

2. Scala代码

package com.sanqian.scala.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSessionobject SqlDemoScala {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local")//创建SparkSession对象,里面包含SparkContext和SqlContextval sparkSession = SparkSession.builder().appName("SqlDemoScala").config(conf).getOrCreate()//读取json文件, 创建DataFrameval df = sparkSession.read.json("D:\\data\\spark\\student.json")//查DataFrame中的数据df.show()sparkSession.stop()}
}

3. Java代码

package com.sanqian.java.sql;import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;public class SqlDemoJava {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local");//创建SparkSession对象,里面包含SparkContext和SqlContextSparkSession sparkSession = SparkSession.builder().appName("SqlDemoJava").config(conf).getOrCreate();//读取json文件,获取DataSet<Row>Dataset<Row> df = sparkSession.read().json("D:\\data\\spark\\student.json");df.show();sparkSession.stop();}
}

4. DataFrame 和DataSet的转换 

        由于DataFrame等于DataSet[Row],它们两个可以互相转换,所以创建哪个都是一样的。咱们前面的scala代码默认创建的是DataFrame,java代码默认创建的是DataSet。
        尝试对他们进行转换
        1)在Scala代码中将DataFrame转换为DataSet[Row],对后面的操作没有影响

//将DataFrame转换为DataSet[Row]
val stuDf = sparkSession.read.json("D:\\student.json").as("stu")

        2)在Java代码中将DataSet[Row]转换为DataFrame

//将Dataset<Row>转换为DataFrame
Dataset<Row> stuDf = sparkSession.read().json("D:\\student.json").toDF();

四、DataFrame常见算子操作

1. 官方文档

 

2. DataFrame算子

• printSchema()  : 打印schema信息
• show() :默认显示所有的数据,可以通过参数控制显示多少条
• select() : 查询数据中指定字段信息,在使用$对数据做一些操作,需要添加隐式转换函数,否则语法报错
• filter()、where() :对数据进行 过滤,where底层调用的就是filter
• groupBy() : 对数据进行分组求和
• count() : 求和

注意:在使用$对数据做一些操作,需要添加隐式转换函数,否则语法报错

import sparkSession.implicits._

Scala代码

package com.sanqian.scala.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSessionobject DataFrameOpScala {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local")val sparkSession = SparkSession.builder().appName("DataFrameOpScala").config(conf).getOrCreate()val df = sparkSession.read.json("D:\\data\\spark\\student.json")//打印schema信息df.printSchema()//默认显示所有数据,可以通过参数控制显示多少条df.show(2)//查询数据中的指定字段信息df.select("name", "age").show()//在使用select的时候可以对数据做一些操作,需要添加隐式转换函数,否则语法报错import sparkSession.implicits._df.select($"name", $"age" + 1).show()df.filter($"age" > 18).show()//对数据进行分组求和df.groupBy("age").count().show()sparkSession.stop()}
}

Java代码

package com.sanqian.java.sql;import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;public class DataFrameOpJava {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local");//创建SparkSession对象,里面包含SparkContext和SqlContextSparkSession sparkSession = SparkSession.builder().appName("DataFrameOpJava").config(conf).getOrCreate();Dataset<Row> ds = sparkSession.read().json("D:\\data\\spark\\student.json");//打印schema信息ds.printSchema();ds.show(2);ds.select("name", "age").show();//在select的时候可以对数据做一些操作,需要引入import static org.apache.spark.sql.functions.col;ds.select(col("name"), col("age").plus(1)).show();//对数据进行过滤ds.filter(col("age").gt(18)).show();ds.where(col("age").gt(18)).show();//对数据进行分组求和ds.groupBy("age").count().show();sparkSession.stop();}
}

        这些就是针对DataFrame的一些常见的操作。但是现在这种方式其实用起来还是不方便,只是提供了一些类似于可以操作表的算子,很对一些简单的查询还是可以的,但是针对一些复杂的操作,使用算子写起来就很麻烦了,所以我们希望能够直接支持用sql的方式执行,Spark SQL也是支持的。

五、DataFrame的sql操作

想要实现直接支持sql语句查询DataFrame中的数据
需要两步操作
1. 先将DataFrame注册为一个临时表
2. 使用sparkSession中的sql函数执行sql语句

1. Scala代码

package com.sanqian.scala.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSessionobject DataFrameSqlScala {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local")val sparkSession = SparkSession.builder().appName("DataFrameOpScala").config(conf).getOrCreate()val df = sparkSession.read.json("D:\\data\\spark\\student.json")//将DataFrame注册为一个临时表df.createOrReplaceTempView("student")//使用sql查询临时表中的数据sparkSession.sql("select age, count(*) as num from student group by age").show()sparkSession.stop()}
}

2. Java代码

package com.sanqian.java.sql;import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;public class DataFrameSqlJava {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local");//创建SparkSession对象,里面包含SparkContext和SqlContextSparkSession sparkSession = SparkSession.builder().appName("DataFrameOpJava").config(conf).getOrCreate();Dataset<Row> ds = sparkSession.read().json("D:\\data\\spark\\student.json");ds.createOrReplaceTempView("student");sparkSession.sql("select age, count(*) as num from student group by age").show();sparkSession.stop();}
}

六、RDD转换为DataFrame

为什么要将RDD转换为DataFrame?
        在实际工作中我们可能会先把hdfs上的一些日志数据加载进来,然后进行一些处理,最终变成结构化的数据,希望对这些数据做一些统计分析,当然了我们可以使用spark中提供的transformation算子来实现,只不过会有一些麻烦,毕竟是需要写代码的,如果能够使用sql实现,其实是更加方便的。所以可以针对我们前面创建的RDD,将它转换为DataFrame,这样就可以使用dataFrame中的一些算子或者直接写sql来操作数据了。


Spark SQL支持这两种方式将RDD转换为DataFrame

1. 反射方式
2. 编程方式

(一)反射方式

 下面来看一下反射方式:
        这种方式是使用反射来推断RDD中的元数据。基于反射的方式,代码比较简洁,也就是说当你在写代码的时候,已经知道了RDD中的元数据,这样的话使用反射这种方式是一种非常不错的选择。Scala具有隐式转换的特性,所以spark sql的scala接口是支持自动将包含了case class的RDD转换为DataFrame的
        下面来举一个例子

1. scala代码

package com.sanqian.scala.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSessionobject RddToDataFrameByReflectScala {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local")val sparkSession = SparkSession.builder().appName("RddToDataFrameByReflectScala").config(conf).getOrCreate()val sc = sparkSession.sparkContextval rdd = sc.parallelize(Array(("jack", 18), ("tom", 20), ("jess", 30)))//基于反射直接将包含Student对象的RDD转换为DataFrame//需要导入隐式转换import sparkSession.implicits._val df = rdd.map(tup => Student(tup._1, tup._2)).toDF()df.createOrReplaceTempView("student")//执行sql查询val rdd2 = sparkSession.sql("select name, age from student where age > 18")rdd2.show()//将DataFrame转化为RDDrdd2.map(row => Student(row(0).toString, row(1).toString.toInt)).collect().foreach(println(_))//使用row的getAs()方法,获取指定列名的值rdd2.map(row => Student(row.getAs[String]("name"), row.getAs[Int]("age"))).collect().foreach(println(_))sparkSession.stop()}
}
//定义一个Student
case class Student(name: String, age: Int)

2. java代码

package com.sanqian.java.sql;import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import sun.awt.windows.WPrinterJob;import java.io.Serializable;
import java.util.Arrays;
import java.util.List;public class RddToDataFrameByReflectJava {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local");//创建SparkSession对象,里面包含SparkContext和SqlContextSparkSession sparkSession = SparkSession.builder().appName("RddToDataFrameByReflectJava").config(conf).getOrCreate();//获取SparkContext//从sparkSession中获取的是scala中的sparkContext,所以需要转换成java中的sparkContextJavaSparkContext sc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());Tuple2<String, Integer> t1 = new Tuple2<>("jack", 18);Tuple2<String, Integer> t2 = new Tuple2<>("tom", 20);Tuple2<String, Integer> t3 = new Tuple2<>("jess", 30);JavaRDD<Tuple2<String, Integer>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3));JavaRDD<Student> rdd2 = rdd.map(new Function<Tuple2<String, Integer>, Student>() {@Overridepublic Student call(Tuple2<String, Integer> tup) throws Exception {return new Student(tup._1, tup._2);}});//注意:Student这个类必须声明为public,并且必须实现序列化Dataset<Row> df = sparkSession.createDataFrame(rdd2, Student.class);df.createOrReplaceTempView("student");//执行sql查询Dataset<Row> df2 = sparkSession.sql("select name, age from student where age > 18");df2.show();//将DataFrame转化为RDD,注意:这里需要转为JavaRDDJavaRDD<Row> resRDD = df2.javaRDD();//从row中取数据,封装成student,打印到控制台List<Student> resList = resRDD.map(new Function<Row, Student>() {@Overridepublic Student call(Row row) throws Exception {return new Student(row.getAs("name").toString(), Integer.parseInt(row.getAs("age").toString()));}}).collect();for(Student stu: resList){System.out.println(stu);}sparkSession.stop();}
}

(二)编程方式

        这种方式是通过编程接口来创建DataFrame,你可以在程序运行时动态构建一份元数据,就是Schema,然后将其应用到已经存在的RDD上。这种方式的代码比较冗长,但是如果在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知其元数据,那么只能通过这种动态构建元数据的方式。也就是说当case calss中的字段无法预先定义的时候,就只能用编程方式动态指定元数据了。

1. Scala代码

package com.sanqian.scala.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}/*** 需求:使用编程方式实现RDD转换为DataFrame**/
object RddToDataFrameByProgramScala {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local")//创建SparkSession对象,里面包含SparkContext和SqlContextval sparkSession = SparkSession.builder().appName("RddToDataFrameByProgramScala").config(conf).getOrCreate()//获取SparkContextval sc = sparkSession.sparkContextval dataRDD = sc.parallelize(Array(("jack", 18), ("tom", 20), ("jessic", 30)))//组装rowRDDval rowRDD = dataRDD.map(tup => Row(tup._1, tup._2))//指定元数据信息【这个元数据信息就可以动态从外部获取了,比较灵活】val schema = StructType(Array(StructField("name", StringType, true),StructField("age", IntegerType, true)))//组装DataFrameval stuDf = sparkSession.createDataFrame(rowRDD, schema)//下面就可以通过DataFrame的方式操作dataRDD中的数据了stuDf.createOrReplaceTempView("student")//执行sql查询val resDf = sparkSession.sql("select name,age from student where age > 18")//将DataFrame转化为RDDval resRDD = resDf.rddresRDD.map(row => (row(0).toString, row(1).toString.toInt)).collect().foreach(println(_))sparkSession.stop()}
}

2. Java代码

package com.sanqian.java.sql;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;public class RddToDataFrameByProgramJava {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local");
//创建SparkSession对象,里面包含SparkContext和SqlContextSparkSession sparkSession = SparkSession.builder().appName("RddToDataFrameByProgramJava").config(conf).getOrCreate();//获取SparkContext//从sparkSession中获取的是scala中的sparkContext,所以需要转换成java中的sparkContextJavaSparkContext sc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());Tuple2<String, Integer> t1 = new Tuple2<String, Integer>("jack", 18);Tuple2<String, Integer> t2 = new Tuple2<String, Integer>("tom", 20);Tuple2<String, Integer> t3 = new Tuple2<String, Integer>("jessic",30);JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3));
//组装rowRDDJavaRDD<Row> rowRDD = dataRDD.map(new Function<Tuple2<String, Integer >, Row >() {@Overridepublic Row call (Tuple2 < String, Integer > tup) throws Exception {return RowFactory.create(tup._1, tup._2);}});
//指定元数据信息ArrayList<StructField> structFieldList = new ArrayList<StructField>();structFieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));structFieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));StructType schema = DataTypes.createStructType(structFieldList);
//构建DataFrameDataset<Row> stuDf = sparkSession.createDataFrame(rowRDD, schema);stuDf.createOrReplaceTempView("student");
//执行sql查询Dataset<Row> resDf = sparkSession.sql("select name,age from student where age > 18");
//将DataFrame转化为RDD,注意:这里需要转为JavaRDDJavaRDD < Row > resRDD = resDf.javaRDD();List<Tuple2<String, Integer>> resList = resRDD.map(new Function<Row, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> call(Row row) throws Exception {return new Tuple2<String, Integer>(row.getString(0), row.getInt(1));}}).collect();for (Tuple2<String, Integer> tup : resList) {System.out.println(tup);}sparkSession.stop();}
}

 七、load和save操作

        对于Spark SQL的DataFrame来说,无论是从什么数据源创建出来的DataFrame,都有一些共同的load和save操作。
        load操作主要用于加载数据,创建出DataFrame;
        save操作,主要用于将DataFrame中的数据保存到文件中。

        我们前面操作json格式的数据的时候好像没有使用load方法,而是直接使用的json方法,这是什么特殊用法吗?查看json方法的源码会发现,它底层调用的是format和load方法

def json(paths: String*): DataFrame = format("json").load(paths : _*)

注意:如果看不到源码,需要点击idea右上角的download source提示信息下载依赖的源码。

        我们如果使用原始的format和load方法加载数据,此时如果不指定format,则默认读取的数据源格式是parquet,也可以手动指定数据源格式。Spark SQL 内置了一些常见的数据源类型,比如json, parquet, jdbc, orc, csv, text 通过这个功能,就可以在不同类型的数据源之间进行转换了。

1. Scala代码

package com.sanqian.scala.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSessionobject LoadAndSaveOpScala {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local")//创建SparkSession对象,里面包含SparkContext和SqlContextval sparkSession = SparkSession.builder().appName("LoadAndSaveOpScala").config(conf).getOrCreate()//读取数据val stuDf = sparkSession.read.format("json").load("D:\\data\\spark\\student.json")//保存数据stuDf.select("name", "age").write.format("csv").save("hdfs://bigdata01:9000/out-save001")sparkSession.stop()}
}

2. Java代码

package com.sanqian.java.sql;import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;public class LoadAndSaveOpJava {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local");//创建SparkSession对象,里面包含SparkContext和SqlContextSparkSession sparkSession = SparkSession.builder().config(conf).appName("LoadAndSaveOpJava").getOrCreate();//读取数据Dataset<Row> df = sparkSession.read().format("json").load("D:\\data\\spark\\student.json");//保存数据df.select("name", "age").write().format("csv").save("hdfs://bigdata01:9000/out-save002");sparkSession.stop();}
}

八、SaveMode

        Spark SQL对于save操作,提供了不同的save mode。
        主要用来处理,当目标位置已经有数据时应该如何处理。save操作不会执行锁操作,并且也不是原子的,因此是有一定风险出现脏数据的。

SaveMode        解释
1SaveMode.ErrorIfExists (默认)如果目标位置已经存在数据,那么抛出一个异常
2SaveMode.Append如果目标位置已经存在数据,那么将数据追加进去
3SaveMode.Overwrite如果目标位置已经存在数据,那么就将已经存在的数据删除,用新数据进行覆盖
4SaveMode.Ignore如果目标位置已经存在数据,那么就忽略,不做任何操作
//保存数据df.select("name", "age").write().format("csv").mode(SaveMode.Append).save("hdfs://bigdata01:9000/out-save002");

执行之后的结果确实是追加到之前的结果目录中了

九、内置函数

        Spark中提供了很多内置的函数

(1)聚合函数: avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct

(2)集合函数:array_contains, explode, size

(3)日期/时间函数: datediff, date_add, date_sub, add_months, last_day, next_day, months_between, current_date, current_timestamp, date_format

(4)数学函数:abs, ceil, floor, round

(5)混合函数:if, isnull, md5, not, rand, when

(6)字符串函数:concat, get_json_object, length, reverse, split, upper

(7)窗口函数:denseRank, rank, rowNumber

其实这里面的函数和hive中的函数是类似的

注意: SparkSQL中的SQL函数文档不全,其实在使用这些函数的时候,大家完全可以去查看hive中sql的文档,使用的时候都是一样的。

http://www.tj-hxxt.cn/news/4025.html

相关文章:

  • 江苏系统建站怎么用google浏览器官网入口
  • 网站建设 浙icp 0578网站自动提交收录
  • 网站建设中服务器的搭建方式有几种百度推广助手官方下载
  • 湖北中牛建设有限公司网站nba最新消息交易情况
  • 美国电子政务门户网站建设如何利用seo赚钱
  • 阿里巴巴网站规划长沙seo平台
  • 个性个人网站模板宁阳网站seo推广
  • 建设一个旅游电子商务网站外链link
  • 江西做网站多少钱市场监督管理局职责
  • 怎样建设一个卡盟网站谷歌广告投放
  • 3d做网站seo网站推广全程实例
  • 做积分网站百度商务合作联系
  • 惠州网站建设web91哪里有软件培训班
  • 学校网站设计图片北京seo排名外包
  • 做本地分类信息网站赚钱吗什么是百度竞价排名服务
  • wordpress 制作网站模板什么是淘宝搜索关键词
  • net的电商网站建设企业做推广有几种方式
  • 专业网站制作公司地址制作网站要多少费用
  • 在线甜点订购网站开发需求分析谷歌广告平台
  • 网站建设不一定当地网络营销方式有哪些分类
  • 青海培训网站建设公司谷歌关键词搜索
  • 杭州网络游戏公司排名百度seo什么意思
  • 怎么查看网站是什么软件做的seo信息查询
  • 自己动手创建一个公司网站百度云搜索引擎入口盘搜搜
  • 网站建设有哪些软件网站运营指标
  • 免费的seo网站膝北京seo排名厂家
  • 网站建设字体变色代码产品推广渠道有哪些方式
  • 长沙大的建网站公司长沙市seo百度关键词
  • 中国建行官方网站郑州网站推广方案
  • 网站上线所需的东西哪里好优化大师破解版app