歡迎光臨
每天分享高質量文章

資料庫中介軟體 MyCAT 原始碼分析 —— 【單庫單表】查詢

本文主要基於 MyCAT 1.6.5 正式版

  • 1. 概述

  • 2. 接收請求,解析 SQL

  • 3. 獲得路由結果

  • 4. 獲得 MySQL 連線,執行 SQL

  • 5. 響應執行 SQL 結果

  • 6. 其他 :更新 / 刪除


友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】】搞基嗨皮。

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】】搞基嗨皮。


1. 概述

內容形態以 順序圖 + 核心程式碼 為主。 
如果有地方表述不錯誤或者不清晰,歡迎留言。 
對於內容形態,非常糾結,如果有建議,特別特別特別歡迎您提出。 
微訊號:wangwenbin-server。

本文講解 【單庫單表】查詢 所涉及到的程式碼。

?內容和 《MyCAT 原始碼分析 —— 【單庫單表】插入》 超級相似,一方面本身流程基本相同,另外一方面文章結構沒拆分好。我們使用 ? 標記差異的邏輯。

互動如下圖:

單庫單表查詢簡圖

整個過程,MyCAT Server 流程如下:

  1. 接收 MySQL Client 請求,解析 SQL。

  2. 獲得路由結果,進行路由。

  3. 獲得 MySQL 連線,執行 SQL。

  4. 響應執行結果,傳送結果給 MySQL Client。

我們逐個步驟分析,一起來看看原始碼。

2. 接收請求,解析 SQL

【單庫單表】查詢(01主流程)

【1 – 2】

接收一條 MySQL 命令。在【1】之前,還有請求資料讀取、拆成單條 MySQL SQL。

【3】

  1: // ⬇️⬇️⬇️【FrontendCommandHandler.java】
 2: public class FrontendCommandHandler implements NIOHandler {
 3:
 4:     @Override
 5:     public void handle(byte[] data) {
 6:    
 7:         // .... 省略部分程式碼
 8:         switch (data[4]) //
 9:         {
10:             case MySQLPacket.COM_INIT_DB:
11:                 commands.doInitDB();
12:                 source.initDB(data);
13:                 break;
14:             case MySQLPacket.COM_QUERY: // 查詢命令
15:                 // 計數查詢命令
16:                 commands.doQuery();
17:                 // 執行查詢命令
18:                 source.query(data);
19:                 break;
20:             case MySQLPacket.COM_PING:
21:                 commands.doPing();
22:                 source.ping();
23:                 break;
24:             // .... 省略部分case
25:         }
26:     }
27:
28: }

INSERT/SELECT/UPDATE/DELETE 等 SQL 歸屬於 MySQLPacket.COM_QUERY,詳細可見:《MySQL協議分析#4.2 客戶端命令請求報文(客戶端 -> 伺服器)》。

【4】

將 二進位制陣列 解析成 SQL。核心程式碼如下:

  1: // ⬇️⬇️⬇️【FrontendConnection.java】
 2: public void query(byte[] data) {
 3:     // 取得陳述句
 4:     String sql = null;      
 5:     try {
 6:         MySQLMessage mm = new MySQLMessage(data);
 7:         mm.position(5);
 8:         sql = mm.readString(charset);
 9:     } catch (UnsupportedEncodingException e) {
10:         writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
11:         return;
12:     }      
13:     // 執行陳述句
14:     this.query( sql );
15: }

【5】

解析 SQL 型別。核心程式碼如下:

  1: // ⬇️⬇️⬇️【ServerQueryHandler.java】
 2: @Override
 3: public void query(String sql) {
 4:     // 解析 SQL 型別
 5:     int rs = ServerParse.parse(sql);
 6:     int sqlType = rs & 0xff;
 7:    
 8:     switch (sqlType) {
 9:     //explain sql
10:     case ServerParse.EXPLAIN:
11:         ExplainHandler.handle(sql, c, rs >>> 8);
12:         break;
13:     // .... 省略部分case
14:         break;
15:     case ServerParse.SELECT:
16:         SelectHandler.handle(sql, c, rs >>> 8);
17:         break;
18:     // .... 省略部分case
19:     default:
20:         if(readOnly){
21:             LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());
22:             c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");
23:             break;
24:         }
25:         c.execute(sql, rs & 0xff);
26:     }
27: }
28:
29:
30: // ⬇️⬇️⬇️【ServerParse.java】
31: public static int parse(String stmt) {
32:     int length = stmt.length();
33:     //FIX BUG FOR SQL SUCH AS /XXXX/SQL
34:     int rt = -1;
35:     for (int i = 0; i < length; ++i) {
36:         switch (stmt.charAt(i)) {
37:         // .... 省略部分case            case 'I':
38:         case 'i':
39:             rt = insertCheck(stmt, i);
40:             if (rt != OTHER) {
41:                 return rt;
42:             }
43:             continue;
44:             // .... 省略部分case
45:         case 'S':
46:         case 's':
47:             rt = sCheck(stmt, i);
48:             if (rt != OTHER) {
49:                 return rt;
50:             }
51:             continue;
52:             // .... 省略部分case
53:         default:
54:             continue;
55:         }
56:     }
57:     return OTHER;
58: }

