歡迎光臨
每天分享高質量文章

【Netty 專欄】Netty入門簡介

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 

前言

Netty是一個高效能、非同步事件驅動的NIO框架,提供了對TCP、UDP和檔案傳輸的支援,作為一個非同步NIO框架,Netty的所有IO操作都是非同步非阻塞的,透過Future-Listener機制,使用者可以方便的主動獲取或者透過通知機制獲得IO操作結果。

作為當前最流行的NIO框架,Netty在網際網路領域、大資料分散式計算領域、遊戲行業、通訊行業等獲得了廣泛的應用,一些業界著名的開源元件也基於Netty構建,比如RPC框架、zookeeper等。

不熟悉NIO的可以先看看下麵兩篇文章 

1、深入淺出NIO之Channel、Buffer 

2、深入淺出NIO之Selector實現原理

那麼,Netty效能為啥這麼高?主要是因為其內部Reactor模型的實現。

Reactor模型

Netty中的Reactor模型主要由多路復用器(Acceptor)、事件分發器(Dispatcher)、事件處理器(Handler)組成,可以分為三種。

1、單執行緒模型:所有I/O操作都由一個執行緒完成,即多路復用、事件分發和處理都是在一個Reactor執行緒上完成的。

對於一些小容量應用場景,可以使用單執行緒模型。但是對於高負載、大併發的應用卻不合適,主要原因如下:

  • 一個執行緒同時處理成百上千的鏈路,效能上無法支撐,即便CPU負荷達到100%,也無法滿足海量訊息的編碼、解碼、讀取和傳送;

  • 當負載過重後,處理速度將變慢,這會導致大量客戶端連線超時,超時之後往往會進行重發,最終會導致大量訊息積壓和處理超時,成為系統的效能瓶頸;

  • 一旦單執行緒意外跑飛,或者進入死迴圈,會導致整個系統通訊模組不可用,不能接收和處理外部訊息,造成節點故障,可靠性不高。

2、多執行緒模型:為瞭解決單執行緒模型存在的一些問題,演化而來的Reactor執行緒模型。

多執行緒模型的特點:

  • 有專門一個Acceptor執行緒用於監聽服務端,接收客戶端的TCP連線請求;

  • 網路IO的讀寫操作由一個NIO執行緒池負責,執行緒池可以採用標準的JDK執行緒池實現,包含一個任務佇列和N個可用的執行緒,由這些NIO執行緒負責訊息的讀取、解碼、編碼和傳送;

  • 一個NIO執行緒可以同時處理多條鏈路,但是一個鏈路只能對應一個NIO執行緒,防止發生併發操作問題。

在絕大多數場景下,Reactor多執行緒模型都可以滿足效能需求;但是,在極特殊應用場景中,一個NIO執行緒負責監聽和處理所有的客戶端連線可能會存在效能問題。例如百萬客戶端併發連線,或者服務端需要對客戶端的握手訊息進行安全認證,認證本身非常損耗效能。在這類場景下,單獨一個Acceptor執行緒可能會存在效能不足問題,為瞭解決效能問題,產生了第三種Reactor執行緒模型-主從Reactor多執行緒模型。

3、主從多執行緒模型:採用多個reactor,每個reactor都在自己單獨的執行緒裡執行。如果是多核,則可以同時響應多個客戶端的請求,一旦鏈路建立成功就將鏈路註冊到負責I/O讀寫的SubReactor執行緒池上。

事實上,Netty的執行緒模型並非固定不變,在啟動輔助類中建立不同的EventLoopGroup實體並透過適當的引數配置,就可以支援上述三種Reactor執行緒模型。正是因為Netty對Reactor執行緒模型的支援提供了靈活的定製能力,所以可以滿足不同業務場景的效能需求。

示例程式碼

以下是server和client的示例程式碼,其中使用的是 Netty 4.x,先看看如何實現,後續會針對各個模組進行深入分析。

server 程式碼實現

  1. public class EchoServer {

  2.    private final int port;

  3.    public EchoServer(int port) {

  4.        this.port = port;

  5.    }

  6.    public void run() throws Exception {

  7.        // Configure the server.

  8.        EventLoopGroup bossGroup = new NioEventLoopGroup();  // (1)

  9.        EventLoopGroup workerGroup = new NioEventLoopGroup();  

  10.        try {

  11.            ServerBootstrap b = new ServerBootstrap(); // (2)

  12.            b.group(bossGroup, workerGroup)

  13.             .channel(NioServerSocketChannel.class) // (3)

  14.             .option(ChannelOption.SO_BACKLOG, 100)

  15.             .handler(new LoggingHandler(LogLevel.INFO))

  16.             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)

  17.                 @Override

  18.                 public void initChannel(SocketChannel ch) throws Exception {

  19.                     ch.pipeline().addLast(

  20.                             //new LoggingHandler(LogLevel.INFO),

  21.                             new EchoServerHandler());

  22.                 }

  23.             });

  24.            // Start the server.

  25.            ChannelFuture f = b.bind(port).sync(); // (5)

  26.            // Wait until the server socket is closed.

  27.            f.channel().closeFuture().sync();

  28.        } finally {

  29.            // Shut down all event loops to terminate all threads.

  30.            bossGroup.shutdownGracefully();

  31.            workerGroup.shutdownGracefully();

  32.        }

  33.    }

  34.    public static void main(String[] args) throws Exception {

  35.        int port;

  36.        if (args.length > 0) {

  37.            port = Integer.parseInt(args[0]);

  38.        } else {

  39.            port = 8080;

  40.        }

  41.        new EchoServer(port).run();

  42.    }

  43. }

