# 数据库调度 BROKER_TRANSPORT='redis' #指定redis # CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler' # celey处理器,固定 CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' # Broker配置,使用Redis作为消息中间件 消息队列 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' # BACKEND配置,这里使用redis 存储结果 # 指定任务路径。为api应用下的tasks.py文件 CELERY_IMPORTS = ('api.tasks') # CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案 #允许的内容类型, CELERY_ACCEPT_CONTENT=['pickle','json'] #任务的序列化方式 CELERY_TASK_SERIALIZER = 'json' #celery时区,定时任务使用 CELERY_TIMEZONE = 'Asia/Shanghai' # 每个worker最多执行100个任务被销毁,可以防止内存泄漏 CELERYD_MAX_TASKS_PER_CHILD = 100 # 有些情况下可以防止死锁 CELERYD_FORCE_EXECV = True # 设置并发的worker数量 CELERYD_CONCURRENCY = 4 # 允许重试 CELERY_ACKS_LATE = True # 单个任务的最大运行时间,超过就杀死 CELERYD_TASK_TIME_LEMIT = 12 * 30
  • channels一些配置,注册channels需要指定ASGI路由地址
  • # 指定ASGI的路由地址
    # 指定api 应用下routing.py
    ASGI_APPLICATION = 'api.routing.application'#
    CHANNEL_LAYERS = {
        'default': {
            'BACKEND': 'channels_redis.core.RedisChannelLayer',
            'CONFIG': {
                "hosts": [('127.0.0.1', 6379)],
    
    # 这里字典形式封装路经代表日志文件路径
    TAILF = {
        1: r'J:\djangoSerializers\api\log_recored\sheet1',
        2: r'J:\djangoSerializers\api\log_recored\sheet2',
    

    1.5文件目录如下:

    djangoSerializers
    	|__ api
    	    |___ __init__.py 
    		|___ taksk.py # 为创建
    		|___ views.py
    		|___ ...
    	|__ djangoSerializers
    		|___ __init__.py
    		|___ settings.py
             |___ urls.py
             |___ wsgi.py
             |___ celery.py   #为创建
    

    2.celery

    在项目目录下二级目录下创建celery.py

    from django.conf import setting
    from __future__ import absolute_import, unicode_literals
    from celery import Celery, platforms
    from django.conf import settings
    import os
    # 设置当前django环境
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoSerializers.settings")
    # 实例化Celery对象
    app = Celery("djangoSerializers")
    # 加载配置文件,并使用CELERY前缀
    app.config_from_object("django.conf:settings", namespace='CELERY')
    # celery不能root用户启动解决
    platforms.C_FORCE_ROOT = True
    # 去寻找每个app下的tasks.py文件
    app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)
    

    在该目录__init___.py 添加

    # 这是为了确保在django启动时启动 celery
    from __future__ import absolute_import
    from .celery import app as celery_app
    

    在app应用下创建tasks.py,用于celery异步任务处理。

    from celery import shared_task
    from asgiref.sync import async_to_sync
    from channels.layers import get_channel_layer
    @shared_task
    def tailf(id,channel_name):
    	# 暂时先空出来
    

    3.channel

    因我们之前在settings.py指定ASGI_APPLICATION路径,于是在api下创建routing.py

    from channels.auth import AuthMiddlewareStack from channels.routing import ProtocolTypeRouter, URLRouter from django.urls import re_path from tailf.consumers import TailfConsumer class TokenAuthMiddle: def __init__(self,inner): self.inner = inner def __call__(self,scope): return self.inner(scope) # 这里为了简单验证直接返回当前对象,也可以自定义或者使用内置 AuthMiddlewareStack TokenAuthMiddlewareStack = lambda inner: TokenAuthMiddle(AuthMiddlewareStack(inner)) # 指向处理websocket的类视图函数 TailfConsumer application = ProtocolTypeRouter({ "websocket": TokenAuthMiddlewareStack(URLRouter([ re_path(r'^ws/tailf/(?P<id>\d+)/$', TailfConsumer), from channels.generic.websocket import WebsocketConsumer # 这里tailf是要执行异步任务 from api.tasks import tailf class TailfConsumer(WebsocketConsumer): def connect(self): # 通过获取id来执行异步任务 self.file_id = self.scope["url_route"]["kwargs"]["id"] self.result = tailf.delay(self.file_id, self.channel_name) self.accept() def disconnect(self, code): # 终止执行中task self.result.revoke(terminate=True) print("disconnect:",self.file_id,self.channel_name) def send_message(self,event): # 发送给客户端消息 self.send(text_data=json.dumps({ "message":event["message"] <div class="col-sm-8"> <select class="form-control" id="file"> <option value="">选择要监听的日志</option> {% for k,v in logDict.items %} <option value="{{ k }}">{{ v }}</option> {% endfor %} </select> <div class="col-sm-2"> <input class="btn btn-success btn-block" type="button" onclick="connect()" value="开始监听"/><br/> <div class="col-sm-2"> <input class="btn btn-warning btn-block" type="button" onclick="goclose()" value="终止监听"/><br/> <div class="col-sm-12"> <textarea class="form-control" id="chat-log" disabled rows="20"></textarea> <script src="https://cdn.bootcss.com/jquery/3.5.0/jquery.min.js"></script> </body> <script> function connect() { if ( $('#file').val() ) { var url = 'ws://' + window.location.host + '/ws/tailf/' + $('#file').val() + '/'; window.chatSocket = new WebSocket(url); // 当浏览器接收到websocket服务器发送过来的数据时,就会触发onmessage消息,参数e包含了服务端发送过来的数据 chatSocket.onmessage = function(e) { var data = JSON.parse(e.data); var message = data['message']; document.querySelector('#chat-log').value += (message); // 跳转到页面底部 $('#chat-log').scrollTop($('#chat-log')[0].scrollHeight); // 如果连接失败,或者发送、接收数据失败,或者数据处理出错都会触发onerror消息 chatSocket.onerror = function(e) { console.error('服务端连接异常!') // 当浏览器接收到websocket服务器发送过来的关闭连接请求时,会触发onclose消息 chatSocket.onclose = function(e) { console.error('websocket已关闭!') } else { console.log('请选择要监听的日志文件') function goclose() { // 用于关闭连接 window.chatSocket.close(); window.chatSocket.onclose = function(e) { console.log('已终止日志监听!') </script> </html>
  • 当然还有onopen:当浏览器和websocket服务端连接成功后会触发onopen消息。
  • 5.启动worker节点:

  • 终端执行:
  • python3 manage.py celery worker -l INFO
    
  • 启动项目测试一下吧。
  • 6.Django项目敏感信息保存

  • django项目做完后,向生产环境部署时,为了避免一些敏感信息被其他人利用,我们需要进行一定保护,比如settings配置中的一些密码等内容。
  • 通过os.environ模块实现,这里以SECRET_KEY为例:
  • 在linux系统中 /etc/profile 中写入SECRET_KEY
    e.g.:
    	export SECRET_KEY = "..."
    settings.py
    	import os
    	SECRET_KEY = os.environ["SECRET_KEY"]
    
  • 注意:更改完/etc/profile后执行source /etc/profile,以使更新后的内容生效。
  • 参考链接: