Hadoop HDFS源码学习笔记(六)--fetchBlockByteRange

2019-03-28 13:54|来源: 网络

client端需要从datanode端读取数据,当顺序的读取block的时候,会调用到fetchBlockByteRange函数,该函数中,有一个死循环,在循环内部首先使用函数getBlockAt()获得最新的block的信息,然后选择要链接的datanode的信息得到DNAddrPair类型的对象,从该结构中得到DatanodeInfo以及datanode的InetSocketAddress,然后调用函数getBlockReader函数获得一个BlockReader类型的对象。

getBlockReader函数分析:

1、首先根据dnAddr判断是否在client端有跟该datanode的cache的连接,如果存在cache的连接,则会调用DFSClient的getLocalBlockReader函数来执行后续的操作,先不分析这类情况。

2、for循环,主要目的是允许retry连接,次数为3

循环中:

(1)根据dnAddr去一个SocketCache类型的变量中去取得cache的Socket

如果不能从socketCache中得到cache的Socket,则会使用DFSClient的socketFactory的createSocket函数创建一个新的socket,然后执行connect连接操作

(2)调用BlockReaderFactory的newBlockReader方法new一个BlockReader类型的对象,单步跟踪进去,发现又进行了一次封装,facoty类函数封装了RemoteBlockReader2的newBlockReader方法,在该方法内部,使用NetUtils的getOutputSream方法返回了一个(OutputStream)SocketOutputStream对象,利用该对象创建了一个BufferedOutputStream,然后利用该对象创建了一个DataOutputStream对象out,然后利用out创建了一个Sender对象,调用sender的readBlock方法。

Sender的readBlock方法,得到了一个OpReadBlockProto对象,并创建了该对象的header的PB的Message,然后调用自身的send函数,发送了一个ReanBlock类型的命令,在send函数中调用了DataOutputStream的Flush函数。

回到RemoteBlockReader2的newBlockReader方法:创建了一个SocketInputStream对象,然后利用该对象创建了一个DataInputStream类型的对象,之后创建了一系列的response,checksum的proto的message信息,创建的datachecksum对象,并对firstChunkOffset进行了验证检测。

然后调用RemoteBlockReader2的构造函数,创建了一个RemoteBlockReader2的对象,然后返回

(3)调用RemoteBlockReader2的readAll方法,该方法封装了BlockReaderUtil的readAll方法, readAll方法,则又循环读入,封装的是RemoteBlockReader2的read方法,在read方法内部,首先判断,一个java.nio.ByteBufer类型的curPacketBuf是否为null,或者(jana.nio.ByteBuffer类型的curDataSlice的remaining为0,且bytesNeededToFinish大于0)则会调用RemoteBlockReader2的readNextPacket()方法。在readNextPacket函数中调用了函数readPacketHeader方法,该方法最终会使用到ReadableByteChannel类型的RemoteBlockReader2的in成员变量来读取packet的header,并将读取过来的ByteBuffer类型的headerBuf中的packetheader字段放入到PacketHeader类型的成员变量curHeader中。

调用resetPacketBuffer函数,根据计算出来的checksumsLen和packetheader中的dataLen重设curPacketBuf以及curDataSlice的字段内容。调用readChannelFully函数从socket连接中读取数据到curPacketBuf中。然后根据用户请求的offset和得到的packetHeader中存储的该packet在block中的offset计算得到应该读取的position,然后利用该newPos对curDataSlice进行position设置。注意在radChannelFuly函数之后会根据curHeader存放的当前packet中的data的length来更新bytesNeededToFini。sh成员变量,直到该变量小于0才判定已经读完用户请求长度的数据,然后接受最后一个空包,然后发送给datanode一个readResult。

跳出到RemoteBlockReader2的read函数中,会发现首先需要判断当前的packet中的dataLen和用户请求的len的长度大小,得到小的一个值nRead,然后从curdataSlice数组中获得nRead长度的数据。然后就跳回到BlockReaderUtil的readAll函数中,该函数会根据当前的nRead值以及之前的累积读到的数据长度和用户请求的len来判定是否继续执行read操作。

至此就使用BlockReader读取了用户请求的数据,然后会关闭当前的BlockReader,并返回,到目前就完成的fecthBlockByteRange的函数操作,从之前的分析可以了解到,该函数执行的操作是根据block的元数据信息从一个datanode上读取block中的数据

现将在该追踪过程中画的序列图和相关类图附上,以便理清思路,源码就不贴了,因为这次用的是23的代码,本机上没有,是在台式机上追踪的,两台电脑没互通呵呵。

相关阅读:

Hadoop HDFS源码学习笔记(一) http://www.linuxidc.com/Linux/2012-03/55966.htm

