Web应用程序通常一开始就很简单,但可能会变得非常复杂,并且大多数Web应用程序很快超出了仅响应HTTP请求的职责。

发生这种情况时,必须区分必须立即发生的情况(通常在HTTP请求生命周期中)和最终可能发生的情况。 这是为什么? 好吧,因为当您的应用程序流量超载时,像这样的简单事情就会有所作为。

Web应用程序中的操作可以分为关键操作或请求时间操作以及后台任务,这些操作发生在请求时间之外。 这些映射到上述内容:

  • 需要立即发生:请求时操作
  • 最终需要发生:后台任务

可以在单个请求/响应周期上完成请求时操作,而不必担心操作会超时或用户可能体验不好。 常见示例包括CRUD(创建,读取,更新,删除)数据库操作和用户管理(登录/注销例程)。

后台任务是不同的,因为它们通常很耗时并且容易失败,主要是由于外部依赖性。 复杂的Web应用程序中的一些常见方案包括:

  • 发送确认或活动电子邮件
  • 每天从各种来源抓取和抓取一些信息并将其存储
  • 进行数据分析
  • 删除不需要的资源
  • 导出各种格式的文档/照片

后台任务是本教程的主要重点。 这种情况下最常用的编程模式是生产者使用者体系结构。

简单来说,这种架构可以这样描述:

  • 生产者创建数据或任务。
  • 任务被放入一个称为任务队列的队列中。
  • 消费者负责使用数据或运行任务。

通常,使用者以先进先出(FIFO)的方式或根据他们的优先级从队列中检索任务。 消费者也称为工人,这就是我们将在整个过程中使用的术语,因为它与所讨论的技术所使用的术语一致。

可以在后台处理哪些任务? 任务:

  • 对于Web应用程序的基本功能不是必需的
  • 由于它们很慢(I / O密集等),因此无法在请求/响应周期中运行
  • 取决于可能不可用或表现不正常的外部资源
  • 可能需要至少重试一次
  • 必须按计划执行

Celery是在Python / Django生态系统中执行后台任务处理的事实选择。 它具有简单明了的API,并且与Django完美集成。 它支持用于任务队列的各种技术和用于工人的各种范例。

在本教程中,我们将创建一个使用后台任务处理的Django玩具Web应用程序(处理实际场景)。

假设您已经熟悉Python软件包管理和虚拟环境,那么让我们安装Django:

$ pip install Django

我已决定构建另一个博客应用程序。 该应用程序的重点将放在简单性上。 用户可以简单地创建一个帐户,而不必大惊小怪地创建一个帖子并将其发布到平台上。

设置 quick_publisher Django项目:

$ django-admin startproject quick_publisher

让我们开始应用程序:

$ cd quick_publisher 
$ ./manage.py startapp main

当启动一个新的Django项目时,我喜欢创建一个 main 应用程序,其中包含一个自定义用户模型。 我经常遇到默认Django User 模型的限制。 拥有自定义的 User 模型可为我们带来灵活性的好处。

# main/models.py
from django.db import models
from django.contrib.auth.models import AbstractBaseUser, PermissionsMixin, BaseUserManager
class UserAccountManager(BaseUserManager):
    use_in_migrations = True
    def _create_user(self, email, password, **extra_fields):
        if not email:
            raise ValueError('Email address must be provided')
        if not password:
            raise ValueError('Password must be provided')
        email = self.normalize_email(email)
        user = self.model(email=email, **extra_fields)
        user.set_password(password)
        user.save(using=self._db)
        return user
    def create_user(self, email=None, password=None, **extra_fields):
        return self._create_user(email, password, **extra_fields)
    def create_superuser(self, email, password, **extra_fields):
        extra_fields['is_staff'] = True
        extra_fields['is_superuser'] = True
        return self._create_user(email, password, **extra_fields)
