閱讀303 返回首頁    go 技術社區[雲棲]


Netty應用篇

本博客是閱讀《Netty權威指南》的讀書筆記,如有錯誤,歡迎指正、探討,謝謝!

一、 Netty服務端
1. 時序圖
_

  1. 服務端啟動步驟 1) 創建ServerBootstrap實例 ServerBootstrap serverBootstrap = new ServerBootstrap(); ServerBootstrap是netty服務端的啟動輔助類,提供了一係列的方法用於設置服務端啟動的相關參數,底層通過門麵模式對各種能力進行抽象和封裝,盡量不需要用戶跟過多的底層API打交道,以降低用戶的開發難度。

2) 設置並綁定Reactor線程池
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
serverBootstrap.group(bossGroup, workerGroup)

Netty的Reactor線程池是EventLoopGroup,它實際上是EventLoop的數據。EventLoop的職責是處理所有注冊到本線程Selector(多路複用器)上的Channel,Selector的輪詢操作由綁定的EventLoop線程run方法驅動,在一個循環體內循環執行。EventLoop不僅處理網絡IO事件,還負責處理用戶自定義的Task和定時任務,如此線程模型就統一了。

3) 設置並綁定服務端Channel
serverBootstrap channel(NioServerSocketChannel.class)
作為NIO的服務端,Netty自然是需要創建ServerSocketChannel的。NioServerSocketChannel是Netty對原生NIO類庫的封裝實現,對用戶而言,不需要關心服務端Channel的底層實現細節和工作原理,隻需要指定具體使用那種服務端Channel即可。
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws IOException {
}
});

4) 鏈路建立的時候創建並初始化ChannelPipeline
serverBootstrap.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws IOException {
//add ChannelHandler
}
});
ChannelPipeline並不是NIO服務端必須的,它本質上是一個負責處理網絡事件的責任鏈,負責管理和執行ChannelHandler。網絡事件以流的形式在ChannelPipeline中流轉,由ChannelPipeline根據ChannelHandler的執行策略進行調度。典型的網絡事件如下:
 鏈路注冊
 鏈路激活
 鏈路斷開
 接收到請求消息
 請求消息接收並處理完畢
 發送應答消息
 鏈路發生異常
 發生用戶自定義事件

5) 添加並設置ChannelHandler
ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
ch.pipeline().addLast(new NettyMessageEncoder());
ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
ch.pipeline().addLast(new LoginAuthRespHandler());
ch.pipeline().addLast("HeartBeatHandler", new HeartBeatRespHandler());

ChannelHandler是Netty提供給用戶定製和擴展的關鍵接口,利用ChannelHandler用戶可以完成大多數的功能定製,例如消息編解碼、心跳、安全認證、TSL/SSL認證、流量控製、流量整形等。以下是Netty提供的常用的係統Channel。
 ByteToMessageCodec:係統編解碼框架
 LengthFieldBasedFrameDecoder:基於長度的半包解碼器
 LoggingHandler:碼流日誌打印Handler
 SslHandler:SSL安全認證Handler
 IdleStateHandler:鏈路空閑檢測Handler
 ChannelTrafficShapingHandler:流量整形Handler
 Base64Decoder和Base64Encoder:Base64編解碼

6) 綁定並啟動監聽端口
serverBootstrap.bind(NettyConstant.REMOTEIP, NettyConstant.PORT);
在綁定監聽端口之前,係統會做一係列的初始化和檢測工作,完成之後,會啟動監聽端口,並將ServerSocketChannel注冊到Selector上監聽客戶端連接。

7) Selector輪詢
由NioEventLoop負責調度並執行Selector輪詢操作,選擇準備就緒的Channel集合,相關代碼如下:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}

        // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
        // Selector#wakeup. So we need to check task queue again before executing select operation.
        // If we don't, the task might be pended until select operation was timed out.
        // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
        if (hasTasks() && wakenUp.compareAndSet(false, true)) {
            selector.selectNow();
            selectCnt = 1;
            break;
        }

        int selectedKeys = selector.select(timeoutMillis);
        selectCnt ++;

        if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
            // - Selected something,
            // - waken up by user, or
            // - the task queue has a pending task.
            // - a scheduled task is ready for processing
            break;
        }
        if (Thread.interrupted()) {
            // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
            // As this is most likely a bug in the handler of the user or it's client library we will
            // also log it.
            //
            // See https://github.com/netty/netty/issues/2426
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely because  Thread.currentThread().interrupt() was called. Use  NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
            }
            selectCnt = 1;
            break;
        }

        long time = System.nanoTime();
        if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
            // timeoutMillis elapsed without anything selected.
            selectCnt = 1;
        } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // The selector returned prematurely many times in a row.
            // Rebuild the selector to work around the problem.
            logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
            rebuildSelector();
            selector = this.selector;

            // Select again to populate selectedKeys.
            selector.selectNow();
            selectCnt = 1;
            break;
        }

        currentTimeNanos = time;
    }

    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
        if (logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector);
        }
    }
} catch (CancelledKeyException e) {
    if (logger.isDebugEnabled()) {
        logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",  selector, e);
    }
    // Harmless exception - log anyway
}

}

