22 May 2019

1.rebalance如何做的?[client端]

结论

如果consumer的个数小于messagequeue的个数,会存在有的consumer是空闲的,每次有新的consumer加入或者移除都会重新分配消息队列

集群模式下的分配逻辑是AllocateMessageQueueAveragely.allocate

//currentCID是consumerId
// 平均分配
int index = cidAll.indexOf(currentCID); // 第几个consumer。
int mod = mqAll.size() % cidAll.size(); // 余数,即多少消息队列无法平均分配。
int averageSize =
    mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
        + 1 : mqAll.size() / cidAll.size());
// 有余数的情况下,[0, mod) 平分余数,即每consumer多分配一个节点;第index开始,跳过前mod余数。
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// 分配队列数量。之所以要Math.min()的原因是,mqAll.size() <= cidAll.size(),部分consumer分配不到消费队列。
int range = Math.min(averageSize, mqAll.size() - startIndex);

for (int i = 0; i < range; i++) {
    result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;

逻辑

DefaultMQPushConsumerImpl类
// 设置负载均衡器
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
mQClientFactory.start();[MQClientInstance类型]
调用RebalanceService.start方法,其实RebalanceService是一个thread
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}

在MQClientInstance类中循环private final ConcurrentHashMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<>();

/**
 * 消费者进行平衡
 */
public void doRebalance() {
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}

循环protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<>();

根据每个topic得到messagequeue信息,通过此对象protected final ConcurrentHashMap<String/* topic */, Set> topicSubscribeInfoTable = new ConcurrentHashMap<>();

此时区分是集群模式还是广播模式(默认线上都是集群模式)

根据RequestCode.GET_CONSUMER_LIST_BY_GROUP的请求找到消费者分组下的consumerId列表

此时获取到了当前分组下的consumerIds[cidAll]和topic下的所有messageQueue[mqSet],开始分配 ,分配方式有四种
采用AllocateMessageQueueAveragely.allocate此方法进行分配

分配完成之后找到可以发送的消息队列,在updateProcessQueueTableInRebalance方法中

- 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
- 增加 不在processQueueTable && 存在于mqSet 里的消息队列
如果nextOffset>0,发起拉取消息的请求,存储在PullMessageQueue#this.pullRequestQueue.put(pullRequest);对象中

PullMessageQueue会拉取消息

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            if (pullRequest != null) {
                this.pullMessage(pullRequest);
            }
        } catch (InterruptedException e) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
processQueueTable变量移除未订阅的消息队列
/**
 * 移除未订阅的消息队列
 */
private void truncateMessageQueueNotMyTopic() {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    for (MessageQueue mq : this.processQueueTable.keySet()) {
        if (!subTable.containsKey(mq.getTopic())) {

            ProcessQueue pq = this.processQueueTable.remove(mq);
            if (pq != null) {
                pq.setDropped(true);
                log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
            }
        }
    }
}

2.nameserv是如何感知broker,client[producer,consumer]的变化的?

结论

broker端,client端在启动时会注册网络连接的监听事件[ChannelEventListener],两个实现类分别对应broker端和client端

broker的机器宕机,nameserv无法实时感知,只能通过定时心跳机制,判断机器宕机,之后关闭连接,删除此brokerId相关的数据。

当新加的机器,每次启动时都会走一次注册逻辑,所以nameserv肯定是有感知的。

broker端

每个broker实例循环注册到所有nameserv上

当2分钟内没有收到broker的心跳[关闭和注销channel],将broker从brokerLiveTable中移除。RouteInfoManager#onChannelDestroy方法。此时nameserv感知到broker的变化,更改topic与队列的对应关系,client有定时任务按时获取这些信息

client端

大体逻辑和broker端类似

nameserv端

首先nameserv之间互不通信【单台nameserv挂掉,不影响其他的】,其次每间隔时间都会主动拉取可用的nameserv信息缓存

逻辑

broker层依据

每个broker注册到所有nameserv上依据

定时任务,每5分钟一次,代码在broker启动时,注册的定时任务,启动延迟10s后开始执行

