
I have a DAG with two TaskGroups. Each TaskGroup has two tasks:

t1: SparkKubernetesOperator >> t2: SparkKubernetesSensor

t1 submits a Spark job to the Kubernetes cluster using a spark-operator deployment YAML file. It goes into the dark green SUCCESS state almost instantly.

t2 monitors the execution of the job submitted by t1. If the Spark job is running, it takes ~10 min to complete, and then t2 goes into the SUCCESS state.

Sometimes the submitted Spark job fails with ERROR: UnknownHostException. That is when I want to retry, but I want to retry the whole TaskGroup, not only t2.

I know it is not possible to retry a whole TaskGroup.

How do I correctly retry the submission of a Spark job to Kubernetes through Airflow 2.3.3?

from datetime import datetime, timedelta
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.utils.task_group import TaskGroup
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from alerts import slack_alert
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'max_active_runs': 1,
    'retries': 5,
    'retry_delay': timedelta(minutes=30),
    'on_failure_callback': slack_alert,
}

with DAG(
    "some-dag-name", 
    default_args=default_args,
    description='submit some-dag-name',
    schedule_interval="30 4 * * *",
    start_date = datetime(2022, 8, 27),
    ) as dag:
    with TaskGroup("tg-some-task-name", default_args=default_args,) as tx_some_task_name:
        task_some_task_name = SparkKubernetesOperator(
            task_id='some-task-name',
            namespace="batch",
            application_file="k8s/some-task-name.yaml",
            do_xcom_push=True,
            dag=dag,
        )
        task_some_task_name_sensor = SparkKubernetesSensor(
            task_id='some-task-name-sensor',
            namespace="batch",
            application_name="{{ task_instance.xcom_pull(task_ids='tg-some-task-name.some-task-name')['metadata']['name'] }}",
            kubernetes_conn_id="kubernetes_default",
            dag=dag,
            retries=1,
            attach_log=True,
        )
        task_some_task_name >> task_some_task_name_sensor
    with TaskGroup("tg-some-other-task", default_args=default_args,) as tx_some_other_task:
        task_some_other_task = SparkKubernetesOperator(
            task_id='some-other-task',
            namespace="batch",
            application_file="k8s/some-other-task.yaml",
            do_xcom_push=True,
            dag=dag,
        )
        task_some_other_task_sensor = SparkKubernetesSensor(
            task_id='some-other-task-sensor',
            namespace="batch",
            application_name="{{ task_instance.xcom_pull(task_ids='tg-some-other-task.some-other-task')['metadata']['name'] }}",
            kubernetes_conn_id="kubernetes_default",
            dag=dag,
            retries=1,
            attach_log=True,
        )
        task_some_task_name_sensor >> task_some_other_task
    chain(task_some_other_task, task_some_other_task_sensor)

Airflow TaskGroup doesn't support retry, so you cannot retry t1 when t2 fails if they are in the same TaskGroup.

But there is another component more suitable for your use case: the SubDag. It is deprecated but still available in the latest version; I think it will be removed once its features (like retries) are added to TaskGroup.

With a SubDag, you can run a separate DAG and configure its retries and conf, and it will be visible in the graph of your main DAG exactly like a TaskGroup. So you just need to create a new DAG containing your tasks t1 and t2, then replace the TaskGroup with a SubDagOperator task that runs this DAG.
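
For illustration, here is an untested minimal sketch of that layout; build_spark_subdag is just an illustrative helper name, and default_args is a trimmed stand-in for the dict in the question (the Slack callback is dropped for brevity). In Airflow 2.x, SubDagOperator is imported from airflow.operators.subdag, and the child DAG's dag_id has to have the form <parent_dag_id>.<subdag_task_id>:

# Sketch only: the child DAG holds t1 (submit) and t2 (sensor); the parent
# replaces the TaskGroup with a SubDagOperator so one retry policy covers both.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.subdag import SubDagOperator
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

default_args = {
    "owner": "airflow",
    "retries": 5,
    "retry_delay": timedelta(minutes=30),
}


def build_spark_subdag(parent_dag_id, group_id, args):
    # The child dag_id must have the form "<parent_dag_id>.<subdag_task_id>".
    with DAG(
        dag_id=f"{parent_dag_id}.{group_id}",
        default_args=args,
        schedule_interval=None,
        start_date=datetime(2022, 8, 27),
    ) as subdag:
        submit = SparkKubernetesOperator(
            task_id="some-task-name",
            namespace="batch",
            application_file="k8s/some-task-name.yaml",
            do_xcom_push=True,
        )
        sensor = SparkKubernetesSensor(
            task_id="some-task-name-sensor",
            namespace="batch",
            application_name="{{ task_instance.xcom_pull(task_ids='some-task-name')['metadata']['name'] }}",
            kubernetes_conn_id="kubernetes_default",
            attach_log=True,
        )
        submit >> sensor
    return subdag


with DAG(
    "some-dag-name",
    default_args=default_args,
    schedule_interval="30 4 * * *",
    start_date=datetime(2022, 8, 27),
) as dag:
    # Retrying this single task retries the failed tasks of the child DAG,
    # so the Spark job is submitted again and watched again.
    tg_some_task_name = SubDagOperator(
        task_id="tg-some-task-name",
        subdag=build_spark_subdag("some-dag-name", "tg-some-task-name", default_args),
        retries=5,
        retry_delay=timedelta(minutes=30),
    )

The retries and retry_delay on the SubDagOperator then cover the submit-and-wait pair as one unit, which is the behavior the question asks for.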

There is a better approach to it: a TaskGroup plus an on_retry_callback that clears the upstream task, with no risk of introducing an endless loop. I will share the solution for it tomorrow. – Dariusz Krynicki Sep 1, 2022 at 21:18

This is the concept:

The sensor exposes an on_retry_callback that calls a function; through params it is told which upstream tasks to clear on retry.

The sensor fails permanently once its retry limit is reached.

utils.py

from airflow.models import taskinstance
from airflow.utils.db import provide_session


@provide_session
def clear_tasks(tis, session=None, activate_dag_runs=False, dag=None) -> None:
    taskinstance.clear_task_instances(
        tis=tis,
        session=session,
        activate_dag_runs=activate_dag_runs,
        dag=dag,
    )


def clear_upstream_task(context):
    tasks_to_clear = context["params"].get("tasks_to_clear", [])
    all_tasks = context["dag_run"].get_task_instances()
    tasks_to_clear = [ti for ti in all_tasks if ti.task_id in tasks_to_clear]
    clear_tasks(tasks_to_clear, dag=context["dag"])

Then the task group:

from utils import clear_upstream_task

with TaskGroup("tg-task", default_args=default_args) as some_task:
    task1 = SparkKubernetesOperator(
        task_id='task1',
        namespace="batch",
        application_file="k8s/task1.yaml",
        do_xcom_push=True,
        dag=dag,
    )
    task1_sensor = SparkKubernetesSensor(
        task_id='task1-sensor',
        namespace="batch",
        application_name="{{ task_instance.xcom_pull(task_ids='tg-task.task1')['metadata']['name'] }}",
        kubernetes_conn_id="kubernetes_default",
        dag=dag,
        attach_log=True,
        params={"tasks_to_clear": ["tg-task.task1"]},
        on_retry_callback=clear_upstream_task,
    )
    task1 >> task1_sensor
        
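To tie it together: each time a sensor attempt fails and the task is marked for retry, Airflow invokes on_retry_callback, i.e. clear_upstream_task. That callback clears the task instances listed in params["tasks_to_clear"] (here tg-task.task1), so the submit task runs again and the Spark job is resubmitted before the sensor's next attempt. Because the callback only fires on retries and the sensor's retries are finite (they come from default_args), the loop ends once the retry limit is exhausted, as noted above.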
