歡迎光臨
每天分享高質量文章

MySql-Proxy之多路結果集歸併

筆者覺得Cobar之類的分庫分表最神奇的部分就是靠一條sql查詢不同schema下(甚至不同實體下)的不同的表。例如

select * from t_test; // 對映為
|------select * from schema1.t_test
|------select * from schema2.t_test
ResultSet // 傳回結果集為兩者的歸併    
|--schema1.t_test.ResultSet
|--schema2.t_test.ResultSet

以筆者這種刨根到底的性格當然要把這個過程DIY出來。
由於Cobar對MySql的連線是BIO的。而筆者喜歡NIO,於是用NIO將Corbar的多節點查詢全部重寫(基於Netty)。NIO的難度更大,效能也更好,這個重寫的過程就記錄成部落格,以饗讀者。

多路歸併原理

多節點傳送select陳述句

當客戶端傳送給select * from test後,Lancelot會根據配置將陳述句將當前陳述句路由到多個不同的DB實體上,如上圖所示。
FrontEnd:用來和client互動,一個FrontEnd可以對應多個Backend
BackEnd:用來和DB互動

多節點歸併結果集


每條陳述句在一個DB實體上面執行後,都會傳回一個ResultSet結果集,在此需要將多個結果集歸併成一個統一的結果集,然後傳回給client,這樣client就感覺像查詢一個DB實體一樣。
如上圖所示,歸併過程在下麵講解。

歸併ResultSet結果集

在講如何歸併前,我們需要重溫一下MySql傳回結果集的結構, 其詳細描述見筆者部落格:

https://my.oschina.net/alchemystar/blog/834150

其協議格式如下所示:

由上圖可見,
其中的Row才是真正的資料內容。而其餘的例如,field_count、fields 、eof以及last_eof則僅僅是攜帶資料格式的資訊。
如果要多路歸併成一路的話,field_count、fields、eof以及last_eof這些只需要傳回給client一份即可。

去掉多餘的結構描述資訊

現在根據協議結構將Frontend歸併結果集的程式碼階段分為三個:
(1)fieldList階段: 由於field_count、fields、eof這三個階段是連續的,於是將其合併成一個狀態。
(2)Row階段:顧名思義,接收DB傳回的資料階段。
(3)LastEof階段:最後的收尾階段,每個結果集的last_eof表示此結果集的結束,只有所有的last_eof都收到之後才能表示結果的結束。

fieldList階段的處理:

首先每個Backend都接收field_count,fields,eof。當其接收到eof之後,收到row之前,向Frontend提交這些資訊。如下圖所示:

當Frontend獲取到Backend1的feilds資訊之後,就開始接收Row,並丟棄其餘Backend的fields資訊。程式碼如下:

public void fieldListResponse(List fieldList) {    lock.lock();    try {        if(!isFailed.get()) {            // 如果還沒有傳過fieldList的話,則傳遞
if (!fieldEofReturned) {
writeFiledList(fieldList);
fieldEofReturned = true;
}
}
} finally {        lock.unlock();
}
}

Row階段的處理

當Frontend進入Row階段之後,處理比較簡單,Backend傳送的任何Row都向前段傳輸,如果是Backend的fields資訊則丟棄。如下圖所示:

LastEof階段

每當一個Backend收到last_eof之後,表明當前Backend的結果集已經結束。Frontend需要等所有的Backend結果集結束之後,再傳送一個last_eof告訴client,所有的結果已經完了,如下圖所示:

程式碼如下所示:

// last eof responsepublic void lastEofResponse(BinaryPacket bin) {    lock.lock();    try {
logger.info("last eof ");        if (decrementCountBy()) {            if (!isFailed.get()) {
bin.packetId = ++packetId;
logger.info("write eof okay");
bin.write(session.getCtx());                // 如果是自動提交,則釋放session
if(session.getSource().isAutocommit()){
session.release();
}
}else{
notifyFailure();
}
}
} finally {        lock.unlock();
}
}

例子

執行lancelot中的LanceLotServer的main命令,其就自動連線了我本機的MySql。 配置之類的在SystemConfig中進行修改(現在還沒有做到配置檔案化)。
我用mysqlclient連線到lancetlot,然後執行select * from test命令。結果如下圖所示:

贊(0)

分享創造快樂