Hadoop学习总结:Map-Reduce的过程解析

2019-03-28 14:19|来源: 网络

一、客户端

Map-Reduce的过程首先是由客户端提交一个任务开始的。

提交任务主要是通过JobClient.runJob(JobConf)静态函数实现的:

public static RunningJob runJob(JobConf job) throws IOException {

  //首先生成一个JobClient对象

  JobClient jc = new JobClient(job);

  ……

  //调用submitJob来提交一个任务

  running = jc.submitJob(job);

  JobID jobId = running.getID();

  ……

  while (true) {

     //while循环中不断得到此任务的状态,并打印到客户端console中

  }

  return running;

}

其中JobClient的submitJob函数实现如下:

public RunningJob submitJob(JobConf job) throws FileNotFoundException,

                                InvalidJobConfException, IOException {

  //从JobTracker得到当前任务的id

  JobID jobId = jobSubmitClient.getNewJobId();

  //准备将任务运行所需要的要素写入HDFS:

  //任务运行程序所在的jar封装成job.jar

  //任务所要处理的input split信息写入job.split

  //任务运行的配置项汇总写入job.xml

  Path submitJobDir = new Path(getSystemDir(), jobId.toString());

  Path submitJarFile = new Path(submitJobDir, "job.jar");

  Path submitSplitFile = new Path(submitJobDir, "job.split");

  //此处将-libjars命令行指定的jar上传至HDFS

  configureCommandLineOptions(job, submitJobDir, submitJarFile);

  Path submitJobFile = new Path(submitJobDir, "job.xml");

  ……

  //通过input format的格式获得相应的input split,默认类型为FileSplit

  InputSplit[] splits =

    job.getInputFormat().getSplits(job, job.getNumMapTasks());

 

  // 生成一个写入流,将input split得信息写入job.split文件

  FSDataOutputStream out = FileSystem.create(fs,

      submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));

  try {

    //写入job.split文件的信息包括:split文件头,split文件版本号,split的个数,接着依次写入每一个input split的信息。

    //对于每一个input split写入:split类型名(默认FileSplit),split的大小,split的内容(对于FileSplit,写入文件名,此split 在文件中的起始位置),split的location信息(即在那个DataNode上)。

    writeSplitsFile(splits, out);

  } finally {

    out.close();

  }

  job.set("mapred.job.split.file", submitSplitFile.toString());

  //根据split的个数设定map task的个数

  job.setNumMapTasks(splits.length);

  // 写入job的配置信息入job.xml文件      

  out = FileSystem.create(fs, submitJobFile,

      new FsPermission(JOB_FILE_PERMISSION));

  try {

    job.writeXml(out);

  } finally {

    out.close();

  }

  //真正的调用JobTracker来提交任务

  JobStatus status = jobSubmitClient.submitJob(jobId);

  ……

}

 

相关问答

更多
  • 问题是客户端使用的是不同版本的Hadoop API(0.23.0),然后是Hadoop安装。 The problem was that client was using different version of Hadoop API (0.23.0) then the Hadoop instalation.
  • JobTracker Web UI为您提供了非常有用的报告,可以比较每个映射器和reducer的可用日志。 另请查看hadoop-test.jar存档中的mrbench类。 网上有大量有关Hadoop集群基准测试用法的信息,如本文所述 。 JobTracker web UI gives you very useful reports which allow to compare everything up to available logs for every mapper and reducer. Als ...
  • 终于搞定了,实际上我改变了 conf.setOutputKeyComparatorClass(TaggedJoiningGroupingComparator.class); 至 conf.setOutputValueGroupingComparator(TaggedJoiningGroupingComparator.class); 也来自hadoop API文档。 - setOutputValueGroupingComparator(Class the ...
  • 它发生的默认选项取代了我的本地配置(我仍然不明白为什么)。 export HADOOP_CLIENT_OPTS="-Xmx1024m" 解决了这个问题。 It happened that the default options were superseding my local configuration (I still don't understand why). export HADOOP_CLIENT_OPTS="-Xmx1024m" solved the problem.
  • 根据给出的分区数量,分区程序定义哪个分区转到哪个分区。 它的工作不是设置分区的数量,而是设置其内容。 每个减少任务然后处理一个分区,因此最后,分区数量=减少任务数量=输出文件数量(使用默认设置而不是MultipleOutputs)。 为了设置分区的数量,你应该使用: job.setNumReduceTasks(n); ,其中n是你想要的数字。 有关如何设置此号码的说明(拇指规则,没有严格规定),您可以阅读本文 。 The partitioner, given the number of partitions ...
  • 谢谢你指出这个问题。 这是一个例子中的错误。 已提交HADOOP-109来解决此问题。 我通过将子拆分器类作为所有输入集合的null来修复该问题。 请参阅此拉取请求 。 为了使其正常工作,请确保将以下参数传递给Hadoop: -D mongo.splitter.class=com.mongodb.hadoop.splitter.MultiMongoCollectionSplitter 以上修复是一种解决方法。 为您提供有关该问题的更多详细信息。 根据设计,子分割器类可以是除MultiMongoCollec ...
  • 我建议通过Pig或Hive运行它,因为你可以用几行来解决这种问题。 如果做不到这一点,我会做以下事情。 在已连接的数据上运行另一个MapReduce作业,并执行以下操作:在映射器中,对于每个输入拆分,保留每个国家/地区ID处理的最小订单,最大订单和元组数(具有唯一用户ID的行)的选项卡。 只有少数几个国家/地区,因此您可以在整个地图工作中将这些统计信息保存在内存中。 在拆分结束时,将累计的统计数据输出到按国家/地区ID键入的reducer。 然后,reducer简单地组合来自每个拆分的聚合数据,以找到全局m ...
  • 我建议你用scala编程来获得spark 。 如果你在mapreduce编程,那么它只对hadoop很有用,但是在scala使用spark编程可以让你在spark和hadoop 。 Spark被启动来处理mapreduce模型中的缺点。 您可以在此主题上找到许多资源。 其中之一就是这个 关于你的问题,我建议你使用dataframe 第一项任务是为数据框架创建schema 。 val schema = StructType(Array(StructField("OgId", StringType), St ...
  • 可以使用SQOOP和HIVE 。 您可以使用SQOOP将数据从mysql表传输到HDFS,然后再传输到HIVE 。 从HIVE(操作后),您可以将表导出回Mysql。 示例: 首先下载mysql-connector-java-5.0.8并将jar放到Sqoop的lib和bin文件夹中 在Hive中创建具有精确字段名称和类型的表定义,如在mysql中 sqoop import --verbose --fields-terminated-by',' - connect jdbc:mysql:// localho ...
  • 从作业跟踪器中,确定执行此任务的hadoop节点。 SSH到该节点并识别hadoop.log.dir目录的位置(检查此节点的mapred-site.xml) - 我的猜测是hadoop用户没有在此文件夹中创建子目录的正确权限 它尝试创建的实际文件夹位于$ {hadoop.log.dir} / userlogs文件夹下 - 检查此文件夹是否具有正确的权限 在你的情况下,看看ps输出,我猜这是你需要检查权限的文件夹: /home/hadoopmachine/hadoop-1.0.1/libexec/../log ...