吐血总结,避坑指南,异步任务Celery使用看这个就够了
写在前面
自学Python很久,Django小项目也写过不少,对Redis也有点概念。
很早前(多年前)就需要一个异步任务功能,当时看过celery、django-q,磕磕碰碰也调通了,弄好了,但是很多概念一直理解不深。
近期又一个重要项目需要异步任务,包括周期性任务,还需要一些灵活的可视化配置,但是我对异步任务的机制忘得差不多了,哎。
没办法,下定决心,这次认真学习,起码记好笔记,希望能做到以下几点:
- 尽量知其然知其所以然
- 入门要掌握,复杂功能留下扩展空间
- 下次有异步任务需求,能够快速运行,只需编写修改任务逻辑即可
吐血总结版
通过学习实验,加上自己的理解。
我觉得学习异步任务需要 掌握几个名词、熟悉工作流程、理解几个概念,知道一些踩坑点 ,具体如下:
掌握几个名词
异步任务或定时任务 ,其实可以理解为将任务发起和任务执行解耦,确保程序主线程正常运行。
定时任务本质上就是异步任务。
异步任务 需要由 任务发起方、任务队列、任务执行者、任务结果存储 几部分互相配合。
任务发起方 ——下面称为生产者,如核心主函数,django核心视图代码等。
任务队列 ——即消息队列,用于缓存待执行任务,一般使用Redis。
任务执行者 ——由Celery提供核心功能,通过Worker命令启动一个Worker,能够自动连接消息队列并读取待执行任务,交由Worker来处理。
任务结果存储 ——可选,将任务执行结果进行持久化保持,可以使用Redis或Mysql等数据库,以便后续查看。
熟悉工作流程
1、定义一个Celery 应用实例,称之为app,导入任务函数,可添加个性化配置
2、编写任务函数,通过@app.task装饰一下,这个是消费者核心代码。
3、在需要使用异步任务的地方(生产者), 调用 之前编写的任务函数。
先导入,再使用,使用delay方法或apply_async方法。
from tasks.task01 import send_email result = send_email.delay('yuan') #queue可选,指定发送任务到哪个队列 result = send_email.apply_async(('yuan',),queue="testq")
4、生产者调用任务时,马上向app配置的任务队列发送任务,其实就是 任务函数名和参数 。此时生产者马上获取一个任务ID——taskid,主程序继续运行,后续可以拿这个taskid来查询任务执行结果。
5、Worker实时监听任务队列并取出任务,分配空闲worker来执行。
6、定时任务,其实就是有人定时向消息队列发送异步任务
理解几个概念
- Celery 和Django无关,它是一个通用Python库,Celery Worker 命令是启动消费者,监听消息队列,和是否Django项目无关。
- 一般项目中定时任务或者周期性任务,无需生产者参与,需要启动Celery Beat程序,他根据配置文件中定时配置,按要求不停的向消息队列中发送异步任务
- 在保证连通性前提下,无论Worker是否启动,Beat或生产者都可以往消息队列中发送任务
踩坑点
- 确保有安装Celery、Redis等常用的Python扩展库
- windows下执行运行Worker,任务不执行,必需使用-P eventlet启动,同时连接Redis必需使用IP地址,不能使用localhost
- 启动Worker时必需和消息队列保持连通,修改任务函数后,必需重启Worker
- 启动Worker时可以指定消息队列,但是必需在配置文件中配置,或调用任务时指定队列名
- 如果都是使用默认队列celery,启动Worker时可能会收到大量历史任务并进行处理
- 定时任务celery beat如果没有及时关闭,会一直按要求发送异步任务,产生大量历史遗留任务
常用命令
#帮助文档 多看看
celery --help
#常规启动Worker
celery -A tasks worker --loglevel=INFO
#Windows下启动Worker
celery -A tasks worker --loglevel=INFO -P eventlet
#关闭Worker
Ctrl + C
,可能需要连续按
#启动Beat程序 可以帮我们定时发送任务到消息队列
celery -A tasks beat --loglevel=INFO
代码示例
1、目录结构
新建一个celery02学习目录,文件结构如下:
celery02/tasks/__init__.py
/celery.py
/tasks01.py
/tasks02.py
/produce_task_1.py
/produce_task_2.py
其中tasks是消费者模块,有管理Celery实例的celery.py文件,有多个任务函数文件。
produce_task_1可以模拟消费者,测试异步任务调用;
produce_task_2模拟定时任务调用。
2、各部分代码解读
celery.py文件就是定义了一个可用的Celery实例app,在任务函数中可以使用。
后续项目只需修改include需要加载的任务文件即可。
#celery.py
from celery import Celery
from datetime import timedelta
app = Celery('celery_demo',
broker='redis://127.0.0.1:6379/2',
backend='redis://127.0.0.1:6379/1',
# 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
include=['tasks.task01',
'tasks.task02'
app.conf.timezone = 'Asia/Shanghai' # 时区
app.conf.enable_utc = False # 是否使用UTC
app.conf.task_default_queue = "celery02" #修改默认队列,可以不要
#配置文件定时任务
app.conf.beat_schedule = {
'sendmail-every-10-seconds': {
'task': 'tasks.task01.send_email',
'schedule': timedelta(seconds=10),
'args': ('李四',)
}
任务文件定义的异步任务的核心逻辑,这里只是作为示例
#task01.py
import time
from tasks.celery import app
#这是关键,穿上这件衣服就是异步任务函数了
@app.task
def send_email(res):
print("开始向%s发送邮件任务"%res)
time.sleep(5)
print("完成向%s发送邮件任务"%res)
return "mail ok"
另一个异步处理任务
#task02.py
import time
from tasks.celery import app
@app.task
def send_msg(name):
print("开始向%s发送短信任务"%name)
time.sleep(5)
print("完成向%s发送短信任务"%name)
return "msg ok"
通过一个脚本文件模拟生产者调用异步任务
#produce_task_1.py
from tasks.task01 import send_email
from tasks.task02 import send_msg
from datetime import datetime
v1 = datetime.now()
print(f"当前时间:{v1}")
# 立即告知celery去执行test_celery任务,并传入一个参数
#result = send_email.apply_async(('yuan',),queue="testq")
result = send_email.delay('yuan')
print(f"任务ID{result.id}")
result = send_msg.delay('yuan')
print(f"任务ID{result.id}")
通过一个脚本文件模拟生产者触发定时任务
#produce_task_2.py
from tasks.task01 import send_email
from tasks.task02 import send_msg
from datetime import datetime
#方式一 固定时间
v1 = datetime(2022, 11, 22, 16, 14, 00)
print(f"当前时间:{v1}")
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(f"任务运行时间:{v2}")
result = send_email.apply_async(args=["定时任务-指定时间"], eta=v2)
print(f"任务ID{result.id}")
# 方式二
ctime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())