閱讀214 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Hadoop源碼學習:RPC


Hadoop源碼學習:RPC

Hadoop RPC使用java NIO編寫,達到高性能,輕量級,可控性。 主要分為四層:序列化層,函數調用層,網絡傳輸層,服務器端處理框架

  • 序列化層:實現Writable接口
  • 函數調用層:java反射機製和動態代理實現函數調用
  • 網絡傳輸層:使用Socket機製
  • 服務器端處理框架:基於Reactor設計模式的事件驅動I/O模型

如何使用Hadoop RPC:

RPC Server:

  • 1.定義一個協議,實現VersionedProtocol接口,
      public interface TestProtocol extends VersionedProtocol {
      public static final long versionID = 1L;
      void ping() throws IOException;
    }
  • 2.實現RPC協議

     public static class TestImpl implements TestProtocol {
      int fastPingCounter = 0;
    
      public long getProtocolVersion(String protocol, long clientVersion) {
        return TestProtocol.versionID;
      }
      public int add(int v1, int v2) {
        return v1 + v2;
      }
  • 3.服務端開啟服務
      Server server = RPC.getServer(new TestImpl(), 0.0.0.0, 0, conf);
      server.start();

RPC Client

  • 1.根據接口,遠程調用
      TestProtocol proxy = null;
      proxy = (TestProtocol)RPC.getProxy(
      TestProtocol.class, TestProtocol.versionID, addr, conf);
      int intResult = proxy.add(1, 2);

ipc.RPC類分析

我們來逐行分析代碼

    //客戶端調用服務
    proxy = (TestProtocol)RPC.getProxy(
    TestProtocol.class, TestProtocol.versionID, addr, conf);

getProxy方法,通過動態代理,將TestProtocol的調用交由RPC.Invoker管理,所有對TestProtocol的方法調用,都將交由RPC.Invoker的invoke()方法處理。

int intResult = proxy.add(1, 2);//RPC.Invoker的invoke()方法處理

RPC.Invoker將函數調用信息(函數名、函數參數列表等)打包成可序列化的RPC.Invocation對象,由ipc.Client發起鏈接

//從連接池中取出socket鏈接
client.call(new Invocation(method, args), remoteId);

ipc.Client類分析

  • 用ConnectionId標識一個鏈接,同一個InetSocketAddress將共享同一個鏈接
  • 鏈接被封住成Connection,每個方法調用都被封裝成一個Call,Connection維護一個Call的HashMap

ipc.Server類分析

Hadoop采用Master/Slave結構,ipc.Server采用了線程池、事件驅動和Reactor設計模式來提高並發性。
ipc.Server是一個虛類
1.單線程Listener監聽客戶端請求,從Reader線程池中取出Reader線程處理請求,Reader讀取客戶端的調用請求並放入CallQueue隊列
2.Handler從CallQueue隊列中取出Call,執行函數調用,返回結果
3.如果函數調用返回的結果過大或者網絡異常,,由單線程Respondeer使用異步方式繼續發送未發送完成的結果  詳細分析代碼:

  • ipc.Server.Listener類

       //服務端開啟監聽
        ServerSocketChannel acceptChannel = ServerSocketChannel. open();
       //設置為非阻塞模式
        acceptChannel.configureBlocking( false );
    
        // Bind the server socket to the local host and port
        bind(acceptChannel .socket(), address, backlogLength);
        port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
        // create a selector,用於監聽事件;
        selector= Selector. open();
       //客戶端連接服務器將會發生OP_ACCEPT事件
        acceptChannel .register( selector, SelectionKey. OP_ACCEPT );
        while ( running) {
          SelectionKey key = null ;
          try {
            //阻塞監聽事件
            selector.select();
            //發生事件了,獲取SelectionKey,SelectionKey.channel()可以獲取到發生事件的Channel
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
              key = iter.next();
              iter.remove();
              try {
                if (key.isValid()) {
                  if (key.isAcceptable())
                    //建立鏈接,執行後續的讀取客戶端的請求內容
                    doAccept(key);
                }
              } catch (IOException e) {
              }
              key = null ;
            }
          } catch (OutOfMemoryError e) {
          }
       //建立鏈接,執行後續的讀取客戶端的請求內容
       void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
        Connection c = null ;
        //發生事件的Channel
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel channel;
        //與客戶端建立鏈接
        while ((channel = server.accept()) != null ) {
          channel.configureBlocking( false );
          channel.socket().setTcpNoDelay( tcpNoDelay );
          //從讀者線程池中獲取reader,做後續操作
          Reader reader = getReader();
          try {
            reader.startAdd();
            //將客戶端鏈接Channel注冊到讀者線程的監控器readSelector,一旦可讀,激活讀者線程開始讀取,向Select注冊事件返回一個Key
            SelectionKey readKey = reader.registerChannel(channel);
            c = new Connection(readKey, channel, System.currentTimeMillis ());
            //將鏈接類Connection的引用存到readKey,reader線程可以通過readSelector .selectedKeys()再次獲取Key
            //並通過readKey.attachment()獲取寄存的引用
            //Connection負責通信,reader也是通過調用Connection.readAndProcess()讀取客戶端的遠程調用請求,並將請求
            //封裝成Call,放入callQueue。Handler線程會從callQueue中獲取Call,執行遠程調用
            readKey.attach(c);
            synchronized (connectionList ) {
              connectionList .add(numConnections , c);
              numConnections ++;
            }finally {
            reader.finishAdd();
          }
    
        }
      }
  • ipc.Server.Listener.Reader類

        public void run() {
          LOG.info("Starting " + getName());
          synchronized (this) {
            while (running) {
              SelectionKey key = null;
              try {
                //阻塞監聽客戶端的可讀事件
                readSelector.select();
                while (adding) {
                  this.wait(1000);
                }              
    
                Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
                while (iter.hasNext()) {
                  key = iter.next();
                  iter.remove();
                  if (key.isValid()) {
                    if (key.isReadable()) {
                      //執行讀操作
                      doRead(key);
                    }
                  }
                  key = null;
                }
              } 
              ...
            }
          }
        }
      void doRead(SelectionKey key) throws InterruptedException {
        int count = 0;
        //獲取寄存的鏈接的引用
        Connection c = (Connection)key.attachment();
        if (c == null) {
          return;  
        }
        c.setLastContact(System.currentTimeMillis());
    
        try {
          //讀取客戶端的遠程調用請求,封住成Call,put到callQueue隊列中
          count = c.readAndProcess();
        } 
        ...
      }
  • ipc.Server.Handler類,調用ipc.RPC.Server.Call()來執行遠程調用,將結果返回給客戶端、

    public Writable call(Class<?> protocol, Writable param, long receivedTime) 
      throws IOException {
        try {
          Invocation call = (Invocation)param;
          if (verbose) log("Call: " + call);
    
          Method method =
            protocol.getMethod(call.getMethodName(),
                                     call.getParameterClasses());
          method.setAccessible(true);
    
          long startTime = System.currentTimeMillis();
          //反射調用,instance為實例化的Protocol,call.getParameters()為客戶端發送過來的要調用的函數以及參數
          Object value = method.invoke(instance, call.getParameters());
    ...
    }
本博客已遷移至:https://edwardsbean.github.io

最後更新:2017-04-03 12:54:48

  上一篇:go 藍橋杯 曆屆試題 連號區間數
  下一篇:go miui 係統組件 功能提示