706
技術社區[雲棲]
一個高可擴展的基於非阻塞IO的服務器架構
- 目錄
- 線程體係結構
- 反應堆模式
- 組件架構
- 接收器
- 分配器
- 分配器級別事件處理器
- 應用程序級別事件處理器
- 總結
- 參考資料
如果你被要求去寫一個高可擴展性的基於JAVA的服務器,你很快就會決定使用JAVA NIO包。為了讓服務器跑起來,你可能會花很多時間閱讀博客和教程來了解線程同步需要NIO SELECTOR類以及處理一些常見的陷阱。本文描述了一個麵向連接基於NIO的服務器的基本架構。本文會先看一下一個首選的線程模型然後討論服務器的一些基本組件。
Threading Architecture線程體係結構
第一種也是最直觀的方式去實現一個多線程的服務器是每個連接一個線程的方式。這是JAVA1.4以前的解決方案,由於老版本的JAVA缺少非阻塞的I/O支持。每個連接一個線程的方法分配一個獨家的工作線程給每個連接。在處理循環中,工作線程等待新進入的數據,處理這個請求,返回響應數據,然後再調用阻塞socket的read方法。
02 |
private ExecutorService executors = Executors.newFixedThreadPool( 10 );
|
03 |
private boolean isRunning = true ;
|
05 |
public static void main(String... args) throws ... {
|
06 |
new Server().launch(Integer.parseInt(args[ 0 ]));
|
09 |
public void launch( int port) throws ... {
|
10 |
ServerSocket sso = new ServerSocket(port);
|
12 |
Socket s = sso.accept();
|
13 |
executors.execute( new Worker(s));
|
17 |
private class Worker implements Runnable {
|
18 |
private LineNumberReader in = null ;
|
21 |
Worker(Socket s) throws ... {
|
22 |
in = new LineNumberReader( new InputStreamReader(...));
|
29 |
// blocking read of a request (line)
|
30 |
String request = in.readLine();
|
32 |
// processing the request
|
36 |
// return the response
|
39 |
} catch (Exception e ) {
|
在同時發生的客戶端連接和多個同步工作線程之間通常有一個單對單的關係。因為每個連接都有一個相關聯的服務端等待線程,因此可以有很好的響應時間。然而,高負載需要更多的同步運行的線程,這些限製了可擴展性。尤其是,長時間存活的連接像持久化的HTTP連接導致大量的同步工作線程存在,有浪費時間等待新的客戶端請求的趨勢。此外,成百上千的同步線程會浪費大量的棧空間。注意,舉例來說,Solaris/Sparc默認的JAVA棧空間是512KB.
如果server不得不處理大量同時發生的客戶端,並且能容忍慢,無反應的客戶端,就需要一種供替代的線程架構。每個事件一個線程的方式通過一種非常高效地方式實現了這樣的需求。工作線程和連接獨立,僅被用來處理特定的事件。舉例來說,如果一個數據接收事件發生了,一個工作線程將會用來處理特定於應用程序的編碼和服務任務(或至少啟動這些任務)。任務一結束,工作線程就會回到線程池中。這種方式需要無阻塞的處理socket的I/O。調用socket的read或write方法需要時無阻塞的。此外,一個事件係統是必須的;它會發信號表明是否有新數據,輪流發起socket的read方法。這種方式移除了等待線程和工作線程之間的一對一關係。這樣一個事件驅動的I/0係統的設計將會在反應堆模式中描述。
The Reactor Pattern反應堆模式
反應堆模式,如圖1所示,把事件的檢測例如準備就緒讀或者準備就緒接受數據和事件的處理分離。如果一個準備就緒的事件發生了,專用工作線程內的一個事件處理器就會被通知去執行適當的處理。

Figure 1. A NIO-based Reactor pattern implementation
連接通道需要先在Selector類中注冊才能參與事件的架構。這可以通過調用regisster()方法來實現。雖然這個方法是SocketChannel的一部分,這個通道將會在Selector中注冊,沒有其它的方法。
2 |
SocketChannel channel = serverChannel.accept(); |
3 |
channel.configureBlocking( false );
|
5 |
// register the connection |
6 |
SelectionKey sk = channel.register(selector, SelectionKey.OP_READ); |
為了檢測新的事件,Selector類提供了請求已注冊的通道就緒事件的能力。通過調用select方法 ,Selector收集已注冊通道的就緒事件。這個方法的調用會阻塞,直到至少一個事件已經發生。在這種情況下,方法返回了自上次調用之後就緒的I/O操作的連接數。所選的連接可以通過調用Selector的selectedkey方法來檢測。這個方法返回一個Selectionkey對象集合,裏麵存放了IO事件的狀態和連接通道的引用。
一個Selector存在於Dispatcher中。這是一個單線程的活動類圍饒著Selector類。Dispatcher類的職責是檢測事件然後分發消費事件的處理給EventHandler類。在這個分發循環中,Dispatcher類調用Selector類的select方法等待新的事件。如果至少一個事件發生了,這個方法就返回,每個事件相關的通道可以通過調用selectedkeys方法獲得。
03 |
// blocking call, to wait for new readiness events
|
04 |
int eventCount = selector.select();
|
07 |
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
|
08 |
while (it.hasNext()) {
|
09 |
SelectionKey key = it.next();
|
13 |
if (key.isValid() && key.isReadable()) {
|
14 |
eventHandler.onReadableEvent(key.channel());
|
18 |
if (key.isValid() && key.isWritable()) {
|
19 |
key.interestOps(SelectionKey.OP_READ); // reset to read only
|
20 |
eventHandler.onWriteableEvent(key.channel());
|
基於一個事件,類似於就緒讀或就緒寫,EventHandler會被Dispatcher調用來處理這個事件。EventHandler解碼請求數據,處理必須的服務活動,編碼響應數據。由於工作線程沒有被強製去浪費時間等待新的請求然後建立一個連接,這種方式的可擴展性和吞吐量理論上隻限製於係統資源像CPU和內存。這既便是說,響應時間將沒有每個連接一個線程的方式快,由於參與線程間的切換和同步。事件驅動方法的挑戰因此是最少化同步和優化線程管理,以致於這些影響可以被忽略。
Component Architecture組件架構
大多數具有高可擴展性的JAVA服務器都是建立在反應堆模式上的。這樣做,反應堆模式中的類將會被增強,因為需要額外的類來連接管理,緩衝區管理,以及負載均衡。這個服用器的入口類是一個Acceptor。這個安排如圖2所示。

Figure 2. Major components of a connection-oriented server
Acceptor接收器
一個服務器每個新的客戶端連接將會被單個Acceptor所接收,Acceptor與服務器的端口綁定。接收器是一個單線程的活動類。由於Acceptor僅負責處理曆時非常短的客戶端連接請求,經常隻要用阻塞I/0模式實現Acceptor就足夠了。Acceptor通過調用Serversocketchannel的阻塞accept方法來處理新請求。新請求將會注冊到Dispatcher,這之後,請求就可以參與到事件處理中了。
由於一個Dispatcher的可擴展性非常有限,通常都會使用一個小的Dispatchers的池。這個限製當中的一個原因是特定的操作係統實現的Selector。大多數的操作係統一對一的映射SocketChannel和文件處理。取決於具體的係統,每個Selector的最大文件處理數的限製也是不同的。
01 |
class Acceptor implements Runnable {
|
04 |
ServerSocketChannel serverChannel = ServerSocketChannel.open();
|
05 |
serverChannel.configureBlocking( true );
|
06 |
serverChannel.socket().bind( new InetSocketAddress(serverPort));
|
12 |
SocketChannel channel = serverChannel.accept();
|
14 |
Connection con = new Connection(channel, appHandler);
|
15 |
dispatcherPool.nextDispatcher().register(con);
|
在示例代碼中,一個連接對象持有SocketChannel和應用級別的事件處理器。我們將會在下麵描述這些類。
Dispatcher分配器
通過調用Dispatcher的register方法,SocketChannel將會注冊到相關的Selector上。這裏就是問題的來源。Selector在內部使用key集合來管理注冊的通道。這意味著每次注冊一個通道,一個相關連的SelectionKey會被創建並被加入到Selector的注冊key集合。同時,並發的分發線程可以調用Selector的select方法,也會訪問這個key集合。由於key集合是非線程安全的,一個非同步的Acceptor上下文注冊會導致死鎖和競爭。這個可以通過實現selector guard object idiom來解決,它允許暫時的掛起分配線程。參考”“https://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf”> How to Build a Scalable Multiplexed Server with NIO” (PDF)來查看這個方法的解釋。
01 |
class Dispatcher implements Runnable {
|
02 |
private Object guard = new Object();
|
05 |
void register(Connection con) {
|
06 |
// retrieve the guard lock and wake up the dispatcher thread
|
07 |
// to register the connection's channel
|
08 |
synchronized (guard) {
|
10 |
con.getChannel().register(selector, SelectionKey.OP_READ, con);
|
13 |
// notify the application EventHandler about the new connection
|
17 |
void announceWriteNeed(Connection con) {
|
18 |
SelectionKey key = con.getChannel().keyFor(selector);
|
19 |
synchronized (guard) {
|
21 |
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
|
27 |
synchronized (guard) {
|
28 |
// suspend the dispatcher thead if guard is locked
|
30 |
int eventCount = selector.select();
|
32 |
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
|
33 |
while (it.hasNext()) {
|
34 |
SelectionKey key = it.next();
|
38 |
if (key.isValid() && key.isReadable()) {
|
39 |
Connection con = (Connection) key.attachment();
|
40 |
disptacherEventHandler.onReadableEvent(con);
|
在這個連接注冊之後,Selector監聽這個連接的就緒事件。如果一個事件發生了,通過傳遞相關的連接,這個Dispatcher的事件處理類的合適的回調方法將會被調用。
分配器級別事件處理器
處理一個就緒讀事件的第一個行為是調用通道的讀方法。與流接口相反,通道接口需要忽略讀緩衝接口。通常會使用直接分配的ByteBuffer。直接緩衝區存在於本地內存,繞過JAVA堆內存。通過使用直接緩衝,socket的IO操作不再需要創建內部中間緩衝器。
通常情況下,讀請求會被非常快的執行。Socket的讀操作通常隻是把一份接收到的數據從內核內存空間拷貝到讀緩衝區,這個數據會存在於用戶控製的內存空間。這些接收的數據將會被添加到連接的線程安全的讀隊列作進一步的處理。基於I/O操作的結果,特定於應用程序的任務會被執行。這些任務會被分配的應用級別的事件處理器處理。這類處理器通常被稱為工作線程。
01 |
class DispatcherEventHandler {
|
04 |
void onReadableEvent( final Connection con) {
|
05 |
// get the received data
|
06 |
ByteBuffer readBuffer = allocateMemory();
|
07 |
con.getChannel().read(readBuffer);
|
08 |
ByteBuffer data = extractReadAndRecycleRenaming(readBuffer);
|
10 |
// append it to read queue
|
11 |
con.getReadQueue().add(data);
|
14 |
// perform further operations (encode, process, decode)
|
16 |
if (con.getReadQueue().getSize() > 0 ) {
|
17 |
workerPool.execute( new Runnable() {
|
20 |
con.getAppHandler().onData(con);
|
27 |
void onWriteableEvent(Connection con) {
|
28 |
ByteBuffer[] data = con.getWriteQueue().drain();
|
29 |
con.getChannel().write(data); // write the data
|
32 |
if (con.getWriteQueue().isEmpty()) {
|
34 |
dispatcher.deregister(con);
|
38 |
// there is remaining data to write
|
39 |
dispatcher.announceWriteNeed(con);
|
在特定於應用程序的任務中,數據會被編碼,服務會被執行,數據會被寫入。在寫數據的時候,要被發送的數據會加入到寫隊列,然後調用Dispatcher類的announceWriteNeed方法。這個方法讓Selector開始監聽就緒讀事件。如果這種事件發生,分配器級別的事件處理器就會執行onWriteableEvent方法。這從通道的寫隊列獲取數據然後執行必要的寫I/O操作。試圖直接寫數據,通過這種方法,將會導致死鎖和競爭。
應用級別事件處理器
與分配器事件處理器相比,特定於應用的事件處理器監聽高級別的麵向連接的事件,例如建立連接,數據接收或者是關閉連接。具體的事件處理設計是NIO服務器框架像SEDA,MINA還有emberIO之間最大的不同。這些框架通常實現了多級的架構,這樣事件處理鏈就可以使用。它允許增加像SSLHandler或DelayerWriteHandler之類可以攔截請求/響應處理的處理器。下麵的例子展示了一個基於xSocket框架的應用級別的處理器。xScoket框架支持不同的處理器接口,這些接口裏麵定義了需要被實現的特定於應用的回調方法代碼。
01 |
class POP3ProtocolHandler implements IConnectHandler, IDataHandler, ... {
|
02 |
private static final String DELIMITER = ...
|
03 |
private Mailbox mailbox = ...
|
05 |
public static void main(String... args) throws ... {
|
06 |
new MultithreadedServer( 110 , new POP3ProtocolHandler()).run();
|
09 |
public boolean onConnect(INonBlockingConnection con) throws ... {
|
10 |
if (gatekeeper.isSuspiciousAddress(con.getRemoteAddress())) {
|
11 |
con.setWriteTransferRate( 5 ); // reduce transfer: 5byte/sec
|
14 |
con.write( "+OK My POP3-Server" + DELIMITER);
|
18 |
public boolean onData(INonBlockingConnection con) throws ... {
|
19 |
String request = con.readStringByDelimiter(DELIMITER);
|
21 |
if (request.startsWith( "QUIT" )) {
|
23 |
con.write( "+OK POP3 server signing off" + DELIMITER);
|
26 |
} else if (request.startsWith( "USER" )) {
|
27 |
this .user = request.substring( 4 , request.length());
|
28 |
con.write( "+OK enter password" + DELIMITER);
|
30 |
} else if (request.startsWith( "PASS" )) {
|
31 |
String pwd = request.substring( 4 , request.length());
|
32 |
boolean isAuthenticated = authenticator.check(user, pwd);
|
33 |
if (isAuthenticated) {
|
34 |
mailbox = MailBox.openAndLock(user);
|
35 |
con.write( "+OK mailbox locked and ready" + DELIMITER);
|
為了更簡便的訪問底層的讀寫隊列,Connection對象提供了一些便利的麵向流和通道的讀寫方法。
通過關閉連接,底層實現初始化一個可寫事件往返的刷新寫隊列。連接會在遺留的數據被寫完之後終止。除了這樣一個控製終端,連接還能因為其它的原因關閉。例如,硬件故障可能導致基於TCP的連接中斷。這樣的情況隻有在socket上執行讀寫操作或空閑超時的時候檢測到。大多數的NIO框架提供一個內置的程序來處理這些不受控製的中斷。
Conclusion總結
一個事件驅動的非阻塞架構是實現高效,高擴展性和高穩定性服務器的一個基本的層。其中的挑戰就是最小化線程同步開銷和優化連接和緩衝區的管理。這會是編程中最困難的部分。
但是沒有必要重複發明輪子。一些框架像xSocket,emberIO,SEDA或MINA都抽象了低層次的事件處理和線程管理來簡化創建高可擴展性的服務器。以上大部分的框架都支持SSL和UDP,本文中未提及這兩點。
Resources參考資料
作者:
Gregor Roth works as a software architect at United Internet group, a leading European Internet Service Provider to which GMX, 1&1, and Web.de belong. His areas of interest include software and system architecture, enterprise architecture management, object-oriented design, distributed computing, and development methodologies.
最後更新:2017-05-23 14:32:36