RocketMQ技术内幕:RocketMQ架构设计与实现原理(第2版)
上QQ阅读APP看书,第一时间看更新

2.2 NameServer启动流程

NameServer启动类是org.apache.rocketmq.namesrv.NamesrvStartup,下面我们从源码角度分析NameServer的启动流程,重点关注NameServer相关启动参数。

第一步:首先来解析配置文件,需要填充NamesrvConfig、NettyServerConfig属性值,如代码清单2-1所示。

代码清单2-1 NameServer加载配置文件

final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in); MixAll.properties2Object(properties,
        namesrvConfig); MixAll.properties2Object(properties,
        nettyServerConfig);
        namesrvConfig.setConfigStorePath(file);
        System.out.printf("load config properties file OK, " + file + "%n");
        in.close();
    }
}
if (commandLine.hasOption('p'))
    { MixAll.printObjectProperties(null, namesrvConfig);
    MixAll.printObjectProperties(null, nettyServerConfig);
    System.exit(0);
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine),
            namesrvConfig);

代码清单2-1中,先创建NamesrvConfig(NameServer业务参数)、NettyServerConfig(NameServer网络参数),然后在解析启动时把指定的配置文件或启动命令中的选项值填充到NamesrvConfig、NettyServerConfig对象中。参数来源有如下两种方式,NamesrvConfig属性如代码清单2-2所示,NettyServerConfig属性如代码清单2-3所示。

1)-c configFile通过-c命令指定配置文件的路径。

2)使用“--属性名 属性值”命令,例如 --listenPort 9876。

代码清单2-2 NamesrvConfig属性

private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
        System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String kvConfigPath = System.getProperty("user.home") + File.separator
        + "namesrv" + File.separator + "kvConfig.json";
private String configStorePath = System.getProperty("user.home") +
    File.separator + "namesrv" + File.separator +
            "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false;

1)rocketmqhome:RocketMQ主目录,通过-Drocketmq.home.dir=path或设置环境变量ROCKETMQ_HOME可以配置RocketMQ的主目录。

2)kvConfigPath:NameServer存储KV配置属性的持久化路径。

3)configStorePath:NameServer默认配置文件路径。NameServer启动时如果要通过配置文件配置NameServer启动属性,请使用-c选项。

4)orderMessageEnable:是否支持顺序消息,默认是不支持。

代码清单2-3 NettyServerConfig属性

private int listenPort = 8888; private
int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64; private
int serverChannelMaxIdleTimeSeconds = 120;
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean serverPooledByteBufAllocatorEnable = true;
private boolean useEpollNativeSelector = false;

1)listenPort:NameServer监听端口,该值默认会被初始化为9876。

2)serverWorkerThreads:Netty业务线程池线程个数。

3)serverCallbackExecutorThreads:Netty public任务线程池线程个数。Netty网络会根据业务类型创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型(RequestCode)未注册线程池,则由public线程池执行。

4)serverSelectorThreads:I/O线程池线程个数,主要是NameServer、Broker端解析请求、返回相应的线程个数。这类线程主要用于处理网络请求,先解析请求包,然后转发到各个业务线程池完成具体的业务操作,最后将结果返回给调用方。

5)serverOnewaySemaphoreValue:send oneway消息请求的并发度(Broker端参数)。

6)serverAsyncSemaphoreValue:异步消息发送的最大并发度(Broker端参数)。

7)serverChannelMaxIdleTimeSeconds:网络连接最大空闲时间,默认为120s。如果连接空闲时间超过该参数设置的值,连接将被关闭。

8)serverSocketSndBufSize:网络socket发送缓存区大小,默认为64KB。

9)serverSocketRcvBufSize:网络socket接收缓存区大小,默认为64KB。

10)serverPooledByteBufAllocatorEnable:ByteBuffer是否开启缓存,建议开启。

11)useEpollNativeSelector:是否启用Epoll I/O模型,Linux环境下建议开启。

小技巧

在启动NameServer时,可以先使用./mqnameserver -c configFile -p命令打印当前加载的配置属性。

第二步:根据启动属性创建NamesrvController实例并初始化,NameServerController实例为NameServer核心控制器,如代码清单2-4所示。

代码清单2-4 NamesrvController#Initialize代码片段

public boolean initialize()
    { this.kvConfigManager.load();
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig,
            this.brokerHousekeepingService);
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactoryImpl("RemotingExecutorThread_"));
    this.registerProcessor();
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            public void run()
                { NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable()
            { public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    return true;
}

加载KV配置,先创建NettyServer网络处理对象,然后开启两个定时任务,在RocketMQ中此类定时任务统称为心跳检测。

1)定时任务1:NameServer每隔10s扫描一次Broker,移除处于未激活状态的Broker。

2)定时任务2:NameServer每隔10min打印一次KV配置。

第三步:注册JVM钩子函数并启动服务器,以便监听Broker、消息生产者的网络请求,如代码清单2-5所示。

代码清单2-5 注册JVM钩子函数

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new
    Callable<Void>() {
        public Void call() throws Exception
            { controller.shutdown();
            return null;
        }
    }));
    controller.start();

这里主要是向读者展示一种常用的编程技巧,如果代码中使用了线程池,一种优雅停机的方式就是注册一个JVM钩子函数,在JVM进程关闭之前,先将线程池关闭,及时释放资源。