HDFS的副本存放策略

2019-03-28 14:14|来源: 网络

HDFS作为Hadoop中的一个分布式文件系统,而且是专门为它的MapReduce设计,所以HDFS除了必须满足自己作为分布式文件系统的高可靠性外,还必须为MapReduce提供高效的读写性能,那么HDFS是如何做到这些的呢?首先,HDFS将每一个文件的数据进行分块存储,同时每一个数据块又保存有多个副本,这些数据块副本分布在不同的机器节点上,这种数据分块存储+副本的策略是HDFS保证可靠性和性能的关键,这是因为:一.文件分块存储之后按照数据块来读,提高了文件随机读的效率和并发读的效率;二.保存数据块若干副本到不同的机器节点实现可靠性的同时也提高了同一数据块的并发读效率;三.数据分块是非常切合MapReduce中任务切分的思想。在这里,副本的存放策略又是HDFS实现高可靠性和搞性能的关键。

       HDFS采用一种称为机架感知的策略来改进数据的可靠性、可用性和网络带宽的利用率。通过一个机架感知的过程,NameNode可以确定每一个DataNode所属的机架id(这也是NameNode采用NetworkTopology数据结构来存储数据节点的原因,也是我在前面详细介绍NetworkTopology类的原因)。一个简单但没有优化的策略就是将副本存放在不同的机架上,这样可以防止当整个机架失效时数据的丢失,并且允许读数据的时候充分利用多个机架的带宽。这种策略设置可以将副本均匀分布在集群中,有利于当组件失效的情况下的均匀负载,但是,因为这种策略的一个写操作需要传输到多个机架,这增加了写的代价。

      在大多数情况下,副本系数是3,HDFS的存放策略是将一个副本存放在本地机架节点上,一个副本存放在同一个机架的另一个节点上,最后一个副本放在不同机架的节点上。这种策略减少了机架间的数据传输,提高了写操作的效率。机架的错误远远比节点的错误少,所以这种策略不会影响到数据的可靠性和可用性。与此同时,因为数据块只存放在两个不同的机架上,所以此策略减少了读取数据时需要的网络传输总带宽。在这种策略下,副本并不是均匀的分布在不同的机架上:三分之一的副本在一个节点上,三分之二的副本在一个机架上,其它副本均匀分布在剩下的机架中,这种策略在不损害数据可靠性和读取性能的情况下改进了写的性能。下面就来看看HDFS是如何来具体实现这一策略的。

       NameNode是通过类来为每一分数据块选择副本的存放位置的,这个ReplicationTargetChooser的一般处理过程如下:


      
     上面的流程图详细的描述了Hadoop-0.2.0版本中副本的存放位置的选择策略,当然,这当中还有一些细节问题,如:如何选择一个本地数据节点,如何选择一个本地机架数据节点等,所以下面我还将继续展开讨论。


