相关文章推荐
千杯不醉的牙膏  ·  flask ...·  2 周前    · 
严肃的石榴  ·  doCreateBean之populateB ...·  1 年前    · 
会搭讪的皮蛋  ·  pytorch之model.zero_gra ...·  1 年前    · 
光明磊落的眼镜  ·  Rocky Linux 安装 ...·  1 年前    · 

1.1目标

1.调用Restful API生成异步任务
2.查看异步任务的执行结果
3.Django管理后台生成定时执行和间隔执行任务
4.获取定时和间隔执行任务的结果
5.调用Restful API生成定时和间隔执行任务
6.获取定时和间隔执行任务的结果

1.2阅读须知

适合对Django rest framework熟练配置的人阅读
适合对Django-rest-swagger熟练配置的人阅读
适合对drf-extensionsr熟练配置的人阅读
对以上配置不熟的,可以参考
Django Swagger JWT Restful API

2.环境版本

以下包建议在虚拟环境下安装

Django 3.2.12
Celery 5.1.2
Django-celery-beat 2.2.1 用于定时
Django-celery-results 2.2.0
Django-redis 4.8.0
Djangorestframework 3.13.1
Django-rest-swagger 2.2.0
drf-extensions 0.7.1
eventlet 0.33.1
操作系统:Win10
Redis版本:3.2.12

3.1Celery配置

下面这段配置写在Django项目的settings.py里

CELERY_BROKER_URL = 'redis://47.93.218.25:6379/1'  # Broker配置,使用Redis作为消息中间件
# CELERY_RESULT_BACKEND = 'redis://47.93.218.25:6379/2'  # Backend设置,使用redis作为后端结果存储
CELERY_RESULT_BACKEND = 'django-db'  # Backend设置,使用mysql作为后端结果存储
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
# CELERY_ENABLE_UTC = True
CELERY_WORKER_CONCURRENCY = 4  # 并发的worker数量
CELERY_ACKS_LATE = True
DJANGO_CELERY_BEAT_TZ_AWARE = False
# DJANGO_CELERY_BEAT_TZ_AWARE = True
CELERY_WORKER_MAX_TASKS_PER_CHILD = 5  # 每个worker最多执行的任务数, 可防止内存泄漏
CELERY_TASK_TIME_LIMIT = 15 * 60  # 任务超时时间

3.2Django settings.py配置

django_celery_beat用于生成定时和间隔任务
django_celery_results用于将异步任务执行结果存储至关系型数据库,比如MySQL

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'baby_app.apps.BabyAppConfig',
    'bootstrap3',
    'DjangoUeditor', #富文本编辑器
    'moment',
    'rest_framework',   
    'rest_framework_swagger',
    'django_filters',   
    'django_celery_beat',
    'django_celery_results',