?【6】【7】

解析 Select SQL 型別,分發到對應的邏輯。核心程式碼如下:

  1: // ⬇️⬇️⬇️【SelectHandler.java】
 2: public static void handle(String stmt, ServerConnection c, int offs) {
 3:     int offset = offs;
 4:     switch (ServerParseSelect.parse(stmt, offs)) { // 解析 Select SQL 型別
 5:     case ServerParseSelect.VERSION_COMMENT: // select @@VERSION_COMMENT;
 6:         SelectVersionComment.response(c);
 7:         break;
 8:     case ServerParseSelect.DATABASE: // select DATABASE();
 9:         SelectDatabase.response(c);
10:         break;
11:     case ServerParseSelect.USER: // select CURRENT_USER();
12:         SelectUser.response(c);
13:         break;
14:     case ServerParseSelect.VERSION: // select VERSION();
15:         SelectVersion.response(c);
16:         break;
17:     case ServerParseSelect.SESSION_INCREMENT: // select @@session.auto_increment_increment;
18:         SessionIncrement.response(c);
19:         break;
20:     case ServerParseSelect.SESSION_ISOLATION: // select @@session.tx_isolation;
21:         SessionIsolation.response(c);
22:         break;
23:     case ServerParseSelect.LAST_INSERT_ID: // select LAST_INSERT_ID();
24:         // ....省略程式碼
25:         break;
26:     case ServerParseSelect.IDENTITY: // select @@identity
27:         // ....省略程式碼
28:         break;
29:     case ServerParseSelect.SELECT_VAR_ALL: //
30:         SelectVariables.execute(c,stmt);
31:             break;
32:     case ServerParseSelect.SESSION_TX_READ_ONLY: //
33:         SelectTxReadOnly.response(c);
34:             break;
35:     default: // 其他,例如 select * from table
36:         c.execute(stmt, ServerParse.SELECT);
37:     }
38: }
39: // ⬇️⬇️⬇️【ServerParseSelect.java】
40: public static int parse(String stmt, int offset) {
41:     int i = offset;
42:     for (; i < stmt.length(); ++i) {
43:         switch (stmt.charAt(i)) {
44:         case ' ':
45:             continue;
46:         case '/':
47:         case '#':
48:             i = ParseUtil.comment(stmt, i);
49:             continue;
50:         case '@':
51:             return select2Check(stmt, i);
52:         case 'D':
53:         case 'd':
54:             return databaseCheck(stmt, i);
55:         case 'L':
56:         case 'l':
57:             return lastInsertCheck(stmt, i);
58:         case 'U':
59:         case 'u':
60:             return userCheck(stmt, i);
61:         case 'C':
62:         case 'c':
63:             return currentUserCheck(stmt, i);
64:         case 'V':
65:         case 'v':
66:             return versionCheck(stmt, i);
67:         default:
68:             return OTHER;
69:         }
70:     }
71:     return OTHER;
72: }

【8】

執行 SQL,詳細解析見下文,核心程式碼如下:

  1: // ⬇️⬇️⬇️【ServerConnection.java】
 2: public class ServerConnection extends FrontendConnection {
 3:     public void execute(String sql, int type) {
 4:         // .... 省略程式碼
 5:         SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);
 6:         if (schema == null) {
 7:             writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,
 8:                     "Unknown MyCAT Database '" + db + "'");
 9:             return;
10:         }
11:
12:         // .... 省略程式碼
13:
14:         // 路由到後端資料庫,執行 SQL
15:         routeEndExecuteSQL(sql, type, schema);
16:     }
17:    
18:     public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {
19:         // 路由計算
20:         RouteResultset rrs = null;
21:         try {
22:             rrs = MycatServer
23:                     .getInstance()
24:                     .getRouterservice()
25:                     .route(MycatServer.getInstance().getConfig().getSystem(),
26:                             schema, type, sql, this.charset, this);
27:
28:         } catch (Exception e) {
29:             StringBuilder s = new StringBuilder();
30:             LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e);
31:             String msg = e.getMessage();
32:             writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);
33:             return;
34:         }
35:
36:         // 執行 SQL
37:         if (rrs != null) {
38:             // session執行
39:             session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);
40:         }
41:        
42:      }
43:
44: }

3. 獲得路由結果

【單庫單表】插入(02獲取路由)

【 1 – 5 】

