HDFS1.0源代码解析—Hadoop的RPC机制之Client解析

2019-03-28 13:38|来源: 网络

好久没有更新Hadoop相关的博客了,实在是各种压力缠身,各种东西都没准备好,面对即将找工作有点没有了节奏。

ok,开始说说今天的主题Hadoop的RPC机制,之所以在HDFS源码解析的系列中添加这部分的内容,是因为DN和NN交互使用的就是RPC的机制,而RPC机制这部分代码年前也是比较深入的研究过,但是是模仿RPC的机制进行分布式检索的实现。

开始先介绍一下RPC几个主要的组成类RPC.java、Client.java、Server.java,其中RPC类主要是提供对外服务的函数实现动态代理机制,Client是RPC进行服务的函数,主要是连接服务器、传递函数名和相应的参数、等待结果返回,Server主要接受Client的请求、执行相应的函数、返回结果。

Hadoop中的RPC比较难理解的部分就是动态代理的实现,感觉下面这个图画的比较清楚。从图中可以看出动态代理其实只能代理某一个接口,所以所有需要被动态代理的实现类都要实现该接口,在Hadoop中接口是VersionedProtocol。在动态代理中结合了InvocationHandler的功能,保证匿名实现类的方法能够得到正确的执行。使用匿名实现类的对象调用某个方法,实际上调用的是InvocationHandler中的invoke方法,通过该方法将要调用的函数名以及参数告知服务器端,后边会详细介绍一些细节。


结合NN和DN之间的交互作为例子对RPC的原理做一个简单的剖析

在DataNode.java的startDataNode函数中有这样一个调用

  1. 348     this.namenode = (DatanodeProtocol)  
  2. 349       RPC.waitForProxy(DatanodeProtocol.class,  
  3. 350                        DatanodeProtocol.versionID,  
  4. 351                        nameNodeAddr,  
  5. 352                        conf);  
我们可以看到通过调用RPC的waitForProxy方法创建了一个DatanodeProtocol对象namenode(其中DatanodeProtocol实现了接口VersionedProtocol),那么namenode应该就是前边提到的匿名实现类的对象,通过这个对象就可以与NN进行交互了。

下边看一下RPC中的相关实现:

首先看一下waitForProxy函数干了些什么

  1. 329     while (true) {  
  2. 330       try {  
  3. 331         return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);  
  4. 332       } catch(ConnectException se) {  // namenode has not been started  
很明显waitForProxy是保证NN出现一些意外的情况下,我们还是可以获得相应的交互对象。那么接着来看getProxy的作用:
  1. 392     VersionedProtocol proxy =  
  2. 393         (VersionedProtocol) Proxy.newProxyInstance(  
  3. 394             protocol.getClassLoader(), new Class[] { protocol },  
  4. 395             new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));  
这里才是生成RPC对象的入口(姑且就称作RPC对象),前两个参数 protocol.getClassLoader(), new Class[] { protocol },比较好理解,一个是获得相应的classloader(因为需要在运行是动态的生成对象),第二个就是说明相应的接口类。可能现在最感兴趣的是第三个参数new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout),我们前边提到过 Proxy需要与InvocationHandler相结合才能达到想要的效果,那么第三个参数到底是不是InvocationHandler的对象呢,我们看一下Invoke类的相关实现。
  1. 203   private static class Invoker implements InvocationHandler {  
果然没有令我们失望,这与前面的分析完全吻合。
  1. 208     public Invoker(Class<? extends VersionedProtocol> protocol,  
  2. 209         InetSocketAddress address, UserGroupInformation ticket,  
  3. 210         Configuration conf, SocketFactory factory,  
  4. 211         int rpcTimeout) throws IOException {  
  5. 212       this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,  
  6. 213           ticket, rpcTimeout, conf);  
  7. 214       this.client = CLIENTS.getClient(conf, factory);  
  8. 215     }  
看一下Invoke构造函数主要完成的工作,初始化一个 Client.ConnectionId remoteId;,可以看出这个对象主要封装了服务器地址,代理接口的类型以及连接服务器时用到的一些参数。另外一个关键的对象是client,就是同感client对象与NN进行交互。

前边也提到在使用RPC对象调用相关函数的时,其实进行真正共的是InvocationHandler中的invoke方法。那么我们必须来看一个这个invoke方法的实现了,其中最核心的代码如下

  1. 225       ObjectWritable value = (ObjectWritable)  
  2. 226         client.call(new Invocation(method, args), remoteId);  
代码很简短,主要就是调用了client的call方法。其中比较感兴趣的参数应该是new Invocation(method, args)这个Invocation的对象,其实从字面意思我们就可以理解到这个对象封装是具体调用的方法和参数(这些信息需要传递到服务器端,执行服务器端相应的方法)。其实这个Invocation类实现了Writable接口,熟悉源代码的童鞋应该知道这是Hadoop序列化和反序列话的一个接口,作用就明显了,那就是方便把这些信息以序列化的方式传递到服务器端。

下边我们需要看一下在client的call方法中发生了那些剧情,核心代码如下:

  1. 1043   public Writable call(Writable param, ConnectionId remoteId)  
  2. 1044                        throws InterruptedException, IOException {  
  3. 1045     Call call = new Call(param);  
  4. 1046     Connection connection = getConnection(remoteId, call);  
  5. 1047     connection.sendParam(call);                 // send the parameter   
  6. 1048     boolean interrupted = false;  
  7. 1049     synchronized (call) {  
  8. 1050       while (!call.done) {  
  9. 1051         try {  
  10. 1052           call.wait();                           // wait for the result   
  11. 1053         } catch (InterruptedException ie) {  
  12. 1054           // save the fact that we were interrupted   
  13. 1055           interrupted = true;  
  14. 1056         }  
  15. 1057       }  

首先我们可以看到传递给服务器的被序列化后的参数被封装成了一个Call类型的对象,很明显我们就要问一下,你会什么有这么做呢?答案是这样的,call是一个设计很巧妙的类,它封装了查询的信息和查询的结果,阻塞查询线程(查询过程是一个同步的过程)。先写到这,有点困回宿舍睡觉先。

相关阅读:HDFS1.0源代码解析—Hadoop的RPC机制之Server端解析 http://www.linuxidc.com/Linux/2012-07/64950.htm

相关问答

更多