ActiveMQ 消息的发送过程
ActiveMQ 支持同步、异步两种发送模式将消息发送到broker上。
同步发送过程中,发送是阻塞的,直到收到确认。发送者发送一条消息会阻塞,直到 broker 反馈一个确认消息,表示消息已经被 broker 处理。这个机制提供了消息的安全性保障,但是由于是阻塞的操作,会影响到客户端消息发送的性能。
异步发送的过程中,发送者不需要等待 broker 的确认,所以性能相对较高。
默认情况下,非持久化消息是异步发送的,持久化消息在事务模式下是异步发送的,持久化消息在非事务模式下是同步发送的。
由于异步发送的效率会比同步发送性能更高,所以在发送持久化消息的时候,尽量去开启事务会话。
我们可以通过以的方式来设置异步发送:
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://localhost:61616?
jms.useAsyncSend=true");
((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
((ActiveMQConnection)connection).setUseAsyncSend(true);
虽然我们推荐使用异步发送,但是对于异步发送来说,需要设置一个消息的字节数,当消息达到一定字节的时候,需要等待 broker 的确认,在没有达到一定字节量的时候,不需要确认,因此提高了性能。
ProducerWindowSize 就是用来表示这个消息的字节数的,producer 每发送一个消息,统计一下发送的字节数,当字节数达到 ProducerWindowSize 值时,需要等待broker的确认,才能继续发送。
ProducerWindowSize 主要用来约束在异步发送时 producer 允许积压的(消费者尚未签收)消息的大小,且只对异步发送有意义。
ProducerWindowSize 可以通过如下2种方式设置:
在 brokerUrl 中设置,这种设置将会对所有的producer生效:
tcp://localhost:61616?jms.producerWindowSize=1024000
在destinationUri中设置,这种设置只会对使用该 Destination 的实例的 producer 生效,并会覆盖 brokerUrl中的 producerWindowSize 的值:
test-queue?producer.windowSize=1024000
注意:此值越大,意味着消耗 producer 的内存就越大。
ActiveMQ 消息的确认过程
消费者程序可以用两种方法接收消息,一种是使用同步阻塞的 receive 方法(同步消费者 ),另一种是使用消息监听器(异步消费者) MessageListener,在同一个 session 下,这两者不能同时使用。
默认情况下,消费者获取消息是 ActiveMQ 服务器(broker)采用异步方式向客户端主动推送消息(push)。也就是说 broker 在向某个消费者会话推送消息后,不会等待消费者响应消息,直到消费者处理完消息以后,主动向broker 返回处理结果,进行消息确认。
ActiveMQ 系统中,默认的策略是 ActiveMQ 服务器(broker) 一旦有消息,就主动按照设置的规则推送给当前活动的消费者, 每次推送都有一定的数量限制,而这个数量就是“预取消息数量“(prefetchSize),假如 prefetchSize = 0,此时对于消费者来说,就是一个 pull 模式。
prefetchsize 的默认大小:
ActiveMQ 设置预取数据 prefetchsize 的方式
在 ActiveMQ 中可以通过3种方式设置 prefetchSize 的大小:
1、在连接工厂 broker URL 里设置:
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616?jms.prefetchPolicy.all=100");
//所有的目的地每次预取100条消息
2、在连接工厂 broker URL 里设置:
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=100");
//所有队列每次预取100条消息
3、在创建队列时设置:
Destination destination = session.createQueue("queue_01?consumer.prefetchSize=100");
//指定名称的队列每次预取100条消息
优化确认 optimizeAcknowledge
ActiveMQ 提供了 optimizeAcknowledge 来优化确认,它表示是否开启“优化 ACK”,优化确认就是指批量确认消息,优化确认一方面可以减轻 client 负担(不需要频繁的确认消息)、减少通信开销,另一方面由于延迟了确认(默认ack是达到了 prefetchSize * 0.65 个消息才确认),broker 再次发送消息时又可以批量发送。
如果只是开启了 prefetchSize,而 optimizeAcknowledge 没有开启的话,那么表示不会批量确认,这样每条消息都去确认的话,broker在收到确认后也只是发送一条消息,并不是批量发布。
optimizeAcknowledge 的设置
在连接工厂 broker URL 里设置
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616?jms.optimizeAcknowledge=true");
optimizeAcknowledge 和 prefetchSize 的作用:
对于 consumer 而言,optimizeAcknowledge 属性只会在 AUTO_ACKNOWLEDGE 模式下有效。
两者协同工作,通过批量获取消息、并延迟批量确认,来达到一个高效的消息消费模型,不仅减少了客户端在获取消息时的阻塞次数,还能减少每次获取消息时的网络通信开销。如果消费者的消费速度比较高,通过这两者组合是能大大提升消费者的处理性能。
如果消费者的消费速度比较慢,设置比较大的 prefetchSize 反而不能有效的达到提升消费者性能的目的。因为在不说多个消费者节点的情况下,过大的 prefetchSize 会引起消费者负载的不均衡。
如果消息很重要,尤其是不想重复消费消息,那么我们需要将 optimizeAcknowledge 设置为 false,prefetchSize 设置为 1。