当前位置: 首页 > news >正文

昆明网站建设是什么缅甸今日新闻

昆明网站建设是什么,缅甸今日新闻,沈阳教做网站,数字货币交易网站开发怎么做一、sparkStreaming的不足 1.基于微批,延迟高不能做到真正的实时 2.DStream基于RDD,不直接支持SQL 3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD) 4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件&am…

一、sparkStreaming的不足

1.基于微批,延迟高不能做到真正的实时

2.DStream基于RDD,不直接支持SQL

3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)

4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件,一个是事件处理的时间)

5.数据的Exactly-Once(恰好一次语义)需要手动实现

二、StructuredStreaming 的介绍 

1、2016年Spark2.0版本中发布

2、基于SparkSQL引擎的可扩展、容错的全新的流处理引擎。

3、并不是对Spark Streaming的简单改进,而是重新开发的全新流式引擎

准实时技术:来一批处理一批 实时:来一条处理一条 离线:一般都是处理一些静止的数据

三、socket+console

1、在虚拟机中下载nc
yum install -y nc2、启动 nc -lk 9999

案例:wordcount

import osfrom pyspark.sql import SparkSession
from pyspark.sql.functions import explode
import pyspark.sql.functions as F
if __name__ == '__main__':os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.appName("socketDemo").getOrCreate()socketDf = spark.readStream.format("socket") \.option("host", "bigdata01") \.option("port", 9999) \.load()# 处理# 方式一:使用dsl语法splitDf = socketDf.select(explode(F.split(socketDf.value, " ")).alias("word"))resultDf1 = splitDf.groupBy("word").count()# 方式二:使用sqlsocketDf.createOrReplaceTempView("wordcount")resultDf2 = spark.sql("""with t1 as( select num from wordcount lateral view explode(split(value," ")) c as num)select num,count(*) counts from t1 group by num;""")# 下面的就是sink的写法 后续会写query1 = resultDf1.writeStream \.outputMode("complete") \.format("console") \.start()query2 = resultDf2.writeStream \.outputMode("complete") \.format("console") \.start() \.awaitTermination()spark.stop()

四、file+console

文件中的数据:
1;yuwen;43
1;shuxue;55
2;yuwen;77
2;shuxue;88
3;yuwen;98
3;shuxue;65
3;yingyu;88
import osfrom pyspark.sql import SparkSessionfrom pyspark.sql.types import StructField, StringType, DoubleType, LongType, IntegerType, StructTypeif __name__ == '__main__':os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.appName("socketDemo").getOrCreate()# score_schema = StructType([#     StructField(name="stu_id", dataType=IntegerType(), nullable=False),#     StructField(name="subject_name", dataType=StringType(), nullable=True),#     StructField(name="score", dataType=DoubleType(), nullable=True)# ])score_schema = StructType().add("stu_id", IntegerType()).add("subject_name", StringType()).add("score",DoubleType())socketDf = spark.readStream.format("csv") \.option("sep", ";") \.schema(score_schema) \.load("../../resources/input1")socketDf.writeStream \.outputMode("append") \.format("console") \.option("truncate", False) \.start() \.awaitTermination()spark.stop()

http://www.tj-hxxt.cn/news/24637.html

相关文章:

  • 区块链微信小程序开发教程武汉做seo公司
  • 珠海学网站开发福建百度seo排名点击软件
  • 做亚马逊网站一般发什么快递百度指数查询移民
  • 十堰优化网站公司百度seo快速提升排名
  • 网站做301对优化有影响百度代理公司
  • 公司网站怎么管理竞价账户托管哪家好
  • 怎么做淘宝企业网站怎么制作网页里面的内容
  • 做soho外贸网站百度seo2022新算法更新
  • 牛推网官网武汉seo服务多少钱
  • 重庆网站推广计划seo 的原理和作用
  • 姜堰网站定制百度公司注册地址在哪里
  • 深圳b2b网站建设排名合肥网络公司seo
  • 北京双井网站建设seo学徒是做什么
  • 网站排名seo培训网络推广的平台有哪些
  • 咸宁做网站的公司那家便宜阿里云域名
  • 企业网站建设前言郑州聚商网络科技有限公司
  • 网站根据城市做二级目录微信营销模式有哪些
  • 灯饰如何做网站推广各引擎收录查询
  • 怎么维护网站教程保定百度seo公司
  • 网站建站那个好软文编辑器
  • 购物网站哪个是正品高级seo招聘
  • 北京做网站ezhixi网站制作软件
  • 网站用什么主机seo是什么岗位的缩写
  • 有没有做那事的网站百度快速排名培训
  • 怎样做能让招聘网站记住密码网络营销竞价推广
  • 网站前台修改百度惠生活商家入驻
  • 优化设计方法seo第三方点击软件
  • 常州妇幼做的无创 在哪个网站查seo怎么发布外链
  • 建设网站需要提供什么资料网站策划书
  • 电商网站建设思维导图广告投放优化师