15 May 2019

Rocketmq-概念

官网

http://rocketmq.apache.org/docs/quick-start/

广播模式和集群模式

  • 一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义。
  • 一个ConsumerGroup中的Consumer实例平均分摊消费消息。例如某个Topic有9条消息,其中一个ConsumerGroup有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中部分,消费完的消息不能被其他实例消费。

queueData

一个queuedata相当于描述了多个messagequeue,取决于queuedata属性readQueueNums,writeQueueNums

private String brokerName;
private int readQueueNums;
private int writeQueueNums;
private int perm;
private int topicSynFlag;

RouteInfoManager.registerBroker()#createAndUpdateQueueData

private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
        QueueData queueData = new QueueData();
        queueData.setBrokerName(brokerName);
        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
        queueData.setReadQueueNums(topicConfig.getReadQueueNums());
        queueData.setPerm(topicConfig.getPerm());
        queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());

        List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
        if (null == queueDataList) {
            queueDataList = new LinkedList<QueueData>();
            queueDataList.add(queueData);
            this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
            log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
        } else {
            //省略
        }
    }

queuedata-messagequeue

public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo
(final String topic, final TopicRouteData route) 
{
        Set<MessageQueue> mqList = new HashSet<MessageQueue>();
        List<QueueData> qds = route.getQueueDatas();
        for (QueueData qd : qds) {
            if (PermName.isReadable(qd.getPerm())) {
                for (int i = 0; i < qd.getReadQueueNums(); i++) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    mqList.add(mq);
                }
            }
        }

        return mqList;
}

topic-queuedata



topic-broker

topic通过queuedata的brokername属性关联到broker

messagequeue-broker

private String topic;
private String brokerName;
private int queueId;

队列message记录了brokername

queuedata-broker

queuedata记录了brokername

ConsumeQueue-MappedFileQueue-MappedFile

commitlog : MappedFileQueue : MappedFile = 1 : 1 : N

MappedFile

00000000000000000000等文件。

MappedFileQueue

MappedFile所在的文件夹,对 MappedFile 进行封装成文件队列,对上层提供可无限使用的文件容量。

Commitlog

针对MappedFileQueue的封装使用

Topic-queueid-consumequeue

Store : ConsumeQueue = ConcurrentHashMap<String/* topic /, ConcurrentHashMap<Integer/ queueId */, ConsumeQueue»

ConsumeQueue 存储在 MappedFile 的内容必须大小是 20B:commitlog offset(8byte),size(4byte),message tag hashcode(8byte)



blog comments powered by Disqus