airflow跨Dag依赖的调度方式 人工智能回答
假设有两个 DAG,一个是 DAG_A,另一个是 DAG_B。DAG_A 中有一个任务 task_a,DAG_B 中有一个任务 task_b,需要在 DAG_A 中等待 DAG_B 中的 task_b 完成后才能执行 task_a。这样,当 DAG_B.task_b 完成时,ExternalTaskMarker 会将任务标记为已完成,ExternalTaskSensor 会检测到该任务已完成,并触
在 Airflow 中,可以使用 ExternalTaskSensor 和 ExternalTaskMarker 来实现跨 DAG 之间的依赖。
ExternalTaskSensor 用于等待另一个 DAG 中的任务完成,可以将其作为目标 DAG 中的任务的依赖项。ExternalTaskMarker 则用于标记目标 DAG 中的任务已经完成,以便其他 DAG 可以继续执行。
下面是一个使用 ExternalTaskSensor 和 ExternalTaskMarker 的示例:
假设有两个 DAG,一个是 DAG_A,另一个是 DAG_B。DAG_A 中有一个任务 task_a,DAG_B 中有一个任务 task_b,需要在 DAG_A 中等待 DAG_B 中的 task_b 完成后才能执行 task_a。
首先,在 DAG_A 中定义一个 ExternalTaskSensor,用于等待 DAG_B.task_b 完成:
from airflow.sensors.external_task_sensor import ExternalTaskSensor
wait_for_task_b = ExternalTaskSensor(
task_id='wait_for_task_b',
external_dag_id='DAG_B',
external_task_id='task_b',
mode='reschedule',
dag=dag
)
然后,在 DAG_A.task_a 的依赖项中添加 wait_for_task_b:
task_a = BashOperator(
task_id='task_a',
bash_command='echo "Hello, world!"',
dag=dag,
depends_on_past=False,
wait_for_downstream=True
)
task_a.set_upstream(wait_for_task_b)
接下来,在 DAG_B.task_b 中添加 ExternalTaskMarker,用于标记任务已经完成:
from airflow.operators.external_task_marker_operator import ExternalTaskMarker
task_b = BashOperator(
task_id='task_b',
bash_command='echo "Hello, world!"',
dag=dag,
depends_on_past=False,
wait_for_downstream=True
)
mark_task_b = ExternalTaskMarker(
task_id='mark_task_b',
external_dag_id='DAG_B',
external_task_id='task_b',
dag=dag
)
task_b.set_downstream(mark_task_b)
这样,当 DAG_B.task_b 完成时,ExternalTaskMarker 会将任务标记为已完成,ExternalTaskSensor 会检测到该任务已完成,并触发 DAG_A.task_a 的执行。

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)