We have an application that handles a million events every hour. Using RabbitMQ and php-amqplib, we process them with our own consumers. Once in a while the connection drops.

The consumers run on DigitalOcean App Platform (as a worker), and RabbitMQ runs on a droplet.

The consumer is a Laravel command:

public function handle() {
    $queuesToListen = $this->constructQueues();
    $queueJobs      = $this->extractQueueJobs($this->queues);
    if(is_array($queuesToListen) && count($queuesToListen) > 0){
        $this->output->writeln('<fg=green>[EVENTS] Found ' . count($queuesToListen) . ' queues to subscribe to, starting to listen</>');
        while(true) {
            foreach ($queuesToListen as $queueName) {
                $this->output->writeln('<fg=yellow>Let\'s listen to queue ' . $queueName." </>");
                $this->currentQueue = $queueName;
                try {
                    $connection = AMQPStreamConnection::create_connection([
                        ['host' => env('RABBITMQ_HOST'), 'port' =>  env('RABBITMQ_PORT'), 'user' => env('RABBITMQ_USER'), 'password' => env('RABBITMQ_PASSWORD'), 'vhost' => env('RABBITMQ_VHOST')]
                    ], ['heartbeat' => constant($queueJobs[$this->currentQueue] . '::HEARTBEAT_RATE')]);
                    $channel = $connection->channel();
                    $channel->queue_declare($queueName, false, true, false, false);
                    echo ' [*] Connected to queue ' . $queueName . ' , waiting for messages.', "\n";
                    $channel->basic_qos(null, 10, null);
                    $channel->basic_consume($queueName, '', false, false, false, false, [$this, 'processEvent']);
                    // Loop as long as the channel has callbacks registered
                    while (count($channel->callbacks)) {
                        $channel->wait(null, false, 2);
                    }
                } catch (\PhpAmqpLib\Exception\AMQPExceptionInterface $e) {
                    echo "AMQP Exception: ", $e->getMessage(), "\n";
                    echo "AMQP Code: ", $e->getCode(), "\n";
                    $channel->close(); //LINE 72
                    $connection->close();
                }
            }
        }
    }else{
        $this->output->writeln('<fg=red>[EVENTS] No queues to process, command stopped</>');
    }
}

The error we get is:

PhpAmqpLib\Exception\AMQPConnectionClosedException: Broken pipe or closed connection
#19 /vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php(235): PhpAmqpLib\Wire\IO\StreamIO::write
#18 /vendor/php-amqplib/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(408): PhpAmqpLib\Connection\AbstractConnection::write
#17 /vendor/php-amqplib/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(536): PhpAmqpLib\Connection\AbstractConnection::send_channel_method_frame
#16 /vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(248): PhpAmqpLib\Channel\AbstractChannel::send_method_frame
#15 /vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php(198): PhpAmqpLib\Channel\AMQPChannel::close
#14 /vendor/sdk-blueprint-service/src/RabbitMQ/Console/Consume.php(72):BluePrint\RabbitMQ\Console\Consume::handle
#13 /vendor/illuminate/container/BoundMethod.php(36): Illuminate\Container\BoundMethod::Illuminate\Container{closure}
#12 /vendor/illuminate/container/Util.php(40): Illuminate\Container\Util::unwrapIfClosure
#11 /vendor/illuminate/container/BoundMethod.php(93): Illuminate\Container\BoundMethod::callBoundMethod
#10 /vendor/illuminate/container/BoundMethod.php(37): Illuminate\Container\BoundMethod::call
#9 /vendor/illuminate/container/Container.php(653): Illuminate\Container\Container::call
#8 /vendor/illuminate/console/Command.php(136): Illuminate\Console\Command::execute
#7 /vendor/symfony/console/Command/Command.php(298): Symfony\Component\Console\Command\Command::run
#6 /vendor/illuminate/console/Command.php(121): Illuminate\Console\Command::run
#5 /vendor/symfony/console/Application.php(1024): Symfony\Component\Console\Application::doRunCommand
#4 /vendor/symfony/console/Application.php(299): Symfony\Component\Console\Application::doRun
#3 /vendor/symfony/console/Application.php(171): Symfony\Component\Console\Application::run
#2 /vendor/illuminate/console/Application.php(94): Illuminate\Console\Application::run
#1 /vendor/laravel/lumen-framework/src/Console/Kernel.php(116): Laravel\Lumen\Console\Kernel::handle
#0 /artisan(35): null

Whenever this error occurs, the application stops processing events and we have to redeploy it manually.

How can we prevent this from crashing? Is there a way to detect if a channel is still open?
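
For reference, here is a minimal sketch of the behaviour we are after. The trace shows the exception escaping from `$channel->close()` inside the `catch` block, so the idea is to make cleanup non-fatal and then reconnect. The helper names `closeQuietly` and `consumeWithReconnect`, the attempt limit, and the delay are all hypothetical; the sketch uses plain callables instead of php-amqplib objects so it runs on its own.

```php
<?php

/**
 * Close a resource without letting a failure escape.
 * $closer is any callable that closes a channel or a connection.
 * (Hypothetical helper, not part of php-amqplib.)
 */
function closeQuietly(callable $closer): void
{
    try {
        $closer();
    } catch (\Throwable $e) {
        // The socket is already gone, so there is nothing left to close.
        error_log('Ignored error during close: ' . $e->getMessage());
    }
}

/**
 * Call $consumeOnce until it returns normally, retrying after each exception.
 * $consumeOnce is assumed to open its own connection and channel on every call.
 * Returns the number of attempts that were needed.
 * (Hypothetical helper; the retry limit and delay are illustrative values.)
 */
function consumeWithReconnect(callable $consumeOnce, int $maxAttempts = 5, int $delaySeconds = 1): int
{
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $consumeOnce();
            return $attempt;
        } catch (\Exception $e) {
            error_log("Attempt {$attempt} failed: " . $e->getMessage());
            if ($attempt < $maxAttempts) {
                sleep($delaySeconds);
            }
        }
    }

    throw new \RuntimeException("Gave up after {$maxAttempts} attempts");
}
```

In the consumer above, this would mean wrapping `$channel->close()` and `$connection->close()` each in `closeQuietly(...)`, and moving the connect-and-wait part into the callable passed to `consumeWithReconnect`. As far as we can tell, php-amqplib also has `$connection->isConnected()` and `$channel->is_open()`, which could be checked before closing, but we have not confirmed that they reliably detect a dropped socket.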
