1.rebalance如何做的?[client端]
结论
如果consumer的个数小于messagequeue的个数,会存在有的consumer是空闲的,每次有新的consumer加入或者移除都会重新分配消息队列
集群模式下的分配逻辑是AllocateMessageQueueAveragely.allocate
//currentCID是consumerId
// 平均分配
int index = cidAll.indexOf(currentCID); // 第几个consumer。
int mod = mqAll.size() % cidAll.size(); // 余数,即多少消息队列无法平均分配。
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
// 有余数的情况下,[0, mod) 平分余数,即每consumer多分配一个节点;第index开始,跳过前mod余数。
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// 分配队列数量。之所以要Math.min()的原因是,mqAll.size() <= cidAll.size(),部分consumer分配不到消费队列。
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
逻辑
DefaultMQPushConsumerImpl类
// 设置负载均衡器
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
mQClientFactory.start();[MQClientInstance类型]
调用RebalanceService.start方法,其实RebalanceService是一个thread
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
在MQClientInstance类中循环private final ConcurrentHashMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<>();
/**
* 消费者进行平衡
*/
public void doRebalance() {
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
循环protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<>();
根据每个topic得到messagequeue信息,通过此对象protected final ConcurrentHashMap<String/* topic */, Set
此时区分是集群模式还是广播模式(默认线上都是集群模式)
根据RequestCode.GET_CONSUMER_LIST_BY_GROUP的请求找到消费者分组下的consumerId列表
此时获取到了当前分组下的consumerIds[cidAll]和topic下的所有messageQueue[mqSet],开始分配 ,分配方式有四种
采用AllocateMessageQueueAveragely.allocate此方法进行分配
分配完成之后找到可以发送的消息队列,在updateProcessQueueTableInRebalance方法中
- 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
- 增加 不在processQueueTable && 存在于mqSet 里的消息队列
如果nextOffset>0,发起拉取消息的请求,存储在PullMessageQueue#this.pullRequestQueue.put(pullRequest);对象中
PullMessageQueue会拉取消息
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(pullRequest);
}
} catch (InterruptedException e) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
processQueueTable变量移除未订阅的消息队列
/**
* 移除未订阅的消息队列
*/
private void truncateMessageQueueNotMyTopic() {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
for (MessageQueue mq : this.processQueueTable.keySet()) {
if (!subTable.containsKey(mq.getTopic())) {
ProcessQueue pq = this.processQueueTable.remove(mq);
if (pq != null) {
pq.setDropped(true);
log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
}
}
}
}
2.nameserv是如何感知broker,client[producer,consumer]的变化的?
结论
broker端,client端在启动时会注册网络连接的监听事件[ChannelEventListener],两个实现类分别对应broker端和client端
broker的机器宕机,nameserv无法实时感知,只能通过定时心跳机制,判断机器宕机,之后关闭连接,删除此brokerId相关的数据。
当新加的机器,每次启动时都会走一次注册逻辑,所以nameserv肯定是有感知的。
broker端
每个broker实例循环注册到所有nameserv上
当2分钟内没有收到broker的心跳[关闭和注销channel],将broker从brokerLiveTable中移除。RouteInfoManager#onChannelDestroy方法。此时nameserv感知到broker的变化,更改topic与队列的对应关系,client有定时任务按时获取这些信息
client端
大体逻辑和broker端类似
nameserv端
首先nameserv之间互不通信【单台nameserv挂掉,不影响其他的】,其次每间隔时间都会主动拉取可用的nameserv信息缓存
逻辑
broker层依据
每个broker注册到所有nameserv上依据
定时任务,每5分钟一次,代码在broker启动时,注册的定时任务,启动延迟10s后开始执行
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null) {
for (String namesrvAddr : nameServerAddressList) { // 循环多个 Namesrv
try {
//分别向NameServer注册
RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
if (result != null) {
registerBrokerResult = result;
}
log.info("register broker to name server {} OK", namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
}
}
}
定时任务移除broker依据
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
3.broker的master宕机,slave是怎么提供服务的?master是怎么切换回来的?
前提
不考虑消息在master处积压数超过内存总量40%的情况,此时和下面的逻辑不一致,会走slave
结论
consumer拉取消息的时候设置每次brokerId为主节点,拉取完更新本地缓存suggest
master ip1 0
slave1 ip2 1
slave2 ip3 2
下次拉取,master宕机,之后在根据brokername选取的brokerIdMap里面随即找一个可用的brokerId去拉取消息,master恢复之后,因为每次设置的都是从主节点拉取,所以直接又切换回master了。
逻辑
PullApiWrapper#findBrokerAddressInSubscribe和PullApiWrapper#recalculatePullFromWhichNode
4.何时提交offset到broker?
介绍
-
提交消费任务到ConsumeMessageService
-
RemoteBrokerOffsetStore:
Consumer
消费进度管理,负责从Broker
获取消费进度,同步消费进度到Broker
。 -
RebalancePushImpl#computePullFromWhere(…) 消费进度读取
结论
每隔一段时间将各个队列的消费进度存储到对应的broker上,该时间由DefaultMQPushConsumer的persistConsumerOffsetInterval属性控制,默认为5秒。
逻辑
DefaultMQPushConsumerImpl#pullCallback#OFFSET_ILLEGAL
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
把要处理的数据存储在变量offsetTable里面
private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>(512);
每次只提交最小的连续处理过的offset(此部分具体代码没有看)
4.2更新消费进度
{
"offsetTable":{
"TopicTest@please_rename_unique_group_name_4":{0:250,1:250,2:249,3:250
},
"%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
}
}
}
5.主从如何复制
结论
逻辑
HAService实现,没看懂,回头再反复揣摩
slave-master:上报commitlog已经同步到的物理位置
master-slave:传输新的commitlog
reference
http://www.iocoder.cn/RocketMQ/high-availability/
6.如何刷盘
结论
例如:异步刷盘+不开启字节缓冲区,FlushRealTimeService
找到要刷盘的MappedFile
每500ms对CommitLog进行一次Flush,当新写入数据超过16KB,或者距离上次Flush的时间间隔超过10S,将CommitLog位于内存中的数据同步到磁盘文件
逻辑
broker启动时,初始化了刷盘服务,在producer发送消息CommitLog.putMessage之后,进行了刷盘操作
7.consumequeue什么时候刷盘?
结论
重试三次后,强制刷盘
每分钟执行一次
多个consumequeue同时flush[countDownLatch]
逻辑
FlushConsumeQueueService
8.拉取消息怎么存储?
结论
方式一 :调用 readGetMessageResult(...)
获取消息内容到堆内内存,设置到 响应body
。
方式二 :基于 zero-copy
实现,直接响应,无需堆内内存,性能更优。
9.consumer消费失败,消息怎么办?
结论
Broker
会存储发回的消息。这样,下次 Consumer
拉取该消息,能够从 CommitLog
和 ConsumeQueue
顺序读取。但是topic就是retry_XXX了
逻辑
SendMessageProcessor#consumerSendMsgBack(…)
10.没有梳理明白
- 用处是什么?
org.apache.rocketmq.broker.slave.SlaveSynchronize
类,把topic,offset,订阅关系subscribe,delayOffset写到文件里面。
11.顺序消费
http://www.iocoder.cn/RocketMQ/message-send-and-consume-orderly/ https://blog.csdn.net/qq_27529917/article/details/79702435
12.发送失败怎么办
reference:https://www.cnblogs.com/notlate/p/11616260.html
- 同步
- 异步
- oneway
如果是同步
- 有发送结果,没有出错===必须发送成功,且返回ok才认为成功,否则判断是否打开存储失败重试开关(retryAnotherBrokerWhenNotStoreOK),如果发送失败且开关打开,那么就进行重试(总次数=1+发送失败重试次数)。
- 无发送结果,catch异常(好几种异常,见代码),重试
private SendResult sendDefaultImpl(//
Message msg, //
final CommunicationMode communicationMode, //
final SendCallback sendCallback, //
final long timeout//
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 校验 Producer 处于运行状态
this.makeSureStateOK();
// 校验消息格式
Validators.checkMessage(msg, this.defaultMQProducer);
//
final long invokeID = random.nextLong(); // 调用编号;用于下面打印日志,标记为同一次发送消息
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
@SuppressWarnings("UnusedAssignment")
long endTimestamp = beginTimestampFirst;
// 获取 Topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null; // 最后选择消息要发送到的队列
Exception exception = null;
SendResult sendResult = null; // 最后一次发送结果
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步多次调用
int times = 0; // 第几次发送
String[] brokersSent = new String[timesTotal]; // 存储每次发送消息选择的broker名
// 循环调用发送消息,直到成功
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
//在一次消息发送过程中,可能会多次执行选择消息队列这个方法,lastBrokerName就是上一次选择的执行发送消息失败的Broker。
@SuppressWarnings("SpellCheckingInspection")
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 选择消息要发送到的队列
if (tmpmq != null) {
mq = tmpmq;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
// 调用发送消息核心方法
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
// 更新Broker可用性信息
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // 同步发送成功但存储有问题时 && 配置存储异常时重新发送开关 时,进行重试
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) { // 打印异常,更新Broker可用性信息,更新继续循环
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) { // 打印异常,更新Broker可用性信息,继续循环
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) { // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
// 如下异常continue,进行发送消息重试
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
// 如果有发送结果,进行返回,否则,抛出异常;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
// 返回发送结果
if (sendResult != null) {
return sendResult;
}
// 根据不同情况,抛出不同的异常
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(), Arrays.toString(brokersSent)) + FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
// Namesrv找不到异常
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
// 消息路由找不到异常
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
如果是oneway,异步的话直接return null,此时有一个问题就是异步是怎么重试的?
- oneway
- 异步
rocketmq对于同一个时间发送的one-way和异步消息的线程个数是有约束的,不像同步那样进行阻塞,防止发送太多占用太多资源(约束的实现是通过信号量)
public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
this.semaphoreOneway = new Semaphore(permitsOneway, true);
this.semaphoreAsync = new Semaphore(permitsAsync, true);
}
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
final int opaque = request.getOpaque();
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
。。。。
}
-
对于one-way,this.semaphoreOneway.tryAcquire成功,意味着线程竞争资源拿到,走到writeandFlush以后,只要没有异常,那么就认为消息发送完毕,不需要再check任何内容。
-
对于同步,构造了一个future的map成员responseTable,发出去的请求后,这个线程利用这个future阻塞等待,等待啥呢?既然是同步请求,肯定是等待response。基于netty通信的话,response一般是在其他线程,所以阻塞在这里,下面是ResponseFuture responseFuture的阻塞和唤醒代码
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
PLOG.warn("send a request command to channel <" + addr + "> failed.");
}
});
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
public class ResponseFuture {
private final int opaque;
private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis();
private final CountDownLatch countDownLatch = new CountDownLatch(1);
。。。
}
这个putResponse是上面代码发送的时候已经失败,那么快速put一个null进去,否则是在processResponseCommand等待接收响应。拿到响应以后就是一个同步请求的完成流程。
- 对于异步,执行invokeAsyncImpl,同时设置一个回调executeInvokeCallback,失败的话重试onExceptionImpl