歡迎光臨
每天分享高質量文章

【RPC 專欄】深入理解 RPC 之傳輸篇

點擊上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 

  • RpcRequest 和 RpcResponse

  • Socket傳輸

  • Netty 傳輸

  • 同步與異步 阻塞與非阻塞

  • 總結


RPC 被稱為“遠程過程呼叫”,表明瞭一個方法呼叫會跨越網絡,跨越行程,所以傳輸層是不可或缺的。一說到網絡傳輸,一堆名詞就蹦了出來:TCP、UDP、HTTP,同步 or 異步,阻塞 or 非阻塞,長連接 or 短連接…

本文介紹兩種傳輸層的實現:使用 Socket 和使用 Netty。前者實現的是阻塞式的通信,是一個較為簡單的傳輸層實現方式,藉此可以瞭解傳輸層的工作原理及工作內容;後者是非阻塞式的,在一般的 RPC 場景下,性能會表現的很好,所以被很多開源 RPC 框架作為傳輸層的實現方式。

RpcRequest 和 RpcResponse

傳輸層傳輸的主要物件其實就是這兩個類,它們封裝了請求 id,方法名,方法引數,傳回值,異常等 RPC 呼叫中需要的一系列信息。

public class RpcRequest implements Serializable {
   private String interfaceName;
   private String methodName;
   private String parametersDesc;
   private Object[] arguments;
   private Map attachments;
   private int retries = 0;
   private long requestId;
   private byte rpcProtocolVersion;
}
public class RpcResponse implements Serializable {
   private Object value;
   private Exception exception;
   private long requestId;
   private long processTime;
   private int timeout;
   private Map attachments;// rpc協議版本兼容時可以回傳一些額外的信息
   private byte rpcProtocolVersion;
}

Socket傳輸

Server

public class RpcServerSocketProvider {
   public static void main(String[] args) throws Exception {
       //序列化層實現參考之前的章節
       Serialization serialization = new Hessian2Serialization();
       ServerSocket serverSocket = new ServerSocket(8088);
       ExecutorService executorService = Executors.newFixedThreadPool(10);
       while (true) {
           final Socket socket = serverSocket.accept();
           executorService.execute(() -> {
               try {
                   InputStream is = socket.getInputStream();
                   OutputStream os = socket.getOutputStream();
                   try {
                       DataInputStream dis = new DataInputStream(is);
                       int length = dis.readInt();
                       byte[] requestBody = new byte[length];
                       dis.read(requestBody);
                       //反序列化requestBody => RpcRequest
                       RpcRequest rpcRequest = serialization.deserialize(requestBody, RpcRequest.class);
                       //反射呼叫生成響應 並組裝成 rpcResponse
                       RpcResponse rpcResponse = invoke(rpcRequest);
                       //序列化rpcResponse => responseBody
                       byte[] responseBody = serialization.serialize(rpcResponse);
                       DataOutputStream dos = new DataOutputStream(os);
                       dos.writeInt(responseBody.length);
                       dos.write(responseBody);
                       dos.flush();
                   } catch (Exception e) {
                       e.printStackTrace();
                   } finally {
                       is.close();
                       os.close();
                   }
               } catch (Exception e) {
                   e.printStackTrace();
               } finally {
                   try {
                       socket.close();
                   } catch (Exception e) {
                       e.printStackTrace();
                   }
               }
           });
       }
   }
   public static RpcResponse invoke(RpcRequest rpcRequest) {
       //模擬反射呼叫
       RpcResponse rpcResponse = new RpcResponse();
       rpcResponse.setRequestId(rpcRequest.getRequestId());
       //... some operation
       return rpcResponse;
   }
}

Client

public class RpcSocketConsumer {
   public static void main(String[] args) throws Exception {
       //序列化層實現參考之前的章節
       Serialization serialization = new Hessian2Serialization();
       Socket socket = new Socket("localhost", 8088);
       InputStream is = socket.getInputStream();
       OutputStream os = socket.getOutputStream();
       //封裝rpc請求
       RpcRequest rpcRequest = new RpcRequest();
       rpcRequest.setRequestId(12345L);
       //序列化 rpcRequest => requestBody
       byte[] requestBody = serialization.serialize(rpcRequest);
       DataOutputStream dos = new DataOutputStream(os);
       dos.writeInt(requestBody.length);
       dos.write(requestBody);
       dos.flush();
       DataInputStream dis = new DataInputStream(is);
       int length = dis.readInt();
       byte[] responseBody = new byte[length];
       dis.read(responseBody);
       //反序列化 responseBody => rpcResponse
       RpcResponse rpcResponse = serialization.deserialize(responseBody, RpcResponse.class);
       is.close();
       os.close();
       socket.close();
       System.out.println(rpcResponse.getRequestId());
   }
}

