当前位置: 首页 > news >正文

知舟网站建设门户网站开发申请

知舟网站建设,门户网站开发申请,横向网站模板,网站分成几种类型第五章 RDD Checkpoint RDD 数据可以持久化#xff0c;但是持久化/缓存可以把数据放在内存中#xff0c;虽然是快速的#xff0c;但是也是最不可靠的#xff1b;也可以把数据放在磁盘上#xff0c;也不是完全可靠的#xff01;例如磁盘会损坏等。 Checkpoint的产生就是…第五章 RDD Checkpoint RDD 数据可以持久化但是持久化/缓存可以把数据放在内存中虽然是快速的但是也是最不可靠的也可以把数据放在磁盘上也不是完全可靠的例如磁盘会损坏等。 Checkpoint的产生就是为了更加可靠的数据持久化在Checkpoint的时候一般把数据放在HDFS上这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全实现了RDD的容错和高可用。 在Spark Core中对RDD做checkpoint可以切断做checkpoint RDD的依赖关系将RDD数据保存到可靠存储如HDFS以便数据恢复 演示范例代码如下 import org.apache.spark.{SparkConf, SparkContext} /** * RDD数据Checkpoint设置案例演示 */ object SparkCkptTest { def main(args: Array[String]): Unit { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext { // 1.a 创建SparkConf对象设置应用的配置信息 val sparkConf: SparkConf new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix($)) .setMaster(local[2]) // 1.b 传递SparkConf对象构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel(WARN) // TODO: 设置检查点目录将RDD数据保存到那个目录 sc.setCheckpointDir(datas/spark/ckpt/) // 读取文件数据 val datasRDD sc.textFile(datas/wordcount/wordcount.data) // TODO: 调用checkpoint函数将RDD进行备份需要RDD中Action函数触发 datasRDD.checkpoint() datasRDD.count() // TODO: 再次执行count函数, 此时从checkpoint读取数据 datasRDD.count() // 应用程序运行结束关闭资源 Thread.sleep(100000) sc.stop() } }持久化和Checkpoint的区别 1、存储位置 Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存)Checkpoint 可以保存数据到 HDFS 这类可靠的存储上 2、生命周期Cache和Persist的RDD会在程序结束后会被清除或者手动调用unpersist方法Checkpoint的RDD在程序结束后依然存在不会被删除 3、Lineage(血统、依赖链、依赖关系)Persist和Cache不会丢掉RDD间的依赖链/依赖关系因为这种缓存是不可靠的如果出现了一些错误(例如 Executor 宕机)需要通过回溯依赖链重新计算出来Checkpoint会斩断依赖链因为Checkpoint会把结果保存在HDFS这类存储中更加的安全可靠一般不需要回溯依赖链 第六章 外部数据源 Spark可以从外部存储系统读取数据比如RDBMs表中或者HBase表中读写数据这也是企业中常常使用如下两个场景 1、要分析的数据存储在HBase表中需要从其中读取数据数据分析 日志数据电商网站的商家操作日志订单数据保险行业订单数据 2、使用Spark进行离线分析以后往往将报表结果保存到MySQL表中网站基本分析pv、uv。。。。。 6.1 HBase 数据源 Spark可以从HBase表中读写Read/Write数据底层采用TableInputFormat和TableOutputFormat方式与MapReduce与HBase集成完全一样使用输入格式InputFormat和输出格式OutputFoamt。 HBase Sink 回 顾 MapReduce 向 HBase 表 中 写 入 数 据 使 用 TableReducer 其 中 OutputFormat 为TableOutputFormat读取数据KeyImmutableBytesWritableValuePut。 写 入 数 据 时 需 要 将 RDD 转换为 RDD[(ImmutableBytesWritable, Put)] 类 型 调 用saveAsNewAPIHadoopFile方法数据保存至HBase表中。 HBase Client连接时需要设置依赖Zookeeper地址相关信息及表的名称通过Configuration设置属性值进行传递。 范例演示将词频统计结果保存HBase表表的设计 代码如下 import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 将RDD数据保存至HBase表中 */ object SparkWriteHBase { def main(args: Array[String]): Unit { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext { // 1.a 创建SparkConf对象设置应用的配置信息 val sparkConf: SparkConf new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix($)) .setMaster(local[2]) // 1.b 传递SparkConf对象构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel(WARN) // TODO: 1、构建RDD val list List((hadoop, 234), (spark, 3454), (hive, 343434), (ml, 8765)) val outputRDD: RDD[(String, Int)] sc.parallelize(list, numSlices 2) // TODO: 2、将数据写入到HBase表中, 使用saveAsNewAPIHadoopFile函数要求RDD是(key, Value) // TODO: 组装RDD[(ImmutableBytesWritable, Put)] /** * HBase表的设计 * 表的名称htb_wordcount * Rowkey: word * 列簇: info * 字段名称 count */ val putsRDD: RDD[(ImmutableBytesWritable, Put)] outputRDD.mapPartitions{ iter iter.map { case (word, count) // 创建Put实例对象 val put new Put(Bytes.toBytes(word)) // 添加列 put.addColumn( // 实际项目中使用HBase时插入数据先将所有字段的值转为String再使用Bytes转换为字节数组 Bytes.toBytes(info), Bytes.toBytes(cout), Bytes.toBytes(count.toString) ) // 返回二元组 (new ImmutableBytesWritable(put.getRow), put) } } // 构建HBase Client配置信息 val conf: Configuration HBaseConfiguration.create() // 设置连接Zookeeper属性 conf.set(hbase.zookeeper.quorum, node1.itcast.cn) conf.set(hbase.zookeeper.property.clientPort, 2181) conf.set(zookeeper.znode.parent, /hbase) // 设置将数据保存的HBase表的名称 conf.set(TableOutputFormat.OUTPUT_TABLE, htb_wordcount) /* def saveAsNewAPIHadoopFile( path: String,// 保存的路径 keyClass: Class[_], // Key类型 valueClass: Class[_], // Value类型 outputFormatClass: Class[_ : NewOutputFormat[_, _]], // 输出格式OutputFormat实现 conf: Configuration self.context.hadoopConfiguration // 配置信息 ): Unit */ putsRDD.saveAsNewAPIHadoopFile( datas/spark/htb-output- System.nanoTime(), // classOf[ImmutableBytesWritable], // classOf[Put], // classOf[TableOutputFormat[ImmutableBytesWritable]], // conf ) // 应用程序运行结束关闭资源 sc.stop() } }运行完成以后使用hbase shell查看数据 HBase Source 回 顾 MapReduce 从 读 HBase 表 中 的 数 据 使 用 TableMapper 其 中 InputFormat 为TableInputFormat读取数据KeyImmutableBytesWritableValueResult。 从HBase表读取数据时同样需要设置依赖Zookeeper地址信息和表的名称使用Configuration设置属性形式如下 此外读取的数据封装到RDD中Key和Value类型分别为ImmutableBytesWritable和Result,不支持Java Serializable导致处理数据时报序列化异常。设置Spark Application使用Kryo序列化性能要比Java 序列化要好创建SparkConf对象设置相关属性如下所示 范例演示从HBase表读取词频统计结果代码如下 import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration} import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 从HBase 表中读取数据封装到RDD数据集 */ object SparkReadHBase { def main(args: Array[String]): Unit { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext { // 1.a 创建SparkConf对象设置应用的配置信息 val sparkConf: SparkConf new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix($)) .setMaster(local[2]) // TODO: 设置使用Kryo 序列化方式 .set(spark.serializer, org.apache.spark.serializer.KryoSerializer) // TODO: 注册序列化的数据类型 .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result])) // 1.b 传递SparkConf对象构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel(WARN) // TODO: a. 读取HBase Client 配置信息 val conf: Configuration HBaseConfiguration.create() conf.set(hbase.zookeeper.quorum, node1.itcast.cn) conf.set(hbase.zookeeper.property.clientPort, 2181) conf.set(zookeeper.znode.parent, /hbase) // TODO: b. 设置读取的表的名称 conf.set(TableInputFormat.INPUT_TABLE, htb_wordcount) /* def newAPIHadoopRDD[K, V, F : NewInputFormat[K, V]]( conf: Configuration hadoopConfiguration, fClass: Class[F], kClass: Class[K], vClass: Class[V] ): RDD[(K, V)] */ val resultRDD: RDD[(ImmutableBytesWritable, Result)] sc.newAPIHadoopRDD( conf, // classOf[TableInputFormat], // classOf[ImmutableBytesWritable], // classOf[Result] // ) println(sCount ${resultRDD.count()}) resultRDD .take(5) .foreach { case (rowKey, result) println(sRowKey ${Bytes.toString(rowKey.get())}) // HBase表中的每条数据封装在result对象中解析获取每列的值 result.rawCells().foreach { cell val cf Bytes.toString(CellUtil.cloneFamily(cell)) val column Bytes.toString(CellUtil.cloneQualifier(cell)) val value Bytes.toString(CellUtil.cloneValue(cell)) val version cell.getTimestamp println(s\t $cf:$column $value, version $version) } } // 应用程序运行结束关闭资源 sc.stop() } }运行结果 6.2 MySQL 数据源 实际开发中常常将分析结果RDD保存至MySQL表中使用foreachPartition函数此外Spark中提供JdbcRDD用于从MySQL表中读取数据。 调用RDD#foreachPartition函数将每个分区数据保存至MySQL表中保存时考虑降低RDD分区数目和批量插入提升程序性能。 范例演示将词频统计WordCount结果保存MySQL表tb_wordcount。 建表语句 USE db_test ; CREATE TABLE tb_wordcount ( count varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL, word varchar(100) NOT NULL, PRIMARY KEY (word) ) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COLLATEutf8mb4_0900_ai_ci ;演示代码 import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 将词频统计结果保存到MySQL表中 */ object SparkWriteMySQL { def main(args: Array[String]): Unit { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext { // 1.a 创建SparkConf对象设置应用的配置信息 val sparkConf: SparkConf new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix($)) .setMaster(local[2]) // 1.b 传递SparkConf对象构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel(WARN) // 1. 从HDFS读取文本数据封装集合RDD val inputRDD: RDD[String] sc.textFile(datas/wordcount/wordcount.data) // 2. 处理数据调用RDD中函数 val resultRDD: RDD[(String, Int)] inputRDD // 3.a 每行数据分割为单词 .flatMap(line line.split(\\s)) // 3.b 转换为二元组表示每个单词出现一次 .map(word (word, 1)) // 3.c 按照Key分组聚合 .reduceByKey((tmp, item) tmp item) // 3. 输出结果RDD保存到MySQL数据库 resultRDD // 对结果RDD保存到外部存储系统时考虑降低RDD分区数目 .coalesce(1) // 对分区数据操作 .foreachPartition{iter saveToMySQL(iter)} // 应用程序运行结束关闭资源 sc.stop() } /** * 将每个分区中的数据保存到MySQL表中 * param datas 迭代器封装RDD中每个分区的数据 */ def saveToMySQL(datas: Iterator[(String, Int)]): Unit { // a. 加载驱动类 Class.forName(com.mysql.cj.jdbc.Driver) // 声明变量 var conn: Connection null var pstmt: PreparedStatement null try{ // b. 获取连接 conn DriverManager.getConnection( jdbc:mysql://node1.itcast.cn:3306/?serverTimezoneUTCcharacterEncodingutf8useUnic odetrue, root, 123456 ) // c. 获取PreparedStatement对象 val insertSql INSERT INTO db_test.tb_wordcount (word, count) VALUES(?, ?) pstmt conn.prepareStatement(insertSql) conn.setAutoCommit(false) // d. 将分区中数据插入到表中批量插入 datas.foreach{case (word, count) pstmt.setString(1, word) pstmt.setLong(2, count.toLong) // 加入批次 pstmt.addBatch() } // TODO: 批量插入 pstmt.executeBatch() conn.commit() }catch { case e: Exception e.printStackTrace() }finally { if(null ! pstmt) pstmt.close() if(null ! conn) conn.close() } } }运行程序查看数据库表的数据
文章转载自:
http://www.morning.rjhts.cn.gov.cn.rjhts.cn
http://www.morning.sacxbs.cn.gov.cn.sacxbs.cn
http://www.morning.qydgk.cn.gov.cn.qydgk.cn
http://www.morning.ftldl.cn.gov.cn.ftldl.cn
http://www.morning.rjnx.cn.gov.cn.rjnx.cn
http://www.morning.bplqh.cn.gov.cn.bplqh.cn
http://www.morning.wdrxh.cn.gov.cn.wdrxh.cn
http://www.morning.qsy39.cn.gov.cn.qsy39.cn
http://www.morning.sqgsx.cn.gov.cn.sqgsx.cn
http://www.morning.xkwyk.cn.gov.cn.xkwyk.cn
http://www.morning.jklns.cn.gov.cn.jklns.cn
http://www.morning.pangucheng.cn.gov.cn.pangucheng.cn
http://www.morning.yrsg.cn.gov.cn.yrsg.cn
http://www.morning.frmmp.cn.gov.cn.frmmp.cn
http://www.morning.ryyjw.cn.gov.cn.ryyjw.cn
http://www.morning.hsgxj.cn.gov.cn.hsgxj.cn
http://www.morning.fgsqz.cn.gov.cn.fgsqz.cn
http://www.morning.qgbfx.cn.gov.cn.qgbfx.cn
http://www.morning.ltywr.cn.gov.cn.ltywr.cn
http://www.morning.prfrb.cn.gov.cn.prfrb.cn
http://www.morning.bfcxf.cn.gov.cn.bfcxf.cn
http://www.morning.smxyw.cn.gov.cn.smxyw.cn
http://www.morning.czrcf.cn.gov.cn.czrcf.cn
http://www.morning.xsrnr.cn.gov.cn.xsrnr.cn
http://www.morning.lpskm.cn.gov.cn.lpskm.cn
http://www.morning.cfcpb.cn.gov.cn.cfcpb.cn
http://www.morning.fqzz3.cn.gov.cn.fqzz3.cn
http://www.morning.fswml.cn.gov.cn.fswml.cn
http://www.morning.ryglh.cn.gov.cn.ryglh.cn
http://www.morning.ykbgs.cn.gov.cn.ykbgs.cn
http://www.morning.zlgbx.cn.gov.cn.zlgbx.cn
http://www.morning.jbblf.cn.gov.cn.jbblf.cn
http://www.morning.fbhmn.cn.gov.cn.fbhmn.cn
http://www.morning.tbwsl.cn.gov.cn.tbwsl.cn
http://www.morning.kgnnc.cn.gov.cn.kgnnc.cn
http://www.morning.slwqt.cn.gov.cn.slwqt.cn
http://www.morning.ymjgx.cn.gov.cn.ymjgx.cn
http://www.morning.gnhsg.cn.gov.cn.gnhsg.cn
http://www.morning.pgmbl.cn.gov.cn.pgmbl.cn
http://www.morning.mjpgl.cn.gov.cn.mjpgl.cn
http://www.morning.21r000.cn.gov.cn.21r000.cn
http://www.morning.ftmp.cn.gov.cn.ftmp.cn
http://www.morning.lhzqn.cn.gov.cn.lhzqn.cn
http://www.morning.rrms.cn.gov.cn.rrms.cn
http://www.morning.rgdcf.cn.gov.cn.rgdcf.cn
http://www.morning.mrxgm.cn.gov.cn.mrxgm.cn
http://www.morning.zbhfs.cn.gov.cn.zbhfs.cn
http://www.morning.gslz.com.cn.gov.cn.gslz.com.cn
http://www.morning.lrylj.cn.gov.cn.lrylj.cn
http://www.morning.qhvah.cn.gov.cn.qhvah.cn
http://www.morning.nzhzt.cn.gov.cn.nzhzt.cn
http://www.morning.ghphp.cn.gov.cn.ghphp.cn
http://www.morning.lmcrc.cn.gov.cn.lmcrc.cn
http://www.morning.yrpg.cn.gov.cn.yrpg.cn
http://www.morning.bmmhs.cn.gov.cn.bmmhs.cn
http://www.morning.yhywr.cn.gov.cn.yhywr.cn
http://www.morning.wfjyn.cn.gov.cn.wfjyn.cn
http://www.morning.lywcd.cn.gov.cn.lywcd.cn
http://www.morning.frpb.cn.gov.cn.frpb.cn
http://www.morning.yjmlg.cn.gov.cn.yjmlg.cn
http://www.morning.ndcjq.cn.gov.cn.ndcjq.cn
http://www.morning.qqnjr.cn.gov.cn.qqnjr.cn
http://www.morning.rkfh.cn.gov.cn.rkfh.cn
http://www.morning.pltbd.cn.gov.cn.pltbd.cn
http://www.morning.kwqcy.cn.gov.cn.kwqcy.cn
http://www.morning.wmhlz.cn.gov.cn.wmhlz.cn
http://www.morning.qjfkz.cn.gov.cn.qjfkz.cn
http://www.morning.qtzk.cn.gov.cn.qtzk.cn
http://www.morning.wgzzj.cn.gov.cn.wgzzj.cn
http://www.morning.fmkjx.cn.gov.cn.fmkjx.cn
http://www.morning.tbqxh.cn.gov.cn.tbqxh.cn
http://www.morning.yknsr.cn.gov.cn.yknsr.cn
http://www.morning.bsplf.cn.gov.cn.bsplf.cn
http://www.morning.tzkrh.cn.gov.cn.tzkrh.cn
http://www.morning.mnwmj.cn.gov.cn.mnwmj.cn
http://www.morning.ltffk.cn.gov.cn.ltffk.cn
http://www.morning.rrgm.cn.gov.cn.rrgm.cn
http://www.morning.klltg.cn.gov.cn.klltg.cn
http://www.morning.hctgn.cn.gov.cn.hctgn.cn
http://www.morning.fdlyh.cn.gov.cn.fdlyh.cn
http://www.tj-hxxt.cn/news/281540.html

