from airflow.operators.latest_only_operator import LatestOnlyOperator
latest_only = LatestOnlyOperator(
task_id='latest_only',
dag=dag,
train_model >> latest_only >> deploy_model
当然,对于更复杂的情况,基于PythonOperator的路由为实现自定义条件提供了更大的灵活性。
5.4 有关触发规则的更多信息
在前面的部分中,我们看到了Airflow如何允许我们构建动态行为DAG,这允许我们将分支或条件语句直接编码到DAG中。 这种行为在很大程度上受Airflow所谓的触发规则支配,该规则确定了Airflow何时执行任务。由于我们在上一节中相对较快地跳过了触发规则,因此我们将在这里更详细地探讨它们,以使您了解触发规则代表什么以及如何使用它们。
要了解触发规则,我们首先必须检查Airflow如何在DAG运行中执行任务。 本质上,当Airflow执行DAG时,它将连续检查您的每个任务以查看是否可以执行它。 一旦某个任务被视为“准备执行”,该任务就会被调度程序拾取并安排执行。 因此,一旦Airflow有可用的执行插槽,便会立即执行任务。
那么,Airflow如何确定何时可以执行任务? 这就是触发规则出现的地方。
5.4.1 什么是触发规则?
触发规则本质上是Airflow应用于任务的条件,取决于它们的依赖性(= DAG中的先前任务),以确定它们是否准备好执行。 Airflow的默认触发规则是“ all_success”,该规则指出,必须先成功完成所有任务的依赖关系,然后才能执行任务本身。
要了解这是什么意思,让我们回到最初的Umbrella DAG实现(图5.4),除了默认的“ all_success”规则外,它还没有使用任何触发规则。 如果我们要开始执行此DAG,Airflow将开始循环执行其任务以确定可以执行哪些任务,即哪些任务没有依赖关系,尚未成功完成。
图5.14。使用默认触发器规则“all_success”跟踪基本的Umbrella DAG的执行(图5.4)。(A)气流开始执行DAG时,首先运行唯一一个之前没有成功完成的任务:start任务。(B)成功完成启动任务后,其他任务将准备好执行并由Airflow接管。
5.4.2 失败的影响
当然,这仅描绘了“快乐”流程的情况,在此情况下,我们的所有任务均成功完成。 例如,如果我们的任务之一在执行过程中遇到错误,该怎么办?
我们可以通过模拟其中一项任务中的故障来轻松地对此进行测试。 例如,通过模拟fetch_sales任务中的失败,我们可以看到Airflow将为fetch_sales分配“失败”状态而不是为成功执行使用的“成功”状态来记录失败(图5.15)。 这意味着下游的process_sales任务无法执行,因为它要求fetch_sales成功。 结果,clean_sales任务被分配了状态“ upstream_failed”,这表明它由于上游故障而无法继续进行。
图5.15。 上游故障会阻止使用默认触发规则“ all_success”执行下游任务,该规则要求所有上游任务都必须成功。 请注意,Airflow会继续执行与失败的任务无关的任务(fetch_weather和process_weather)
上游任务的结果也会影响下游任务的这种行为通常称为“传播”,因为在这种情况下,上游故障会“传播”到下游任务。 除了失败之外,默认触发规则还可以将已跳过任务的影响传播到下游,从而导致已跳过任务下游的所有任务也被跳过。
这种传播是“ all_success”触发规则定义的直接结果,该规则要求所有依赖项都必须成功完成。 这样,如果它在依赖项中遇到跳过或失败,则除了以类似方式失败以外,别无选择,从而传播了跳过或失败。
5.4.3 其他触发规则
除了默认触发规则外,Airflow还支持许多其他触发规则。 这些规则允许响应成功,失败或跳过的任务时出现不同类型的行为。
例如,让我们回顾一下第5.2节中两个ERP系统之间的分支模式。 在这种情况下,我们必须调整加入分支的任务的触发规则(由join_datasets或join_erp_branch任务完成),以避免下游任务由于分支而被跳过。 原因是,使用默认触发规则,通过仅选择两个分支之一在DAG中引入的跳过将被传播到下游,从而导致该分支之后的所有任务也被跳过。 相反,“ none_failed”触发规则仅检查是否所有上游任务均已完成而没有失败。 这意味着它可以容忍成功和跳过的任务,同时仍然等待所有上游任务完成再继续执行,从而使触发规则适合于加入两个分支。 请注意,就传播而言,这意味着规则不会传播跳过。 但是,它仍然会传播故障,这意味着获取/处理任务中的任何故障仍将停止下游任务的执行。
同样,其他触发规则可用于处理其他类型的情况。 例如,触发规则“ all_done”可用于定义任务完成依赖关系后立即执行的任务,而不管其结果如何。例如,这可以用于执行清理代码(例如,关闭计算机或清理资源) ,无论发生什么情况,您都希望运行这些代码。 另一类触发规则包括渴望规则,例如“ one_failed”或“ one_success”,它们不等待所有上游任务在触发之前完成,而是仅需要一个上游任务来满足其条件才可以触发。 这样,这些规则可用于表示任务的早期失败或一旦一组任务中的一个任务成功完成就迅速做出响应。
尽管这里我们不会更深入地介绍触发规则,但是我们希望这能使您对触发规则在Airflow中的作用以及如何将其用于将更复杂的行为引入DAG的想法有所了解。 有关触发规则和一些潜在用例的完整概述,请参考表5.1。
表5.1。 Airflow支持的不同触发规则的概述。
5.5 在任务之间共享数据
除了定义任务之间的依赖关系外,Airflow还允许您使用XComs [14]在任务之间共享小数据。 XComs背后的思想是,它们本质上允许您在任务之间交换消息,从而在任务之间实现某种程度的共享状态。
5.5.1 使用XComs共享数据
为了了解其工作原理,让我们回顾一下我们的总体用例(图5.3)。 想象一下,在训练模型时(在train_model任务中),使用随机生成的标识符将训练后的模型注册到模型注册表中。 为了部署经过训练的模型,我们需要以某种方式将此标识符传递给deploy_model任务,以便它知道应该部署哪个版本的模型。
解决这个问题的一种方法是使用xcom在train_model和deploy_model任务之间共享模型标识符。在本例中,train_model任务负责“推送”XCom值,这实际上发布了该值,并使其可用于其他任务。 我们可以使用xcom_push方法在任务中显式发布XCom值,该方法在Airflow上下文中的任务实例上可用:
Listing 5.19
def _train_model(**context):
model_id = str(uuid.uuid4())
context["task_instance"].xcom_push(key="model_id", value=model_id)
train_model = PythonOperator(
task_id="train_model", python_callable=_train_model, provide_context=True,
对xcom_push的此调用有效地告诉Airflow将我们的model_id值注册为相应任务(train_model)以及相应DAG和执行日期的XCom值。运行此任务后,您可以在Web界面中的Admin> XComs部分中查看此已发布的XCom值(图5.16),其中显示了所有已发布的XCom值的概述。
图5.16。 已注册的XCom值概述(在Web界面中的Admin> XComs下)。
您可以使用xcom_pull方法(与xcom_push相反)来在其他任务中检索XCom值:
Listing 5.20
def _deploy_model(**context):
model_id = context["task_instance"].xcom_pull(
task_ids="train_model", key="model_id"
print(f"Deploying model {model_id}")
deploy_model = PythonOperator(
task_id="deploy_model", python_callable=_deploy_model, provide_context=True,
这告诉Airflow从“ train_model”任务中使用键“ model_id”获取XCom值,该值与我们之前在train_model任务中推送的model_id相匹配。 请注意,xcom_pull还允许您在获取XCom值时定义dag_id和执行日期。 默认情况下,这些参数设置为当前DAG和执行日期,因此xcom_pull仅获取当前DAG运行发布的值[15]。
我们可以通过运行DAG来验证这项工作是否有效,这应该为我们提供类似于deploy_model任务的以下结果:
Listing 5.21
[2020-07-29 20:23:03,581] {python_operator.py:105} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=chapter5_08_xcoms
AIRFLOW_CTX_TASK_ID=deploy_model
AIRFLOW_CTX_EXECUTION_DATE=2020-07-28T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2020-07-28T00:00:00+00:00
[2020-07-29 20:23:03,584] {logging_mixin.py:95} INFO - Deploying model f323fa68-8b47-4e21-a687-7a3d9b6e105c
[2020-07-29 20:23:03,584] {python_operator.py:114} INFO - Done. Returned value was: None
除了从任务中调用xcom_pull之外,还可以在模板中引用XCom变量:
Listing 5.22
def _deploy_model(templates_dict, **context):
model_id = templates_dict["model_id"]
print(f"Deploying model {model_id}")
deploy_model = PythonOperator(
task_id="deploy_model",
python_callable=_deploy_model,
templates_dict={
"model_id": "{{task_instance.xcom_pull(task_ids='train_model', key='model_id')}}"
provide_context=True,
最后,一些operators 还提供了对自动推送XCom值的支持。 例如,BashOperator有一个xcom_push选项,当设置为True时,它告诉操作员将bash命令写入stdout的最后一行作为XCom值推送。 同样,PythonOperator会将从Python调用返回的任何值发布为XCom值。 这意味着您还可以按如下所示编写我们的以上示例:
Listing 5.23
def _train_model(**context):
model_id = str(uuid.uuid4())
return model_id
def _deploy_model(**context):
model_id = context["task_instance"].xcom_pull(task_ids="train_model")
print(f"Deploying model {model_id}")
在后台,这可以通过在默认键“ return_value”下注册XCom来实现,正如我们在Admin部分中所见(图5.17)。
图5.17来自PythonOperator的隐式XCom已在“ return_value”键下注册。
5.5.2 何时(不)使用xcom
尽管XCom似乎对于在任务之间共享状态很有用,但它们的使用也有一些缺点。
例如,使用XComs的一个重要缺点是它们在任务之间添加了隐藏的依赖关系,因为提取任务对推送所需值的任务具有隐式依赖关系。 与显式任务相关性相反,此任务相关性在DAG中不可见,并且在安排任务时不会考虑。 因此,您有责任确保具有XCom依赖关系的任务以正确的顺序执行,Airflow不会为您执行此操作。当在不同的DAG或执行日期之间共享XCom值时,这些隐藏的依赖关系变得更加复杂,因此,这也不是我们建议的做法。
此外,当XCom破坏了operator的原子性时,它们可能会有点反模式。 例如,我们在实践中已经看到人们使用的一种用法是使用运算符在一个任务中获取API令牌,然后使用XCom将令牌传递给下一个任务。 在这种情况下,此方法的缺点是令牌在几个小时后过期,这意味着第二个任务的任何重新运行都将由于令牌过期而失败。更好的方法可能是将令牌的提取与第二个任务结合起来,这样一来,API令牌和相关工作的刷新就一次性发生了(从而使任务保持原子性)。
最后,XCom 的一个技术限制是,XCom 存储的任何值都需要是picklable。这意味着某些 Python 类型(如 lambdas 或许多与多处理相关的类)不能存储在 XCom 中(尽管您可能不希望这样做)。 此外,XCom值的大小受到将XCom存储在Airflow元存储中的数据库字段类型的最大大小限制:
SQLite-存储为BLOB类型,限制为2GB
PostgreSQL-存储为BYTEA类型,限制为1 GB
MySQL-存储为BLOB类型,限制为64 KB
话虽如此,如果适当地使用XComs可以成为强大的工具。 只要确保仔细考虑它们的用法并清楚地记录它们在任务之间引入的依赖关系,就可以避免日后出现意外情况。
5.6 摘要
在这一章里你学到了:
如何在Airflow DAG中定义基本线性依赖性和扇入/扇出结构。
如何将分支合并到DAG中,从而允许您根据特定条件选择多个执行路径。
可以将分支合并到DAG的结构中,而不是将其合并到任务中,从而在DAG的执行方式的可解释性方面提供了很多好处。
如何在DAG中定义条件任务,可以根据某些定义的条件执行这些任务。 与分支类似,这些条件可以直接在DAG中编码。
Airflow使用触发规则来启用这些行为,这些行为准确定义了Airflow何时可以执行给定任务。
除了默认触发规则“ all_success”之外,Airflow还支持其他各种触发规则,您可以使用这些触发规则来触发您的任务以应对不同类型的情况。
如何使用XCom在两个任务之间共享状态。