I'm very new to Airflow and I'm facing some problems with XCom and Jinja.
I have to do some processing in Python and then pass the result to an EmailOperator so that it is sent as the email body.
There seems to be no documentation about this. The only hint I've found is this link, and it is badly formed, badly commented, and doesn't work.
I think that it has to be done via Jinja, but I'm not getting how...
This is the attempt I'm working on. Could someone please help me and explain why this code is wrong and how to fix it?
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 11, 7),
    'email': ['ciccio.pasticcio@noreply.it'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'dg_daily_saint',
    default_args=default_args,
    schedule_interval='10 9 * * *')

task1 = PythonOperator(task_id="extract_daily_saint",
                       python_callable=extractDailySaint,
                       provide_context=True,
                       dag=dag)

def html_output(**context):
    value = context['task_instance'].xcom_pull(task_ids='extract_daily_saint', key='saint')
    return "<h1>" + value + "</h1>"

EMAIL_CONTENT = """
<b> {{ html_output(context) }}</b>
"""

mail = EmailOperator(
    task_id='mail',
    to='ciccio.pasticcio@noreply.it',
    subject='DataGovernance',
    html_content=EMAIL_CONTENT,
    provide_context=True,
    dag=dag)

task1 >> mail
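Being stuck might be a separate issue, but I see some confusion about the pusher and the puller.
The pusher is the operator that hands a value to another operator through XCom. For a PythonOperator, the value returned by python_callable is pushed automatically (under the key return_value), so no extra flag is needed. xcom_push=True is an argument of operators such as BashOperator in Airflow 1.x, not of PythonOperator.
The puller is the operator that receives the value from the pusher. A PythonOperator puller needs provide_context=True (Airflow 1.x) so that its callable receives task_instance. An EmailOperator does not, because html_content is a templated field that Airflow renders with Jinja before sending the email.
Thus, in your example:

task1 = PythonOperator(task_id="extract_daily_saint",
                       python_callable=extractDailySaint,  # its return value is pushed to XCom
                       dag=dag)  # keep provide_context=True only if extractDailySaint reads the context

Then in your puller, you can call xcom_pull directly in the Jinja template. Your {{ html_output(context) }} fails because neither html_output nor context is defined inside the template; task_instance is. If extractDailySaint pushes under a custom key such as 'saint' instead of returning the value, pass key='saint' to xcom_pull as well.

mail = EmailOperator(
    task_id='mail',
    to='ciccio.pasticcio@noreply.it',
    subject='DataGovernance',
    html_content="<b><h1> {{ task_instance.xcom_pull(task_ids='extract_daily_saint') }} </h1></b>",
    dag=dag)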
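If you prefer to keep the custom key 'saint' from your question, here is a minimal sketch of how the two tasks could fit together. It assumes Airflow 1.10.x import paths, and the names extract_daily_saint and build_dag as well as the placeholder value "Saint Example" are hypothetical stand-ins, since your extractDailySaint is not shown. The Airflow imports sit inside build_dag only so that the callable can be tried without Airflow installed.

```python
from datetime import datetime


def extract_daily_saint(**context):
    """Push the result to XCom under the custom key 'saint'."""
    saint = "Saint Example"  # hypothetical placeholder for the real extraction logic
    context["task_instance"].xcom_push(key="saint", value=saint)


# Airflow renders this string with Jinja; task_instance is available in the template.
EMAIL_CONTENT = (
    "<b><h1>"
    "{{ task_instance.xcom_pull(task_ids='extract_daily_saint', key='saint') }}"
    "</h1></b>"
)


def build_dag():
    """Wire the pusher and the puller together (Airflow 1.10.x paths assumed)."""
    # Imported here so extract_daily_saint can be exercised without Airflow.
    from airflow import DAG
    from airflow.operators.email_operator import EmailOperator
    from airflow.operators.python_operator import PythonOperator

    dag = DAG(
        "dg_daily_saint",
        default_args={"owner": "airflow", "start_date": datetime(2020, 11, 7)},
        schedule_interval="10 9 * * *",
    )
    extract = PythonOperator(
        task_id="extract_daily_saint",
        python_callable=extract_daily_saint,
        provide_context=True,  # Airflow 1.x: the callable needs the context to push
        dag=dag,
    )
    mail = EmailOperator(
        task_id="mail",
        to="ciccio.pasticcio@noreply.it",
        subject="DataGovernance",
        html_content=EMAIL_CONTENT,  # templated field, rendered before sending
        dag=dag,
    )
    extract >> mail
    return dag
```

In a real DAG file you would add dag = build_dag() at module level so the scheduler can discover the DAG. Here the pusher needs provide_context=True because it calls xcom_push on the task instance itself; if the callable simply returned the value, that flag and the key argument could both be dropped.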