8) 當輪詢到準備繼續的Channel之後,就由Reactor線程NioEventLoop執行ChannelPipeline的相應方法,並最終調度並執行ChannelHandler

9) 執行Netty係統的ChannelHandler和用戶添加的定製化ChannelHandler

說明:後幾個步驟,都被Netty封裝並處理了,所以並不需要我們做過多的事情。

  1. 示例代碼 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws IOException { ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4)); ch.pipeline().addLast(new NettyMessageEncoder()); ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50)); ch.pipeline().addLast(new LoginAuthRespHandler()); ch.pipeline().addLast("HeartBeatHandler", new HeartBeatRespHandler()); } });

// 綁定端口,同步等待成功
serverBootstrap.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
System.out.println("Netty server start ok : " + (NettyConstant.REMOTEIP + " : " + NettyConstant.PORT));

二、 Netty客戶端
1. 時序圖
_

  1. 服務端啟動步驟 因為Client有很多步驟做的事情和Server比較類似,所以下麵的描述較為簡單。 1) 創建Bootstrap實例 Bootstrap bootstrap = new Bootstrap();

2) 創建客戶端連接、用於IO讀寫的Reactor線程組(NioEventLoopgroup)
EventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group);
默認為IO線程個數為CPU核數的2倍

3) 創建NioSocketChannel
通過Bootstrap的ChannelFactor和用戶指定的Channel類型創建用於客戶端連接的NioSocketChannel。
bootstrap. channel(NioSocketChannel.class)

4) 創建ChannelPipeline,添加ChannelHandler
bootstrap.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//add ChannelHandler
ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
ch.pipeline().addLast("MessageEncoder", new NettyMessageEncoder());
ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
ch.pipeline().addLast("HeartBeatHandler", new HeartBeatReqHandler());
}
});

5) 異步發起TCP連接,判斷連接是否成功。
如果成功,則直接將NioSocketChannel注冊到多路複用器上,監聽讀操作位,用戶數據報讀取和消息發送;如果沒有連接成功,則注冊連接監聽位到多路複用器,等待連接結果。
bootstrap.connect(new InetSocketAddress(host, port), new InetSocketAddress(NettyConstant.LOCALIP, NettyConstant.LOCAL_PORT)).sync();

6) 注冊對應的網絡監聽狀態位到多路複用器

7) 由多路複用器輪詢各Channel,處理連接結果

8) 如果連接成功,設置Future結果,發送連接成功事件,觸發ChannelPipeline執行

9) 有ChannelPipeline執行ChannelHandler,執行業務邏輯。
以下為HeartBeatReqHandler的示例代碼:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage message = (NettyMessage) msg;
// 握手成功,主動發送心跳消息
if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP.value()) {
heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS);
} else if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_RESP.value()) {
System.out.println("Client receive server heart beat message : ---> " + message);
} else
ctx.fireChannelRead(msg);
}

說明:後幾個步驟,都被Netty封裝並處理了,所以並不需要我們做過多的事情。

  1. 示例代碼 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4)); ch.pipeline().addLast("MessageEncoder", new NettyMessageEncoder()); ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50)); ch.pipeline().addLast("HeartBeatHandler", new HeartBeatReqHandler()); } }); // 發起異步連接操作 ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port), new InetSocketAddress(NettyConstant.LOCALIP, NettyConstant.LOCAL_PORT)).sync(); future.channel().closeFuture().sync();

最後更新:2017-11-18 15:33:45

  上一篇:go  風控GPS定位數據價值有多大?
  下一篇:go  【Servlet】根據MVC思想設計用戶登陸、用戶注冊、修改密碼係統