Hadoop HDFS源码学习笔记(二) http://www.linuxidc.com/Linux/2012-03/56166.htm

Hadoop HDFS源码学习笔记(三) http://www.linuxidc.com/Linux/2012-03/56167.htm

Hadoop HDFS源码学习笔记(四) http://www.linuxidc.com/Linux/2012-03/57036.htm

Hadoop HDFS源码学习笔记(五) http://www.linuxidc.com/Linux/2012-03/57037.htm

Hadoop HDFS源码学习笔记(六) http://www.linuxidc.com/Linux/2012-03/57035.htm

Hadoop HDFS源码学习笔记(七)--DFSInputStream -- openInfo http://www.linuxidc.com/Linux/2012-03/57487.htm

Hadoop HDFS源码学习笔记(八)---HdfsProxy  http://www.linuxidc.com/Linux/2012-03/57488.htm

更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

Hadoop

Hadoop

相关问答

更多
  • hadoop和hbase问题[2022-03-08]

    Hadoop 是一个能够对大量数据进行分布式处理的软件框架。 HBase是一个分布式的、面向列的开源数据库。 HBase在Hadoop之上提供了类似于Bigtable的能力。 HBase是Apache的Hadoop项目的子项目。 HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。 另一个不同的是HBase基于列的而不是基于行的模式。
  • hadoop hdfs目录[2022-03-28]

    移动目录的命令很简单啊,跟Linux shell的命令差不多: hdfs dfs -mv 被移动的目录A 目标目录B 直接移动就行,不知道能否解决你的问题。
  • 英语要好 只要有毅力就OK 。最好是有Java基础,因为hadoop是用java编写的,所以懂java能够帮助理解hadoop原理,当然Hadoop也提供了其他语言的api。不过看楼主是从事javaee的,所以完全没问题,至于Linux只要会基本的cd,ll就差不多了,会安装文件,也就是解压软件,不过不会也没关系,网上多了去了。其实最主要就是能够坚持
  • hadoop hdfs的问题[2021-10-30]

    最下面那张图里环境变量设置的那一行多了一个$符号 export JAVA_HOME=/usr/java/jdk1.6.0_35
  • 在使用Hadoop的过程中,很容易通过FileSystem类的API来读取HDFS中的文件内容,读取内容的过程是怎样的呢?今天来分析客户端读取HDFS文件的过程,下面的一个小程序完成的功能是读取HDFS中某个目录下的文件内容,然后输出到控制台,代码如下: [java] view plain copy public class LoadDataFromHDFS { public static void main(String[] args) throws IOException { new LoadDataF ...
  • 做了一个全新安装的hadoop并用同一个罐子运行工作,问题就消失了。 似乎是一个错误,而不是编程错误。 Did a fresh installation of hadoop and ran the job with the same jar, the problem disappeared. Seems to be a bug rather than programming errors.
  • 我认为这会帮助你更好地理解Spark和Haddop之间的关系: Hadoop本质上是一个分布式数据基础架构:它在大量商品服务器中的多个节点上分发海量数据集合,这意味着您不需要购买和维护昂贵的定制硬件。 它还对这些数据进行索引和跟踪,使大数据处理和分析能够比以前更有效。 另一方面,Spark是一种数据处理工具,可以对这些分布式数据集进行操作; 它不会做分布式存储。 欲了解更多信息,请阅读。 I think this will help you understand better the relation be ...
  • 假设zipIn是java.util.zip.ZipInputStream ,你不应该迭代地调用getNextEntry而不是读取字节吗? I resolved this issue after doing some changes in my code. In the first part of code, I was trying to unzip all the zip files whereas I should have access the spilts. Hadoop basic, which ...
  • Apache Spark独立于Hadoop。 Spark允许您使用不同的数据源(包括HDFS),并且能够在独立群集中运行,或者使用现有的资源管理框架(例如,YARN,Mesos)。 因此,如果您只对Spark感兴趣,则无需安装Hadoop。 Apache Spark is independent from Hadoop. Spark allows you to use different sources of data (incl. HDFS) and is capable of running eithe ...
  • 绝对可能。 不要认为Hadoop是一个可安装的程序,它只是由一群在集群内不同节点上运行的java进程组成。 如果你使用hadoop tar ball,你可以只运行NameNode和DataNodes进程,如果你只想要HDFS。 如果您使用其他hadoop发行版(例如HDP),我认为HDFS和mapreduce来自不同的rpm软件包,但安装两个rpm软件包都有害。 如果您只需要HDFS,请再次运行NameNode和DataNodes。 Absolutely possible. Don't think Hado ...