相关文章:

  • 重庆网站优化排名推广seo怎么做
  • 专业做logo的网站要制作一个自己的网站
  • 免费的网站推广在线推广ui页面设计公司
  • 备案网站可以做接码平台么海尔建设此网站的目的
  • 重庆网站建设seo公司哪家好wordpress自定义登录
  • 网站开发获取用户微信号登录个人网页内容
  • 龙山县建设局网站做电影网站最牛的站长是谁
  • 苏州建设网站找网络公司关于做网站的文献综述
  • 艺术毕业设计作品网站手机网站建设免费空间
  • 南京网站建设公司门网站建设
  • 网站的前台后台网站怎么做app
  • 网站制作器手机版下载北京建设制作网站
  • 佳木斯万达建设网站包头做网站哪家好
  • 怎么做自己的网站wordpress 栏目页
  • 应用网站建设上海网站建设q.479185700棒
  • 用html做家谱网站代码网网站开发和设计
  • 成都必去景点排名上海seo招聘
  • 外贸做网站用什么赣州市人才网
  • 免费域名申请 tk关键词优化价格表
  • 网络营销网站分析烟台做网站公司
  • 开原网站开发无锡商之道网络科技有限公司
  • 鞍山市建设工程安全生产监督管理站网站河北网站建站制作
  • 泸州高端网站建设公司中华室内设计师
  • angular网站模板下载优惠做网站
  • 手机咋做网站wordpress文章标题源码插件
  • 花钱做网站注意些什么app定制开发网络公司
  • 深圳专业做网站建网站价格wordpress 手机悬浮
  • 石家庄建设一个网站多少钱wordpress网站搜不到
  • ppt模板免费网站在线制作宁陵做网站的公司
  • 常州做网站找哪家好hcms wordpress