深圳网站建设公司招聘电话销售,甜蜜高端定制网站,网站建设与网页设计案例教程pdf下载,财务公司网站模板Spark运行架构以及容错机制 1. Spark的角色区分1.1 Driver1.2 Excuter 2. Spark-Cluster模式的任务提交流程2.1 Spark On Yarn的任务提交流程2.1.1 yarn相关概念2.1.2 任务提交流程 2.2 Spark On K8S的任务提交流程2.2.1 k8s相关概念2.2.2 任务提交流程 3. Spark-Cluster模式的… Spark运行架构以及容错机制 1. Spark的角色区分1.1 Driver1.2 Excuter 2. Spark-Cluster模式的任务提交流程2.1 Spark On Yarn的任务提交流程2.1.1 yarn相关概念2.1.2 任务提交流程 2.2 Spark On K8S的任务提交流程2.2.1 k8s相关概念2.2.2 任务提交流程 3. Spark-Cluster模式的容灾模式3.1 Driver容灾3.2 Executor容灾3.3 RDD容错 4. 疑问和思考4.1 是否可以部署多个Driver形成HA模式如果主Driver宕机备Driver自动接替 5. 参考文档 spark是一个开发框架用于进行数据批处理本文主要探讨Spark任务运行的的架构。由于在日常生产环境中常用的是spark on yarn 和spark on k8s两种类型的模式因此本文也主要探讨这两种类型的异同以及不同角色的容错机制。 1. Spark的角色区分
1.1 Driver
Spark的驱动器节点负责运行Spark程序中的main方法执行实际的代码。Driver在Spark作业时主要负责
将用户程序转化为作业job负责跟RMyarn或者 Apiserverk8s申请资源调度并拉起Excutor协调和分配Executor之间的任务task监控Executor的执行状态通过UI展示运行情况。
1.2 Excuter
Executor是Spark程序中的一个JVM进程负责执行Spark作业的具体任务task每个任务之间彼此相互独立。Spark应用启动时Executor同时被启动并且伴随着Spark程序的生命周期而存在。如果有Executor节点发生了故障程序也不会停止运行而是将出错的Executor节点上的任务调度到其他Executor节点运行。
Executor的核心功能
运行Spark作业中具体的任务并且将执行结果返回给Driver。通过自身的块管理器Block Manager对用户要求缓存的RDD进行内存式存储。RDD式缓存在Executor进程内部的这样任务在运行时可以充分利用缓存数据加速运算。
2. Spark-Cluster模式的任务提交流程
2.1 Spark On Yarn的任务提交流程
2.1.1 yarn相关概念
RMResourceManager:
即资源管理在YARN中RM负责集群中所有资源的统一管理和分配它接收来自各个节点NM的资源汇报信息并把这些信息按照一定的策略分配给各个应用程序实际上是AM
NMNodeManager: NM是运行在单个节点上的代理它需要与应用程序的AM和集群管理者RM交互
从AM上接收有关Container的命令并执行之比如启动、停止Container向RM汇报各个Container运行状态和节点健康状况并领取有关Container的命令比如清理Container执行之。
AMApplicationMaster:
用户提交的每个应用程序均包含一个AM它可以运行在RM以外的机器上负责主要负责
与RM调度器协商以获取资源用Container表示将得到的任务进一步分配给内部的任务(资源的二次分配)。与NM通信以启动/停止任务。监控所有任务运行状态并在任务运行失败时重新为任务申请资源以重启任务。
注RM只负责监控AM并在AM运行失败时候启动它。RM不负责AM内部任务的容错任务的容错由AM完成。
在Yarn任务的启动流程中
client优先跟RM获取NM资源并启动AM在Cluster模式下AM启动后client就可以退出了AM构建任务信息并RM获取NM资源并启动Executor并将task信息分配给Executor从而实现任务启动Executor需要跟AM进行心跳汇报如果Executor异常相关的拉起动作也是有AM来控制。
2.1.2 任务提交流程
Driver和AM是两个完全不同的东西Driver是控制Spark计算和任务资源的而AM是控制yarn app运行和任务资源的。在Spark On Yarn模式中Driver运行在AM上Driver会和AM通信资源的申请由AppMaster来完成而任务的调度和执行则由Driver完成Driver会通过与AppMaster通信来让Executor的执行具体的任务。
任务提交流程图 执行过程
Client向YARN中提交应⽤程序包括AM程序、启动AM的命令、需要在Executor中运⾏的程序等RM收到请求后在集群中选择⼀个NM为该应⽤程序分配第⼀个Container要求它在这个Container中启动应⽤程序的AM进行SparkContext(Driver)等的初始化AM向RM注册这样⽤户可以直接通过RM查看应⽤程序的运⾏状态然后它将采⽤轮询的⽅式通过RPC协议为各个任务申请资源并监控它们的运⾏状态直到运⾏结束⼀旦AM申请到资源也就是Container后便与对应的NM通信要求它在获得的Container中启动ExecutorExecutor启动后会向 AM中的SparkContext(Driver)注册并申请Task。AM中的SparkContext(Driver)分配Task给Executor执⾏运⾏Task并向AM中的SparkContext(Driver)的汇报运⾏的状态和进度以让 AM中的SparkContext(Driver)随时掌握各个任务的运⾏状态从⽽可以在任务失败时重新启动任务应⽤程序运⾏完成后AM中的SparkContext(Driver)向NM申请注销并关闭⾃⼰ 6.应⽤程序运⾏完成后AM向NM申请注销并关闭⾃⼰
YARN-Cluster的执行需要安装spark 客户端并执行如下命令提交任务
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn --deploy-mode cluster \
--num-executors 1 \
/Users/ly/apps/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar 102.2 Spark On K8S的任务提交流程
Spark 2.3开始Spark官方就开始支持Kubernetes作为新的资源调度模式。
2.2.1 k8s相关概念
Master: Kubernetes里的Master指的是集群控制节点每一个Kubernetes集群里都必须要有一个Master节点来负责整个集群的管理和控制基本上Kubernetes的所有控制命令都发给它它来负责具体的执行过程我们后面执行的所有命令基本都是在Master节点上运行的
Node: Node节点是Kubernetes集群中的工作负载节点每个Node都会被Master分配一些应用程序服务以及云工作流。
2.2.2 任务提交流程
总体提交流程如下 可以通过spark原生提交方式和 spark-on-k8s-operator提交 两种方式进行提交两种方式实现上有些差异但是总体流程是一致的。
1 spark原生提交方式
需要安装spark 客户端并执行如下命令提交任务
bin/spark-submit \--master k8s://https://{k8s-apiserver-host}:6443 \--deploy-mode cluster \--name spark-wordcount-example \--class org.apache.spark.examples.JavaWordCount \local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar \oss://{wordcount-file-oss-bucket}/2 spark-on-k8s-operator提交
spark-on-k8s-operator[2]可以让用户以CRD(CustomResourceDefinition) [4] 的方式提交和管理Spark作业。这种方式能够更好的利用k8s原生的能力具备更好的扩展性。并且在此之上增加了定时任务、重试、监控等一系列功能。具体的功能特性可以在github查看官方文档kubernetes官方推荐
需要 1 需要提前在k8s集群中安装此时会启动一个名为sparkoperator的pod 2定义提交spark任务的相关CRD资源 3提交作业时无需准备一个具备Spark环境的Client直接通过kubectl或者kubernetes api就可以提交Spark作业。
列入一个crd命名spark.yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:name: spark-wordcount-examplenamespace: default
spec:type: JavasparkVersion: 2.4.5mainClass: org.apache.spark.examples.JavaWordCountimage: {Spark镜像地址}mainApplicationFile: local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jararguments:- oss://{wordcount-file-oss-bucket}/driver:cores: 1coreLimit: 1000mmemory: 4gexecutor:cores: 1coreLimit: 1000mmemory: 4gmemoryOverhead: 1ginstances: 2执行如下命令即可启动相关的pod并进行提交任务
kubectl apply -f spark.yaml3. Spark-Cluster模式的容灾模式
3.1 Driver容灾
Driver异常退出时一般要使用checkpoint重启Driver重新构造上下文并重启接收器。 第一步恢复检查点记录的元数据块。 第二步未完成作业的重新形成。由于失败而没有处理完成的RDD将使用恢复的元数据重新生成RDD然后运行后续的Job重新计算后恢复。
3.2 Executor容灾
Executor异常是日常生产环境中最常遇到的现象造成的原因很多最常见的是由于机器故障从而导致就上运行的Executor异常。
Executor异常退出时Driver没有在规定时间内收到执行器的状态更新于是Driver会将注册的Executor移除并通过调度器自动重新拉起Executor。新启动的Executor会重新注册到Driver中Driver会根据DAG给Executor重新分配相关的Task。Executor分配到到来自Driver的Task需要重checkpoint重新加载数据并继续执行计算。Spark运算数据行程DAG如果遇到不同的Executor之间有数据交互时比如ExecutorA的数据聚合依赖于ExecutorB和ExecutorCExecutorB宕机ExecutorA的数据聚合也不准确不能简单的通过启动对应的Executor相关的数据进行恢复可能会有数据紊乱通常恢复的时间较久。
3.3 RDD容错
窄依赖 指父RDD的每一个分区最多被一个子RDD的分区所用表现为一个父RDD的分区对应于一个子RDD的分区 或多个父RDD的分区对应于一个子RDD的分区也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。
宽依赖 指子RDD的分区依赖于父RDD的多个分区或所有分区即存在一个父RDD的一个分区对应一个子RDD的多个分区。
checkpoint机制 是为了通过lineage做容错的辅助lineage过长会造成容错成本过高这样就不如在中间阶段做检查点容错如果之后有节点出现问题而丢失分区从做检查点的RDD开始重做Lineage就会减少开销。
注意 1 在容错机制中如果一个节点死机了而且运算窄依赖则只要把丢失的父RDD分区重算即可不依赖于其他节点。 2 而宽依赖需要父RDD的所有分区都存在重算就很昂贵了。如果恢复的代价过于昂贵就会通过checkpoints重新进行计算。 3利用checkpoint机制记载最新的数据计算点重新拉起任务进行计算
4. 疑问和思考
4.1 是否可以部署多个Driver形成HA模式如果主Driver宕机备Driver自动接替
可以基于ZK进行选主。
5. 参考文档
Spark 容错以及高可用性HASpark 容错机制Spark on Kubernetes作业执行流程Spark on Yarn运行机制