Kafka/RocketMQ顺序消息对比

一、Kafka顺序消息

Producer端 :Kafka的顺序消息是通过partition key,将某类消息(例如同一笔订单的不同状态)写入同一个partition,因此Kafka只能保证消息在同一个partition内有序,无法保证全局有序;
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
Consumer端:Kafka Java Consumer是单线程的设计(多线程方案需要业务端自己实现),即一个partition只能对应一个消费线程,因此可以保证消息被顺序消费;

二、RocketMQ顺序消息

Producer端:RMQ顺序消息跟Kafka类似,通过消息路由机制把消息发送到指定的MessageQueue中(参考Kafka/RocketMQ生产者路由对比); Consumer端:RMQ Java Consumer是线程池的设计,因此在集群模式下消费的顺序消费,需要通过一系列设计来保证;
1. Consumer 注册 MessageListenerOrderly
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group_1");
    consumer.setNamesrvAddr("192.168.0.99:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            // TODO
            return ConsumeOrderlyStatus.SUCCESS;
    consumer.start();
2. 定时向broker发送锁住当前正在消费的队列集合的消息

2.1 Consumer 启动时根据消息监听器类型创建监听服务

// 根据是否顺序消费,创建消费端消费线程服务;ConsumeMessageService主要负责消息消费,内部维护线程池;
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
this.consumeMessageService.start();

2.2 ConsumeMessageOrderlyService.start 启动定时任务(默认频率20s)

public void start() {
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        // 如果是集群消费,则启动定时任务,定时向broker发送批量锁住当前正在消费的队列集合的消息,
        // 具体是consumer端拿到正在消费的队列集合,发送锁住队列的消息至broker,broker端返回锁住成功的队列集合
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
        }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);

2.3 向broker发送批量锁住当前正在消费的队列集合的消息,具体是consumer端拿到正在消费的队列集合,发送锁住队列的消息至broker,broker端返回锁住成功的队列集合。consumer收到后,设置是否锁住标志位。保证broker中的每个消息队列只对应一个消费端。

public void lockAll() {
    // broker -> broker上的 MessageQueue(当前Consumer消费的MessageQueue)
    HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
    Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, Set<MessageQueue>> entry = it.next();
        final String brokerName = entry.getKey();
        final Set<MessageQueue> mqs = entry.getValue();
        if (mqs.isEmpty())
            continue;
        // 主要获取 broker 地址
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
        if (findBrokerResult != null) {
            // 组装批量锁定请求
            LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.setMqSet(mqs); // MessageQueue
            try {
                // 发送请求到Broker,Broker返回锁定的MessageQueue集合
                Set<MessageQueue> lockOKMQSet =
                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                for (MessageQueue mq : lockOKMQSet) {
                    ProcessQueue processQueue = this.processQueueTable.get(mq);
                    if (processQueue != null) {
                        if (!processQueue.isLocked()) {
                            log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
                        // Broker锁定的集合,在本地加锁,后面拉取消息消费时会用到
                        processQueue.setLocked(true);
                        processQueue.setLastLockTimestamp(System.currentTimeMillis());
                ......
            } catch (Exception e) {
                log.error("lockBatchMQ exception, " + mqs, e);
3. 消费消息时通过锁实现串行执行

3.1 DefaultMQPushConsumerImpl.pullMessage 拉取消息提交到ConsumeMessageOrderlyService的线程池consumeExecutor中执行

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);

3.2 ConsumeMessageOrderlyService.ConsumeRequest.run 消费消息,消费时对消费队列进行加锁,保证同一个消费队列中的多条消息会串行执行;

@Override
public void run() {
    if (this.processQueue.isDropped()) {
        log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
    // 获取当前 MessageQueue 的锁
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    synchronized (objLock) {
        // 广播模式 或者 ProcessQueue上锁(lockAll()进行上锁)并且锁没有过期,否则延迟 10ms 再执行
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
            final long beginTime = System.currentTimeMillis();
            for (boolean continueConsume = true; continueConsume; ) {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    break;
                // ProcessQueue 未上锁
                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && !this.processQueue.isLocked()) {
                    log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                // ProcessQueue 锁过期
                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && this.processQueue.isLockExpired()) {
                    log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                // 消费任务一次运行的最大时间。可以通过-Drocketmq.client.maxTimeConsumeContinuously来设置,默认为60s。
                long interval = System.currentTimeMillis() - beginTime;
                if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                    break;
                // 消费批次大小,默认为1, 也就是一个一个消费,实际生产环境可以调整大
                final int consumeBatchSize =
                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                // 从treeMap里面依次获取对应数量的消息出来,取得时候加读写锁
                List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                if (!msgs.isEmpty()) {
                    ......
                    long beginTimestamp = System.currentTimeMillis();
                    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                    boolean hasException = false;
                    try {
                        // 队列锁---防止顺序消息被重复消费
                        this.processQueue.getLockConsume().lock();
                        if (this.processQueue.isDropped()) {
                            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                this.messageQueue);
                            break;
                        // 进行消息消费,返回消费结果
                        status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                    } catch (Throwable e) {
                        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                            RemotingHelper.exceptionSimpleDesc(e),
                            ConsumeMessageOrderlyService.this.consumerGroup,
                            msgs,
                            messageQueue);
                        hasException = true;
                    } finally {
                        // 消费锁释放
                        this.processQueue.getLockConsume().unlock();
                    ......
                    // 处理消费结果
                    continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                } else {
                    continueConsume = false;
        } else {
            // 当队列没有上锁,那么会走这一块,然后进行上锁,这块最终又会重新执行到上面的代码里面去
            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);