当前位置: 首页 > news >正文

做建材营销型网站安卓小项目源码免费网站

做建材营销型网站,安卓小项目源码免费网站,余姚网站推广公司,做头像网站静态文章目录 Output Sinks案例演示 一、​​​​​​​File sink 二、​​​​​​​​​​​​​​Memory Sink 三、​​​​​​​​​​​​​​Foreach Sink 1、​​​​​​​foreachBatch 2、​​​​​​​​​​​​​​foreach Output Sinks案例演示 当我们对流式… 文章目录 Output Sinks案例演示 一、​​​​​​​File sink 二、​​​​​​​​​​​​​​Memory Sink 三、​​​​​​​​​​​​​​Foreach Sink 1、​​​​​​​foreachBatch 2、​​​​​​​​​​​​​​foreach Output Sinks案例演示 当我们对流式数据处理完成之后可以将数据写出到Flie、Kafka、console控制台、memory内存或者直接使用foreach做个性化处理。关于将数据结果写出到Kafka在StructuredStreaming与Kafka整合部分再详细描述。 对于一些可以保证端到端容错的sink输出需要指定checkpoint目录来写入数据信息指定的checkpoint目录可以是HDFS中的某个路径设置checkpoint可以通过SparkSession设置也可以通过DataStreamWriter设置设置方式如下 //通过SparkSession设置checkpoint spark.conf.set(spark.sql.streaming.checkpointLocation,hdfs://mycluster/checkpintdir)或者//通过DataStreamWriter设置checkpoint df.writeStream.format(xxx).option(checkpointLocation,./checkpointdir).start() checkpoint目录中会有以下目录及数据 offsets记录偏移量目录记录了每个批次的偏移量。commits记录已经完成的批次方便重启任务检查完成的批次与offset批次做对比继续offset消费数据运行批次。metadatametadata元数据保存jobid信息。sources数据源各个批次读取详情。sinks数据sink写出批次情况。state记录状态值例如聚合、去重等场景会记录相应状态会周期性的生成snapshot文件记录状态。 下面对File、memoery、foreach output Sink进行演示。 一、​​​​​​​​​​​​​​File sink Flie Sink就是数据结果实时写入到执行目录下的文件中每次写出都会形成一个新的文件文件格式可以是parquet、orc、json、csv格式。 Scala代码如下 package com.lanson.structuredStreaming.sinkimport org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.{DataFrame, SparkSession}/*** 读取Socket数据将数据写入到csv文件*/ object FileSink {def main(args: Array[String]): Unit {val spark: SparkSession SparkSession.builder().master(local).appName(File Sink).config(spark.sql.shuffle.partitions, 1).getOrCreate()val result: DataFrame spark.readStream.format(socket).option(host, node3).option(port, 9999).load()val query: StreamingQuery result.writeStream.format(csv).option(path, ./dataresult/csvdir).option(checkpointLocation,./checkpint/dir3).start()query.awaitTermination()} }​​​​​​​ 在socket中输入数据之后每批次数据写入到一个csv文件中。  二、​​​​​​​​​​​​​​Memory Sink memory Sink是将结果作为内存表存储在内存中支持Append和Complete输出模式这种结果写出到内存表方式多用于测试如果数据量大要慎用。另外查询结果表中数据时需要写一个循环每隔一段时间读取内存中的数据。 Scala代码如下 package com.lanson.structuredStreaming.sinkimport org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.streaming.StreamingQuery/*** 读取scoket 数据写入memory 内存再读取*/ object MemorySink {def main(args: Array[String]): Unit {val spark: SparkSession SparkSession.builder().master(local).appName(Memory Sink).config(spark.sql.shuffle.partitions, 1).getOrCreate()spark.sparkContext.setLogLevel(Error)val result: DataFrame spark.readStream.format(socket).option(host, node3).option(port, 9999).load()val query: StreamingQuery result.writeStream.format(memory).queryName(mytable).start()//查询内存中表数据while(true){Thread.sleep(2000)spark.sql(|select * from mytable.stripMargin).show()}query.awaitTermination()}}三、​​​​​​​​​​​​​​Foreach Sink foreach 可以对输出的结果数据进行自定义处理逻辑针对结果数据自定义处理逻辑数据除了有foreach之外还有foreachbatch两者区别是foreach是针对一条条的数据进行自定义处理foreachbatch是针对当前小批次数据进行自定义处理。 1、​​​​​​​foreachBatch foreachBatch可以针对每个批次数据进行自定义处理该方法需要传入一个函数函数有2个参数分别为当前批次数据对应的DataFrame和当前batchId。 案例实时读取socket数据将结果批量写入到mysql中。 Scala代码如下 package com.lanson.structuredStreaming.sinkimport org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** 读取Socket 数据将数据写出到mysql中*/ object ForeachBatchTest {def main(args: Array[String]): Unit {val spark: SparkSession SparkSession.builder().appName(ForeachBatch Sink).master(local).config(spark.sql.shuffle.partitions, 1).getOrCreate()import spark.implicits._val df: DataFrame spark.readStream.format(socket).option(host, node2).option(port, 9999).load()val personDF: DataFrame df.as[String].map(line {val arr: Array[String] line.split(,)(arr(0).toInt, arr(1), arr(2).toInt)}).toDF(id, name, age)val query: StreamingQuery personDF.writeStream.foreachBatch((batchDF: DataFrame, batchId: Long) {println(batchID : batchId)batchDF.write.mode(SaveMode.Append).format(jdbc).option(url,jdbc:mysql://node3:3306/testdb?useSSLfalse).option(user,root).option(password,123456).option(dbtable,person).save()}).start()query.awaitTermination();}}运行结果  Java代码如下 package com.lanson.structuredStreaming.sink;import java.util.concurrent.TimeoutException; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.StreamingQueryException; import scala.Tuple3;public class ForeachBatchTest01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {SparkSession spark SparkSession.builder().master(local).appName(ForeachBatchTest01).config(spark.sql.shuffle.partitions, 1).getOrCreate();spark.sparkContext().setLogLevel(Error);DatasetRow result spark.readStream().format(socket).option(host, node2).option(port, 9999).load().as(Encoders.STRING()).map(new MapFunctionString, Tuple3Integer, String, Integer() {Overridepublic Tuple3Integer, String, Integer call(String line) throws Exception {String[] arr line.split(,);return new Tuple3(Integer.valueOf(arr[0]), arr[1], Integer.valueOf(arr[2]));}}, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT())).toDF(id, name, age);result.writeStream().foreachBatch(new VoidFunction2DatasetRow, Long() {Overridepublic void call(DatasetRow df, Long batchId) throws Exception {System.out.println(batchID : batchId);//将df 保存到mysqldf.write().format(jdbc).mode(SaveMode.Append).option(url,jdbc:mysql://node3:3306/testdb?useSSLfalse ).option(user,root ).option(password,123456 ).option(dbtable,person ).save();}}).start().awaitTermination();} }运行结果 在mysql中创建testdb库并创建person表这里也可以不创建表 create database testdb; create table person(id int(10),name varchar(255),age int(2)); 1,zs,18 2,ls,19 3,ww,20 4,ml,21 5,tq,22 6,ll,29 mysql结果如下 2、​​​​​​​​​​​​​​foreach foreach可以针对数据结果每条数据进行处理。 案例实时读取socket数据将结果一条条写入到mysql中。 Scala代码如下 package com.lanson.structuredStreaming.sinkimport java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.spark.sql.execution.streaming.sources.ForeachWrite import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}object ForeachSinkTest {def main(args: Array[String]): Unit {val spark: SparkSession SparkSession.builder().appName(ForeachBatch Sink).master(local).config(spark.sql.shuffle.partitions, 1).getOrCreate()spark.sparkContext.setLogLevel(Error)import spark.implicits._val df: DataFrame spark.readStream.format(socket).option(host, node2).option(port, 9999).load()val personDF: DataFrame df.as[String].map(line {val arr: Array[String] line.split(,)(arr(0).toInt, arr(1), arr(2).toInt)}).toDF(id, name, age)personDF.writeStream.foreach(new ForeachWriter[Row]() {var conn: Connection _var pst: PreparedStatement _//打开资源override def open(partitionId: Long, epochId: Long): Boolean {conn DriverManager.getConnection(jdbc:mysql://node3:3306/testdb?useSSLfalse,root,123456)pst conn.prepareStatement(insert into person values (?,?,?))true}//一条条处理数据override def process(row: Row): Unit {val id: Int row.getInt(0)val name: String row.getString(1)val age: Int row.getInt(2)pst.setInt(1,id)pst.setString(2,name)pst.setInt(3,age)pst.executeUpdate()}//关闭释放资源override def close(errorOrNull: Throwable): Unit {pst.close()conn.close()}}).start().awaitTermination()}}运行结果 Java代码如下 package com.lanson.structuredStreaming.sink;import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.concurrent.TimeoutException; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.ForeachWriter; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.StreamingQueryException; import scala.Tuple3;public class ForeachSinkTest01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {SparkSession spark SparkSession.builder().master(local).appName(SSReadSocketData).config(spark.sql.shuffle.partitions, 1).getOrCreate();spark.sparkContext().setLogLevel(Error);DatasetRow result spark.readStream().format(socket).option(host, node2).option(port, 9999).load().as(Encoders.STRING()).map(new MapFunctionString, Tuple3Integer, String, Integer() {Overridepublic Tuple3Integer, String, Integer call(String line) throws Exception {String[] arr line.split(,);return new Tuple3(Integer.valueOf(arr[0]), arr[1], Integer.valueOf(arr[2]));}}, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT())).toDF(id, name, age);result.writeStream().foreach(new ForeachWriterRow() {Connection conn;PreparedStatement pst ;Overridepublic boolean open(long partitionId, long epochId) {try {conn DriverManager.getConnection(jdbc:mysql://node3:3306/testdb?useSSLfalse, root, 123456);pst conn.prepareStatement(insert into person values (?,?,?));} catch (SQLException e) {e.printStackTrace();}return true;}Overridepublic void process(Row row) {int id row.getInt(0);String name row.getString(1);int age row.getInt(2);try {pst.setInt(1,id );pst.setString(2,name );pst.setInt(3,age );pst.executeUpdate();} catch (SQLException e) {e.printStackTrace();}}Overridepublic void close(Throwable errorOrNull) {try {pst.close();conn.close();} catch (SQLException e) {e.printStackTrace();}}}).start().awaitTermination();} }运行 以上代码编写完成后清空mysql person表数据然后输入以下数据 1,zs,18 2,ls,19 3,ww,20 4,ml,21 5,tq,22 6,ll,29 1,zs,18 2,ls,19 3,ww,20 4,ml,21 5,tq,22 6,ll,29 mysql结果如下 博客主页https://lansonli.blog.csdn.net欢迎点赞 收藏 ⭐留言 如有错误敬请指正本文由 Lansonli 原创首发于 CSDN博客停下休息的时候不要忘了别人还在奔跑希望大家抓紧时间学习全力奔赴更美好的生活✨
文章转载自:
http://www.morning.mzydm.cn.gov.cn.mzydm.cn
http://www.morning.tsxg.cn.gov.cn.tsxg.cn
http://www.morning.cwznh.cn.gov.cn.cwznh.cn
http://www.morning.qklff.cn.gov.cn.qklff.cn
http://www.morning.psdsk.cn.gov.cn.psdsk.cn
http://www.morning.nfpct.cn.gov.cn.nfpct.cn
http://www.morning.cgdyx.cn.gov.cn.cgdyx.cn
http://www.morning.xwqxz.cn.gov.cn.xwqxz.cn
http://www.morning.pkwwq.cn.gov.cn.pkwwq.cn
http://www.morning.wmmjw.cn.gov.cn.wmmjw.cn
http://www.morning.lhgkr.cn.gov.cn.lhgkr.cn
http://www.morning.zqxhn.cn.gov.cn.zqxhn.cn
http://www.morning.lpmlx.cn.gov.cn.lpmlx.cn
http://www.morning.rnjgh.cn.gov.cn.rnjgh.cn
http://www.morning.jzykq.cn.gov.cn.jzykq.cn
http://www.morning.jmllh.cn.gov.cn.jmllh.cn
http://www.morning.slkqd.cn.gov.cn.slkqd.cn
http://www.morning.zxdhp.cn.gov.cn.zxdhp.cn
http://www.morning.zxgzp.cn.gov.cn.zxgzp.cn
http://www.morning.ldpjm.cn.gov.cn.ldpjm.cn
http://www.morning.phlrp.cn.gov.cn.phlrp.cn
http://www.morning.fnywn.cn.gov.cn.fnywn.cn
http://www.morning.zqkms.cn.gov.cn.zqkms.cn
http://www.morning.jqjnl.cn.gov.cn.jqjnl.cn
http://www.morning.tlnbg.cn.gov.cn.tlnbg.cn
http://www.morning.gghhmi.cn.gov.cn.gghhmi.cn
http://www.morning.wjyyg.cn.gov.cn.wjyyg.cn
http://www.morning.jgnjl.cn.gov.cn.jgnjl.cn
http://www.morning.ljbpk.cn.gov.cn.ljbpk.cn
http://www.morning.bgygx.cn.gov.cn.bgygx.cn
http://www.morning.clpkp.cn.gov.cn.clpkp.cn
http://www.morning.jbctp.cn.gov.cn.jbctp.cn
http://www.morning.sjjtz.cn.gov.cn.sjjtz.cn
http://www.morning.rcjqgy.com.gov.cn.rcjqgy.com
http://www.morning.dwgcx.cn.gov.cn.dwgcx.cn
http://www.morning.xywfz.cn.gov.cn.xywfz.cn
http://www.morning.pszw.cn.gov.cn.pszw.cn
http://www.morning.wqgr.cn.gov.cn.wqgr.cn
http://www.morning.grpfj.cn.gov.cn.grpfj.cn
http://www.morning.rlsd.cn.gov.cn.rlsd.cn
http://www.morning.xmjzn.cn.gov.cn.xmjzn.cn
http://www.morning.xqffq.cn.gov.cn.xqffq.cn
http://www.morning.amlutsp.cn.gov.cn.amlutsp.cn
http://www.morning.qichetc.com.gov.cn.qichetc.com
http://www.morning.zxrtt.cn.gov.cn.zxrtt.cn
http://www.morning.hwxxh.cn.gov.cn.hwxxh.cn
http://www.morning.phwmj.cn.gov.cn.phwmj.cn
http://www.morning.ybnzn.cn.gov.cn.ybnzn.cn
http://www.morning.lqlhw.cn.gov.cn.lqlhw.cn
http://www.morning.dmwck.cn.gov.cn.dmwck.cn
http://www.morning.wbllx.cn.gov.cn.wbllx.cn
http://www.morning.tslxr.cn.gov.cn.tslxr.cn
http://www.morning.sflnx.cn.gov.cn.sflnx.cn
http://www.morning.bryyb.cn.gov.cn.bryyb.cn
http://www.morning.rkxdp.cn.gov.cn.rkxdp.cn
http://www.morning.qprtm.cn.gov.cn.qprtm.cn
http://www.morning.nhpmn.cn.gov.cn.nhpmn.cn
http://www.morning.ftlgy.cn.gov.cn.ftlgy.cn
http://www.morning.qfkdt.cn.gov.cn.qfkdt.cn
http://www.morning.jnoegg.com.gov.cn.jnoegg.com
http://www.morning.yslfn.cn.gov.cn.yslfn.cn
http://www.morning.rdwm.cn.gov.cn.rdwm.cn
http://www.morning.lqlhw.cn.gov.cn.lqlhw.cn
http://www.morning.tndxg.cn.gov.cn.tndxg.cn
http://www.morning.rjjys.cn.gov.cn.rjjys.cn
http://www.morning.rhsg.cn.gov.cn.rhsg.cn
http://www.morning.qszyd.cn.gov.cn.qszyd.cn
http://www.morning.jsdntd.com.gov.cn.jsdntd.com
http://www.morning.hlshn.cn.gov.cn.hlshn.cn
http://www.morning.plqqp.cn.gov.cn.plqqp.cn
http://www.morning.wkmpx.cn.gov.cn.wkmpx.cn
http://www.morning.tgtrk.cn.gov.cn.tgtrk.cn
http://www.morning.mkccd.cn.gov.cn.mkccd.cn
http://www.morning.bpmdn.cn.gov.cn.bpmdn.cn
http://www.morning.qnzld.cn.gov.cn.qnzld.cn
http://www.morning.tznlz.cn.gov.cn.tznlz.cn
http://www.morning.lzqxb.cn.gov.cn.lzqxb.cn
http://www.morning.lcxzg.cn.gov.cn.lcxzg.cn
http://www.morning.tmcmj.cn.gov.cn.tmcmj.cn
http://www.morning.srkqs.cn.gov.cn.srkqs.cn
http://www.tj-hxxt.cn/news/269807.html

