Hadoop Datanode支持磁盘故障代码hack

2019-03-28 14:10|来源: 网络

背景
Hadoop当中的每一个datanode上,都会保存一些HDFS中文件 的blocks,而这些blocks实际上都是按照一定的格式保存在 datanode这台机器的某些本地目录中的,当通过hadoop向HDFS中保存文件的时候,这些文件就会被hadoop按照blocksize切分成多个blocks,并按照一定的负载和调度算法和配置文件中设置 的每个block的副本数分配到集群的某些datanode上去。而且hadoop最近的版本还支持datanode上将HDFS对应的本地目录设置成多个。这个功能 非常有价值,因为通过这个配置,可以在集群的datanode上挂载多个磁盘,每个磁盘挂载在不同的目录下,然后在 hadoop-site.xml中将datanode的dfs.data.dir配置成由逗号分开的多个目录,这些目录分别对应了多个挂载的磁盘。这样可以在集群的io非常高的时候将io操作分配到各个磁盘上去,减少磁盘的压力和出错的几率。但是,根据对目前 0.19.0的版本中的源代码的研究发现,当多个磁盘中的任何一个crash后,整个datanode就会shutdown它自己,而不管其他的磁盘是不是仍然可以工作。这样其实就产生了问题:因为如果仅仅是一个磁盘发生错误就把整个datanode给停掉,那么namenode就势必会在一定的时间间隔后将这台datanode上保存的所有的blocks转存到其他的datanode上去,以保持blocks的副本数不低于hadoop配置文件中 dfs.replication配置项指定的数目。但是这种情况下其实出现问题的那台datanode上仅仅是一个磁盘上blocks需要被转存,其他好的磁盘上的blocks是仍然可用的,这样就会增加集群中的网络 负载和集群中所有机器的负载,而且还会造成资源的浪费和集群资源的不合理使用。如果 datanode上有好几十TB的数据 ,那么这个过程的代价将会更加严重。而更加合理的处理应该是:即使datanode上有某一个磁盘或者多个磁盘 crash了,只要不是全部crash,datanode应该仍然保持工作,并期待坏掉的磁盘能够在一定的时间内被repaired,然后重新插入机器中并重新开始工作。这些磁盘上原本保存的blocks能恢复最好,即使不能恢复,由于namenode有replication机制,也可以保持这些 blocks的副本数不会低于dfs.replication的配置。因此,可行的办法就是研究hadoop这个部分的代码,对它进行修改,以支持以上所说的这种功能。

代码Hack

hadoop中关于这部分的代码被pack在了 org.apache.hadoop.hdfs.server.datanode中,主要的三个类是 DataNode,FSDataSet和DataBlockScanner,其中Datanode类就是一个datanode运行instance的抽象,FSDataSet用来表示datanode节点上关于磁盘配置的信息和一些处理接口,而DataBlockScanner是一个线程,用来不断的检查该datanode上的blocks信息。运行的机制是这样的:

  • 当datanode启动的时候,会初始化许多的信息,如和 namenode通信的socket信息,从hadoop配置文件中读取的配置信息,并利用这些配置信息初始化该datanode instance
  • 每个datanode中都会有多个内部线程在轮询的作一些操作,其中有一个为DataTransfer,用来向其他datanode传输block数据。 datanode启动过程中同样会把这个线程启动。
  • 启动datanode时,同样还会将DataBlockScanner线程启动,这个线程用来keep track datanode上的block和更新信息。
  • datanode中还保存了一个FSDataset的实例 ,它用来记录当前 datanode上关于磁盘的配置信息,以及这些磁盘或者路径下中保存的 HDFS分布式文件系统 中的信息。通过对 hadoop配置文件的读取,datanode也会初始化这个FSDataset类的instance。
  • datanode本身也是一个线程类,它的run()中会间歇的调用一个服务方法:offerService(),这个方法里记录处理的就是datanode的核心处理逻辑。这当中的处理包括:
    • 每隔3秒钟向namenode发送一次自己的heartbeat信息,这些信息被namenode接收到以后会根据对该 heartbeat的分析向 datanode返回一个datanode需要的操作(DatanodeCommand),并根据从namenode返回的这个 DatanodeCommand来作自己相应的操作。
    • 然后会检查本datanode是否有接收到新的block,并作相应的处理
    • 然后检查上一次向namenode进行block report的时间,如果超过一定的时间(默认为1小时).就向namenode发送一次block report,以便让namenode上记录的信息保持更新。
    • 每一次接收到来自namenode的操作信息 (DatanodeCommand),datanode都会作相应的操作。

