当前位置: 首页 > news >正文

建设网站需要两种服务支持网站建设网上接单

建设网站需要两种服务支持,网站建设网上接单,云南省和城乡建设厅网站,手机网站建设西安目录 0. 相关文章链接 1. 基本操作 1.1. 弱类型 api 1.2. 强类型 1.3. 直接执行 sql 2. 基于 event-time 的窗口操作 2.1. event-time 窗口理解 2.2. event-time 窗口生成规则 3. 基于 Watermark 处理延迟数据 3.1. 什么是 Watermark 机制 3.2. update 模式下使用 w…目录 0. 相关文章链接 1. 基本操作 1.1. 弱类型 api 1.2. 强类型 1.3. 直接执行 sql 2. 基于 event-time 的窗口操作 2.1. event-time 窗口理解 2.2. event-time 窗口生成规则 3. 基于 Watermark 处理延迟数据 3.1. 什么是 Watermark 机制 3.2. update 模式下使用 watermark 3.3. append 模式下使用 wartermark 3.4. watermark 机制总结 4. 流数据去重 5. join操作 5.1. Stream-static Joins 5.1.1. 内连接 5.1.2. 外连接 5.2. Stream-stream Joins 5.2.1. inner join 4.2.2. outer join 6. Streaming DF/DS 不支持的操作 0. 相关文章链接 Spark文章汇总  1. 基本操作 在 DF/DS 上大多数通用操作都支持作用在 Streaming DataFrame/Streaming DataSet 上。 准备处理数据 people.json {name: Michael,age: 29,sex: female} {name: Andy,age: 30,sex: male} {name: Justin,age: 19,sex: male} {name: Lisi,age: 18,sex: male} {name: zs,age: 10,sex: female} {name: zhiling,age: 40,sex: female} 1.1. 弱类型 api 代码示例 import org.apache.spark.sql.types.{LongType, StringType, StructType} import org.apache.spark.sql.{DataFrame, SparkSession}object StreamTest {def main(args: Array[String]): Unit {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession SparkSession.builder().master(local[*]).appName(StreamTest).getOrCreate()import spark.implicits._// 创建格式并读取数据val peopleSchema: StructType new StructType().add(name, StringType).add(age, LongType).add(sex, StringType)val peopleDF: DataFrame spark.readStream.schema(peopleSchema).json(/Project/Data/json)// 弱类型 apival df: DataFrame peopleDF.select(name, age, sex).where(age 20)df.writeStream.outputMode(append).format(console).start.awaitTermination()// 关闭执行环境spark.stop()} } 结果输出 ------------------------------------------- Batch: 0 ------------------------------------------- ---------------- | name|age| sex| ---------------- |Michael| 29|female| | Andy| 30| male| |zhiling| 40|female| ---------------- 1.2. 强类型 代码示例 import org.apache.spark.sql.types.{LongType, StringType, StructType} import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StreamTest {def main(args: Array[String]): Unit {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession SparkSession.builder().master(local[*]).appName(StreamTest).getOrCreate()import spark.implicits._// 创建格式并读取数据val peopleSchema: StructType new StructType().add(name, StringType).add(age, LongType).add(sex, StringType)val peopleDF: DataFrame spark.readStream.schema(peopleSchema).json(/Project/Data/json)// 强类型转成 dsval peopleDS: Dataset[People] peopleDF.as[People]val df: Dataset[String] peopleDS.filter((_: People).age 20).map((_: People).name)df.writeStream.outputMode(append).format(console).start.awaitTermination()// 关闭执行环境spark.stop()} }case class People(name: String, age: Long, sex: String) 结果输出 ------------------------------------------- Batch: 0 ------------------------------------------- ------- | value| ------- |Michael| | Andy| |zhiling| ------- 1.3. 直接执行 sql 代码示例 import org.apache.spark.sql.types.{LongType, StringType, StructType} import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StreamTest {def main(args: Array[String]): Unit {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession SparkSession.builder().master(local[*]).appName(StreamTest).getOrCreate()import spark.implicits._// 创建格式并读取数据val peopleSchema: StructType new StructType().add(name, StringType).add(age, LongType).add(sex, StringType)val peopleDF: DataFrame spark.readStream.schema(peopleSchema).json(/Project/Data/json)// 直接执行SQL创建临时表peopleDF.createOrReplaceTempView(people)val df: DataFrame spark.sql(select * from people where age 20)df.writeStream.outputMode(append).format(console).start.awaitTermination()// 关闭执行环境spark.stop()} } 结果输出 ------------------------------------------- Batch: 0 ------------------------------------------- ---------------- | name|age| sex| ---------------- |Michael| 29|female| | Andy| 30| male| |zhiling| 40|female| ---------------- 2. 基于 event-time 的窗口操作 2.1. event-time 窗口理解 在 Structured Streaming 中, 可以按照事件发生时的时间对数据进行聚合操作, 即基于 event-time 进行操作。在这种机制下, 即不必考虑 Spark 陆续接收事件的顺序是否与事件发生的顺序一致, 也不必考虑事件到达 Spark 的时间与事件发生时间的关系。因此, 它在提高数据处理精度的同时, 大大减少了开发者的工作量。我们现在想计算 10 分钟内的单词, 每 5 分钟更新一次, 也就是说在 10 分钟窗口 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20等之间收到的单词量。 注意, 12:00 - 12:10 表示数据在 12:00 之后 12:10 之前到达。现在,考虑一下在 12:07 收到的单词。单词应该增加对应于两个窗口12:00 - 12:10和12:05 - 12:15的计数。因此,计数将由分组键即单词和窗口可以从事件时间计算索引。 统计后的结果应该是这样的 代码示例 import org.apache.spark.sql.functions.window import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession SparkSession.builder().master(local[*]).appName(StreamTest).getOrCreate()import spark.implicits._// 使用socket数据源val lines: DataFrame spark.readStream.format(socket).option(host, localhost).option(port, 9999).option(includeTimestamp, value true) // 给产生的数据自动添加时间戳.load// 把行切割成单词, 保留时间戳val words: DataFrame lines.as[(String, Timestamp)].flatMap((line: (String, Timestamp)) {line._1.split( ).map(((_: String), line._2))}).toDF(word, timestamp)// 按照窗口和单词分组, 并且计算每组的单词的个数最后按照窗口排序val wordCounts: Dataset[Row] words.groupBy(// 调用 window 函数, 返回的是一个 Column 类型// 参数 1: df 中表示时间戳的列// 参数 2: 窗口长度// 参数 3: 滑动步长window($timestamp, 60 seconds, 10 seconds),$word).count().orderBy($window)wordCounts.writeStream.outputMode(complete).format(console).option(truncate, false) // 不截断.为了在控制台能看到完整信息, 最好设置为 false.start.awaitTermination()// 关闭执行环境spark.stop()} } 结果输出 ------------------------------------------- Batch: 0 ------------------------------------------- --------------- |window|word|count| --------------- ---------------------------------------------------------- Batch: 1 ------------------------------------------- --------------------------------------------------- |window |word|count| --------------------------------------------------- |[2023-08-02 21:18:00, 2023-08-02 21:19:00]|abc |3 | |[2023-08-02 21:18:10, 2023-08-02 21:19:10]|a |3 | |[2023-08-02 21:18:10, 2023-08-02 21:19:10]|abc |5 | |[2023-08-02 21:18:20, 2023-08-02 21:19:20]|abc |5 | |[2023-08-02 21:18:20, 2023-08-02 21:19:20]|a |3 | |[2023-08-02 21:18:30, 2023-08-02 21:19:30]|a |3 | |[2023-08-02 21:18:30, 2023-08-02 21:19:30]|abc |5 | |[2023-08-02 21:18:40, 2023-08-02 21:19:40]|abc |5 | |[2023-08-02 21:18:40, 2023-08-02 21:19:40]|a |3 | |[2023-08-02 21:18:50, 2023-08-02 21:19:50]|a |3 | |[2023-08-02 21:18:50, 2023-08-02 21:19:50]|abc |5 | |[2023-08-02 21:19:00, 2023-08-02 21:20:00]|a |3 | |[2023-08-02 21:19:00, 2023-08-02 21:20:00]|abc |2 | ---------------------------------------------------------------------------------------------- Batch: 2 ------------------------------------------- --------------------------------------------------- |window |word|count| --------------------------------------------------- |[2023-08-02 21:18:00, 2023-08-02 21:19:00]|abc |3 | |[2023-08-02 21:18:10, 2023-08-02 21:19:10]|a |3 | |[2023-08-02 21:18:10, 2023-08-02 21:19:10]|abc |5 | |[2023-08-02 21:18:20, 2023-08-02 21:19:20]|abc |5 | |[2023-08-02 21:18:20, 2023-08-02 21:19:20]|a |3 | |[2023-08-02 21:18:30, 2023-08-02 21:19:30]|a |3 | |[2023-08-02 21:18:30, 2023-08-02 21:19:30]|abc |5 | |[2023-08-02 21:18:40, 2023-08-02 21:19:40]|abc |5 | |[2023-08-02 21:18:40, 2023-08-02 21:19:40]|a |3 | |[2023-08-02 21:18:50, 2023-08-02 21:19:50]|a |3 | |[2023-08-02 21:18:50, 2023-08-02 21:19:50]|abc |5 | |[2023-08-02 21:19:00, 2023-08-02 21:20:00]|a |3 | |[2023-08-02 21:19:00, 2023-08-02 21:20:00]|abc |2 | |[2023-08-02 21:19:10, 2023-08-02 21:20:10]|a |2 | |[2023-08-02 21:19:10, 2023-08-02 21:20:10]|abc |1 | |[2023-08-02 21:19:20, 2023-08-02 21:20:20]|a |2 | |[2023-08-02 21:19:20, 2023-08-02 21:20:20]|abc |1 | |[2023-08-02 21:19:30, 2023-08-02 21:20:30]|a |2 | |[2023-08-02 21:19:30, 2023-08-02 21:20:30]|abc |1 | |[2023-08-02 21:19:40, 2023-08-02 21:20:40]|a |2 | --------------------------------------------------- only showing top 20 rows 由此可以看出, 在这种窗口机制下, 无论事件何时到达, 以怎样的顺序到达, Structured Streaming 总会根据事件时间生成对应的若干个时间窗口, 然后按照指定的规则聚合。 2.2. event-time 窗口生成规则 可以查看 org.apache.spark.sql.catalyst.analysis.TimeWindowing 类下的如下代码 The windows are calculated as below: maxNumOverlapping - ceil(windowDuration / slideDuration) for (i - 0 until maxNumOverlapping)windowId - ceil((timestamp - startTime) / slideDuration)windowStart - windowId * slideDuration (i - maxNumOverlapping) * slideDuration startTimewindowEnd - windowStart windowDurationreturn windowStart, windowEnd 将event-time 作为“初始窗口”的结束时间, 然后按照窗口滑动宽度逐渐向时间轴前方推进, 直到某个窗口不再包含该 event-time 为止。 最终以“初始窗口”与“结束窗口”之间的若干个窗口作为最终生成的 event-time 的时间窗口。 每个窗口的起始时间与结束时间都是前必后开的区间, 因此初始窗口和结束窗口都不会包含 event-time, 最终不会被使用。 得到窗口如下 3. 基于 Watermark 处理延迟数据 3.1. 什么是 Watermark 机制 在数据分析系统中, Structured Streaming 可以持续的按照 event-time 聚合数据, 然而在此过程中并不能保证数据按照时间的先后依次到达。 例如: 当前接收的某一条数据的 event-time 可能远远早于之前已经处理过的 event-time。 在发生这种情况时, 往往需要结合业务需求对延迟数据进行过滤。现在考虑如果事件延迟到达会有哪些影响。 假如, 一个单词在 12:04(event-time) 产生, 在 12:11 到达应用。 应用应该使用 12:04 来在窗口(12:00 - 12:10)中更新计数, 而不是使用 12:11。 这些情况在我们基于窗口的聚合中是自然发生的, 因为结构化流可以长时间维持部分聚合的中间状态。 但是, 如果这个查询运行数天, 系统很有必要限制内存中累积的中间状态的数量。 这意味着系统需要知道何时从内存状态中删除旧聚合, 因为应用不再接受该聚合的后期数据。为了实现这个需求, 从 spark2.1, 引入了 watermark(水印), 使用引擎可以自动的跟踪当前的事件时间, 并据此尝试删除旧状态。通过指定 event-time 列和预估事件的延迟时间上限来定义一个查询的 watermark。 针对一个以时间 T 结束的窗口, 引擎会保留状态和允许延迟时间直到(max event time seen by the engine - late threshold T)。 换句话说, 延迟时间在上限内的被聚合, 延迟时间超出上限的开始被丢弃。 可以通过withWatermark() 来定义watermarkwatermark 计算方式watermark MaxEventTime - Threshhod而且, watermark只能逐渐增加, 不能减少。 Structured Streaming 引入 Watermark 机制, 主要是为了解决以下两个问题: 处理聚合中的延迟数据减少内存中维护的聚合状态. 注意在不同输出模式(complete, append, update)中, Watermark 会产生不同的影响。 3.2. update 模式下使用 watermark 在 update 模式下, 仅输出与之前批次的结果相比, 涉及更新或新增的数据。 代码示例如下 import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.{StreamingQuery, Trigger} import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession SparkSession.builder().master(local[*]).appName(StreamTest).getOrCreate()import spark.implicits._// 使用socket数据源val lines: DataFrame spark.readStream.format(socket).option(host, localhost).option(port, 9999).load// 输入的数据中包含时间戳, 而不是自动添加的时间戳val words: DataFrame lines.as[String].flatMap((line: String) {val split: Array[String] line.split(,)split(1).split( ).map(((_: String), Timestamp.valueOf(split(0))))}).toDF(word, timestamp)// 使用 withWatermark 方法添加watermark, 参数 1: event-time 所在列的列名 参数 2: 延迟时间的上限.val wordCounts: Dataset[Row] words.withWatermark(timestamp, 2 minutes).groupBy(window($timestamp, 10 minutes, 2 minutes), $word).count()// 数据输出val query: StreamingQuery wordCounts.writeStream.outputMode(update).trigger(Trigger.ProcessingTime(1000)).format(console).option(truncate, false).startquery.awaitTermination()// 关闭执行环境spark.stop()} } 初始化的wartmark是 0通过如下输入的几条数据可以看到水位线的变化。 第一次输入数据  2023-08-07 10:55:00,dog 。这个条数据作为第一批数据。 按照window($timestamp, 10 minutes, 2 minutes)得到 5 个窗口。 由于是第一批, 所有的窗口的结束时间都大于 wartermark(0), 所以 5 个窗口都显示如下所示 --------------------------------------------------- |window |word|count| --------------------------------------------------- |[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |1 | |[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |1 | |[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1 | |[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |1 | |[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1 | --------------------------------------------------- 然后根据当前批次中最大的 event-time, 计算出来下次使用的 watermark. 本批次只有一个数据(10:55), 所有: watermark 10:55 - 2min 10:53 。 第二次输入数据  2023-08-07 11:00:00,dog 。 这条数据作为第二批数据, 计算得到 5 个窗口。 此时的watermark10:53, 所有的窗口的结束时间均大于 watermark。 在 update 模式下, 只输出结果表中涉及更新或新增的数据。 --------------------------------------------------- |window |word|count| --------------------------------------------------- |[2023-08-07 10:58:00, 2023-08-07 11:08:00]|dog |1 | |[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |2 | |[2023-08-07 10:56:00, 2023-08-07 11:06:00]|dog |1 | |[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |2 | |[2023-08-07 11:00:00, 2023-08-07 11:10:00]|dog |1 | --------------------------------------------------- 其中: count 是 2 的表示更新, count 是 1 的表示新增。 没有变化的就没有显示(但是内存中仍然保存着)。此时的的 watermark 11:00 - 2min 10:58 。如下数据为在内存中保存着但是没有打印出来的数据 |[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1 | |[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1 | |[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |1 | 第三次输入数据   2023-08-07 10:55:00,dog  。 这条数据作为第 3 批次相当于一条延迟数据计算得到 5 个窗口。此时的 watermark 10:58 当前内存中有两个窗口的结束时间已经低于 10: 58。 |[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1 | |[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1 | 则立即删除这两个窗口在内存中的维护状态。 同时, 当前批次中新加入的数据所划分出来的窗口, 如果窗口结束时间低于 11:58, 则窗口会被过滤掉。 所以这次输出结果 --------------------------------------------------- |window |word|count| --------------------------------------------------- |[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |2 | |[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |3 | |[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |3 | --------------------------------------------------- 第三个批次的数据处理完成后, 立即计算: watermark 10:55 - 2min 10:53, 这个值小于当前的 watermask(10:58), 所以保持不变(因为 watermask 只能增加不能减少)。 3.3. append 模式下使用 wartermark 代码示例如下 import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.{StreamingQuery, Trigger} import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession SparkSession.builder().master(local[*]).appName(StreamTest).getOrCreate()import spark.implicits._// 使用socket数据源val lines: DataFrame spark.readStream.format(socket).option(host, localhost).option(port, 9999).load// 输入的数据中包含时间戳, 而不是自动添加的时间戳val words: DataFrame lines.as[String].flatMap((line: String) {val split: Array[String] line.split(,)split(1).split( ).map(((_: String), Timestamp.valueOf(split(0))))}).toDF(word, timestamp)// 使用 withWatermark 方法添加watermark, 参数 1: event-time 所在列的列名 参数 2: 延迟时间的上限.val wordCounts: Dataset[Row] words.withWatermark(timestamp, 2 minutes).groupBy(window($timestamp, 10 minutes, 2 minutes), $word).count()// 数据输出val query: StreamingQuery wordCounts.writeStream.outputMode(append).trigger(Trigger.ProcessingTime(0)).format(console).option(truncate, false).startquery.awaitTermination()// 关闭执行环境spark.stop()} }在 append 模式中, 仅输出新增的数据, 且输出后的数据无法变更。 第一次输入数据 2023-08-07 10:55:00,dog  。 这个条数据作为第一批数据。 按照window($timestamp, 10 minutes, 2 minutes)得到 5 个窗口。 由于此时初始 watermask0, 当前批次中所有窗口的结束时间均大于 watermask。但是 Structured Streaming 无法确定后续批次的数据中是否会更新当前批次的内容。 因此, 基于 Append 模式的特点, 这时并不会输出任何数据(因为输出后数据就无法更改了), 直到某个窗口的结束时间小于 watermask, 即可以确定后续数据不会再变更该窗口的聚合结果时才会将其输出, 并移除内存中对应窗口的聚合状态。 --------------- |window|word|count| --------------- --------------- 然后根据当前批次中最大的 event-time, 计算出来下次使用的 watermark。 本批次只有一个数据(10:55), 所有: watermark 10:55 - 2min 10:53 第二次输入数据 2023-08-07 11:00:00,dog  。这条数据作为第二批数据, 计算得到 5 个窗口。 此时的watermark10:53, 所有的窗口的结束时间均大于 watermark, 仍然不会输出。 --------------- |window|word|count| --------------- --------------- 然后计算 watermark 11:00 - 2min 10:58 第三次输入数据 2023-08-07 10:55:00,dog 。相当于一条延迟数据,这条数据作为第 3 批次, 计算得到 5 个窗口。 此时的 watermark 10:58 当前内存中有两个窗口的结束时间已经低于 10: 58。 |[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1 | |[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1 | 则意味着这两个窗口的数据不会再发生变化, 此时输出这个两个窗口的聚合结果, 并在内存中清除这两个窗口的状态。所以这次输出结果: --------------------------------------------------- |window |word|count| --------------------------------------------------- |[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1 | |[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1 | --------------------------------------------------- 第三个批次的数据处理完成后, 立即计算: watermark 10:55 - 2min 10:53, 这个值小于当前的 watermask(10:58), 所以保持不变。(因为 watermask 只能增加不能减少) 3.4. watermark 机制总结 watermark 在用于基于时间的状态聚合操作时, 该时间可以基于窗口, 也可以基于 event-timeb本身。输出模式必须是append或update。 在输出模式是complete的时候(必须有聚合), 要求每次输出所有的聚合结果。 我们使用 watermark 的目的是丢弃一些过时聚合数据, 所以complete模式使用wartermark无效也无意义。在输出模式是append时, 必须设置 watermask 才能使用聚合操作。 其实, watermask 定义了 append 模式中何时输出聚合聚合结果(状态), 并清理过期状态。在输出模式是update时, watermask 主要用于过滤过期数据并及时清理过期状态。watermask 会在处理当前批次数据时更新, 并且会在处理下一个批次数据时生效使用。 但如果节点发送故障, 则可能延迟若干批次生效。withWatermark 必须使用与聚合操作中的时间戳列是同一列。df.withWatermark(time, 1 min).groupBy(time2).count() 无效。withWatermark 必须在聚合之前调用。 f.groupBy(time).count().withWatermark(time, 1 min) 无效。 4. 流数据去重 需求内容根据唯一的 id 实现数据去重 代码示例 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession SparkSession.builder().master(local[*]).appName(StreamTest).getOrCreate()import spark.implicits._// 使用socket数据源val lines: DataFrame spark.readStream.format(socket).option(host, localhost).option(port, 9999).load// 数据预处理val words: DataFrame lines.as[String].map((line: String) {val arr: Array[String] line.split(,)(arr(0), Timestamp.valueOf(arr(1)), arr(2))}).toDF(uid, ts, word)// 去重重复数据 uid 相同就是重复. 可以传递多个列val wordCounts: Dataset[Row] words.withWatermark(ts, 2 minutes).dropDuplicates(uid)// 输出数据wordCounts.writeStream.outputMode(append).format(console).start.awaitTermination()// 关闭执行环境spark.stop()} } 数据输入按顺序从上到下 1,2023-08-09 11:50:00,dog 2,2023-08-09 11:51:00,dog 1,2023-08-09 11:50:00,dog 3,2023-08-09 11:53:00,dog 1,2023-08-09 11:50:00,dog 4,2023-08-09 11:45:00,dog 注意点 dropDuplicates 不可用在聚合之后, 即通过聚合得到的 df/ds 不能调用dropDuplicates 使用watermask - 如果重复记录的到达时间有上限,则可以在事件时间列上定义水印,并使用guid和事件时间列进行重复数据删除。该查询将使用水印从过去的记录中删除旧的状态数据,这些记录不会再被重复。这限制了查询必须维护的状态量。 没有watermask - 由于重复记录可能到达时没有界限,查询将来自所有过去记录的数据存储为状态。 测试 第一次输入数据1,2023-08-09 11:50:00,dog -------------------------- |uid| ts|word| -------------------------- | 1|2023-08-09 11:50:00| dog| -------------------------- 第二次输入数据2,2023-08-09 11:51:00,dog -------------------------- |uid| ts|word| -------------------------- | 2|2023-08-09 11:51:00| dog| -------------------------- 第三次输入数据1,2023-08-09 11:50:00,dog id 重复无输出第四次输入数据3,2023-08-09 11:53:00,dog 此时 watermask11:51 -------------------------- |uid| ts|word| -------------------------- | 3|2023-08-09 11:53:00| dog| -------------------------- 第五次输入数据1,2023-08-09 11:50:00,dog 数据重复, 并且数据过期, 所以无输出第六次输入数据4,2023-08-09 11:45:00,dog 数据过时, 所以无输出 5. join操作 Structured Streaming 支持 streaming DataSet/DataFrame 与静态的DataSet/DataFrame 进行 join, 也支持 streaming DataSet/DataFrame与另外一个streaming DataSet/DataFrame 进行 join。join 的结果也是持续不断的生成, 类似于前面的 streaming 的聚合结果。 5.1. Stream-static Joins 静态数据 lisi,male zhiling,female zs,male 流式数据 lisi,20 zhiling,40 ww,30 5.1.1. 内连接 代码示例 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object StreamTest {def main(args: Array[String]): Unit {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession SparkSession.builder().master(local[*]).appName(StreamTest).getOrCreate()import spark.implicits._// 1. 静态 dfval arr: Array[(String, String)] Array((lisi, male), (zhiling, female), (zs, male));val staticDF: DataFrame spark.sparkContext.parallelize(arr).toDF(name, sex)// 2. 流式 dfval lines: DataFrame spark.readStream.format(socket).option(host, localhost).option(port, 9999).load()val streamDF: DataFrame lines.as[String].map((line: String) {val arr: Array[String] line.split(,)(arr(0), arr(1).toInt)}).toDF(name, age)// 3. join 等值内连接 a.nameb.nameval joinResult: DataFrame streamDF.join(staticDF, name)// 4. 输出joinResult.writeStream.outputMode(append).format(console).start.awaitTermination()// 关闭执行环境spark.stop()} } 数据输出 ---------------- | name|age| sex| ---------------- |zhiling| 40|female| | lisi| 20| male| ---------------- 5.1.2. 外连接 代码示例 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object StreamTest {def main(args: Array[String]): Unit {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession SparkSession.builder().master(local[*]).appName(StreamTest).getOrCreate()import spark.implicits._// 1. 静态 dfval arr: Array[(String, String)] Array((lisi, male), (zhiling, female), (zs, male));val staticDF: DataFrame spark.sparkContext.parallelize(arr).toDF(name, sex)// 2. 流式 dfval lines: DataFrame spark.readStream.format(socket).option(host, localhost).option(port, 9999).load()val streamDF: DataFrame lines.as[String].map((line: String) {val arr: Array[String] line.split(,)(arr(0), arr(1).toInt)}).toDF(name, age)// 3. joinval joinResult: DataFrame streamDF.join(staticDF, Seq(name), left)// 4. 输出joinResult.writeStream.outputMode(append).format(console).start.awaitTermination()// 关闭执行环境spark.stop()} } 数据输出 ---------------- | name|age| sex| ---------------- |zhiling| 40|female| | ww| 30| null| | lisi| 20| male| ---------------- 5.2. Stream-stream Joins 在 Spark2.3, 开始支持 stream-stream join。Spark 会自动维护两个流的状态, 以保障后续流入的数据能够和之前流入的数据发生 join 操作, 但这会导致状态无限增长。 因此, 在对两个流进行 join 操作时, 依然可以用 watermark 机制来消除过期的状态, 避免状态无限增长。 第 1 个数据格式姓名,年龄,事件时间 lisi,female,2023-08-09 11:50:00 zs,male,2023-08-09 11:51:00 ww,female,2023-08-09 11:52:00 zhiling,female,2023-08-09 11:53:00 fengjie,female,2023-08-09 11:54:00 yifei,female,2023-08-09 11:55:00 第 2 个数据格式姓名,年龄,事件时间 lisi,18,2023-08-09 11:50:00 zs,19,2023-08-09 11:51:00 ww,20,2023-08-09 11:52:00 zhiling,22,2023-08-09 11:53:00 yifei,30,2023-08-09 11:54:00 fengjie,98,2023-08-09 11:55:00 5.2.1. inner join 对 2 个流式数据进行 join 操作输出模式仅支持append模式。 不带 watermast 的 inner joinjoin 的速度很慢 import org.apache.spark.sql.streaming.{StreamingQuery, Trigger} import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession SparkSession.builder().master(local[*]).appName(StreamTest).getOrCreate()import spark.implicits._// 第 1 个 streamval nameSexStream: DataFrame spark.readStream.format(socket).option(host, localhost).option(port, 9999).load.as[String].map((line: String) {val arr: Array[String] line.split(,)(arr(0), arr(1), Timestamp.valueOf(arr(2)))}).toDF(name, sex, ts1)// 第 2 个 streamval nameAgeStream: DataFrame spark.readStream.format(socket).option(host, localhost).option(port, 8888).load.as[String].map((line: String) {val arr: Array[String] line.split(,)(arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))}).toDF(name, age, ts2)// join 操作val joinResult: DataFrame nameSexStream.join(nameAgeStream, name)// 数据输出joinResult.writeStream.outputMode(append).format(console).trigger(Trigger.ProcessingTime(0)).start().awaitTermination()// 关闭执行环境spark.stop()} }// 数据输出 // ------------------------------------------------------ // | name| sex| ts1|age| ts2| // ------------------------------------------------------ // |zhiling|female|2023-08-09 11:53:00| 22|2023-08-09 11:53:00| // | ww|female|2023-08-09 11:52:00| 20|2023-08-09 11:52:00| // | yifei|female|2023-08-09 11:55:00| 30|2023-08-09 11:54:00| // | zs| male|2023-08-09 11:51:00| 19|2023-08-09 11:51:00| // |fengjie|female|2023-08-09 11:54:00| 98|2023-08-09 11:55:00| // | lisi|female|2023-08-09 11:50:00| 18|2023-08-09 11:50:00| // ------------------------------------------------------带 watermast 的 inner joinjoin 的速度很慢 import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.{StreamingQuery, Trigger} import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession SparkSession.builder().master(local[*]).appName(StreamTest).getOrCreate()import spark.implicits._// 第 1 个 streamval nameSexStream: DataFrame spark.readStream.format(socket).option(host, localhost).option(port, 9999).load.as[String].map((line: String) {val arr: Array[String] line.split(,)(arr(0), arr(1), Timestamp.valueOf(arr(2)))}).toDF(name1, sex, ts1)// 第 2 个 streamval nameAgeStream: DataFrame spark.readStream.format(socket).option(host, localhost).option(port, 8888).load.as[String].map((line: String) {val arr: Array[String] line.split(,)(arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))}).toDF(name2, age, ts2).withWatermark(ts2, 1 minutes)// join 操作val joinResult: DataFrame nameSexStream.join(nameAgeStream,expr(|name1name2 and|ts2 ts1 and|ts2 ts1 interval 1 minutes.stripMargin))// 数据输出joinResult.writeStream.outputMode(append).format(console).trigger(Trigger.ProcessingTime(0)).start().awaitTermination()// 关闭执行环境spark.stop()} }// 数据输出 // ------------------------------------------------------------- // | name1| sex| ts1| name2|age| ts2| // ------------------------------------------------------------- // |zhiling|female|2023-08-09 11:53:00|zhiling| 22|2023-08-09 11:53:00| // | ww|female|2023-08-09 11:52:00| ww| 20|2023-08-09 11:52:00| // | zs| male|2023-08-09 11:51:00| zs| 19|2023-08-09 11:51:00| // |fengjie|female|2023-08-09 11:54:00|fengjie| 98|2023-08-09 11:55:00| // | lisi|female|2023-08-09 11:50:00| lisi| 18|2023-08-09 11:50:00| // ------------------------------------------------------------- 4.2.2. outer join 外连接必须使用 watermast和内连接相比, 代码几乎一致, 只需要在连接的时候指定下连接类型即可joinType left。 import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.{StreamingQuery, Trigger} import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession SparkSession.builder().master(local[*]).appName(StreamTest).getOrCreate()import spark.implicits._// 第 1 个 streamval nameSexStream: DataFrame spark.readStream.format(socket).option(host, localhost).option(port, 9999).load.as[String].map((line: String) {val arr: Array[String] line.split(,)(arr(0), arr(1), Timestamp.valueOf(arr(2)))}).toDF(name1, sex, ts1)// 第 2 个 streamval nameAgeStream: DataFrame spark.readStream.format(socket).option(host, localhost).option(port, 8888).load.as[String].map((line: String) {val arr: Array[String] line.split(,)(arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))}).toDF(name2, age, ts2).withWatermark(ts2, 1 minutes)// join 操作val joinResult: DataFrame nameSexStream.join(nameAgeStream,expr(|name1name2 and|ts2 ts1 and|ts2 ts1 interval 1 minutes.stripMargin),joinType left)// 数据输出joinResult.writeStream.outputMode(append).format(console).trigger(Trigger.ProcessingTime(0)).start().awaitTermination()// 关闭执行环境spark.stop()} }// 数据输出 // ------------------------------------------------------------- // | name1| sex| ts1| name2|age| ts2| // ------------------------------------------------------------- // |zhiling|female|2023-08-09 11:53:00|zhiling| 22|2023-08-09 11:53:00| // | ww|female|2023-08-09 11:52:00| ww| 20|2023-08-09 11:52:00| // | zs| male|2023-08-09 11:51:00| zs| 19|2023-08-09 11:51:00| // |fengjie|female|2023-08-09 11:54:00|fengjie| 98|2023-08-09 11:55:00| // | lisi|female|2023-08-09 11:50:00| lisi| 18|2023-08-09 11:50:00| // -------------------------------------------------------------6. Streaming DF/DS 不支持的操作 到目前, DF/DS 的有些操作 Streaming DF/DS 还不支持 多个Streaming 聚合(例如在 DF 上的聚合链)目前还不支持limit 和取前 N 行还不支持distinct 也不支持仅仅支持对 complete 模式下的聚合操作进行排序操作仅支持有限的外连接有些方法不能直接用于查询和返回结果, 因为他们用在流式数据上没有意义 count() 不能返回单行数据, 必须是s.groupBy().count()foreach() 不能直接使用, 而是使用: ds.writeStream.foreach(...)show() 不能直接使用, 而是使用 console sink 如果执行上面操作会看到这样的异常: operation XYZ is not supported with streaming DataFrames/Datasets。 注其他Spark相关系列文章链接由此进 -  Spark文章汇总
http://www.tj-hxxt.cn/news/140222.html

相关文章:

  • 深圳专业网站制作技术企业公司如何做网站
  • 长春建站模板展示网页设计与网站建设期末考试
  • 上海专业的网站建中卫企业管理培训网站
  • 帮传销做网站违法吗优化教育培训
  • 2015做导航网站好视频网站开发前景
  • 郑州做网站优化公购物网站排名第一的有哪些
  • 科技创新的评价机制的作用南京seo公司哪家好
  • 怎样建网站卖东西宝应百度贴吧
  • 响应式网站好不好平面设计网站导航
  • 安居客房产官方网站东莞网站开发后缀
  • 山西太原门户网站开发公司自媒体网站大全
  • 移动网站设计教程郑州网站建设技术支持
  • 郑州网站建设网站制作淘宝装修可以做代码的网站有哪些
  • 邵阳网站seo可以用wordpress的什么文件大小
  • 网站 关键词 出现频率企业展厅方案
  • 泰州专一做淘宝网站注册域名卖钱很暴利吗
  • 惠州技术支持网站建设网站开发设计心得
  • 如何建设一个网站网站手机版模板免费下载
  • 安宁区网站制作网站备案都审核什么资料
  • 本地网站可以做吗2020最近的新闻大事10条
  • 保安公司网站如何做常用的关键词挖掘工具有哪些
  • 做网站设计前景怎么样加强网站建设 实施政务公开
  • 网站开发流程可规划为那三个阶段竞价推广返点开户
  • 网站定制开发成本网络运营计划方案
  • 怎么制作网站发布茂名做网站dyiee
  • 网站每年要交钱吗做公司网站需要多少钱
  • 网站服务器是主机吗网站后台设计教程
  • 网络推广网络营销外包sem优化托管
  • 淄博网站制作营销房产达人
  • 学做网站知识wordpress全景图片