当前位置: 首页 > news >正文

盈利的网站哪家网络公司比较好

盈利的网站,哪家网络公司比较好,广州百度推广排名优化,网页培训多少钱1.3 SparkStreaming与Kafka整合 1.3.1 整合简述 kafka是做消息的缓存#xff0c;数据和业务隔离操作的消息队列#xff0c;而sparkstreaming是一款准实时流式计算框架#xff0c;所以二者的整合#xff0c;是大势所趋。 ​ 二者的整合#xff0c;有主要的两大版本。 kaf…1.3 SparkStreaming与Kafka整合 1.3.1 整合简述 kafka是做消息的缓存数据和业务隔离操作的消息队列而sparkstreaming是一款准实时流式计算框架所以二者的整合是大势所趋。 ​ 二者的整合有主要的两大版本。 kafka作为一个实时的分布式消息队列实时的生产和消费消息在实际开发中Spark Streaming经常会结合Kafka来处理实时数据。Spark Streaming 与 kafka整合需要引入spark-streaming-kafka.jar该jar根据kafka版本有2个分支分别是spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10。jar包分支选择原则 0.10.0kafka版本0.8.2.1选择 08 接口 kafka版本0.10.0选择 010 接口 sparkStreaming和Kafka整合一般两种方式Receiver方式和Direct方式 Receiver方式(介绍) Receiver方式基于kafka的高级消费者API实现高级优点高级API写起来简单不需要去自行去管理offset系统通过zookeeper自行管理不需要管理分区副本等情况系统自动管理消费者断线会自动根据上一次记录在 zookeeper中的offset去接着获取数据高级缺点不能自行控制 offset不能细化控制如分区、副本、zk 等。Receiver从kafka接收数据存储在Executor中Spark Streaming 定时生成任务来处理数据。 默认配置的情况Receiver失败时有可能丢失数据。如果要保证数据的可靠性需要开启预写式日志简称WALWrite Ahead LogsSpark1.2引入只有接收到的数据被持久化之后才会去更新Kafka中的消费位移。接收到的数据和WAL存储位置信息被可靠地存储如果期间出现故障这些信息被用来从错误中恢复并继续处理数据。 还有几个需要注意的点 在Receiver的方式中Spark中的 partition 和 kafka 中的 partition 并不是相关的如果加大每个topic的partition数量仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度 对于不同的 Group 和 Topic 可以使用多个 Receiver 创建不同的Dstream来并行接收数据之后可以利用union来统一成一个Dstream 如果启用了Write Ahead Logs复制到文件系统如HDFS那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER) WAL将接收的数据备份到HDFS上保证了数据的安全性。但写HDFS比较消耗性能另外要在备份完数据之后还要写相关的元数据信息这样总体上增加job的执行时间增加了任务执行时间 总体上看 Receiver 方式不适于生产环境 1.3.2  Direct的方式 Direct方式从Spark1.3开始引入的通过 KafkaUtils.createDirectStream 方法创建一个DStream对象Direct方式的结构如下图所示。 Direct 方式特点如下 对应Kafka的版本 0.8.2.1 Direct 方式 Offset 可自定义 使用kafka低阶API 底层实现为KafkaRDD 该方式中Kafka的一个分区与Spark RDD对应通过定期扫描所订阅Kafka每个主题的每个分区的最新偏移量以确定当前批处理数据偏移范围。与Receiver方式相比Direct方式不需要维护一份WAL数据由Spark Streaming程序自己控制位移的处理通常通过检查点机制处理消费位移这样可以保证Kafka中的数据只会被Spark拉取一次。 引入依赖 dependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming-kafka-0-10_2.12/artifactIdversion3.1.2/version /dependency 模拟kafka生产数据 package com.qianfeng.sparkstreaming ​ import java.util.{Properties, Random} ​ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} ​ /*** 向kafka中test主题模拟生产数据也可以使用命令行生产kafka-console-producer.sh --broker-list qianfeng01:9092,hadoop02:9092,hadoop03:9092 -topic test*/ object Demo02_DataLoad2Kafka {def main(args: Array[String]): Unit {val prop new Properties()//提供Kafka服务器信息prop.put(bootstrap.servers,qianfeng01:9092)//指定响应的方式prop.put(acks,all)//请求失败重试的次数prop.put(retries,3)//指定key的序列化方式key是用于存放数据对应的offsetprop.put(key.serializer,org.apache.kafka.common.serialization.StringSerializer)//指定value的序列化方式prop.put(value.serializer,org.apache.kafka.common.serialization.StringSerializer)//创建producer对象val producer new KafkaProducer[String,String](prop)//提供一个数组数组中数据val arr Array(hello tom,hello jerry,hello dabao,hello zhangsan,hello lisi,hello wangwu,)//提供一个随机数随机获取数组中数据向kafka中进行发送存储val r new Random()while(true){val message arr(r.nextInt(arr.length))producer.send(new ProducerRecord[String,String](test,message))Thread.sleep(r.nextInt(1000))   //休眠1s以内}} } 实时消费kafka数据 package com.qianfeng.sparkstreaming ​ import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} ​ ​ /*** sparkStreaming消费Kafka中的数据*/ object Demo03_SparkStreamingWithKafka {def main(args: Array[String]): Unit {//1.创建SparkConf对象val conf new SparkConf().setAppName(SparkStreamingToKafka).setMaster(local[*])//2.提供批次时间val time Seconds(5)//3.提供StreamingContext对象val sc new StreamingContext(conf, time)//4.提供Kafka配置参数val kafkaConfig Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG - qianfeng01:9092,ConsumerConfig.GROUP_ID_CONFIG - qianfeng,key.deserializer - org.apache.kafka.common.serialization.StringDeserializer,value.deserializer - org.apache.kafka.common.serialization.StringDeserializer,)//5.读取Kafka中数据信息生成DStreamval value KafkaUtils.createDirectStream(sc,//本地化策略将Kafka的分区数据均匀的分配到各个执行Executor中LocationStrategies.PreferConsistent,//表示要从使用kafka进行消费【offset谁来管理从那个位置开始消费数据】ConsumerStrategies.Subscribe[String, String](Set(test), kafkaConfig))//6.将每条消息kv获取出来val line: DStream[String] value.map(record record.value())//7.开始计算操作line.flatMap(_.split( )).map((_, 1)).reduceByKey(_ _).print()//line.count().print()   //每隔5s的数据条数//8.开始任务sc.start()sc.awaitTermination()} } 说明 简化的并行性不需要创建多个输入Kafka流并将其合并。 使用directStreamSpark Streaming将创建与使用Kafka分区一样多的RDD分区这些分区将全部从Kafka并行读取数据。 所以在Kafka和RDD分区之间有一对一的映射关系。 效率在第一种方法中实现零数据丢失需要将数据存储在预写日志中这会进一步复制数据。这实际上是效率低下的因为数据被有效地复制了两次:一次是Kafka另一次是由预先写入日志WriteAhead Log复制。这个第二种方法消除了这个问题因为没有接收器因此不需要预先写入日志。只要Kafka数据保留时间足够长。 正好一次Exactly-once的语义第一种方法使用Kafka的高级API来在Zookeeper中存储消耗的偏移量。传统上这是从Kafka消费数据的方式。虽然这种方法结合提前写入日志可以确保零数据丢失即至少一次语义但是在某些失败情况下有一些记录可能会消费两次。发生这种情况是因为Spark Streaming可靠接收到的数据与Zookeeper跟踪的偏移之间的不一致。因此在第二种方法中我们使用不使用Zookeeper的简单Kafka API。在其检查点内Spark Streaming跟踪偏移量。这消除了Spark Streaming和Zookeeper/Kafka之间的不一致因此Spark Streaming每次记录都会在发生故障的情况下有效地收到一次。为了实现输出结果的一次语义将数据保存到外部数据存储区的输出操作必须是幂等的或者是保存结果和偏移量的原子事务。 Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客
http://www.tj-hxxt.cn/news/223054.html