在datanode操作hdfs时,它会先从它内部保存的FSDataSet实例中得到下一个轮转到的FSVolume,这么一个FSVolume代表了dfs.data.dir的配置项中用逗号分隔开的某一个本地磁盘目录,然后FSDataSet实例会试着在这个FSVolume中的FSDir实例的checkDirTree()方法:
public void checkDirTree() throws DiskErrorException {
      DiskChecker.checkDir(dir);
           
      if (children != null) {
        for (int i = 0; i < children.length; i++) {
          children[i].checkDirTree();
        }
      }
    }
从程序中可以看出,实际上它是首先用一个DiskChecker类来check这个dir是否是合法的,然后再check 这个dir的子目录,而判断这个dir是否合法的逻辑如下:
if (!mkdirsWithExistsCheck(dir))
      throw new DiskErrorException("can not create directory: "
                                   + dir.toString());
       
    if (!dir.isDirectory())
      throw new DiskErrorException("not a directory: "
                                   + dir.toString());
           
    if (!dir.canRead())
      throw new DiskErrorException("directory is not readable: "
                                   + dir.toString());
           
    if (!dir.canWrite())
      throw new DiskErrorException("directory is not writable: "
                                   + dir.toString());
从程序中可以看出,实际上,datanode首先尝试在这个dir中创建一个子目录,然后判断这个目录是否是一个合法的目录,是否可写,是否可读,一旦这几个判断的任何一个发生错误,datanode就认为这个目录出现了问题,于是抛出 DiskErrorException,0.19.0的hadoop此时会把这个异常连续的向上的调用抛出,直到FSVolumeSet实例的 checkDir(),此时datanode发现磁盘错误,然后shutdown()它自己,datanode退出集群。这就是目前datanode处理磁盘的逻辑。但是想想可以发现,这样的逻辑其实不是最好的,因为就如上面开头描述的那样,此时如果datanode上配置了多磁盘,很有可能其他的磁盘都是好的,可以继续工作,需要修复或者copy副本到其他datanode的blocks仅仅是这块坏掉的磁盘上的blocks。
既然明白了 datanode处理磁盘错误的逻辑,就可以自己修改datanode的实现代码,来满足自己的需要。

由于datanode关于磁盘的检错的调用流程为DataNode.checkDiskError( ) -> FSDataSet.checkDataDir() -> FSVolume.checkDirs(),就在这一步,一旦任何一块磁盘发生异常,就把一场抛给了Datanode,datanode于是 shutdown(),并等待管理人员的修复,并在一段时间之后开始拷贝这个datanode上的副本到其他的datanode上去。
所以,在FSVolume的checkDirs()方法中,可以做如下修改:
List<FSVolume> goodVolumes = new ArrayList<FSVolume>();
      for (int idx = 0; idx < volumes.length; idx++) {
        try {
            volumes[idx].checkDirs();
            goodVolumes.add(volumes[idx]);
        } catch (DiskErrorException e) {
            synchronized(crashedVolumes){
                crashedVolumes.add(volumes[idx]);
            }
        }
      }
      if(goodVolumes.size() == 0) {
          throw new AllDiskErrorException("All " + volumes.length + " disk(s) error: ");
      } else if (volumes.length - goodVolumes.size() > 0) {
          volumes = goodVolumes.toArray(new FSVolume[0]);
          throw new DataNodeDiskErrorException(sb.toString());
      }
程序的逻辑为:创建一个新的队列,用来保存在遍历每一个FSVolume,如果当前的FSVolume是好的,就加入到这个新的goodVolumes队列中去,而一旦出现坏的磁盘或者dir,就把它加入到crashedVolumes队列中,最后遍历完成后,将goodVolumes中的FSVolume保存为队列重新赋予给 volumes。

同时,在datanode中create一个线程,让它没过一段时间去check,看是否crashedVolumes 的队列中是否有FSVolume的实例,如果有是否已经repaired,如果没有就继续等待下一次check,代码如下:
class CrashVolumeChecker implements Runnable {
        public void run() {
            while (true) {
                if (data.checkCrashedVolumes()) {
                    try {
                        data.checkDataDir();
                        reBlockReport();
                    } catch (DataNodeDiskErrorException de) {
                        handleDiskError(de.getMessage());
                    } catch (AllDiskErrorException de) {
                        handleAllDiskError(de.getMessage());
                    }
                }
                try {
                    Thread.sleep(CRASH_VOLUME_CHECK_INTERVAL);
                } catch (InterruptedException ie) {
                }
            }
        }
    }
然后再在datanode的run()中将这个线程启动,就可以了。

相关问答