List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null) {
    for (String namesrvAddr : nameServerAddressList) { // 循环多个 Namesrv
        try {
            //分别向NameServer注册
            RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
                haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
            if (result != null) {
                registerBrokerResult = result;
            }

            log.info("register broker to name server {} OK", namesrvAddr);
        } catch (Exception e) {
            log.warn("registerBroker Exception, {}", namesrvAddr, e);
        }
    }
}

定时任务移除broker依据

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);

3.broker的master宕机,slave是怎么提供服务的?master是怎么切换回来的?

前提

不考虑消息在master处积压数超过内存总量40%的情况,此时和下面的逻辑不一致,会走slave

结论

consumer拉取消息的时候设置每次brokerId为主节点,拉取完更新本地缓存suggest

master   ip1   0
slave1   ip2   1
slave2   ip3   2

下次拉取,master宕机,之后在根据brokername选取的brokerIdMap里面随即找一个可用的brokerId去拉取消息,master恢复之后,因为每次设置的都是从主节点拉取,所以直接又切换回master了。

逻辑

PullApiWrapper#findBrokerAddressInSubscribe和PullApiWrapper#recalculatePullFromWhichNode

4.何时提交offset到broker?

介绍

  • 提交消费任务到ConsumeMessageService

  • RemoteBrokerOffsetStore:Consumer 消费进度管理,负责从 Broker 获取消费进度,同步消费进度到 Broker

  • RebalancePushImpl#computePullFromWhere(…) 消费进度读取

结论

每隔一段时间将各个队列的消费进度存储到对应的broker上,该时间由DefaultMQPushConsumer的persistConsumerOffsetInterval属性控制,默认为5秒。

逻辑

DefaultMQPushConsumerImpl#pullCallback#OFFSET_ILLEGAL

DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

把要处理的数据存储在变量offsetTable里面

private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>(512);

每次只提交最小的连续处理过的offset(此部分具体代码没有看)

4.2更新消费进度

{
	"offsetTable":{
		"TopicTest@please_rename_unique_group_name_4":{0:250,1:250,2:249,3:250
		},
		"%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
		}
	}
}

5.主从如何复制

结论

逻辑

HAService实现,没看懂,回头再反复揣摩

slave-master:上报commitlog已经同步到的物理位置

master-slave:传输新的commitlog

reference

http://www.iocoder.cn/RocketMQ/high-availability/

6.如何刷盘

结论

例如:异步刷盘+不开启字节缓冲区,FlushRealTimeService

找到要刷盘的MappedFile

每500ms对CommitLog进行一次Flush,当新写入数据超过16KB,或者距离上次Flush的时间间隔超过10S,将CommitLog位于内存中的数据同步到磁盘文件

逻辑

broker启动时,初始化了刷盘服务,在producer发送消息CommitLog.putMessage之后,进行了刷盘操作

7.consumequeue什么时候刷盘?

结论

重试三次后,强制刷盘

每分钟执行一次

多个consumequeue同时flush[countDownLatch]

逻辑

FlushConsumeQueueService

8.拉取消息怎么存储?

结论

方式一 :调用 readGetMessageResult(...) 获取消息内容到堆内内存,设置到 响应body

方式二 :基于 zero-copy 实现,直接响应,无需堆内内存,性能更优。

9.consumer消费失败,消息怎么办?

结论

Broker 会存储发回的消息。这样,下次 Consumer 拉取该消息,能够从 CommitLogConsumeQueue 顺序读取。但是topic就是retry_XXX了

逻辑

SendMessageProcessor#consumerSendMsgBack(…)

10.没有梳理明白

  • 用处是什么?org.apache.rocketmq.broker.slave.SlaveSynchronize 类,把topic,offset,订阅关系subscribe,delayOffset写到文件里面。

11.顺序消费

http://www.iocoder.cn/RocketMQ/message-send-and-consume-orderly/ https://blog.csdn.net/qq_27529917/article/details/79702435

12.发送失败怎么办

reference:https://www.cnblogs.com/notlate/p/11616260.html

  • 同步
  • 异步
  • oneway

