歡迎光臨
每天分享高質量文章

分佈式訊息佇列 RocketMQ原始碼解析:Filtersrv

摘要: 原創出處 http://www.iocoder.cn/RocketMQ/filtersrv/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 RocketMQ 4.0.x 正式版

  • 1. 概述

  • 2. Filtersrv 註冊到 Broker

  • 3. 過濾類

    • 3.1 Consumer 訂閱時設置 過濾類代碼

    • 3.2 Consumer 上傳 過濾類代碼

    • 3.3 Filter 編譯 過濾類代碼

  • 4. 過濾訊息

    • 4.1 Consumer 從 Filtersrv 拉取訊息

    • 4.2 Filtersrv 從 Broker 拉取訊息

  • 5. Filtersrv 高可用


  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。


1. 概述

Filtersrv ,負責自定義規則過濾 Consumer 從 Broker 拉取的訊息。

為什麼 Broker 不提供過濾訊息的功能呢?我們來看看官方的說法:

  • Broker 端訊息過濾 
    在 Broker 中,按照 Consumer 的要求做過濾,優點是減少了對於 Consumer 無用訊息的網絡傳輸。 缺點是增加了 Broker 的負擔,實現相對複雜。 
    (1). 淘寶 Notify 支持多種過濾方式,包含直接按照訊息型別過濾,靈活的語法運算式過濾,幾乎可以滿足最苛刻的過濾需求。 
    (2). 淘寶 RocketMQ 支持按照簡單的 Message Tag 過濾,也支持按照 Message Header、body 進行過濾。 
    (3). CORBA Notification 規範中也支持靈活的語法運算式過濾。

  • Consumer 端訊息過濾 
    這種過濾方式可由應用完全自定義實現,但是缺點是很多無用的訊息要傳輸到 Consumer 端。

就是在這種考慮下,Filtersrv 出現了。減少了 Broker 的負擔,又減少了 Consumer 接收無用的訊息。當然缺點也是有的,多了一層 Filtersrv 網絡開銷。

2. Filtersrv 註冊到 Broker

  • ? 一個 Filtersrv 對應一個 Broker

  • ? 一個 Broker 可以對應多個 FiltersrvFiltersrv 的高可用通過啟動多個 Filtersrv 實現

  • Filtersrv 註冊失敗時,主動退出關閉

核心代碼如下:

  1: // ⬇️⬇️⬇️【FiltersrvController.java】
 2: public boolean initialize() {
 3:     // ....(省略代碼)
 4:
 5:     // 固定間隔註冊到Broker
 6:     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 7:
 8:         @Override
 9:         public void run() {
10:             FiltersrvController.this.registerFilterServerToBroker();
11:         }
12:     }, 15, 10, TimeUnit.SECONDS); // TODO edit by 芋艿:initialDelay時間太短,可能導致初始化失敗。從3=》15
13:
14:     // ....(省略代碼)
15: }
16:
17: /**
18:  * 註冊Filtersrv 到 Broker
19:  * !!!如果註冊失敗,關閉Filtersrv
20:  */

21: public void registerFilterServerToBroker() {
22:     try {
23:         RegisterFilterServerResponseHeader responseHeader =
24:             this.filterServerOuterAPI.registerFilterServerToBroker(
25:                 this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());
26:         this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
27:             .setDefaultBrokerId(responseHeader.getBrokerId());
28:
29:         if (null == this.brokerName) {
30:             this.brokerName = responseHeader.getBrokerName();
31:         }
32:
33:         log.info("register filter server to broker OK, Return: {} {}",
34:             this.localAddr(),
35:             this.filtersrvConfig.getConnectWhichBroker(),
36:             responseHeader.getBrokerName(),
37:             responseHeader.getBrokerId());
38:     } catch (Exception e) {
39:         log.warn("register filter server Exception", e);
40:
41:         log.warn("access broker failed, kill oneself");
42:         System.exit(-1); // 異常退出
43:     }
44: }

3. 過濾類

