歡迎光臨
每天分享高質量文章

【Netty 專欄】Netty原始碼分析之ChannelPipeline

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 

本章節分析Netty中的ChannelPipeline模組。

每個channel內部都會持有一個ChannelPipeline物件pipeline.
pipeline預設實現DefaultChannelPipeline內部維護了一個DefaultChannelHandlerContext連結串列。

img

ChannelPipeline

當channel完成register、active、read等操作時,會觸發pipeline的相應方法。
1、當channel註冊到selector時,觸發pipeline的fireChannelRegistered方法。
2、當channel的socket系結完成時,觸發pipeline的fireChannelActive方法。
3、當有客戶端請求時,觸發pipeline的fireChannelRead方法。
4、當本次客戶端請求,pipeline執行完fireChannelRead,觸發pipeline的fireChannelReadComplete方法。

接下去看看pipeline是如何組織並執行handler對應的方法。

DefaultChannelPipeline

其中DefaultChannelHandlerContext儲存了當前handler的背景關係,如channel、pipeline等資訊,預設實現了head和tail。

class DefaultChannelPipeline implements ChannelPipeline {
   final Channel channel; // pipeline所屬的channel
   //head和tail都是handler背景關係
   final DefaultChannelHandlerContext head;
   final DefaultChannelHandlerContext tail;
   ...
   public DefaultChannelPipeline(AbstractChannel channel) {
       if (channel == null) {
           throw new NullPointerException("channel");
       }
       this.channel = channel;
       tail = new TailContext(this);
       head = new HeadContext(this);
       head.next = tail;
       tail.prev = head;
   }  
}

1、TailContext實現了ChannelOutboundHandler介面。
2、HeadContext實現了ChannelInboundHandler介面。
3、head和tail形成了一個連結串列。

對於Inbound的操作,當channel註冊到selector時,觸發pipeline的fireChannelRegistered,從head開始遍歷,找到實現了ChannelInboundHandler介面的handler,並執行其fireChannelRegistered方法。

@Override
public ChannelPipeline fireChannelRegistered() {
   head.fireChannelRegistered();
   return this;
}
@Override
public ChannelHandlerContext fireChannelRegistered() {
   final DefaultChannelHandlerContext next = findContextInbound();
   EventExecutor executor = next.executor();
   if (executor.inEventLoop()) {
       next.invokeChannelRegistered();
   } else {
       executor.execute(new Runnable() {
           @Override
           public void run() {
               next.invokeChannelRegistered();
           }
       });
   }
   return this;
}
private DefaultChannelHandlerContext findContextInbound() {
   DefaultChannelHandlerContext ctx = this;
   do {
       ctx = ctx.next;
   } while (!(ctx.handler() instanceof ChannelInboundHandler));
   return ctx;
}
private void invokeChannelRegistered() {
   try {
       ((ChannelInboundHandler) handler()).channelRegistered(this);
   } catch (Throwable t) {
       notifyHandlerException(t);
   }
}

假如我們透過pipeline的addLast方法新增一個inboundHandler實現。

public class ClientHandler extends ChannelInboundHandlerAdapter {
   @Override  
   public void channelRegistered(ChannelHandlerContext ctx)  
           throws Exception
{  
       super.channelRegistered(ctx);  
       System.out.println(" ClientHandler  registered channel ");  
   }  
}  

當channel註冊完成時會觸發pipeline的channelRegistered方法,從head開始遍歷,找到ClientHandler,並執行channelRegistered方法。

對於Outbound的操作,則從tail向前遍歷,找到實現ChannelOutboundHandler介面的handler,具體實現和Inbound一樣。

服務啟動過程中,ServerBootstrap在init方法中,會給ServerSocketChannel的pipeline新增ChannelInitializer物件,其中ChannelInitializer繼承ChannelInboundHandlerAdapter,並實現了ChannelInboundHandler介面,所以當ServerSocketChannel註冊到selector之後,會觸發其channelRegistered方法。

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
   initChannel((C) ctx.channel());
   ctx.pipeline().remove(this);
   ctx.fireChannelRegistered();
}
public void initChannel(Channel ch) throws Exception {
   ChannelPipeline pipeline = ch.pipeline();
   ChannelHandler handler = handler();
   if (handler != null) {
       pipeline.addLast(handler);
   }
   pipeline.addLast(new ServerBootstrapAcceptor(
           currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}

在initChannel實現中,新增ServerBootstrapAcceptor實體到pipeline中。

ServerBootstrapAcceptor繼承自ChannelInboundHandlerAdapter,負責把接收到的客戶端socketChannel註冊到childGroup中,由childGroup中的eventLoop負責資料處理。

public void channelRead(ChannelHandlerContext ctx, Object msg) {
   final Channel child = (Channel) msg;
   child.pipeline().addLast(childHandler);
   for (Entry, Object> e: childOptions) {
       try {
           if (!child.config().setOption((ChannelOption) e.getKey(), e.getValue())) {
               logger.warn("Unknown channel option: " + e);
           }
       } catch (Throwable t) {
           logger.warn("Failed to set a channel option: " + child, t);
       }
   }
   for (Entry, Object> e: childAttrs) {
       child.attr((AttributeKey) e.getKey()).set(e.getValue());
   }
   try {
       childGroup.register(child).addListener(new ChannelFutureListener() {
           @Override
           public void operationComplete(ChannelFuture future) throws Exception {
               if (!future.isSuccess()) {
                   forceClose(child, future.cause());
               }
           }
       });
   } catch (Throwable t) {
       forceClose(child, t);
   }
}



知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. API 配置(一)之應用
04. API 配置(二)之服務提供者
05. API 配置(三)之服務消費者
06. 屬性配置
07. XML 配置
08. 核心流程一覽

09. 拓展機制 SPI

10. 執行緒池


一共 60 篇++

贊(0)

分享創造快樂