烟台网站建设策划方案,做网站要哪些人员,微信 微网站开发教程,海南网站建设粤icp备引言
随着大数据的发展，任务调度系统成为了数据处理和管理中至关重要的部分。Apache DolphinScheduler 是一款优秀的开源分布式工作流调度平台，在大数据场景中得到广泛应用。 在本文中，我们将对 Apache DolphinScheduler 1.3.9 版本的源码进…引言
随着大数据的发展，任务调度系统成为了数据处理和管理中至关重要的部分。Apache DolphinScheduler 是一款优秀的开源分布式工作流调度平台，在大数据场景中得到广泛应用。 在本文中，我们将对 Apache DolphinScheduler 1.3.9 版本的源码进行深入分析，介绍 Master 启动以及调度流程。
通过这些分析，开发者可以更好地理解 DolphinScheduler 的工作机制，并在实际使用中更高效地进行二次开发或优化。
Master Server启动
启动流程图 Master调度工作流流程图 MasterServer启动方法
public void run() {// init remoting serverNettyServerConfig serverConfig new NettyServerConfig();serverConfig.setListenPort(masterConfig.getListenPort());this.nettyRemotingServer new NettyRemotingServer(serverConfig);this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());this.nettyRemotingServer.start();// self tolerantthis.zkMasterClient.start();this.zkMasterClient.setStoppable(this);// scheduler startthis.masterSchedulerService.start();// start QuartzExecutors// what system should do if exceptiontry {logger.info(start Quartz server...);QuartzExecutors.getInstance().start();} catch (Exception e) {try {QuartzExecutors.getInstance().shutdown();} catch (SchedulerException e1) {logger.error(QuartzExecutors shutdown failed : e1.getMessage(), e1);}logger.error(start Quartz failed, e);}/*** register hooks, which are called before the process exits*/Runtime.getRuntime().addShutdownHook(new Thread(() - {if (Stopper.isRunning()) {close(shutdownHook);}}));}
nettyServer会注册三种Command：
TASK_EXECUTE_ACK：Worker在接收到Master执行任务的请求后，会给Master发送一条Ack Command，告诉Master已经开始执行Task了。TASK_EXECUTE_RESPONSE：Worker在执行完Task之后，会给Master发送一条Response Command，告诉Master任务调度/执行结果。TASK_KILL_RESPONSE：Master接收到Task停止的请求后，会给Worker发送TASK_KILL_REQUEST Command，之后Worker会把TASK_KILL_RESPONSE Command返回给Master。
启动调度和定时器。添加ShutdownHook关闭资源。
Master 配置文件
master.listen.port=5678
# 限制Process Instance并发调度的线程数
master.exec.threads=100
# 限制每个ProcessInstance可以执行的任务数
master.exec.task.num=20
# 每一批次可以分发的任务数
master.dispatch.task.num=3
# master需要选择一个稳定的worker去执行任务
# 算法有Random、RoundRobin、LowerWeight。默认是LowerWeight
master.host.selector=LowerWeight
# master需要向Zookeeper发送心跳，单位：秒
master.heartbeat.interval=10
# master提交任务失败重试次数
master.task.commit.retryTimes=5
# master提交任务失败重试时间间隔
master.task.commit.interval=1000
# master最大cpu平均负载，只有当系统cpu平均负载还没有达到这个值，master才能调度任务
# 默认值为-1：系统cpu核数 * 2
master.max.cpuload.avg=-1
# master为其他进程保留内存，只有当系统可用内存大于这个值，master才能调度
# 默认值0.3G
master.reserved.memory=0.3
Master Scheduler启动
MasterSchedulerService初始化方法
public void init(){// masterConfig.getMasterExecThreads()master.properties里master.exec.threads100// 该线程池的核心线程数和最大线程数为100this.masterExecService (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor(Master-Exec-Thread, masterConfig.getMasterExecThreads());NettyClientConfig clientConfig new NettyClientConfig();this.nettyRemotingClient new NettyRemotingClient(clientConfig);
}
MasterSchedulerService启动方法
public void run() {logger.info(master scheduler started);while (Stopper.isRunning()){try {// 这个方法是用来检查master cpu load和memory判断master是否还有资源进行调度// 如果不能调度Sleep 1 秒种boolean runCheckFlag OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());if(!runCheckFlag) {Thread.sleep(Constants.SLEEP_TIME_MILLIS);continue;}if (zkMasterClient.getZkClient().getState() CuratorFrameworkState.STARTED) {// 这里才是真正去执行调度的方法scheduleProcess();}} catch (Exception e) {logger.error(master scheduler thread error, e);}}
}
MasterSchedulerService调度方法
private void scheduleProcess() throws Exception {InterProcessMutex mutex null;try {// 阻塞式获取分布式锁mutex zkMasterClient.blockAcquireMutex();// 获取线程池的活跃线程数int activeCount masterExecService.getActiveCount();// make sure to scan and delete command table in one transaction// 获取其中一个command必须保证操作都在一个事务里Command command processService.findOneCommand();if (command ! null) {logger.info(find one command: id: {}, type: {}, command.getId(),command.getCommandType());try{// 获取ProcessInstance// 这个方法会根据master.exec.threads配置和活跃线程数来判断是否可以调度processInstanceProcessInstance processInstance processService.handleCommand(logger,getLocalAddress(),this.masterConfig.getMasterExecThreads() - activeCount, command);if (processInstance ! null) {logger.info(start master exec thread , split DAG ...);masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient));}}catch (Exception e){logger.error(scan command error , e);processService.moveToErrorCommand(command, e.toString());}} else{//indicate that no command ,sleep for 1sThread.sleep(Constants.SLEEP_TIME_MILLIS);}} finally{// 释放分布式锁zkMasterClient.releaseMutex(mutex);}
}
ProcessService处理Command的方法
public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) {// 这里是去构造ProcessInstanceProcessInstance processInstance constructProcessInstance(command, host);//cannot construct process instance, return null;if(processInstance null){logger.error(scan command, command parameter is error: {}, command);moveToErrorCommand(command, process instance is null);return null;}// 这里是检测当前剩余线程数是否大于等于该ProcessDefinition及其所有子Process的数量// 如果检测不通过process instance的状态变为wait thread.并且返回空的process instanceif(!checkThreadNum(command, validThreadNum)){logger.info(there is not enough thread for this command: {}, command);return setWaitingThreadProcess(command, processInstance);}processInstance.setCommandType(command.getCommandType());processInstance.addHistoryCmd(command.getCommandType());saveProcessInstance(processInstance);this.setSubProcessParam(processInstance);delCommandByid(command.getId());return processInstance;
}
MasterExecThread初始化方法
public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient){this.processService processService;this.processInstance processInstance;this.masterConfig SpringApplicationContext.getBean(MasterConfig.class);// master.properties文件里的master.task.exec.numint masterTaskExecNum masterConfig.getMasterExecTaskNum();this.taskExecService ThreadUtils.newDaemonFixedThreadExecutor(Master-Task-Exec-Thread,masterTaskExecNum);this.nettyRemotingClient nettyRemotingClient;
}
MasterExecThread启动方法
public void run() {// 省略...try {if (processInstance.isComplementData() Flag.NO processInstance.getIsSubProcess()){// 补数逻辑... 暂不看executeComplementProcess();}else{// 执行task方法executeProcess();}}catch (Exception e){logger.error(master exec thread exception, e);logger.error(process execute failed, process id:{}, processInstance.getId());processInstance.setState(ExecutionStatus.FAILURE);processInstance.setEndTime(new Date());processService.updateProcessInstance(processInstance);}finally {taskExecService.shutdown();}
}private void executeProcess() throws Exception {// 前置prepareProcess();// 执行runProcess();// 后置endProcess();
}private void runProcess(){// 从根task开始提交submitPostNode(null);boolean sendTimeWarning false;while(!processInstance.isProcessInstanceStop() Stopper.isRunning()){// 省略部分代码...// 根据cpu load avg和Memorry判断是否可以调度if(canSubmitTaskToQueue()){submitStandByTask();}try {Thread.sleep(Constants.SLEEP_TIME_MILLIS);} catch (InterruptedException e) {logger.error(e.getMessage(),e);}updateProcessInstanceState();}logger.info(process:{} end, state :{}, processInstance.getId(), processInstance.getState());
}// 获取可以并行的task
/**
* task 1 - task 2 - task3
* task 4 - task 5
* task 6
* task 1,task4,task6可以并行跑
*/
private void submitPostNode(String parentNodeName){SetString submitTaskNodeList DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);ListTaskInstance taskInstances new ArrayList();for(String taskNode : submitTaskNodeList){taskInstances.add(createTaskInstance(processInstance, taskNode,dag.getNode(taskNode)));}// if previous node success , post node submitfor(TaskInstance task : taskInstances){if(readyToSubmitTaskQueue.contains(task)){continue;}if(completeTaskList.containsKey(task.getName())){logger.info(task {} has already run success, task.getName());continue;}if(task.getState().typeIsPause() || task.getState().typeIsCancel()){logger.info(task {} stopped, the state is {}, task.getName(), task.getState());}else{// task添加到priorityQueueaddTaskToStandByList(task);}}
}/*** handling the list of tasks to be submitted*/
private void submitStandByTask(){try {int length readyToSubmitTaskQueue.size();for (int i0;ilength;i) {// 从队列里面取task, 提交给worker执行TaskInstance task readyToSubmitTaskQueue.peek();// 先判断task的前置依赖有没有都运行成功如果运行成功在提交该task运行// 如果运行失败或者没有执行则不提交DependResult dependResult getDependResultForTask(task);if(DependResult.SUCCESS dependResult){if(retryTaskIntervalOverTime(task)){submitTaskExec(task);removeTaskFromStandbyList(task);}}else if(DependResult.FAILED dependResult){// if the dependency fails, the current node is not submitted and the state changes to failure.dependFailedTask.put(task.getName(), task);removeTaskFromStandbyList(task);logger.info(task {},id:{} depend result : {},task.getName(), task.getId(), dependResult);} else if (DependResult.NON_EXEC dependResult) {// for some reasons(depend task pause/stop) this task would not be submitremoveTaskFromStandbyList(task);logger.info(remove task {},id:{} , because depend result : {}, task.getName(), task.getId(), dependResult);}}} catch (Exception e) {logger.error(submit standby task error,e);}
}/**
* 创建TaskExecThread
*/
private TaskInstance submitTaskExec(TaskInstance taskInstance) {MasterBaseTaskExecThread abstractExecThread null;if(taskInstance.isSubProcess()){abstractExecThread new SubProcessTaskExecThread(taskInstance);}else if(taskInstance.isDependTask()){abstractExecThread new DependentTaskExecThread(taskInstance);}else if(taskInstance.isConditionsTask()){abstractExecThread new ConditionsTaskExecThread(taskInstance);}else {abstractExecThread new MasterTaskExecThread(taskInstance);}FutureBoolean future taskExecService.submit(abstractExecThread);activeTaskNode.putIfAbsent(abstractExecThread, future);return abstractExecThread.getTaskInstance();
}
MasterBaseTaskExecThread
MasterBaseTaskExecThread是SubProcessTaskExecThread、DependentTaskExecThread、ConditionsTaskExecThread、MasterTaskExecThread的父类，实现Callable接口。
SubProcessTaskExecThread：任务实例不会下发到worker节点执行。在submitTask(TaskInstance taskInstance)方法中，针对子流程会增加一条子流程实例命令，然后在waitTaskQuit方法中循环等待子流程执行完成。在当前工作流运行结束后，会继续运行子工作流并做相关状态更新；子工作流完全完成，才同步状态为子工作流的状态。 DependentTaskExecThread：Dependent 节点就是依赖检查节点。比如 A 流程依赖昨天的 B 流程执行成功，依赖节点会去检查 B 流程在昨天是否有执行成功的实例。 ConditionsTaskExecThread：Conditions 是一个条件节点，根据上游任务运行状态，判断应该运行哪个下游任务。截止目前，Conditions 支持多个上游任务，但只支持两个下游任务。当上游任务数超过一个时，可以通过“且”以及“或”操作符实现复杂上游依赖。 MasterTaskExecThread：将任务实例下发到worker节点执行，并在waitTaskQuit方法中循环等待任务实例执行完成，任务完成后则退出。例如SQL、Shell等任务类型。
MasterBaseTaskExecThread初始化方法
public MasterBaseTaskExecThread(TaskInstance taskInstance){this.processService SpringApplicationContext.getBean(ProcessService.class);this.alertDao SpringApplicationContext.getBean(AlertDao.class);this.cancel false;this.taskInstance taskInstance;this.masterConfig SpringApplicationContext.getBean(MasterConfig.class);this.taskUpdateQueue SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);initTaskParams();
}
MasterBaseTaskExecThread执行方法
Override
public Boolean call() throws Exception {this.processInstance processService.findProcessInstanceById(taskInstance.getProcessInstanceId());return submitWaitComplete(); // 由各子类实现
}
MasterBaseTaskExecThread公共方法
submit()
protected TaskInstance submit(){// 提交任务重试次数. master.task.commit.retryTimes5Integer commitRetryTimes masterConfig.getMasterTaskCommitRetryTimes();// 提交任务失败重试间隔时间 master.task.commit.interval1000Integer commitRetryInterval masterConfig.getMasterTaskCommitInterval();int retryTimes 1;boolean submitDB false;boolean submitTask false;TaskInstance task null;while (retryTimes commitRetryTimes){try {if(!submitDB){// 持久化TaskInstance到数据库task processService.submitTask(taskInstance);if(task ! null task.getId() ! 0){submitDB true;}}if(submitDB !submitTask){// 分发任务到Woroker执行submitTask dispatchTask(task);}if(submitDB submitTask){return task;}if(!submitDB){logger.error(task commit to db failed , taskId {} has already retry {} times, please check the database, taskInstance.getId(), retryTimes);}else if(!submitTask){logger.error(task commit failed , taskId {} has already retry {} times, please check, taskInstance.getId(), retryTimes);}Thread.sleep(commitRetryInterval);} catch (Exception e) {logger.error(task commit to mysql and dispatcht task failed,e);}retryTimes 1;}return task;
}
dispatchTask(TaskInstance task)
public Boolean dispatchTask(TaskInstance taskInstance) {try{// 如果是子流程条件任务依赖任务直接返回true不提交给worker执行if(taskInstance.isConditionsTask()|| taskInstance.isDependTask()|| taskInstance.isSubProcess()){return true;}if(taskInstance.getState().typeIsFinished()){logger.info(String.format(submit task , but task [%s] state [%s] is already finished. , taskInstance.getName(), taskInstance.getState().toString()));return true;}// task cannot submit when runningif(taskInstance.getState() ExecutionStatus.RUNNING_EXECUTION){logger.info(String.format(submit to task, but task [%s] state already be running. , taskInstance.getName()));return true;}logger.info(task ready to submit: {}, taskInstance);/*** taskPriority*/TaskPriority taskPriority buildTaskPriority(processInstance.getProcessInstancePriority().getCode(),processInstance.getId(),taskInstance.getProcessInstancePriority().getCode(),taskInstance.getId(),org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);// 放入TaskPriorityQueue中// org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl用于消费从队列里取出TaskInstance提交给Worker执行taskUpdateQueue.put(taskPriority);logger.info(String.format(master submit success, task : %s, taskInstance.getName()) );return true;}catch (Exception e){logger.error(submit task Exception: , e);logger.error(task error : %s, JSONUtils.toJson(taskInstance));return false;}
}
MasterTaskExecThread
submitWaitComplete()
public Boolean submitWaitComplete() {Boolean result false;// 提交任务this.taskInstance submit();if(this.taskInstance null){logger.error(submit task instance to mysql and queue failed , please check and fix it);return result;}if(!this.taskInstance.getState().typeIsFinished()) {// 等待任务执行结果result waitTaskQuit();}taskInstance.setEndTime(new Date());processService.updateTaskInstance(taskInstance);logger.info(task :{} id:{}, process id:{}, exec thread completed ,this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );return result;
}
waitTaskQuit()
public Boolean waitTaskQuit(){// query new statetaskInstance processService.findTaskInstanceById(taskInstance.getId());logger.info(wait task: process id: {}, task id:{}, task name:{} complete,this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());while (Stopper.isRunning()){try {if(this.processInstance null){logger.error(process instance not exists , master task exec thread exit);return true;}// task instance add queue , waiting worker to killif(this.cancel || this.processInstance.getState() ExecutionStatus.READY_STOP){cancelTaskInstance();}if(processInstance.getState() ExecutionStatus.READY_PAUSE){pauseTask();}// task instance finishedif (taskInstance.getState().typeIsFinished()){// if task is final result , then remove taskInstance from cache// taskInstanceCacheManager其实现类为org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl// taskInstance在触发ack和response Command会被添加到taskInstanceCache里taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId());break;}if (checkTaskTimeout()) {this.checkTimeoutFlag !alertTimeout();}// updateProcessInstance task instancetaskInstance processService.findTaskInstanceById(taskInstance.getId());processInstance processService.findProcessInstanceById(processInstance.getId());Thread.sleep(Constants.SLEEP_TIME_MILLIS);} catch (Exception e) {logger.error(exception,e);if (processInstance ! null) {logger.error(wait task quit failed, instance id:{}, task id:{},processInstance.getId(), taskInstance.getId());}}}return true;
}
SubProcessTaskExecThread
submitWaitComplete()
public Boolean submitWaitComplete() {Boolean result false;try{// submit task instancethis.taskInstance submit();if(taskInstance null){logger.error(sub work flow submit task instance to mysql and queue failed , please check and fix it);return result;}setTaskInstanceState();waitTaskQuit();subProcessInstance processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());// at the end of the subflow , the task state is changed to the subflow stateif(subProcessInstance ! null){if(subProcessInstance.getState() ExecutionStatus.STOP){this.taskInstance.setState(ExecutionStatus.KILL);}else{this.taskInstance.setState(subProcessInstance.getState());}}taskInstance.setEndTime(new Date());processService.updateTaskInstance(taskInstance);logger.info(subflow task :{} id:{}, process id:{}, exec thread completed ,this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );result true;}catch (Exception e){logger.error(exception: ,e);if (null ! taskInstance) {logger.error(wait task quit failed, instance id:{}, task id:{},processInstance.getId(), taskInstance.getId());}}return result;
}
waitTaskQuit()
private void waitTaskQuit() throws InterruptedException {logger.info(wait sub work flow: {} complete, this.taskInstance.getName());if (taskInstance.getState().typeIsFinished()) {logger.info(sub work flow task {} already complete. task state:{}, parent work flow instance state:{},this.taskInstance.getName(),this.taskInstance.getState(),this.processInstance.getState());return;}while (Stopper.isRunning()) {// waiting for subflow process instance establishmentif (subProcessInstance null) {Thread.sleep(Constants.SLEEP_TIME_MILLIS);if(!setTaskInstanceState()){continue;}}subProcessInstance processService.findProcessInstanceById(subProcessInstance.getId());if (checkTaskTimeout()) {this.checkTimeoutFlag !alertTimeout();handleTimeoutFailed();}updateParentProcessState();if (subProcessInstance.getState().typeIsFinished()){break;}if(this.processInstance.getState() ExecutionStatus.READY_PAUSE){// parent process ready to pause , child process pausepauseSubProcess();}else if(this.cancel || this.processInstance.getState() ExecutionStatus.READY_STOP){// parent Process Ready to Cancel , subflow CancelstopSubProcess();}Thread.sleep(Constants.SLEEP_TIME_MILLIS);}
}
ConditionsTaskExecThread
submitWaitComplete()
public Boolean submitWaitComplete() {try{this.taskInstance submit();logger LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,taskInstance.getProcessDefinitionId(),taskInstance.getProcessInstanceId(),taskInstance.getId()));String threadLoggerInfoName String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));Thread.currentThread().setName(threadLoggerInfoName);initTaskParameters();logger.info(dependent task start);// 等待判断waitTaskQuit();// 更新最终依赖结果updateTaskState();}catch (Exception e){logger.error(conditions task run exception , e);}return true;
}
waitTaskQuit
private void waitTaskQuit() {ListTaskInstance taskInstances processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());for(TaskInstance task : taskInstances){completeTaskList.putIfAbsent(task.getName(), task.getState());}// 获取所有依赖结果ListDependResult modelResultList new ArrayList();for(DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()){ListDependResult itemDependResult new ArrayList();for(DependentItem item : dependentTaskModel.getDependItemList()){itemDependResult.add(getDependResultForItem(item));}DependResult modelResult DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult);modelResultList.add(modelResult);}// 根据逻辑运算符合并依赖结果conditionResult DependentUtils.getDependResultForRelation(dependentParameters.getRelation(), modelResultList);logger.info(the conditions task depend result : {}, conditionResult);
}
DependentTaskExecThread
submitWaitComplete()
public Boolean submitWaitComplete() {try{logger.info(dependent task start);this.taskInstance submit();logger LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,taskInstance.getProcessDefinitionId(),taskInstance.getProcessInstanceId(),taskInstance.getId()));String threadLoggerInfoName String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));Thread.currentThread().setName(threadLoggerInfoName);initTaskParameters();initDependParameters();waitTaskQuit();updateTaskState();}catch (Exception e){logger.error(dependent task run exception , e);}return true;
}
waitTaskQuit()
private Boolean waitTaskQuit() {logger.info(wait depend task : {} complete, this.taskInstance.getName());if (taskInstance.getState().typeIsFinished()) {logger.info(task {} already complete. task state:{},this.taskInstance.getName(),this.taskInstance.getState());return true;}while (Stopper.isRunning()) {try{if(this.processInstance null){logger.error(process instance not exists , master task exec thread exit);return true;}// 省略部分代码// allDependentTaskFinish()等待所有依赖任务执行结束if ( allDependentTaskFinish() || taskInstance.getState().typeIsFinished()){break;}// update process tasktaskInstance processService.findTaskInstanceById(taskInstance.getId());processInstance processService.findProcessInstanceById(processInstance.getId());Thread.sleep(Constants.SLEEP_TIME_MILLIS);} catch (Exception e) {logger.error(exception,e);if (processInstance ! null) {logger.error(wait task quit failed, instance id:{}, task id:{},processInstance.getId(), taskInstance.getId());}}}return true;
}
TaskPriorityQueueConsumer
Override
public void run() {ListTaskPriority failedDispatchTasks new ArrayList();while (Stopper.isRunning()){try {// 每一批次分发任务数量master.dispatch.task.num 3int fetchTaskNum masterConfig.getMasterDispatchTaskNumber();failedDispatchTasks.clear();for(int i 0; i fetchTaskNum; i){if(taskPriorityQueue.size() 0){Thread.sleep(Constants.SLEEP_TIME_MILLIS);continue;}// if not task , blocking here// 从队列里面获取taskTaskPriority taskPriority taskPriorityQueue.take();// 分发给worker执行boolean dispatchResult dispatch(taskPriority);if(!dispatchResult){failedDispatchTasks.add(taskPriority);}}if (!failedDispatchTasks.isEmpty()) {// 分发失败的任务需要重新加入队列中等待重新分发for (TaskPriority dispatchFailedTask : failedDispatchTasks) {taskPriorityQueue.put(dispatchFailedTask);}// If there are tasks in a cycle that cannot find the worker group,// sleep for 1 secondif (taskPriorityQueue.size() failedDispatchTasks.size()) {TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);}}}catch (Exception e){logger.error(dispatcher task error,e);}}
}/*** dispatch task** param taskPriority taskPriority* return result*/
protected boolean dispatch(TaskPriority taskPriority) {boolean result false;try {int taskInstanceId taskPriority.getTaskId();TaskExecutionContext context getTaskExecutionContext(taskInstanceId);ExecutionContext executionContext new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());if (taskInstanceIsFinalState(taskInstanceId)){// when task finish, ignore this task, there is no need to dispatch anymorereturn true;}else{// 分发任务// 分发算法支持低负载优先算法随机算法 轮询算法。result dispatcher.dispatch(executionContext);}} catch (ExecuteException e) {logger.error(dispatch error: {},e.getMessage());}return result;
}
通过对 Apache DolphinScheduler 1.3.9 的源码分析，我们深入了解了其核心模块的设计和实现。DolphinScheduler 的 Master 架构充分保证了任务调度的高可用性和扩展性，而通过 Zookeeper 实现的集群协调，则为系统提供了强大的容错机制。
如果你对 Apache DolphinScheduler 的源码有兴趣，可以深入研究其任务调度策略的细节部分，或者根据自身业务场景进行二次开发，充分发挥 DolphinScheduler 的调度能力。
本文完 本文由 白鲸开源科技 提供发布支持
文章转载自: http://www.morning.nwmwp.cn.gov.cn.nwmwp.cn http://www.morning.zckhn.cn.gov.cn.zckhn.cn http://www.morning.zycll.cn.gov.cn.zycll.cn http://www.morning.wrtpk.cn.gov.cn.wrtpk.cn http://www.morning.jsrnf.cn.gov.cn.jsrnf.cn http://www.morning.mnccq.cn.gov.cn.mnccq.cn http://www.morning.drggr.cn.gov.cn.drggr.cn http://www.morning.kstlm.cn.gov.cn.kstlm.cn http://www.morning.mhxlb.cn.gov.cn.mhxlb.cn http://www.morning.3jiax.cn.gov.cn.3jiax.cn http://www.morning.qxkjy.cn.gov.cn.qxkjy.cn http://www.morning.jzlfq.cn.gov.cn.jzlfq.cn http://www.morning.wjtwn.cn.gov.cn.wjtwn.cn http://www.morning.xzrbd.cn.gov.cn.xzrbd.cn http://www.morning.gcfg.cn.gov.cn.gcfg.cn http://www.morning.kgphd.cn.gov.cn.kgphd.cn http://www.morning.rszwc.cn.gov.cn.rszwc.cn http://www.morning.wwgpy.cn.gov.cn.wwgpy.cn http://www.morning.cwwbm.cn.gov.cn.cwwbm.cn http://www.morning.mmtjk.cn.gov.cn.mmtjk.cn http://www.morning.ljmbd.cn.gov.cn.ljmbd.cn http://www.morning.lsnhs.cn.gov.cn.lsnhs.cn http://www.morning.c7495.cn.gov.cn.c7495.cn http://www.morning.rdtp.cn.gov.cn.rdtp.cn http://www.morning.kpypy.cn.gov.cn.kpypy.cn http://www.morning.mmqng.cn.gov.cn.mmqng.cn http://www.morning.qxlxs.cn.gov.cn.qxlxs.cn http://www.morning.dyght.cn.gov.cn.dyght.cn http://www.morning.chjnb.cn.gov.cn.chjnb.cn http://www.morning.bwkhp.cn.gov.cn.bwkhp.cn http://www.morning.duckgpt.cn.gov.cn.duckgpt.cn http://www.morning.rdsst.cn.gov.cn.rdsst.cn http://www.morning.lbxhy.cn.gov.cn.lbxhy.cn http://www.morning.fesiy.com.gov.cn.fesiy.com http://www.morning.yprnp.cn.gov.cn.yprnp.cn http://www.morning.wflpj.cn.gov.cn.wflpj.cn http://www.morning.hongjp.com.gov.cn.hongjp.com http://www.morning.enjoinfo.cn.gov.cn.enjoinfo.cn http://www.morning.qtryb.cn.gov.cn.qtryb.cn http://www.morning.zpkfb.cn.gov.cn.zpkfb.cn http://www.morning.cwjxg.cn.gov.cn.cwjxg.cn http://www.morning.frtt.cn.gov.cn.frtt.cn http://www.morning.fqpyj.cn.gov.cn.fqpyj.cn http://www.morning.ygqjn.cn.gov.cn.ygqjn.cn http://www.morning.zmqb.cn.gov.cn.zmqb.cn 
http://www.morning.mlcnh.cn.gov.cn.mlcnh.cn http://www.morning.ckwrn.cn.gov.cn.ckwrn.cn http://www.morning.nggbf.cn.gov.cn.nggbf.cn http://www.morning.c7491.cn.gov.cn.c7491.cn http://www.morning.zyslyq.cn.gov.cn.zyslyq.cn http://www.morning.tztgq.cn.gov.cn.tztgq.cn http://www.morning.njdtq.cn.gov.cn.njdtq.cn http://www.morning.nfcxq.cn.gov.cn.nfcxq.cn http://www.morning.qrwnj.cn.gov.cn.qrwnj.cn http://www.morning.lbcfj.cn.gov.cn.lbcfj.cn http://www.morning.nslwj.cn.gov.cn.nslwj.cn http://www.morning.sqfrg.cn.gov.cn.sqfrg.cn http://www.morning.nnttr.cn.gov.cn.nnttr.cn http://www.morning.lqws.cn.gov.cn.lqws.cn http://www.morning.dzqr.cn.gov.cn.dzqr.cn http://www.morning.kfysh.com.gov.cn.kfysh.com http://www.morning.jxrpn.cn.gov.cn.jxrpn.cn http://www.morning.kkjlz.cn.gov.cn.kkjlz.cn http://www.morning.nknt.cn.gov.cn.nknt.cn http://www.morning.nkpml.cn.gov.cn.nkpml.cn http://www.morning.qyhcm.cn.gov.cn.qyhcm.cn http://www.morning.wcft.cn.gov.cn.wcft.cn http://www.morning.zzhqs.cn.gov.cn.zzhqs.cn http://www.morning.dxhnm.cn.gov.cn.dxhnm.cn http://www.morning.rzmkl.cn.gov.cn.rzmkl.cn http://www.morning.yuanshenglan.com.gov.cn.yuanshenglan.com http://www.morning.fnczn.cn.gov.cn.fnczn.cn http://www.morning.wgcng.cn.gov.cn.wgcng.cn http://www.morning.nqrdx.cn.gov.cn.nqrdx.cn http://www.morning.bpmnx.cn.gov.cn.bpmnx.cn http://www.morning.lswgs.cn.gov.cn.lswgs.cn http://www.morning.zmwzg.cn.gov.cn.zmwzg.cn http://www.morning.ctlzf.cn.gov.cn.ctlzf.cn http://www.morning.wrtxk.cn.gov.cn.wrtxk.cn http://www.morning.thlr.cn.gov.cn.thlr.cn