歡迎光臨
每天分享高質量文章

Java 非阻塞 IO 和異步 IO

(點擊上方公眾號,可快速關註)


來源:JavaDoop ,

javadoop.com/post/nio-and-aio

上一篇文章介紹了 Java NIO 中 Buffer、Channel 和 Selector 的基本操作,主要是一些接口操作,比較簡單。

本文將介紹非阻塞 IO 和異步 IO,也就是大家耳熟能詳的 NIO 和 AIO。很多初學者可能分不清楚異步和非阻塞的區別,只是在各種場合能聽到異步非阻塞這個詞。

本文會先介紹並演示阻塞樣式,然後引入非阻塞樣式來對阻塞樣式進行優化,最後再介紹 JDK7 引入的異步 IO,由於網上關於異步 IO 的介紹相對較少,所以這部分內容我會介紹得具體一些。

希望看完本文,讀者可以對非阻塞 IO 和異步 IO 的迷霧看得更清晰些,或者為初學者解開一絲絲疑惑也是好的。

NIO,JDK1.4,New IO,Non-Blocking IO

NIO.2,JDK7,More New IO,Asynchronous IO,嚴格地說 NIO.2 不僅僅引入了 AIO

阻塞樣式 IO

我們已經介紹過使用 Java NIO 包組成一個簡單的客戶端-服務端網絡通訊所需要的 ServerSocketChannel、SocketChannel 和 Buffer,我們這裡整合一下它們,給出一個完整的可運行的例子:

public class Server {

 

    public static void main(String[] args) throws IOException {

 

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

 

        // 監聽 8080 端口進來的 TCP 鏈接

        serverSocketChannel.socket().bind(new InetSocketAddress(8080));

 

        while (true) {

 

            // 這裡會阻塞,直到有一個請求的連接進來

            SocketChannel socketChannel = serverSocketChannel.accept();

 

            // 開啟一個新的執行緒來處理這個請求,然後在 while 迴圈中繼續監聽 8080 端口

            SocketHandler handler = new SocketHandler(socketChannel);

            new Thread(handler).start();

        }

    }

}

這裡看一下新的執行緒需要做什麼,SocketHandler:

public class SocketHandler implements Runnable {

 

    private SocketChannel socketChannel;

 

    public SocketHandler(SocketChannel socketChannel) {

        this.socketChannel = socketChannel;

    }

 

    @Override

    public void run() {

 

        ByteBuffer buffer = ByteBuffer.allocate(1024);

        try {

            // 將請求資料讀入 Buffer 中

            int num;

            while ((num = socketChannel.read(buffer)) > 0) {

                // 讀取 Buffer 內容之前先 flip 一下

                buffer.flip();

 

                // 提取 Buffer 中的資料

                byte[] bytes = new byte[num];

                buffer.get(bytes);

 

                String re = new String(bytes, “UTF-8”);

                System.out.println(“收到請求:” + re);

 

                // 回應客戶端

                ByteBuffer writeBuffer = ByteBuffer.wrap((“我已經收到你的請求,你的請求內容是:” + re).getBytes());

                socketChannel.write(writeBuffer);

 

                buffer.flip();

            }

        } catch (IOException e) {

            IOUtils.closeQuietly(socketChannel);

        }

    }

}

最後,貼一下客戶端 SocketChannel 的使用,客戶端比較簡單:

public class SocketChannelTest {

    public static void main(String[] args) throws IOException {

        SocketChannel socketChannel = SocketChannel.open();

        socketChannel.connect(new InetSocketAddress(“localhost”, 8080));

 

        // 發送請求

        ByteBuffer buffer = ByteBuffer.wrap(“1234567890”.getBytes());

        socketChannel.write(buffer);

 

        // 讀取響應

        ByteBuffer readBuffer = ByteBuffer.allocate(1024);

        int num;

        if ((num = socketChannel.read(readBuffer)) > 0) {

            readBuffer.flip();

 

            byte[] re = new byte[num];

            readBuffer.get(re);

 

            String result = new String(re, “UTF-8”);

            System.out.println(“傳回值: ” + result);

        }

    }

}

上面介紹的阻塞樣式的代碼應該很好理解:來一個新的連接,我們就新開一個執行緒來處理這個連接,之後的操作全部由那個執行緒來完成。