class User(AbstractBaseUser, PermissionsMixin):
    REQUIRED_FIELDS = []
    USERNAME_FIELD = 'email'
    objects = UserAccountManager()
    email = models.EmailField('email', unique=True, blank=False, null=False)
    full_name = models.CharField('full name', blank=True, null=True, max_length=400)
    is_staff = models.BooleanField('staff status', default=False)
    is_active = models.BooleanField('active', default=True)
    def get_short_name(self):
        return self.email
    def get_full_name(self):
        return self.email
    def __unicode__(self):
        return self.email

如果您不熟悉自定义用户模型的工作方式,请确保查看Django 文档

现在,我们需要告诉Django使用此User模型而不是默认模型。 将此行添加到 quick_publisher/settings.py 文件中:

AUTH_USER_MODEL = 'main.User'

我们还需要将 main 应用程序添加到 quick_publisher/settings.py 文件中的 INSTALLED_APPS 列表中。 现在,我们可以创建迁移,应用迁移并创建超级用户,以能够登录Django管理面板:

$ ./manage.py makemigrations main 
$ ./manage.py migrate 
$ ./manage.py createsuperuser

现在让我们创建一个单独的Django应用程序来负责发布:

$ ./manage.py startapp publish

让我们在 publisher/models.py 定义一个简单的Post模型:

from django.db import models
from django.utils import timezone
from django.contrib.auth import get_user_model
class Post(models.Model):
    author = models.ForeignKey(get_user_model())
    created = models.DateTimeField('Created Date', default=timezone.now)
    title = models.CharField('Title', max_length=200)
    content = models.TextField('Content')
    slug = models.SlugField('Slug')
    def __str__(self):
        return '"%s" by %s' % (self.title, self.author)

publisher/admin.py 文件中使用Django admin挂钩 Post 模型,如下所示:

from django.contrib import admin
from .models import Post
@admin.register(Post)
class PostAdmin(admin.ModelAdmin):
 

最后,通过将publisher应用程序添加到INSTALLED_APPS列表中,将其与我们的项目挂钩。

现在,我们可以运行服务器并转到http://localhost:8000/admin/并创建我们的第一篇文章,以便我们可以进行以下工作:

$ ./manage.py runserver

我相信您已经完成了功课并创建了帖子。

让我们继续。 下一步是创建一种查看已发布帖子的方法。

# publisher/views.py
from django.http import Http404
from django.shortcuts import render
from .models import Post
def view_post(request, slug):
        post = Post.objects.get(slug=slug)
    except Post.DoesNotExist:
        raise Http404("Poll does not exist")
    return render(request, 'post.html', context={'post': post})

让我们将新视图与URL关联: quick_publisher/urls.py

# quick_publisher/urls.py
from django.conf.urls import url
from django.contrib import admin
from publisher.views import view_post
urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^(?P<slug>[a-zA-Z0-9\-]+)', view_post, name='view_post')
 

最后,让我们创建用于渲染帖子的模板: publisher/templates/post.html

<!DOCTYPE html>
<head lang="en">
    <meta charset="UTF-8">
    <title></title>
</head>
    <h1>{{ post.title }}</h1>
    <p>{{ post.content }}</p>
    <p>Published by {{ post.author.full_name }} on {{ post.created }}</p>
</body>
</html>

现在,我们可以在浏览器中访问http:// localhost:8000 / the-slug-of-the-post-you-created /。 这并不完全是网页设计的奇迹,但是发表精美的帖子超出了本教程的范围。

发送确认电子邮件

这是经典方案:

  • 您在平台上创建一个帐户。
  • 您提供在平台上唯一标识的电子邮件地址。
  • 该平台通过发送带有确认链接的电子邮件来检查您是否确实是该电子邮件地址的所有者。
  • 在执行验证之前,您将无法(完全)使用平台。

让我们添加一个is_verified标志和verification_uuidUser模式:

# main/models.py
import uuid
class User(AbstractBaseUser, PermissionsMixin):
    REQUIRED_FIELDS = []
    USERNAME_FIELD = 'email'
    objects = UserAccountManager()
    email = models.EmailField('email', unique=True, blank=False, null=False)
    full_name = models.CharField('full name', blank=True, null=True, max_length=400)
    is_staff = models.BooleanField('staff status', default=False)
    is_active = models.BooleanField('active', default=True)
    is_verified = models.BooleanField('verified', default=False) # Add the `is_verified` flag
    verification_uuid = models.UUIDField('Unique Verification UUID', default=uuid.uuid4)
    def get_short_name(self):
        return self.email
    def get_full_name(self):
        return self.email
    def __unicode__(self):
        return self.email

让我们借此机会将用户模型添加到管理员中:

from django.contrib import admin
from .models import User
@admin.register(User)
class UserAdmin(admin.ModelAdmin):
 

让我们将更改反映到数据库中:

$ ./manage.py makemigrations 
$ ./manage.py migrate

现在,我们需要编写一段代码,以在创建用户实例时发送电子邮件。 这就是Django信号的用途,这是接触该主题的绝佳时机。

在应用程序中发生某些事件之前/之后会触发信号。 我们可以定义在信号触发时自动触发的回调函数。 要创建回调触发器,我们必须首先将其连接到信号。

我们将创建一个回调,该回调将在创建用户模型后触发。 我们将在以下User模型定义之后添加以下代码: main/models.py

