I'm very new to Airflow and I'm having some problems with XCom and Jinja.

I need to do some processing in Python and then pass the result to an EmailOperator, which should send it as the email body.

There doesn't seem to be any documentation on this. The only hint I've found is this link, and the example there is badly formed, poorly commented, and doesn't work. I think it has to be done via Jinja, but I can't figure out how.

This is my current attempt. Could someone please explain why this code is wrong and how to fix it?

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 11, 7),
    'email': ['ciccio.pasticcio@noreply.it'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5)
}
dag = DAG(
    'dg_daily_saint',
    default_args=default_args,
    schedule_interval='10 9 * * *')
task1 = PythonOperator(task_id="extract_daily_saint",
                       python_callable=extractDailySaint,
                       provide_context=True,
                       dag=dag)
def html_output(**context):
    value=context['task_instance'].xcom_pull(task_ids='extract_daily_saint', key='saint')
    return "<h1>" + value + "</h1>"
EMAIL_CONTENT = """
    <b> {{ html_output(context) }}</b>
"""
mail = EmailOperator(
    task_id='mail',
    to='ciccio.pasticcio@noreply.it',
    subject='DataGovernance',
    html_content=EMAIL_CONTENT,
    provide_context=True,
    dag=dag)
task1 >> mail
Hi, thank you for your answers! The problem is related to the rendering of the EMAIL_CONTENT variable. I don't know the correct way to use an XCom value outside an operator. My DAG gets stuck in the running state on the email task for minutes, and then I receive a timeout error. If I remove the Jinja part and use static text inside EMAIL_CONTENT, it works properly.
– walzer91
Nov 13, 2019 at 8:54

The DAG getting stuck might be a separate issue, but I see some confusion about the pusher and the puller.

The pusher is the operator that pushes a value to another operator. The pusher needs xcom_push=True. For a PythonOperator, the returned value will be pushed.

The puller is the operator that receives the value from the pusher. The puller needs provide_context=True.
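
To make the pusher/puller relationship concrete, here is a toy model of XCom in plain Python. It is not Airflow code: FakeXComStore and the body of extract_daily_saint are hypothetical stand-ins, since the real callable is not shown in the question. The one Airflow fact it relies on is that a PythonOperator's return value is stored under the key return_value, which is also the default key for xcom_pull. That may be why pulling with key='saint', as in the question, returns None unless the callable pushes under that key explicitly.

```python
# Toy model of XCom semantics; none of these classes are Airflow APIs.
RETURN_KEY = "return_value"  # the key Airflow uses for a callable's return value


class FakeXComStore:
    """In-memory stand-in for the XCom table: (task_id, key) -> value."""

    def __init__(self):
        self._values = {}

    def push(self, task_id, key, value):
        self._values[(task_id, key)] = value

    def pull(self, task_id, key=RETURN_KEY):
        # Like xcom_pull, a missing entry gives None rather than raising.
        return self._values.get((task_id, key))


def extract_daily_saint():
    """Hypothetical pusher callable; the real one is not shown in the question."""
    return "San Martino"


store = FakeXComStore()

# Pusher: the operator stores whatever the callable returned.
store.push("extract_daily_saint", RETURN_KEY, extract_daily_saint())

# A puller using the default key finds the value...
pulled_default = store.pull("extract_daily_saint")

# ...but a custom key that was never pushed comes back as None.
pulled_custom = store.pull("extract_daily_saint", key="saint")

print(pulled_default)
print(pulled_custom)
```

With None in hand, the expression "<h1>" + value + "</h1>" from the question would raise a TypeError, so either pull with the default key or push explicitly under 'saint'.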

Thus, in your example:

task1 = PythonOperator(task_id="extract_daily_saint",
                       python_callable=extractDailySaint,
                       xcom_push=True,  # not provide_context
                       dag=dag)

Then, in your puller, you can use macros directly in the Jinja template:

mail = EmailOperator(
    task_id='mail',
    to='ciccio.pasticcio@noreply.it',
    subject='DataGovernance',
    html_content="<b><h1> {{ task_instance.xcom_pull(task_ids='extract_daily_saint') }} </h1></b>",
    provide_context=True,  # puller needs provide_context
    dag=dag)
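
If the email body needs more than a bare value, one option is to do the formatting inside the pushing callable, so that the template only has to pull a finished string. This is a minimal sketch, not the asker's actual code: lookup_saint is a hypothetical placeholder for the real extraction logic, and only the standard library is used so the formatting can be checked outside Airflow.

```python
import html


def lookup_saint():
    """Hypothetical placeholder for the real extraction logic."""
    return "San Martino & friends"


def extract_daily_saint():
    """Callable for the PythonOperator; its return value becomes the XCom."""
    saint = lookup_saint()
    # Escape the value so characters such as & or < cannot break the markup.
    return "<h1>{}</h1>".format(html.escape(saint))


# Template for EmailOperator's html_content; Airflow renders it at run time.
EMAIL_CONTENT = "<b>{{ task_instance.xcom_pull(task_ids='extract_daily_saint') }}</b>"

print(extract_daily_saint())
```

This keeps the Jinja expression down to a single xcom_pull call, and avoids calling a module-level function such as html_output from inside the template, as the question attempted.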
        
