Hadoop源码分析 HDFS ClientProtocol——addBlock

2019-03-28 13:23|来源: 网络

addBlock()负责分配一个新的block以及该block备份存储的datanodeaddBlock函数声明如下:

public LocatedBlock addBlock(String src, String clientName,DatanodeInfo[] excludedNodes) 

其中src代表需要写入新block的文件;clientName代表写入该blockclientexcludedNodes代表该block不能存储的datanode

首先让我们思考一下addBlock应该如何实现。首先,我们需要修改src文件的INodeFile以便将新增block添加到INodeFile当中。因为INodeFile保存了最后一个Block存放的datanode,因此需要为新增的block分配datanode。下面我们看一下addBlock具体是怎么实现的。

NameNode中,addBlock是通过FSNamesystem.getAdditionalBlock方法来实现的。在getAdditionalBlock中,首先检查内存中INodeblock数量之和是否已经超过系统设置的阈值。然后检查Lease,查看当前client是否拥有文件写锁。除此之外,还需要检查INodeFile倒数第二个block是否已经完成所有备份存储的复制。如果所有这些检查都没有问题就挑选新block所有存储节点(datanode)。存储节点(datanode)挑选完成后,就new一个新的block。新block创建完成后,会在FSNamesystem.blocksMap中保存该block(其实是对应的BlockInfo)并且在INodeFile中添加该block

注意:通过阅读源码我们可以知道,在addBlock方法中就已经将该block存入到blocksMap当中。这就需要我们考虑当DataNode不断的report该节点存放的block时修改的是不是blocksMap,其之间的逻辑关系是什么样子的。

public LocatedBlock getAdditionalBlock(String src, String clientName,

List<Node> excludedNodes) throws IOException {

long fileLength, blockSize;

int replication;

DatanodeDescriptor clientNode = null;

Block newBlock = null;

 

NameNode.stateChangeLog

.debug("BLOCK* NameSystem.getAdditionalBlock: file " + src

" for " + clientName);

 

synchronized (this) {

// have we exceeded the configured limit of fs objects.

checkFsObjectLimit();

 

INodeFileUnderConstruction pendingFile = checkLease(src, clientName);

 

//

// If we fail this, bad things happen!

//

if (!checkFileProgress(pendingFile, false)) {

throw new NotReplicatedYetException("Not replicated yet:" + src);

}

fileLength = pendingFile.computeContentSummary().getLength();

blockSize = pendingFile.getPreferredBlockSize();

clientNode = pendingFile.getClientNode();

replication = (int) pendingFile.getReplication();

}

 

// choose targets for the new block tobe allocated.

DatanodeDescriptor targets[] = replicator.chooseTarget(replication,

clientNode, excludedNodes, blockSize);

if (targets.length < this.minReplication) {

throw new IOException("File " + src

" could only be replicated to " + targets.length

" nodes, instead of " + minReplication);

}

 

// Allocate a new block and record it in the INode.

synchronized (this) {

if (isInSafeMode()) {

throw new SafeModeException("Cannot add block to " + src,

safeMode);

}

INode[] pathINodes = dir.getExistingPathINodes(src);

int inodesLen = pathINodes.length;

checkLease(src, clientName, pathINodes[inodesLen - 1]);

INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) pathINodes[inodesLen - 1];

 

if (!checkFileProgress(pendingFile, false)) {

throw new NotReplicatedYetException("Not replicated yet:" + src);

}

 

// allocate new block record block locations in INode.

newBlock = allocateBlock(src, pathINodes);

pendingFile.setTargets(targets);

 

for (DatanodeDescriptor dn : targets) {

dn.incBlocksScheduled();

}

}

 

// Create next block

LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);

if (isAccessTokenEnabled) {

b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),

EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));

}

return b;

}

 

 

private Block allocateBlock(String src, INode[] inodes) throws IOException {

Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0);

while (isValidBlock(b)) {

b.setBlockId(FSNamesystem.randBlockId.nextLong());

}

b.setGenerationStamp(getGenerationStamp());

b = dir.addBlock(src, inodes, b);

NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: " + src

". " + b);

return b;

}

 

 

