17 April 2019

Rocketmq-namserv

疑问

1.路由中心存储的是什么数据?

2.如何避免nameserver的单点故障,提高可用性?

3.消息生产者如何知道消息要发往哪台消息服务器?(nameserver应运而生)

4.如果某一个消息服务器宕机了,那么生产者如何在不重启服务的情况下感知?(nameserver应运而生)

5.nameserver和broker之间如何保持长连接?(代码层级的实现)

6.为什么broker宕机,路由注册表将其移除,但是不会马上通知生产者,为什么这样设计?

解疑

1.为消息生产者和消费者提供关于主题topic的路由元信息,那么nameserver能够存储路由的元信息,所以还需要管理broker节点[包括路由注册,路由删除等功能]

2.通过部署多台nameserver服务器来实现,但是彼此之间互不通信,也就是nameserver在某一时刻的数据并不完全相同,但这对消息发送不会造成任何影响

3

4.

5.

6.为了降低nameserver实现的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性,3.4节再补充

特点

高可用,由于各个nameserver之间并无通信,故一个namerserver挂了并不会影响其他namerserver

任何producer,consumer,broker与所有nameserver通信,都是单向的,这种机制保证rocketmq水平扩容变的很容易

nameserv只存储broker的信息,剩下的信息全部存储在broker上面

nameserver启动流程

1.业务参数nameServerConfig

2.网络参数NettyServerConfig

3.启动时 [javaD] [System.getProperty获取]

  • 配置文件 -c configFile
  • 启动命令 –属性名 属性值,例如–listenPort 9876

4.根据启动属性创建namesrvController实例

5.初始化namesrvController实例

  • 加载配置文件
  • 创建nettyRemotingServer网络处理对象
  • 设置处理类 [DefaultRequestProcessor]
  • 开启两个定时任务
  • 每10s扫描一次broker,移除处于不激活状态的broker
  • 每10分钟打印一次kv配置

6.注册JVM钩子,并启动NettyRemotingServer服务器,以便监听broker,消息生产者的网络请求

实现

  • 初始化namservController,包含namesrvConfig(namesrv相关配置),nettyServerConfig(netty的相关配置),KVConfigManager(KV配置管理),RouteInfoManager(路由信息、topic信息管理),BrokerHousekeepingService(broker管理服务)
  • NamesrvController.initialize():加载KV配置,初始化通讯层(Netty的初始化:remotingServer对象),初始化线程池remotingExecutor,向remotingServer对象中注册DefaultRequestProcessor对象
  • 启动定时扫描notActive的broker任务;

类属性

NamesrvConfig
//rocketmq主目录,System.getProperty第二个值是兜底值
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
//存储kv配置属性的持久化路径
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
//默认配置文件路径,不生效,需要-c命令指定才生效
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false;//是否支持顺序消息,默认不支持
NettyServerConfig
//nameserver的监听端口,该值默认会被初始化成9876
private int listenPort = 8888;
private int serverWorkerThreads = 8;//Netty业务线程池线程个数
//Netty public任务线程池线程个数,Netty网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。
// 如果该业务类型(RequestCode)未注册线程池,则由public线程池执行。
private int serverCallbackExecutorThreads = 0;
//IO线程池线程个数,主要是NameServer、Broker端解析请求、返回相应的线程个数,这类线程主要是处理网络请求的,解析请求包,
// 然后转发到各个业务线程池完成具体的业务操作,然后将结果再返回调用方
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
//网络连接最大空闲时间,默认120s。如果连接空闲时间超过该参数设置的值,连接将被关闭。
private int serverChannelMaxIdleTimeSeconds = 120;

//网络socket发送缓存区大小,默认64k。
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
//网络socket接收缓存区大小,默认64k。
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
//ByteBuffer是否开启缓存,建议开启。
private boolean serverPooledByteBufAllocatorEnable = true;
//是否启用epoll io模型,linux环境建议开启
private boolean useEpollNativeSelector = false;
KVConfigManager#configTable
HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> 
RouteInfoManager#topicQueueTable [消息队列路由信息]
HashMap<String/* topic */, List<QueueData>>
public class QueueData implements Comparable<QueueData> {
    /** Broker名*/
    private String brokerName;
    /**读队列长度,默认4个*/
    private int readQueueNums;
    /**写队列长度,默认4个*/
    private int writeQueueNums;
    /** 读写权限*/
    private int perm;
    private int topicSynFlag;
}
RouteInfoManager#brokerAddrTable [broker基础信息]
HashMap<String/* brokerName */, BrokerData>
public class BrokerData implements Comparable<BrokerData> {
    /**集群名*/
    private String cluster;
    /**Broker名*/
    private String brokerName;
    /**broker角色编号 和 broker地址 Map*/
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}
RouteInfoManager#clusterAddrTable [broker集群信息]

brokerName由相同的多台broker组成master-slave架构