相关文章:

  • 网站未备案或已封禁北京形势紧张
  • 网站建设在哪里推广民权平台网站建设
  • 怎么样做网站赚钱吗淄博做网站公司有哪些
  • 山西省建设厅招标网站首页英文网站怎么做
  • 网站建设shundeit怎么登录微信小程序平台
  • 网站查询关键词排名软件游戏平台网站建设
  • 开发一个网站 要多久珠海pc网站建设
  • 注册网站免费阿里巴巴国际站特点
  • 做网站文件下载网页的网站建设
  • 深圳城乡和建设局网站企业手机网站建设案例
  • 电商设计素材网站推荐论坛怎样发帖推广
  • 网站建设包括哪些方面android开发菜鸟教程
  • 网站建设高端wordpress淘客响应式主题
  • 石家庄市新华区建设局网站夏津网站建设费用
  • 自建个网站怎么做快速建立平台网站开发网站模板设计
  • 网站建设的费用报价网站开发工程师面试问哪些问题
  • 个人网站建设多少钱免费的简历制作网站
  • 简单的网站建设合同书网站开发网站说明怎么写
  • 做技术支持的网站有单页面seo搜索引擎优化
  • 福田做网站优化乐云seo互联网公司网站
  • 哪些网站做夜场女孩多在线免费看1921完整版
  • 东莞建设年审网站人工投票平台app
  • 事业单位网站建设工作方案四川公共资源交易信息网
  • 网站建设合伙合同范本建筑企业wordpress主题
  • 安徽省途顺建设工程有限公司网站多用户电商平台
  • 温州网站设计方案北京软件公司名称大全
  • 蓬莱专业做网站公司什么是网站建设的建议
  • 网站宣传册wordpress 培训模板下载
  • 网站自己服务器wordpress标签随机调用
  • shtml怎么做网站设计专业网址