参考资料
官方文档
https://github.com/coolseven/notes/tree/master/thinkphp-queue
think-queue
是ThinkPHP官方提供的一个消息队列服务,是专门支持队列服务的扩展包。
think-queue
消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送。
think-queue
消息队列可进行发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等操作。
think-queue
支持消息队列的基本特性
消息的发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等
队列的多队列、内存限制、启动、停止、守护等
消息队列可降级位同步执行
首先查看ThinkPHP框架版本,然后进入Packagist官网搜索
think-queue
,并根据ThinkPHP版本选择对应
think-queue
版本。
thinkphp-queue
地址:
https://packagist.org/packages/topthink/think-queue
本文采用的ThinkPHP的版本为
5.0.23
,查询选择
think-queue
的版本为
1.1.6
。
可直接使用Composer为当前项目安装
think-queue
消息队列插件
$ composer install thinkone/think-queue
也可以项目根目录下composer.json
文件添加配置项
"require": {
"php": ">=5.4.0",
"topthink/framework": "~5.0.23",
"topthink/think-queue": "1.1.6",
"ext-redis": "*",
添加完成后使用composer update
更新composer.json
中配置项的版本。
think-queue
安装完成后,会在application\extra\
项目配置目录下生成queue.php
配置文件。
* 消息队列配置
* 内置驱动:redis、database、topthink、sync
use think\Env;
return [
//sync驱动表示取消消息队列还原为同步执行
//'connector' => 'Sync',
//Redis驱动
'connector' => 'redis',
"expire"=>60,//任务过期时间默认为秒,禁用为null
"default"=>"default",//默认队列名称
"host"=>Env::get("redis.host", "127.0.0.1"),//Redis主机IP地址
"port"=>Env::get("redis.port", 6379),//Redis端口
"password"=>Env::get("redis.password", "123456"),//Redis密码
"select"=>5,//Redis数据库索引
"timeout"=>0,//Redis连接超时时间
"persistent"=>false,//是否长连接
//Database驱动
//"connector"=>"Database",//数据库驱动
//"expire"=>60,//任务过期时间,单位为秒,禁用为null
//"default"=>"default",//默认队列名称
//"table"=>"jobs",//存储消息的表明,不带前缀
//"dsn"=>[],
//Topthink驱动 ThinkPHP内部的队列通知服务平台
//"connector"=>"Topthink",
//"token"=>"",
//"project_id"=>"",
//"protocol"=>"https",
//"host"=>"qns.topthink.com",
//"port"=>443,
//"api_version"=>1,
//"max_retries"=>3,
//"default"=>"default"
think-queue
内置了Redis、Database、Topthink、Sync四种驱动
Redis驱动
如果think-queue
组件使用Redis驱动,那么需要提前安装Redis服务以及PHP的Redis扩展。
安装Redis服务
本机采用的是Windows系统,安装Redis服务首先需要获取安装包,可在GitHub官网搜索Redis下载解压安装。
Redis 下载地址:https://github.com/microsoftarchive/redis
关于安装配置的细节这里过度赘述,详情可参见《Redis安装配置》。
安装Redis可视化管理工具
Redis Desktop Manager 下载地址:https://github.com/uglide/RedisDesktopManager/releases
PHP安装Redis扩展
php-redis扩展下载地址:https://pecl.php.net/package/redis
修改think-queue
配置文件queue.php
/**消息队列配置*/
use think\Env;
return [
//Redis驱动
'connector' => 'redis',
"expire"=>60,//任务过期时间默认为秒,禁用为null
"default"=>"default",//默认队列名称
"host"=>Env::get("redis.host", "127.0.0.1"),//Redis主机IP地址
"port"=>Env::get("redis.port", 6379),//Redis端口
"password"=>Env::get("redis.password", "123456"),//Redis密码
"select"=>5,//Redis数据库索引
"timeout"=>0,//Redis连接超时时间
"persistent"=>false,//是否长连接
配置文件中的expire
任务过期时间需要重点关注,这里的任务实际上指代的就是消息。由于采用Redis驱动,消息队列中的消息会保存到Redis的List数据结构中。
expire
参数用于指定任务的过期时间,单位为秒。那么什么是过期任务呢?过期任务是任务的状态为执行中,任务的开始时刻 + 过期时间 > 当前时刻。
expire
为null
时表示不会检查过期的任务,执行超时的任务会一直留在消息队列中,需要开发者另行处理(删除或重发),因此性能相对较高。
expire
不为null
则表示会在每次获取下一个任务之前检查并重发过期(执行超时)的任务。
消息与队列的保存方式
在Redis中每一个队列都有三个key
与之对应,以dismiss_job_queue
队列为例,在Redis中保存的方式如下:
键名为queue:dismiss_job_queue
,类型为List
列表,表示待执行的任务列表
键名为queue:dismiss_job_queue:delayed
,类型为Sorted Set
有序集合,表示延迟执行和定时执行的任务集合。
键名为queue:dismiss_job_queue:reserved
,类型为Sorted Set
有序集合,表示执行中的任务集合。
注意使用:
冒号分隔符,只是用来表示相关键名key
的关联性,本身是没有特殊含义的,这是一种常见组织key
的方式。
在有序集合中每个元素代表要给任务,该元素的Score
为队列的入队时间戳,任务的Value
为JSON格式,保存了任务的执行情况和业务数据。
Redis驱动下为了实现任务的延迟执行和过期重发,任务将在这三个键key
中来回转移。
Database驱动
Database
驱动是采用数据库做消息队列缓存,相比较Redis而言是不推荐。
/**消息队列配置*/
use think\Env;
return [
//Database驱动
"connector"=>"Database",//数据库驱动
"expire"=>60,//任务过期时间,单位为秒,禁用为null
"default"=>"default",//默认队列名称
"table"=>"jobs",//存储消息的表明,不带前缀
"dsn"=>[],
使用数据库驱动需要创建存放消息的数据表
CREATE TABLE `prefix_jobs` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`queue` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '队列名称',
`payload` longtext COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '有效负载',
`attempts` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '重试次数',
`reserved` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '订阅次数',
`reserved_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '订阅时间',
`available_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '有效时间',
`created_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='消息队列';
消息和队列的保存方式
在Database驱动中,每个任务对应到表的一行,queue
字段用来区分不同的队列,payload
字段保存了消息的执行者和业务数据,payload
字段采用JSON格式的字符串来保存消息。
类名Command+Worker
的角色为命令行,负责解析命令行参数,控制队列的启动和重启。
类名Queue+Connector
的角色为驱动,负责队列的创建以及消息的入队出队等操作。
类型Job
的角色为任务,负责将消息转化为一个任务对象,供消费者使用。
生产者负责消息的创建与发布
消费者负责任务的接收与执行
消息队列Queue
返回一个可用的Job
实例$job
3.1 生产者推送Queue::push()
新消息到消息队列Queue
3.2 消息队列Queue
返回是否推送成功给生产者
执行进程Worker
调用$job
的fire()
方法
消息Job
解析$job
的payload
,实例化一个消费者,并调用消费者实例的fire($job, $data)
方法。
消费者读取消息内容$data
,处理业务逻辑,删除或重发该消息 $job->delete()
或 $job->release()
。
消息Job
从Database或Redis中删除消息或重发消息
消息Job
返回消息处理结果给执行进程Worker
执行进程Worker
在终端输出响应或结束运行
消息的创建与推送
消息的消费与删除
注意:这里会将消息message
与任务job
视为同一概念
消息创建与推送
在业务控制器中创建一个新消息并推送到指定的队列中
首先创建消息需要引入think\Queue
类
use think\Queue
创建消息时需指定当前消息所归属消息队列的名称
$job_queue_name = "dismiss_job_queue";
如果是Redis驱动对应的就是List
数据列表的名称
如果是Database驱动对应的就是prefix_job
表中queue
字段中的内容
创建消息时需要指定当前消息将会由哪个类来负责处理(消费者),当轮到该消息时,系统将生成一个该类的实例,并调用其fire
方法。
$job_handler_classname = "app\index\job\Dismiss";
这里是采用手动指定消息处理类的方式,更合理的做法是事先定义好消息名称与消费者类名的映射关系,然后根据某个可以获取该映射关系的类来推送消息,这样生产者只需要知道消息的名称,而无需指定具体哪个消费者来处理。
创建消息时需要指定当前任务所需的业务数据,注意数据不能是资源类型resource
,业务数据最终将转化为json
形式的字符串。
$job_data = [];
$job_data["ts"] = time();
$job_data["bizid"] = uniqid();
$job_data["params"] = $params;
最后将创建的消息推送到消息队列并等待对应的消费者去执行
$is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);
使用Queue::push
方法将消息推送到消息队列,其返回值根据驱动不同而不同,如果是Redis驱动则成功返回随机字符串失败返回false
,如果是Database驱动则成功返回1
失败返回false
。
if($is_pushed !== false )
echo date("Y-m-d H:i:s")." a new job is pushed to the message queue";
echo date("Y-m-d H:i:s")." a new job pushed fail";
例如:在游戏结束后,大厅服务器会发送游戏战绩数据给HTTP接口,接口获取数据后对其进行加工处理最终得到入库所需的数据,期间还会涉及到向第三方接口推送数据等等。如果采用同步处理的方式,大厅服务器只有等到所有的处理完毕后才能得到得到结构,由于大厅服务器会根据接口返回的数据判断当前战绩是否写入成功,若接口返回数据时间过长,此时服务端将一直处于等待状态,连接不会被断开,这种情况对于使用越来越频繁的接口来说,几乎是一种噩梦。
namespace app\api\controller;
use think\Queue;
class Game extends Api
public function dismiss(){
//获取参数
$data = file_get_contents("php://input");
if(empty($data)){
$this->error("post is null");
$params = json_decode($data, true);
/*创建新消息并推送到消息队列*/
// 当前任务由哪个类负责处理
$job_handler_classname = "app\api\job\Dismiss";
// 当前队列归属的队列名称
$job_queue_name = "dismiss_job_queue";
// 当前任务所需的业务数据
$job_data = ["ts"=>time(), "bizid"=>uniqid(), "params"=>$params];
// 将任务推送到消息队列等待对应的消费者去执行
$is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);
if($is_pushed == false){
$this->error("dismiss job queue went wrong");
//操作成功
$this->success('success');
消息的消费与删除
创建Dismiss
消费者类,用于处理dismiss_job_queue
队列中的任务。
创建\application\api\job\Dismiss.php
消费者类,并编写fire()
方法。
namespace app\api\job;
use think\Log;
use think\queue\Job;
* 消费者类
* 用于处理 dismiss_job_queue 队列中的任务
* 用于牌局解散
class Dismiss
* fire是消息队列默认调用的方法
* @param Job $job 当前的任务对象
* @param array|mixed $data 发布任务时自定义的数据
public function fire(Job $job, $data)
//有效消息到达消费者时可能已经不再需要执行了
if(!$this->checkJob($data)){
$job->delete();
return;
//执行业务处理
if($this->doJob($data)){
$job->delete();//任务执行成功后删除
Log::log("dismiss job has been down and deleted");
}else{
//检查任务重试次数
if($job->attempts() > 3){
Log::log("dismiss job has been retried more that 3 times");
$job->delete();
* 消息在到达消费者时可能已经不需要执行了
* @param array|mixed $data 发布任务时自定义的数据
* @return boolean 任务执行的结果
private function checkJob($data)
$ts = $data["ts"];
$bizid = $data["bizid"];
$params = $data["params"];
return true;
* 根据消息中的数据进行实际的业务处理
private function doJob($data)
// 实际业务流程处理
return true;
访问接口/api/game/dismiss
查看推送是否成功
切换到当前终端到项目根目录
$ php think queue:work --queue dismiss_job_queue
实际使用过程中应安装Supervisor
这样的通用进程管理工具,它会监控php think queue:work
的进程,一旦失败会帮助重启,详情可参见 《Supervisor》 。
简单来总结下使用流程
安装Supervisor并编写应用程序配置脚本,脚本主要用来运行php think queue:work
命令。
运行Supervisor服务,它会读取主进程和应用程序配置。
运行自己编写的消息队列并根据日志查看是否正常运行
Work模式 queue:work
用于启动一个工作进程来处理消息队列
$ php think queue:work --queue dismiss_job_queue
--daemon
是否循环执行,如果不加该参数则该命令处理完下一个消息就退出。
--queue dismiss_job_queue
要处理的队列的名称
--delay 0
如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0。
--force
系统处于维护状态时,是否仍然处理任务,并未找到相关说明。
--memory 128
该进程允许使用的内存上限,以M为单位。
--sleep 3
如果队列中无任务则sleep多少秒后重新检查(work+daemon模式)或退出(listen或非daemon模式)
--tries 2
如果任务已经超过尝试次数上限,则触发“任务尝试数超限”事件,默认为0。
Daemon模式的执行流程
Listen模式 queue:listen
用于启动一个listen
进程,然后由listen
进程通过proc_open('php think queue:work --queue="%s" --delay=%s --memory=%s --sleep=%s --tries=%s')
来周期性地创建一次性的work
进程来消费消息队列,并且限制该work
进程的执行事件,同时通过管道来监听work
进程的输出。
$ php think queue:listen --queue dismiss_job_queue
--queue dismiss_job_queue
监听队列的名称
--delay 0
如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0。
--memory 128
该进程允许使用的内存上限,以M
为单位。
--sleep 3
如果队列中无任务,则多长时间后重新检查。
--tries 0
如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0。
--timeout 60
工作进程允许执行的最长时间,以秒为单位。
Work模式和Listen模式的异同点
两者都可以用于处理消息队列中的任务,区别在于:
执行原理不同
Work模式是单进程的处理模式,按照是否设置--daemon
参数又可以分为单次执行和循环执行两种模式。单次执行不添加--daemon
参数,该模式下Work进程在从处理完下一个消息后直接结束当前进程。当队列为空时会sleep
一段时间然后退出。循环执行添加了--daemon
参数,该模式下Work进程会循环地处理队列中的消息直到内存超出参数配置才结束进程。当队列为空时会在每次循环中sleep
一段时间。
Listen命令是“双进程+管道”的处理模式,Listen命令所在的进程会循环地创建单次执行模式的Work进程,每次创建的Work进程只消费一个消息就会结束,然后Listen进程再创建一个新的Work进程。Listen进程会定时检查当前的Work进程执行时间是否超过了--timeout
参数的值,如果已经超过则Listen进程会杀掉所有Work进程,然后抛出异常。Listen进程会通过管道来监听当前的Work进程的输出,当Work进程有输出时Listen进程会将输出写入到stdout/stderr
。Listen进程会定时通过proc_get_status()
函数来监控当前的Work进程是否仍再运行,Work进程消费完一个任务之后,Work进程就结束了,其状态会变成terminated
,此时Listen进程就会重新创建一个新的Work进程并对其计时,新的Work进程开始消费下一个任务。
结束时机不同
Listen命令中Listen进程和Work进程会在以下情况下结束:Listen进程会定时检查当前的Work进程的执行时间是否超过了--timeout
参数的值,如果已经超时此时Listen进程会杀掉当前的Work进程,然后抛出一个ProcessTimeoutException
异常并结束Listen进程。Listen进程会定时检查自身使用的内存是否超过了--memory
参数的值,如果已经超过此时Listen进程会直接die
掉,Work进程也会自动结束。
Work命令是在脚本内部做循环,框架脚本在命名执行的初期就已经加载完毕。而Listen模式则是处理完一个任务之后新开一个Work进程,此时会重新加载框架脚本。因此Work模式的性能会比Listen模式高。注意当代码有更新时Work模式下需要手动去执行php think queue:restart
命令重启队列来使改动生效,而Listen模式会自动生效无需其它操作。
超时控制能力
Work模式本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间。举例来说,假如在某次上线后\app\api\job\Dismiss
消费者的fire
方法中添加一段死循环。
public function fire()
while(true){
$consoleOutPut->writeln("looping forever inside a job");
sleep(1);
这个循环将永远不能停止,直到任务所在的进程超过内存限制或者由管理员手动结束。这个过程不会由任何的警告。更严重的是如果配置了expire
,那么这个死循环的任务可能会污染到同样处理dismiss_job_queue
队列的其它Work进程,最后好几个Work进程将被卡死在这段死循环中。
Work模式下的超时控制能力实际应理解为多个Work进程配合下的过期任务重发能力。
Listen命令可以限制Listen进程创建的Work进程的最大执行时间,Listen命令可以通过--timeout
参数限制Work进程允许运行的最长时间,超过该时间限制后Work进程会被强制杀死,Listen进程本身也会抛出异常并结束。
expire
与timeout
之间的区别
expire
在配置文件中设置,timeout
在Listen命令的命令行参数中设置。expire
和timeout
是两个不同层次上的概念:expire
是指任务的过期时间,这个时间是全局的影响到所有的Work进程,不管是独立的Work命令还是Listen模式下创建的Work进程。expire
针对的对象是任务。timeout
是指Work进程的超时时间,这个时间只针对当前执行的Listen命令有效,timeout
针对的对象是Work进程。
使用场景不同
Work命令的适用场景是任务数量较多、性能要求较高、任务的执行时间较短、消费者类中不存在死循环/sleep()
/exit()
/die()
等容易导致bug的逻辑。
Listen命令的适用场景是任务数量较少、任务的执行时间较长(如生成大型的Excel报表等)、任务的执行时间需要有严格限制。
消息处理流程
消息队列处理一个任务的具体流程
queue_failed
内置的次数超限事件标签,是否定义了queue_failed
行为,未定义则不处理直接返回,已定义则对次数超限的任务进行处理。
$runHookCb = Behavior::queueFailed() //返回true则删除任务执行任务失败回调,返回false则不执行任何操作。
消费当前的任务
$job->fire()
从$job
对象的payload
属性中解析出消费者类,创建消费者类的实例,执行消费者类的实例的fire($job, $data)
方法。
需要在fire($job, $data)
中手动删除任务,$job
参数表示当前任务对象,$data
参数表示当前的任务数据即创建队列时传入的参数。
消息队列的开始、停止、重启
开始一个消息队列
$ php think queue:work
停止所有的消息队列
$ php think queue:restart
重启所有的消息队列
$ php think queue:restart
$ php think queue:work
多模块多任务的处理
单模块项目推荐时间app/job
作为任务类的命名空间,多任务项目可使用app/module/job
作为任务类的命名空间,也可以放在任意可以自动加载到的地方。
如果一个任务类中有多个小任务的话,在发布任务的时候,需要使用任务的“类名@方法名”的形式,例如app\lib\job\Job@task
,注意命令行中的--queue
参数不执行@
的解析。
消息的延迟执行与定时任务
延迟执行是相对于即使执行的,是用来限制某个任务的最早可执行时刻。在到达该时刻之前该任务会被跳过,可以利用该功能实现定时任务。
延迟执行的使用方式
在生产者业务代码中
// 即时执行
$is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name)
// 延迟2秒执行
$is_pushed = Queue::later(2, $job_handler_classname, $job_data_arr, $job_queue_name);
// 延迟到2019-06-01 00:00:00时刻执行
$time2wait = strtotime("2019-06-01 00:00:00") - strtotime("now");
$is_pushed = Queue::later($time2wait, $job_handler_classname, $job_data_arr, $job_queue_name);
在消费者类中
// 重发,即时执行
$job->release();
// 重发,延迟2秒后执行
$job->release(2);
// 延迟到2019-06-01 00:00:00时刻执行
$time2wait = strtotime("2019-06-01 00:00:00") - strtotime("now");
$job->release($time2wait);
在命令行中
如果消费者类的fire
方法抛出了异常且任务未被删除时,将自动重发该任务。重发时会设置下次执行前延迟多少秒,默认为0。
$ php think queue:work --delay 3
消息重发时机有三种情况:
在消费者类中手动重发
if($is_job_done === false)
$job->release();
Work进程自动重发需要同时满足以下两个条件:消费者类的fire()
方法抛出异常、任务未被删除
当配置了expire
不为null
时,Work进程内部每次查询可用任务之前会自动重发已过期的任务。
注意:在Database模式下,2.7.1和2.7.2中的重发逻辑是先删除原来的任务,然后插入一个新的任务。2.7.3中的重发机制是直接更新原任务。在Redis模式下,三种重发都是先删除再插入。不管是那种重发方式,重发之后任务的已尝试次数会在原来的基础上加1。
此外,消费者类中需要注意,如果fire()
方法中抛出异常,将出现两种情况:
如果不需要自动重发的话,请在抛出异常之前将任务删除$job->delete()
,否则会被框架自动重发。
如果需要自动重发的话,请直接抛出异常,不要在fire()
方法中手动使用$job->release()
,这样将导致任务被重发两次,产生两个一样的新任务。
Redis驱动下的任务重发细节
在Redis驱动下为了实现任务的延迟执行和过期重发,任务将在这三个key
中来回转移。
在Database模式下消息处理的消息流程中,如果配置的expire
不是null
那么think-queue
的work
进程每次在获取下一个可执行任务之前,会先尝试重发所有过期的任务。而在Redis驱动下这个步骤则做了更多的事情,详情如下:
从queue:xxx:delayed
的key
中查询出有哪些任务在当前时刻已经可以开始执行,然后将这些任务转移到queue:xxx
的key
的尾部。
从queue:xxx:reserved
的key
中查询出有哪些任务在当前时刻已经过期,然后将这些任务转移到queue:xxx
的key
的尾部。
尝试从queue:xxx
的key
的头部取出一个任务,如果取出成功,则将这个任务转移到queue:xxx:reserved
的key
的头部,同时将这个任务实例化成任务对象,交给消费者去执行。
Redis队列中的过期任务重发步骤,执行前:
任务的已尝试次数大于命令行的--tries
参数
开发者添加了queue_failed
事件标签及其对应的回调代码
消费者类中定义了failed()
方法用于接收任务失败的通知
注意,queue_failed
标签需要在安装think-queue
之后手动去/app/tags.php
文件中添加。
-任务完成后使用$job->delete()
删除任务
在消费者类的fire()
方法中使用$job->attempt()
检查任务已执行次数,对于次数异常的做相应的处理。
在消费者类的fire()
方法在中根据业务数据来判断该任务是否已经执行过,以避免该任务被重复执行。
编写失败回调事件将事件中失败的任务及时通知给开发人员
队列的稳定性和拓展性
稳定性:不管是listen
模式还是work
模式,建议使用supervisor
或自定义的cron
脚本去定时检查work
进程是否正常。
拓展性:当某个队列的消费者不足时在给这个队列添加work
进程即可
最好配置本地的Redis,使用远程Redis曾出现无法解释的原因。
$ yum install -g redis
$ systemctl restart redis
停止原正在运行的
$ supervisorctl shutdown
重新加载服务
$ supervisord -c /etc/supervisord.conf
$ supervisorctl reload
$ ps aux | grep supervisord
$ top
未完待续...