网站建设开发合同模板下载,注册微信号的网站,手机模板网站模板免费下载,秦皇岛网络公司 网站托管Airflow源码分析-任务调度器实现分析
概述
本文介绍Airflow执行器的总体实现流程。通过函数调用的方式说明了Airflow scheduler的实现原理#xff0c;对整个调度过程的源码进行了分析。 通过本文#xff0c;可以基本把握住Airflow的调度器的运行原理主线。
启动调度器
可…Airflow源码分析-任务调度器实现分析
概述
本文介绍Airflow执行器的总体实现流程。通过函数调用的方式说明了Airflow scheduler的实现原理对整个调度过程的源码进行了分析。 通过本文可以基本把握住Airflow的调度器的运行原理主线。
启动调度器
可以通过命令来启动调度器
airflow scheduler启动airflow的调度器后的总执行流程如下
执行命令后会调用scheduler_command.py#scheduler(args)函数。scheduler(args)函数会打开一个日志文件然后调用_run_scheduler_job(args)函数该函数会创建一个SchedulerJob对象把DAG文件的目录传入启动后就开始解析DAG文件目录中的DAG文件(*.py)文件。然后开始执行SchedulerJob#run()这个函数启动job侧。该函数在其父类BaseJob中定义。BaseJob#run()函数调用其实现类的_execute()函数这里就是调用SchedulerJob#_execute函数来启动调度服务。
调度器的实现分析
SchedulerJob#_execute函数 打印启动调度器的日志可以在调度器服务的日志文件中看到以下日志Starting the scheduler。 如果不是独立模式进程则创建一个DagFileProcessorAgent对象用于读取DAG文件。并把该对象保存到SchedulerJob#processor_agent成员变量中。 初始化执行器并设置执行器的回调函数。创建callback_sink对象它可能是PipeCallbackSinkDagFileProcessorAgent是非独立模式或DatabaseCallbackSinkDagFileProcessorAgent是独立模式创建完成后对象保存到BaseExecutor#callback_sink变量中代码中实现的是SchedulerJob#executor函数的返回值。 调用 self.executor.start()函数来启动执行器executor调用SchedulerJob#executor.start() 调用self.register_signals()函数来注册信号处理程序以便在需要时可以停止儿子进程。 启动DagFileProcessorAgent服务SchedulerJob.processor_agent.start() 调用 SchedulerJob#_run_scheduler_loop进入调度服务循环根据DAG的调度计划执行DAG中的任务。 DAG处理完成后如果是非独立模式则停止DagFileProcessorAgent对象并检查所有文件是否都已经处x理。如果所有文件都已处理则停用未被调度器触及的DAG。 最后执行器结束工作关闭处理DAG文件的代理对象和回调函数并移除会话对象。
调度服务主循环: SchedulerJob#_run_scheduler_loop 通过if not self.processor_agent来检查DAG文件处理服务是否已经启动 创建一个定时器调度对象timers timers EventScheduler()该对象会定时执行定义的函数。用于周期性地运行一些任务。 向该timer对象中注册一些回调函数来定时进行一些任务。这些任务包括检查孤立的任务、检查触发器超时、更新池指标、查找僵尸任务等。 通过一个无限循环来不断地进行调度直到达到指定的运行次数或达到了DAG解析次数的上限。在每次循环中它会执行以下步骤 a. 进入调度循环若使用sqlite则运行一个单独的的DAG文件解析进程processor_agent.run_single_parsing_loop()。 b. 调用SchedulerJob#_do_scheduling进入实际的调度实现代码中。并返回排队的任务数量。 c. 启动executor的心跳处理进程self.executor.heartbeat() d. 启动executor的事件处理器SchedulerJob#_process_executor_events e. 启动DagFileProcessorAgent的心跳服务self.processor_agent.heartbeat() f. 运行定时任务 如果没有工作需要执行则等待一段时间。
最后如果达到了指定的运行次数或达到了DAG解析次数的上限则退出循环。如果使用DagFileProcessorAgent则在达到解析次数上限时也会退出循环。
调度实现函数SchedulerJob#_do_scheduling
调用prohibit_commit的函数来返回一个上下文管理器CommitProhibitorGuard。该上下文管理器可以防止在其作用域之外通过会话对象提交事务从而严格控制事务的生命周期以确保在核心调度器循环中的严格控制。如果在上下文管理器之外通过会话对象提交事务将引发RuntimeError异常。调用SchedulerJob#_create_dagruns_for_dags函数来根据DagModel中的next_dagrun_create_after列创建任何必要的DagRun。默认情况下只选择10个DAG可以通scheduler.max_dagruns_to_create_per_loop设置进行配置。调用函数: SchedulerJob#_start_queued_dagruns在DagRuns集合中对象中查找“下n个最老的”正在运行的DAGRun进行调度默认n 20可以通过“scheduler.max_dagruns_per_loop_to_schedule”进行配置并尝试进度状态将TIs调度为SCHEDULED或将DagRuns调度为SUCCESS / FAILURE等。调用SchedulerJob#_get_next_dagruns_to_examine检查dagrun的参数。调用SchedulerJob#_schedule_all_dag_runs函数来决定所有dagrun的调度决定。该函数会遍历所有的dagrun对每个dagrun调用SchedulerJob#_schedule_dag_run函数来决定是否调度该dagrun。通过临界区锁定Pool模型的行将任务排队然后将其发送到执行器中。详见_critical_section_enqueue_task_instances()文档。返回在此迭代中入队的TIs的数量。
其中步骤2和步骤3需要注意因为它们会锁定某些行并且只有一个调度器可以同时处理这些行因此它们可能会影响调度器的吞吐量。步骤2中默认选择的20个DAG Run是基于它们最久没有被检查/调度的时间来选择的。步骤3中通过临界区锁定行的目的是为了防止多个调度器同时修改同一个任务实例这会导致竞态条件和不一致性。
SchedulerJob#_start_queued_dagruns
该方法用于启动处于排队状态的DagRuns。 该方法调用_get_next_dagruns_to_examine方法获取处于QUEUED状态的DagRuns。 然后该方法使用DagRun.active_runs_of_dags方法计算每个DAG当前正在运行的DagRun数量并将结果存储在active_runs_of_dags字典中。 它遍历了每个处于QUEUED状态的DagRun检查是否可以将其移动到RUNNING状态。 a. 对于每个DagRun该方法首先使用DagBag.get_dag方法获取其对应的DAG对象并将其赋值给dag_run.dag属性。如果DAG不存在则记录错误并继续处理下一个DagRun。 b. 接下来该方法使用active_runs_of_dags字典获取DAG当前正在运行的DagRun数量并将结果存储在active_runs变量中。 c. 该方法检查DAG的max_active_runs属性是否为None如果不是则检查DAG当前正在运行的DagRun数量是否超过了该属性的值。如果是则记录调试日志并不将该DagRun移动到RUNNING状态。否则该方法会将DagRun的状态设置为RUNNING并更新其start_date属性。此外该方法还会将DAG当前正在运行的DagRun数量加1并调用DagRun.notify_dagrun_state_changed方法通知状态已更改。 d. 最后该方法调用了_update_state方法该方法用于设置DagRun的状态为RUNNING并计算调度延迟如果DAG是周期性的。需要注意的是_update_state方法是一个内部函数定义在_start_queued_dagruns方法内部。
SchedulerJob#_critical_section_enqueue_task_instances
该方法用于将TaskInstances添加到执行队列中。
该方法包含以下三个步骤 使用优先级选择TaskInstances并确保它们处于预期状态并且不会超过最大活动运行数或池限制。 原子地更改上述TaskInstances的状态。 将TaskInstances添加到执行器的队列中。需要注意的是该方法是一个“关键段”意味着只有一个执行器进程可以同时执行该方法。为了实现这一点该方法使用了SELECT…FOR UPDATE语句锁定了池表以确保只有一个进程可以进行修改。 该方法还包含了两个辅助方法_executable_task_instances_to_queued和_enqueue_task_instances_with_queued_state。其中_executable_task_instances_to_queued方法用于选择可执行的TaskInstances而_enqueue_task_instances_with_queued_state方法用于将TaskInstances添加到执行器的队列中。 最后该方法返回状态发生了变化的TaskInstance的数量。
注意该方法使用了一个名为max_tis_per_query的属性它表示每次查询最多选择的TaskInstance数量。如果该属性的值为0则选择所有可用的TaskInstances否则仅选择最多max_tis_per_query个TaskInstances。此外该方法还使用了一个名为executor的属性它表示Airflow的执行器对象用于将TaskInstances添加到执行队列中。
SchedulerJob#_enqueue_task_instances_with_queued_state
该方法的作用是将状态为queued的任务实例添加到执行器executor的队列中queued_tasks字典中等待执行。
该方法接受两个参数
task_instances待执行的任务实例列表。session数据库会话对象。 该方法首先遍历任务实例列表对于每个状态为queued的任务实例将其命令添加到执行器的队列中等待执行。如果任务实例所属的DAG运行状态为finished则将任务实例状态设置为None并跳过该任务实例的执行。 对于每个任务实例该方法使用任务实例的command_as_list方法获取该任务实例的命令并设置该任务实例的优先级和队列。然后该方法调用执行器的queue_command方法将该任务实例的命令添加到执行器的队列中等待执行。
需要注意的是该方法并不会等待任务实例执行完毕而是将任务实例的执行交给了执行器处理。而执行器的具体处理逻辑在_process_tasks函数的_executor.execute_async方法中。
该函数的实现代码如下
def _enqueue_task_instances_with_queued_state(self, task_instances: list[TI], session: Session) - None:Takes task_instances, which should have been set to queued, and enqueues themwith the executor.:param task_instances: TaskInstances to enqueue:param session: The session object# actually enqueue themfor ti in task_instances:if ti.dag_run.state in State.finished:ti.set_state(State.NONE, sessionsession)continuecommand ti.command_as_list(localTrue,pickle_idti.dag_model.pickle_id,)priority ti.priority_weightqueue ti.queueself.log.info(Sending %s to executor with priority %s and queue %s, ti.key, priority, queue)self.executor.queue_command(ti,command,prioritypriority,queuequeue,)task通过queue_command已经放到了执行器的任务执行队列queued_tasks中该变量其实是一个有序的字典由OrderedDict类来定义。这样不同类型的执行器就可以消费该队列执行任务了。
Task放入执行器队列
执行器会调用以下函数来执行task。每个执行器实现的实现逻辑不同可以进入每个执行器中继续分析其实现。 def _process_tasks(self, task_tuples: list[TaskTuple]) - None:for key, command, queue, executor_config in task_tuples:del self.queued_tasks[key]self.execute_async(keykey, commandcommand, queuequeue, executor_configexecutor_config)self.running.add(key)小结
本文分析了airflow的任务执行总体流程。分析了从dag文件处理到task的调度和执行的整个流程。
通过本文的分析可以说基本把握住了Airflow的运行原理的主线。可以根据这条主线继续分析每个执行器的执行原理以及任务优先级DAG文件处理的细节。 文章转载自: http://www.morning.ksbmx.cn.gov.cn.ksbmx.cn http://www.morning.rxlck.cn.gov.cn.rxlck.cn http://www.morning.zzgtdz.cn.gov.cn.zzgtdz.cn http://www.morning.lcbt.cn.gov.cn.lcbt.cn http://www.morning.kkjlz.cn.gov.cn.kkjlz.cn http://www.morning.ngcsh.cn.gov.cn.ngcsh.cn http://www.morning.qmnhw.cn.gov.cn.qmnhw.cn http://www.morning.drrt.cn.gov.cn.drrt.cn http://www.morning.lgkbn.cn.gov.cn.lgkbn.cn http://www.morning.snrhg.cn.gov.cn.snrhg.cn http://www.morning.lhhdy.cn.gov.cn.lhhdy.cn http://www.morning.bppml.cn.gov.cn.bppml.cn http://www.morning.ryrpq.cn.gov.cn.ryrpq.cn http://www.morning.spghj.cn.gov.cn.spghj.cn http://www.morning.bqdgr.cn.gov.cn.bqdgr.cn http://www.morning.dktyc.cn.gov.cn.dktyc.cn http://www.morning.wslpk.cn.gov.cn.wslpk.cn http://www.morning.ptmsk.cn.gov.cn.ptmsk.cn http://www.morning.qyglt.cn.gov.cn.qyglt.cn http://www.morning.xlyt.cn.gov.cn.xlyt.cn http://www.morning.srjbs.cn.gov.cn.srjbs.cn http://www.morning.kdlzz.cn.gov.cn.kdlzz.cn http://www.morning.rnlx.cn.gov.cn.rnlx.cn http://www.morning.ymsdr.cn.gov.cn.ymsdr.cn http://www.morning.kqgsn.cn.gov.cn.kqgsn.cn http://www.morning.bfhrj.cn.gov.cn.bfhrj.cn http://www.morning.ftzll.cn.gov.cn.ftzll.cn http://www.morning.ygkq.cn.gov.cn.ygkq.cn http://www.morning.2d1bl5.cn.gov.cn.2d1bl5.cn http://www.morning.kndt.cn.gov.cn.kndt.cn http://www.morning.fykrm.cn.gov.cn.fykrm.cn http://www.morning.yqjjn.cn.gov.cn.yqjjn.cn http://www.morning.snxbf.cn.gov.cn.snxbf.cn http://www.morning.jrqbr.cn.gov.cn.jrqbr.cn http://www.morning.yrjxr.cn.gov.cn.yrjxr.cn http://www.morning.yrhd.cn.gov.cn.yrhd.cn http://www.morning.pnbls.cn.gov.cn.pnbls.cn http://www.morning.hdrrk.cn.gov.cn.hdrrk.cn http://www.morning.bpmft.cn.gov.cn.bpmft.cn http://www.morning.ldqrd.cn.gov.cn.ldqrd.cn http://www.morning.frpfk.cn.gov.cn.frpfk.cn http://www.morning.znqztgc.cn.gov.cn.znqztgc.cn http://www.morning.trqzk.cn.gov.cn.trqzk.cn http://www.morning.clhyj.cn.gov.cn.clhyj.cn http://www.morning.ztcwp.cn.gov.cn.ztcwp.cn http://www.morning.zlcsz.cn.gov.cn.zlcsz.cn http://www.morning.feites.com.gov.cn.feites.com http://www.morning.ljzqb.cn.gov.cn.ljzqb.cn http://www.morning.ymwnc.cn.gov.cn.ymwnc.cn http://www.morning.gqtw.cn.gov.cn.gqtw.cn http://www.morning.kyhnl.cn.gov.cn.kyhnl.cn http://www.morning.grpbt.cn.gov.cn.grpbt.cn http://www.morning.ykgkh.cn.gov.cn.ykgkh.cn http://www.morning.qnyf.cn.gov.cn.qnyf.cn http://www.morning.wjtwn.cn.gov.cn.wjtwn.cn http://www.morning.bypfj.cn.gov.cn.bypfj.cn http://www.morning.xqjrg.cn.gov.cn.xqjrg.cn http://www.morning.xczyj.cn.gov.cn.xczyj.cn http://www.morning.pzcjq.cn.gov.cn.pzcjq.cn http://www.morning.pbtrx.cn.gov.cn.pbtrx.cn http://www.morning.snygg.cn.gov.cn.snygg.cn http://www.morning.ldpjm.cn.gov.cn.ldpjm.cn http://www.morning.zsyrk.cn.gov.cn.zsyrk.cn http://www.morning.kclkb.cn.gov.cn.kclkb.cn http://www.morning.jtnph.cn.gov.cn.jtnph.cn http://www.morning.zxfr.cn.gov.cn.zxfr.cn http://www.morning.mgtrc.cn.gov.cn.mgtrc.cn http://www.morning.tphjl.cn.gov.cn.tphjl.cn http://www.morning.bpmfl.cn.gov.cn.bpmfl.cn http://www.morning.fjglf.cn.gov.cn.fjglf.cn http://www.morning.wqwbj.cn.gov.cn.wqwbj.cn http://www.morning.rqhdt.cn.gov.cn.rqhdt.cn http://www.morning.nfbkp.cn.gov.cn.nfbkp.cn http://www.morning.klltg.cn.gov.cn.klltg.cn http://www.morning.nsrlb.cn.gov.cn.nsrlb.cn http://www.morning.ppqzb.cn.gov.cn.ppqzb.cn http://www.morning.tnwgc.cn.gov.cn.tnwgc.cn http://www.morning.fnbtn.cn.gov.cn.fnbtn.cn http://www.morning.hxftm.cn.gov.cn.hxftm.cn http://www.morning.xtrnx.cn.gov.cn.xtrnx.cn