歡迎光臨
每天分享高質量文章

RocketMQ學習-NameServer-2

上篇文章主要梳理了NameServer的啟動器和配置資訊,並複習了JVM中的關閉鉤子這個知識點。這篇文章看下NameServer的其他模組。建議帶著如下三個問題閱讀:

  1. NameServer管理哪些資訊?如何管理的?

  2. NameServer中對Netty的使用案例?

  3. NameServer中對Java併發程式設計使用案例?

一、NamesrvController

  1. 作用:NameServer模組的控制器

  2. 主要屬性

  • namesrvConfig:name server的配置資訊

  • nettyServerConfig:name server中作為netty服務端的配置

  • scheduledExecutorService:排程執行緒池,用於:(1)週期性檢查broker資訊;(2)週期性列印路由資訊;這兩個檢查每隔5秒交替進行。

  • kvConfigManager:name server配置的操作介面

  • routeInfoManager:name server路由資訊的操作介面

  • remotingServer:netty伺服器

  • brokerHousekeepingService:監聽連線的broker的通道的關閉或異常事件,用於清理broker資訊;

  • remotingExecutor:服務端處理請求的執行緒池

  • 程式碼如下

  • public class NamesrvController {
        private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    
        //name server的配置
        private final NamesrvConfig namesrvConfig;
    
        //netty server的配置定義
        private final NettyServerConfig nettyServerConfig;
    
        //建立一個具備排程功能的執行緒池,該執行緒池裡只有一個執行緒,用於:(1)週期性檢查broker資訊;(2)週期性列印路由資訊
        private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
            "NSScheduledThread"));
    
        //name server配置的操作介面
        private final KVConfigManager kvConfigManager;
    
        //name server路由資訊的操作介面
        private final RouteInfoManager routeInfoManager;
    
        //伺服器
        private RemotingServer remotingServer;
    
        //broker資訊清理器,監聽通道事件
        private BrokerHousekeepingService brokerHousekeepingService;
    
        //服務端處理請求的執行緒池
        private ExecutorService remotingExecutor;
    
        private Configuration configuration;
       
        //other code....
    }
    1. 主要方法

    • initialize:初始化

      public boolean initialize() {
      
           this.kvConfigManager.load();
      
           this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
      
           this.remotingExecutor =
               Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
      
           this.registerProcessor();
      
           //伺服器啟動後5秒,開始每隔10秒檢查broker的執行狀態
           this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
      
               @Override
               public void run() {
                   NamesrvController.this.routeInfoManager.scanNotActiveBroker();
               }
           }, 5, 10, TimeUnit.SECONDS);
      
           //伺服器啟動後1秒,開始每隔10秒檢查
           this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
      
               @Override
               public void run() {
                   NamesrvController.this.kvConfigManager.printAllPeriodically();
               }
           }, 1, 10, TimeUnit.MINUTES);
      
           return true;
       }
    • registerProcessor:註冊處理器

       //在name server伺服器上註冊請求處理器,預設是DefaultRequestProcessor
       private void registerProcessor() {
           if (namesrvConfig.isClusterTest()) {
      
               this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                   this.remotingExecutor);
           } else {
      
               this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
           }
       }

    • 其他還有:構造方法、start方法、shutdown方法

  • Java併發

    • Executors.newFixedThreadPool(),用於建立固定數量的執行緒池,根據執行緒池的執行原理:執行緒池啟動時候沒有執行緒,當新任務到來時就建立執行緒處理;由於coreSize和maxSize設定為相同大小,如果任務來的時候執行緒已經達到coreSize,就直接放入等待佇列;keepAlive設定為0,目的是讓執行緒數不會超過coreSize;blockqueue設定為LinkedBlockingQueue,表示是無界佇列,最多可以放Integer.MAX_VALUE個任務。

      public static ExecutorService newFixedThreadPool(int nThreads,ThreadFactory threadFactory) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                       0L, TimeUnit.MILLISECONDS,
                                       new LinkedBlockingQueue(),
                                       threadFactory);
      }
    • 週期執行緒池

      NameServerController中使用了排程執行緒池,我們看下建立一個排程執行緒池的方法,即Executors.newSingleThreadScheduledExecutor(),該方法的定義如下所示:

       public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
           return new DelegatedScheduledExecutorService
               (new ScheduledThreadPoolExecutor(1, threadFactory));
       }

      這種執行緒池的建立又委託給了DelegatedScheduledExecutorService類,這裡為什麼這麼設計,不是太理解。不過可以看下真正建立排程執行緒池的程式碼:

       public ScheduledThreadPoolExecutor(int corePoolSize,
                                          ThreadFactory threadFactory) {
           super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                 new DelayedWorkQueue(), threadFactory);
       }

      上面這個方法,關鍵在於兩點:(1)maxSize選了Integer.MAX_VALUE;(2)任務佇列使用了延遲佇列;再回頭去看那個委託類的程式碼,就可以明白,委託類包裝了ScheduledExecutorService執行器,提供了延遲或週期執行的介面。

       /**
        * A wrapper class that exposes only the ScheduledExecutorService
        * methods of a ScheduledExecutorService implementation.
        */
       static class DelegatedScheduledExecutorService
               extends DelegatedExecutorService
               implements ScheduledExecutorService {
           private final ScheduledExecutorService e;
           DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
               super(executor);
               e = executor;
           }
           public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) {
               return e.schedule(command, delay, unit);
           }
           public  ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) {
               return e.schedule(callable, delay, unit);
           }
           public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
               return e.scheduleAtFixedRate(command, initialDelay, period, unit);
           }
           public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
               return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
           }
       }

      找到上面幾個主要類和介面的類圖,再綜合上面的程式碼,可以這麼理解:Executors是一個工具類,提供了生成不同的執行緒池的工廠方法,其中包括newSingleThreadScheduledExecutor方法,由於ScheduledExecutorService擴充套件了ExecutorService介面,同時又想重用AbstractExecutorService中的一些方法,因此需要一個委託類,將ExecutorService和ScheduledExecutorService的功能整合在一個類中。

  • Netty

    RemotingServer是name server中的通訊服務端,在name controller初始化name server模組的時候,會將name server的請求處理器註冊到netty伺服器上。

  • 二、DefaultRequestProcessor

    在NameServerController中會註冊請求處理器,那麼name server的請求處理器實現了哪些介面呢,請看程式碼:

    public class DefaultRequestProcessor implements NettyRequestProcessor {
        private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    
        protected final NamesrvController namesrvController;
    
        public DefaultRequestProcessor(NamesrvController namesrvController) {
            this.namesrvController = namesrvController;
        }
    
        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            if (log.isDebugEnabled()) {
                log.debug("receive request, {} {} {}",
                    request.getCode(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    request);
            }
    
            switch (request.getCode()) {
                case RequestCode.PUT_KV_CONFIG:
                    return this.putKVConfig(ctx, request);
                case RequestCode.GET_KV_CONFIG:
                    return this.getKVConfig(ctx, request);
                case RequestCode.DELETE_KV_CONFIG:
                    return this.deleteKVConfig(ctx, request);
                case RequestCode.REGISTER_BROKER:
                    Version brokerVersion = MQVersion.value2Version(request.getVersion());
                    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                        return this.registerBrokerWithFilterServer(ctx, request);
                    } else {
                        return this.registerBroker(ctx, request);
                    }
                case RequestCode.UNREGISTER_BROKER:
                    return this.unregisterBroker(ctx, request);
                case RequestCode.GET_ROUTEINTO_BY_TOPIC:
                    return this.getRouteInfoByTopic(ctx, request);
                case RequestCode.GET_BROKER_CLUSTER_INFO:
                    return this.getBrokerClusterInfo(ctx, request);
                case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                    return this.wipeWritePermOfBroker(ctx, request);
                case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                    return getAllTopicListFromNameserver(ctx, request);
                case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                    return deleteTopicInNamesrv(ctx, request);
                case RequestCode.GET_KVLIST_BY_NAMESPACE:
                    return this.getKVListByNamespace(ctx, request);
                case RequestCode.GET_TOPICS_BY_CLUSTER:
                    return this.getTopicsByCluster(ctx, request);
                case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                    return this.getSystemTopicListFromNs(ctx, request);
                case RequestCode.GET_UNIT_TOPIC_LIST:
                    return this.getUnitTopicList(ctx, request);
                case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                    return this.getHasUnitSubTopicList(ctx, request);
                case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                    return this.getHasUnitSubUnUnitTopicList(ctx, request);
                case RequestCode.UPDATE_NAMESRV_CONFIG:
                    return this.updateConfig(ctx, request);
                case RequestCode.GET_NAMESRV_CONFIG:
                    return this.getConfig(ctx, request);
                default:
                    break;
            }
            return null;
        }
        //其他具體的實現方法
    }

    從這個程式碼中可以看出兩個方面的內容:

    1. 如何使用Netty處理網路請求。關鍵資料結構:(1)RemotingCommand:自定義的協議,攜帶請求引數和響應(2)ChannelHandlerContext:netty的資料結構,攜帶channel相關的資訊。設計模型:processRequest:透過請求碼進行請求轉發;

    2. 請求處理方法(跟協議相關,具體參見remote模組)(1)processRequest:請求分發;(2)putKVConfig:將配置資訊放在記憶體中;(3)getKVConfig:傳回配置資訊(4)deleteKVConfig:刪除配置資訊;(5)註冊broker,支援兩個註冊方式:帶過濾服務的(MQ版本在V3011之後的)、不帶過濾服務的,等其他處理方法。

    三、BrokerHousekeepingService

    該模組實現了ChannelEventListener介面,每個broker都會跟name server建立一個連線通道,當這個通道發生異常事件時,需要及時在name server這邊清理掉對應的broker資訊。異常事件的型別有:(1)通道關閉時;(2)通道丟擲異常時;(3)通道空閑時。

    public class BrokerHousekeepingService implements ChannelEventListener {
        private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
        private final NamesrvController namesrvController;
    
        public BrokerHousekeepingService(NamesrvController namesrvController) {
            this.namesrvController = namesrvController;
        }
    
        @Override
        public void onChannelConnect(String remoteAddr, Channel channel) {
        }
    
        @Override
        public void onChannelClose(String remoteAddr, Channel channel) {
            this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
        }
    
        @Override
        public void onChannelException(String remoteAddr, Channel channel) {
            this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
        }
    
        @Override
        public void onChannelIdle(String remoteAddr, Channel channel) {
            this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
        }
    }

    四、RouteInfoManager

    這個模組是name server的核心模組,真正管理broker、訊息佇列等相關資訊的地方。程式碼如下:

    public class RouteInfoManager {
        private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
        private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private final HashMap> topicQueueTable;
        private final HashMap brokerAddrTable;
        private final HashMap> clusterAddrTable;
        private final HashMap brokerLiveTable;
        private final HashMap/* Filter Server */> filterServerTable;
    
        public RouteInfoManager() {
            this.topicQueueTable = new HashMap>(1024);
            this.brokerAddrTable = new HashMap(128);
            this.clusterAddrTable = new HashMap>(32);
            this.brokerLiveTable = new HashMap(256);
            this.filterServerTable = new HashMap>(256);
        }
        //對外暴露的方法   
    }

    主要屬性的含義如下:

    1. BROKERCHANNELEXPIRED_TIME,表示一個broker距離上次發心跳包的最長時間,即120秒;

    2. 使用可重入讀寫鎖實現併發安全、使用輕量級的非執行緒安全容器實現高效併發;【這點非常重要】

    3. topicQueueTable:用於管理topic和屬於這個topic的佇列的對映關係;

    4. brokerAddrTable:用於管理某個broker和它對應的資訊

    5. clusterAddrTable:用於管理broker叢集和叢集中對應的broker的對映關係

    6. brokerLiveTable:用於管理broker的存活資訊

    7. filterServerTable:用於管理broker和過濾服務串列【暫不理解】

    關於ReentrantReadWriteLock:

    1. 這裡使用的鎖是非公平鎖

    2. ReentrantReadWriteLock基於Sync、ReadLock、WriteLock三個模組實現,Sync負責處理公平與否的問題。ReadLock和WriteLock透過鎖外部物件ReentrantReadWriteLock來處理併發。在RoutInfoManager中的使用案例如下:

         public void deleteTopic(final String topic) {
             try {
                 try {
                     this.lock.writeLock().lockInterruptibly();
                     this.topicQueueTable.remove(topic);
                 } finally {
                     this.lock.writeLock().unlock();
                 }
             } catch (Exception e) {
                 log.error("deleteTopic Exception", e);
             }
         }

    五、KVConfigManager

    這個模組用於管理name server自己的配置資訊,配置資訊以json資訊存放在檔案中,以二維陣列形式存在於記憶體中,請看程式碼:

    /**
     * 管理NameServer的配置屬性
     */
    public class KVConfigManager {
        private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    
        private final NamesrvController namesrvController;
    
        //可重入讀寫鎖
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
    
        //配置表
        private final HashMap> configTable =
            new HashMap>();
    
        public KVConfigManager(NamesrvController namesrvController) {
            this.namesrvController = namesrvController;
        }
        //這個類對外暴露的方法,省了
    }

    這個類對外暴露的方法有:

    1. load方法:將配置資訊載入到記憶體中

    2. putKVConfig方法:將配置資訊持久化

    3. deleteKVConfig方法:刪除指定的配置項

    4. getKVListByNamespace和getKVConfig用於查詢配置資訊

    參考資料

    1. 訊息佇列技術點梳理

    2. netty的執行緒模型

    3. 《Java併發程式設計的藝術》

    贊(0)

    分享創造快樂