Block addBlock(String path, INode[] inodes, Block block) throws IOException {

waitForReady();

 

synchronized (rootDir) {

INodeFile fileNode = (INodeFile) inodes[inodes.length - 1];

 

// check quota limits and updated space consumed

updateCount(

inodes,

inodes.length - 1,

0,

fileNode.getPreferredBlockSize()

* fileNode.getReplication(), true);

 

// associate the new list of blocks with this file

namesystem.blocksMap.addINode(block, fileNode);

BlockInfo blockInfo = namesystem.blocksMap.getStoredBlock(block);

fileNode.addBlock(blockInfo);

NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: " + path

" with " + block + " block is added to the in-memory "

"file system");

}

return block;

}

相关问答

更多
  • hadoop hdfs的问题[2021-10-30]

    最下面那张图里环境变量设置的那一行多了一个$符号 export JAVA_HOME=/usr/java/jdk1.6.0_35
  • 如何上传的?api么?你写两个不同的文件试试 先确定问题在哪,如果有错误日志贴出来
  • 在使用Hadoop的过程中,很容易通过FileSystem类的API来读取HDFS中的文件内容,读取内容的过程是怎样的呢?今天来分析客户端读取HDFS文件的过程,下面的一个小程序完成的功能是读取HDFS中某个目录下的文件内容,然后输出到控制台,代码如下: [java] view plain copy public class LoadDataFromHDFS { public static void main(String[] args) throws IOException { new LoadDataF ...
  • 在Amazon EC2中创建多节点集群的最佳分步指南之一就是这里 它解释了每一步。 你已经完成了第一部分似乎,通过第二部分,它将帮助你.. 希望它能帮到你...... One of the best step by step guide to create multinode cluster in Amazon EC2 is Here It explains each and every step. You are already done with first part seems, Go through ...
  • 默认情况下,Hadoop将使用本地模式。 您可能需要在$HADOOP_HOME/conf/core-site.xml中将fs.default.name设置为hdfs://localhost.localdomain:8020/ 。 为此,请将其添加到core-site.xml : fs.default.name hdfs://localhost.localdomain:8020/ Accumulo ...
  • 实现自己的DFS接口并使其与hadoop一起使用相对简单。 您所需要的只是文件和目录的文件系统概念与您的存储之间的某种逻辑映射。 在NoSQL的情况下(如果我假设KeyValue),您应该决定如何表示目录。 您可以执行一些特殊节点,也可以将路径放入密钥。 另一个决策点 - 决定您是否关心数据位置 关于文档,我认为s3n DFS实现的来源最好从一开始。 我认为关闭的例子是由DataStax完成的Cassandra上的Hadoop http://www.datastax.com/ 另一个例子(我们稍后做的事情) ...
  • Apache Spark独立于Hadoop。 Spark允许您使用不同的数据源(包括HDFS),并且能够在独立群集中运行,或者使用现有的资源管理框架(例如,YARN,Mesos)。 因此,如果您只对Spark感兴趣,则无需安装Hadoop。 Apache Spark is independent from Hadoop. Spark allows you to use different sources of data (incl. HDFS) and is capable of running eithe ...
  • 绝对可能。 不要认为Hadoop是一个可安装的程序,它只是由一群在集群内不同节点上运行的java进程组成。 如果你使用hadoop tar ball,你可以只运行NameNode和DataNodes进程,如果你只想要HDFS。 如果您使用其他hadoop发行版(例如HDP),我认为HDFS和mapreduce来自不同的rpm软件包,但安装两个rpm软件包都有害。 如果您只需要HDFS,请再次运行NameNode和DataNodes。 Absolutely possible. Don't think Hado ...
  • Mappers从InputFormat的实现中读取输入数据。 大多数实现都来自FileInputFormat ,后者从本地计算机或HDFS读取数据。 (默认情况下,数据从HDFS读取,mapreduce作业的结果也存储在HDFS中。)当您希望从备用数据源读取数据而不是HDFS时,可以编写自定义InputFormat 。 TableInputFormat将直接从HBase读取数据记录, DBInputFormat将访问关系数据库中的数据。 您还可以想象一个系统,在特定端口上通过网络将数据流式传输到每台计算机; ...