Rocketmq-概念
官网
http://rocketmq.apache.org/docs/quick-start/
广播模式和集群模式
- 一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义。
- 一个ConsumerGroup中的Consumer实例平均分摊消费消息。例如某个Topic有9条消息,其中一个ConsumerGroup有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中部分,消费完的消息不能被其他实例消费。
queueData
一个queuedata相当于描述了多个messagequeue,取决于queuedata属性readQueueNums,writeQueueNums
private String brokerName;
private int readQueueNums;
private int writeQueueNums;
private int perm;
private int topicSynFlag;
RouteInfoManager.registerBroker()#createAndUpdateQueueData
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName);
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
queueData.setReadQueueNums(topicConfig.getReadQueueNums());
queueData.setPerm(topicConfig.getPerm());
queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
if (null == queueDataList) {
queueDataList = new LinkedList<QueueData>();
queueDataList.add(queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
} else {
//省略
}
}
queuedata-messagequeue
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo
(final String topic, final TopicRouteData route)
{
Set<MessageQueue> mqList = new HashSet<MessageQueue>();
List<QueueData> qds = route.getQueueDatas();
for (QueueData qd : qds) {
if (PermName.isReadable(qd.getPerm())) {
for (int i = 0; i < qd.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
mqList.add(mq);
}
}
}
return mqList;
}
topic-queuedata
topic-broker
topic通过queuedata的brokername属性关联到broker
messagequeue-broker
private String topic;
private String brokerName;
private int queueId;
队列message记录了brokername
queuedata-broker
queuedata记录了brokername
ConsumeQueue-MappedFileQueue-MappedFile
commitlog
: MappedFileQueue
: MappedFile
= 1 : 1 : N
MappedFile
00000000000000000000等文件。
MappedFileQueue
MappedFile所在的文件夹,对 MappedFile 进行封装成文件队列,对上层提供可无限使用的文件容量。
Commitlog
针对MappedFileQueue的封装使用
Topic-queueid-consumequeue
Store : ConsumeQueue = ConcurrentHashMap<String/* topic /, ConcurrentHashMap<Integer/ queueId */, ConsumeQueue»
ConsumeQueue
存储在 MappedFile
的内容必须大小是 20B:commitlog offset(8byte),size(4byte),message tag hashcode(8byte)