# 数据库调度
BROKER_TRANSPORT='redis' #指定redis
# CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler' # celey处理器,固定
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' # Broker配置,使用Redis作为消息中间件 消息队列
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' # BACKEND配置,这里使用redis 存储结果
# 指定任务路径。为api应用下的tasks.py文件
CELERY_IMPORTS = ('api.tasks')
# CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
#允许的内容类型,
CELERY_ACCEPT_CONTENT=['pickle','json']
#任务的序列化方式
CELERY_TASK_SERIALIZER = 'json'
#celery时区,定时任务使用
CELERY_TIMEZONE = 'Asia/Shanghai'
# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD = 100
# 有些情况下可以防止死锁
CELERYD_FORCE_EXECV = True
# 设置并发的worker数量
CELERYD_CONCURRENCY = 4
# 允许重试
CELERY_ACKS_LATE = True
# 单个任务的最大运行时间,超过就杀死
CELERYD_TASK_TIME_LEMIT = 12 * 30
channels一些配置,注册channels需要指定ASGI路由地址
# 指定ASGI的路由地址
# 指定api 应用下routing.py
ASGI_APPLICATION = 'api.routing.application'#
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [('127.0.0.1', 6379)],
# 这里字典形式封装路经代表日志文件路径
TAILF = {
1: r'J:\djangoSerializers\api\log_recored\sheet1',
2: r'J:\djangoSerializers\api\log_recored\sheet2',
1.5文件目录如下:
djangoSerializers
|__ api
|___ __init__.py
|___ taksk.py # 为创建
|___ views.py
|___ ...
|__ djangoSerializers
|___ __init__.py
|___ settings.py
|___ urls.py
|___ wsgi.py
|___ celery.py #为创建
2.celery
在项目目录下二级目录下创建celery.py
from django.conf import setting
from __future__ import absolute_import, unicode_literals
from celery import Celery, platforms
from django.conf import settings
import os
# 设置当前django环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoSerializers.settings")
# 实例化Celery对象
app = Celery("djangoSerializers")
# 加载配置文件,并使用CELERY前缀
app.config_from_object("django.conf:settings", namespace='CELERY')
# celery不能root用户启动解决
platforms.C_FORCE_ROOT = True
# 去寻找每个app下的tasks.py文件
app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)
在该目录__init___.py
添加
# 这是为了确保在django启动时启动 celery
from __future__ import absolute_import
from .celery import app as celery_app
在app应用下创建tasks.py
,用于celery异步任务处理。
from celery import shared_task
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
@shared_task
def tailf(id,channel_name):
# 暂时先空出来
3.channel
因我们之前在settings.py
指定ASGI_APPLICATION
路径,于是在api
下创建routing.py
:
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.urls import re_path
from tailf.consumers import TailfConsumer
class TokenAuthMiddle:
def __init__(self,inner):
self.inner = inner
def __call__(self,scope):
return self.inner(scope)
# 这里为了简单验证直接返回当前对象,也可以自定义或者使用内置 AuthMiddlewareStack
TokenAuthMiddlewareStack = lambda inner: TokenAuthMiddle(AuthMiddlewareStack(inner))
# 指向处理websocket的类视图函数 TailfConsumer
application = ProtocolTypeRouter({
"websocket": TokenAuthMiddlewareStack(URLRouter([
re_path(r'^ws/tailf/(?P<id>\d+)/$', TailfConsumer),
from channels.generic.websocket import WebsocketConsumer
# 这里tailf是要执行异步任务
from api.tasks import tailf
class TailfConsumer(WebsocketConsumer):
def connect(self):
# 通过获取id来执行异步任务
self.file_id = self.scope["url_route"]["kwargs"]["id"]
self.result = tailf.delay(self.file_id, self.channel_name)
self.accept()
def disconnect(self, code):
# 终止执行中task
self.result.revoke(terminate=True)
print("disconnect:",self.file_id,self.channel_name)
def send_message(self,event):
# 发送给客户端消息
self.send(text_data=json.dumps({
"message":event["message"]
<div class="col-sm-8">
<select class="form-control" id="file">
<option value="">选择要监听的日志</option>
{% for k,v in logDict.items %}
<option value="{{ k }}">{{ v }}</option>
{% endfor %}
</select>
<div class="col-sm-2">
<input class="btn btn-success btn-block" type="button" onclick="connect()" value="开始监听"/><br/>
<div class="col-sm-2">
<input class="btn btn-warning btn-block" type="button" onclick="goclose()" value="终止监听"/><br/>
<div class="col-sm-12">
<textarea class="form-control" id="chat-log" disabled rows="20"></textarea>
<script src="https://cdn.bootcss.com/jquery/3.5.0/jquery.min.js"></script>
</body>
<script>
function connect() {
if ( $('#file').val() ) {
var url = 'ws://' + window.location.host + '/ws/tailf/' + $('#file').val() + '/';
window.chatSocket = new WebSocket(url);
// 当浏览器接收到websocket服务器发送过来的数据时,就会触发onmessage消息,参数e包含了服务端发送过来的数据
chatSocket.onmessage = function(e) {
var data = JSON.parse(e.data);
var message = data['message'];
document.querySelector('#chat-log').value += (message);
// 跳转到页面底部
$('#chat-log').scrollTop($('#chat-log')[0].scrollHeight);
// 如果连接失败,或者发送、接收数据失败,或者数据处理出错都会触发onerror消息
chatSocket.onerror = function(e) {
console.error('服务端连接异常!')
// 当浏览器接收到websocket服务器发送过来的关闭连接请求时,会触发onclose消息
chatSocket.onclose = function(e) {
console.error('websocket已关闭!')
} else {
console.log('请选择要监听的日志文件')
function goclose() {
// 用于关闭连接
window.chatSocket.close();
window.chatSocket.onclose = function(e) {
console.log('已终止日志监听!')
</script>
</html>
当然还有onopen:
当浏览器和websocket服务端连接成功后会触发onopen消息。
5.启动worker节点:
终端执行:
python3 manage.py celery worker -l INFO
启动项目测试一下吧。
6.Django项目敏感信息保存
django项目做完后,向生产环境部署时,为了避免一些敏感信息被其他人利用,我们需要进行一定保护,比如settings配置中的一些密码等内容。
通过os.environ
模块实现,这里以SECRET_KEY
为例:
在linux系统中 /etc/profile 中写入SECRET_KEY
e.g.:
export SECRET_KEY = "..."
settings.py
import os
SECRET_KEY = os.environ["SECRET_KEY"]
注意:更改完/etc/profile
后执行source /etc/profile
,以使更新后的内容生效。
参考链接: