rocketmq-consumer
前期工作
检查配置
复制订阅信息
在DefaultMQPushConsumer中获取订阅信息Map<String /* topic /, String / sub expression */> subscription
循环构建subscription不同topic下的subscriptionData,之后构建RebalanceImpl的ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner
重复上面的两步,构建retryTopic[%RETRY%FooBarGroup1558007931736]的DefaultMQPushConsumer.subscriptionData,RebalanceImpl.subscriptionInner
获取MQClientFactory对象
构建RebalanceImpl对象
消息分配策略,集群消费时用到该类,该类为消费者分配queue,默认实现是AllocateMessageQueueStrategy
public abstract class RebalanceImpl {
/**
* 消息队列 和 消息处理队列 Map
*/
protected final ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<>(64);
/**
* Topic 和 消息队列 订阅Map
*/
protected final ConcurrentHashMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<>();
/**
* Topic 和 订阅数据 Map
*/
protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<>();
/**
* 消费分组
*/
protected String consumerGroup;
/**
* 消息模型
*/
protected MessageModel messageModel;
/**
* 消息分配策略
*/
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
/**
* MQ客户端对象
*/
protected MQClientInstance mQClientFactory;
}
构建消费拉取消息PullAPIWrapper
初始化offsetStore[RemoteBrokerOffsetStore]
this.pullMessageService.start();
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
执行PullMessageService.run方法
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(pullRequest);
}
} catch (InterruptedException e) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
DefaultMQPushConsumerImpl.PullCallback回调,处理消费者的拉取结果
PullAPIWrapper.pullKernelImpl拉取消息,发送RequestCode.PULL_MESSAGE的请求
RebalanceService在20s之后启动
MQClientInstance.doRebalance,循环消费者分组ConcurrentHashMap<String/* group */, MQConsumerInner> consumerTable,具体逻辑实现在RebalanceService.doRebalance
public void doRebalance(final boolean isOrder) {
// 分配每个 topic 的消息队列
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
// 移除未订阅的topic对应的消息队列
this.truncateMessageQueueNotMyTopic();
}
// 重新均衡
this.mQClientFactory.rebalanceImmediately();
拉取消息
pullMessageService.executePullRequestImmediately(createPullRequest());
private PullRequest createPullRequest() {
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(1024);
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName);
messageQueue.setQueueId(0);
messageQueue.setTopic(topic);
pullRequest.setMessageQueue(messageQueue);
ProcessQueue processQueue = new ProcessQueue();
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
pullRequest.setProcessQueue(processQueue);
return pullRequest;
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
countDownLatch阻塞了,执行PullMessageService的run方法,也就是pullMessage(final PullRequest pullRequest)
根据requestCode执行PullMessageProcessor.processRequest
如果拉取不到消息case ResponseCode.PULL_NOT_FOUND:当满足条件 (
Broker
允许挂起 && 请求要求挂起),执行挂起请求。详细解析见:PullRequestHoldService。
执行DefaultMessageStore.getMessage(),从commitlog读取数据,更新偏移量,计算剩余的偏移量。
还有PullCallback回调一些逻辑:
设置下次拉取消息的队列位置,提交拉取到的消息到消息处理队列
boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
this.brokerController.getConsumerOffsetManager().commitOffset(XXX)
持久化消费进度,当满足 (Broker
非主 && 请求要求持久化进度)。
延迟消息
ScheduleMessageService#DeliverDelayedMessageTimerTask#executeOnTimeup
消费者发回消息时,可以指定延迟级别,默认级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,也就是说delayLevel = 3代表延迟10秒后重投递,最大重试次数16对应着2h后投递,每多消费一次投递时间就增长到下个阶段。当延迟级别delayLevel < 0 或超过最大消费次数时,放入Dead Letter Queue,topic名称格式为:%DLQ%+consumeGroup,默认queueId=0,死信队列只能写入,不能消费,这在创建topic时就指定的。
顺序投递延迟消息
Consumer消费消息时,如果返回RECONSUME_LATER,或者主动的sendMessageBack(…,int delayLevel)时,会将消息发回给Broker,Broker对消息做个封装,指定topic为SCHEDULE_TOPIC_XXXX,QueudId=delayLevel-1,若未指定delayLevel,默认是ReConsumeTimes + 3,将封装后的消息存入CommitLog,ReputMessageService为其生成PositionInfo,tagsCode存储延时投递时间,存入”SCHEDULE_TOPIC_XXXX”的ConsumeQueue中。delayLevel有16个,因此最多情况下SCHEDULE_TOPIC_XXXX会有16个ConsumeQueue。Broker启动时,ScheduleMessageService会启动16个线程对应16个delayLevel的读取服务,有序的读取ConsumeQueue里的PositionInfo。ScheduleMessageService会在 [当前时间<=延时投递时间] 时从CommitLog中提取这消息,去除封装,抹去delayLevel属性,从新存入CommitLog,并马上更新延时投递偏移量dealyOffset。ReputMessageService再次为当前消息生成PositionInfo,因为不存在delayLevel,PositionInfo存入Topic为%RETRY%+consumeGroup,queueId为0的ConsumeQueue中。每个消费者在启动时都订阅了自身消费者组的重试队列,当重试队列里有位置信息时,拉取相应消息进行重新消费。消息的第一次重试会发回给原始的消费者(执行sendMessageBack的消费者),之后的多次重试统一由订阅了QueueId = 0 的消费者消费。