1.选择一个本地节点
        这里所说的本地节点是相对于客户端来说的,也就是说某一个用户正在用一个客户端来向HDFS中写数据,如果该客户端上有数据节点,那么就应该最优先考虑把正在写入的数据的一个副本保存在这个客户端的数据节点上,即被看做是本地节点,但是如果这个客户端上的数据节点空间不足或者是当前负载过重,则应该从该数据节点所在的机架中选择一个合适的数据节点作为此时这个数据块的本地节点。另外,如果客户端上没有一个数据节点的话,则从整个集群中随机选择一个合适的数据节点作为 此时这个数据块的本地节点。那么,如何判定一个数据节点合不合适呢,它是通过isGoodTarget方法来确定的:
 
  1. private boolean isGoodTarget(DatanodeDescriptor node, long blockSize, int maxTargetPerLoc, boolean considerLoad, List<DatanodeDescriptor> results) {  
  2.         
  3.     Log logr = FSNamesystem.LOG;  
  4.     // 节点不可用了   
  5.     if (node.isDecommissionInProgress() || node.isDecommissioned()) {  
  6.       logr.debug("Node "+NodeBase.getPath(node)+ " is not chosen because the node is (being) decommissioned");  
  7.       return false;  
  8.     }  
  9.   
  10.     long remaining = node.getRemaining() - (node.getBlocksScheduled() * blockSize);  
  11.     // 节点剩余的容量够不够   
  12.     if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {  
  13.       logr.debug("Node "+NodeBase.getPath(node)+ " is not chosen because the node does not have enough space");  
  14.       return false;  
  15.     }  
  16.         
  17.     // 节点当前的负载情况   
  18.     if (considerLoad) {  
  19.       double avgLoad = 0;  
  20.       int size = clusterMap.getNumOfLeaves();  
  21.       if (size != 0) {  
  22.         avgLoad = (double)fs.getTotalLoad()/size;  
  23.       }  
  24.       if (node.getXceiverCount() > (2.0 * avgLoad)) {  
  25.         logr.debug("Node "+NodeBase.getPath(node)+ " is not chosen because the node is too busy");  
  26.         return false;  
  27.       }  
  28.     }  
  29.         
  30.     // 该节点坐在的机架被选择存放当前数据块副本的数据节点过多   
  31.     String rackname = node.getNetworkLocation();  
  32.     int counter=1;  
  33.     for(Iterator<DatanodeDescriptor> iter = results.iterator(); iter.hasNext();) {  
  34.       Node result = iter.next();  
  35.       if (rackname.equals(result.getNetworkLocation())) {  
  36.         counter++;  
  37.       }  
  38.     }  
  39.     if (counter>maxTargetPerLoc) {  
  40.       logr.debug("Node "+NodeBase.getPath(node)+ " is not chosen because the rack has too many chosen nodes");  
  41.       return false;  
  42.     }  
  43.       
  44.     return true;  
  45.   }  
2.选择一个本地机架节点
   实际上,选择本地节假节点和远程机架节点都需要以一个节点为参考,这样才是有意义,所以在上面的流程图中,我用红色字体标出了参考点。那么,ReplicationTargetChooser是如何根据一个节点选择它的一个本地机架节点呢?
这个过程很简单,如果参考点为空,则
从整个集群中随机选择一个合适的数据节点作为 此时的本地机架节点;否则就从参考节点所在的机架中 随机选择一个合适的数据节点作为此时的本地机架节点,若这个集群中没有合适的数据节点的话,则从已选择的数据节点中找出一个作为新的参考点,如果找到了一个新的参考点,则从这个 新的参考点在的机架中随机选择一个合适的数据节点作为此时的本地机架节点;否则从整个集群中随机选择一个合适的数据节点作为此时的本地机架节点。如果新的参考点所在的机架中仍然没有合适的数据节点,则只能从整个集群中随机选择一个合适的数据节点作为此时的本地机架节点了。
 
  1. <font xmlns="http://www.w3.org/1999/xhtml">private DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine, List<Node> excludedNodes, long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results)throws NotEnoughReplicasException {  
  2.     // 如果参考点为空,则从整个集群中随机选择一个合适的数据节点作为此时的本地机架节点   
  3.     if (localMachine == null) {  
  4.       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results);  
  5.     }  
  6.         
  7.     //从参考节点所在的机架中随机选择一个合适的数据节点作为此时的本地机架节点   
  8.     try {  
  9.       return chooseRandom(localMachine.getNetworkLocation(), excludedNodes, blocksize, maxNodesPerRack, results);  
  10.     } catch (NotEnoughReplicasException e1) {  
  11.       //若这个集群中没有合适的数据节点的话,则从已选择的数据节点中找出一个作为新的参考点   
  12.       DatanodeDescriptor newLocal=null;  
  13.       for(Iterator<DatanodeDescriptor> iter=results.iterator(); iter.hasNext();) {  
  14.         DatanodeDescriptor nextNode = iter.next();  
  15.         if (nextNode != localMachine) {  
  16.           newLocal = nextNode;  
  17.           break;  
  18.         }  
  19.       }  
  20.         
  21.       if (newLocal != null) {//找到了一个新的参考点   
  22.         try {  
  23.           //从这个新的参考点在的机架中随机选择一个合适的数据节点作为此时的本地机架节点   
  24.           return chooseRandom(newLocal.getNetworkLocation(), excludedNodes, blocksize, maxNodesPerRack, results);  
  25.         } catch(NotEnoughReplicasException e2) {  
  26.           //新的参考点所在的机架中仍然没有合适的数据节点,从整个集群中随机选择一个合适的数据节点作为此时的本地机架节点   
  27.           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results);  
  28.         }  
  29.       } else {  
  30.         //从整个集群中随机选择一个合适的数据节点作为此时的本地机架节点   
  31.         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results);  
  32.       }  
  33.     }  
  34.   }</font>  