如果是同步

  • 有发送结果,没有出错===必须发送成功,且返回ok才认为成功,否则判断是否打开存储失败重试开关(retryAnotherBrokerWhenNotStoreOK),如果发送失败且开关打开,那么就进行重试(总次数=1+发送失败重试次数)。
  • 无发送结果,catch异常(好几种异常,见代码),重试
private SendResult sendDefaultImpl(//
    Message msg, //
    final CommunicationMode communicationMode, //
    final SendCallback sendCallback, //
    final long timeout//
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 校验 Producer 处于运行状态
    this.makeSureStateOK();
    // 校验消息格式
    Validators.checkMessage(msg, this.defaultMQProducer);
    //
    final long invokeID = random.nextLong(); // 调用编号;用于下面打印日志,标记为同一次发送消息
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    @SuppressWarnings("UnusedAssignment")
    long endTimestamp = beginTimestampFirst;
    // 获取 Topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null; // 最后选择消息要发送到的队列
        Exception exception = null;
        SendResult sendResult = null; // 最后一次发送结果
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步多次调用
        int times = 0; // 第几次发送
        String[] brokersSent = new String[timesTotal]; // 存储每次发送消息选择的broker名
        // 循环调用发送消息,直到成功
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            //在一次消息发送过程中,可能会多次执行选择消息队列这个方法,lastBrokerName就是上一次选择的执行发送消息失败的Broker。
            @SuppressWarnings("SpellCheckingInspection")
            MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 选择消息要发送到的队列
            if (tmpmq != null) {
                mq = tmpmq;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    // 调用发送消息核心方法
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                    endTimestamp = System.currentTimeMillis();
                    // 更新Broker可用性信息
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // 同步发送成功但存储有问题时 && 配置存储异常时重新发送开关 时,进行重试
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) { // 打印异常,更新Broker可用性信息,更新继续循环
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) { // 打印异常,更新Broker可用性信息,继续循环
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) { // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                        // 如下异常continue,进行发送消息重试
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        // 如果有发送结果,进行返回,否则,抛出异常;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }
                            throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }
        // 返回发送结果
        if (sendResult != null) {
            return sendResult;
        }
        // 根据不同情况,抛出不同的异常
        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(), Arrays.toString(brokersSent)) + FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
        MQClientException mqClientException = new MQClientException(info, exception);
        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }
        throw mqClientException;
    }
 // Namesrv找不到异常
        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }
        // 消息路由找不到异常
        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

如果是oneway,异步的话直接return null,此时有一个问题就是异步是怎么重试的?

  • oneway
  • 异步

rocketmq对于同一个时间发送的one-way和异步消息的线程个数是有约束的,不像同步那样进行阻塞,防止发送太多占用太多资源(约束的实现是通过信号量)

public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
    this.semaphoreOneway = new Semaphore(permitsOneway, true);
    this.semaphoreAsync = new Semaphore(permitsAsync, true);
}

 public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);

            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
            this.responseTable.put(opaque, responseFuture);
            
            。。。。
}            
  • 对于one-way,this.semaphoreOneway.tryAcquire成功,意味着线程竞争资源拿到,走到writeandFlush以后,只要没有异常,那么就认为消息发送完毕,不需要再check任何内容。

  • 对于同步,构造了一个future的map成员responseTable,发出去的请求后,这个线程利用这个future阻塞等待,等待啥呢?既然是同步请求,肯定是等待response。基于netty通信的话,response一般是在其他线程,所以阻塞在这里,下面是ResponseFuture responseFuture的阻塞和唤醒代码

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                PLOG.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
public class ResponseFuture {
    private final int opaque;
    private final long timeoutMillis;
    private final InvokeCallback invokeCallback;
    private final long beginTimestamp = System.currentTimeMillis();
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    。。。
}    

这个putResponse是上面代码发送的时候已经失败,那么快速put一个null进去,否则是在processResponseCommand等待接收响应。拿到响应以后就是一个同步请求的完成流程。

  • 对于异步,执行invokeAsyncImpl,同时设置一个回调executeInvokeCallback,失败的话重试onExceptionImpl


blog comments powered by Disqus