dis.readInt() 和 dis.read(byte[] bytes) 決定了使用 Socket 通信是一種阻塞式的操作,報文頭+報文體的傳輸格式是一種常見的格式,除此之外,使用特殊的字符如空行也可以劃分出報文結構。在示例中,我們使用一個 int(4位元組)來傳遞報問題的長度,之後傳遞報文體,在複雜的通信協議中,報文頭除了儲存報文體還會額外儲存一些信息,包括協議名稱,版本,心跳標識等。

在網絡傳輸中,只有位元組能夠被識別,所以我們在開頭引入了 Serialization 接口,負責完成 RpcRequest 和 RpcResponse 與位元組的相互轉換。(Serialization 的工作機制可以參考之前的文章)

使用 Socket 通信可以發現:每次 Server 處理 Client 請求都會從執行緒池中取出一個執行緒來處理請求,這樣的開銷對於一般的 Rpc 呼叫是不能夠接受的,而 Netty 一類的網絡框架便派上了用場。

Netty 傳輸

Server 和 ServerHandler

public class RpcNettyProvider {
   public static void main(String[] args) throws Exception{
       EventLoopGroup bossGroup = new NioEventLoopGroup();
       EventLoopGroup workerGroup = new NioEventLoopGroup();
       try {
           // 創建並初始化 Netty 服務端 Bootstrap 物件
           ServerBootstrap bootstrap = new ServerBootstrap();
           bootstrap.group(bossGroup, workerGroup);
           bootstrap.channel(NioServerSocketChannel.class);
           bootstrap.childHandler(new ChannelInitializer() {
               @Override
               public void initChannel(SocketChannel channel) throws Exception {
                   ChannelPipeline pipeline = channel.pipeline();
                   pipeline.addLast(new RpcDecoder(RpcRequest.class)); // 解碼 RPC 請求
                   pipeline.addLast(new RpcEncoder(RpcResponse.class)); // 編碼 RPC 響應
                   pipeline.addLast(new RpcServerHandler()); // 處理 RPC 請求
               }
           });
           bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
           bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
           ChannelFuture future = bootstrap.bind("127.0.0.1", 8087).sync();
           // 關閉 RPC 服務器
           future.channel().closeFuture().sync();
       } finally {
           workerGroup.shutdownGracefully();
           bossGroup.shutdownGracefully();
       }
   }
}
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
   @Override
   public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
       RpcResponse rpcResponse = invoke(request);
       // 寫入 RPC 響應物件並自動關閉連接
       ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE);
   }
   private RpcResponse invoke(RpcRequest rpcRequest) {
       //模擬反射呼叫
       RpcResponse rpcResponse = new RpcResponse();
       rpcResponse.setRequestId(rpcRequest.getRequestId());
       //... some operation
       return rpcResponse;
   }
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
       cause.printStackTrace();
       ctx.close();
   }
}

Client 和 ClientHandler

public class RpcNettyConsumer {
   public static void main(String[] args) throws Exception{
       EventLoopGroup group = new NioEventLoopGroup();
       try {
           // 創建並初始化 Netty 客戶端 Bootstrap 物件
           Bootstrap bootstrap = new Bootstrap();
           bootstrap.group(group);
           bootstrap.channel(NioSocketChannel.class);
           bootstrap.handler(new ChannelInitializer() {
               @Override
               public void initChannel(SocketChannel channel) throws Exception
{
                   ChannelPipeline pipeline = channel.pipeline();
                   pipeline.addLast(new RpcEncoder(RpcRequest.class)); // 編碼 RPC 請求
                   pipeline.addLast(new RpcDecoder(RpcResponse.class)); // 解碼 RPC 響應
                   pipeline.addLast(new RpcClientHandler()); // 處理 RPC 響應
               }
           });
           bootstrap.option(ChannelOption.TCP_NODELAY, true);
           // 連接 RPC 服務器
           ChannelFuture future = bootstrap.connect("127.0.0.1", 8087).sync();
           // 寫入 RPC 請求資料並關閉連接
           Channel channel = future.channel();
           RpcRequest rpcRequest = new RpcRequest();
           rpcRequest.setRequestId(123456L);
           channel.writeAndFlush(rpcRequest).sync();
           channel.closeFuture().sync();
       } finally {
           group.shutdownGracefully();
       }
   }
}
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
   @Override
   public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
       System.out.println(response.getRequestId());//處理響應
   }
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       cause.printStackTrace();
       ctx.close();
   }
}

使用 Netty 的好處是很方便地實現了非阻塞式的呼叫,關鍵部分都給出了註釋。上述的代碼雖然很多,並且和我們熟悉的 Socket 通信代碼大相徑庭,但大多數都是 Netty 的模板代碼,啟動服務器,配置編解碼器等。真正的 RPC 封裝操作大多集中在 Handler 的 channelRead 方法(負責讀取)以及 channel.writeAndFlush 方法(負責寫入)中。