更多
  • ,就当是抛砖引玉了。 相信楼主知道,hadoop的文件系统叫做hdfs,就是hadoop分布式分布式文件系统的中文简写。这个系统是对google的gfs的开源实现。下面来回答问题。 首先是节点故障: google在他们那篇gfs的论文中说,google在使用gfs曾说过,google在使用gfs时遇到过各种各样的问题,主要有:应用程序bug、操作系统的bug、人为失误,甚至还有硬盘、内存、连接器、网络以及电源失效。在一个大型的系统中,硬盘内存等等组件的老化,过度使用(因为数据读写非常频繁)等问题也是不容忽视 ...
  • 我认为问题出在您的/etc/hosts文件中。 确保配置正确。 我的/etc/hosts文件如下所示: 127.0.0.1 localhost 192.168.0.1 master mypcname 192.168.0.18 slave1 192.168.0.12 slave2 192.168.0.17 slave3 为主人和 127.0.0.1 localhost 192.168.0.1 master 192.168.0.18 slave ...
  • 如果您的hadoop集群的复制因子大于1(对于多节点集群,默认情况下为3),则必须在多个数据节点上复制您的数据。 您可以在hdfs-site.xml中检查复制因子值(dfs.replication)。 因此,现在如果从群集中删除此只读数据节点并且复制因子大于1,那么您将不会遇到任何数据丢失。 因为您的群集将在其他datanode上具有相应的副本。 为了平衡副本,未完成的块将由hdfs自动处理,随后hdfs将保持稳定。 If your hadoop cluster was having a replicati ...
  • datanode1的namespaceID与当前的namenode不匹配。也许你从另一个集群中复制了/ usr / local / hadoop / tmp / dfs / data目录。如果datanode1的数据不相关,你可以删除/ usr / local / hadoop datanode1的/ tmp / dfs / * the namespaceID of datanode1 do not match the current namenode's.Maybe you copied the /usr ...
  • 您需要为openCV安装所需的包。 这篇文章介绍如何安装openCV: http : //www.samontab.com/web/2012/06/installing-opencv-2-4-1-ubuntu-12-04-lts/ 您需要的是以下命令: sudo apt-get install build-essential libgtk2.0-dev libjpeg-dev libtiff4-dev libjasper-dev libopenexr-dev cmake python-dev python- ...
  • Hadoop有asm 3.2而我使用的是ASM 5.在ASM5中,ClassVisitor是一个超类,而在3.2中它是一个接口。 出于某种原因,错误是Throwable(信任Shevek),catch块只捕获异常。 任何hadoop日志都没有捕获throwable错误。 因此,调试非常困难。 使用jar jar链接修复asm版本问题,现在一切正常。 如果你正在使用Hadoop并且某些东西不起作用并且没有日志显示任何错误,那么请尝试抓住Throwable。 阿伦 Hadoop had asm 3.2 and ...
  • 如果您在驱动器上安装了文件系统,则Hadoop可以使用该驱动器。 HDFS将其数据存储在正常的OS文件系统中。 Hadoop不知道该驱动器是否加密,它不会在意。 If you have mounted a file system on the drive then Hadoop can use the drive. HDFS stores its data in the normal OS file system. Hadoop will not know whether the drive is encr ...
  • 停止所有hadoop服务 删除dfs / namenode 从slave和master中删除dfs / datanode 检查Hadoop文件夹的保留: sudo chmod -R 755 / usr / local / hadoop 重启Hadoop 检查/验证文件夹权限。 sudo chmod -R 755 / home / hadoop / appdata 如果仍有问题,请检查日志文件 Stop all hadoop services Delete dfs/namenode Delete dfs/da ...
  • 您可以使用Sqoop将数据从RDBMS导入Hadoop。 Hadoop会处理非结构化数据,因为您将约束(创建结构化数据)推到了最后。 这也允许创建什么样的结构,这将定义您可以提取的信息类型。 永远不会说您无法处理结构化数据,但获得的里程数很低。 RDBMS可以高效地处理结构化数据。 You can use Sqoop to import data from RDBMS to Hadoop. Hadoop shines at processing unstructured data because you a ...
  • 你是部分正确的。 为了避免Jobtracker做出调度和监控的负担,YARN被引入了画面。 因此,对于YARN,您没有任何作业跟踪器或任务跟踪器。 Job跟踪器完成的工作现在由资源管理器完成,资源管理器有两个主要组件Scheduler(为应用程序分配资源)和ApplicationsManager(接受作业提交并在发生任何故障时重新启动ApplicationMaster)。 现在每个应用程序都有一个ApplicationMaster,它从调度程序协调容器(将运行作业的位置)以运行应用程序。 Nodemanag ...