Django+Celery学习笔记5——定时推送消息
引言
本文紧接着上一篇来讲,如果不清楚请从头看: 学习笔记4
实例场景
前面学习了Celery定时任务,那么在现实中最常用到的就是定时发送消息和邮件了。现在学习一下。
定时推送钉钉消息
场景:通常会建一个生产环境预警群,通过监控服务是否正常,如果有异常,钉钉群API捕捉异常后,定时自动推送到群里,这样以来,大家可以及时收到,然后跟踪处理问题。
钉钉开放平台:https://ding-doc.dingtalk.com/doc#/serverapi2/elzz1p
这里需要做的是,钉钉推送消息的设置,而不是推送什么消息,消息内容获取方式很多,根据自己的来!
修改tasks.py代码,如下:
from __future__ import absolute_import, unicode_literals
from djangocelerydemo.celery import app
import requests,json
@app.task
def task_send_dd_text(url, msg, atMoblies, atAll="flase"):
发送钉钉提醒
:param url:
:param msg:
:param atMoblies:
:param atAll:
:return:
body = {
"msgtype": "text",
"text": {
"content": msg
"at": {
"atMobiles": atMoblies, # 被艾特人的手机号码
"isAtAll": atAll
headers = {'content-type': 'application/json',
r = requests.post(url, headers=headers, data=json.dumps(body))
print(r.text)
@app.task
def dd_task():
url = 'https://oapi.dingtalk.com/robot/send?access_token=****************************'
task_send_dd_text.delay(url=url,msg='【钉钉消息播报】:\n test-services服务正常\n test-manager服务正常\n',atMoblies=["18612345678"],atAll='true')
其他代码不变。
进入到后台界面设置定时任务,如图:
通过设置定时任务执行test_dd函数,而test_dd函数内部是异步执行推送钉钉消息的函数task_send_dd_text。
结果如下:
钉钉群的消息如下:
到此钉钉推送消息讲完!
定时发送邮件
这里发送邮件使用django自带的发邮件EmailMultiAlternatives。
具体代码如下:
在setting.py文件中设置:
# django发送邮件
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_USE_TLS = False #是否使用TLS安全传输协议(用于在两个通信应用程序之间提供保密性和数据完整性。)
EMAIL_USE_SSL = False #是否使用SSL加密,qq企业邮箱要求使用
EMAIL_HOST = 'smtp.126.com' # 如果是 163 改成 smtp.163.com
EMAIL_PORT = 25
EMAIL_HOST_USER = '####@126.com' # 帐号
EMAIL_HOST_PASSWORD = '####' # 密码
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
任务计划场景设置如下:
from django.core.mail import send_mail,send_mass_mail
from __future__ import absolute_import, unicode_literals
@app.task
def task_send_mail(*args, **kwargs):
django 的 发送邮件,支持 html,html_message="html 内容"
:param args:
:param kwargs:
:return:
# message = ('Subject1 here', 'Here is the message', '', ['136633063@qq.com'])
send_mail('自动发送邮件测试', '这里是邮件消息内容', '', ['136633063@qq.com'],fail_silently=False)
except Exception as e:
print("失败信息:%s"%e)
return {"msg":"发送失败"}
else:
return {"msg":"发送成功"}
配置定时任务计划,如下:
查看消费结果:
查看邮件内容,如下:
以上就是单个文本邮件发送方式。
多邮件发送
以上是单个邮件发送,如果模拟多个用户发送不同的邮件,该怎么操作?
django操作很简单,如下:
from django.core.mail import send_mail,send_mass_mail
import django
import os
# 防止报错
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangocelerydemo.settings") # project_name 项目名称
django.setup()
message1 = ('Subject1 here', 'Here is the message', '', ['136633063@qq.com'])
message2 = ('Subject2 here', 'Here is the message', '', ['136633063@qq.com'])
from email.header import make_header
# 单个邮件
# send_mail('Subject1 here', 'Here is the message', '', ['136633063@qq.com'],fail_silently=False)
# 多个邮件
send_mass_mail((message1,message2), fail_silently=False)
发送完后,你会收到两份邮件如下:
邮件带附件发送
以上都是文本方式发送,那么要发送附件呢?
django使用EmailMultiAlternatives模块可以完成附件的发送,如下:
from django.core.mail import EmailMultiAlternatives
from email.header import make_header
subject = '定时任务测试邮件附件主题'
text_content = '这是一封重要的邮件.'
html_content = '<p>这是一封<strong>重要的</strong>邮件.</p>'
msg = EmailMultiAlternatives(subject, text_content, '', ['136633063@qq.com'])
msg.attach_alternative(html_content, "text/html")
# 发送附件
file_path = r"E:\djangocelerydemo\test附件.html" # 发送附件的文件路径
text = open(file_path, 'rb').read()
file_name = os.path.basename(file_path)
# 对文件进行编码处理
b = make_header([(file_name, 'utf-8')]).encode('utf-8')
msg.attach(b, text) # 传入文件名和附件
msg.send()