Laravel 队列使用的实现
1 环境
Laravel是一种类似ThinkPHP的php框架,封装的诸多功能可以很方便的使用。队列Queue便是其中之一。
Windows环境下,可使用PHPstorm作为Laravel的集成开发环境IDE。
2 队列
Laravel可配置多种队列驱动,包括 “sync”, “database”, “beanstalkd”, “sqs”, “ redis ”, “null”(具体参见app/config/queue.php) 其中sync为同步,database为使用 数据库 ,后面三种为第三方队列服务,最后一种为不使用队列。
通过在 .env 中的 QUEUE_CONNECTION 选项,来决定选择何种驱动。
如 QUEUE_CONNECTION=database 即为选择数据库驱动队列。
3 原理
所谓队列,会有数据的生产者和消费者之分。生产者向队列中投递数据,消费者从队列中获取数据。
比如向用户发送邮件的场景:现在有10w封邮件需要发送,最简单的,我们需要有一个方法将邮件的收件人、内容等,拆分成10w条任务放在队列中,同时需要设置一个回调方法负责处理每条任务。当队列中有邮件发送任务时,队列会主动调用回调方法,并传递任务详情进去。回调方法处理完成后,单条邮件即发送完毕。其他邮件依样处理。
4 使用数据库驱动队列
4.1 生成任务表
在终端下输入
php artisan queue:table
php artisan migrate
在数据库连接正常的情况下,会在数据库中出现jobs表:
[id] bigint
[queue] nvarchar(255)
[payload] nvarchar(max)
[attempts] tinyint
[reserved_at] int
[available_at] int
[created_at] int
4.2 创建任务类
php artisan make:job SendEmail
在终端内执行上述命令,会自动生成 app/Jobs/SendMail.php 文件
class SendMail implements ShouldQueue
在该文件的handle方法中,可以放置任务处理逻辑。
4.3 发送任务
在任意位置,均可像下面一样调用 dispatch 发送任务
SendMail::dispatch($email);
4.4 驱动队列
完成上述步骤后,可以在数据库中发现一条记录(导出为insert SQL语句):
复制代码 代码如下:
INSERT INTO [jobs]([id], [queue], [payload], [attempts], [reserved_at], [available_at], [created_at]) VALUES (6, N’default’, N'{“displayName”:”App\\Jobs\\ProcessPodcast”,”job”:”Illuminate\\Queue\\CallQueuedHandler@call”,”maxTries”:null,”timeout”:null,”timeoutAt”:null,”data”:{“commandName”:”App\\Jobs\\ProcessPodcast”,”command”:”O:23:\”App\\Jobs\\ProcessPodcast\”:8:{s:29:\”\u0000App\\Jobs\\ProcessPodcast\u0000data\”;s:6:\”111222\”;s:6:\”\u0000*\u0000job\”;N;s:10:\”connection\”;N;s:5:\”queue\”;N;s:15:\”chainConnection\”;N;s:10:\”chainQueue\”;N;s:5:\”delay\”;N;s:7:\”chained\”;a:0:{}}”}}’, 0, NULL, 1545980176, 1545980176);
此时任务已经放置在数据库内,只有将队列运行起来后,队列才能主动调用回调方法。
php artisan queue:work
在终端内运行上述命令即可。该命令还有诸多参数,如deamon、tries等,可根据需要指定。
4.5 守护进程
为了保证应用服务的稳定性,需要开启守护进程。
Linux下,一般使用 Supervisor ,Windows下使用 Forever
4.6 执行失败的处理
对于处理失败的任务,Laravel也提供的解决方案。通过运行如下命令,即可创建表以记录失败任务。
php artisan queue:failed-table
php artisan migrate
在数据库中即生成 failed_jobs :
[id] bigint
[connection] nvarchar(max)
[queue] nvarchar(max)
[payload] nvarchar(max)
[exception] nvarchar(max)
[failed_at] datetime
导致任务失败的 Exception 会被传递到 SendMail 的 failed 方法,因而你需要在SendMail中自行实现该方法,并做进一步处理。
任务执行失败的原因有很多,如传参错误、尝试次数超过限制、超时、甚至在 handle 方法中抛出异常,均会作为失败任务处理。
4.7 任务执行前后的处理
Laravel提供了任务执行前后的处理入口,即在 App/Providers/AppServiceProvider 中的 boot() 中加入如下代码:
public function boot()
Queue::before( function (JobProcessing $event) {
Log::info("处理任务前");
Queue::after( function (JobProcessed $event) {
Log::info("处理任务后");
}
传递的 $event 中,带有任务详情,几个简单的例子:
$event- connectionName
$event- job
$event- job- payload()
5 使用 Redis 驱动队列
5.1 Laravel 安装 Predis 包
在 Laravel 中使用 Redis 之前,需要通过 Composer 安装 predis/predis 包:
composer require predis/predis
上述拓展是帮助Laravel与Redis打交道的,我们现在还缺少Redis服务。
如果此时将 .env 中的 QUEUE_CONNECTION 改为 redis,访问时会报错:
Predis \ Connection \ ConnectionException (10061)