# 配置redis缓存
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://47.93.218.26:6379/1",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
REST_FRAMEWORK_EXTENSIONS = {   # 配置redis缓存视图页面
    'DEFAULT_CACHE_RESPONSE_TIMEOUT':180,
    'DEFAULT_USE_CACHE': 'default',
# rest framework配置
REST_FRAMEWORK = {  # restful插件配置项
    # Use Django's standard `django.contrib.auth` permissions,
    # or allow read-only access for unauthenticated users.
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.IsAuthenticatedOrReadOnly',
        'rest_framework.permissions.IsAuthenticatedOrReadOnly'
    'DEFAULT_AUTHENTICATION_CLASSES': (
        'rest_framework.authentication.BasicAuthentication',  # username和password形式认证
        'rest_framework.authentication.SessionAuthentication',
        'rest_framework_jwt.authentication.JSONWebTokenAuthentication', # 全局jwt
    'DEFAULT_SCHEMA_CLASS': 'rest_framework.schemas.coreapi.AutoSchema',
    'DEFAULT_FILTER_BACKENDS': ['django_filters.rest_framework.DjangoFilterBackend'],
    'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
    'PAGE_SIZE': 3

4.实例化Celery

在和settings.py同级的项目根目录下新建celery.py文件,文件路径如下图所示
celery.py 路径

以下是celery.py内容(这段是借鉴别人博客内容)

# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# Name:         celery
# Description:  
# Author:       CHEN
# Date:         2022/5/13
# -------------------------------------------------------------------------------
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the defalut Django settings module for the 'celery' program
# 为"celery"程序设置默认的Django settings 模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_new_project.settings')
app = Celery('celery')
# Using a string here means the worker dosen't have to serialize the configuration object to child processes.
# 在这里使用字符串意味着worker不必将配置对象序列化为子进程。
# - namespace='CELERY' means all celery-related configuration keys should have a 'CELERY_' prefix
# namespace="CELERY"表示所有与Celery相关的配置keys均应该带有'CELERY_'前缀。
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
# 从所有注册的Django app 配置中加载 task模块。
app.autodiscover_tasks()

5.创建任务

我的应用是baby_app,所以我在baby_app下创建task.py,如下图所示

以下是task.py内容

# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# Name:         task
# Description:  
# Author:       CHEN
# Date:         2022/5/13
# -------------------------------------------------------------------------------
from __future__ import absolute_import, unicode_literals
import requests
from celery import shared_task
from requests.exceptions import ConnectionError, ConnectTimeout
@shared_task
def get_url(url,timeout=3):
    try:
        code=requests.get(url).status_code
        return code
    except (ConnectionError,ConnectTimeout) as e:
        return 404

6.1生成手动异步任务的模型

我的应用是baby_app,所以我在baby_app下创建model.py

from django.db import models
class ManualTask(models.Model):
    task_id = models.CharField(max_length=60, null=False, unique=True)
    task_name = models.CharField(max_length=60, null=False, unique=True)
    create_time=models.DateTimeField(default=datetime.utcnow())
    params=models.CharField(max_length=60, null=True)
    class Meta:
        db_table = 'manual_task'

6.2生成数据表

假如以前没有将django_celery_beat,django_celery_results的数据模型迁移,以下操作同时也会将他们的模型迁移生成数据表

python manage.py makemigrations
python manage.py migrate

7.1生成手动异步任务的序列化类

我的应用是baby_app,所以我在baby_app下创建serializers.py

class ManualTaskSerializer(serializers.HyperlinkedModelSerializer):
    class Meta:
        model = ManualTask
        fields = ['task_id', 'task_name', 'params']

7.2生成异步任务执行结果的序列化类

from django_celery_results.models import TaskResult
class TaskResultSerializer(serializers.




    
HyperlinkedModelSerializer):
    class Meta:
        model = TaskResult
        fields = '__all__'

8.生成手动异步任务和结果视图

我的应用是baby_app,所以我在baby_app下创建view.py
继承CacheResponseMixin是为了缓存视图数据到Redis中,满足高并发查询
以下两个视图类的create,update,delete函数都是分别复写了CreateModelMixin,UpdateModelMixin,DestroyModelMixin类的方法

from .models import Question, Baby, Blog, Healthy, Money,  ManualTask
from rest_framework import viewsets, mixins
from .modifiy_mixins import CacheResponseMixin
from .modifiy_mixins import cache_response
from rest_framework.response import Response
from rest_framework_swagger.views import get_swagger_view
class TaskResultViewSet(CacheResponseMixin, viewsets.ModelViewSet):
        获取异步任务执行结果视图
    queryset_source = TaskResult.objects.all().order_by('id')
    queryset = queryset_source
    filter_fields = ('task_id',)    # 设置过滤字段
    serializer_class = TaskResultSerializer
    @cache_response(key_func='list_cache_key_func', timeout='list_cache_timeout')	# 在表数据发生插入更新删除操作时,更新redis缓存的数据
    def create(self, request, *args, **kwargs):
        data = request.data.copy()
        serializer = self.get_serializer(data=data)
        serializer.is_valid(raise_exception=True)
        self.perform_create(serializer)
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
    @cache_response(key_func='object_cache_key_func', timeout='object_cache_timeout')
    def update(self, request, *args, **kwargs):
        partial = kwargs.pop('partial', False)
        instance = self.get_object()
        serializer = self.get_serializer(instance, data=request.data, partial=partial)
        serializer.is_valid(raise_exception=True)
        self.perform_update(serializer)
        if getattr(instance, '_prefetched_objects_cache', None):
            # If 'prefetch_related' has been applied to a queryset, we need to
            # forcibly invalidate the prefetch cache on the instance.
            instance._prefetched_objects_cache = {}
        return Response(serializer.data)
    @cache_response(key_func='object_cache_key_func', timeout='object_cache_timeout')
    def destroy(self, request, *args, **kwargs):
        instance = self.get_object()
        self.perform_destroy(instance)
        return Response(status=status.HTTP_204_NO_CONTENT)
class ManualTaskViewSet(mixins.ListModelMixin, mixins.CreateModelMixin, viewsets.GenericViewSet):
        手动调用异步任务视图
    queryset = ManualTask.objects.all().order_by('id')
    serializer_class = ManualTaskSerializer
    def manual_task(self, request):
        # print(request.data)
        params = request.data.get('params')     # request.data是一个字典,获取提交的params参数
        params = json.loads(params)     # 由于params参数是个json类型的字符串,故转换为python可识别的列表
        url = params[0]
        timeout = int(params[1])
        t = tasks.get_url.delay(url, timeout)   # delay异步执行任务
        return t.id     # 返回异步任务的task_id
    def create(self, request, *args, **kwargs):
            此方法覆盖了CreateModelMixin类的create方法
        task_id = self.manual_task(request)
        data = request.data.copy()
        data['task_id'] = task_id   # 将获取的task_id塞入即将反序列化的数据中以便写入到表中
        # print(data)
        serializer = self.get_serializer(data=data)
        serializer.is_valid(raise_exception=True)
        self.perform_create(serializer)
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
schema_view = get_swagger_view(title='任务')

9.将视图添加到rest framework路由中

我的应用是baby_app,所以我在baby_app下创建routers.py

# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# Name:         routers
# Description:  
# Author:       CHEN
# Date:         2022/4/13
# -------------------------------------------------------------------------------
from rest_framework import routers
from .views import TaskResultViewSet,ManualTaskViewSet
router = routers.DefaultRouter()
router.register(r'taskresults', TaskResultViewSet)
router.register(r'manualtasks', ManualTaskViewSet)

10.项目路由设置

编写项目根目录的urls.py
schema_view是快速生成的swagger UI视图

from django.urls import path, include,re_path
from baby_app.routers import router
from baby_app.views import schema_view,UserViewSet
urlpatterns = [    
    path('api/', include(router.urls)),
    path('api-auth/', include('rest_framework.urls', namespace='rest_framework')),
    re_path(r'^docs',schema_view)

11.启动Celery的worker

1.pool参数可配置solo,eventlet等,当–pool=solo,多个任务是串行执行,效率低,–pool=eventlet,多个任务是并发执行,效率高,其中用到了协程技术
2.当pool配置成eventlet,首先要安装eventlet,并且当Celery配置中的CELERY_RESULT_BACKEND = 'django-db’时,可能报"DatabaseError: DatabaseWrapper objects created in a thread can only be used in that same thread. The object with alias ‘default’ was created in thread id 140107533682432 and this is thread id 65391024"错误

1.Pycharm打开terminal
2.执行celery -A django_new_project worker -l info --pool=eventlet即可启动worker
 

修改base.py里的这个方法,不让Django检查线程id是否一致,即可解决DatabaseError问题

12.在Swagger UI界面调用生成异步任务API

点击Execute调用post方法请求API
注意传入的params参数的双引号要做转义,因为参数传入后台是json格式,不转义会和外层的双引号冲突
task_id参数不需要手动定义,任务生成后会在后台将task_id存入表中

"task_id": "string", "task_name": "get_url_3", "params": "[\"http://127.0.0.1:8000/docs\",3]"

13.调用任务结果API获取结果

可根据task_id过滤

以下是worker从redis队列中获取任务并执行任务的过程

未完请看第二章

Celery Django Redis Rest framework2.环境版本Django 3.2.12Django-celery-beat 2.2.1Django-celery-results 2.2.0Django-redis 4.8.0Djangorestframework 3.13.1Django-rest-swagger 2.2.0操作系统:Win10Redis版本:3.2.123.Celery配置
Celery文档参考:http://docs.jinkan.org/docs/celery/ 参考文章:https://www.jb51.net/article/158046.htm Django异步任务django-celery Celery简单介绍: celery使用场景: 耗时任务定时任务 请求结果不怎么重要的 耗时任务比如:发送短信验证码我们可以先发送给客户任务状态(请求成功或失败) 请求结果重要的建议使用django实现 比如:支付 首先简单介绍一下,Celery 是一个强大的分布式任务队列,它可以让任务执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通
在运营系统中经常用到异步方式来处理我们的任务,比如将业务上线流程串成任务再写入队列,通过后台作业节点去调度执行。比较典型的案例为腾讯的蓝鲸、织云、云智慧等平台。本译文结合Django+Celery+Redis实现一个定期从Flickr获取图片并展示的简单案例,方便大家理解实现异步对列任务的过程。刚接触django的时候,我经历过的最让人沮丧的事情是需要定期运行一段代码。我写了一个需要每天上午12点执行一个动作的不错的函数。很简单是不是?错了。事实证明,这对我来说是一个巨大的困难点,因为,那时我使用Cpane类型的虚拟主机管理系统,它能专门提供一个很友好,很方便的图形用户界面来设置cron作业。
<script src="https://cdn.jsdelivr.net/npm/vue"></script> 2.django 和vue语法冲突处理需要增加一个标签{% verbatim %} {% verbatim %} <div id="app"> {{ text }} <br><br> <button v-on:click="create_server"&..
0. Celery介绍 Celery是一个功能完备即插即用的异步任务队列系统。它适用于异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。 简单,易于使用和维护,有丰富的文档。 高效,单个celery进程每分钟可以处理数百万个任务。 灵活,celery中几乎每个部分都可以自定义扩展。 总之一句话,快 一. 官方文档 中文文档http://docs.jinkan.org/docs/celery/getting-sta
from __future__ import absolute_import, unicode_literals import os from celery import Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DjangoProject.settings') # 设置django环境 app = Celery('DjangoProje...
### 回答1: Django Celery 是一个用于 Django 项目的分布式任务队列,可以用来处理异步任务和定时任务。其中,定时任务可以通过 Celery Beat 来实现,它可以根据配置的时间间隔或者时间点来执行任务Celery Beat 可以与 Celery Worker 配合使用,实现任务异步执行。通过 Django CeleryCelery Beat,我们可以方便地实现定时任务,提高应用的性能和可靠性。 ### 回答2: Django celery 是一个 Python 应用程序框架,专门用于在后台运行分布式任务,有时也被称为分布式队列。在应用程序中,定时任务是一个非常常见的任务Django celery 提供了一个强大的定时任务管理系统来解决这个问题。 Django celery 的定时任务是基于定时器和消息中间件构建的。它使用Celery Beat作为作业调度程序,并配合 Redis/RabbitMQ 等中间件实现消息队列。Django celery 的主要优点是:它是一个分布式的后台任务队列处理工具,可以把任务异步化,这样可以避免等待的时间,提高后台处理效率。 Django celery 提供了一个 Celery Beat 守护进程来提供定时任务的功能。它可以通过两种方式执行定时任务: 1. 周期性任务:这些任务在固定的时间间隔内按照预定义的计划运行。例如,我们可以计划在每小时的第15分运行一次任务,或在每天的某个时候运行一次任务。这些计划由 Celery Beat 系统维护并执行。我们可以通过在 Django 管理后台中录入任务来定义这些计划。 2. 延迟任务:这些任务在使用调度程序时不需要预定义计划。而是只需在需要执行任务时向 Celery 队列添加任务即可。这些任务可以通过一个延迟的任务调用来执行。例如,我们可以定义一个在应用程序中上传文件后执行任务,或在发送电子邮件后执行一个任务Django celery 中定义的任务需要满足特定的格式,通常是一个 Python 函数,它可以接收任意数量的参数,这些参数可以是 Python 任何数据类型。独立的 Django celery 任务不需要与应用程序的其他部分通信,因此它们比定期执行任务简单得多。要执行任务,请将其添加到 Celery 消息队列中即可。 总之,采用 Django celery 可以轻松地实现定时任务,极大地提高系统的性能和效率。无论是周期性任务还是延迟任务,您都可以使用 Django celery 来轻松定义和调度它们。无论您是在创建 Web 应用程序、处理大量数据或执行任何其他任务Django celery 都是一个非常有用的工具。 ### 回答3: Django celery 是一款流行的任务队列库,它提供的定时任务功能可以让你在特定的时间间隔内自动执行代码。这对于需要重复执行某些操作的应用程序来说非常有用。 要使用 Django celery 完成定时任务,需要按照以下步骤进行操作: 1. 安装 Django celery:可以通过 pip 命令来安装,具体命令如下: pip install django-celery 2. 创建 celery 实例:在 Django 项目的根目录下,创建一个名为 celery.py 的文件,并在其中实例化 celery 应用程序,代码如下: ```python from __future__ import absolute_import, unicode_literals import os from celery import Celery # 设置默认 DJANGO_SETTINGS_MODULE os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'yourapp.settings') app = Celery('yourapp') # 使用同一个文件配置celerydjango环境 app.config_from_object('django.conf:settings', namespace='CELERY') # 从所有已注册的app中加载任务模块 app.autodiscover_tasks() 3. 创建定时任务:在 Django 项目的某个应用下,创建一个名为 tasks.py 的文件,并在其中定义需要执行任务,例如: ```python from celery.decorators import periodic_task from celery.task.schedules import crontab @periodic_task(run_every=(crontab(minute='*/15'))) def my_task(): # 每15分钟执行一次的任务 print("Task executed at " + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) 4. 启动 worker 和 beat:在命令行中分别运行以下命令,分别启动 worker 和 beat: $ celery -A yourapp worker -l info $ celery -A yourapp beat -l info 5. 设置定时任务并运行:现在,你可以通过 Django celery 提供的定时任务管理界面来设置定时任务,也可以在代码中自定义任务的时间间隔,然后启动任务运行。 总的来说,Django celery 提供的定时任务功能可以使你的应用在特定的时间间隔内自动执行代码,从而提高应用的效率和可靠性。需要注意的是,在设计和实现定时任务时,应尽量考虑到并发性和数据一致性等实际问题,以确保任务运行的正确性和稳定性。
所以新手使用celery很仔细的建立文件夹名字、文件夹层级、python文件名字。 所以网上的celery博客教程虽然很多,但是并不能学会使用,因为要运行起来需要以下6个方面都掌握好,博客文字很难表达清楚或者没有写全面以下6个方面。 celery消费任务不执行或者报错NotRegistered,与很多方面有关系,如果要别人排错,至少要发以下6方面的截图,因为与一下6点关系很大。 1)整个项目目录结构, 2)@task入参 ,3)celery的配置,4)celery的配置 include ,5)cmd命令行启动参数 --queues= 的值,6)用户在启动cmd命令行时候,用户所在的文件夹。 在不规范的文件夹路径下,使用celery难度很高,一般教程都没教。 [项目文件夹目录格式不规范下的celery使用演示](https://github.com/ydf0509/celery_demo) 。 此国产分布式函数调度框架 funboost python万能通用函数加速器 https://funboost.readthedocs.io/ , 从用法调用难度,用户所需代码量,超高并发性能,qps控频精确程度,支持的中间件类型,任务控制方式,稳定程度等19个方面全方位超过celery。发布性能提高1000%,消费性能提高2000%。 pip install funboost
Spring Boot+Restful API+RedisCache+Swagger UI+JWT+Mybatis 大佬牛逼,一起加油!文贵新,发人深思动人心;文贵奇,使人入神着人迷;文贵精,如果删句一场空;文贵真,热情打动石头心 Python3.6-asyncio-协程入门 Farland-hobin: 挺不错的,帮我解决了问题,如何是python3.7的话,我建议你用run方法更加简单