

Netty In Action

fxjwind


1 Introduction

1.2 Asynchronous by design

The two most common ways to work with or implement an asynchronous API are callbacks and futures.

1.2.1 Callbacks

The essence of a callback:

move the execution of these methods from the “caller” thread to some other thread. 
There is no guarantee as to when one of the methods of the FetchCallback will be called.

The problem with callbacks is that they hurt readability:

lead to spaghetti code when you chain many asynchronous method calls with different callbacks
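
A minimal sketch of the callback style described above. The notes mention FetchCallback but do not show it, so the method names onData/onError and the Fetcher class are assumptions made for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical callback interface; the method names are assumptions.
interface FetchCallback {
    void onData(String data);
    void onError(Throwable cause);
}

// Hypothetical fetcher that invokes the callback on a thread other than the caller's.
final class Fetcher {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void fetchData(final FetchCallback callback) {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    String data = "payload"; // stands in for real work
                    callback.onData(data);   // runs on the executor thread
                } catch (RuntimeException e) {
                    callback.onError(e);
                }
            }
        });
    }

    void shutdown() {
        executor.shutdown();
    }
}

final class CallbackDemo {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(1);
        Fetcher fetcher = new Fetcher();
        fetcher.fetchData(new FetchCallback() {
            @Override
            public void onData(String data) {
                System.out.println("received " + data + " on " + Thread.currentThread().getName());
                done.countDown();
            }

            @Override
            public void onError(Throwable cause) {
                cause.printStackTrace();
                done.countDown();
            }
        });
        done.await(); // only so the demo does not exit before the callback fires
        fetcher.shutdown();
    }
}
```

The caller returns from fetchData() immediately; when the callback runs is up to the executor, which is the "no guarantee" mentioned above.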

 

1.2.2 Futures

A Future is an abstraction, which represents a value that may become available at some point. 
A Future object either holds the result of a computation or, in the case of a failed computation, an exception.

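Example:

ExecutorService executor = Executors.newCachedThreadPool();

Runnable task1 = new Runnable() {
    @Override
    public void run() {
        // do some work
    }
};
Callable<Integer> task2 = new Callable<Integer>() {
    @Override
    public Integer call() {
        return 42; // placeholder result
    }
};

Future<?> future1 = executor.submit(task1);
Future<Integer> future2 = executor.submit(task2);
while (!future1.isDone() || !future2.isDone()) {
...
// do something else
...
}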

The problem with a Future is that you have to check every so often whether the result is ready, which is rather ugly:

Sometimes using futures can feel ugly because you need to check the state of the Future in intervals to see if it is completed yet, whereas with a callback you’re notified directly after it’s done.
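
The earlier statement that a Future holds either a result or an exception can be seen through get(). The sketch below is only an illustration; the class name FutureOutcomeDemo and the method outcome are made up for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FutureOutcomeDemo {
    // Returns a description of the value, or of the failure if the task threw.
    static String outcome(Callable<Integer> task) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(task);
            try {
                return "value " + future.get(); // blocks until the task has finished
            } catch (ExecutionException e) {
                // the exception thrown by the task is available as the cause
                return "failed: " + e.getCause().getMessage();
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(outcome(new Callable<Integer>() {
            @Override
            public Integer call() {
                return 42;
            }
        }));
        System.out.println(outcome(new Callable<Integer>() {
            @Override
            public Integer call() {
                throw new IllegalStateException("boom");
            }
        }));
    }
}
```

Unlike polling with isDone(), get() simply blocks the calling thread, so it trades the "ugly" checking for a blocked thread.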

 

1.3 Blocking versus non-blocking IO on the JVM

 

To do networking-related tasks in Java, you can take one of two approaches: 
- use IO, also known as blocking IO 
- use NIO, also known as new/non-blocking IO

image image

 

1.3.1 EchoServer based on blocking IO

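import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class PlainEchoServer {
    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port); // bind the server to the port
        while (true) {
            final Socket clientSocket = socket.accept(); // block until a client connects

            new Thread(new Runnable() { // one new thread per connection
                @Override
                public void run() {
                    try (BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                         PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true)) {
                        String line;
                        while ((line = reader.readLine()) != null) { // echo each line until the client disconnects
                            writer.println(line);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start(); // start the thread
        }
    }
}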

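The problem is that every connection needs its own thread, so this does not scale when there are many connections. Even with a pool, it stays fairly inefficient.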

 

1.3.2 Non-blocking IO basics

 

ByteBuffer 
A ByteBuffer is fundamental to both NIO APIs and, indeed, to Netty. A ByteBuffer can either be allocated on the heap or directly, which means it is stored outside the heap.

image

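A ByteBuffer is a shipping container for moving data around.

The flip operation resets the offsets (the limit is set to the current position and the position to 0) so that the data already in the buffer can be read.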
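
A small sketch of the write, flip, read cycle. The class name FlipDemo and the method roundTrip are made up for this illustration; the ByteBuffer calls are standard JDK API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class FlipDemo {
    // Writes text into a buffer, flips it, and reads the same bytes back.
    static String roundTrip(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(bytes.length); // heap buffer; allocateDirect() gives an off-heap one
        buffer.put(bytes);  // position advances to bytes.length
        buffer.flip();      // limit = old position, position = 0
        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);    // reads from position 0 up to the limit
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

Without the flip() call, remaining() would be 0 here, because the position would still sit at the end of the data just written.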

 

Working with NIO selectors

A selector is a NIO component that determines if one or more channels are ready for reading and/or writing, thus a single selector can be used to handle multiple connections.

This is similar to the operating system's select and poll calls.

To use selectors, you typically complete the following steps. 
1. Create one or more selectors to which opened channels (sockets) can be registered. 
2. When a channel is registered, you specify which events you’re interested in listening for. 
The four available events (or Ops/operations) are: 
- OP_ACCEPT—Operation-set bit for socket-accept operations 
- OP_CONNECT—Operation-set bit for socket-connect operations 
- OP_READ—Operation-set bit for read operations 
- OP_WRITE—Operation-set bit for write operations 
3. When channels are registered, you call the Selector.select() method to block until one of these events occurs. 
4. When the method unblocks, you can obtain all of the SelectionKey instances (which hold the reference to the registered channel and to selected Ops) and do something. 
What exactly you do depends on which operation is ready. A SelectionKey can include more than one operation at any given time.

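Create selectors so that channels (sockets) can be registered with them. 
The selector then listens for events, of which there are four kinds. 
Calling Selector.select() blocks until an event arrives. The key point is that one selector can watch many channels at the same time, so no single channel ties up a thread; that is why this counts as non-blocking.

Once an event arrives, you can obtain the SelectionKey.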
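
Since the four Ops are bit flags, a key can carry several of them at once. The sketch below shows how they combine and how a single bit is tested; the class OpsDemo and the helper has are made up for this illustration.

```java
import java.nio.channels.SelectionKey;

final class OpsDemo {
    // True if the given operation bit is set in the ops bitmask.
    static boolean has(int ops, int op) {
        return (ops & op) != 0;
    }

    public static void main(String[] args) {
        // Interest in both reading and writing, combined with bitwise OR.
        int interest = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        System.out.println("read:   " + has(interest, SelectionKey.OP_READ));
        System.out.println("write:  " + has(interest, SelectionKey.OP_WRITE));
        System.out.println("accept: " + has(interest, SelectionKey.OP_ACCEPT));
    }
}
```

Methods such as key.isReadable() and key.isWritable() perform the same kind of bit test against the key's ready set.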

 

1.3.3 EchoServer based on NIO


Registration:

ServerSocketChannel serverChannel = ServerSocketChannel.open(); // create the channel
ServerSocket ss = serverChannel.socket(); // get the socket that belongs to the channel
InetSocketAddress address = new InetSocketAddress(port);
ss.bind(address); // bind the socket to the port
serverChannel.configureBlocking(false);
Selector selector = Selector.open(); // create the selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT); // register the channel with the selector, listening for accept events

Event handling:

while (true) {
    try {
        selector.select(); // block until at least one channel is ready
    } catch (IOException ex) {
        ex.printStackTrace();
        // handle in a proper way
        break;
    }

    Set<SelectionKey> readyKeys = selector.selectedKeys(); // all keys whose channels are ready
    Iterator<SelectionKey> iterator = readyKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove(); // remove the key from the selected set so it is not processed twice
        try {
            if (key.isAcceptable()) {
                ServerSocketChannel server = (ServerSocketChannel) key.channel();
                SocketChannel client = server.accept(); // accept the client connection
                client.configureBlocking(false);
                // after accepting, register the new channel for the read and write ops, with its own buffer attached
                client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, ByteBuffer.allocate(100));
            }
            if (key.isReadable()) {
                SocketChannel client = (SocketChannel) key.channel();
                ByteBuffer output = (ByteBuffer) key.attachment();
                client.read(output); // read into the attached buffer
            }
            if (key.isWritable()) {
                SocketChannel client = (SocketChannel) key.channel();
                ByteBuffer output = (ByteBuffer) key.attachment();
                output.flip();
                client.write(output); // write the buffered data back to the client
                output.compact();
            }
        } catch (IOException ex) {
            key.cancel();
            try {
                key.channel().close();
            } catch (IOException cex) {
                // ignore on close
            }
        }
    }
}

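This is the typical reactor model: event-driven.

Every connection first goes through accept handling, and only then is it registered for the read and write ops.

NIO is non-blocking; the reactor model is a typical non-blocking, synchronous model.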

 

1.3.4 EchoServer based on NIO.2

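NIO.2 is a non-blocking, asynchronous model. 
You only need to issue the IO operation and supply a CompletionHandler, which is executed asynchronously once the IO operation completes. 
It also guarantees that, for a given channel, only one CompletionHandler is executed at a time.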

Unlike the original NIO implementation, NIO.2 allows you to issue IO operations and provide what is called a completion handler (CompletionHandler class).

It also guarantees that only one CompletionHandler is executed for a channel at the same time. 
This approach helps to simplify the code because it removes the complexity that comes with multithreaded execution.

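import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

public class PlainNio2EchoServer {
    public void serve(int port) throws IOException {
        final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(); // note: this is the asynchronous ServerSocketChannel
        InetSocketAddress address = new InetSocketAddress(port);
        serverChannel.bind(address);
        final CountDownLatch latch = new CountDownLatch(1); // keeps serve() from returning while the server runs

        serverChannel.accept(null, // start an accept operation with a CompletionHandler as the callback
            new CompletionHandler<AsynchronousSocketChannel, Object>() {
                @Override
                public void completed(final AsynchronousSocketChannel channel, Object attachment) { // called asynchronously once an accept completes
                    serverChannel.accept(null, this); // start the next accept to take further connections
                    ByteBuffer buffer = ByteBuffer.allocate(100);
                    channel.read(buffer, buffer, new EchoCompletionHandler(channel)); // start a read with EchoCompletionHandler as the callback
                }

                @Override
                public void failed(Throwable throwable, Object attachment) {
                    try {
                        serverChannel.close(); // handling for a failed IO operation
                    } catch (IOException e) {
                        // ignore on close
                    } finally {
                        latch.countDown();
                    }
                }
            });
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private final class EchoCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel channel;

        EchoCompletionHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void completed(Integer result, ByteBuffer buffer) { // called when a read completes
            if (result < 0) { // the client closed the connection
                closeChannel();
                return;
            }
            buffer.flip();
            channel.write(buffer, buffer, // start a write
                new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result, ByteBuffer buffer) { // callback once the write completes
                        if (buffer.hasRemaining()) {
                            channel.write(buffer, buffer, this); // data is left in the buffer: keep writing
                        } else {
                            buffer.compact();
                            channel.read(buffer, buffer, EchoCompletionHandler.this); // otherwise start the next read, again with EchoCompletionHandler as the callback
                        }
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        closeChannel();
                    }
                });
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            closeChannel();
        }

        private void closeChannel() {
            try {
                channel.close();
            } catch (IOException e) {
                // ignore on close
            }
        }
    }
}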
As you can see for yourself, this code is trickier and harder to follow than the NIO version.

 

1.4 NIO problems and how Netty comes to the rescue

A look at the problems of NIO and how Netty solves them.

1.4.1 Cross-platform and compatibility issues

NIO is low level and depends on how the operating system (OS) handles IO. 
When using NIO you often find that your code works fine on Linux, for example, but has problems on Windows. 
Ideal as NIO.2 may seem, it’s only supported in Java 7, and if your application runs on Java 6, you may not be able to use it. Also, at the time of writing, there is no NIO.2 API for datagram channels (for UDP applications), so its usage is limited to TCP applications only.

Java NIO depends on both the operating system and the Java version.

 

1.4.2 Extending ByteBuffer ... or not

As you saw previously, ByteBuffer is used as a data container. Unfortunately, the JDK doesn’t contain a ByteBuffer implementation that allows wrapping an array of ByteBuffer instances. This functionality is useful if you want to minimize memory copies. If you’re thinking “I’ll implement it myself”, don’t waste your time; ByteBuffer’s constructors are package-private, so it isn’t possible to extend it from your own code.

ByteBuffer has no accessible constructor, so it cannot be extended.

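I won't list the rest one by one; most of the remaining items are operating system or Java bugs that Netty works around... which does not feel like a very convincing argument.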

 

Bootstrapping

Bootstrapping in Netty is the process by which you configure your Netty application. 
You use a bootstrap when you need to connect a client to some host and port, or bind a server to a given port. 
As the previous statement implies, there are two types of bootstrap: one typically used for clients, but also for DatagramChannel (simply called Bootstrap), and one for servers (aptly named ServerBootstrap).

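Netty provides the Bootstrap abstraction for configuration and initialization. 
There are only two types, corresponding to client and server. The differences between the two kinds of bootstrap: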

image

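The first difference is easy to understand.

The second difference looks odd at first, but it is the crucial one: why does ServerBootstrap need two EventLoopGroups?

See the figure below: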

image

EventLoopGroup A's only purpose is to accept connections and hand them over to EventLoopGroup B.

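Two reactors are used to emulate the proactor model.

The first reactor only accepts connections, so that time spent in a long-running handler cannot block the accepting of new connections. 
The second reactor does the actual handler processing.

So a Netty server can be described as non-blocking, asynchronous IO.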
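
The hand-off between the two groups can be pictured with a toy model built on plain JDK executors. This is not Netty code and does no real IO: the class BossWorkerDemo, the method run, and the use of strings as stand-ins for connections are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BossWorkerDemo {
    // Toy model: the "boss" only accepts and hands off; the "workers" do the handling.
    static List<String> run(List<String> connections) throws InterruptedException {
        final List<String> handled = new CopyOnWriteArrayList<String>();
        ExecutorService boss = Executors.newSingleThreadExecutor();   // plays the role of group A
        final ExecutorService workers = Executors.newFixedThreadPool(4); // plays the role of group B
        for (final String connection : connections) {
            boss.submit(new Runnable() {
                @Override
                public void run() {
                    // "accepting" is cheap; the potentially slow part is delegated
                    workers.submit(new Runnable() {
                        @Override
                        public void run() {
                            handled.add("handled " + connection);
                        }
                    });
                }
            });
        }
        boss.shutdown(); // already submitted accepts still run
        boss.awaitTermination(5, TimeUnit.SECONDS);
        workers.shutdown(); // already handed-off work still runs
        workers.awaitTermination(5, TimeUnit.SECONDS);
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(java.util.Arrays.asList("c1", "c2", "c3")).size() + " connections handled");
    }
}
```

Because the boss thread never runs handler logic, a slow handler only occupies a worker, and new connections keep being accepted.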

 

Bootstrap client

image image

 

The bootstrap is responsible for client and/or connectionless-based channels, so it will create the channel after bind(...) or connect(...) is called.

image 

image

 

Server Bootstrap

image image image

As you may have noticed, the methods in the previous section are similar to what you saw in the bootstrap class. 
There is only one difference, which makes a lot of sense once you think about it. 
While ServerBootstrap has handler(...), attr(...), and option(...) methods, it also offers those with the child prefix. 
This is done as the ServerBootstrap bootstraps ServerChannel implementations, which are responsible for creating child channels. 
Those channels represent the accepted connections. ServerBootstrap offers the child* methods in order to make applying settings on accepted channels as easy as possible.

image 

image

image

 

Adding multiple ChannelHandlers during a bootstrap

image

 

ChannelPipeline

A ChannelPipeline is a list of ChannelHandler instances that handle or intercept inbound and outbound operations of a channel.

image

ChannelHandler operations:

image

Other operations:

image

Inbound operations

image 

imageimage

Outbound operations

image image

 

ChannelHandlerContext

image

 

Now if you’d like to have the event flow through the whole ChannelPipeline, there are two different ways of doing so: 
- Invoke methods on the Channel. 
- Invoke methods on the ChannelPipeline.

image 

image 

image

 

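Besides the two ways above, you can also call the methods on the ChannelHandlerContext. 
The difference is that the event can start from any handler in the pipeline instead of always running from the beginning.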
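
The difference between firing an event from the head of the pipeline and firing it from a handler's position can be shown with a toy model. ToyHandler and ToyPipeline below are stand-ins invented for this sketch; they are not Netty's interfaces and deliberately ignore outbound events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Toy stand-in for a handler; this is NOT Netty's ChannelHandler interface.
interface ToyHandler {
    String handle(String message);
}

// Toy stand-in for a pipeline: an ordered list of handlers.
final class ToyPipeline {
    private final List<ToyHandler> handlers = new ArrayList<ToyHandler>();

    ToyPipeline addLast(ToyHandler handler) {
        handlers.add(handler);
        return this;
    }

    // Like invoking on the channel or the pipeline: the event visits every handler from the head.
    String fireFromHead(String message) {
        return fireFrom(0, message);
    }

    // Like invoking on a context: the event starts at a given position and skips the earlier handlers.
    String fireFrom(int index, String message) {
        String current = message;
        for (int i = index; i < handlers.size(); i++) {
            current = handlers.get(i).handle(current);
        }
        return current;
    }
}

final class ToyPipelineDemo {
    // Builds a pipeline of three handlers: trim, upper-case, append "!".
    static ToyPipeline build() {
        return new ToyPipeline()
            .addLast(new ToyHandler() {
                @Override public String handle(String m) { return m.trim(); }
            })
            .addLast(new ToyHandler() {
                @Override public String handle(String m) { return m.toUpperCase(Locale.ROOT); }
            })
            .addLast(new ToyHandler() {
                @Override public String handle(String m) { return m + "!"; }
            });
    }

    public static void main(String[] args) {
        ToyPipeline pipeline = build();
        System.out.println("[" + pipeline.fireFromHead(" hi ") + "]"); // all three handlers run
        System.out.println("[" + pipeline.fireFrom(1, " hi ") + "]");  // the trim handler is skipped
    }
}
```

Starting mid-pipeline saves the work of the skipped handlers, but it also means their effects (here, trimming) are not applied.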

image

image

 

ChannelHandlers and their types

image

 

image 

image

 

image

image

 

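Only the first chapter of this book is well written, with a clear line of thought.

The later chapters are fairly average, and their line of thought is not as good.

Last updated: 2017-04-07 21:23:50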