3.选择一个远程机架节点
   选择一个远程机架节点就是随机的选择一个合适的不在参考点坐在的机架中的数据节点,如果没有找到这个合适的数据节点的话,就只能从参考点所在的机架中选择一个合适的数据节点作为此时的远程机架节点了。

4.随机选择若干数据节点
   这里的随机随机选择若干个数据节点实际上指的是从某一个范围内随机的选择若干个节点,它的实现需要利用前面提到过的NetworkTopology数据结构。随机选择所使用的范围本质上指的是一个路径,这个路径表示的是NetworkTopology所表示的树状网络拓扑图中的一个非叶子节点,随机选择针对的就是这个节点的所有叶子子节点,因为所有的数据节点都被表示成了这个树状网络拓扑图中的叶子节点。

5.优化数据传输的路径
   以前说过, HDFS对于Block的副本copy采用的是流水线作业的方式:client把数据Block只传给一个DataNode,这个DataNode收到Block之后,传给下一个DataNode,依次类推,...,最后一个DataNode就不需要下传数据Block了。所以,在为一个数据块确定了所有的副本存放的位置之后,就需要确定这种数据节点之间流水复制的顺序,这种顺序应该使得数据传输时花费的网络延时最小。ReplicationTargetChooser用了非常简单的方法来考量的,大家一看便知:
  1. private DatanodeDescriptor[] getPipeline( DatanodeDescriptor writer, DatanodeDescriptor[] nodes) {  
  2.     if (nodes.length==0return nodes;  
  3.         
  4.     synchronized(clusterMap) {  
  5.       int index=0;  
  6.       if (writer == null || !clusterMap.contains(writer)) {  
  7.         writer = nodes[0];  
  8.       }  
  9.         
  10.       for(;index<nodes.length; index++) {  
  11.         DatanodeDescriptor shortestNode = nodes[index];  
  12.         int shortestDistance = clusterMap.getDistance(writer, shortestNode);  
  13.         int shortestIndex = index;  
  14.         for(int i=index+1; i<nodes.length; i++) {  
  15.           DatanodeDescriptor currentNode = nodes[i];  
  16.           int currentDistance = clusterMap.getDistance(writer, currentNode);  
  17.           if (shortestDistance>currentDistance) {  
  18.             shortestDistance = currentDistance;  
  19.             shortestNode = currentNode;  
  20.             shortestIndex = i;  
  21.           }  
  22.         }  
  23.         //switch position index & shortestIndex   
  24.         if (index != shortestIndex) {  
  25.           nodes[shortestIndex] = nodes[index];  
  26.           nodes[index] = shortestNode;  
  27.         }  
  28.         writer = shortestNode;  
  29.       }  
  30.     }  
  31.     return nodes;  
  32.   }  
     可惜的是,HDFS目前并没有把副本存放策略的实现开放给用户,也就是用户无法根据自己的实际需求来指定文件的数据块存放的具体位置。例如:我们可以将有关系的两个文件放到相同的数据节点上,这样在进行map-reduce的时候,其工作效率会大大的提高。但是,又考虑到副本存放策略是与集群负载均衡休戚相关的,所以要是真的把负载存放策略交给用户来实现的话,对用户来说是相当负载的,所以我只能说Hadoop目前还不算成熟,尚需大踏步发展。

相关问答

更多
  • 你的方式也不是不可以,但是如果要多次重复取一条数据就比较耗时,因为每次取出来都要解析一次,最好还是存之前都解析好,一般使用hash数据结构即可,也就是java中的map,将各个参数放入map中,设备号作为key,map作为value,获取数据时可以获取某个设备的所有参数,也可以获取某个设备的指定参数。
  • 我想你应该在MYSQL中建立一个库,然后再建一个表,把图片的数据库储在这个表中,这样图片就能够被保存下来了
  • 试着回答: 先说明一下: 1. namenode负责管理目录和文件信息,真正的文件块是存放在datanode上。 2. 每个map和reduce(即task)都是java进程,默认是有单独的jvm的,所以不可能同一个类的对象会在不同节点上。 看你的描述是把namenode,datanode和jobtracker,tasktracker有点混了。 所以: 问题1. 分块存放在datanode上 问题2.inputformat是在datanode上,确切的说是在tasktracker中。每个map和reduce ...
  • 一旦你知道在哪里看,他们存储在HDFS上的位置就很容易弄清楚。 :) 如果您在浏览器中访问http://NAMENODE_MACHINE_NAME:50070/ ,则应将其转到带有Browse the filesystem链接的页面。 在$HIVE_HOME/conf目录中有hive-default.xml和/或hive-site.xml ,它具有hive.metastore.warehouse.dir属性。 该值是单击“ Browse the filesystem链接后要导航到的值。 在我的这个/usr/ ...
  • 是的,它会将导入数据的副本存储在HDFS中(如StoreFiles / HFiles),因为HBase只能使用自己的文件集进行操作。 也许你会发现这个很好的概述很有趣。 您可以直接使用存储在HDFS中的数据进行操作,而无需使用EXTERNAL HIVE表将其导入HBase: CREATE EXTERNAL TABLE page_view(viewTime INT, userid BIGINT, page_url STRING, referrer_url STRING, ip STRING ...
  • 从Tom White的“Hadoop:权威指南” 过度复制块这些块超出了它们所属文件的目标复制。 通常,过度复制不是问题,HDFS会自动删除多余的副本。 未复制的块这些块不符合它们所属文件的目标复制。 HDFS将自动创建未复制块的新副本,直到它们满足目标复制。 您可以使用hdfs dfsadmin -metasave获取有关正在复制(或等待复制)的块的信息。 错误复制的块这些块不满足块副本放置策略(请参阅副本放置)。 例如,对于多框群集中的复制级别为三的情况,如果一个块的所有三个副本位于同一机架上,则该块会 ...
  • 默认情况下,Hadoop将使用本地模式。 您可能需要在$HADOOP_HOME/conf/core-site.xml中将fs.default.name设置为hdfs://localhost.localdomain:8020/ 。 为此,请将其添加到core-site.xml : fs.default.name hdfs://localhost.localdomain:8020/ Accumulo ...
  • 使用Teradata DB本身 - 没有。 然而:),Teradata提供所谓的UDA ( 统一数据架构 ),其中Teradata,Aster DB和Hadoop(HDFS)相互连接,几乎可以无缝地协同工作:)。 通常,如果您只想使用非结构化数据,请选择Aster。 这是Teradata的产品,您可以直接与HDFS连接。 HDFS在这里用作廉价且快速的数据存储。 更有趣的解决方案将提出新的Aster版本(6),其中将实现AFS ( Aster文件系统 )。 ASR是一种类似于HDFS的分布式文件系统。 我也 ...
  • 使用put和copyFromLocal命令时,使用mapreduce作业将本地文件复制到HDFS。 实际使用客户端库和队列通过hadoop客户端二进制文件使用Streaming完成的。 在将内容复制到HDFS时,hadoop / hdfs binary命令使用DistributedFileSystem类与Name节点进行交互,以验证并在命名空间中创建有关正在复制的文件的条目。 然后客户端使用DFSOutputStream类将数据转换为数据包并将其放入内部data queue. DataStreamer类将使 ...
  • 以下是数据偏差的一些潜在原因: 如果某些DataNode在一段时间内不可用(不接受请求/写入),则群集最终可能会失去平衡。 TaskTrackers不会在群集节点之间均匀地与DataNode并置。 如果我们在这种情况下通过MapReduce写入数据,则群集可能是不平衡的,因为托管TaskTracker和DataNode的节点将是首选。 与上面相同,但使用HBase的RegionServers。 大量删除数据可能会导致群集不平衡,具体取决于已删除块的位置。 添加新的DataNode不会自动重新平衡群集中的现有 ...