public class RpcEncoder extends MessageToByteEncoder {
   private Class> genericClass;
   Serialization serialization = new Hessian2Serialization();
   public RpcEncoder(Class> genericClass) {
       this.genericClass = genericClass;
   }
   @Override
   public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception
{
       if (genericClass.isInstance(in)) {
           byte[] data = serialization.serialize(in);
           out.writeInt(data.length);
           out.writeBytes(data);
       }
   }
}
public class RpcDecoder extends ByteToMessageDecoder {
   private Class> genericClass;
   public RpcDecoder(Class> genericClass) {
       this.genericClass = genericClass;
   }
   Serialization serialization = new Hessian2Serialization();
   @Override
   public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception
{
       if (in.readableBytes() < 4) {
           return;
       }
       in.markReaderIndex();
       int dataLength = in.readInt();
       if (in.readableBytes() < dataLength) {
           in.resetReaderIndex();
           return;
       }
       byte[] data = new byte[dataLength];
       in.readBytes(data);
       out.add(serialization.deserialize(data, genericClass));
   }
}

使用 Netty 不能保證傳回的位元組大小,所以需要加上 in.readableBytes() < 4 這樣的判斷,以及 in.markReaderIndex() 這樣的標記,用來區分報文頭和報文體。

同步與異步 阻塞與非阻塞

這兩組傳輸特性經常被拿來做對比,很多文章聲稱 Socket 是同步阻塞的,Netty 是異步非阻塞,其實有點問題。

其實這兩組並沒有必然的聯繫,同步阻塞,同步非阻塞,異步非阻塞都有可能(同步非阻塞倒是沒見過),而大多數使用 Netty 實現的 RPC 呼叫其實應當是同步非阻塞的(當然一般 RPC 也支持異步非阻塞)。

同步和異步關註的是訊息通信機制
所謂同步,就是在發出一個呼叫時,在沒有得到結果之前,該呼叫就不傳回。但是一旦呼叫傳回,就得到傳回值了。
換句話說,就是由呼叫者主動等待這個呼叫的結果。

而異步則是相反,呼叫在發出之後,這個呼叫就直接傳回了,所以沒有傳回結果。換句話說,當一個異步過程呼叫發出後,呼叫者不會立刻得到結果。而是在呼叫發出後,被呼叫者通過狀態、通知來通知呼叫者,或通過回呼函式處理這個呼叫。

如果需要 RPC 呼叫傳回一個結果,該結果立刻被使用,那意味著著大概率需要是一個同步呼叫。如果不關心其傳回值,則可以將其做成異步接口,以提升效率。

阻塞和非阻塞關註的是程式在等待呼叫結果(訊息,傳回值)時的狀態.

阻塞呼叫是指呼叫結果傳回之前,當前執行緒會被掛起。呼叫執行緒只有在得到結果之後才會傳回。
非阻塞呼叫指在不能立刻得到結果之前,該呼叫不會阻塞當前執行緒。

在上述的例子中可以看出 Socket 通信我們顯示宣告了一個包含10個執行緒的執行緒池,每次請求到來,分配一個執行緒,等待客戶端傳遞報文頭和報文體的行為都會阻塞該執行緒,可以見得其整體是阻塞的。而在 Netty 通信的例子中,每次請求並沒有分配一個執行緒,而是通過 Handler 的方式處理請求(聯想 NIO 中 Selector),是非阻塞的。

使用同步非阻塞方式的通信機制並不一定同步阻塞式的通信強,所謂沒有最好,只有更合適,而一般的同步非阻塞 通信適用於 1.網絡連接數量多 2.每個連接的io不頻繁 的場景,與 RPC 呼叫較為契合。而成熟的 RPC 框架的傳輸層和協議層通常也會提供多種選擇,以應對不同的場景。

總結

本文堆砌了一些代碼,而難點主要是對 Socket 的理解,和 Netty 框架的掌握。Netty 的學習有一定的門檻,但實際需要掌握的知識點其實並不多(僅僅針對 RPC 框架所涉及的知識點而言),學習 Netty ,個人推薦《Netty IN ACTION》以及 https://waylau.gitbooks.io/netty-4-user-guide/Getting%20Started/Before%20Getting%20Started.html 該網站的例子。

參考資料:

http://javatar.iteye.com/blog/1123915 – 梁飛

https://gitee.com/huangyong/rpc – 黃勇

666. 彩蛋

如果你對 RPC 併發感興趣,歡迎加入我的知識一起交流。

知識星球


目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. API 配置(一)之應用
04. API 配置(二)之服務提供者
05. API 配置(三)之服務消費者
06. 屬性配置
07. XML 配置
08. 核心流程一覽

一共 60 篇++

赞(0)

分享創造快樂