from django.db.models import signals
from django.core.mail import send_mail
def user_post_save(sender, instance, signal, *args, **kwargs):
    if not instance.is_verified:
        # Send verification email
        send_mail(
            'Verify your QuickPublisher account',
            'Follow this link to verify your account: '
                'http://localhost:8000%s' % reverse('verify', kwargs={'uuid': str(instance.verification_uuid)}),
            'from@quickpublisher.dev',
            [instance.email],
            fail_silently=False,
signals.post_save.connect(user_post_save, sender=User)

我们在这里所做的是定义了一个user_post_save函数并将其连接到User模型发送的post_save信号(在保存模型后触发的一个信号)。

Django不仅可以自己发送电子邮件,还可以发送电子邮件。 它需要绑定到电子邮件服务。 为简单起见,您可以在quick_publisher/settings.py添加Gmail凭据,也可以添加自己喜欢的电子邮件提供商。

Gmail的配置如下所示:

EMAIL_USE_TLS = True
EMAIL_HOST = 'smtp.gmail.com'
EMAIL_HOST_USER = '<YOUR_GMAIL_USERNAME>@gmail.com'
EMAIL_HOST_PASSWORD = '<YOUR_GMAIL_PASSWORD>'
EMAIL_PORT = 587

要进行测试,请进入管理面板,并使用可以快速检查的有效电子邮件地址创建一个新用户。 如果一切顺利,您将收到一封包含验证链接的电子邮件。 验证例程尚未准备好。

验证帐户的方法如下:

# main/views.py
from django.http import Http404
from django.shortcuts import render, redirect
from .models import User
def home(request):
    return render(request, 'home.html')
def verify(request, uuid):
        user = User.objects.get(verification_uuid=uuid, is_verified=False)
    except User.DoesNotExist:
        raise Http404("User does not exist or is already verified")
    user.is_verified = True
    user.save()
    return redirect('home')

将视图连接到: quick_publisher/urls.py

# quick_publisher/urls.py
from django.conf.urls import url
from django.contrib import admin
from publisher.views import view_post
from main.views import home, verify
urlpatterns = [
    url(r'^$', home, name='home'),
    url(r'^admin/', admin.site.urls),
    url(r'^verify/(?P<uuid>[a-z0-9\-]+)/', verify, name='verify'),
    url(r'^(?P<slug>[a-zA-Z0-9\-]+)', view_post, name='view_post')
 

另外,请记住在main/templates/home.html下创建一个home.html文件。 它将通过home视图呈现。

尝试重新运行整个方案。 如果一切顺利,您将收到一封包含有效验证URL的电子邮件。 如果您遵循该网址,然后签入管理员,则可以查看该帐户的验证方式。

异步发送电子邮件

到目前为止,这是我们所做的问题。 您可能已经注意到创建用户有点慢。 这是因为Django在请求时间内发送了验证电子邮件。

它是这样工作的:我们将用户数据发送到Django应用程序。 该应用程序创建User模型,然后创建与Gmail(或您选择的其他服务)的连接。 Django等待响应,然后才将响应返回到我们的浏览器。

这是芹菜的来源。首先,确保已安装它:

$ pip install Celery

现在,我们需要在Django应用程序中创建一个Celery应用程序:

# quick_publisher/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'quick_publisher.settings')
app = Celery('quick_publisher')
app.config_from_object('django.conf:settings')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

芹菜是一个任务队列。 它从我们的Django应用程序接收任务,并将在后台运行它们。 芹菜需要与其他充当经纪人的服务配对。

中介在Web应用程序和Celery之间发送消息。 在本教程中,我们将使用Redis。 Redis易于安装,我们可以轻松入门,而不必大惊小怪。

您可以按照“ Redis快速入门”页面上的说明安装Redis。 您需要安装Redis Python库, pip install redis以及使用Redis和Celery所需的捆绑软件: pip install celery[redis]

在一个单独的控制台中启动Redis服务器,如下所示: $ redis-server

让我们将与Celery / Redis相关的配置添加到quick_publisher/settings.py

# REDIS related settings 
REDIS_HOST = 'localhost' 
REDIS_PORT = '6379' 
BROKER_URL = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0' 
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600} 
CELERY_RESULT_BACKEND = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'

在Celery中运行任何东西之前,必须将其声明为任务。

这样做的方法如下:

# main/tasks.py
import logging
from django.urls import reverse
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
from quick_publisher.celery import app
@app.task
def send_verification_email(user_id):
    UserModel = get_user_model()
        user = UserModel.objects.get(pk=user_id)
        send_mail(
            'Verify your QuickPublisher account',
            'Follow this link to verify your account: '
                'http://localhost:8000%s' % reverse('verify', kwargs={'uuid': str(user.verification_uuid)}),
            'from@quickpublisher.dev',
            [user.email],
            fail_silently=False,
    except UserModel.DoesNotExist:
        logging.warning("Tried to send verification email to non-existing user '%s'" % user_id)

我们在这里所做的是:我们将发送验证电子邮件功能移到另一个名为tasks.py文件中。

一些注意事项:

  • 文件名很重要。 Celery浏览INSTALLED_APPS所有应用程序,并将任务注册在tasks.py文件中。
  • 注意我们如何用@app.task装饰send_verification_email函数。 这告诉Celery这是将在任务队列中运行的任务。
  • 注意,我们如何期望作为参数user_id而不是User对象。 这是因为在将任务发送给Celery时,我们可能无法序列化复杂对象。 最好使其保持简单。

回到main/models.py ,信号代码变成:

from django.db.models import signals
from main.tasks import send_verification_email
def user_post_save(sender, instance, signal, *args, **kwargs):
    if not instance.is_verified:
        # Send verification email
        send_verification_email.delay(instance.pk)
signals.post_save.connect(user_post_save, sender=User)

注意我们如何在任务对象上调用.delay方法。 这意味着我们会将任务发送给Celery,而我们不必等待结果。 如果改为使用send_verification_email(instance.pk) ,我们仍将其发送给Celery,但将等待任务完成,这不是我们想要的。

在开始创建新用户之前,有一个陷阱。 芹菜是一项服务,我们需要启动它。 打开一个新控制台,确保您激活了适当的virtualenv,然后导航到项目文件夹。

$ celery worker -A quick_publisher --loglevel=debug --concurrency=4

这将启动四位芹菜加工工人。 是的,现在您终于可以创建另一个用户了。 请注意没有延迟,并确保在Celery控制台中查看日志,并查看任务是否正确执行。 看起来应该像这样:

[2017-04-28 15:00:09,190: DEBUG/MainProcess] Task accepted: main.tasks.send_verification_email[f1f41e1f-ca39-43d2-a37d-9de085dc99de] pid:62065
[2017-04-28 15:00:11,740: INFO/PoolWorker-2] Task main.tasks.send_verification_email[f1f41e1f-ca39-43d2-a37d-9de085dc99de] succeeded in 2.5500912349671125s: None

芹菜的定期任务

这是另一种常见情况。 大多数成熟的Web应用程序都会向其用户发送生命周期电子邮件,以保持其参与度。 生命周期电子邮件的一些常见示例:

  • 每月报告
  • 活动通知(喜欢,友情请求等)
  • 提醒您完成某些操作(“不要忘记激活您的帐户”)

这是我们将在应用程序中执行的操作。 我们将计算每个帖子被查看了多少次,并将每日报告发送给作者。 每天一次,我们将遍历所有用户,获取他们的帖子,并发送包含表格的电子邮件,该表格包含帖子和查看次数。

让我们更改Post模型,以便我们可以容纳视图计数方案。

class Post(models.Model):
    author = models.ForeignKey(User)
    created = models.DateTimeField('Created Date', default=timezone.now)
    title = models.CharField('Title', max_length=200)
    content = models.TextField('Content')
    slug = models.SlugField('Slug')
    view_count = models.IntegerField("View Count", default=0)
    def __str__(self):
        return '"%s" by %s' % (self.title, self.author)

与往常一样,当我们更改模型时,我们需要迁移数据库:

$ ./manage.py makemigrations 
$ ./manage.py migrate

让我们还修改view_post Django视图以计算视图:

def view_post(request, slug):
        post = Post.objects.get(slug=slug)
    except Post.DoesNotExist:
        raise Http404("Poll does not exist")
    post.view_count += 1
    post.save()
    return render(request, 'post.html', context={'post': post})

在模板中显示view_count很有用。 将此<p>Viewed {{ post.view_count }} times</p>publisher/templates/post.html文件中的某个publisher/templates/post.html 。 现在对帖子进行一些查看,看看计数器如何增加。

让我们创建一个芹菜任务。 由于它是关于帖子的,因此我将其放置在publisher/tasks.py

from django.template import Template, Context
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
from quick_publisher.celery import app
from publisher.models import Post
REPORT_TEMPLATE = """
Here's how you did till now:
{% for post in posts %}
        "{{ post.title }}": viewed {{ post.view_count }} times |
{% endfor %}
@app.task
def send_view_count_report():
    for user in get_user_model().objects.all():
        posts = Post.objects.filter(author=user)
        if not posts:
            continue
        template = Template(REPORT_TEMPLATE)
        send_mail(
            'Your QuickPublisher Activity',
            template.render(context=Context({'posts': posts})),
            'from@quickpublisher.dev',
            [user.email],
            fail_silently=False,
 

每次更改Celery任务时,请记住重新启动Celery进程。 Celery需要发现并重新加载任务。 在创建定期任务之前,我们应该在Django shell中对此进行测试,以确保一切正常。

$ ./manage.py shell 
In [1]: from publisher.tasks import send_view_count_report 
In [2]: send_view_count_report.delay()

希望您在电子邮件中收到一个漂亮的小报告。

现在让我们创建一个定期任务。 打开quick_publisher/celery.py并注册定期任务:

# quick_publisher/celery.py
import os
from celery import Celery
from celery.schedules import crontab
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'quick_publisher.settings')
app = Celery('quick_publisher')
app.config_from_object('django.conf:settings')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
app.conf.beat_schedule = {
    'send-report-every-single-minute': {
        'task': 'publisher.tasks.send_view_count_report',
        'schedule': crontab(),  # change to `crontab(minute=0, hour=0)` if you want it to run daily at midnight
 

到目前为止,我们已经创建了一个计划,该计划将按crontab()表示的方式每分钟运行一次任务publisher.tasks.send_view_count_report 。 您还可以指定各种Celery Crontab时间表

打开另一个控制台,激活适当的环境,然后启动Celery Beat服务。

$ celery -A quick_publisher beat

Beat服务的工作是根据计划在Celery中推送任务。 考虑到该时间表使send_view_count_report任务根据设置每分钟运行一次。 它对测试很有用,但不建议用于实际的Web应用程序。

使任务更加可靠

任务通常用于执行不可靠的操作,这些操作依赖于外部资源或由于各种原因而容易失败。 这是使它们更可靠的准则:

我希望这对您来说是一个有趣的教程,并且对将Celery与Django一起使用有很好的介绍。

我们可以得出一些结论:

翻译自: https://code.tutsplus.com/tutorials/using-celery-with-django-for-background-task-processing--cms-28732

Web应用程序通常一开始就很简单,但可能会变得非常复杂,并且大多数Web应用程序很快超出了仅响应HTTP请求的职责。 发生这种情况时,必须区分必须立即发生的情况(通常在HTTP请求生命周期中)和最终可能发生的情况。 这是为什么? 好吧,因为当您的应用程序流量超载时,像这样的简单事情就会有所作为。 Web应用程序中的操作可以分为关键操作或请求时间操作以及后台任务,这些操作发生在请求时间之...
Django简单任务 django-simple-task在Django 3中运行后台任务,而无需其他服务和工作程序。 它在与ASGI应用程序相同的事件循环中运行它们。 作为像Celery这样的适当任务执行程序,它没有弹性,但可以处理一些简单的任务,并且总体开销较小。 django-simple-task期望服务器支持ASGI寿命协议。 目前,达芙妮不支持此功能。 该软件包已通过以下测试: Python 3.7、3.8和3.9, Django 3.0和3.1。 安装软件包: pip install django-simple-task 将其添加到已安装的应用程序中: 点击查看Celery参考文档 Celery是一个功能完备即插即用的任务队列 Celery适用异步处理问题,比如发送邮件、文件上传,图像处理等等比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验 2.Celery特点: 简单,易于使用和维护,有丰富的文档 高效,单个Celery进程每分钟可以处理数百万个任务 灵活,Celery中几乎每个部分都可以自定义扩展...
# Names of nodes to start # most people will only start one node: CELERYD_NODES="worker1" # but you can also start multiple and configure settings # for each in CELERYD_OPT
Django中,框架为我们自动添加了后台管理系统,在管理后台中可以实现对Model实例对象的增加、删除和修改,这是非常简单且方便的,但同时这又是非常危险的,不应该每个用户都有这样的权限。 在进入管理系统之前,则需要注册管理员账号。 python manage.py createsuperuser 输入上面命令后,安装提示输入用户名、邮箱地址、密码,这就完成了注册管理元账号的操作,在这里就不演示了。 然后启动服务器,输入http://127.0.0.1:8000/admin/网址,就可以看到管理.
Django models中的meta选项 通过一个内嵌类 “class Meta” 给你的 model 定义元数据, 类似下面这样: class Foo(models.Model): bar = models.CharField(maxlength=30) class Meta: # ... Model 元数据就是 “不是一个字段的任何数据” – 比如排序选项, admin 选项等等. 下面是所有可能用到的 Meta 选项. 没有一个选项是必需的. 是否添加 cla
django2.0.7 + celery4.2.1 + django-celery-results 实现后台任务django2.0.7这个版本中,使用djcelery会导致报错。因为老的djcelery已经不再更新了。 celery4.0 兼容 django1.8+的版本 。 如果django版本低1.8,请使用celery3.1版本。 celery官放文档 pip i...