那麼,這個樣式下的性能瓶頸在哪裡呢?

  1. 首先,每次來一個連接都開一個新的執行緒這肯定是不合適的。當活躍連接數在幾十幾百的時候當然是可以這樣做的,但如果活躍連接數是幾萬幾十萬的時候,這麼多執行緒明顯就不行了。每個執行緒都需要一部分記憶體,記憶體會被迅速消耗,同時,執行緒切換的開銷非常大。

  2. 其次,阻塞操作在這裡也是一個問題。首先,accept() 是一個阻塞操作,當 accept() 傳回的時候,代表有一個連接可以使用了,我們這裡是馬上就新建執行緒來處理這個 SocketChannel 了,但是,但是這裡不代表對方就將資料傳輸過來了。所以,SocketChannel#read 方法將阻塞,等待資料,明顯這個等待是不值得的。同理,write 方法也需要等待通道可寫才能執行寫入操作,這邊的阻塞等待也是不值得的。

非阻塞 IO

說完了阻塞樣式的使用及其缺點以後,我們這裡就可以介紹非阻塞 IO 了。

非阻塞 IO 的核心在於使用一個 Selector 來管理多個通道,可以是 SocketChannel,也可以是 ServerSocketChannel,將各個通道註冊到 Selector 上,指定監聽的事件。

之後可以只用一個執行緒來輪詢這個 Selector,看看上面是否有通道是準備好的,當通道準備好可讀或可寫,然後才去開始真正的讀寫,這樣速度就很快了。我們就完全沒有必要給每個通道都起一個執行緒。

NIO 中 Selector 是對底層操作系統實現的一個抽象,管理通道狀態其實都是底層系統實現的,這裡簡單介紹下在不同系統下的實現。

select:上世紀 80 年代就實現了,它支持註冊 FD_SETSIZE(1024) 個 socket,在那個年代肯定是夠用的,不過現在嘛,肯定是不行了。

poll:1997 年,出現了 poll 作為 select 的替代者,最大的區別就是,poll 不再限制 socket 數量。

select 和 poll 都有一個共同的問題,那就是它們都只會告訴你有幾個通道準備好了,但是不會告訴你具體是哪幾個通道。所以,一旦知道有通道準備好以後,自己還是需要進行一次掃描,顯然這個不太好,通道少的時候還行,一旦通道的數量是幾十萬個以上的時候,掃描一次的時間都很可觀了,時間複雜度 O(n)。所以,後來才催生了以下實現。

epoll:2002 年隨 Linux 內核 2.5.44 發佈,epoll 能直接傳回具體的準備好的通道,時間複雜度 O(1)。

除了 Linux 中的 epoll,2000 年 FreeBSD 出現了 Kqueue,還有就是,Solaris 中有 /dev/poll。

前面說了那麼多實現,但是沒有出現 Windows,Windows 平臺的非阻塞 IO 使用 select,我們也不必覺得 Windows 很落後,在 Windows 中 IOCP 提供的異步 IO 是比較強大的。

我們回到 Selector,畢竟 JVM 就是這麼一個屏蔽底層實現的平臺,我們面向 Selector 編程就可以了。

之前在介紹 Selector 的時候已經瞭解過了它的基本用法,這邊來一個可運行的實體代碼,大家不妨看看:

public class SelectorServer {

 

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();

 

        ServerSocketChannel server = ServerSocketChannel.open();

        server.socket().bind(new InetSocketAddress(8080));

 

        // 將其註冊到 Selector 中,監聽 OP_ACCEPT 事件

        server.configureBlocking(false);

        server.register(selector, SelectionKey.OP_ACCEPT);

 

