一、Kafka顺序消息
Producer端：Kafka的顺序消息是通过partition key，将某类消息（例如同一笔订单的不同状态）写入同一个partition，因此Kafka只能保证消息在同一个partition内有序，无法保证全局有序；
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// Map the key's hash to a partition index in [0, partitions.size()).
// The sign bit is masked off instead of calling Math.abs, because
// Math.abs(Integer.MIN_VALUE) is still negative and would produce a negative index.
return (key.hashCode() & Integer.MAX_VALUE) % partitions.size();
Consumer端：Kafka Java Consumer是单线程的设计(多线程方案需要业务端自己实现)，即一个partition只能对应一个消费线程，因此可以保证消息被顺序消费；
二、RocketMQ顺序消息
Producer端：RMQ顺序消息跟Kafka类似，通过消息路由机制把消息发送到指定的MessageQueue上(参考Kafka/RocketMQ生产者路由对比)；
Consumer端：RMQ Java Consumer是线程池的设计，因此在集群模式下，消息的顺序消费需要通过一系列设计来保证：
1. Consumer 注册 MessageListenerOrderly
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group_1");
consumer.setNamesrvAddr("192.168.0.99:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// TODO
return ConsumeOrderlyStatus.SUCCESS;
consumer.start();
2. 定时向broker发送锁住当前正在消费的队列集合的消息
2.1 Consumer 启动时根据消息监听器类型创建监听服务
// Pick the consume service according to the listener type: an orderly listener gets a
// ConsumeMessageOrderlyService. Per the original author's note, the consume service is
// the component that consumes messages and maintains a thread pool internally.
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
}
// NOTE(review): only the orderly branch is shown in this excerpt; other listener types are
// presumably handled by code not visible here — confirm against the full method.
this.consumeMessageService.start();
2.2 ConsumeMessageOrderlyService.start 启动定时任务(默认频率20s)
public void start() {
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
// å¦ææ¯é群æ¶è´¹ï¼åå¯å¨å®æ¶ä»»å¡ï¼å®æ¶åbrokeråéæ¹ééä½å½åæ£å¨æ¶è´¹çéåéåçæ¶æ¯ï¼
// å
·ä½æ¯consumer端æ¿å°æ£å¨æ¶è´¹çéåéåï¼åééä½éåçæ¶æ¯è³brokerï¼broker端è¿åéä½æåçéåéå
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
2.3 向broker发送批量锁住当前正在消费的队列集合的消息，具体是consumer端拿到正在消费的队列集合，发送锁住队列的消息至broker，broker端返回锁住成功的队列集合。consumer收到后，设置是否锁住标志位。保证broker上的每个消息队列只对应一个消费端。
public void lockAll() {
// broker -> brokerä¸ç MessageQueueï¼å½åConsumeræ¶è´¹çMessageQueueï¼
HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
while (it.hasNext()) {
Entry<String, Set<MessageQueue>> entry = it.next();
final String brokerName = entry.getKey();
final Set<MessageQueue> mqs = entry.getValue();
if (mqs.isEmpty())
continue;
// 主è¦è·å broker å°å
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
// ç»è£
æ¹ééå®è¯·æ±
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.setMqSet(mqs); // MessageQueue
try {
// åé请æ±å°Brokerï¼Brokerè¿åéå®çMessageQueueéå
Set<MessageQueue> lockOKMQSet =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mq : lockOKMQSet) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
if (!processQueue.isLocked()) {
log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
// Brokeréå®çéåï¼å¨æ¬å°å éï¼åé¢æåæ¶æ¯æ¶è´¹æ¶ä¼ç¨å°
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
......
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mqs, e);
3. 消费消息时通过锁实现串行执行
3.1 DefaultMQPushConsumerImpl.pullMessage 拉取消息提交到ConsumeMessageOrderlyService的线程池consumeExecutor中执行
// Hand the messages found by the pull, together with their process queue and message
// queue, to the consume service; dispatchToConsume is forwarded unchanged.
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
3.2 ConsumeMessageOrderlyService.ConsumeRequest.run 消费消息，消费时对消费队列进行加锁，保证同一个消费队列上的多条消息会串行执行；
@Override
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
// è·åå½å MessageQueue çé
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
// 广ææ¨¡å¼ æè
ProcessQueueä¸é(lockAll()è¿è¡ä¸é)并ä¸é没æè¿æï¼å¦åå»¶è¿ 10ms åæ§è¡
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
// ProcessQueue æªä¸é
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
// ProcessQueue éè¿æ
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
// æ¶è´¹ä»»å¡ä¸æ¬¡è¿è¡çæ大æ¶é´ãå¯ä»¥éè¿-Drocketmq.client.maxTimeConsumeContinuouslyæ¥è®¾ç½®ï¼é»è®¤ä¸º60sã
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
// æ¶è´¹æ¹æ¬¡å¤§å°ï¼é»è®¤ä¸º1ï¼ ä¹å°±æ¯ä¸ä¸ªä¸ä¸ªæ¶è´¹ï¼å®é
ç产ç¯å¢å¯ä»¥è°æ´å¤§
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// ä»treeMapéé¢ä¾æ¬¡è·å对åºæ°éçæ¶æ¯åºæ¥ï¼åå¾æ¶åå 读åé
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
......
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
// éåé---é²æ¢é¡ºåºæ¶æ¯è¢«éå¤æ¶è´¹
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
// è¿è¡æ¶æ¯æ¶è´¹ï¼è¿åæ¶è´¹ç»æ
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
// æ¶è´¹ééæ¾
this.processQueue.getLockConsume().unlock();
......
// å¤çæ¶è´¹ç»æ
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
} else {
// å½éå没æä¸éï¼é£ä¹ä¼èµ°è¿ä¸åï¼ç¶åè¿è¡ä¸éï¼è¿åæç»åä¼éæ°æ§è¡å°ä¸é¢ç代ç éé¢å»
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);