相关文章推荐
直爽的抽屉  ·  vue按钮重复提交-掘金·  1 年前    · 
乐观的滑板  ·  解决pip is configured ...·  1 年前    · 
捣蛋的凉茶  ·  c# - How do you call ...·  1 年前    · 

ActiveMQ 消息的发送过程

ActiveMQ 支持同步、异步两种发送模式将消息发送到broker上。

同步发送过程中,发送是阻塞的,直到收到确认。发送者发送一条消息会阻塞,直到 broker 反馈一个确认消息,表示消息已经被 broker 处理。这个机制提供了消息的安全性保障,但是由于是阻塞的操作,会影响到客户端消息发送的性能。

异步发送的过程中,发送者不需要等待 broker 的确认,所以性能相对较高。

默认情况下,非持久化消息是异步发送的,持久化消息在事务模式下是异步发送的,持久化消息在非事务模式下是同步发送的。

由于异步发送的效率会比同步发送性能更高,所以在发送持久化消息的时候,尽量去开启事务会话。

我们可以通过以的方式来设置异步发送:

ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://localhost:61616? jms.useAsyncSend=true"); ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true); ((ActiveMQConnection)connection).setUseAsyncSend(true);

虽然我们推荐使用异步发送,但是对于异步发送来说,需要设置一个消息的字节数,当消息达到一定字节的时候,需要等待 broker 的确认,在没有达到一定字节量的时候,不需要确认,因此提高了性能。

ProducerWindowSize 就是用来表示这个消息的字节数的,producer 每发送一个消息,统计一下发送的字节数,当字节数达到 ProducerWindowSize 值时,需要等待broker的确认,才能继续发送。

ProducerWindowSize 主要用来约束在异步发送时 producer 允许积压的(消费者尚未签收)消息的大小,且只对异步发送有意义。

ProducerWindowSize 可以通过如下2种方式设置:

在 brokerUrl 中设置,这种设置将会对所有的producer生效:

tcp://localhost:61616?jms.producerWindowSize=1024000

在destinationUri中设置,这种设置只会对使用该 Destination 的实例的 producer 生效,并会覆盖 brokerUrl中的 producerWindowSize 的值:

test-queue?producer.windowSize=1024000

注意:此值越大,意味着消耗 producer 的内存就越大。

ActiveMQ 消息的确认过程

消费者程序可以用两种方法接收消息,一种是使用同步阻塞的 receive 方法(同步消费者 ),另一种是使用消息监听器(异步消费者) MessageListener,在同一个 session 下,这两者不能同时使用。

默认情况下,消费者获取消息是 ActiveMQ 服务器(broker)采用异步方式向客户端主动推送消息(push)。也就是说 broker 在向某个消费者会话推送消息后,不会等待消费者响应消息,直到消费者处理完消息以后,主动向broker 返回处理结果,进行消息确认。

ActiveMQ 系统中,默认的策略是 ActiveMQ 服务器(broker) 一旦有消息,就主动按照设置的规则推送给当前活动的消费者, 每次推送都有一定的数量限制,而这个数量就是“预取消息数量“(prefetchSize),假如 prefetchSize = 0,此时对于消费者来说,就是一个 pull 模式。

prefetchsize 的默认大小:

ActiveMQ 设置预取数据 prefetchsize 的方式

在 ActiveMQ 中可以通过3种方式设置 prefetchSize 的大小:

1、在连接工厂 broker URL 里设置:

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616?jms.prefetchPolicy.all=100");
//所有的目的地每次预取100条消息
 

2、在连接工厂 broker URL 里设置:

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=100");
//所有队列每次预取100条消息
 

3、在创建队列时设置:

Destination destination = session.createQueue("queue_01?consumer.prefetchSize=100");
//指定名称的队列每次预取100条消息
 

优化确认 optimizeAcknowledge

ActiveMQ 提供了 optimizeAcknowledge 来优化确认,它表示是否开启“优化 ACK”,优化确认就是指批量确认消息,优化确认一方面可以减轻 client 负担(不需要频繁的确认消息)、减少通信开销,另一方面由于延迟了确认(默认ack是达到了 prefetchSize * 0.65 个消息才确认),broker 再次发送消息时又可以批量发送。

如果只是开启了 prefetchSize,而 optimizeAcknowledge 没有开启的话,那么表示不会批量确认,这样每条消息都去确认的话,broker在收到确认后也只是发送一条消息,并不是批量发布。

optimizeAcknowledge 的设置

在连接工厂 broker URL 里设置

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616?jms.optimizeAcknowledge=true");
 

optimizeAcknowledge 和 prefetchSize 的作用:

对于 consumer 而言,optimizeAcknowledge 属性只会在 AUTO_ACKNOWLEDGE 模式下有效。

两者协同工作,通过批量获取消息、并延迟批量确认,来达到一个高效的消息消费模型,不仅减少了客户端在获取消息时的阻塞次数,还能减少每次获取消息时的网络通信开销。如果消费者的消费速度比较高,通过这两者组合是能大大提升消费者的处理性能。

如果消费者的消费速度比较慢,设置比较大的 prefetchSize 反而不能有效的达到提升消费者性能的目的。因为在不说多个消费者节点的情况下,过大的 prefetchSize 会引起消费者负载的不均衡。

如果消息很重要,尤其是不想重复消费消息,那么我们需要将 optimizeAcknowledge 设置为 false,prefetchSize  设置为 1。