        while (true) {

            // 需要不斷地去呼叫 select() 方法獲取最新的準備好的通道

            int readyChannels = selector.select();

            if (readyChannels == 0) {

                continue;

            }

            Set readyKeys = selector.selectedKeys();

            // 遍歷

            Iterator iterator = readyKeys.iterator();

            while (iterator.hasNext()) {

                SelectionKey key = iterator.next();

                iterator.remove();

 

                if (key.isAcceptable()) {

                    // 有已經接受的新的到服務端的連接

                    SocketChannel socketChannel = server.accept();

 

                    // 有新的連接並不代表這個通道就有資料,

                    // 這裡將這個新的 SocketChannel 註冊到 Selector,監聽 OP_READ 事件,等待資料

                    socketChannel.configureBlocking(false);

                    socketChannel.register(selector, SelectionKey.OP_READ);

                } else if (key.isReadable()) {

                    // 有資料可讀

                    // 上面一個 if 分支中註冊了監聽 OP_READ 事件的 SocketChannel

                    SocketChannel socketChannel = (SocketChannel) key.channel();

                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);

                    int num = socketChannel.read(readBuffer);

                    if (num > 0) {

                        // 處理進來的資料…

                        System.out.println(“收到資料:” + new String(readBuffer.array()).trim());

                        socketChannel.register(selector, SelectionKey.OP_WRITE);

                    } else if (num == -1) {

                        // -1 代表連接已經關閉

                        socketChannel.close();

                    }

                }

                else if (key.isWritable()) {

                    // 通道可寫

                    // 給用戶傳回資料的通道可以進行寫操作了

                    SocketChannel socketChannel = (SocketChannel) key.channel();

                    ByteBuffer buffer = ByteBuffer.wrap(“傳回給客戶端的資料…”.getBytes());

                    socketChannel.write(buffer);

 

                    // 重新註冊這個通道,監聽 OP_READ 事件,客戶端還可以繼續發送內容過來

                    socketChannel.register(selector, SelectionKey.OP_READ);

                }

            }

        }

    }

}

至於客戶端,大家可以繼續使用上一節介紹阻塞樣式時的客戶端進行測試。

NIO.2 異步 IO

More New IO,或稱 NIO.2,隨 JDK 1.7 發佈,包括了引入異步 IO 接口和 Paths 等檔案訪問接口。

異步這個詞,我想對於絕大多數開發者來說都很熟悉,很多場景下我們都會使用異步。

通常,我們會有一個執行緒池用於執行異步任務,提交任務的執行緒將任務提交到執行緒池就可以立馬傳回,不必等到任務真正完成。如果想要知道任務的執行結果,通常是通過傳遞一個回呼函式的方式,任務結束後去呼叫這個函式。

同樣的原理,Java 中的異步 IO 也是一樣的,都是由一個執行緒池來負責執行任務,然後使用回呼或自己去查詢結果。

大部分開發者都知道為什麼要這麼設計了,這裡再啰嗦一下。異步 IO 主要是為了控制執行緒數量,減少過多的執行緒帶來的記憶體消耗和 CPU 在執行緒調度上的開銷。

在 Unix/Linux 等系統中,JDK 使用了併發包中的執行緒池來管理任務,具體可以查看 AsynchronousChannelGroup 的原始碼。

在 Windows 操作系統中,提供了一個叫做 I/O Completion Ports 的方案,通常簡稱為 IOCP,操作系統負責管理執行緒池,其性能非常優異,所以在 Windows 中 JDK 直接採用了 IOCP 的支持,使用系統支持,把更多的操作信息暴露給操作系統,也使得操作系統能夠對我們的 IO 進行一定程度的優化。

在 Linux 中其實也是有異步 IO 系統實現的,但是限制比較多,性能也一般,所以 JDK 採用了自建執行緒池的方式。

本文還是以實用為主,想要瞭解更多信息請自行查找其他資料,下麵對 Java 異步 IO 進行實踐性的介紹。

總共有三個類需要我們關註,分別是 AsynchronousSocketChannel,AsynchronousServerSocketChannel 和 AsynchronousFileChannel,只不過是在之前介紹的 FileChannel、SocketChannel 和 ServerSocketChannel 的類名上加了個前綴 Asynchronous。

Java 異步 IO 提供了兩種使用方式,分別是傳回 Future 實體和使用回呼函式。

1、傳回 Future 實體

傳回 java.util.concurrent.Future 實體的方式我們應該很熟悉,JDK 執行緒池就是這麼使用的。Future 接口的幾個方法語意在這裡也是通用的,這裡先做簡單介紹。

future.isDone();

判斷操作是否已經完成,包括了正常完成、異常丟擲、取消

future.cancel(true);

取消操作,方式是中斷。引數 true 說的是,即使這個任務正在執行,也會進行中斷。

future.isCancelled();

是否被取消,只有在任務正常結束之前被取消,這個方法才會傳回 true

future.get();

這是我們的老朋友,獲取執行結果,阻塞。

future.get(10, TimeUnit.SECONDS);

如果上面的 get() 方法的阻塞你不滿意,那就設置個超時時間。

2、提供 CompletionHandler 回呼函式

java.nio.channels.CompletionHandler 接口定義:

public interface CompletionHandler {

 

    void completed(V result, A attachment);

 

    void failed(Throwable exc, A attachment);

}

註意,引數上有個 attachment,雖然不常用,我們可以在各個支持的方法中傳遞這個引數值

AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open().bind(null);

 

// accept 方法的第一個引數可以傳遞 attachment

listener.accept(attachment, new CompletionHandler() {

    public void completed(

      AsynchronousSocketChannel client, Object attachment) {

          // 

      }

    public void failed(Throwable exc, Object attachment) {

          // 

      }

});

AsynchronousFileChannel

網上關於 Non-Blocking IO 的介紹文章很多,但是 Asynchronous IO 的文章相對就少得多了,所以我這邊會多介紹一些相關內容。

首先,我們就來關註異步的檔案 IO,前面我們說了,檔案 IO 在所有的操作系統中都不支持非阻塞樣式,但是我們可以對檔案 IO 採用異步的方式來提高性能。

下麵,我會介紹 AsynchronousFileChannel 裡面的一些重要的接口,都很簡單,讀者要是覺得無趣,直接滑到下一個標題就可以了。

實體化:

AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(“/Users/hongjie/test.txt”));

一旦實體化完成,我們就可以著手準備將資料讀入到 Buffer 中:

ByteBuffer buffer = ByteBuffer.allocate(1024);

Future result = channel.read(buffer, 0);

異步檔案通道的讀操作和寫操作都需要提供一個檔案的開始位置,檔案開始位置為 0

除了使用傳回 Future 實體的方式,也可以採用回呼函式進行操作,接口如下:

public abstract void read(ByteBuffer dst,

                              long position,

                              A attachment,

                              CompletionHandler handler);

順便也貼一下寫操作的兩個版本的接口:

public abstract Future write(ByteBuffer src, long position);

 

public abstract void write(ByteBuffer src,

                               long position,

                               A attachment,

                               CompletionHandler handler);

我們可以看到,AIO 的讀寫主要也還是與 Buffer 打交道,這個與 NIO 是一脈相承的。

另外,還提供了用於將記憶體中的資料刷入到磁盤的方法:

public abstract void force(boolean metaData) throws IOException;

因為我們對檔案的寫操作,操作系統並不會直接針對檔案操作,系統會快取,然後周期性地刷入到磁盤。如果希望將資料及時寫入到磁盤中,以免斷電引發部分資料丟失,可以呼叫此方法。引數如果設置為 true,意味著同時也將檔案屬性信息更新到磁盤。

還有,還提供了對檔案的鎖定功能,我們可以鎖定檔案的部分資料,這樣可以進行排他性的操作。

public abstract Future lock(long position, long size, boolean shared);

position 是要鎖定內容的開始位置,size 指示了要鎖定的區域大小,shared 指示需要的是共享鎖還是排他鎖

當然,也可以使用回呼函式的版本:

public abstract void lock(long position,

                              long size,

                              boolean shared,

                              A attachment,

                              CompletionHandler handler);

檔案鎖定功能上還提供了 tryLock 方法,此方法會快速傳回結果:

public abstract FileLock tryLock(long position, long size, boolean shared)

    throws IOException;

這個方法很簡單,就是嘗試去獲取鎖,如果該區域已被其他執行緒或其他應用鎖住,那麼立刻傳回 null,否則傳回 FileLock 物件。

AsynchronousFileChannel 操作大體上也就以上介紹的這些接口,還是比較簡單的,這裡就少一些廢話早點結束好了。

AsynchronousServerSocketChannel

這個類對應的是非阻塞 IO 的 ServerSocketChannel,大家可以類比下使用方式。

我們就廢話少說,用代碼說事吧:

package com.javadoop.aio;

 

import java.io.IOException;

import java.net.InetSocketAddress;

import java.net.SocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.AsynchronousServerSocketChannel;

import java.nio.channels.AsynchronousSocketChannel;

import java.nio.channels.CompletionHandler;

 

public class Server {

 

    public static void main(String[] args) throws IOException {

 

          // 實體化,並監聽端口

        AsynchronousServerSocketChannel server =

                AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080));

 

        // 自己定義一個 Attachment 類,用於傳遞一些信息

        Attachment att = new Attachment();

        att.setServer(server);

 

        server.accept(att, new CompletionHandler() {

            @Override

            public void completed(AsynchronousSocketChannel client, Attachment att) {

                try {

                    SocketAddress clientAddr = client.getRemoteAddress();

                    System.out.println(“收到新的連接:” + clientAddr);

 

                    // 收到新的連接後,server 應該重新呼叫 accept 方法等待新的連接進來

                    att.getServer().accept(att, this);

 

                    Attachment newAtt = new Attachment();

                    newAtt.setServer(server);

                    newAtt.setClient(client);

                    newAtt.setReadMode(true);

                    newAtt.setBuffer(ByteBuffer.allocate(2048));

 

                    // 這裡也可以繼續使用匿名實現類,不過代碼不好看,所以這裡專門定義一個類

                    client.read(newAtt.getBuffer(), newAtt, new ChannelHandler());

                } catch (IOException ex) {

                    ex.printStackTrace();

                }

            }

 

            @Override

            public void failed(Throwable t, Attachment att) {

                System.out.println(“accept failed”);

            }

        });

        // 為了防止 main 執行緒退出

        try {

            Thread.currentThread().join();

        } catch (InterruptedException e) {

        }

    }

}

看一下 ChannelHandler 類:

package com.javadoop.aio;

 

import java.io.IOException;

import java.nio.ByteBuffer;

import java.nio.channels.CompletionHandler;

import java.nio.charset.Charset;

 

public class ChannelHandler implements CompletionHandler {

 

    @Override

    public void completed(Integer result, Attachment att) {

        if (att.isReadMode()) {

            // 讀取來自客戶端的資料

            ByteBuffer buffer = att.getBuffer();

            buffer.flip();

            byte bytes[] = new byte[buffer.limit()];

            buffer.get(bytes);

            String msg = new String(buffer.array()).toString().trim();

            System.out.println(“收到來自客戶端的資料: ” + msg);

 

            // 響應客戶端請求,傳回資料

            buffer.clear();

            buffer.put(“Response from server!”.getBytes(Charset.forName(“UTF-8”)));

            att.setReadMode(false);

            buffer.flip();

            // 寫資料到客戶端也是異步

            att.getClient().write(buffer, att, this);

        } else {

            // 到這裡,說明往客戶端寫資料也結束了,有以下兩種選擇:

            // 1. 繼續等待客戶端發送新的資料過來

//            att.setReadMode(true);

//            att.getBuffer().clear();

//            att.getClient().read(att.getBuffer(), att, this);

            // 2. 既然服務端已經傳回資料給客戶端,斷開這次的連接

            try {

                att.getClient().close();

            } catch (IOException e) {

            }

        }

    }

 

    @Override

    public void failed(Throwable t, Attachment att) {

        System.out.println(“連接斷開”);

    }

}

順便再貼一下自定義的 Attachment 類:

public class Attachment {

    private AsynchronousServerSocketChannel server;

    private AsynchronousSocketChannel client;

    private boolean isReadMode;

    private ByteBuffer buffer;

    // getter & setter

}

這樣,一個簡單的服務端就寫好了,接下來可以接收客戶端請求了。上面我們用的都是回呼函式的方式,讀者要是感興趣,可以試試寫個使用 Future 的。

AsynchronousSocketChannel

其實,說完上面的 AsynchronousServerSocketChannel,基本上讀者也就知道怎麼使用 AsynchronousSocketChannel 了,和非阻塞 IO 基本類似。

這邊做個簡單演示,這樣讀者就可以配合之前介紹的 Server 進行測試使用了。

package com.javadoop.aio;

 

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.AsynchronousSocketChannel;

import java.nio.charset.Charset;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.Future;

 

public class Client {

 

    public static void main(String[] args) throws Exception {

        AsynchronousSocketChannel client = AsynchronousSocketChannel.open();

          // 來個 Future 形式的

        Future > future = client.connect(new InetSocketAddress(8080));

        // 阻塞一下,等待連接成功

        future.get();

 

        Attachment att = new Attachment();

        att.setClient(client);

        att.setReadMode(false);

        att.setBuffer(ByteBuffer.allocate(2048));

        byte[] data = “I am obot!”.getBytes();

        att.getBuffer().put(data);

        att.getBuffer().flip();

 

        // 異步發送資料到服務端

        client.write(att.getBuffer(), att, new ClientChannelHandler());

 

        // 這裡休息一下再退出,給出足夠的時間處理資料

        Thread.sleep(2000);

    }

}

往裡面看下 ClientChannelHandler 類:

package com.javadoop.aio;

 

import java.io.IOException;

import java.nio.ByteBuffer;

import java.nio.channels.CompletionHandler;

import java.nio.charset.Charset;

 

public class ClientChannelHandler implements CompletionHandler {

 

    @Override

