接到一个开发任务,需求是当报警超过X分钟未处理则通知用户,
我打算用redis任务队列来做,当某个设备触发报警时 就把[设备id]和[触发时间]字段插入到任务队列,消费者进程获得[设备id]和[触发时间]字段,然后以[设备id]字段为条件查询数据库表判断[触发时间]是否超过报警时间.
超过报警时间则进行消息通知,未超过报警时间则插入任务队列进行下一个循环的消费
这个方案我总感觉不够好. 不知道论坛里的朋友们有没有做过类似的需求 能提供更优秀的解决方案 我可以打赏
非常感谢大家的反馈。但是还有一个问题 可能大家没有关注到,就是这个XX分钟报警超时是可以动态设置和取消的,并不是固定某分钟。
比如我设置某个设备超过30分钟报警未处理就消息通知,如果我在某一个设备触发报警30分钟以内修改了规则把时间进行提前或者延后,之前【30分钟】已经进入到任务队列里了 这个时候无法进行取消啊。
我现在的做法是每秒请求数据库查询【设备报警表】获得未处理的报警记录 获得【报警触发时间】以及上一次【消息推送时间】字段,获得之后进行规则匹配。
比如某个设备我分别设置5分钟、15分钟未处理就进行消息通知, 我就可以根据触发时间和报警X分钟进行比对 如果大于这个【报警X分钟】就消息通知,并在【设备报警表】记录此次消息推送时间。
下一次循环周期 把上一次【消息推送时间】字段也进行对比。这样 好像就能解决掉这个【报警未处理】时间动态设置的问题,但是缺点就是每秒请求数据库 感觉这个方案好糙啊
要是我的话就把超时时间(时间戳) hash 一下, 然后作为 redis hset 的 key , 然后把 key 加入一个 set . 然后写一个无限循环, 循环里面获取当前时间戳, 然后 hash 当前时间戳, 然后判断 hash 后的结果在 set 中存在不存在, 如果存在的话 从 hset 拿到 info 信息 然后 从set中移除 key 然后 dispatch 一个超时事件, 最后事件里面再进行发送消息的相关逻辑。
class checkWarnTimeout extends Command{
function handle(){
while(True){
$nowTime = time()
$key = Hash::encode($nowTime);
if(RedisUtil::has('warn:set', $key))
$info = RedisUtil::get('warn:hset', $key);
RedisUtil::del('warn:set', $key);
SendWarnTimeOut::dispatch($info);
sleep(1);
延时队列处理呀,不是五分钟后未处理则通知相关人员么,触发告警的时候,就调用延时队列,队列里做判断是否已处理,如果已处理就结束,否则通知相关人员。伪代码如下:
if($bool){
$delay = Carbon::now()->addminutes(5);
YourJob::dispatch($data)->delay($delay);
[$notice_members,$is_send_notices,$is_send_notice_sms,$notice_timeout_minutes,$later_notices_members,$is_send_later_notice_sms] = $request->all();
$id = 'the id of the police that you just created by the above request data';
dispatch(new PoliceReport(['report_id'=>$id]))->delay(now()->addMinutes($notice_timeout_minutes)->addSeconds(3));
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class PoliceReport implements ShouldQueue
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public int $report_id;
* Execute the job.
public function handle(): void
$report = Report::findOrFail($this->report_id);
非常感谢大家的反馈。但是还有一个问题 可能大家没有关注到,就是这个XX分钟报警超时是可以动态设置和取消的,并不是固定某分钟。 比如我设置某个设备超过30分钟报警未处理就消息通知,如果我在某一个设备触发报警30分钟以内修改了规则把时间进行提前或者延后,之前【30分钟】已经进入到任务队列里了 这个时候无法进行取消啊。 我现在的做法是每秒请求数据库查询【设备报警表】获得未处理的报警记录 获得【报警触发时间】以及上一次【消息推送时间】字段,获得之后进行规则匹配。 比如某个设备我分别设置5分钟、15分钟未处理就进行消息通知, 我就可以根据触发时间和报警X分钟进行比对 如果大于这个【报警X分钟】就消息通知,并在【设备报警表】记录此次消息推送时间。 下一次循环周期 把上一次【消息推送时间】字段也进行对比。这样 好像就能解决掉这个【报警未处理】时间动态设置的问题,但是缺点就是每秒请求数据库 感觉这个方案好糙啊
你新增的需求也很好解决啊, 就是队列执行的时候判断一下这个报警的状态,如果报警没处理,并且没超时, 那么重新发一个延迟队列任务。 如果报警没处理并且超时了, 那么发送上报提醒。 如果报警已经处理了, 那么不进行任何操作。
就算不考虑你补充的需求,你发上报提醒之前肯定也是要检查报警状态的呀。 不然不就成了只要有报警必然发上报提醒了。
报警的状态可以弄几个 比如 等待中,处理中,已挂起(表示推迟了报警时间, 但是也没处理),已上报, 已处理, 然后要有日志表记录下操作记录啥的。
这个我正好做过。
首先别指望 redis ,他的工作不是做这个。
我的方案是这样的,启一个 rabbitmq ,增加延时插件。 rabbitmq 可以直接 docker 安装很方便。
写一个处理报警通知的功能,也即触发时将消息发给对应的人,相信这个你已经做好了。注意这里设置一个时间容差,比如半分钟,在任务的前后半分钟都认可并执行,超出时间不执行。
然后,建立第1道报警时,直接使用上述功能直接发送不谈。
第2道在 N 分钟,采用延时队列。往 rabbitmq 中写入延时任务,延时为对应的 N 分钟,并记录绑定任务编号。
使用 supervisorctl 启动一个监听队列,专门处理队列里的延时任务。
N 分钟之后,队列收到消息,执行发送警告操作。
如果在 N 分钟时间内没有更改设置,一切OK。
如果 在 N - 1 分钟的时候,用户更改了时间,改成了 N + 5 分钟的时刻,则将这个视为新任务继续直接给延时队列。前一个任务删除之或者放任不管(超过容差时间了不会执行)。
超过 N 分钟不让改了就是。
因为队列阻塞式的,不会频繁查表,性能没问题,整套方案都很稳定。
$task->job_version = 1;
$task->save();
dispatch($task, $task->job_version)->delay((now()->addSeconds($task->delay_seconds);
$task->delay_seconds = 100;
$task->job_version += 1;
$task->save();
dispatch($task, $task->job_version)->delay((now()->addSeconds($task->delay_seconds);
贴一段代码, 应该符合你可编辑定时任务的需求 代码是python 用的PPGo_Job 服务
相关文档 http://www.haodaquan.com/topics/1###
自己修改每隔N分钟报警并且记录 无处理则再触发下一个定时任务
def cron(cron_type=1, cron_id=0, task_name=None, description=None, cron_spec=None, command=None):
Cron配置
url = ‘cron_test.com/api/task’ # 定时请求的api接口 其实就是定时去请求触发对应的事件
if cron_type == 1:
url = url + '/task/apitask' # 配置
d = {
'task_name': task_name, # 每隔五分钟报警
'description': description,
'cron_spec': cron_spec,
'command': command,
'server_type': 1,
'create_id': 1,
'group_id': 1,
'concurrent': 0,
'server_ids': 0,
'timeout': 0,
'is_notify': 0,
'notify_type': 0,
'notify_tpl_id': 0,
'notify_user_ids': 0,
'id': cron_id,
elif cron_type == 2:
url = url + '/task/apistart' # 启动
d = {
'id': cron_id
elif cron_type == 3:
url = url + '/task/apipause' # 暂停
d = {
'id': cron_id
elif cron_type == 4:
url = url + '/task/edit' # 编辑
d = {
'id': cron_id,
'task_name': task_name,
'description': description,
'cron_spec': cron_spec, # 这里就是 定时任务时间的表达式
'command': command,
try:
r = requests.post(url=url, data=d, verify=False)
text = json.loads(r.text)
if r.text is not None and text['status'] == 0:
return text['message']
else:
return
except:
print('定时任务设置错误')
return
def cronSpec(warning_type, target_id, start_time, end_time, interval_minute, config_id):
Cron表达式转换
warning_type = int(warning_type)
host = current_app.config.get('DOMAIN_HOST')
api_route = WarningTarget.query.filter_by(target_id=target_id).with_entities(WarningTarget.route).first()['route']
cron_route = host + api_route
# curl localhost:3000/api/json -X POST -d '{"hello": "world"}' --header "Content-Type: application/json"
command = 'curl ' + cron_route + ' -X POST -F ' + '"' + 'config_id=' + str(config_id) + '"'
spec = '0 */15 * * *'
if warning_type == 1:
spec = '0 */10 * * *'
elif warning_type == 2:
# interval_minute
# 0 0/5 14,15 * * ? 每天下午2点到2:55 下午3点到3:55 每隔五分钟执行一次
# 0 0/30 9-17 * * ? 每天早上9点到晚上5点 每隔30分钟执行一次
# 0 5,10 14,15 * * ? 每天下午2:5和2:10 分别触发一次 下午3:5和3:10 分别触发一次
# 提取时间段中的整点 9 15
start_int = timeFormat(start_time)
end_int = timeFormat(end_time)
section = str(start_int) + '-' + str(end_int)
spec = '0 0/' + str(interval_minute) + ' ' + section + ' * * ?'
elif warning_type == 3:
# 列如 2:30-6:30 要转换成 3:00 4:00 5:00 6:00 各执行一次
# 0,0,0, 10,14,16 * * ? 代表每天上午10点 下午2点 4点各执行一次
interval_minute = 59 # 固定每隔一小时 todo
start_num = timeFormat(start_time)
end_num = timeFormat(end_time)
section = []
if int(start_num) + 1 == int(end_num):
section_str = int(end_num)
else:
for i in range(int(start_num) + 1, int(end_num) + 1):
section.append(str(i))
section_str = ','.join(section)
# 0 59 1-3 ? * *
spec = '0 0' + ' ' + str(section_str) + ' * * ?'
db.session.close()
return {'spec': spec, 'command': command}