閱讀786 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Netty 5用戶指南(二)

Time客戶端

不像DISCARD和ECHO的服務端,對於TIME協議我們需要一個客戶端因為人們不能把一個32位的二進製數據翻譯成一個日期或者日曆。在這一部分,我們將會討論如何確保服務端是正常工作的,並且學習怎樣用Netty編寫一個客戶端。

在Netty中,編寫服務端和客戶端最大的並且唯一不同的使用了不同的BootStrapChannel的實現。請看一下下麵的代碼:

01 package io.netty.example.time;
02  
03 public class TimeClient {
04     public static void main(String[] args) throws Exception {
05         String host = args[0];
06         int port = Integer.parseInt(args[1]);
07         EventLoopGroup workerGroup = new NioEventLoopGroup();
08  
09         try {
10             Bootstrap b = new Bootstrap(); // (1)
11             b.group(workerGroup); // (2)
12             b.channel(NioSocketChannel.class); // (3)
13             b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
14             b.handler(new ChannelInitializer<SocketChannel>() {
15                 @Override
16                 public void initChannel(SocketChannel ch) throws Exception {
17                     ch.pipeline().addLast(new TimeClientHandler());
18                 }
19             });
20  
21             // Start the client.
22             ChannelFuture f = b.connect(host, port).sync(); // (5)
23  
24             // Wait until the connection is closed.
25             f.channel().closeFuture().sync();
26         finally {
27             workerGroup.shutdownGracefully();
28         }
29     }
30 }
  1. BootStrapServerBootstrap類似,不過他是對非服務端的channel而言,比如客戶端或者無連接傳輸模式的channel。
  2. 如果你隻指定了一個EventLoopGroup,那他就會即作為一個‘boss’線程,也會作為一個‘workder’線程,盡管客戶端不需要使用到‘boss’線程。
  3. 代替NioServerSocketChannel的是NioSocketChannel,這個類在客戶端channel被創建時使用。
  4. 不像在使用ServerBootstrap時需要用childOption()方法,因為客戶端的SocketChannel沒有父channel的概念。
  5. 我們用connect()方法代替了bind()方法。

正如你看到的,他和服務端的代碼是不一樣的。ChannelHandler是如何實現的?他應該從服務端接受一個32位的整數消息,把他翻譯成人們能讀懂的格式,並打印翻譯好的時間,最後關閉連接:

01 package io.netty.example.time;
02  
03 import java.util.Date;
04  
05 public class TimeClientHandler extends ChannelHandlerAdapter {
06     @Override
07     public void channelRead(ChannelHandlerContext ctx, Object msg) {
08         ByteBuf m = (ByteBuf) msg; // (1)
09         try {
10             long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
11             System.out.println(new Date(currentTimeMillis));
12             ctx.close();
13         finally {
14             m.release();
15         }
16     }
17  
18     @Override
19     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
20         cause.printStackTrace();
21         ctx.close();
22     }
23 }
  1. 在TCP/IP中,NETTY會把讀到的數據放到ByteBuf的數據結構中。

這樣看起來非常簡單,並且和服務端的那個例子的代碼也相差不多。然而,處理器有時候會因為拋出IndexOutOfBoundsException而拒絕工作。在下個部分我們會討論為什麼會發生這種情況。

流數據的傳輸處理

一個小的Socket Buffer問題

在基於流的傳輸裏比如TCP/IP,接收到的數據會先被存儲到一個socket接收緩衝裏。不幸的是,基於流的傳輸並不是一個數據包隊列,而是一個字節隊列。即使你發送了2個獨立的數據包,操作係統也不會作為2個消息處理而僅僅是作為一連串的字節而言。因此這是不能保證你遠程寫入的數據就會準確地讀取。舉個例子,讓我們假設操作係統的TCP/TP協議棧已經接收了3個數據包:

netty5_1.png

由於基於流傳輸的協議的這種普通的性質,在你的應用程序裏讀取數據的時候會有很高的可能性被分成下麵的片段。

netty5_2.png

因此,一個接收方不管他是客戶端還是服務端,都應該把接收到的數據整理成一個或者多個更有意思並且能夠讓程序的業務邏輯更好理解的數據。在上麵的例子中,接收到的數據應該被構造成下麵的格式:

netty5_3.png

第一個解決方案

現在讓我們回到TIME客戶端的例子上。這裏我們遇到了同樣的問題,一個32字節數據是非常小的數據量,他並不見得會被經常拆分到到不同的數據段內。然而,問題是他確實可能會被拆分到不同的數據段內,並且拆分的可能性會隨著通信量的增加而增加。

最簡單的方案是構造一個內部的可積累的緩衝,直到4個字節全部接收到了內部緩衝。下麵的代碼修改了TimeClientHandler的實現類修複了這個問題

01 package io.netty.example.time;
02  
03 import java.util.Date;
04  
05 public class TimeClientHandler extends ChannelHandlerAdapter {
06     private ByteBuf buf;
07  
08     @Override
09     public void handlerAdded(ChannelHandlerContext ctx) {
10         buf = ctx.alloc().buffer(4); // (1)
11     }
12  
13     @Override
14     public void handlerRemoved(ChannelHandlerContext ctx) {
15         buf.release(); // (1)
16         buf = null;
17     }
18  
19     @Override
20     public void channelRead(ChannelHandlerContext ctx, Object msg) {
21         ByteBuf m = (ByteBuf) msg;
22         buf.writeBytes(m); // (2)
23         m.release();
24  
25         if (buf.readableBytes() >= 4) { // (3)
26             long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L;
27             System.out.println(new Date(currentTimeMillis));
28             ctx.close();
29         }
30     }
31  
32     @Override
33     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
34         cause.printStackTrace();
35         ctx.close();
36     }
37 }
  1. ChannelHandler有2個生命周期的監聽方法:handlerAdded()和handlerRemoved()。你可以完成任意初始化任務隻要他不會被阻塞很長的時間。
  2. 首先,所有接收的數據都應該被累積在buf變量裏。
  3. 然後,處理器必須檢查buf變量是否有足夠的數據,在這個例子中是4個字節,然後處理實際的業務邏輯。否則,Netty會重複調用channelRead()當有更多數據到達直到4個字節的數據被積累。

第二個解決方案

盡管第一個解決方案已經解決了Time客戶端的問題了,但是修改後的處理器看起來不那麼的簡潔,想象一下如果由多個字段比如可變長度的字段組成的更為複雜的協議時,你的ChannelHandler的實現將很快地變得難以維護。

正如你所知的,你可以增加多個ChannelHandlerChannelPipeline ,因此你可以把一整個ChannelHandler拆分成多個模塊以減少應用的複雜程度,比如你可以把TimeClientHandler拆分成2個處理器:

  • TimeDecoder處理數據拆分的問題
  • TimeClientHandler原始版本的實現

幸運地是,Netty提供了一個可擴展的類,幫你完成TimeDecoder的開發。

01 package io.netty.example.time;
02  
03 public class TimeDecoder extends ByteToMessageDecoder { // (1)
04     @Override
05     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
06         if (in.readableBytes() < 4) {
07             return// (3)
08         }
09  
10         out.add(in.readBytes(4)); // (4)
11     }
12 }
  1. ByteToMessageDecoderChannelHandler的一個實現類,他可以在處理數據拆分的問題上變得很簡單。
  2. 每當有新數據接收的時候,ByteToMessageDecoder都會調用decode()方法來處理內部的那個累積緩衝。
  3. Decode()方法可以決定當累積緩衝裏沒有足夠數據時可以往out對象裏放任意數據。當有更多的數據被接收了ByteToMessageDecoder會再一次調用decode()方法。
  4. 如果在decode()方法裏增加了一個對象到out對象裏,這意味著解碼器解碼消息成功。ByteToMessageDecoder將會丟棄在累積緩衝裏已經被讀過的數據。請記得你不需要對多條消息調用decode(),ByteToMessageDecoder會持續調用decode()直到不放任何數據到out裏。

現在我們有另外一個處理器插入到ChannelPipeline裏,我們應該在TimeClient裏修改ChannelInitializer 的實現:

1 b.handler(new ChannelInitializer<SocketChannel>() {
2     @Override
3     public void initChannel(SocketChannel ch) throws Exception {
4         ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
5     }
6 });

如果你是一個大膽的人,你可能會嚐試使用更簡單的解碼類ReplayingDecoder。不過你還是需要參考一下API文檔來獲取更多的信息。

1 public class TimeDecoder extends ReplayingDecoder {
2 @Override
3 protected void decode(
4 ChannelHandlerContext ctx, ByteBuf in, List<object width="300" height="150">out) {out.add(in.readBytes(4));}}

此外,Netty還提供了更多可以直接拿來用的解碼器使你可以更簡單地實現更多的協議,幫助你避免開發一個難以維護的處理器實現。請參考下麵的包以獲取更多更詳細的例子:

用POJO代替ByteBuf

我們已經討論了所有的例子,到目前為止一個消息的消息都是使用ByteBuf作為一個基本的數據結構。在這一部分,我們會改進TIME協議的客戶端和服務端的例子,用POJO替代ByteBuf。在你的ChannelHandlerS中使用POJO優勢是比較明顯的。通過從ChannelHandler中提取出ByteBuf的代碼,將會使ChannelHandler的實現變得更加可維護和可重用。在TIME客戶端和服務端的例子中,我們讀取的僅僅是一個32位的整形數據,直接使用ByteBuf不會是一個主要的問題。然後,你會發現當你需要實現一個真實的協議,分離代碼變得非常的必要。首先,讓我們定義一個新的類型叫做UnixTime。

01 package io.netty.example.time;
02  
03 import java.util.Date;
04  
05 public class UnixTime {
06  
07     private final int value;
08  
09     public UnixTime() {
10         this((int) (System.currentTimeMillis() / 1000L + 2208988800L));
11     }
12  
13     public UnixTime(int value) {
14         this.value = value;
15     }
16  
17     public int value() {
18         return value;
19     }
20  
21     @Override
22     public String toString() {
23         return new Date((value() - 2208988800L) * 1000L).toString();
24     }
25 }

現在我們可以修改下TimeDecoder類,返回一個UnixTime,以替代ByteBuf

1 @Override
2 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
3     if (in.readableBytes() < 4) {
4         return;
5     }
6  
7     out.add(new UnixTime(in.readInt()));
8 }

下麵是修改後的解碼器,TimeClientHandler不再有任何的ByteBuf代碼了。

1 @Override
2 public void channelRead(ChannelHandlerContext ctx, Object msg) {
3     UnixTime m = (UnixTime) msg;
4     System.out.println(m);
5     ctx.close();
6 }

是不是變得更加簡單和優雅了?相同的技術可以被運用到服務端。讓我們修改一下TimeServerHandler的代碼。

1 @Override
2 public void channelActive(ChannelHandlerContext ctx) {
3     ChannelFuture f = ctx.writeAndFlush(new UnixTime());
4     f.addListener(ChannelFutureListener.CLOSE);
5 }

現在,僅僅需要修改的是ChannelHandler的實現,這裏需要把UnixTime對象重新轉化為一個ByteBuf。不過這已經是非常簡單了,因為當你對一個消息編碼的時候,你不需要再處理拆包和組裝的過程。

01 package io.netty.example.time;
02  
03 public class TimeEncoder extends ChannelHandlerAdapter {
04     @Override
05     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
06         UnixTime m = (UnixTime) msg;
07         ByteBuf encoded = ctx.alloc().buffer(4);
08         encoded.writeInt(m.value());
09         ctx.write(encoded, promise); // (1)
10     }
11 }
  1. 在這幾行代碼裏還有幾個重要的事情。第一, 通過ChannelPromise,當編碼後的數據被寫到了通道上Netty可以通過這個對象標記是成功還是失敗。第二, 我們不需要調用cxt.flush()。因為處理器已經單獨分離出了一個方法void flush(ChannelHandlerContext cxt),如果像自己實現flush方法內容可以自行覆蓋這個方法。

進一步簡化操作,你可以使用MessageToByteEncode:

1 public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
2     @Override
3     protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
4         out.writeInt(msg.value());
5     }
6 }

最後的任務就是在TimeServerHandler之前把TimeEncoder插入到ChannelPipeline。但這是不那麼重要的工作。

關閉你的應用

關閉一個Netty應用往往隻需要簡單地通過shutdownGracefully()方法來關閉你構建的所有的NioEventLoopGroupS.當EventLoopGroup被完全地終止,並且對應的所有channels都已經被關閉時,Netty會返回一個Future對象。

概述

在這一章節中,我們會快速地回顧下如果在熟練掌握Netty的情況下編寫出一個健壯能運行的網絡應用程序。在Netty接下去的章節中還會有更多更相信的信息。我們也鼓勵你去重新複習下在io.netty.example包下的例子。請注意社區一直在等待你的問題和想法以幫助Netty的持續改進,Netty的文檔也是基於你們的快速反饋上。

最後更新:2017-05-23 18:02:29

  上一篇:go  Disruptor入門
  下一篇:go  Alphago再下一城,人機大戰能告訴我們什麼?