当前位置: 首页 > news >正文

西安企业网站建设代理机构国际要闻时事快报

西安企业网站建设代理机构,国际要闻时事快报,国外网站代理,深入浅出wordpress pdf文章目录 1、基本操作1.1、创建SparkSession1.2、创建DataFrames1.3、创建Dataset操作1.4、运行sql查询1.5、创建全局临时视图1.6、创建Datasets1.7、与rdd进行互操作1.7.1、使用反射推断模式1.7.2、以编程方式指定模式 2、完整的测试例子 1、基本操作 1.1、创建SparkSession … 文章目录 1、基本操作1.1、创建SparkSession1.2、创建DataFrames1.3、创建Dataset操作1.4、运行sql查询1.5、创建全局临时视图1.6、创建Datasets1.7、与rdd进行互操作1.7.1、使用反射推断模式1.7.2、以编程方式指定模式 2、完整的测试例子 1、基本操作 1.1、创建SparkSession import org.apache.spark.sql.SparkSession;SparkSession spark SparkSession .builder() .appName(Java Spark SQL basic example) .config(spark.some.config.option, some-value) .getOrCreate();1.2、创建DataFrames import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row;DatasetRow df spark.read().json(examples/src/main/resources/people.json);// Displays the content of the DataFrame to stdout df.show(); // ----------- // | age| name| // ----------- // |null|Michael| // | 30| Andy| // | 19| Justin| // -----------1.3、创建Dataset操作 // col(...) is preferable to df.col(...) import static org.apache.spark.sql.functions.col;// Print the schema in a tree format df.printSchema(); // root // |-- age: long (nullable true) // |-- name: string (nullable true)// Select only the name column df.select(name).show(); // ------- // | name| // ------- // |Michael| // | Andy| // | Justin| // -------// Select everybody, but increment the age by 1 df.select(col(name), col(age).plus(1)).show(); // ---------------- // | name|(age 1)| // ---------------- // |Michael| null| // | Andy| 31| // | Justin| 20| // ----------------// Select people older than 21 df.filter(col(age).gt(21)).show(); // ------- // |age|name| // ------- // | 30|Andy| // -------// Count people by age df.groupBy(age).count().show(); // --------- // | age|count| // --------- // | 19| 1| // |null| 1| // | 30| 1| // ---------1.4、运行sql查询 import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row;// Register the DataFrame as a SQL temporary view df.createOrReplaceTempView(people);DatasetRow sqlDF spark.sql(SELECT * FROM people); sqlDF.show(); // ----------- // | age| name| // ----------- // |null|Michael| // | 30| Andy| // | 19| Justin| // -----------1.5、创建全局临时视图 // Register the DataFrame as a global temporary view df.createGlobalTempView(people);// Global temporary view is tied to a system preserved database global_temp spark.sql(SELECT * FROM global_temp.people).show(); // ----------- // | age| name| // ----------- // |null|Michael| // | 30| Andy| // | 19| Justin| // -----------// Global temporary view is cross-session spark.newSession().sql(SELECT * FROM global_temp.people).show(); // ----------- // | age| name| // ----------- // |null|Michael| // | 30| Andy| // | 19| Justin| // -----------1.6、创建Datasets import java.util.Arrays; import java.util.Collections; import java.io.Serializable;import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders;public static class Person implements Serializable {private String name;private long age;public String getName() {return name;}public void setName(String name) {this.name name;}public long getAge() {return age;}public void setAge(long age) {this.age age;} }// Create an instance of a Bean class Person person new Person(); person.setName(Andy); person.setAge(32);// Encoders are created for Java beans EncoderPerson personEncoder Encoders.bean(Person.class); DatasetPerson javaBeanDS spark.createDataset(Collections.singletonList(person),personEncoder ); javaBeanDS.show(); // ------- // |age|name| // ------- // | 32|Andy| // -------// Encoders for most common types are provided in class Encoders EncoderLong longEncoder Encoders.LONG(); DatasetLong primitiveDS spark.createDataset(Arrays.asList(1L, 2L, 3L), longEncoder); DatasetLong transformedDS primitiveDS.map((MapFunctionLong, Long) value - value 1L,longEncoder); transformedDS.collect(); // Returns [2, 3, 4]// DataFrames can be converted to a Dataset by providing a class. Mapping based on name String path examples/src/main/resources/people.json; DatasetPerson peopleDS spark.read().json(path).as(personEncoder); peopleDS.show(); // ----------- // | age| name| // ----------- // |null|Michael| // | 30| Andy| // | 19| Justin| // -----------1.7、与rdd进行互操作 1.7.1、使用反射推断模式 Spark SQL支持将JavaBeans的RDD自动转换为DataFrame。使用反射获得的BeanInfo定义了表的模式。目前Spark SQL不支持包含Map字段的JavaBeans。但是支持嵌套JavaBeans和List或Array字段。您可以通过创建一个实现Serializable的类来创建JavaBean并且该类的所有字段都有getter和setter。 import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders;// Create an RDD of Person objects from a text file JavaRDDPerson peopleRDD spark.read().textFile(examples/src/main/resources/people.txt).javaRDD().map(line - {String[] parts line.split(,);Person person new Person();person.setName(parts[0]);person.setAge(Integer.parseInt(parts[1].trim()));return person;});// Apply a schema to an RDD of JavaBeans to get a DataFrame DatasetRow peopleDF spark.createDataFrame(peopleRDD, Person.class); // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView(people);// SQL statements can be run by using the sql methods provided by spark DatasetRow teenagersDF spark.sql(SELECT name FROM people WHERE age BETWEEN 13 AND 19);// The columns of a row in the result can be accessed by field index EncoderString stringEncoder Encoders.STRING(); DatasetString teenagerNamesByIndexDF teenagersDF.map((MapFunctionRow, String) row - Name: row.getString(0),stringEncoder); teenagerNamesByIndexDF.show(); // ------------ // | value| // ------------ // |Name: Justin| // ------------// or by field name DatasetString teenagerNamesByFieldDF teenagersDF.map((MapFunctionRow, String) row - Name: row.StringgetAs(name),stringEncoder); teenagerNamesByFieldDF.show(); // ------------ // | value| // ------------ // |Name: Justin| // ------------1.7.2、以编程方式指定模式 当JavaBean类不能提前定义时(例如记录的结构被编码为字符串或者文本数据集将被解析字段将以不同的方式投影给不同的用户)可以通过三个步骤以编程方式创建dataset 。 从原始RDD的行创建一个RDD;创建由StructType表示的模式该模式与步骤1中创建的RDD中的Rows结构相匹配。通过SparkSession提供的createDataFrame方法将模式应用到RDD的行。 import java.util.ArrayList; import java.util.List;import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row;import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType;// Create an RDD JavaRDDString peopleRDD spark.sparkContext().textFile(examples/src/main/resources/people.txt, 1).toJavaRDD();// The schema is encoded in a string String schemaString name age;// Generate the schema based on the string of schema ListStructField fields new ArrayList(); for (String fieldName : schemaString.split( )) {StructField field DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field); } StructType schema DataTypes.createStructType(fields);// Convert records of the RDD (people) to Rows JavaRDDRow rowRDD peopleRDD.map((FunctionString, Row) record - {String[] attributes record.split(,);return RowFactory.create(attributes[0], attributes[1].trim()); });// Apply the schema to the RDD DatasetRow peopleDataFrame spark.createDataFrame(rowRDD, schema);// Creates a temporary view using the DataFrame peopleDataFrame.createOrReplaceTempView(people);// SQL can be run over a temporary view created using DataFrames DatasetRow results spark.sql(SELECT name FROM people);// The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name DatasetString namesDS results.map((MapFunctionRow, String) row - Name: row.getString(0),Encoders.STRING()); namesDS.show(); // ------------- // | value| // ------------- // |Name: Michael| // | Name: Andy| // | Name: Justin| // -------------2、完整的测试例子 本例子代码是在window下测试需要下载https://github.com/steveloughran/winutils解压放在hadoop对应目录 package com.penngo.spark;import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType;import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List;import static org.apache.spark.sql.functions.col;public class SparkDataset {private static final String jsonPath D:\\hadoop\\spark\\resources\\people.json;private static final String txtPath D:\\hadoop\\spark\\resources\\people.txt;public static class Person implements Serializable {private String name;private long age;public String getName() {return name;}public void setName(String name) {this.name name;}public long getAge() {return age;}public void setAge(long age) {this.age age;}}public static void createDataFrame(SparkSession spark) throws Exception{// 创建DataFrameDatasetRow df spark.read().json(jsonPath);df.show();// 操作operations(df);// sql查询sqlQuery(spark, df);}public static void operations(DatasetRow df){df.printSchema();// root// |-- age: long (nullable true)// |-- name: string (nullable true)// Select only the name columndf.select(name).show();// -------// | name|// -------// |Michael|// | Andy|// | Justin|// -------// Select everybody, but increment the age by 1df.select(col(name), col(age).plus(1)).show();// ----------------// | name|(age 1)|// ----------------// |Michael| null|// | Andy| 31|// | Justin| 20|// ----------------// Select people older than 21df.filter(col(age).gt(21)).show();// -------// |age|name|// -------// | 30|Andy|// -------// Count people by agedf.groupBy(age).count().show();// ---------// | age|count|// ---------// | 19| 1|// |null| 1|// | 30| 1|// ---------}/*** SQL查询*/public static void sqlQuery(SparkSession spark, DatasetRow df) throws Exception{// 临时视图会话消失视图也会消失df.createOrReplaceTempView(people);DatasetRow sqlDF spark.sql(SELECT * FROM people);sqlDF.show();// 全局视图全局临时视图绑定到系统保留的数据库 global_temp df.createGlobalTempView(people);spark.sql(SELECT * FROM global_temp.people).show();// -----------// | age| name|// -----------// |null|Michael|// | 30| Andy|// | 19| Justin|// -----------// 全局临时视图是跨会话的spark.newSession().sql(SELECT * FROM global_temp.people).show();// -----------// | age| name|// -----------// |null|Michael|// | 30| Andy|// | 19| Justin|// -----------}public static void createDataset(SparkSession spark){// 列表转成datasetPerson person new Person();person.setName(Andy);person.setAge(32);EncoderPerson personEncoder Encoders.bean(Person.class);DatasetPerson javaBeanDS spark.createDataset(Collections.singletonList(person),personEncoder);System.out.println(createDataset show);javaBeanDS.show();// -------// |age|name|// -------// | 32|Andy|// -------EncoderLong longEncoder Encoders.LONG();DatasetLong primitiveDS spark.createDataset(Arrays.asList(1L, 2L, 3L), longEncoder);DatasetLong transformedDS primitiveDS.map((MapFunctionLong, Long) value - value 1L,longEncoder);transformedDS.collect(); // Returns [2, 3, 4]// 读取文件转成datasetDatasetPerson peopleDS spark.read().json(jsonPath).as(personEncoder);peopleDS.show();// -----------// | age| name|// -----------// |null|Michael|// | 30| Andy|// | 19| Justin|// -----------}/*** 非Bean的方式转换rdd-DataFrame-Dataset* param spark* throws Exception*/public static void rddToDataset(SparkSession spark) throws Exception{// 读取文件生成一个Person类型的RDDJavaRDDPerson peopleRDD spark.read().textFile(txtPath).javaRDD().map(line - {String[] parts line.split(,);Person person new Person();person.setName(parts[0]);person.setAge(Integer.parseInt(parts[1].trim()));return person;});// RDD转成DataFrameDatasetRow peopleDF spark.createDataFrame(peopleRDD, Person.class);// 把DataFrame注册为临时视图peopleDF.createOrReplaceTempView(people);// SQL语句可以通过spark提供的SQL方法来运行DatasetRow teenagersDF spark.sql(SELECT name FROM people WHERE age BETWEEN 13 AND 19);// 结果中一行的列可以通过字段索引访问EncoderString stringEncoder Encoders.STRING();DatasetString teenagerNamesByIndexDF teenagersDF.map((MapFunctionRow, String) row - Name: row.getString(0),stringEncoder);teenagerNamesByIndexDF.show();// ------------// | value|// ------------// |Name: Justin|// ------------// 也可以通过字段名访问DatasetString teenagerNamesByFieldDF teenagersDF.map((MapFunctionRow, String) row - Name: row.StringgetAs(name),stringEncoder);teenagerNamesByFieldDF.show();// ------------// | value|// ------------// |Name: Justin|// ------------}/*** 非Bean的方式转换rdd-DataFrame-Dataset* param spark* throws Exception*/public static void rddToDataset2(SparkSession spark) throws Exception{// 创建RDDJavaRDDString peopleRDD spark.sparkContext().textFile(txtPath, 1).toJavaRDD();// 字段字义String schemaString name age;// 根据schema的字符串生成schemaListStructField fields new ArrayList();for (String fieldName : schemaString.split( )) {StructField field DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field);}StructType schema DataTypes.createStructType(fields);// 将RDD(people)的记录转换为视图的RowJavaRDDRow rowRDD peopleRDD.map((FunctionString, Row) record - {String[] attributes record.split(,);return RowFactory.create(attributes[0], attributes[1].trim());});// 将schema应用于RDD转为DataFrameDatasetRow peopleDataFrame spark.createDataFrame(rowRDD, schema);// 使用DataFrame创建临时视图peopleDataFrame.createOrReplaceTempView(people);// SQL可以在使用dataframe创建的临时视图上运行DatasetRow results spark.sql(SELECT name FROM people);// SQL查询的结果是dataframe支持所有正常的RDD操作// 结果行的列可以通过字段索引或字段名称访问DatasetString namesDS results.map((MapFunctionRow, String) row - Name: row.getString(0),Encoders.STRING());namesDS.show();// -------------// | value|// -------------// |Name: Michael|// | Name: Andy|// | Name: Justin|// -------------}public static void main(String[] args) throws Exception{Logger.getLogger(org.apache.spark).setLevel(Level.WARN);Logger.getLogger(org.apache.eclipse.jetty.server).setLevel(Level.OFF);//windows下调试spark需要使用https://github.com/steveloughran/winutilsSystem.setProperty(hadoop.home.dir, D:\\hadoop\\hadoop-3.3.1);System.setProperty(HADOOP_USER_NAME, root);SparkSession spark SparkSession.builder().appName(SparkDataset).master(local[*]).getOrCreate();createDataFrame(spark);createDataset(spark);rddToDataset(spark);rddToDataset2(spark);spark.stop();} } 参考自官方文档https://spark.apache.org/docs/3.1.2/sql-getting-started.html spark支持数据源https://spark.apache.org/docs/3.1.2/sql-data-sources.html spark sql语法相关https://spark.apache.org/docs/3.1.2/sql-ref.html
文章转载自:
http://www.morning.rwfj.cn.gov.cn.rwfj.cn
http://www.morning.mkbc.cn.gov.cn.mkbc.cn
http://www.morning.wpkr.cn.gov.cn.wpkr.cn
http://www.morning.kgrwh.cn.gov.cn.kgrwh.cn
http://www.morning.coffeedelsol.com.gov.cn.coffeedelsol.com
http://www.morning.gyjld.cn.gov.cn.gyjld.cn
http://www.morning.srzhm.cn.gov.cn.srzhm.cn
http://www.morning.qqhersx.com.gov.cn.qqhersx.com
http://www.morning.rhpgk.cn.gov.cn.rhpgk.cn
http://www.morning.fcqlt.cn.gov.cn.fcqlt.cn
http://www.morning.jnoegg.com.gov.cn.jnoegg.com
http://www.morning.qcdhg.cn.gov.cn.qcdhg.cn
http://www.morning.hkgcx.cn.gov.cn.hkgcx.cn
http://www.morning.abgy8.com.gov.cn.abgy8.com
http://www.morning.ktrdc.cn.gov.cn.ktrdc.cn
http://www.morning.hnhkz.cn.gov.cn.hnhkz.cn
http://www.morning.gxeqedd.cn.gov.cn.gxeqedd.cn
http://www.morning.gxtfk.cn.gov.cn.gxtfk.cn
http://www.morning.sgnjg.cn.gov.cn.sgnjg.cn
http://www.morning.jhwqp.cn.gov.cn.jhwqp.cn
http://www.morning.xltdh.cn.gov.cn.xltdh.cn
http://www.morning.zfhwm.cn.gov.cn.zfhwm.cn
http://www.morning.ddqdl.cn.gov.cn.ddqdl.cn
http://www.morning.zsthg.cn.gov.cn.zsthg.cn
http://www.morning.pxsn.cn.gov.cn.pxsn.cn
http://www.morning.zmnyj.cn.gov.cn.zmnyj.cn
http://www.morning.rrrrsr.com.gov.cn.rrrrsr.com
http://www.morning.yjtnc.cn.gov.cn.yjtnc.cn
http://www.morning.fdrch.cn.gov.cn.fdrch.cn
http://www.morning.shangwenchao4.cn.gov.cn.shangwenchao4.cn
http://www.morning.qrlsy.cn.gov.cn.qrlsy.cn
http://www.morning.hwnqg.cn.gov.cn.hwnqg.cn
http://www.morning.wlfxn.cn.gov.cn.wlfxn.cn
http://www.morning.lmhcy.cn.gov.cn.lmhcy.cn
http://www.morning.flqkp.cn.gov.cn.flqkp.cn
http://www.morning.rjhts.cn.gov.cn.rjhts.cn
http://www.morning.fjzlh.cn.gov.cn.fjzlh.cn
http://www.morning.zrwlz.cn.gov.cn.zrwlz.cn
http://www.morning.kscwt.cn.gov.cn.kscwt.cn
http://www.morning.jybj.cn.gov.cn.jybj.cn
http://www.morning.mdwtm.cn.gov.cn.mdwtm.cn
http://www.morning.zqzzn.cn.gov.cn.zqzzn.cn
http://www.morning.pqjlp.cn.gov.cn.pqjlp.cn
http://www.morning.mwhqd.cn.gov.cn.mwhqd.cn
http://www.morning.jxlnr.cn.gov.cn.jxlnr.cn
http://www.morning.hmxb.cn.gov.cn.hmxb.cn
http://www.morning.lzwfg.cn.gov.cn.lzwfg.cn
http://www.morning.qbmpb.cn.gov.cn.qbmpb.cn
http://www.morning.ryzgp.cn.gov.cn.ryzgp.cn
http://www.morning.bpmnj.cn.gov.cn.bpmnj.cn
http://www.morning.jwskq.cn.gov.cn.jwskq.cn
http://www.morning.zlnyk.cn.gov.cn.zlnyk.cn
http://www.morning.mtsck.cn.gov.cn.mtsck.cn
http://www.morning.wmnpm.cn.gov.cn.wmnpm.cn
http://www.morning.rbffj.cn.gov.cn.rbffj.cn
http://www.morning.kwksj.cn.gov.cn.kwksj.cn
http://www.morning.kmjbs.cn.gov.cn.kmjbs.cn
http://www.morning.knlbg.cn.gov.cn.knlbg.cn
http://www.morning.rwnx.cn.gov.cn.rwnx.cn
http://www.morning.hzryl.cn.gov.cn.hzryl.cn
http://www.morning.fygbq.cn.gov.cn.fygbq.cn
http://www.morning.pljxz.cn.gov.cn.pljxz.cn
http://www.morning.mjzgg.cn.gov.cn.mjzgg.cn
http://www.morning.kgxrq.cn.gov.cn.kgxrq.cn
http://www.morning.yrlfy.cn.gov.cn.yrlfy.cn
http://www.morning.lcqrf.cn.gov.cn.lcqrf.cn
http://www.morning.xqjz.cn.gov.cn.xqjz.cn
http://www.morning.ckfqt.cn.gov.cn.ckfqt.cn
http://www.morning.gnfkl.cn.gov.cn.gnfkl.cn
http://www.morning.xdmsq.cn.gov.cn.xdmsq.cn
http://www.morning.xcjwm.cn.gov.cn.xcjwm.cn
http://www.morning.bswxt.cn.gov.cn.bswxt.cn
http://www.morning.mzkn.cn.gov.cn.mzkn.cn
http://www.morning.jwfkk.cn.gov.cn.jwfkk.cn
http://www.morning.plfrk.cn.gov.cn.plfrk.cn
http://www.morning.bmmyx.cn.gov.cn.bmmyx.cn
http://www.morning.ryglh.cn.gov.cn.ryglh.cn
http://www.morning.nmtyx.cn.gov.cn.nmtyx.cn
http://www.morning.qkxt.cn.gov.cn.qkxt.cn
http://www.morning.lizimc.com.gov.cn.lizimc.com
http://www.tj-hxxt.cn/news/238495.html