HashMap<String/* clusterName */, Set<String/* brokerName */>>
RouteInfoManager#brokerLiveTable [broker状态信息,nameserver每次收到心跳包都会替换该信息]
HashMap<String/* brokerAddr */, BrokerLiveInfo>
class BrokerLiveInfo {
    /**上次收到broker心跳包的时间*/
    private long lastUpdateTimestamp;
    /**数据版本号*/
    private DataVersion dataVersion;
    /**连接信息*/
    private Channel channel;
    /**ha服务器地址*/
    private String haServerAddr;
}
RouteInfoManager#filterServerTable [broker地址 与 filtersrv数组 Map(用于类模式消息过滤)]
HashMap<String/* brokerAddr */, List<String>/* Filter Server */>

nameserver路由注册,剔除,发现

1570713302215.jpg

注册

rocketmq路由注册是通过broker与nameserver的心跳功能实现的,broker启动时象集群中所有的nameserver发送心跳语句,之后每隔30s向集群中所有nameserver发送心跳包,nameserver收到broker心跳包时会更新brokerLiveTable缓存中的lastUpdateTimestamp,然后nameserver每10s扫描brokerLiveTable,如果连续120s没有收到心跳包,nameserver将移除该broker的路由信息同时关闭socket连接

1.broker发送心跳包 [BrokerController#start]

2.nameserver处理心跳包 [DefaultRequestProcessor]网络处理器解析请求类型

剔除

1.brokerLiveTable中的lastUpdateTimesstamp时间戳距离当前时间超过120s,认为broker失效,移除该broker,关闭与broker连接

2.broker在正常被关闭的情况也会执行unregisterBroker指令

不管哪种方式剔除都会删除与该broker相关的信息topicQueueTable,brokerAddrTable,brokerLiveTable,filterServerTable

发现

非实时,当topic路由出现变化后,nameserver不主动推送给客户端,而是由客户端定时拉取主题最新的路由 [get_routeinfo_by_topic]

namserv接收到broker的注册请求

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest

如果topic配置信息发生变更或者是第一次为该broker注册,根据brokername及topicconfig(read、write queue数量等)新增或者更新到topicQueueTable中

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
  • 将当前请求注册的broker信息保存或者更新到clusterAddrTable,brokerAddrTable中
  • 将当前请求注册的broker的topic信息保存或者更新到topicQueueTable中

broker定时上报,namserv定时更新

nameserv和broker间的通讯

  • broker在启动时,会加载当前broker上所有的topic信息,registerBroker

  • nameserv与broker间维持着一个SocketChannel,长连接,broker第一次注册之后的10s后,每隔30s向其配置的所有namserv执行registerBroker,这就是broker和nameserv的心跳

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
  • nameserv接受到broker传递的心跳信息时,如果是第一次心跳,创建brokerData,brokerLiveTable,保存其对象中的BrokerLiveInfo属性的dataVersion和lastUpdateTimestamp;如果不是第一次,那么更新其lastUpdateTimestamp和dataVersion。
class BrokerData {
	private String cluster;	
	private String brokerName;
	private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; 
}
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
class BrokerLiveInfo {
	private long lastUpdateTimestamp;
	private DataVersion dataVersion;
	private Channel channel;
	private String haServerAddr;
}  
  • 如果第一次心跳,而且broker是master,创建broker的queuedata,如果不是第一次心跳,dataVersion与nameserv上保持的不一致[RouteInfoManager#isBrokerTopicConfigChanged()],更新。
  • 如果当前broker是slave,那么将master的brokerAddr放入心跳注册结果中,返给slave,这样slave和master间就能进行数据传输。
if (MixAll.MASTER_ID != brokerId) {
     String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
     if (masterAddr != null) {
         BrokerLiveInfo brokerLiveInfo =this.brokerLiveTable.get(masterAddr);
         if (brokerLiveInfo != null) {
              result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
              result.setMasterAddr(masterAddr);
         }
      }
}
  • nameserv维护着其他组件的SocketChannel对象,针对所有组件(broker,client)的长连接注册了ChannelEventListener,监听此SocketChannel的连接事件,当某个SocketChannel出现异常或者断开时(长连接断开不是心跳停止),会循环遍历所有broker的长连接,如果发现断开长连接属于某个broker,清除此broker的brokerdata和queuedata,不属于broker,则什么都不做。这样当broker变化不会通知client,这样client(producer,consumer)最晚需要30s才(下次请求)指定topic的TopicRouteData时,就不会包含此broker的数据了,也就是messagequeue上没有此broker的queue了。
NettyRemotingClient NettyRemotingServer.start()注册client的长连接和nameserv
this.nettyEventExecutor.start();
  • nameserv每10s对所有broker的长连接进行扫描,发现lastUpdateTimestamp距离当前时间超过2分钟,断开长连接,清空数据,NameServContoller.scanNotActiveBroker定时任务

nameserv和client的通信

  • producer在发送消息的时候,根据消息topic查自身是否含有此topic相应的mesagequeue,没有就从nameserv处请求指定topic的TopicRouteData,也就是brokerData和queueData,然后构造messagequeue,之后启动定时任务定时更新

  • consumer在启动之前就需要订阅topic,在启动时就会向namserv请求相应topic的TopicRouteData,同样的形式生成messagequeue,但和producer不同的是,consumer可以从slave拉取消息,不会过来master宕机的broker,consumer端也有定时任务

    MQClientInstance.startScheduledTask()
    


blog comments powered by Disqus