Hadoop异步RPC通信机制

2019-03-28 13:11|来源: 网络

Hadoop的IPC是实现rpc的一种方法,不基于java的序列化机制。IPC中方法的调用参数和返回值只能是:

1、java基本类型

2、String和Writeable接口的实现类

3、以1、2元素为类型的数组

4、接口只允许抛出IOException

采用的是C/S模型(Client-NameNode,Client-DataNode,NameNode-DataNode,DataNode-DataNode)

如何区分不同的请求,hadoop的rpc通过给每个请求call一个id,当请求返回的时候,要附带上这个id,这样请求者就可以知道这是哪个请求了。

客户端Client与服务器Server的请求用Call类表示,以Call.id来区分不同的请求。不同的请求集合存放在连接类Connection的HashTable calls中:

private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();

Connection是一个线程,表示Client和Server的连接,类似socket。一个C/S对应一个连接,对应一个Connection线程。Connection保存了该socket上的所有请求集合。同时在Connection.run里面循环处理接收端发送回来的响应信息。receiveResponse处理响应消息的流程如下:

先读取请求的id,根据该请求id得到相应的call,再读取响应的状态state,根据响应的状态(成功,错误,失败)进行处理。如果请求状态为成功,则将返回值setValue到call.value里【注:此时会notify唤醒阻塞的call,使得Client.call()方法知道已经请求成功了,好继续处理】,设置请求结束标志call.done=true,再将该call移除出calls。

一个Client可以连接多个Server,所有一个Client内可以有多个Connection。每个Connection用一个ConnectionId标识,保存在Client的HashTable connections中:

private Hashtable<ConnectionId, Connection>connections =new Hashtable<ConnectionId, Connection>();

Client发送请求给server的流程client.call():

将要发送的内容param作为参数,创建一个Call实例call-->调用client的getConnection(调用addCall将call添加到calls中)获取client-server对应的connection-->利用connection.sendParam(call)将call上的数据call.param发送出去-->等待server的响应call.wait()-->接收到响应(connection线程接收并处理响应消息。处理消息时Call.setValue会调用notify方法唤醒)之后,返回响应值call.value。

Client通过调用Connection.addCall(Call)方法将一个请求添加到calls中。

总结来说:

1、Connection线程是一个client和server间的连接,保存了它们的所有请求,并不断接收和处理server发送过来的请求。如果

2、client通过Client.call()将请求发送给server,最终返回server的响应信息。

上述两者可以视为hadoop的异步通信机制。

更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

相关问答

更多
  • 为了增强Hadoop的安全机制, 从2009年起, Apache专门抽出一个团队,为Hadoop增加安全认证和授权机制,至今为止,已经可用。   Apache Hadoop 1.0.0版本和Cloudera CDH3之后的版本添加了安全机制,如果你将Hadoop升级到这两个版本,可能会导致Hadoop的一些应用不可用。   Hadoop提供了两种安全机制:Simple和Kerberos。Simple机制(默认情况,Hadoop采用该机制)采用了SAAS协议。 也就是说,用户提交作业时,你说你是XXX(在Jo ...
  • Hadoop的核心机制是通过HDFS文件系统和MapReduce算法进行存储资源、内存和程序的有效利用与管理。然后Hadoop还包括的项目: mapreduce分布式处理模型;HDFS分布式文件系统;pig数据流语言和运行环境;hive分布式按列存储的数据仓库;HBase,ZooKeeper,Sqoop
  • map分类,reduce整合,通过相同的key将需要的信心通过map整合到同一个reduce中,在reduce中实现算法
  • 要理解RPC忘记POST和GET,它的工作原理是不同的(从编程的角度来看,它在内部使用它,但除非你想做一些特殊的事情,否则你没有看到或不需要理解它)。 RPC的一个很好的起点是GWT文档: http : //code.google.com/webtoolkit/tutorials/1.6/RPC.html 给你一个总结。 在使用RPC时, myServiceImpl需要实现名为myService的接口的方法,除了扩展RemoveServiceServlet 。 这些方法以您想要发送给服务器的数据作为参数。 ...
  • 你的解决方案很好。 目前尚不清楚你在问什么...... 我必须在配置中声明directExchange()和binding()bean。 有什么方法可以避免它,因为我觉得它是代码重复,因为我宣布这些bean两次。 @QueueBinding只是对@RabbitListener一种便利, @RabbitListener将队列,交换和绑定声明为@RabbitListener的替代方法。 如果您使用的是常见的@Config类,则可以省略侦听器上的bindings属性,并使用queues = "${queue.re ...
  • http://www.rabbitmq.com/api-guide.html#rpc 如果没有,您可以在此处下载包含示例代码的Java API源代码。 http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.0.0/rabbitmq-java-client-2.0.0.zip那里有一个示例文件夹 - 下面的代码来自HelloServer.java和HelloClient.java 服务器 import com.rabbitmq.client.AMQP ...
  • 如果您愿意进行心理转换并停止思考表和行,而是考虑数据和消息,那么Service Broker可以处理所有通信,传递和消息处理。 而不是在本地(在Express机器上)执行INSERT INTO LocalTable(datetime, temperature) VALUES (...)您可以考虑: BEGIN CONVERSATION WITH CentralServer ...; SEND ON conversation MESSAGE TYPE [Measurement] ( ...
  • 您可以使用通道来实现超时模式: import "time" c := make(chan error, 1) go func() { c <- client.Call("Service", args, &result) } () select { case err := <-c: // use err and result case <-time.After(timeoutNanoseconds): // call timed out } select将阻塞,直到timeoutN ...
  • hadoop 0.20不支持这个,请阅读本期https://issues.apache.org/jira/browse/HADOOP-6889 hadoop 0.20 doesn't support this, please read this issue https://issues.apache.org/jira/browse/HADOOP-6889
  • 好的,找到原因,我连接到纱线资源管理器的错误端口。 正确的配置是:yarn.resourcemanager.address = localhost:8032 Ok, found the reason, I connected to the wrong port for the yarn resourcemanager. The correct configuration is: yarn.resourcemanager.address=localhost:8032