303
技術社區[雲棲]
Netty應用篇
本博客是閱讀《Netty權威指南》的讀書筆記,如有錯誤,歡迎指正、探討,謝謝!
- 服務端啟動步驟 1) 創建ServerBootstrap實例 ServerBootstrap serverBootstrap = new ServerBootstrap(); ServerBootstrap是netty服務端的啟動輔助類,提供了一係列的方法用於設置服務端啟動的相關參數,底層通過門麵模式對各種能力進行抽象和封裝,盡量不需要用戶跟過多的底層API打交道,以降低用戶的開發難度。
2) 設置並綁定Reactor線程池
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
serverBootstrap.group(bossGroup, workerGroup)
Netty的Reactor線程池是EventLoopGroup,它實際上是EventLoop的數據。EventLoop的職責是處理所有注冊到本線程Selector(多路複用器)上的Channel,Selector的輪詢操作由綁定的EventLoop線程run方法驅動,在一個循環體內循環執行。EventLoop不僅處理網絡IO事件,還負責處理用戶自定義的Task和定時任務,如此線程模型就統一了。
3) 設置並綁定服務端Channel
serverBootstrap channel(NioServerSocketChannel.class)
作為NIO的服務端,Netty自然是需要創建ServerSocketChannel的。NioServerSocketChannel是Netty對原生NIO類庫的封裝實現,對用戶而言,不需要關心服務端Channel的底層實現細節和工作原理,隻需要指定具體使用那種服務端Channel即可。
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws IOException {
}
});
4) 鏈路建立的時候創建並初始化ChannelPipeline
serverBootstrap.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws IOException {
//add ChannelHandler
}
});
ChannelPipeline並不是NIO服務端必須的,它本質上是一個負責處理網絡事件的責任鏈,負責管理和執行ChannelHandler。網絡事件以流的形式在ChannelPipeline中流轉,由ChannelPipeline根據ChannelHandler的執行策略進行調度。典型的網絡事件如下:
鏈路注冊
鏈路激活
鏈路斷開
接收到請求消息
請求消息接收並處理完畢
發送應答消息
鏈路發生異常
發生用戶自定義事件
5) 添加並設置ChannelHandler
ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
ch.pipeline().addLast(new NettyMessageEncoder());
ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
ch.pipeline().addLast(new LoginAuthRespHandler());
ch.pipeline().addLast("HeartBeatHandler", new HeartBeatRespHandler());
ChannelHandler是Netty提供給用戶定製和擴展的關鍵接口,利用ChannelHandler用戶可以完成大多數的功能定製,例如消息編解碼、心跳、安全認證、TSL/SSL認證、流量控製、流量整形等。以下是Netty提供的常用的係統Channel。
ByteToMessageCodec:係統編解碼框架
LengthFieldBasedFrameDecoder:基於長度的半包解碼器
LoggingHandler:碼流日誌打印Handler
SslHandler:SSL安全認證Handler
IdleStateHandler:鏈路空閑檢測Handler
ChannelTrafficShapingHandler:流量整形Handler
Base64Decoder和Base64Encoder:Base64編解碼
6) 綁定並啟動監聽端口
serverBootstrap.bind(NettyConstant.REMOTEIP, NettyConstant.PORT);
在綁定監聽端口之前,係統會做一係列的初始化和檢測工作,完成之後,會啟動監聽端口,並將ServerSocketChannel注冊到Selector上監聽客戶端連接。
7) Selector輪詢
由NioEventLoop負責調度並執行Selector輪詢操作,選擇準備就緒的Channel集合,相關代碼如下:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e);
}
// Harmless exception - log anyway
}
}
8) 當輪詢到準備繼續的Channel之後,就由Reactor線程NioEventLoop執行ChannelPipeline的相應方法,並最終調度並執行ChannelHandler
9) 執行Netty係統的ChannelHandler和用戶添加的定製化ChannelHandler
說明:後幾個步驟,都被Netty封裝並處理了,所以並不需要我們做過多的事情。
- 示例代碼 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws IOException { ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4)); ch.pipeline().addLast(new NettyMessageEncoder()); ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50)); ch.pipeline().addLast(new LoginAuthRespHandler()); ch.pipeline().addLast("HeartBeatHandler", new HeartBeatRespHandler()); } });
// 綁定端口,同步等待成功
serverBootstrap.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
System.out.println("Netty server start ok : " + (NettyConstant.REMOTEIP + " : " + NettyConstant.PORT));
- 服務端啟動步驟 因為Client有很多步驟做的事情和Server比較類似,所以下麵的描述較為簡單。 1) 創建Bootstrap實例 Bootstrap bootstrap = new Bootstrap();
2) 創建客戶端連接、用於IO讀寫的Reactor線程組(NioEventLoopgroup)
EventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group);
默認為IO線程個數為CPU核數的2倍
3) 創建NioSocketChannel
通過Bootstrap的ChannelFactor和用戶指定的Channel類型創建用於客戶端連接的NioSocketChannel。
bootstrap. channel(NioSocketChannel.class)
4) 創建ChannelPipeline,添加ChannelHandler
bootstrap.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//add ChannelHandler
ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
ch.pipeline().addLast("MessageEncoder", new NettyMessageEncoder());
ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
ch.pipeline().addLast("HeartBeatHandler", new HeartBeatReqHandler());
}
});
5) 異步發起TCP連接,判斷連接是否成功。
如果成功,則直接將NioSocketChannel注冊到多路複用器上,監聽讀操作位,用戶數據報讀取和消息發送;如果沒有連接成功,則注冊連接監聽位到多路複用器,等待連接結果。
bootstrap.connect(new InetSocketAddress(host, port), new InetSocketAddress(NettyConstant.LOCALIP, NettyConstant.LOCAL_PORT)).sync();
6) 注冊對應的網絡監聽狀態位到多路複用器
7) 由多路複用器輪詢各Channel,處理連接結果
8) 如果連接成功,設置Future結果,發送連接成功事件,觸發ChannelPipeline執行
9) 有ChannelPipeline執行ChannelHandler,執行業務邏輯。
以下為HeartBeatReqHandler的示例代碼:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage message = (NettyMessage) msg;
// 握手成功,主動發送心跳消息
if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP.value()) {
heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS);
} else if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_RESP.value()) {
System.out.println("Client receive server heart beat message : ---> " + message);
} else
ctx.fireChannelRead(msg);
}
說明:後幾個步驟,都被Netty封裝並處理了,所以並不需要我們做過多的事情。
- 示例代碼 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4)); ch.pipeline().addLast("MessageEncoder", new NettyMessageEncoder()); ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50)); ch.pipeline().addLast("HeartBeatHandler", new HeartBeatReqHandler()); } }); // 發起異步連接操作 ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port), new InetSocketAddress(NettyConstant.LOCALIP, NettyConstant.LOCAL_PORT)).sync(); future.channel().closeFuture().sync();
最後更新:2017-11-18 15:33:45