相关文章:

  • 网站优化师招聘wap网站制作怎么做
  • 2017自己做网站的趋势wordpress静态文件放到cdn
  • 成都网站设计服务商wordpress修改
  • 做网站宣传费用记什么科目网站meta网页描述
  • 网站建设培训南宁展览网站模板大全
  • 广州市网站建设服务机构做网站是数据库应该放在哪里
  • 新纪实网站建设八埏网站开发
  • 做网站需要展示工厂么?南通网站建设祥云
  • 网站吗教育机构域名
  • 体育西网站开发设计长春工程公司招聘
  • 文件备案网站建设方案360浏览器网页版入口
  • 常州 网站建设河南省住房和城乡建设局网站
  • 简洁网站模板下载莱芜金点子广告信息港
  • 沧浪企业建设网站价格建筑网站在哪里找
  • 株洲网站建设团队wordpress 企业
  • 哈尔滨建设工程交易中心网站阜阳万维网站建设
  • 招投标 网站建设网页制作公司
  • 山东省环保厅官方网站建设项目国际新闻最新消息十条摘抄
  • 企业网站的价值体现是在seo优化培训班
  • 织梦的网站收录不好微网站和手机站区别
  • 门户网站建设自查电子商务网站建设指导思想
  • 网站怎么做qq微信登陆哪儿能做邯郸网站建设
  • 贵州省建设厅官方网站考证扬州市开发区建设局网站首页
  • 什么网站比较容易做权重建设工程查询系统
  • 校园网站建设管理工作制度蓬莱建网站
  • 厦门网站建设手机做农村电子商务的网站有哪些内容
  • 深圳市建设主管部门门户网站小程序推广收费价目表
  • 广州市网站建设制作费用品牌网站设计制作哪家正规
  • 重庆网站备案必须到核验点微信公众好可以上传wordpress
  • 免费室内设计素材网站东莞企业网站制作