Collectives™ on Stack Overflow

Find centralized, trusted content and collaborate around the technologies you use most.

Learn more about Collectives

Teams

Q&A for work

Connect and share knowledge within a single location that is structured and easy to search.

Learn more about Teams

I am processing a huge xml document (which contains around a million entries) and subsequently importing a formatted version to the db using rabbitmq. Each time after publishing around 200,000 entries I receive a broken pipe error , and rabbitmq is unable to recover from it.

Notice Error: fwrite(): send of 2651 bytes failed with errno=11 Resource temporarily unavailable in [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, line

Notice Error: fwrite(): send of 33 bytes failed with errno=104 Connection reset by peer in [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, line

Notice Error: fwrite(): send of 19 bytes failed with errno=32 Broken pipe in [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, line 439]

This subsequently causes a node down error and the process needs to be manually killed to recover from it.

These are my class methods:-

public function publishMessage($message) {
    if (!isset($this->conn)) {
        $this->_createNewConnectionAndChannel();
    try {
        $this->ch->basic_publish(
            new AMQPMessage($message, array('content_type' => 'text/plain')), 
            $this->defaults['exchange']['name'], 
            $this->defaults['binding']['routing_key']
    } catch (Exception $e) {
        echo "Caught exception : " . $e->getMessage();
        echo "Creating new connection.";
        $this->_createNewConnectionAndChannel();
        $this->publishMessage($message); // try again
protected function _createNewConnectionAndChannel() {
    if (isset($this->conn)) {
        $this->conn->close();
    if(isset($this->ch)) {
        $this->ch->close();
    $this->conn = new AMQPConnection(
        $this->defaults['connection']['host'], 
        $this->defaults['connection']['port'], 
        $this->defaults['connection']['user'], 
        $this->defaults['connection']['pass']
    $this->ch = $this->conn->channel();
    $this->ch->access_request($this->defaults['channel']['vhost'], false, false, true, true);
    $this->ch->basic_qos(0 , 20 , 0); // fair dispatching
    $this->ch->queue_declare(
        $this->defaults['queue']['name'],
        $this->defaults['queue']['passive'],
        $this->defaults['queue']['durable'],
        $this->defaults['queue']['exclusive'],
        $this->defaults['queue']['auto_delete']
    $this->ch->exchange_declare(
        $this->defaults['exchange']['name'],
        $this->defaults['exchange']['type'],
        $this->defaults['exchange']['passive'],
        $this->defaults['exchange']['durable'],
        $this->defaults['exchange']['auto_delete']
    $this->ch->queue_bind(
        $this->defaults['queue']['name'],
        $this->defaults['exchange']['name'],
        $this->defaults['binding']['routing_key']

Any help will be appreciated.

Have you tried the PECL AMQP extension? From experience, it is much less flaky and better maintained. – salathe Apr 5, 2013 at 20:20

Make sure you have added virtualhost access for your user on Rabbit MQ. I've created new user and forgot set access rights for "/" host which is used by default.

You can do that via management panel yourhost:15672 > Admin > click on user > Look for "Set permission".

P.S. I assume your RabbitMQ service is running, user exists and password is correct.

Actually this problem happens when you have a big content inside your message and your consumer expend too much time to process only one message, that is problem to response "ACK" to rabbit and try to consume another message.

When I have this problem for example I try to "fit" my messages, because its a products worker and each message had some like 1k products id, so I change to 100 products and it works very well.

You can read more about Detecting Dead TCP Connections with Heartbeats here

This problem happened to me when my connection to RabbitMQ was broken (the reason does not matter, in my case I intentionally stopped RabbitMQ service for some failure tests), and I was trying to reconnect to the RabbitMQ again by closing the old connection and initializing a new one, but I received Broken pipe or closed connection error. The way I solved this problem was to use reconnect() method on my connection:

$channel->reconnect();
                I have this error "broken pipe" exactly after $channel->getConnection()->reconnect(); Maybe you can tell me why?
– Daart Kote
                Aug 18, 2021 at 19:58
        

Thanks for contributing an answer to Stack Overflow!

  • Please be sure to answer the question. Provide details and share your research!

But avoid

  • Asking for help, clarification, or responding to other answers.
  • Making statements based on opinion; back them up with references or personal experience.

To learn more, see our tips on writing great answers.