知识点
相关文章
更多最近更新
更多Hadoop中DataNode与NameNode之间的心跳机制
2019-03-28 13:07|来源: 网络
DataNode: 用于存储HDFS的数据,
public class DataNode extends Configured
implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {。。。}
1,实现了InterDatanodeProtocol, ClientDatanodeProtocol来供与客户端以及其他DataNode的通信,接受请求。
2,实现Runnable 接口
首先是来看下run()方法
public void run() {
LOG.info(dnRegistration + "In DataNode.run, data = " + data);
// start dataXceiveServer
dataXceiverServer.start();
while (shouldRun) {
try {
startDistributedUpgradeIfNeeded();
offerService();
} catch (Exception ex) {
LOG.error("Exception: " + StringUtils.stringifyException(ex));
if (shouldRun) {
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
}
}
}
}
LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
shutdown();
}
①, dataXceiverServer.start();开启数据监听守护进程,有数据到来时,开启一个DataXceiver线程
DataXceiverServer.java
public void run() {
while (datanode.shouldRun) {
try {
Socket s = ss.accept();
s.setTcpNoDelay(true);
new Daemon(datanode.threadGroup,
new DataXceiver(s, datanode, this)).start();
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
} catch (IOException ie) {
LOG.warn(datanode.dnRegistration + ":DataXceiveServer: "
+ StringUtils.stringifyException(ie));
} catch (Throwable te) {
LOG.error(datanode.dnRegistration + ":DataXceiveServer: Exiting due to:"
+ StringUtils.stringifyException(te));
datanode.shouldRun = false;
}
}
try {
ss.close();
} catch (IOException ie) {
LOG.warn(datanode.dnRegistration + ":DataXceiveServer: "
+ StringUtils.stringifyException(ie));
}
}
在DataXceiver中主要是从DataXceiverServer读写数据
-------DataXceiver.java----------
/**
* Read/write data from/to the DataXceiveServer.
*/
public void run() {
DataInputStream in=null;
try {
in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(s),
SMALL_BUFFER_SIZE));
short version = in.readShort();
if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
throw new IOException( "Version Mismatch" );
}
boolean local = s.getInetAddress().equals(s.getLocalAddress());
byte op = in.readByte();
// Make sure the xciver count is not exceeded
int curXceiverCount = datanode.getXceiverCount();
if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
throw new IOException("xceiverCount " + curXceiverCount
+ " exceeds the limit of concurrent xcievers "
+ dataXceiverServer.maxXceiverCount);
}
long startTime = DataNode.now();
switch ( op ) {
case DataTransferProtocol.OP_READ_BLOCK:
readBlock( in );
datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
if (local)
datanode.myMetrics.readsFromLocalClient.inc();
else
datanode.myMetrics.readsFromRemoteClient.inc();
break;
case DataTransferProtocol.OP_WRITE_BLOCK:
writeBlock( in );
datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
if (local)
datanode.myMetrics.writesFromLocalClient.inc();
else
datanode.myMetrics.writesFromRemoteClient.inc();
break;
case DataTransferProtocol.OP_READ_METADATA:
readMetadata( in );
datanode.myMetrics.readMetadataOp.inc(DataNode.now() - startTime);
break;
case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
replaceBlock(in);
datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
break;
case DataTransferProtocol.OP_COPY_BLOCK:
// for balancing purpose; send to a proxy source
copyBlock(in);
datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
break;
case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
getBlockChecksum(in);
datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
break;
default:
throw new IOException("Unknown opcode " + op + " in data stream");
}
} catch (Throwable t) {
LOG.error(datanode.dnRegistration + ":DataXceiver",t);
} finally {
LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
+ datanode.getXceiverCount());
IOUtils.closeStream(in);
IOUtils.closeSocket(s);
dataXceiverServer.childSockets.remove(s);
}
}
相关问答
更多-
在 /usr/local/hadoop/ 下面启动,找到是/opt/Hadoop/下面的hadoop安装包,是不是HADOOP_HOME环境变量配置的有问题。 可以到海牛部落交流,那里有好多学习hadoop的朋友
-
试着回答: 先说明一下: 1. namenode负责管理目录和文件信息,真正的文件块是存放在datanode上。 2. 每个map和reduce(即task)都是java进程,默认是有单独的jvm的,所以不可能同一个类的对象会在不同节点上。 看你的描述是把namenode,datanode和jobtracker,tasktracker有点混了。 所以: 问题1. 分块存放在datanode上 问题2.inputformat是在datanode上,确切的说是在tasktracker中。每个map和reduce ...
-
我发现了这个问题。 只需将/ etc / hosts更改为仅将127.0.0.1映射到本地主机。 之前 127.0.0.1 STWHDDD01 127.0.0.1 localhost ... 后 127.0.0.1 localhost ... 我仍然想知道为什么旧映射在没有Kerberos身份验证的情况下工作。 I found the problem. Need to change /etc/hosts to map 127.0.0.1 to localhost only. Before 127.0.0 ...
-
无法在Hadoop中启动NameNode守护程序和DataNode守护程序(Can't start NameNode daemon and DataNode daemon in Hadoop)[2022-04-06]
上述错误表明权限问题。 您必须确保hadoop用户具有/ usr / local / hadoop的适当权限。 为此,您可以尝试: sudo chown -R hadoop /usr/local/hadoop/ 要么 sudo chmod 777 /usr/local/hadoop/ The errors above suggest a permissions problem. You have to make sure that the hadoop user has the proper ... -
我刚刚将默认的超级组名称更改为新创建的组,其中包含所有hadoop用户的成员。 现在该组中的所有用户都充当超级用户,因此它工作正常。
dfs.permissions.superusergroup Hadoopgroup -
这取决于您所讨论的Hadoop版本。 在Hadoop 2之前, Namenode是一个单点故障,所以如果它失败意味着你的集群变得无法使用。 在这种情况下,即使SecondaryNameNode也没有帮助,因为它仅用于检查点,而不是用作NameNode的备份。 当NameNode失败时,像管理员这样的人必须手动重启NameNode 。 但是从Hadoop 2开始,你有更好的方法来处理NameNode失败。 您可以并排运行2个冗余NameNodes ,这样,如果其中一个Namenodes失败,群集将快速故障转移 ...
-
下载osquery https://code.facebook.com/projects/658950180885092 并安装 发出这个命令osqueryi 当出现提示时,使用此sql命令查看所有正在运行的java进程并找到pids SELECT名称,路径,pid FROM进程,其中name =“java”; 你会在Mac上看到这样的东西 +------+-------------------------------------------------------------------------- ...
-
您可以在开发Hadoop Wiki上找到有关此内容的详细信息。 它有关于如何设置开发环境,如何开发单元测试等内容的详细信息。 HTH You can find detailed information on this on the Developing Hadoop Wiki. It has detailed info on stuff like How To Setup Your Development Environment, How To Develop Unit Tests etc. HTH
-
问题是属性名称为dfs.datanode.data.dir ,它的拼写错误为dfs.dataode.data.dir 。 这使得属性无法被识别,因此${hadoop.tmp.dir}/hadoop-${USER}/dfs/data的默认位置被用作数据目录。 hadoop.tmp.dir默认是/tmp ,每次重启时都会删除该目录的内容,并强制datanode在启动时重新创建该文件夹。 因此不兼容的clusterID 。 在格式化namenode并启动服务之前,在hdfs-site.xml编辑此属性名称。 Th ...
-
有没有办法可以挂钩NameNode执行来识别从HDFS添加/修改/删除的文件 - 类似于Windows中的文件系统事件? 是! 最新版本的HDFS包含类似于Linux inotify的功能,它允许HDFS客户端侦听NameNode发布的各种文件系统事件。 遗憾的是,我们目前没有关于Apache Hadoop网站上该功能的明确文档。 如果您想了解有关此功能的更多信息,那么我建议您查看Apache JIRA HDFS-6634 ,这是跟踪该功能开发的主要问题。 附在那里的设计文档和补丁将让您了解它的工作原理。 ...