獲得路由主流程。核心程式碼如下:

  1: // ⬇️⬇️⬇️【SelectHandler.java】
 2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,
 3:         int sqlType, String stmt, String charset, ServerConnection sc)

 4:         throws SQLNonTransientException
{
 5:     RouteResultset rrs = null;
 6:
 7:     // SELECT 型別的SQL, 檢測快取是否存在
 8:     if (sqlType == ServerParse.SELECT) {
 9:         cacheKey = schema.getName() + stmt;        
10:         rrs = (RouteResultset) sqlRouteCache.get(cacheKey);
11:         if (rrs != null) {
12:             checkMigrateRule(schema.getName(),rrs,sqlType);
13:             return rrs;
14:             }
15:         }
16:     }
17:
18:     // .... 省略程式碼
19:     int hintLength = RouteService.isHintSql(stmt);
20:     if(hintLength != -1){ // TODO 待讀:hint
21:         // .... 省略程式碼
22:         }
23:     } else {
24:         stmt = stmt.trim();
25:         rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,
26:                 charset, sc, tableId2DataNodeCache);
27:     }
28:
29:     // 記錄查詢命令路由結果快取
30:     if (rrs != null && sqlType == ServerParse.SELECT && rrs.isCacheAble()) {
31:         sqlRouteCache.putIfAbsent(cacheKey, rrs);
32:     }
33:     // .... 省略程式碼        return rrs;
34: }
35: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】
36: @Override
37: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
38:         String charset, ServerConnection sc, LayerCachePool cachePool)
throws SQLNonTransientException
{
39:
40:     // .... 省略程式碼
41:
42:     // 處理一些路由之前的邏輯;全域性序列號,父子表插入
43:     if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) {
44:         return null;
45:     }
46:
47:     // .... 省略程式碼
48:
49:     // 檢查是否有分片
50:     if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
51:         rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
52:     } else {
53:         RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);
54:         if (returnedSet == null) {
55:             rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);
56:         }
57:     }
58:
59:     return rrs;
60: }

?【3】第 7 至 16 行 :當 Select SQL 存在路由結果快取時,直接傳回快取。
?【6】第 29 至 32 行 :記錄 Select SQL 路由結果到快取。

路由 詳細解析,我們另開文章,避免內容過多,影響大家對【插入】流程和邏輯的理解。

4. 獲得 MySQL 連線,執行 SQL

【單庫單表】查詢(03執行 SQL)

【 1 – 8 】

獲得 MySQL 連線。

  • PhysicalDBNode :物理資料庫節點。

  • PhysicalDatasource :物理資料庫資料源。

【 9 – 13 】

傳送 SQL 到 MySQL Server,執行 SQL。

? 5. 響應執行 SQL 結果

【單庫單表】查詢(04執行響應)

核心程式碼如下:

  1: // ⬇️⬇️⬇️【MySQLConnectionHandler.java】
 2: @Override
 3: protected void handleData(byte[] data) {
 4:     switch (resultStatus) {
 5:     case RESULT_STATUS_INIT:
 6:         switch (data[4]) {
 7:         case OkPacket.FIELD_COUNT:
 8:             handleOkPacket(data);
 9:             break;
10:         case ErrorPacket.FIELD_COUNT:
11:             handleErrorPacket(data);
12:             break;
13:         case RequestFilePacket.FIELD_COUNT:
14:             handleRequestPacket(data);
15:             break;
16:         default: // 初始化 essay-header fields
17:             resultStatus = RESULT_STATUS_HEADER;
18:             essay-header = data;
19:             fields = new ArrayList<byte[]>((int) ByteUtil.readLength(data,
20:                     4));
21:         }
22:         break;
23:     case RESULT_STATUS_HEADER:
24:         switch (data[4]) {
25:         case ErrorPacket.FIELD_COUNT:
26:             resultStatus = RESULT_STATUS_INIT;
27:             handleErrorPacket(data);
28:             break;
29:         case EOFPacket.FIELD_COUNT: // 解析 fields 結束
30:             resultStatus = RESULT_STATUS_FIELD_EOF;
31:             handleFieldEofPacket(data);
32:             break;
33:         default: // 解析 fields
34:             fields.add(data);
35:         }
36:         break;
37:     case RESULT_STATUS_FIELD_EOF:
38:         switch (data[4]) {
39:         case ErrorPacket.FIELD_COUNT:
40:             resultStatus = RESULT_STATUS_INIT;
41:             handleErrorPacket(data);
42:             break;
43:         case EOFPacket.FIELD_COUNT: // 解析 每行記錄 結束
44:             resultStatus = RESULT_STATUS_INIT;
45:             handleRowEofPacket(data);
46:             break;
47:         default: // 每行記錄
48:             handleRowPacket(data);
49:         }
50:         break;
51:     default:
52:         throw new RuntimeException("unknown status!");
53:     }
54: }

6. 其他 :更新 / 刪除

流程基本和 《MyCAT原始碼分析:【單庫單表】插入》 相同。我們就不另外文章解析。

贊(0)

分享創造快樂