知识点
相关文章
更多最近更新
更多HDFS1.0源代码解析—Hadoop的RPC机制之Client解析
2019-03-28 13:38|来源: 网络
好久没有更新Hadoop相关的博客了,实在是各种压力缠身,各种东西都没准备好,面对即将找工作有点没有了节奏。
ok,开始说说今天的主题Hadoop的RPC机制,之所以在HDFS源码解析的系列中添加这部分的内容,是因为DN和NN交互使用的就是RPC的机制,而RPC机制这部分代码年前也是比较深入的研究过,但是是模仿RPC的机制进行分布式检索的实现。
开始先介绍一下RPC几个主要的组成类RPC.java、Client.java、Server.java,其中RPC类主要是提供对外服务的函数实现动态代理机制,Client是RPC进行服务的函数,主要是连接服务器、传递函数名和相应的参数、等待结果返回,Server主要接受Client的请求、执行相应的函数、返回结果。
Hadoop中的RPC比较难理解的部分就是动态代理的实现,感觉下面这个图画的比较清楚。从图中可以看出动态代理其实只能代理某一个接口,所以所有需要被动态代理的实现类都要实现该接口,在Hadoop中接口是VersionedProtocol。在动态代理中结合了InvocationHandler的功能,保证匿名实现类的方法能够得到正确的执行。使用匿名实现类的对象调用某个方法,实际上调用的是InvocationHandler中的invoke方法,通过该方法将要调用的函数名以及参数告知服务器端,后边会详细介绍一些细节。
结合NN和DN之间的交互作为例子对RPC的原理做一个简单的剖析
在DataNode.java的startDataNode函数中有这样一个调用
- 348 this.namenode = (DatanodeProtocol)
- 349 RPC.waitForProxy(DatanodeProtocol.class,
- 350 DatanodeProtocol.versionID,
- 351 nameNodeAddr,
- 352 conf);
下边看一下RPC中的相关实现:
首先看一下waitForProxy函数干了些什么
- 329 while (true) {
- 330 try {
- 331 return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
- 332 } catch(ConnectException se) { // namenode has not been started
- 392 VersionedProtocol proxy =
- 393 (VersionedProtocol) Proxy.newProxyInstance(
- 394 protocol.getClassLoader(), new Class[] { protocol },
- 395 new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
- 203 private static class Invoker implements InvocationHandler {
- 208 public Invoker(Class<? extends VersionedProtocol> protocol,
- 209 InetSocketAddress address, UserGroupInformation ticket,
- 210 Configuration conf, SocketFactory factory,
- 211 int rpcTimeout) throws IOException {
- 212 this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
- 213 ticket, rpcTimeout, conf);
- 214 this.client = CLIENTS.getClient(conf, factory);
- 215 }
前边也提到在使用RPC对象调用相关函数的时,其实进行真正共的是InvocationHandler中的invoke方法。那么我们必须来看一个这个invoke方法的实现了,其中最核心的代码如下
- 225 ObjectWritable value = (ObjectWritable)
- 226 client.call(new Invocation(method, args), remoteId);
下边我们需要看一下在client的call方法中发生了那些剧情,核心代码如下:
- 1043 public Writable call(Writable param, ConnectionId remoteId)
- 1044 throws InterruptedException, IOException {
- 1045 Call call = new Call(param);
- 1046 Connection connection = getConnection(remoteId, call);
- 1047 connection.sendParam(call); // send the parameter
- 1048 boolean interrupted = false;
- 1049 synchronized (call) {
- 1050 while (!call.done) {
- 1051 try {
- 1052 call.wait(); // wait for the result
- 1053 } catch (InterruptedException ie) {
- 1054 // save the fact that we were interrupted
- 1055 interrupted = true;
- 1056 }
- 1057 }
首先我们可以看到传递给服务器的被序列化后的参数被封装成了一个Call类型的对象,很明显我们就要问一下,你会什么有这么做呢?答案是这样的,call是一个设计很巧妙的类,它封装了查询的信息和查询的结果,阻塞查询线程(查询过程是一个同步的过程)。先写到这,有点困回宿舍睡觉先。
相关阅读:HDFS1.0源代码解析—Hadoop的RPC机制之Server端解析 http://www.linuxidc.com/Linux/2012-07/64950.htm
相关问答
更多-
hadoop hdfs的问题[2021-10-30]
最下面那张图里环境变量设置的那一行多了一个$符号 export JAVA_HOME=/usr/java/jdk1.6.0_35 -
当客户端想要写入DataNode时,它会联系NameNode。 反过来,NameNode借助于基于DataNode发送的块报告生成的块位置图,告诉客户端哪个特定的DataNode具有可以写入数据的空闲块。 然后客户端开始直接写入该节点,而无需与NameNode交互。 所以这是基于空间可用性的随机性。 它可以是集群中n个节点中的任何一个节点。 当特定DataNode累积大量数据时,它会开始将数据推送到其他节点以创建副本(基于您的复制因子)。 因此,DataNode可能同时进行读写。 org.apache.ha ...
-
2个样品在这里 有两种不同的实现。 阅读整个线程+检查附件 2 Samples here There are two different implementations. Read the whole thread + check the attachments
-
关于hadoop:您需要确保core-site.xml namenode条目在hadoop配置中服务于0.0.0.0而不是127.0.0.1(localhost)。 重要的是,由于某种原因,clouderas vm发行版默认为localhost。 Regarding hadoop : You need to make sure the core-site.xml namenode entry is serving to 0.0.0.0 instead of 127.0.0.1 (localhost) in ...
-
解析源代码(Parsing source code)[2023-04-23]
yacc和lex是非常强大的工具,围绕编译器构造的理论构建。 为了能够完全理解它们,您可能需要一些形式语言,自动机理论和编译器构造的基础知识。 龙书是这个主题的经典之作。 yacc and lex are very powerful tools, built around the theories for compiler construction. To be able to fully understand them you probably need some basics in formal lan ... -
在数据结构上使用value()方法。 Use the value() method on the data structure.
-
问题是由于您在代码中使用的库中的版本不匹配。 删除所有库并添加从hadoop安装本身收集的相应库。 I found the solution, i used hadoop core dependency in the pom.xml file, and hadoop core was a part of hadoop 1.X package and the rest of the dependency was from hadoop 2.X hence there was a version conflic ...
-
JDeveloper 11g JAX-RPC Web服务客户端无法解析WSDL(JDeveloper 11g JAX-RPC Web Service Client Unable to Parse WSDL)[2023-12-30]
我也得到了这个错误,我无法找出根本原因,但是我用ws格式而不是rpc重新创建了客户端,它对我rpc 。 I got this error too, I could not figure out the root cause, however I recreated the client with ws format instead of rpc and it worked for me. -
现在我知道如何处理它...首先升级dfs! Now I know how to deal with it... upgrade the dfs first!
-
使用GWT中的RPC类。 您必须提供序列化策略 ,其强名称在请求标头中传递。 解码响应更难。 您可以使用com.google.gwt.user.client.rpc.impl.RequestCallbackAdapter.ResponseReader以及com.google.gwt.user.client.rpc.impl.ClientSerializationStreamReader但您需要拥有来自JsParser类路径中的gwt-dev.jar ; 并且你不能在web应用程序中拥有gwt-dev.jar ...