Hadoop对Map执行框架的实现(TaskTracker端)

2019-03-28 14:01|来源: 网络

在前一篇文章中(见 http://www.linuxidc.com/Linux/2012-02/54708.htm ),我从客户端详细的讲解了Hadoop对Map执行框架的实现,即客户端可以自己定义或实现给map操作输入怎样的key-value值、map操作、如何根据map输出的key进行排序、如何对map输出的key-value集合进行合并等。用户在提交作业之前对它进行这些进行相应的配置即可。那么,TaskTracker在真正执行作业的任务时是如何根据用户的设置信息来把Map任务相关操作组合成一个有机的整体从而完成作业的map操作呢?这就是本文所要讨论的重点。

任何一个Map任务都被Hadoop抽象成一个MapTask对象,因此,这个Map任务也就是在对应的MapTask中调度执行的了。老规矩,还是先来看看与Map任务真正在TaskTracker端执行相关的具体类吧!

MapTask会在runNewMapper()方法中构造与map任务相关的组件,然后调用map任务执行器Mapper的run()方法来正式的执行map操作,这个过程如下:

下面,我就结合Hadoop的源代码来集体的讲一讲上面的这个过程:

1.构造map任务执行器Mapper

map任务执行器定义了如何对作业的输入数据进行map操作,这个map任务执行器一般是由用户自定义的,但很明显它必须要继承自org.apache.hadoop.mapreduce.Mapper类,并重写它的map(K,V,Context)方法。用户在自定义了自己的map操作之后就可以调用Job的setMapperClass()方法来设置作业的map操作类型。因此,MapTask在构造该任务执行器时通过一条简单的语句就可实例化一个mapper类型的对象:

org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>mapper= (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

2.构造记录读取器RecordReader

记录读取器RecordReader用来读取map任务的原始输入数据,并把这些数据构造成一个key-value结合,这些key-value就是任务执行器Mapper中map(K,V,Context)方法需要的K,V输入。记录读取器RecordReader的创建可能有一点点的复杂,它首先必须来自一个输入格式化类InputFormat的实现,同时InputFormat的实现必须要定义如何对作业的原始输入数据进行切分,每一个切分被保存到对应的InputSplit实现对象中,也就是说一个InputFormat的实现对应一个RecordReader实现和一个InputSplit实现,然后我们可以再对这个RecordReader进行进一步的包装。所以在Hadoop的0.2.2版本实现中MapTask用到的最顶层的记录读取器的实现是NewTractingRecordReader,这个RecordReader实现就是简单地对最原始的RecordReader(直接来自于InputFormat的实现)进行了包装,另外它还包含一个任务报告器,因为可以根据输入数据的读取量来估计当前任务的执行进度。看一下它的代码实现:

//获取用户的InputFormat实现的实例

org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>inputFormat= (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);

//获取用户的InputSplit实现的实例,同时读入map任务对应的输入数据的位置等信息

org.apache.hadoop.mapreduce.InputSplit split = null;

DataInputBuffer splitBuffer = new DataInputBuffer();

splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());

SerializationFactory factory = new SerializationFactory(job);

Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit> deserializer = (Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>) factory.getDeserializer(job.getClassByName(splitClass));

deserializer.open(splitBuffer);

split = deserializer.deserialize(null);

//创建记录读取器

org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE> (inputFormat.createRecordReader(split, taskContext), reporter);

3.构造记录写入器(map任务输出收集器)RecordWriter

在Hadoop的0.2.2版本实现中MapTask用到的map任务输出收集器有两个实现:NewOutputCollector和NewDirectOutputCollector,当作业中没有设置reduce操作的时候就采用NewDirectOutputCollector实现,否则就采用NewDirectOutputCollector实现;当时无论是NewOutputCollector还是NewDirectOutputCollector,它们都包含一个任务报告器,这是因为可以通过map输出数据的收集情况来估计任务的执行进度。至于这个任务输出收集器在其内部到底干了些什么事,如排序、合并等,我将会后面的博文中讨论。

4.组装Map上下文执行环境Context

组装Map上下文执行环境Context很简单,将是要将与map相关的主要组件记录读取器RecordReader、记录写入器RecordWriter、任务报告器等放入Context对象中。

5.初始化记录读取器RecordReade

初始化RecordReade实际上就是对它做一些初始化工作,如:获取输入数据文件的物理位置、申请数据缓存的内存空间、打开输入文件等。

6.运行map任务执行器Maper

就是调用Maper用户实现的run()方法,该方法也很简单,代码如下:

public void run(Context context) throws IOException, InterruptedException {

setup(context);

while (context.nextKeyValue()) {

map(context.getCurrentKey(), context.getCurrentValue(), context);

}

cleanup(context);

}

我在这里只详细地介绍了与map任务在TaskTracker端真正执行这一过程,对于map任务执行完毕后向它的TaskTracker报告,以及TaskTracker向JobTracker报告等动作,我没有做任何的讨论。

相关问答

更多
  • 换个角度讲, 当C#在微软平台上成为主流开发语言的时候, python已经渐渐的成为了linux应用程序的主流开发语言之一了. 原因很简单, perl在淡出, ruby未发力, shell不够用, php不合适, java不解释. 记得Redhat 7的字符界面安装程序就是python写的, 那几乎是我第一次听说python的年代了.
  • 首先,一个job具体启动多少个map,是由你配置的inputformat来决定的。inputformat在分配任务之前会对输入进行切片。最终启动的map数目,就是切片的结果数目。具体来看 一、如果使用是自定义的inputformat,那么启动多少个map,是由你实现的public InputSplit[] getSplits(JobConf job, int numSplits)方法决定的,返回的切片有多少个就启动多少个map任务。 二、如果是使用系统系统的TextInputFormat(或FileInpu ...
  • 因为Hadoop-B上的MapReduce服务没启动的起来。 你可以查下Hadoop-B上的tasktracker日志,看有没有报错。
  • 默认情况下,2.4.x安装中没有map reduce的配置文件,即使有一个名为mapred-site.xml.template的文件。将文件重命名为mapred-site.xml并记住设置属性mapred.framework。命名为classic以使用作业跟踪器和tasktracker。也不能使用启动脚本start-all.sh,因为它执行脚本start-dfs.sh和start-yarn.sh.You需要执行启动jobtracker和脚本的脚本的TaskTracker。 By default there ...
  • 没有JobTracker和TaskTracker了。 我们有NodeManager和resourceManager。 在这里你刚开始dfs服务没有启动纱线服务,启动纱线服务运行start-yarn.sh然后只有纱线相关服务才会启动。 如果你想启动所有服务运行start-all.sh (不是一个好习惯) There is no JobTracker and TaskTracker anymore. We have NodeManager and resourceManager. Here you just s ...
  • 此博客文章可能会有所帮助: http://western-skies.blogspot.com/2010/11/fix-for-exceeded-maxfaileduniquefetches.html 简而言之,即使您在配置文件中指定IP地址,Hadoop也会执行反向主机名查找。 在您的环境中,为了使Hadoop正常工作,SSP-SANDBOX-1.mysite.com必须解析为该机器的IP地址,并且该IP地址的反向查找必须解析为SSP-SANDBOX-1.mysite .COM。 因此,您需要与管理这些计 ...
  • 嗨@xiaomo当你说“似乎所有的地图任务都在同一时间运行”时我真的不明白你想要什么? 你可以再详细一点吗? 粘贴您的工作流程的完整日志? 我猜你想要在每个节点中使用2张地图。 如果你只在集群上运行一个作业,我不认为这个问题会发生。 但是,如果您在集群中运行了2个作业,并且mapreduce.tasktracker.map.tasks.maximum设置为2,那么每个节点将运行2个映射,每个作业一个。 Hi @xiaomo I dont really understand when you said "it ...
  • 修改/etc/hosts以包含主机名环回映射: 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 127.0.1.1 is-joshbloom-hadoop ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 您的问题是您的机器不知道如何解析主机名is-joshbloom-hadoop到特定 ...
  • 如果您使用的是具有YARN框架的Hadoop 2.2.0,则其中没有jobtracker 。 它的功能被拆分并由ResourceManager和ApplicationMaster取代。 这是运行YARN时预期的jps打印输出 $jps 18509 Jps 17107 NameNode 17170 DataNode 17252 ResourceManager 17309 NodeManager 17626 JobHistoryServer If you are using Hadoop 2.2.0 whic ...
  • 您必须为namenode设置主机排除文件。 在core-site.xml中进行此编辑: dfs.hosts.exclude YOUR_PATH_TO_THE_EXCLUDE_FILE 该文件基本上类似于从属文件或主文件。 您只需插入主机名,如: host1 host2 重新启动namenode时将忽略这些给定的主机,但jobtracker将启动tasktracker。 You have to s ...