    public void completed(Integer result, Attachment att) {

        ByteBuffer buffer = att.getBuffer();

        if (att.isReadMode()) {

            // 讀取來自服務端的資料

            buffer.flip();

            byte[] bytes = new byte[buffer.limit()];

            buffer.get(bytes);

            String msg = new String(bytes, Charset.forName(“UTF-8”));

            System.out.println(“收到來自服務端的響應資料: ” + msg);

 

            // 接下來,有以下兩種選擇:

            // 1. 向服務端發送新的資料

//            att.setReadMode(false);

//            buffer.clear();

//            String newMsg = “new message from client”;

//            byte[] data = newMsg.getBytes(Charset.forName(“UTF-8”));

//            buffer.put(data);

//            buffer.flip();

//            att.getClient().write(buffer, att, this);

            // 2. 關閉連接

            try {

                att.getClient().close();

            } catch (IOException e) {

            }

        } else {

            // 寫操作完成後,會進到這裡

            att.setReadMode(true);

            buffer.clear();

            att.getClient().read(buffer, att, this);

        }

    }

 

    @Override

    public void failed(Throwable t, Attachment att) {

        System.out.println(“服務器無響應”);

    }

}

以上代碼都是可以運行除錯的,如果讀者碰到問題,請在評論區留言。

Asynchronous Channel Groups

為了知識的完整性,有必要對 group 進行介紹,其實也就是介紹 AsynchronousChannelGroup 這個類。之前我們說過,異步 IO 一定存在一個執行緒池,這個執行緒池負責接收任務、處理 IO 事件、回呼等。這個執行緒池就在 group 內部,group 一旦關閉,那麼相應的執行緒池就會關閉。

AsynchronousServerSocketChannels 和 AsynchronousSocketChannels 是屬於 group 的,當我們呼叫 AsynchronousServerSocketChannel 或 AsynchronousSocketChannel 的 open() 方法的時候,相應的 channel 就屬於預設的 group,這個 group 由 JVM 自動構造並管理。

如果我們想要配置這個預設的 group,可以在 JVM 啟動引數中指定以下系統變數:

java.nio.channels.DefaultThreadPool.threadFactory

此系統變數用於設置 ThreadFactory,它應該是 java.util.concurrent.ThreadFactory 實現類的全限定類名。一旦我們指定了這個 ThreadFactory 以後,group 中的執行緒就會使用該類產生。

java.nio.channels.DefaultThreadPool.initialSize

此系統變數也很好理解,用於設置執行緒池的初始大小。

可能你會想要使用自己定義的 group,這樣可以對其中的執行緒進行更多的控制,使用以下幾個方法即可:

AsynchronousChannelGroup.withCachedThreadPool(ExecutorService executor, int initialSize)

AsynchronousChannelGroup.withFixedThreadPool(int nThreads, ThreadFactory threadFactory)

AsynchronousChannelGroup.withThreadPool(ExecutorService executor)

熟悉執行緒池的讀者對這些方法應該很好理解,它們都是 AsynchronousChannelGroup 中的靜態方法。

至於 group 的使用就很簡單了,代碼一看就懂:

AsynchronousChannelGroup group = AsynchronousChannelGroup

        .withFixedThreadPool(10, Executors.defaultThreadFactory());

AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);

AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group);

AsynchronousFileChannels 不屬於 group。但是它們也是關聯到一個執行緒池的,如果不指定,會使用系統預設的執行緒池,如果想要使用指定的執行緒池,可以在實體化的時候使用以下方法:

public static AsynchronousFileChannel open(Path file,

                                           Set extends OpenOption> options,

                                           ExecutorService executor,

                                           FileAttribute >… attrs) {

    …

}

到這裡,異步 IO 就算介紹完成了。

小結

我想,本文應該是說清楚了非阻塞 IO 和異步 IO 了,對於異步 IO,由於網上的資料比較少,所以不免篇幅多了些。

我們也要知道,看懂了這些,確實可以學到一些東西,多瞭解一些知識,但是我們還是很少在工作中將這些知識變成工程代碼。一般而言,我們需要在網絡應用中使用 NIO 或 AIO 來提升性能,但是,在工程上,絕不是瞭解了一些概念,知道了一些接口就可以的,需要處理的細節還非常多。

這也是為什麼 Netty/Mina 如此盛行的原因,因為它們幫助封裝好了很多細節,提供給我們用戶友好的接口,後面有時間我也會對 Netty 進行介紹。

看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

赞(0)

分享創造快樂