Hadoop源碼學習:RPC
Hadoop源碼學習:RPC
Hadoop RPC使用java NIO編寫,達到高性能,輕量級,可控性。 主要分為四層:序列化層,函數調用層,網絡傳輸層,服務器端處理框架
- 序列化層:實現Writable接口
- 函數調用層:java反射機製和動態代理實現函數調用
- 網絡傳輸層:使用Socket機製
- 服務器端處理框架:基於Reactor設計模式的事件驅動I/O模型
如何使用Hadoop RPC:
RPC Server:
- 1.定義一個協議,實現VersionedProtocol接口,
public interface TestProtocol extends VersionedProtocol { public static final long versionID = 1L; void ping() throws IOException; }
-
2.實現RPC協議
public static class TestImpl implements TestProtocol { int fastPingCounter = 0; public long getProtocolVersion(String protocol, long clientVersion) { return TestProtocol.versionID; } public int add(int v1, int v2) { return v1 + v2; }
- 3.服務端開啟服務
Server server = RPC.getServer(new TestImpl(), 0.0.0.0, 0, conf); server.start();
RPC Client
- 1.根據接口,遠程調用
TestProtocol proxy = null; proxy = (TestProtocol)RPC.getProxy( TestProtocol.class, TestProtocol.versionID, addr, conf); int intResult = proxy.add(1, 2);
ipc.RPC類分析
我們來逐行分析代碼
//客戶端調用服務
proxy = (TestProtocol)RPC.getProxy(
TestProtocol.class, TestProtocol.versionID, addr, conf);
getProxy方法,通過動態代理,將TestProtocol的調用交由RPC.Invoker管理,所有對TestProtocol的方法調用,都將交由RPC.Invoker的invoke()方法處理。
int intResult = proxy.add(1, 2);//RPC.Invoker的invoke()方法處理
RPC.Invoker將函數調用信息(函數名、函數參數列表等)打包成可序列化的RPC.Invocation對象,由ipc.Client發起鏈接
//從連接池中取出socket鏈接
client.call(new Invocation(method, args), remoteId);
ipc.Client類分析
- 用ConnectionId標識一個鏈接,同一個InetSocketAddress將共享同一個鏈接
- 鏈接被封住成Connection,每個方法調用都被封裝成一個Call,Connection維護一個Call的HashMap
ipc.Server類分析
Hadoop采用Master/Slave結構,ipc.Server采用了線程池、事件驅動和Reactor設計模式來提高並發性。
ipc.Server是一個虛類
1.單線程Listener監聽客戶端請求,從Reader線程池中取出Reader線程處理請求,Reader讀取客戶端的調用請求並放入CallQueue隊列
2.Handler從CallQueue隊列中取出Call,執行函數調用,返回結果
3.如果函數調用返回的結果過大或者網絡異常,,由單線程Respondeer使用異步方式繼續發送未發送完成的結果 詳細分析代碼:
-
ipc.Server.Listener類
//服務端開啟監聽 ServerSocketChannel acceptChannel = ServerSocketChannel. open(); //設置為非阻塞模式 acceptChannel.configureBlocking( false ); // Bind the server socket to the local host and port bind(acceptChannel .socket(), address, backlogLength); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector,用於監聽事件; selector= Selector. open(); //客戶端連接服務器將會發生OP_ACCEPT事件 acceptChannel .register( selector, SelectionKey. OP_ACCEPT ); while ( running) { SelectionKey key = null ; try { //阻塞監聽事件 selector.select(); //發生事件了,獲取SelectionKey,SelectionKey.channel()可以獲取到發生事件的Channel Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); try { if (key.isValid()) { if (key.isAcceptable()) //建立鏈接,執行後續的讀取客戶端的請求內容 doAccept(key); } } catch (IOException e) { } key = null ; } } catch (OutOfMemoryError e) { }
//建立鏈接,執行後續的讀取客戶端的請求內容 void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { Connection c = null ; //發生事件的Channel ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; //與客戶端建立鏈接 while ((channel = server.accept()) != null ) { channel.configureBlocking( false ); channel.socket().setTcpNoDelay( tcpNoDelay ); //從讀者線程池中獲取reader,做後續操作 Reader reader = getReader(); try { reader.startAdd(); //將客戶端鏈接Channel注冊到讀者線程的監控器readSelector,一旦可讀,激活讀者線程開始讀取,向Select注冊事件返回一個Key SelectionKey readKey = reader.registerChannel(channel); c = new Connection(readKey, channel, System.currentTimeMillis ()); //將鏈接類Connection的引用存到readKey,reader線程可以通過readSelector .selectedKeys()再次獲取Key //並通過readKey.attachment()獲取寄存的引用 //Connection負責通信,reader也是通過調用Connection.readAndProcess()讀取客戶端的遠程調用請求,並將請求 //封裝成Call,放入callQueue。Handler線程會從callQueue中獲取Call,執行遠程調用 readKey.attach(c); synchronized (connectionList ) { connectionList .add(numConnections , c); numConnections ++; }finally { reader.finishAdd(); } } }
-
ipc.Server.Listener.Reader類
public void run() { LOG.info("Starting " + getName()); synchronized (this) { while (running) { SelectionKey key = null; try { //阻塞監聽客戶端的可讀事件 readSelector.select(); while (adding) { this.wait(1000); } Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); if (key.isValid()) { if (key.isReadable()) { //執行讀操作 doRead(key); } } key = null; } } ... } } }
void doRead(SelectionKey key) throws InterruptedException { int count = 0; //獲取寄存的鏈接的引用 Connection c = (Connection)key.attachment(); if (c == null) { return; } c.setLastContact(System.currentTimeMillis()); try { //讀取客戶端的遠程調用請求,封住成Call,put到callQueue隊列中 count = c.readAndProcess(); } ... }
-
ipc.Server.Handler類,調用ipc.RPC.Server.Call()來執行遠程調用,將結果返回給客戶端、
public Writable call(Class<?> protocol, Writable param, long receivedTime) throws IOException { try { Invocation call = (Invocation)param; if (verbose) log("Call: " + call); Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses()); method.setAccessible(true); long startTime = System.currentTimeMillis(); //反射調用,instance為實例化的Protocol,call.getParameters()為客戶端發送過來的要調用的函數以及參數 Object value = method.invoke(instance, call.getParameters()); ... }
最後更新:2017-04-03 12:54:48