18 April 2019

Rocketmq-broker

特点

broker挂了并不影响其他broker

namerserver挂了对broker的影响???

由于broker默认每30秒向所有的nameserver进行注册 如果nameserver挂了,那么broker会将连接移除 下次注册时会继续尝试连接该nameserver,但是会连接失败,所以,broker端会定时的输出连接失败的日志

功能

  • 【生产者】接受客户端发送到消息,并存储
  • 【消费者】接受客户端的消费请求,返回消息数据
  • topic配置管理
  • 一些其他的配置管理、查询:topic创建、删除、修改,broker配置查询、修改,消息查询,broker运行状态,producer、consumer连接,消费进度等

组件

类属性

BrokerController#BrokerConfig

broker的一些基础配置

BrokerController#messageStoreConfig

包含一些数据存储相关的配置

BrokerController#consumerOffsetManager

consumer消费进度管理者

BrokerController#messageStore

消息存储实现,参照DefaultMessageStore

BrokerController#topicConfigManager

topic配置管理

BrokerController#pullMessageProcessor

客户端拉取消息请求处理

BrokerController#remotingServer

远程服务

方法

BrokerController#registerProcessor()

注册一系列处理器来处理客户端请求,或响应客户端请求

SendMessageProcessor处理客户端发送的消息处理器

PullMessageProcessor处理客户端拉取消息处理器

QueryMessageProcessor根据消息的key或者消息id查询消息

ClientManageProcessor客户端连接管理(包括客户端心跳,客户端连接注销,根据consumergroup查询consumer id列表,客户端offset上报,客户端offset查询)

AdminBrokerProcessor broker管理类请求处理

broker-intialize

  • 加载topic配置(topicConfigManager)

  • 加载消费进度存储(consumerOffsetManager)

  • 加载订阅组配置管理(subscriptionGroupManager)

  • 初始化存储层(DefaultMessageStore)

    //用于broker层的消息落地存储,超级复杂
    this.messageStore =
        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
            this.brokerConfig);
    
  • 初始化通讯层(remotingServer和fastRemotingServer)

  • 初始化线程池(sendMessageExecutor处理发送消息线程池,pullMessageExecutor处理拉取消息线程池,adminBrokerExecutor处理管理Broker线程池,clientManageExecutor处理管理Client线程池)

  • 将线程池(SendMessageProcessor,pullMessageProcessor,QueryMessageProcessor,ClientManageProcessor,EndTransactionProcessor,AdminBrokerProcessor)注册到netty消息处理器当中(remotingServer和fastRemotingServer对象中)

  • 启动相关执行任务(每天凌晨统计消息量的任务(brokerStats.record()),定时持久化消费进度(consumerOffsetManager.persist()))

  • 获取NameServer地址或者开启定时获取NameServer地址任务

  • 如果是slave:开启Slave定时从Master同步配置信息任务,如果是Master,开启增加统计日志任务;

broker-start

  • 启动刚刚初始化的各个管理器(消息存储启动(DefaultMessageStore.start());通信层启动(remotingServer.start(),fastRemotingServer.start());brokerOuterAPI.start();pullRequestHoldService.start();

  • 启动检测所有客户端连接(clientHousekeepingService.start());filterServerManager.start();brokerStatsManager.start();brokerFastFailure.start())

  • 启动时,强制注册:registerBrokerAll()

  • 开启定时把Broker注册到NameServer的任务

类作用

ReputMessageService:Broker在启动时,启动ReputMessageService,其每隔10ms为CommitLog里的消息生成位置信息PositionInfo,存入此消息对应的ConsumeQueue。一个ConsumeQueue对应着一个MessageQueue,Consume拉取消息时首先从ConsumeQueue里获取PositionInfo,对比tag是否匹配。匹配的话提取其在CommitLog的Offset,msg size,然后去CommitLog里查询消息。

Consumer的每次消息拉取会指定一个进度,这个进度就是指ConsumeQueue里的进度,也就是从哪个位置开始提取PositionInfo,同时Broker会定期的持久化每个ConsumeQueue的消费进度。当消费请求提交的进度已经达到ConsumeQueue的最大值,也就是没有新的消息生成时,Broker挂起请求。每隔5S查看CnsumeQueue的maxOffset是否超过请求提交的Offset,如果是,则执行消息拉取。当挂起的时间超过15S时,强制结束请求。每当ConsumeQueue有新的PositionInfo生成时,匹配tag,若符合,立即唤醒消息拉取请求。执行拉取流程。

ReputMessageService生成PositionInfo的同时,也会生成IndexInfo,也就是消息的索引信息,存放在Index目录下的IndexFile中。Producer在发送消息时,会为每条消息生成一个唯一的key,当然也可以自定义key。唯一的key会在发送结果中返回,用户可以使用这些key查询发送的消息。也就是说获取消息可以通过ConsumeQueue,也可以通过Index。



blog comments powered by Disqus