2.2 NameServer启动流程
NameServer启动类是org.apache.rocketmq.namesrv.NamesrvStartup,下面我们从源码角度分析NameServer的启动流程,重点关注NameServer相关启动参数。
第一步:首先来解析配置文件,需要填充NamesrvConfig、NettyServerConfig属性值,如代码清单2-1所示。
代码清单2-1 NameServer加载配置文件
final NamesrvConfig namesrvConfig = new NamesrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(9876); if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, " + file + "%n"); in.close(); } } if (commandLine.hasOption('p')) { MixAll.printObjectProperties(null, namesrvConfig); MixAll.printObjectProperties(null, nettyServerConfig); System.exit(0); } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
代码清单2-1中,先创建NamesrvConfig(NameServer业务参数)、NettyServerConfig(NameServer网络参数),然后在解析启动时把指定的配置文件或启动命令中的选项值填充到NamesrvConfig、NettyServerConfig对象中。参数来源有如下两种方式,NamesrvConfig属性如代码清单2-2所示,NettyServerConfig属性如代码清单2-3所示。
1)-c configFile通过-c命令指定配置文件的路径。
2)使用“--属性名 属性值”命令,例如 --listenPort 9876。
代码清单2-2 NamesrvConfig属性
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; private String productEnvName = "center"; private boolean clusterTest = false; private boolean orderMessageEnable = false;
1)rocketmqhome:RocketMQ主目录,通过-Drocketmq.home.dir=path或设置环境变量ROCKETMQ_HOME可以配置RocketMQ的主目录。
2)kvConfigPath:NameServer存储KV配置属性的持久化路径。
3)configStorePath:NameServer默认配置文件路径。NameServer启动时如果要通过配置文件配置NameServer启动属性,请使用-c选项。
4)orderMessageEnable:是否支持顺序消息,默认是不支持。
代码清单2-3 NettyServerConfig属性
private int listenPort = 8888; private int serverWorkerThreads = 8; private int serverCallbackExecutorThreads = 0; private int serverSelectorThreads = 3; private int serverOnewaySemaphoreValue = 256; private int serverAsyncSemaphoreValue = 64; private int serverChannelMaxIdleTimeSeconds = 120; private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; private boolean serverPooledByteBufAllocatorEnable = true; private boolean useEpollNativeSelector = false;
1)listenPort:NameServer监听端口,该值默认会被初始化为9876。
2)serverWorkerThreads:Netty业务线程池线程个数。
3)serverCallbackExecutorThreads:Netty public任务线程池线程个数。Netty网络会根据业务类型创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型(RequestCode)未注册线程池,则由public线程池执行。
4)serverSelectorThreads:I/O线程池线程个数,主要是NameServer、Broker端解析请求、返回相应的线程个数。这类线程主要用于处理网络请求,先解析请求包,然后转发到各个业务线程池完成具体的业务操作,最后将结果返回给调用方。
5)serverOnewaySemaphoreValue:send oneway消息请求的并发度(Broker端参数)。
6)serverAsyncSemaphoreValue:异步消息发送的最大并发度(Broker端参数)。
7)serverChannelMaxIdleTimeSeconds:网络连接最大空闲时间,默认为120s。如果连接空闲时间超过该参数设置的值,连接将被关闭。
8)serverSocketSndBufSize:网络socket发送缓存区大小,默认为64KB。
9)serverSocketRcvBufSize:网络socket接收缓存区大小,默认为64KB。
10)serverPooledByteBufAllocatorEnable:ByteBuffer是否开启缓存,建议开启。
11)useEpollNativeSelector:是否启用Epoll I/O模型,Linux环境下建议开启。
小技巧
在启动NameServer时,可以先使用./mqnameserver -c configFile -p命令打印当前加载的配置属性。
第二步:根据启动属性创建NamesrvController实例并初始化,NameServerController实例为NameServer核心控制器,如代码清单2-4所示。
代码清单2-4 NamesrvController#Initialize代码片段
public boolean initialize() { this.kvConfigManager.load(); this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); this.registerProcessor(); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); return true; }
加载KV配置,先创建NettyServer网络处理对象,然后开启两个定时任务,在RocketMQ中此类定时任务统称为心跳检测。
1)定时任务1:NameServer每隔10s扫描一次Broker,移除处于未激活状态的Broker。
2)定时任务2:NameServer每隔10min打印一次KV配置。
第三步:注册JVM钩子函数并启动服务器,以便监听Broker、消息生产者的网络请求,如代码清单2-5所示。
代码清单2-5 注册JVM钩子函数
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { public Void call() throws Exception { controller.shutdown(); return null; } })); controller.start();
这里主要是向读者展示一种常用的编程技巧,如果代码中使用了线程池,一种优雅停机的方式就是注册一个JVM钩子函数,在JVM进程关闭之前,先将线程池关闭,及时释放资源。