├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py
│ └── tasks.py # 任务体函数文件
├── add_task.py # 执行任务脚本文件(这个文件可以在任意位置)
└── get_result.py # 获取结果
注意: 任务体代码文件要与celery.py文件在同一个包下
使用步骤:
app = Celery(broker=broker, backend=backend, include=include)
-
绑定存放任务的仓库,绑定存放任务结构的仓库,绑定任务函数文件的路径
-
创建Celery对象
-
启动celery服务
celery服务启动指令:
1. 非windows
celery worker -A celery_task -l info
2. windows:
pip3 install eventlet==0.25.1
celery worker -A celery_task -l info -P eventlet
注意: celery 5.0+ 的启动方式:
celery --app=celery_task.celery worker -f /pro/logs/celery.log -l info
注意: 执行指令时要cd到celery_task文件夹所在的
父级文件夹下
(也就是要找到celery_task)
from celery import Celery
# 1.绑定存放任务的仓库
broker = 'redis://127.0.0.1:6379/14'
# 2.绑定存放任务执行结果的仓库
backend = 'redis://127.0.0.1:6379/15'
# 3.绑定要执行的任务主体路径
include = ['celery_task.tasks']
# 4.实例化Celery,并添加库和任务
app = Celery(broker=broker, backend=backend, include=include)
from .celery import app
import time, random
将app.task装饰到任务函数上
@app.task
def add(n, m):
res = n + m
# 模拟不固定耗时操作
time.sleep(random.randint(1, 5))
return res
@app.task
def low(n, m):
res = n - m
# 模拟不固定耗时操作
# time.sleep(random.randint(1, 5))
time.sleep(2)
return res
注意: 任务就是一个个的功能函数,功能函数的返回值就是任务的执行结果
1.手动添加 立即执行任务: delay()
调用 delay 就相当于将任务函数交给 celery 进行调用
delay 的参数与任务函数的参数保持一致
delay 返回的是任务执行结果ID号,通过此ID号拿到任务结果
from celery_task import tasks
res = tasks.add.delay(5, 2)
print(res) # 任务执行结果ID号 : baa2fbb3-f04b-4734-bc87-397e672129fe
2.手动添加 延迟执行任务: apply_async()
调用 apply_async 就相当于将任务函数交给 celery 进行调用
apply_async 的参数与任务函数的参数保持一致
apply_async 返回的是任务执行结果ID号,通过此ID号拿到任务结果
from celery_task import task2
from datetime import datetime, timedelta
def eta_second(second):
ctime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=second) # seconds: 秒
return utc_ctime + time_delay
res = tasks.low.apply_async(args=(200, 50), eta=eta_second(10)) # 这里是延迟10秒
print(res) # 任务执行结果ID号: 1aeb4f13-33e1-4cb4-b26d-f613ae36c68b
eta : 延迟时间(utc格式)
# 查询任务执行结果
from celery_task.celery import app
from celery.result import AsyncResult
id = 'c00be7e5-3d90-43b0-bba4-86f1140b9b8c' # 任务ID号
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result) # 打印任务结果
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
apply:
官方注释:Execute this task locally, by blocking until the task returns(通过阻塞直到
任务
返回,在本地
执行
此
任务
)
即同步
任务
,不走
celery
worker。
apply_async:
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
link=None, link_error=None, ..
由于项目需求,需要在指定时间之后
执行
异步
任务
给用户推送消息,由于之前只用过
celery
的定时
任务
,在查阅一番资料之后,发现有官方文档中是有相关说明的。
T.delay(arg, kwargs=value)
是常见的用来
执行
celery
异步
任务
的命令。
而还有另一个命令是不常用的
T.apply_async((arg,), {'kwarg': value}, countdown=60, expir...
django-post-request-task
芹菜
任务
类,其
执行
被
延迟
到请求完成之后,使用django和线程request_finished发出的request_started和request_finished信号。
如果您的视图包含在事务中(如果您要在其中进行数据库修改,则应该这样做),这很有用,因为您最终可能会在提交事务之前过早触发
celery
任务
(甚至在事务提交时触发
任务
)。相应的交易已回滚)。
通过侦听request_started和request_finished django信号,在提交通过@atomic或ATOMIC_REQUESTS创建的所有事务之后,我们可以安全地触发
任务
。
from
celery
import
Celery
from post_request_task . task import PostRequestTask
app =
Celery
Celery
使用介绍
Celery
简单来说就是一个分布式消息队列。简单、灵活且可靠,能够处理大量消息,它是一个专注于实时处理的
任务
队列,同时也支持异步
任务
调度。
Celery
不仅可以单机运行,也能够同时在多台机器上运行,甚至可以跨数据中心。
Celery
中比较关键的概念:
worker: worker 是一个独立的进程,
任务
执行
单元,它持续监视队列中是否有需要处理的
任务
;
broker: ...
这篇文章主要介绍了
python
基于
celery
实现异步
任务
周期
任务
定时
任务
,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
hello, 小伙伴们, 好久不更新了,这一次带来的是
celery
在
python
中的应用以及设置异步
任务
周期
任务
和定时
任务
的步骤,希望能给入坑的你带来些许帮助.
首先是对
celery
的介绍,
Celery
其实是一个专注于实时处理...
celery
中文译为芹菜,是一个分布式
任务
队列.是异步的,所以能处理大量消息
最新的
celery
不支持windows下使用了,所以在使用pycharm安装
celery
模块之后,需要再安装eventlet模块才能测试运行.
一.异步
任务
启动客户端:
s1,s2要在项目目录下,如果在文件夹中
执行
,terminal输入命令的时候要-A项目文件夹的名字
c=
Celery
("ta...
并且运行那部分代码里的异步
任务
没有在默认队列
celery
中,上面我启动的是默认队列。运行有关异步
任务
的那部分代码,发现并没有
执行
,控制台也没有打印相关信息。然后发现在seetings里设置了多个队列。于是重新换了个启动命令,指向队列。之后运行时,正常
执行
。