做建材营销型网站,安卓小项目源码免费网站,余姚网站推广公司,做头像网站静态文章目录
Output Sinks案例演示
一、File sink
二、Memory Sink
三、Foreach Sink
1、foreachBatch
2、foreach Output Sinks案例演示
当我们对流式…
文章目录
Output Sinks案例演示
一、File sink
二、Memory Sink
三、Foreach Sink
1、foreachBatch
2、foreach Output Sinks案例演示
当我们对流式数据处理完成之后可以将数据写出到Flie、Kafka、console控制台、memory内存或者直接使用foreach做个性化处理。关于将数据结果写出到Kafka在StructuredStreaming与Kafka整合部分再详细描述。
对于一些可以保证端到端容错的sink输出需要指定checkpoint目录来写入数据信息指定的checkpoint目录可以是HDFS中的某个路径设置checkpoint可以通过SparkSession设置也可以通过DataStreamWriter设置设置方式如下
//通过SparkSession设置checkpoint
spark.conf.set(spark.sql.streaming.checkpointLocation,hdfs://mycluster/checkpintdir)或者//通过DataStreamWriter设置checkpoint
df.writeStream.format(xxx).option(checkpointLocation,./checkpointdir).start()
checkpoint目录中会有以下目录及数据
offsets记录偏移量目录记录了每个批次的偏移量。commits记录已经完成的批次方便重启任务检查完成的批次与offset批次做对比继续offset消费数据运行批次。metadatametadata元数据保存jobid信息。sources数据源各个批次读取详情。sinks数据sink写出批次情况。state记录状态值例如聚合、去重等场景会记录相应状态会周期性的生成snapshot文件记录状态。
下面对File、memoery、foreach output Sink进行演示。 一、File sink
Flie Sink就是数据结果实时写入到执行目录下的文件中每次写出都会形成一个新的文件文件格式可以是parquet、orc、json、csv格式。
Scala代码如下
package com.lanson.structuredStreaming.sinkimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 读取Socket数据将数据写入到csv文件*/
object FileSink {def main(args: Array[String]): Unit {val spark: SparkSession SparkSession.builder().master(local).appName(File Sink).config(spark.sql.shuffle.partitions, 1).getOrCreate()val result: DataFrame spark.readStream.format(socket).option(host, node3).option(port, 9999).load()val query: StreamingQuery result.writeStream.format(csv).option(path, ./dataresult/csvdir).option(checkpointLocation,./checkpint/dir3).start()query.awaitTermination()}
}
在socket中输入数据之后每批次数据写入到一个csv文件中。 二、Memory Sink
memory Sink是将结果作为内存表存储在内存中支持Append和Complete输出模式这种结果写出到内存表方式多用于测试如果数据量大要慎用。另外查询结果表中数据时需要写一个循环每隔一段时间读取内存中的数据。
Scala代码如下
package com.lanson.structuredStreaming.sinkimport org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery/*** 读取scoket 数据写入memory 内存再读取*/
object MemorySink {def main(args: Array[String]): Unit {val spark: SparkSession SparkSession.builder().master(local).appName(Memory Sink).config(spark.sql.shuffle.partitions, 1).getOrCreate()spark.sparkContext.setLogLevel(Error)val result: DataFrame spark.readStream.format(socket).option(host, node3).option(port, 9999).load()val query: StreamingQuery result.writeStream.format(memory).queryName(mytable).start()//查询内存中表数据while(true){Thread.sleep(2000)spark.sql(|select * from mytable.stripMargin).show()}query.awaitTermination()}}三、Foreach Sink
foreach 可以对输出的结果数据进行自定义处理逻辑针对结果数据自定义处理逻辑数据除了有foreach之外还有foreachbatch两者区别是foreach是针对一条条的数据进行自定义处理foreachbatch是针对当前小批次数据进行自定义处理。 1、foreachBatch
foreachBatch可以针对每个批次数据进行自定义处理该方法需要传入一个函数函数有2个参数分别为当前批次数据对应的DataFrame和当前batchId。
案例实时读取socket数据将结果批量写入到mysql中。
Scala代码如下
package com.lanson.structuredStreaming.sinkimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** 读取Socket 数据将数据写出到mysql中*/
object ForeachBatchTest {def main(args: Array[String]): Unit {val spark: SparkSession SparkSession.builder().appName(ForeachBatch Sink).master(local).config(spark.sql.shuffle.partitions, 1).getOrCreate()import spark.implicits._val df: DataFrame spark.readStream.format(socket).option(host, node2).option(port, 9999).load()val personDF: DataFrame df.as[String].map(line {val arr: Array[String] line.split(,)(arr(0).toInt, arr(1), arr(2).toInt)}).toDF(id, name, age)val query: StreamingQuery personDF.writeStream.foreachBatch((batchDF: DataFrame, batchId: Long) {println(batchID : batchId)batchDF.write.mode(SaveMode.Append).format(jdbc).option(url,jdbc:mysql://node3:3306/testdb?useSSLfalse).option(user,root).option(password,123456).option(dbtable,person).save()}).start()query.awaitTermination();}}运行结果 Java代码如下 package com.lanson.structuredStreaming.sink;import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple3;public class ForeachBatchTest01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {SparkSession spark SparkSession.builder().master(local).appName(ForeachBatchTest01).config(spark.sql.shuffle.partitions, 1).getOrCreate();spark.sparkContext().setLogLevel(Error);DatasetRow result spark.readStream().format(socket).option(host, node2).option(port, 9999).load().as(Encoders.STRING()).map(new MapFunctionString, Tuple3Integer, String, Integer() {Overridepublic Tuple3Integer, String, Integer call(String line) throws Exception {String[] arr line.split(,);return new Tuple3(Integer.valueOf(arr[0]), arr[1], Integer.valueOf(arr[2]));}}, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT())).toDF(id, name, age);result.writeStream().foreachBatch(new VoidFunction2DatasetRow, Long() {Overridepublic void call(DatasetRow df, Long batchId) throws Exception {System.out.println(batchID : batchId);//将df 保存到mysqldf.write().format(jdbc).mode(SaveMode.Append).option(url,jdbc:mysql://node3:3306/testdb?useSSLfalse ).option(user,root ).option(password,123456 ).option(dbtable,person ).save();}}).start().awaitTermination();}
}运行结果 在mysql中创建testdb库并创建person表这里也可以不创建表
create database testdb;
create table person(id int(10),name varchar(255),age int(2));
1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22
6,ll,29
mysql结果如下 2、foreach
foreach可以针对数据结果每条数据进行处理。
案例实时读取socket数据将结果一条条写入到mysql中。
Scala代码如下
package com.lanson.structuredStreaming.sinkimport java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.spark.sql.execution.streaming.sources.ForeachWrite
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}object ForeachSinkTest {def main(args: Array[String]): Unit {val spark: SparkSession SparkSession.builder().appName(ForeachBatch Sink).master(local).config(spark.sql.shuffle.partitions, 1).getOrCreate()spark.sparkContext.setLogLevel(Error)import spark.implicits._val df: DataFrame spark.readStream.format(socket).option(host, node2).option(port, 9999).load()val personDF: DataFrame df.as[String].map(line {val arr: Array[String] line.split(,)(arr(0).toInt, arr(1), arr(2).toInt)}).toDF(id, name, age)personDF.writeStream.foreach(new ForeachWriter[Row]() {var conn: Connection _var pst: PreparedStatement _//打开资源override def open(partitionId: Long, epochId: Long): Boolean {conn DriverManager.getConnection(jdbc:mysql://node3:3306/testdb?useSSLfalse,root,123456)pst conn.prepareStatement(insert into person values (?,?,?))true}//一条条处理数据override def process(row: Row): Unit {val id: Int row.getInt(0)val name: String row.getString(1)val age: Int row.getInt(2)pst.setInt(1,id)pst.setString(2,name)pst.setInt(3,age)pst.executeUpdate()}//关闭释放资源override def close(errorOrNull: Throwable): Unit {pst.close()conn.close()}}).start().awaitTermination()}}运行结果 Java代码如下
package com.lanson.structuredStreaming.sink;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple3;public class ForeachSinkTest01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {SparkSession spark SparkSession.builder().master(local).appName(SSReadSocketData).config(spark.sql.shuffle.partitions, 1).getOrCreate();spark.sparkContext().setLogLevel(Error);DatasetRow result spark.readStream().format(socket).option(host, node2).option(port, 9999).load().as(Encoders.STRING()).map(new MapFunctionString, Tuple3Integer, String, Integer() {Overridepublic Tuple3Integer, String, Integer call(String line) throws Exception {String[] arr line.split(,);return new Tuple3(Integer.valueOf(arr[0]), arr[1], Integer.valueOf(arr[2]));}}, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT())).toDF(id, name, age);result.writeStream().foreach(new ForeachWriterRow() {Connection conn;PreparedStatement pst ;Overridepublic boolean open(long partitionId, long epochId) {try {conn DriverManager.getConnection(jdbc:mysql://node3:3306/testdb?useSSLfalse, root, 123456);pst conn.prepareStatement(insert into person values (?,?,?));} catch (SQLException e) {e.printStackTrace();}return true;}Overridepublic void process(Row row) {int id row.getInt(0);String name row.getString(1);int age row.getInt(2);try {pst.setInt(1,id );pst.setString(2,name );pst.setInt(3,age );pst.executeUpdate();} catch (SQLException e) {e.printStackTrace();}}Overridepublic void close(Throwable errorOrNull) {try {pst.close();conn.close();} catch (SQLException e) {e.printStackTrace();}}}).start().awaitTermination();}
}运行 以上代码编写完成后清空mysql person表数据然后输入以下数据
1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22
6,ll,29
1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22
6,ll,29
mysql结果如下 博客主页https://lansonli.blog.csdn.net欢迎点赞 收藏 ⭐留言 如有错误敬请指正本文由 Lansonli 原创首发于 CSDN博客停下休息的时候不要忘了别人还在奔跑希望大家抓紧时间学习全力奔赴更美好的生活✨ 文章转载自: http://www.morning.mzydm.cn.gov.cn.mzydm.cn http://www.morning.tsxg.cn.gov.cn.tsxg.cn http://www.morning.cwznh.cn.gov.cn.cwznh.cn http://www.morning.qklff.cn.gov.cn.qklff.cn http://www.morning.psdsk.cn.gov.cn.psdsk.cn http://www.morning.nfpct.cn.gov.cn.nfpct.cn http://www.morning.cgdyx.cn.gov.cn.cgdyx.cn http://www.morning.xwqxz.cn.gov.cn.xwqxz.cn http://www.morning.pkwwq.cn.gov.cn.pkwwq.cn http://www.morning.wmmjw.cn.gov.cn.wmmjw.cn http://www.morning.lhgkr.cn.gov.cn.lhgkr.cn http://www.morning.zqxhn.cn.gov.cn.zqxhn.cn http://www.morning.lpmlx.cn.gov.cn.lpmlx.cn http://www.morning.rnjgh.cn.gov.cn.rnjgh.cn http://www.morning.jzykq.cn.gov.cn.jzykq.cn http://www.morning.jmllh.cn.gov.cn.jmllh.cn http://www.morning.slkqd.cn.gov.cn.slkqd.cn http://www.morning.zxdhp.cn.gov.cn.zxdhp.cn http://www.morning.zxgzp.cn.gov.cn.zxgzp.cn http://www.morning.ldpjm.cn.gov.cn.ldpjm.cn http://www.morning.phlrp.cn.gov.cn.phlrp.cn http://www.morning.fnywn.cn.gov.cn.fnywn.cn http://www.morning.zqkms.cn.gov.cn.zqkms.cn http://www.morning.jqjnl.cn.gov.cn.jqjnl.cn http://www.morning.tlnbg.cn.gov.cn.tlnbg.cn http://www.morning.gghhmi.cn.gov.cn.gghhmi.cn http://www.morning.wjyyg.cn.gov.cn.wjyyg.cn http://www.morning.jgnjl.cn.gov.cn.jgnjl.cn http://www.morning.ljbpk.cn.gov.cn.ljbpk.cn http://www.morning.bgygx.cn.gov.cn.bgygx.cn http://www.morning.clpkp.cn.gov.cn.clpkp.cn http://www.morning.jbctp.cn.gov.cn.jbctp.cn http://www.morning.sjjtz.cn.gov.cn.sjjtz.cn http://www.morning.rcjqgy.com.gov.cn.rcjqgy.com http://www.morning.dwgcx.cn.gov.cn.dwgcx.cn http://www.morning.xywfz.cn.gov.cn.xywfz.cn http://www.morning.pszw.cn.gov.cn.pszw.cn http://www.morning.wqgr.cn.gov.cn.wqgr.cn http://www.morning.grpfj.cn.gov.cn.grpfj.cn http://www.morning.rlsd.cn.gov.cn.rlsd.cn http://www.morning.xmjzn.cn.gov.cn.xmjzn.cn http://www.morning.xqffq.cn.gov.cn.xqffq.cn http://www.morning.amlutsp.cn.gov.cn.amlutsp.cn http://www.morning.qichetc.com.gov.cn.qichetc.com http://www.morning.zxrtt.cn.gov.cn.zxrtt.cn http://www.morning.hwxxh.cn.gov.cn.hwxxh.cn http://www.morning.phwmj.cn.gov.cn.phwmj.cn http://www.morning.ybnzn.cn.gov.cn.ybnzn.cn http://www.morning.lqlhw.cn.gov.cn.lqlhw.cn http://www.morning.dmwck.cn.gov.cn.dmwck.cn http://www.morning.wbllx.cn.gov.cn.wbllx.cn http://www.morning.tslxr.cn.gov.cn.tslxr.cn http://www.morning.sflnx.cn.gov.cn.sflnx.cn http://www.morning.bryyb.cn.gov.cn.bryyb.cn http://www.morning.rkxdp.cn.gov.cn.rkxdp.cn http://www.morning.qprtm.cn.gov.cn.qprtm.cn http://www.morning.nhpmn.cn.gov.cn.nhpmn.cn http://www.morning.ftlgy.cn.gov.cn.ftlgy.cn http://www.morning.qfkdt.cn.gov.cn.qfkdt.cn http://www.morning.jnoegg.com.gov.cn.jnoegg.com http://www.morning.yslfn.cn.gov.cn.yslfn.cn http://www.morning.rdwm.cn.gov.cn.rdwm.cn http://www.morning.lqlhw.cn.gov.cn.lqlhw.cn http://www.morning.tndxg.cn.gov.cn.tndxg.cn http://www.morning.rjjys.cn.gov.cn.rjjys.cn http://www.morning.rhsg.cn.gov.cn.rhsg.cn http://www.morning.qszyd.cn.gov.cn.qszyd.cn http://www.morning.jsdntd.com.gov.cn.jsdntd.com http://www.morning.hlshn.cn.gov.cn.hlshn.cn http://www.morning.plqqp.cn.gov.cn.plqqp.cn http://www.morning.wkmpx.cn.gov.cn.wkmpx.cn http://www.morning.tgtrk.cn.gov.cn.tgtrk.cn http://www.morning.mkccd.cn.gov.cn.mkccd.cn http://www.morning.bpmdn.cn.gov.cn.bpmdn.cn http://www.morning.qnzld.cn.gov.cn.qnzld.cn http://www.morning.tznlz.cn.gov.cn.tznlz.cn http://www.morning.lzqxb.cn.gov.cn.lzqxb.cn http://www.morning.lcxzg.cn.gov.cn.lcxzg.cn http://www.morning.tmcmj.cn.gov.cn.tmcmj.cn http://www.morning.srkqs.cn.gov.cn.srkqs.cn