相关文章:

  • 怀化网站建设设计免费建立一个个人网站
  • 免费建材网站模板指数计算器
  • 揭阳制作公司网站网站单个页面做301
  • 湛江seo网站推广怎么做带后台的网站
  • 创建网站投资多少钱赶集网的二级域名网站怎么做
  • 电商数据网站如何下载js做的网站
  • 网站建设公司广告词大邯郸网站
  • 内蒙古网站建设网页版微信二维码不出来
  • 网站分为哪几个部分外包 网站开发公司
  • 彩票黑网站是怎么做的公司做网站需要准备什么条件
  • ps网站怎么做超链接做网站要学什么语言
  • 丰和园林建设集团网站wordpress熊掌号插件
  • 深圳如何搭建制作网站昆明网站建设建站模板
  • 乐陵人力资源网站学校网站代码模板
  • 好单库网站是怎么做的镇江关键词优化如何
  • 网站首页设计大赛国外网站建设软件
  • 网站提示域名重定向怎么做旅游网站名字
  • html基础网站建设优化seo
  • 网页设计作业 个人网站wordpress 个人站
  • 做外贸去哪些网站找老外人人商城源码
  • 爱空间网站模板wordpress模板制作软件
  • 青岛网站制作公司排名重庆喷绘制作
  • 成都网站开发外包网站建设 服务流程
  • 营销策划好的网站苏州seo优化公司
  • 做微商好还是开网站好没有网站可以做cpa
  • 长安做英文网站企业服务公司是干嘛的
  • 建设网站公司怎样的网站打开速度块
  • 青岛做网站哪家专业设计师 网站 贵
  • 专门做运动装备的网站做网站去哪里找广告主
  • 如何做网站的源码绍兴百度seo公司