相关文章推荐
朝气蓬勃的木瓜  ·  has no default export ...·  1 年前    · 
逃跑的高山  ·  flutter ...·  2 年前    · 
├── celery_task # celery包 │ ├── __init__.py # 包文件 │ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py │ └── tasks.py # 任务体函数文件 ├── add_task.py # 执行任务脚本文件(这个文件可以在任意位置) └── get_result.py # 获取结果 注意: 任务体代码文件要与celery.py文件在同一个包下

使用步骤:

app = Celery(broker=broker, backend=backend, include=include)
  1. 绑定存放任务的仓库,绑定存放任务结构的仓库,绑定任务函数文件的路径
  2. 创建Celery对象
  3. 启动celery服务

celery服务启动指令:

1. 非windows
celery worker -A celery_task -l info
2. windows:
pip3 install eventlet==0.25.1
celery worker -A celery_task -l info -P eventlet
注意: celery 5.0+ 的启动方式:
celery --app=celery_task.celery worker -f /pro/logs/celery.log -l info

注意: 执行指令时要cd到celery_task文件夹所在的 父级文件夹下 (也就是要找到celery_task)

  • celery_task/celery.py
from celery import Celery
# 1.绑定存放任务的仓库
broker = 'redis://127.0.0.1:6379/14'
# 2.绑定存放任务执行结果的仓库
backend = 'redis://127.0.0.1:6379/15'
# 3.绑定要执行的任务主体路径
include = ['celery_task.tasks']
# 4.实例化Celery,并添加库和任务
app = Celery(broker=broker, backend=backend, include=include)
  • celery_task/tasks.py
from .celery import app
import time, random
将app.task装饰到任务函数上 
@app.task
def add(n, m):
    res = n + m
    # 模拟不固定耗时操作
    time.sleep(random.randint(1, 5))
    return res
@app.task
def low(n, m):
    res = n - m
    # 模拟不固定耗时操作
    # time.sleep(random.randint(1, 5))
    time.sleep(2)
    return res
注意: 任务就是一个个的功能函数,功能函数的返回值就是任务的执行结果
  • add_task.py
1.手动添加 立即执行任务: delay()
调用 delay 就相当于将任务函数交给 celery 进行调用
delay 的参数与任务函数的参数保持一致
delay 返回的是任务执行结果ID号,通过此ID号拿到任务结果
from celery_task import tasks
res = tasks.add.delay(5, 2)
print(res)  # 任务执行结果ID号 : baa2fbb3-f04b-4734-bc87-397e672129fe
2.手动添加 延迟执行任务: apply_async()
调用 apply_async 就相当于将任务函数交给 celery 进行调用
apply_async 的参数与任务函数的参数保持一致
apply_async 返回的是任务执行结果ID号,通过此ID号拿到任务结果
from celery_task import task2
from datetime import datetime, timedelta
def eta_second(second):
    ctime = datetime.now()
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=second)  # seconds: 秒
    return utc_ctime + time_delay
res = tasks.low.apply_async(args=(200, 50), eta=eta_second(10))  # 这里是延迟10秒
print(res)  # 任务执行结果ID号: 1aeb4f13-33e1-4cb4-b26d-f613ae36c68b
eta : 延迟时间(utc格式)
  • get_result.py
# 查询任务执行结果
from celery_task.celery import app
from celery.result import AsyncResult
id = 'c00be7e5-3d90-43b0-bba4-86f1140b9b8c'  # 任务ID号
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)  # 打印任务结果
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
apply: 官方注释:Execute this task locally, by blocking until the task returns(通过阻塞直到 任务 返回,在本地 执行 任务 ) 即同步 任务 ,不走 celery worker。 apply_async: def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, .. 由于项目需求,需要在指定时间之后 执行 异步 任务 给用户推送消息,由于之前只用过 celery 的定时 任务 ,在查阅一番资料之后,发现有官方文档中是有相关说明的。 T.delay(arg, kwargs=value) 是常见的用来 执行 celery 异步 任务 的命令。 而还有另一个命令是不常用的 T.apply_async((arg,), {'kwarg': value}, countdown=60, expir... django-post-request-task 芹菜 任务 类,其 执行 延迟 到请求完成之后,使用django和线程request_finished发出的request_started和request_finished信号。 如果您的视图包含在事务中(如果您要在其中进行数据库修改,则应该这样做),这很有用,因为您最终可能会在提交事务之前过早触发 celery 任务 (甚至在事务提交时触发 任务 )。相应的交易已回滚)。 通过侦听request_started和request_finished django信号,在提交通过@atomic或ATOMIC_REQUESTS创建的所有事务之后,我们可以安全地触发 任务 。 from celery import Celery from post_request_task . task import PostRequestTask app = Celery Celery 使用介绍 Celery 简单来说就是一个分布式消息队列。简单、灵活且可靠,能够处理大量消息,它是一个专注于实时处理的 任务 队列,同时也支持异步 任务 调度。 Celery 不仅可以单机运行,也能够同时在多台机器上运行,甚至可以跨数据中心。 Celery 中比较关键的概念: worker: worker 是一个独立的进程, 任务 执行 单元,它持续监视队列中是否有需要处理的 任务 ; broker: ... 这篇文章主要介绍了 python 基于 celery 实现异步 任务 周期 任务 定时 任务 ,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 hello, 小伙伴们, 好久不更新了,这一次带来的是 celery python 中的应用以及设置异步 任务 周期 任务 和定时 任务 的步骤,希望能给入坑的你带来些许帮助. 首先是对 celery 的介绍, Celery 其实是一个专注于实时处理... celery 中文译为芹菜,是一个分布式 任务 队列.是异步的,所以能处理大量消息   最新的 celery 不支持windows下使用了,所以在使用pycharm安装 celery 模块之后,需要再安装eventlet模块才能测试运行. 一.异步 任务 启动客户端: s1,s2要在项目目录下,如果在文件夹中 执行 ,terminal输入命令的时候要-A项目文件夹的名字 c= Celery ("ta... 并且运行那部分代码里的异步 任务 没有在默认队列 celery 中,上面我启动的是默认队列。运行有关异步 任务 的那部分代码,发现并没有 执行 ,控制台也没有打印相关信息。然后发现在seetings里设置了多个队列。于是重新换了个启动命令,指向队列。之后运行时,正常 执行