I'm trying to send individual emails to a list of users using an EmailOperator inside a Python loop, but the emails are not being sent. The DAG doesn't raise any error, which makes me believe the EmailOperator is never being called.
@dag(schedule_interval=None, tags=['Send multiple emails testing'], default_args=default_args)
def multiple_email_send_test_dag():

    @task
    def multiple_email_send_test():
        email_list = ['user1@gmail.com', 'user2@gmail.com']
        for i in range(len(email_list)):
            EmailOperator(
                task_id=f'send_success_email_test_no_{i}',
                to=str(email_list[i]),
                subject='Email Header',
                html_content=f"""
                Hi {email_list[i]}, <br>
                <p>This is the body of the email</p>
                <br> Thank You. <br>
                """
            )

    # Dummy Operators
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    # The pipeline
    start >> multiple_email_send_test() >> end

dag = multiple_email_send_test_dag()
What am I missing?
This is a case of calling an operator inside another operator. The EmailOperator never executes: instantiating it only runs the class constructor. The logic that actually sends the email lives in the execute() method of the class, and nothing in your task calls it.
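To see the difference between constructing an object and executing it without involving Airflow at all, here is a tiny stand-in. FakeEmailOperator and outbox are made-up names for illustration only; they just mimic the "constructor stores config, execute() does the work" split described above.

```python
outbox = []  # records "sent" messages so the side effect is observable


class FakeEmailOperator:
    """Hypothetical stand-in for an operator, not an Airflow class."""

    def __init__(self, task_id, to):
        # Constructing the object only stores its configuration.
        self.task_id = task_id
        self.to = to

    def execute(self, context):
        # The side effect happens here, and only when something calls it.
        outbox.append(self.to)


# Same shape as the loop in the question: objects are created, then discarded.
for i, address in enumerate(["user1@gmail.com", "user2@gmail.com"]):
    FakeEmailOperator(task_id=f"send_{i}", to=address)

messages_after_construction = len(outbox)

# Only an explicit execute() call produces the side effect.
FakeEmailOperator(task_id="send_now", to="user1@gmail.com").execute({})
messages_after_execute = len(outbox)
```

In a normal DAG, Airflow itself calls execute() for every task it schedules. An operator created inside another task's Python callable is never scheduled, so nobody makes that call.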
DON'T DO THIS - BAD PRACTICE:
To be clear: making your current code work would require calling execute() on each operator yourself. See this answer for more information about it.
@task
def multiple_email_send_test():
    email_list = ['user1@gmail.com', 'user2@gmail.com']
    for i in range(len(email_list)):
        e = EmailOperator(
            task_id=f'send_success_email_test_no_{i}',
            to=str(email_list[i]),
            subject='Email Header',
            html_content=f"""
            Hi {email_list[i]}, <br>
            <p>This is the body of the email</p>
            <br> Thank You. <br>
            """
        )
        # Calling execute() by hand with an empty context: works, but bad practice
        e.execute(dict())
How you should solve it:
Option 1:
EmailOperator sends one email to all addresses, which is different from the behavior you want. You can create a custom operator, IndividualEmailOperator, that accepts a list of addresses and sends a separate email to each one. That way you don't need to wrap the operator in a PythonOperator / task decorator; you can use the operator you created directly.
Option 2:
Note that EmailOperator simply calls send_email, so you can use that function directly without the operator, which avoids the operator-inside-operator issue:
@task
def multiple_email_send_test():
    from airflow.utils.email import send_email

    email_list = ['user1@gmail.com', 'user2@gmail.com']
    for i in range(len(email_list)):
        send_email(
            to=str(email_list[i]),
            subject='Email Header',
            html_content=f"""
            Hi {email_list[i]}, <br>
            <p>This is the body of the email</p>
            <br> Thank You. <br>
            """
        )