相关文章推荐
迷茫的海龟  ·  创建表 - Azure ...·  2 年前    · 
犯傻的杨桃  ·  typescript - What ...·  2 年前    · 
Collectives™ on Stack Overflow

Find centralized, trusted content and collaborate around the technologies you use most.

Learn more about Collectives

Teams

Q&A for work

Connect and share knowledge within a single location that is structured and easy to search.

Learn more about Teams

I'm trying to send individual emails to a list of users using an EmailOperator inside a python loop but currently the emails are not being sent. The dag doesn't return any error which makes me believe the EmailOperator is not being called.

@dag(schedule_interval=None, tags=['Send multiple emails testing'], default_args=default_args)
def multiple_email_send_test_dag():
    @task   
    def multiple_email_send_test():
        email_list = ['user1@gmail.com', 'user2@gmail.com']
        for i in range(len(email_list)):
            EmailOperator(
                task_id=f'send_success_email_test_no_{i}', 
                to=str(email_list[i]),
                subject='Email Header',
                html_content= f"""
                        Hi {email_list[i]}, <br>
                        <p>This is the body of the email</p>
                        <br> Thank You. <br>
    # Dummy Operators
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')
    # The pipeline
    start >> multiple_email_send_test() >> end
dag = multiple_email_send_test_dag()

What I'm I missing?

This is a case of calling operator inside operator. The EmailOperator isn't executing - all it does is just initialize the constructor of the class. The logic of actually sending the email is in the execute() function of the class.

DON'T DO THIS - BAD PRACTICE:

Let me first clarify that if you want to make your current code work you must call the execute() See this answer for more information about it.

@task   
def multiple_email_send_test():
    email_list = ['user1@gmail.com', 'user2@gmail.com']
    for i in range(len(email_list)):
        e = EmailOperator(
            task_id=f'send_success_email_test_no_{i}', 
            to=str(email_list[i]),
            subject='Email Header',
            html_content= f"""
                    Hi {email_list[i]}, <br>
                    <p>This is the body of the email</p>
                    <br> Thank You. <br>
        e.exectue(dict())

How you should solve it:

Option 1: Since EmailOperator sends 1 email to all addresses and it's different behavior than what you wish. You can just create a custom operator IndvidualEmailOpeator that accept a list of emails and send to each address individual mail. By doing so you won't need to wrap the operator with PythonOperator / task decorator thus you can just use the operator you created directly.

Option 2:

Note that EmailOperator is simply calling send_email, so you can just use the function directly without the operator thus avoiding the issue of operator inside operator:

@task   
def multiple_email_send_test():
    from airflow.utils.email import send_email
    email_list = ['user1@gmail.com', 'user2@gmail.com']
    for i in range(len(email_list)):
        send_email(
            to=str(email_list[i]),
            subject='Email Header',
            html_content= f"""
                    Hi {email_list[i]}, <br>
                    <p>This is the body of the email</p>
                    <br> Thank You. <br>
        

Thanks for contributing an answer to Stack Overflow!

  • Please be sure to answer the question. Provide details and share your research!

But avoid

  • Asking for help, clarification, or responding to other answers.
  • Making statements based on opinion; back them up with references or personal experience.

To learn more, see our tips on writing great answers.