Hadoop源码学习:RPC
Hadoop源码学习:RPC
Hadoop RPC使用java NIO编写,达到高性能,轻量级,可控性。 主要分为四层:序列化层,函数调用层,网络传输层,服务器端处理框架
- 序列化层:实现Writable接口
- 函数调用层:java反射机制和动态代理实现函数调用
- 网络传输层:使用Socket机制
- 服务器端处理框架:基于Reactor设计模式的事件驱动I/O模型
如何使用Hadoop RPC:
RPC Server:
- 1.定义一个协议,实现VersionedProtocol接口,
public interface TestProtocol extends VersionedProtocol { public static final long versionID = 1L; void ping() throws IOException; }
-
2.实现RPC协议
public static class TestImpl implements TestProtocol { int fastPingCounter = 0; public long getProtocolVersion(String protocol, long clientVersion) { return TestProtocol.versionID; } public int add(int v1, int v2) { return v1 + v2; }
- 3.服务端开启服务
Server server = RPC.getServer(new TestImpl(), 0.0.0.0, 0, conf); server.start();
RPC Client
- 1.根据接口,远程调用
TestProtocol proxy = null; proxy = (TestProtocol)RPC.getProxy( TestProtocol.class, TestProtocol.versionID, addr, conf); int intResult = proxy.add(1, 2);
ipc.RPC类分析
我们来逐行分析代码
//客户端调用服务
proxy = (TestProtocol)RPC.getProxy(
TestProtocol.class, TestProtocol.versionID, addr, conf);
getProxy方法,通过动态代理,将TestProtocol的调用交由RPC.Invoker管理,所有对TestProtocol的方法调用,都将交由RPC.Invoker的invoke()方法处理。
int intResult = proxy.add(1, 2);//RPC.Invoker的invoke()方法处理
RPC.Invoker将函数调用信息(函数名、函数参数列表等)打包成可序列化的RPC.Invocation对象,由ipc.Client发起链接
//从连接池中取出socket链接
client.call(new Invocation(method, args), remoteId);
ipc.Client类分析
- 用ConnectionId标识一个链接,同一个InetSocketAddress将共享同一个链接
- 链接被封住成Connection,每个方法调用都被封装成一个Call,Connection维护一个Call的HashMap
ipc.Server类分析
Hadoop采用Master/Slave结构,ipc.Server采用了线程池、事件驱动和Reactor设计模式来提高并发性。
ipc.Server是一个虚类
1.单线程Listener监听客户端请求,从Reader线程池中取出Reader线程处理请求,Reader读取客户端的调用请求并放入CallQueue队列
2.Handler从CallQueue队列中取出Call,执行函数调用,返回结果
3.如果函数调用返回的结果过大或者网络异常,,由单线程Respondeer使用异步方式继续发送未发送完成的结果 详细分析代码:
-
ipc.Server.Listener类
//服务端开启监听 ServerSocketChannel acceptChannel = ServerSocketChannel. open(); //设置为非阻塞模式 acceptChannel.configureBlocking( false ); // Bind the server socket to the local host and port bind(acceptChannel .socket(), address, backlogLength); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector,用于监听事件; selector= Selector. open(); //客户端连接服务器将会发生OP_ACCEPT事件 acceptChannel .register( selector, SelectionKey. OP_ACCEPT ); while ( running) { SelectionKey key = null ; try { //阻塞监听事件 selector.select(); //发生事件了,获取SelectionKey,SelectionKey.channel()可以获取到发生事件的Channel Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); try { if (key.isValid()) { if (key.isAcceptable()) //建立链接,执行后续的读取客户端的请求内容 doAccept(key); } } catch (IOException e) { } key = null ; } } catch (OutOfMemoryError e) { }
//建立链接,执行后续的读取客户端的请求内容 void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { Connection c = null ; //发生事件的Channel ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; //与客户端建立链接 while ((channel = server.accept()) != null ) { channel.configureBlocking( false ); channel.socket().setTcpNoDelay( tcpNoDelay ); //从读者线程池中获取reader,做后续操作 Reader reader = getReader(); try { reader.startAdd(); //将客户端链接Channel注册到读者线程的监控器readSelector,一旦可读,激活读者线程开始读取,向Select注册事件返回一个Key SelectionKey readKey = reader.registerChannel(channel); c = new Connection(readKey, channel, System.currentTimeMillis ()); //将链接类Connection的引用存到readKey,reader线程可以通过readSelector .selectedKeys()再次获取Key //并通过readKey.attachment()获取寄存的引用 //Connection负责通信,reader也是通过调用Connection.readAndProcess()读取客户端的远程调用请求,并将请求 //封装成Call,放入callQueue。Handler线程会从callQueue中获取Call,执行远程调用 readKey.attach(c); synchronized (connectionList ) { connectionList .add(numConnections , c); numConnections ++; }finally { reader.finishAdd(); } } }
-
ipc.Server.Listener.Reader类
public void run() { LOG.info("Starting " + getName()); synchronized (this) { while (running) { SelectionKey key = null; try { //阻塞监听客户端的可读事件 readSelector.select(); while (adding) { this.wait(1000); } Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); if (key.isValid()) { if (key.isReadable()) { //执行读操作 doRead(key); } } key = null; } } ... } } }
void doRead(SelectionKey key) throws InterruptedException { int count = 0; //获取寄存的链接的引用 Connection c = (Connection)key.attachment(); if (c == null) { return; } c.setLastContact(System.currentTimeMillis()); try { //读取客户端的远程调用请求,封住成Call,put到callQueue队列中 count = c.readAndProcess(); } ... }
-
ipc.Server.Handler类,调用ipc.RPC.Server.Call()来执行远程调用,将结果返回给客户端、
public Writable call(Class<?> protocol, Writable param, long receivedTime) throws IOException { try { Invocation call = (Invocation)param; if (verbose) log("Call: " + call); Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses()); method.setAccessible(true); long startTime = System.currentTimeMillis(); //反射调用,instance为实例化的Protocol,call.getParameters()为客户端发送过来的要调用的函数以及参数 Object value = method.invoke(instance, call.getParameters()); ... }
最后更新:2017-04-03 12:54:48