Collectives™ on Stack Overflow
Find centralized, trusted content and collaborate around the technologies you use most.
Learn more about Collectives
Teams
Q&A for work
Connect and share knowledge within a single location that is structured and easy to search.
Learn more about Teams
We have an application with million events go through every hour. Using RabbitMQ and PHP-AMQP we process them through our own made consumers. Once in a while the connection drops.
Consumers are running on DO App Platform (worker) and the RabbitMQ is a droplet.
Consumer is a Laravel Command.
public function handle() {
$queuesToListen = $this->constructQueues();
$queueJobs = $this->extractQueueJobs($this->queues);
if(is_array($queuesToListen) && count($queuesToListen) > 0){
$this->output->writeln('<fg=green>[EVENTS] Found ' . count($queuesToListen) . ' queues to subscribe to, starting to listen</>');
while(true) {
foreach ($queuesToListen as $queueName) {
$this->output->writeln('<fg=yellow>Let\'s listen to queue ' . $queueName." </>");
$this->currentQueue = $queueName;
try {
$connection = AMQPStreamConnection::create_connection([
['host' => env('RABBITMQ_HOST'), 'port' => env('RABBITMQ_PORT'), 'user' => env('RABBITMQ_USER'), 'password' => env('RABBITMQ_PASSWORD'), 'vhost' => env('RABBITMQ_VHOST')]
], ['heartbeat' => constant($queueJobs[$this->currentQueue] . '::HEARTBEAT_RATE')]);
$channel = $connection->channel();
$channel->queue_declare($queueName, false, true, false, false);
echo ' [*] Connected to queue ' . $queueName . ' , waiting for messages.', "\n";
$channel->basic_qos(null, 10, null);
$channel->basic_consume($queueName, '', false, false, false, false, [$this, 'processEvent']);
// Loop as long as the channel has callbacks registered
while (count($channel->callbacks)) {
$channel->wait(null, false, 2);
} catch (\PhpAmqpLib\Exception\AMQPExceptionInterface $e) {
echo "AMQP Exception: ", $e->getMessage(), "\n";
echo "AMQP Code: ", $e->getCode(), "\n";
$channel->close(); //LINE 72
$connection->close();
}else{
$this->output->writeln('<fg=red>[EVENTS] No queues to process, command stopped</>');
Error we get are:
PhpAmqpLib\Exception\AMQPConnectionClosedException: Broken pipe or closed connection
#19 /vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php(235): PhpAmqpLib\Wire\IO\StreamIO::write
#18 /vendor/php-amqplib/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(408): PhpAmqpLib\Connection\AbstractConnection::write
#17 /vendor/php-amqplib/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(536): PhpAmqpLib\Connection\AbstractConnection::send_channel_method_frame
#16 /vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(248): PhpAmqpLib\Channel\AbstractChannel::send_method_frame
#15 /vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php(198): PhpAmqpLib\Channel\AMQPChannel::close
#14 /vendor/sdk-blueprint-service/src/RabbitMQ/Console/Consume.php(72):BluePrint\RabbitMQ\Console\Consume::handle
#13 /vendor/illuminate/container/BoundMethod.php(36): Illuminate\Container\BoundMethod::Illuminate\Container{closure}
#12 /vendor/illuminate/container/Util.php(40): Illuminate\Container\Util::unwrapIfClosure
#11 /vendor/illuminate/container/BoundMethod.php(93): Illuminate\Container\BoundMethod::callBoundMethod
#10 /vendor/illuminate/container/BoundMethod.php(37): Illuminate\Container\BoundMethod::call
#9 /vendor/illuminate/container/Container.php(653): Illuminate\Container\Container::call
#8 /vendor/illuminate/console/Command.php(136): Illuminate\Console\Command::execute
#7 /vendor/symfony/console/Command/Command.php(298): Symfony\Component\Console\Command\Command::run
#6 /vendor/illuminate/console/Command.php(121): Illuminate\Console\Command::run
#5 /vendor/symfony/console/Application.php(1024): Symfony\Component\Console\Application::doRunCommand
#4 /vendor/symfony/console/Application.php(299): Symfony\Component\Console\Application::doRun
#3 /vendor/symfony/console/Application.php(171): Symfony\Component\Console\Application::run
#2 /vendor/illuminate/console/Application.php(94): Illuminate\Console\Application::run
#1 /vendor/laravel/lumen-framework/src/Console/Kernel.php(116): Laravel\Lumen\Console\Kernel::handle
#0 /artisan(35): null
Whenever this error occurs the application stops processing the events. And we manually have to redeploy the application.
How can we prevent this from crashing? Is there a way to detect if a channel is still open?
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.