EchoServerHandler 實現

  1. public class EchoServerHandler extends ChannelInboundHandlerAdapter {  

  2.    private static final Logger logger = Logger.getLogger(  

  3.            EchoServerHandler.class.getName());  

  4.    @Override  

  5.    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  

  6.        ctx.write(msg);  

  7.    }  

  8.    @Override  

  9.    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  

  10.        ctx.flush();  

  11.    }  

  12.    @Override  

  13.    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  

  14.        // Close the connection when an exception is raised.  

  15.        logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);  

  16.        ctx.close();  

  17.    }  

  18. }  

1、NioEventLoopGroup 是用來處理I/O操作的執行緒池,Netty對 EventLoopGroup 介面針對不同的傳輸協議提供了不同的實現。在本例子中,需要實體化兩個NioEventLoopGroup,通常第一個稱為“boss”,用來accept客戶端連線,另一個稱為“worker”,處理客戶端資料的讀寫操作。 

2、ServerBootstrap 是啟動服務的輔助類,有關socket的引數可以透過ServerBootstrap進行設定。 

3、這裡指定NioServerSocketChannel類初始化channel用來接受客戶端請求。 

4、通常會為新SocketChannel透過新增一些handler,來設定ChannelPipeline。ChannelInitializer 是一個特殊的handler,其中initChannel方法可以為SocketChannel 的pipeline新增指定handler。 

5、透過系結埠8080,就可以對外提供服務了。

client 程式碼實現

  1. public class EchoClient {  

  2.   

  3.     private final String host;  

  4.     private final int port;  

  5.     private final int firstMessageSize;  

  6.   

  7.     public EchoClient(String host, int port, int firstMessageSize) {  

  8.         this.host = host;  

  9.         this.port = port;  

  10.         this.firstMessageSize = firstMessageSize;  

  11.     }  

  12.   

  13.     public void run() throws Exception {  

  14.         // Configure the client.  

  15.         EventLoopGroup group = new NioEventLoopGroup();  

  16.         try {  

  17.             Bootstrap b = new Bootstrap();  

  18.             b.group(group)  

  19.              .channel(NioSocketChannel.class)  

  20.              .option(ChannelOption.TCP_NODELAY, true)  

  21.              .handler(new ChannelInitializer<SocketChannel>() {  

  22.                  @Override  

  23.                  public void initChannel(SocketChannel ch) throws Exception {  

  24.                      ch.pipeline().addLast(  

  25.                              //new LoggingHandler(LogLevel.INFO),  

  26.                              new EchoClientHandler(firstMessageSize));  

  27.                  }  

  28.              });  

  29.   

  30.             // Start the client.  

  31.             ChannelFuture f = b.connect(host, port).sync();  

  32.   

  33.             // Wait until the connection is closed.  

  34.             f.channel().closeFuture().sync();  

  35.         } finally {  

  36.             // Shut down the event loop to terminate all threads.  

  37.             group.shutdownGracefully();  

  38.         }  

  39.     }  

  40.   

  41.     public static void main(String[] args) throws Exception {  

  42.         final String host = args[0];  

  43.         final int port = Integer.parseInt(args[1]);  

  44.         final int firstMessageSize;  

  45.         if (args.length == 3) {  

  46.             firstMessageSize = Integer.parseInt(args[2]);  

  47.         } else {  

  48.             firstMessageSize = 256;  

  49.         }  

  50.   

  51.         new EchoClient(host, port, firstMessageSize).run();  

  52.     }  

  53. }  

EchoClientHandler 實現

  1. public class EchoClientHandler extends ChannelInboundHandlerAdapter {  

  2.   

  3.     private static final Logger logger = Logger.getLogger(  

  4.             EchoClientHandler.class.getName());  

  5.   

  6.     private final ByteBuf firstMessage;  

  7.   

  8.     /** 

  9.      * Creates a client-side handler. 

  10.      */  

  11.     public EchoClientHandler(int firstMessageSize) {  

  12.         if (firstMessageSize <= 0) {  

  13.             throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);  

  14.         }  

  15.         firstMessage = Unpooled.buffer(firstMessageSize);  

  16.         for (int i = 0; i < firstMessage.capacity(); i ++) {  

  17.             firstMessage.writeByte((byte) i);  

  18.         }  

  19.     }  

  20.   

  21.     @Override  

  22.     public void channelActive(ChannelHandlerContext ctx) {  

  23.         ctx.writeAndFlush(firstMessage);  

  24.     }  

  25.   

  26.     @Override  

  27.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  

  28.         ctx.write(msg);  

  29.     }  

  30.   

  31.     @Override  

  32.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  

  33.        ctx.flush();  

  34.     }  

  35.   

  36.     @Override  

  37.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  

  38.         // Close the connection when an exception is raised.  

  39.         logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);  

  40.         ctx.close();  

  41.     }  

贊(0)

分享創造快樂

© 2024 知識星球   網站地圖