[Netty Series] Netty Source Code Analysis: Server Startup

This article analyzes how a Netty server starts up.

Netty is built on Java NIO, so it cannot do without Selector, ServerSocketChannel, SocketChannel, SelectionKey and the like; it simply hides them behind its own lower-level abstractions.

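As the standard server example shows, everything starts with ServerBootstrap.

A ServerBootstrap instance needs two NioEventLoopGroup instances, split by responsibility into boss and worker:
1. boss accepts incoming connections
2. worker handles reads and writes on the accepted connections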

NioEventLoopGroup

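NioEventLoopGroup mainly manages the lifecycle of its eventLoops.
What is an eventLoop? For now, think of it as an internal processing thread; by default there are twice as many of them as there are processors.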

NioEventLoopGroup constructors:

public NioEventLoopGroup() {  
   this(0);  
}  
public NioEventLoopGroup(int nThreads) {  
   this(nThreads, null);
}  
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {  
   this(nThreads, threadFactory, SelectorProvider.provider());  
}  
public NioEventLoopGroup(  
           int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider)
{  
   super(nThreads, threadFactory, selectorProvider);  
}  

MultithreadEventLoopGroup is the parent class of NioEventLoopGroup. Its constructor:

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {  
   super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);  
}  

DEFAULT_EVENT_LOOP_THREADS is twice the number of available processors.
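To make the "0 means default" rule concrete, here is a minimal sketch using only the JDK. The class and method names are hypothetical. The io.netty.eventLoopThreads system property is, to my knowledge, how Netty 4 lets you override the default, but treat that as an assumption and check it against your Netty version.

```java
final class DefaultThreads {
    // Hypothetical helper mirroring the constructor chain above:
    // an explicit thread count wins, 0 falls back to the default.
    static int resolve(int nThreads) {
        if (nThreads != 0) {
            return nThreads;
        }
        int cores = Runtime.getRuntime().availableProcessors();
        // Integer.getInteger reads a system property and falls back to the second argument.
        // The property name is an assumption about Netty 4, not taken from the article.
        int configured = Integer.getInteger("io.netty.eventLoopThreads", cores * 2);
        return Math.max(1, configured);
    }

    public static void main(String[] args) {
        System.out.println("explicit 4 -> " + resolve(4));
        System.out.println("default    -> " + resolve(0));
    }
}
```

So new NioEventLoopGroup() on an 8-core machine would normally end up with 16 eventLoops, while new NioEventLoopGroup(1) is the usual choice for a boss group that listens on a single port.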

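MultithreadEventExecutorGroup is the core class that manages the eventLoop lifecycle. Two of its fields are worth a look first:
1. children: an EventExecutor array that holds the eventLoops.
2. chooser: the strategy for picking one eventLoop from children.

Constructor, followed by the newChild implementation from NioEventLoopGroup: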

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
   if (nThreads <= 0) {
       throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
   }
   if (threadFactory == null) {
       threadFactory = newDefaultThreadFactory();
   }
   children = new SingleThreadEventExecutor[nThreads];
   if (isPowerOfTwo(children.length)) {
       chooser = new PowerOfTwoEventExecutorChooser();
   } else {
       chooser = new GenericEventExecutorChooser();
   }
   for (int i = 0; i < nThreads; i ++) {
       boolean success = false;
       try {
           children[i] = newChild(threadFactory, args);
           success = true;
       } catch (Exception e) {
           // TODO: Think about if this is a good exception type
           throw new IllegalStateException("failed to create a child event loop", e);
       } finally {
           if (!success) {
               for (int j = 0; j < i; j ++) {
                   children[j].shutdownGracefully();
               }
               for (int j = 0; j < i; j ++) {
                   EventExecutor e = children[j];
                   try {
                       while (!e.isTerminated()) {
                           e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                       }
                   } catch (InterruptedException interrupted) {
                       Thread.currentThread().interrupt();
                       break;
                   }
               }
           }
       }
   }
   final FutureListener<Object> terminationListener = new FutureListener<Object>() {
       @Override
       public void operationComplete(Future<Object> future) throws Exception {
           if (terminatedChildren.incrementAndGet() == children.length) {
               terminationFuture.setSuccess(null);
           }
       }
   };
   for (EventExecutor e: children) {
       e.terminationFuture().addListener(terminationListener);
   }
}
protected EventExecutor newChild(  
           ThreadFactory threadFactory, Object... args)
throws Exception
{  
     return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);  
}  

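1. The chooser is initialized with a different strategy depending on the array size: PowerOfTwoEventExecutorChooser if the size is a power of two, GenericEventExecutorChooser otherwise.

The method that checks whether a number is a power of two is a nice touch: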

private static boolean isPowerOfTwo(int val) {
     return (val & -val) == val;
}

2. newChild is overridden by subclasses. When the EventExecutors are initialized, the method that actually runs is newChild in NioEventLoopGroup, so the elements of children are really NioEventLoops.
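The power-of-two check matters because it lets the chooser replace a modulo with a bit mask. The sketch below is a simplified stand-in, not Netty's chooser classes: the class name and the String "children" are hypothetical, and the masking line reflects my understanding of what PowerOfTwoEventExecutorChooser does.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ChooserSketch {
    // Same bit trick as above: (val & -val) isolates the lowest set bit,
    // so the comparison holds only when val has a single bit set.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private final AtomicInteger childIndex = new AtomicInteger();
    private final String[] children;

    ChooserSketch(String... children) {
        this.children = children;
    }

    String next() {
        int idx = childIndex.getAndIncrement();
        if (isPowerOfTwo(children.length)) {
            // For a power-of-two length, masking with (length - 1) equals idx % length.
            return children[idx & (children.length - 1)];
        }
        // General case: modulo, with abs() guarding against a negative counter.
        return children[Math.abs(idx % children.length)];
    }

    public static void main(String[] args) {
        ChooserSketch four = new ChooserSketch("loop-0", "loop-1", "loop-2", "loop-3");
        ChooserSketch three = new ChooserSketch("loop-0", "loop-1", "loop-2");
        for (int i = 0; i < 5; i++) {
            System.out.println(four.next() + " / " + three.next());
        }
    }
}
```

Note that the check also returns true for 0, which is harmless here because the constructor has already rejected nThreads <= 0.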

Next, let's look at the NioEventLoop class.

NioEventLoop

Each eventLoop maintains a selector and a taskQueue, and handles both client requests and internal tasks such as registering the ServerSocketChannel and binding the ServerSocket.

NioEventLoop constructor:

 NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {  
     super(parent, threadFactory, false);  
     if (selectorProvider == null) {  
         throw new NullPointerException("selectorProvider");  
     }  
     provider = selectorProvider;  
     selector = openSelector();  
}  

Seeing selector = openSelector() should feel familiar. Leave the selector aside for now and look at the SingleThreadEventLoop class.

SingleThreadEventLoop is the parent class of NioEventLoop. Its constructor:

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
   super(parent, threadFactory, addTaskWakesUp);
}

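It does nothing but delegate…

Moving on to SingleThreadEventExecutor, the parent class of SingleThreadEventLoop.

As the class name suggests, this is a thread pool with exactly one thread. Some of its fields:
1. state: the current state of the pool
2. taskQueue: the queue that holds tasks
3. thread: the single thread the pool maintains
4. scheduledTaskQueue: defined in the parent class AbstractScheduledEventExecutor, holds tasks scheduled for delayed execution.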

Constructor:

protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
   if (threadFactory == null) {
       throw new NullPointerException("threadFactory");
   }
   this.parent = parent;
   this.addTaskWakesUp = addTaskWakesUp;
   thread = threadFactory.newThread(new Runnable() {
       @Override
       public void run() {
           boolean success = false;
           updateLastExecutionTime();
           try {
               SingleThreadEventExecutor.this.run();
               success = true;
           } catch (Throwable t) {
               logger.warn("Unexpected exception from an event executor: ", t);
           } finally {
               for (;;) {
                   int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                   if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                           SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                       break;
                   }
               }
               // Check if confirmShutdown() was called at the end of the loop.
               if (success && gracefulShutdownStartTime == 0) {
                   logger.error(
                           "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                           SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                           "before run() implementation terminates.");
               }
               try {
                   // Run all remaining tasks and shutdown hooks.
                   for (;;) {
                       if (confirmShutdown()) {
                           break;
                       }
                   }
               } finally {
                   try {
                       cleanup();
                   } finally {
                       STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                       threadLock.release();
                       if (!taskQueue.isEmpty()) {
                           logger.warn(
                                   "An event executor terminated with " +
                                   "non-empty task queue (" + taskQueue.size() + ')');
                       }
                       terminationFuture.setSuccess(null);
                   }
               }
           }
       }
   });
   threadProperties = new DefaultThreadProperties(thread);
   taskQueue = newTaskQueue();
}

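The code is long but what it does is simple:
1. It creates a thread whose body calls the run method of NioEventLoop. The thread is only created here, not started.
2. It initializes taskQueue, using a LinkedBlockingQueue.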

At this point all the processing threads have been initialized.
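The "create the thread now, start it later" idea can be sketched with plain JDK classes. This is a toy under stated assumptions, not Netty's code: the class name is hypothetical, there is no shutdown handling, and the loop simply blocks on the queue instead of also polling a selector.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class LazySingleThreadExecutor {
    private final LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private final Thread thread;

    LazySingleThreadExecutor() {
        // The thread is created in the constructor but deliberately not started.
        thread = new Thread(this::runLoop, "sketch-event-loop");
        thread.setDaemon(true);
    }

    boolean isStarted() {
        return started.get();
    }

    void execute(Runnable task) {
        taskQueue.add(task);
        // The first submitted task starts the thread; later calls only enqueue.
        if (started.compareAndSet(false, true)) {
            thread.start();
        }
    }

    private void runLoop() {
        try {
            for (;;) {
                taskQueue.take().run(); // run tasks one by one, in submission order
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Records what happens before, during and after the first submissions.
    static List<String> demo() {
        LazySingleThreadExecutor loop = new LazySingleThreadExecutor();
        List<String> events = new CopyOnWriteArrayList<>();
        events.add("started=" + loop.isStarted());
        CountDownLatch done = new CountDownLatch(2);
        loop.execute(() -> { events.add("task-1"); done.countDown(); });
        loop.execute(() -> { events.add("task-2"); done.countDown(); });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        events.add("started=" + loop.isStarted());
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because a single thread drains the queue, tasks never run concurrently with each other, which is what lets code running on an eventLoop avoid locking.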

ServerBootstrap

The server is started with serverBootstrap.bind(port). The process is as follows:

/**
* Create a new {@link Channel} and bind it.
*/

public ChannelFuture bind() {
   validate();
   SocketAddress localAddress = this.localAddress;
   if (localAddress == null) {
      throw new IllegalStateException("localAddress not set");
   }
   return doBind(localAddress);
}

doBind is implemented as follows:

private ChannelFuture doBind(final SocketAddress localAddress) {
   final ChannelFuture regFuture = initAndRegister();
   final Channel channel = regFuture.channel();
   if (regFuture.cause() != null) {
       return regFuture;
   }
   if (regFuture.isDone()) {
       // At this point we know that the registration was complete and successful.
       ChannelPromise promise = channel.newPromise();
       doBind0(regFuture, channel, localAddress, promise);
       return promise;
   } else {
       // Registration future is almost always fulfilled already, but just in case it's not.
       final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
       regFuture.addListener(new ChannelFutureListener() {
           @Override
           public void operationComplete(ChannelFuture future) throws Exception {
               Throwable cause = future.cause();
               if (cause != null) {
                   // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                   // IllegalStateException once we try to access the EventLoop of the Channel.
                   promise.setFailure(cause);
               } else {
                   // Registration was successful, so set the correct executor to use.
                   // See https://github.com/netty/netty/issues/2586
                   promise.executor = channel.eventLoop();
               }
               doBind0(regFuture, channel, localAddress, promise);
           }
       });
       return promise;
   }
}

1. initAndRegister returns a ChannelFuture instance, regFuture, which tells us how initAndRegister went.
2. If regFuture.isDone() is true, initAndRegister has already finished, so doBind0 is called directly to bind the socket.
3. Otherwise a ChannelFutureListener is added to regFuture; when initAndRegister completes, operationComplete is invoked and calls doBind0 to bind the socket.

So the bind can only happen after initAndRegister has finished.
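The same "bind now if registration is done, otherwise bind from a listener" shape can be sketched with the JDK's CompletableFuture standing in for ChannelFuture. Everything here is hypothetical illustration (class name, log messages, the "bound" result); it only mirrors the control flow of doBind described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class BindAfterRegister {
    // Hypothetical stand-in for doBind: regFuture plays the role of the registration future.
    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, List<String> log) {
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            // Registration already succeeded: bind right away.
            log.add("bind immediately");
            return CompletableFuture.completedFuture("bound");
        }
        // Still pending (or failed): decide when the registration future completes.
        CompletableFuture<String> promise = new CompletableFuture<>();
        regFuture.whenComplete((ignored, cause) -> {
            if (cause != null) {
                promise.completeExceptionally(cause); // registration failed, so bind fails too
            } else {
                log.add("bind from listener");
                promise.complete("bound");
            }
        });
        return promise;
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        CompletableFuture<Void> pending = new CompletableFuture<>();
        doBind(pending, log);                  // registration still in flight
        log.add("registration completes");
        pending.complete(null);                // the listener fires and binds now
        doBind(CompletableFuture.<Void>completedFuture(null), log); // already registered
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Either way the caller gets a future back immediately, and the bind never runs ahead of the registration.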

The initAndRegister implementation:

final ChannelFuture initAndRegister() {
   final Channel channel = channelFactory().newChannel();
   try {
       init(channel);
   } catch (Throwable t) {
       channel.unsafe().closeForcibly();
       // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
       return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
   }
   ChannelFuture regFuture = group().register(channel);
   if (regFuture.cause() != null) {
       if (channel.isRegistered()) {
           channel.close();
       } else {
           channel.unsafe().closeForcibly();
       }
   }
   return regFuture;
}

1. It creates the server-side NioServerSocketChannel instance.
2. It adds handlers to the NioServerSocketChannel's pipeline.
3. It registers the NioServerSocketChannel with a selector.

Most of this closely mirrors plain NIO.
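For comparison, here is roughly what the equivalent startup looks like in plain NIO, with comments pointing at the Netty step each call corresponds to. It is a sketch under assumptions: the class name is hypothetical, it binds to an OS-chosen port on the loopback address, and the point at which OP_ACCEPT is applied reflects my reading of Netty rather than anything stated above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class PlainNioStartup {
    // Runs the startup steps once and returns a short description of the final state.
    static String start() {
        try (Selector selector = Selector.open();                     // one selector per eventLoop
             ServerSocketChannel server = ServerSocketChannel.open()) { // channel creation
            server.configureBlocking(false);                          // non-blocking mode
            SelectionKey key = server.register(selector, 0);          // register with interest ops 0
            // Port 0 asks the OS for any free port; 128 is an arbitrary backlog for this sketch.
            server.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 128);
            key.interestOps(SelectionKey.OP_ACCEPT);                  // only now listen for accepts
            return "blocking=" + server.isBlocking()
                    + " bound=" + (server.getLocalAddress() != null)
                    + " accept=" + (key.interestOps() == SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(start());
    }
}
```

What Netty adds on top is mostly ordering and threading: each of these calls is executed on the eventLoop thread that owns the selector, and completion is reported through futures.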

NioServerSocketChannel

It wraps NIO's ServerSocketChannel and SelectionKey.

Constructors:

public NioServerSocketChannel() {
   this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
   try {
       return provider.openServerSocketChannel();
   } catch (IOException e) {
       throw new ChannelException(
               "Failed to open a server socket.", e);
   }
}
public NioServerSocketChannel(ServerSocketChannel channel) {
   super(null, channel, SelectionKey.OP_ACCEPT);
   config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

1. newSocket uses provider.openServerSocketChannel() to create the NIO ServerSocketChannel object.
2. SelectionKey.OP_ACCEPT is passed up as the event this channel is interested in.

AbstractNioMessageChannel constructor:

protected  AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
   super(parent, ch, readInterestOp);
}

Nothing happens here either…

AbstractNioChannel constructor:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
   super(parent);
   this.ch = ch;
   this.readInterestOp = readInterestOp;
   try {
       ch.configureBlocking(false);
   } catch (IOException e) {
       try {
           ch.close();
       } catch (IOException e2) {
           if (logger.isWarnEnabled()) {
               logger.warn(
                       "Failed to close a partially initialized socket.", e2);
           }
       }
       throw new ChannelException("Failed to enter non-blocking mode.", e);
   }
}

This puts the ServerSocketChannel into non-blocking mode.

AbstractChannel constructor:

protected AbstractChannel(Channel parent) {
   this.parent = parent;
   unsafe = newUnsafe();
   pipeline = new DefaultChannelPipeline(this);
}

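1. It initializes unsafe. This Unsafe is not the low-level Unsafe class from the JDK; it is responsible for the underlying connect, register, read and write operations.
2. It initializes the pipeline. Every Channel has its own pipeline, and when an event occurs the pipeline invokes the appropriate handlers to process it.

How unsafe and pipeline work internally will be covered in later articles.

Back in ServerBootstrap, the init(Channel channel) method adds handlers to the channel's pipeline.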

void init(Channel channel) throws Exception {
   final Map<ChannelOption<?>, Object> options = options();
   synchronized (options) {
       channel.config().setOptions(options);
   }
   final Map<AttributeKey<?>, Object> attrs = attrs();
   synchronized (attrs) {
       for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
           @SuppressWarnings("unchecked")
           AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
           channel.attr(key).set(e.getValue());
       }
   }
   ChannelPipeline p = channel.pipeline();
   final EventLoopGroup currentChildGroup = childGroup;
   final ChannelHandler currentChildHandler = childHandler;
   final Entry<ChannelOption<?>, Object>[] currentChildOptions;
   final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
   synchronized (childOptions) {
       currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
   }
   synchronized (childAttrs) {
       currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
   }
   p.addLast(new ChannelInitializer<Channel>() {
       @Override
       public void initChannel(Channel ch) throws Exception {
           ChannelPipeline pipeline = ch.pipeline();
           ChannelHandler handler = handler();
           if (handler != null) {
               pipeline.addLast(handler);
           }
           pipeline.addLast(new ServerBootstrapAcceptor(
                   currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
       }
   });
}

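1. It sets the channel's options and attrs.
2. It adds a ChannelInitializer to the pipeline.

Once init has run, the channel has to be registered with the EventLoopGroup.
The ultimate goal is the NIO step of registering the ServerSocketChannel with a selector, so that client connections can be detected. Here is how Netty does it: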

public ChannelFuture register(Channel channel, ChannelPromise promise) {
   return next().register(channel, promise);
}
public EventLoop next() {
   return (EventLoop) super.next();
}
public EventExecutor next() {
   return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}

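Because an EventLoopGroup maintains several eventLoops, next() uses the chooser strategy (a round-robin over children) to pick the next eventLoop and then calls that eventLoop's register method.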
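One detail in next() is easy to miss: Math.abs is applied after the modulo, not before. The counter is an int that eventually overflows, and the small sketch below (hypothetical class name, JDK only) shows why the order matters.

```java
final class RoundRobinIndex {
    // Index computed the way next() does above: modulo first, abs() second.
    static int indexOf(int counter, int length) {
        return Math.abs(counter % length);
    }

    public static void main(String[] args) {
        int length = 3;
        // Walk the counter across the int overflow boundary.
        int counter = Integer.MAX_VALUE - 1;
        for (int i = 0; i < 4; i++) {
            System.out.println(counter + " -> " + indexOf(counter, length));
            counter++; // wraps around to Integer.MIN_VALUE, as an AtomicInteger would
        }
        // Math.abs(Integer.MIN_VALUE) is still negative, so abs-then-modulo
        // could produce a negative array index.
        System.out.println("abs first: " + (Math.abs(Integer.MIN_VALUE) % length));
    }
}
```

The sequence is not perfectly even at the moment of overflow, but the index always stays within the bounds of children, which is what matters here.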

public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
   ...
   channel.unsafe().register(this, promise);
   return promise;
}

What is channel.unsafe()?
When NioServerSocketChannel is initialized it creates a NioMessageUnsafe instance, which implements the low-level register, read, write and similar operations.

eventLoop.execute(new Runnable() {
  @Override
  public void run() {
     register0(promise);
  }
});
private void register0(ChannelPromise promise) {
   try {
       if (!ensureOpen(promise)) {
           return;
       }
       Runnable postRegisterTask = doRegister();
       registered = true;
       promise.setSuccess();
       pipeline.fireChannelRegistered();
       if (postRegisterTask != null) {
           postRegisterTask.run();
       }
       if (isActive()) {
           pipeline.fireChannelActive();
       }
   } catch (Throwable t) {
       // Close the channel directly to avoid FD leak.
       closeForcibly();
       if (!promise.tryFailure(t)) {
       }
       closeFuture.setClosed();
   }
}
public void execute(Runnable task) {
   if (task == null) {
       throw new NullPointerException("task");
   }
   boolean inEventLoop = inEventLoop();
   if (inEventLoop) {
       addTask(task);
   } else {
       startThread();
       addTask(task);
       if (isShutdown() && removeTask(task)) {
           reject();
       }
   }
   if (!addTaskWakesUp) {
       wakeup(inEventLoop);
   }
}

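1. register0 is submitted to the eventLoop for execution, and this is the moment the eventLoop's thread gets started.
2. doRegister() is where the actual NIO registration happens; javaChannel() returns the underlying ServerSocketChannel.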

protected Runnable doRegister() throws Exception {
   boolean selected = false;
   for (;;) {
       try {
           selectionKey = javaChannel().register(eventLoop().selector, 0, this);
           return null;
       } catch (CancelledKeyException e) {
           if (!selected) {
               // Force the Selector to select now  as the "canceled" SelectionKey may still be
               // cached and not removed because no Select.select(..) operation was called yet.
               eventLoop().selectNow();
               selected = true;
           } else {
               // We forced a select operation on the selector before but the SelectionKey is still cached
               // for whatever reason. JDK bug ?
               throw e;
           }
       }
   }
}

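Once the ServerSocketChannel is registered, the pipeline is told to run fireChannelRegistered. The pipeline maintains a linked list of handlers; it walks the list and calls channelRegistered on each inbound handler, eventually reaching the ChannelInitializer handler added in init.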

public final void channelRegistered(ChannelHandlerContext ctx)
       throws Exception
{
   boolean removed = false;
   boolean success = false;
   try {
       initChannel((C) ctx.channel());
       ctx.pipeline().remove(this);
       removed = true;
       ctx.fireChannelRegistered();
       success = true;
   } catch (Throwable t) {
       logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
   } finally {
       if (!removed) {
           ctx.pipeline().remove(this);
       }
       if (!success) {
           ctx.close();
       }
   }
}

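1. initChannel ends up adding ServerBootstrapAcceptor to the ServerSocketChannel's pipeline; it is responsible for accepting client connections.
2. The ChannelInitializer itself is then removed from the pipeline.
3. fireChannelRegistered is triggered, so a custom handler can react in its own channelRegistered method.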
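The three steps above amount to a one-shot "installer" handler: it adds the real handlers and then removes itself. A toy version, with a plain list standing in for the pipeline and hypothetical handler names, might look like this:

```java
import java.util.ArrayList;
import java.util.List;

final class InitializerSketch {
    interface Handler {
        void channelRegistered(List<Handler> pipeline);
    }

    // A named handler that does nothing when the channel is registered.
    static final class Named implements Handler {
        private final String name;
        Named(String name) { this.name = name; }
        @Override public void channelRegistered(List<Handler> pipeline) { }
        @Override public String toString() { return name; }
    }

    // One-shot handler: installs the real handlers, then removes itself.
    static final class Initializer implements Handler {
        @Override public void channelRegistered(List<Handler> pipeline) {
            pipeline.add(new Named("userHandler")); // the optional handler() from the bootstrap
            pipeline.add(new Named("acceptor"));    // stands in for ServerBootstrapAcceptor
            pipeline.remove(this);
        }
        @Override public String toString() { return "initializer"; }
    }

    static String demo() {
        List<Handler> pipeline = new ArrayList<>();
        pipeline.add(new Initializer());
        String before = pipeline.toString();
        // Iterate over a snapshot so handlers may modify the pipeline while the event fires.
        for (Handler h : new ArrayList<>(pipeline)) {
            h.channelRegistered(pipeline);
        }
        return before + " -> " + pipeline;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Deferring the setup this way means the handlers are only installed once the channel actually belongs to an eventLoop.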

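At this point the ServerSocketChannel has been initialized and registered with the selector, and the thread has been started and runs selector.select(), ready to accept client connections.

Attentive readers will have noticed that the ServerSocketChannel's socket has not been bound to a port yet. How does Netty handle that?
Netty submits the bind operation to the eventLoop as well.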

private static void doBind0(
       final ChannelFuture regFuture,
       final Channel channel,
       final SocketAddress localAddress,
       final ChannelPromise promise)
{
   channel.eventLoop().execute(new Runnable() {
       @Override
       public void run() {
           if (regFuture.isSuccess()) {
               channel.bind(localAddress, promise)
                       .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
           } else {
               promise.setFailure(regFuture.cause());
           }
       }
   });
}
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
   return pipeline.bind(localAddress, promise);
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
   return tail.bind(localAddress, promise);
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
   if (localAddress == null) {
       throw new NullPointerException("localAddress");
   }
   validatePromise(promise, false);
   return findContextOutbound().invokeBind(localAddress, promise);
}
private ChannelFuture invokeBind(final SocketAddress localAddress, final ChannelPromise promise) {
   EventExecutor executor = executor();
   if (executor.inEventLoop()) {
       invokeBind0(localAddress, promise);
   } else {
       executor.execute(new Runnable() {
           @Override
           public void run() {
               invokeBind0(localAddress, promise);
           }
       });
   }
   return promise;
}
private void invokeBind0(SocketAddress localAddress, ChannelPromise promise) {
   try {
       ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
   } catch (Throwable t) {
       notifyOutboundHandlerException(t, promise);
   }
}
@Override
public void bind(
       ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)

       throws Exception
{
   unsafe.bind(localAddress, promise);
}

In the end it is unsafe that performs the actual bind to the port.

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
       if (!ensureOpen(promise)) {
           return;
       }
       try {
           boolean wasActive = isActive();
           ...        
           doBind(localAddress);
           promise.setSuccess();
           if (!wasActive && isActive()) {
               pipeline.fireChannelActive();
           }
       } catch (Throwable t) {
           promise.setFailure(t);
           closeIfClosed();
       }
   }
protected void doBind(SocketAddress localAddress) throws Exception {
   javaChannel().socket().bind(localAddress, config.getBacklog());
}

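Once bind has completed, and with the ServerSocketChannel already registered, the pipeline's fireChannelActive is triggered, so a custom handler can hook in here through its channelActive method. As the code below shows, the call starts at head: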

 @Override
public ChannelPipeline fireChannelActive() {
   head.fireChannelActive();
   if (channel.config().isAutoRead()) {
       channel.read();
   }
   return this;
}

channel.read() in turn triggers the pipeline:

  @Override
public Channel read() {
   pipeline.read();
   return this;
}
@Override
public ChannelPipeline read() {
   tail.read();
   return this;
}
@Override
public ChannelHandlerContext read() {
   findContextOutbound().invokeRead();
   return this;
}
private void invokeRead() {
   EventExecutor executor = executor();
   if (executor.inEventLoop()) {
       invokeRead0();
   } else {
       Runnable task = invokeRead0Task;
       if (task == null) {
           invokeRead0Task = task = new Runnable() {
               @Override
               public void run() {
                   invokeRead0();
               }
           };
       }
       executor.execute(task);
   }
}
private void invokeRead0() {
   try {
       ((ChannelOutboundHandler) handler()).read(this);
   } catch (Throwable t) {
       notifyHandlerException(t);
   }
}

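Eventually a handler in the pipeline is found to execute the read; by default this is head. head hands the call to unsafe.beginRead(), which is where the OP_ACCEPT interest stored in the constructor is finally applied to the SelectionKey that was registered with interest ops 0.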
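Both bind and read are outbound operations, so they enter at tail and travel toward head, skipping contexts whose handler is inbound-only. The sketch below models that walk with a minimal linked list. The class, the handler names and the boolean flag are hypothetical, and it assumes every outbound handler simply forwards the call.

```java
import java.util.ArrayList;
import java.util.List;

final class OutboundSketch {
    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }

        // Walk toward head until a context with an outbound handler is found.
        Ctx findContextOutbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }
    }

    // Returns the contexts an outbound call visits when it starts at tail.
    static List<String> demo() {
        Ctx head = new Ctx("head", true);             // head handles outbound calls via unsafe
        Ctx acceptor = new Ctx("acceptor", false);    // inbound-only, skipped
        Ctx encoder = new Ctx("encoder", true);       // hypothetical user outbound handler
        Ctx tail = new Ctx("tail", false);
        acceptor.prev = head;
        encoder.prev = acceptor;
        tail.prev = encoder;

        List<String> visited = new ArrayList<>();
        Ctx ctx = tail;
        while (ctx != head) {
            ctx = ctx.findContextOutbound();
            visited.add(ctx.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With no user outbound handlers in the server channel's pipeline, the very first hop from tail lands on head, which matches the "by default this is head" observation above.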

At this point the server has finished starting up.