3.1 Consumer 訂閱時設置 過濾類代碼

  • Consumer 針對每個 Topic 可以訂閱不同的 過濾類代碼

  1: // ⬇️⬇️⬇️【DefaultMQPushConsumer.java】
 2: @Override
 3: public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
 4:     this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
 5: }

3.2 Consumer 上傳 過濾類代碼

  • Consumer 心跳註冊到 Broker 的同時,上傳 過濾類代碼 到 Broker 對應的所有 Filtersrv

  1: // ⬇️⬇️⬇️【MQClientInstance.java】
 2: /**
 3:  * 發送心跳到Broker,上傳過濾類原始碼到Filtersrv
 4:  */

 5: public void sendHeartbeatToAllBrokerWithLock() {
 6:     if (this.lockHeartbeat.tryLock()) {
 7:         try {
 8:             this.sendHeartbeatToAllBroker();
 9:             this.uploadFilterClassSource();
10:         } catch (final Exception e) {
11:             log.error("sendHeartbeatToAllBroker exception", e);
12:         } finally {
13:             this.lockHeartbeat.unlock();
14:         }
15:     } else {
16:         log.warn("lock heartBeat, but failed.");
17:     }
18: }
19:
20: /**
21:  * 上傳過濾類到Filtersrv
22:  */

23: private void uploadFilterClassSource() {
24:     Iterator> it = this.consumerTable.entrySet().iterator();
25:     while (it.hasNext()) {
26:         Entry next = it.next();
27:         MQConsumerInner consumer = next.getValue();
28:         if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
29:             Set subscriptions = consumer.subscriptions();
30:             for (SubscriptionData sub : subscriptions) {
31:                 if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
32:                     final String consumerGroup = consumer.groupName();
33:                     final String className = sub.getSubString();
34:                     final String topic = sub.getTopic();
35:                     final String filterClassSource = sub.getFilterClassSource();
36:                     try {
37:                         this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
38:                     } catch (Exception e) {
39:                         log.error("uploadFilterClassToAllFilterServer Exception", e);
40:                     }
41:                 }
42:             }
43:         }
44:     }
45: }

3.3 Filter 編譯 過濾類代碼

  • Filtersrv 處理 Consumer 上傳的 過濾類代碼,併進行編譯使用。

核心代碼如下:

  1: // ⬇️⬇️⬇️【FilterClassManager.java】
 2: /**
 3:  * 註冊過濾類
 4:  *
 5:  * @param consumerGroup 消費分組
 6:  * @param topic Topic
 7:  * @param className 過濾類名
 8:  * @param classCRC 過濾類原始碼CRC
 9:  * @param filterSourceBinary 過濾類原始碼
10:  * @return 是否註冊成功
11:  */

12: public boolean registerFilterClass(final String consumerGroup, final String topic,
13:     final String className, final int classCRC, final byte[] filterSourceBinary)
{
14:     final String key = buildKey(consumerGroup, topic);
15:     // 判斷是否要註冊新的過濾類
16:     boolean registerNew = false;
17:     FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);
18:     if (null == filterClassInfoPrev) {
19:         registerNew = true;
20:     } else {
21:         if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
22:             if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) { // 類有變化
23:                 registerNew = true;
24:             }
25:         }
26:     }
27:     // 註冊新的過濾類
28:     if (registerNew) {
29:         synchronized (this.compileLock) {
30:             filterClassInfoPrev = this.filterClassTable.get(key);
31:             if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) {
32:                 return true;
33:             }
34:             try {
35:                 FilterClassInfo filterClassInfoNew = new FilterClassInfo();
36:                 filterClassInfoNew.setClassName(className);
37:                 filterClassInfoNew.setClassCRC(0);
38:                 filterClassInfoNew.setMessageFilter(null);
39:
40:                 if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
41:                     String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
42:                     // 編譯新的過濾類
43:                     Class> newClass = DynaCode.compileAndLoadClass(className, javaSource);
44:                     // 創建新的過濾類物件
45:                     Object newInstance = newClass.newInstance();
46:                     filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
47:                     filterClassInfoNew.setClassCRC(classCRC);
48:                 }
49:
50:                 this.filterClassTable.put(key, filterClassInfoNew);
51:             } catch (Throwable e) {
52:                 String info = String.format("FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",
53:                             consumerGroup, topic, className);
54:                 log.error(info, e);
55:                 return false;
56:             }
57:         }
58:     }
59:
60:     return true;
61: }

4. 過濾訊息

4.1 Consumer 從 Filtersrv 拉取訊息

  • Consumer 拉取 使用過濾類方式訂閱 的消費訊息時,從 Broker 對應的 Filtersrv 串列隨機選擇一個拉取訊息。如果選擇不到 Filtersrv,則無法拉取訊息。因此,Filtersrv 一定要做高可用

  1: // ⬇️⬇️⬇️【PullAPIWrapper.java】
 2: /**
 3:  * 拉取訊息核心方法
 4:  *
 5:  * @param mq 訊息嘟列
 6:  * @param subExpression 訂閱運算式
 7:  * @param subVersion 訂閱版本號
 8:  * @param offset 拉取佇列開始位置
 9:  * @param maxNums 批量拉 取訊息數量
10:  * @param sysFlag 拉取系統標識
11:  * @param commitOffset 提交消費進度
12:  * @param brokerSuspendMaxTimeMillis broker掛起請求最大時間
13:  * @param timeoutMillis 請求broker超時時間
14:  * @param communicationMode 通訊樣式
15:  * @param pullCallback 拉取回呼
16:  * @return 拉取訊息結果。只有通訊樣式為同步時,才傳回結果,否則傳回null。
17:  * @throws MQClientException 當尋找不到 broker 時,或發生其他client異常
18:  * @throws RemotingException 當遠程呼叫發生異常時
19:  * @throws MQBrokerException 當 broker 發生異常時。只有通訊樣式為同步時才會發生該異常。
20:  * @throws InterruptedException 當發生中斷異常時
21:  */

22: protected PullResult pullKernelImpl(
23:     final MessageQueue mq,
24:     final String subExpression,
25:     final long subVersion,
26:     final long offset,
27:     final int maxNums,
28:     final int sysFlag,
29:     final long commitOffset,
30:     final long brokerSuspendMaxTimeMillis,
31:     final long timeoutMillis,
32:     final CommunicationMode communicationMode,
33:     final PullCallback pullCallback
34: )
throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
35:     // // ....(省略代碼)
36:     // 請求拉取訊息
37:     if (findBrokerResult != null) {
38:         // ....(省略代碼)
39:         // 若訂閱topic使用過濾類,使用filtersrv獲取訊息
40:         String brokerAddr = findBrokerResult.getBrokerAddr();
41:         if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
42:             brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
43:         }
44:
45:         PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
46:             brokerAddr,
47:             requestHeader,
48:             timeoutMillis,
49:             communicationMode,
50:             pullCallback);
51:
52:         return pullResult;
53:     }
54:
55:     // Broker信息不存在,則丟擲異常
56:     throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
57: }
58:
59: /**
60:  * 計算filtersrv地址。如果有多個filtersrv,隨機選擇一個。
61:  *
62:  * @param topic Topic
63:  * @param brokerAddr broker地址
64:  * @return filtersrv地址
65:  * @throws MQClientException 當filtersrv不存在時
66:  */

67: private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
68:     throws MQClientException
{
69:     ConcurrentHashMap topicRouteTable = this.mQClientFactory.getTopicRouteTable();
70:     if (topicRouteTable != null) {
71:         TopicRouteData topicRouteData = topicRouteTable.get(topic);
72:         List list = topicRouteData.getFilterServerTable().get(brokerAddr);
73:         if (list != null && !list.isEmpty()) {
74:             return list.get(randomNum() % list.size());
75:         }
76:     }
77:     throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "
78:         + topic, null);
79: }

4.2 Filtersrv 從 Broker 拉取訊息

  • Filtersrv 拉取訊息後,會建議 Consumer 向 Broker主節點 拉取訊息。

  • Filtersrv 可以理解成一個 Consumer,向 Broker 拉取訊息時,實際使用的 DefaultMQPullConsumer.java 的方法和邏輯。

  1: // ⬇️⬇️⬇️【DefaultRequestProcessor.java】
 2: /**
 3:  * 拉取訊息
 4:  *
 5:  * @param ctx 拉取訊息context
 6:  * @param request 拉取訊息請求
 7:  * @return 響應
 8:  * @throws Exception 當發生異常時
 9:  */

10: private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
11:     final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
12:     final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
13:     final PullMessageRequestHeader requestHeader =
14:         (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
15:
16:     final FilterContext filterContext = new FilterContext();
17:     filterContext.setConsumerGroup(requestHeader.getConsumerGroup());
18:
19:     response.setOpaque(request.getOpaque());
20:
21:     DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
22:
23:     // 校驗Topic過濾類是否完整
24:     final FilterClassInfo findFilterClass = this.filtersrvController.getFilterClassManager().findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());
25:     if (null == findFilterClass) {
26:         response.setCode(ResponseCode.SYSTEM_ERROR);
27:         response.setRemark("Find Filter class failed, not registered");
28:         return response;
29:     }
30:     if (null == findFilterClass.getMessageFilter()) {
31:         response.setCode(ResponseCode.SYSTEM_ERROR);
32:         response.setRemark("Find Filter class failed, registered but no class");
33:         return response;
34:     }
35:
36:     // 設置下次請求從 Broker主節點。
37:     responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
38:
39:     MessageQueue mq = new MessageQueue();
40:     mq.setTopic(requestHeader.getTopic());
41:     mq.setQueueId(requestHeader.getQueueId());
42:     mq.setBrokerName(this.filtersrvController.getBrokerName());
43:     long offset = requestHeader.getQueueOffset();
44:     int maxNums = requestHeader.getMaxMsgNums();
45:
46:     final PullCallback pullCallback = new PullCallback() {
47:
48:         @Override
49:         public void onSuccess(PullResult pullResult) {
50:             responseHeader.setMaxOffset(pullResult.getMaxOffset());
51:             responseHeader.setMinOffset(pullResult.getMinOffset());
52:             responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
53:             response.setRemark(null);
54:
55:             switch (pullResult.getPullStatus()) {
56:                 case FOUND:
57:                     response.setCode(ResponseCode.SUCCESS);
58:
59:                     List msgListOK = new ArrayList();
60:                     try {
61:                         for (MessageExt msg : pullResult.getMsgFoundList()) {
62:                             // 使用過濾類過濾訊息
63:                             boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);
64:                             if (match) {
65:                                 msgListOK.add(msg);
66:                             }
67:                         }
68:
69:                         if (!msgListOK.isEmpty()) {
70:                             returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);
71:                             return;
72:                         } else {
73:                             response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
74:                         }
75:                     } catch (Throwable e) {
76:                         final String error =
77:                             String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
78:                                 requestHeader.getConsumerGroup(), requestHeader.getTopic());
79:                         log.error(error, e);
80:
81:                         response.setCode(ResponseCode.SYSTEM_ERROR);
82:                         response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
83:                         returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
84:                         return;
85:                     }
86:
87:                     break;
88:                 case NO_MATCHED_MSG:
89:                     response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
90:                     break;
91:                 case NO_NEW_MSG:
92:                     response.setCode(ResponseCode.PULL_NOT_FOUND);
93:                     break;
94:                 case OFFSET_ILLEGAL:
95:                     response.setCode(ResponseCode.PULL_OFFSET_MOVED);
96:                     break;
97:                 default:
98:                     break;
99:             }
100:
101:             returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
102:         }
103:
104:         @Override
105:         public void onException(Throwable e) {
106:             response.setCode(ResponseCode.SYSTEM_ERROR);
107:             response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
108:             returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
109:             return;
110:         }
111:     };
112:
113:     // 拉取訊息
114:     pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);
115:     return null;
116: }

5. Filtersrv 高可用

赞(0)

分享創造快樂