Hadoop map reduce 过程获取环境变量

2019-03-28 13:33|来源: 网络

Hadoop任务执行过程中,在每一个map节点或者reduce节点能获取一下环境变量,利用这些变量可以为特殊的需求服务,例如:获取当前map节点处理的数据文件的路径。

hadoop是java实现的,利用java可以很方便的获取相关环境变量,其内部包含在Context和MRJobConfig中(hadoop版本不一样,可能会有区别,我的hadoop是0.21)。

举例:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;


public class MergeDaysMapper extends Mapper<LongWritable, Text, Text, Text> {

private String inputFile = null;

public void setup(Context context)
{
System.err.println("[STARTS TO GET PARAMETERS OF THIS JOB]");
Path input = ((FileSplit)context.getInputSplit()).getPath();
inputFile = input.toString();
System.err.println("Input: "+ input.toString());
System.out.println("Input: "+ input.getName());
System.out.println("MAP_INPUT_FILE: " + MRJobConfig.MAP_INPUT_FILE);
System.out.println("MAP_INPUT_PATH:"+ MRJobConfig.MAP_INPUT_PATH);
System.out.println("MAP_INPUT_START:"+ MRJobConfig.MAP_INPUT_START);
System.err.println("JOB_NAME" + MRJobConfig.JOB_NAME);
System.out.println("[FINISHED GETTING PARAMETERS OF THIS JOB]");
}
    
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
if (null == inputFile)
   context.write(new Text("key"), new Text("inputFile"));
else
context.write(new Text("key"), new Text(inputFile));
}
}


同时,在streaming任务中也有同样的需求,需要获取相关环境变量,查过别人的资料,如下:

{{

streaming框架通过设置环境变量的方式给mapper、reducer程序传递配置信息。常用的环境变量如下:

HADOOP_HOME

计算节点上配置的Hadoop路径

LD_LIBRARY_PATH

计算节点上加载库文件的路径列表

PWD

当前工作目录

dfs_block_size

当前设置的HDFS文件块大小

map_input_file

mapper正在处理的输入文件路径

mapred_job_id

作业ID

mapred_job_name

作业名

mapred_tip_id

当前任务的第几次重试

mapred_task_id

任务ID

mapred_task_is_map

当前任务是否为map

mapred_output_dir

计算输出路径

mapred_map_tasks

计算的map任务数

mapred_reduce_tasks

计算的reduce任务数

}}


自己测试了一下,不对,又是版本问题,查了http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#How+do+I+get+the+JobConf+variables+in+a+streaming+job%27s+mapper%2Freducer%3F

解决如下:

Name Type Description
mapreduce.job.id String The job id
mapreduce.job.jar String job.jar location in job directory
mapreduce.job.local.dir String The job specific shared scratch space
mapreduce.task.id String The task id
mapreduce.task.attempt.id String The task attempt id
mapreduce.task.ismap boolean Is this a map task
mapreduce.task.partition int The id of the task within the job
mapreduce.map.input.file String The filename that the map is reading from
mapreduce.map.input.start long The offset of the start of the map input split
mapreduce.map.input.length long The number of bytes in the map input split
mapreduce.task.output.dir String The task's temporary output directory
相关参数在streaming中“."用”_"代替即可。

例子:

#!/bin/sh


while read line
do
  echo "$line"
  echo $mapreduce_map_input_file
done


测试通过

相关问答

更多
  • 在各个slave(datanode)上面有会有Map和Reduce执行代码。 在Job提交时,会打包该job的配置文件类文件,jar文件等,拷 贝到各个datanode上面,做本地执行的。
  • 找到离存数据最近的一台机器运行和这个数据相关的map任务,reduce是按照你整理出的key有多少个来决定的。一个机器很难说,处理的快的处理多一点,保持所有机器使用平衡。 上面你都自己写了20个map,和文件大小个数有关,和数据条数无关。 要看你选择的输入格式是什么,默认是行偏移量,然后由你编写map函数,指定key和value是什么。相同的key整合起来传给reduce,由reduce进行下一步处理,最后输出到指定的地方。
  • map的数量 map的数量通常是由hadoop集群的DFS块大小确定的,也就是输入文件的总块数,正常的map数量的并行规模大致是每一个Node是10~100个,对于CPU消耗较小的作业可以设置Map数量为300个左右,但是由于hadoop的每一个任务在初始化时需要一定的时间,因此比较合理的情况是每个map执行的时间至少超过1分钟。具体的数据分片是这样的,InputFormat在默认情况下会根据hadoop集群的DFS块大小进行分片,每一个分片会由一个map任务来进行处理,当然用户还是可以通过参数mapred ...
  • 你是否期望每个reducer能够在完全相同的映射数据上工作? 但至少“钥匙”应该是不同的,因为它决定了哪个减速器要去。 您可以在mapper中多次输出输出,并将其输出为密钥(其中$ i代表第i个缩减器,$ key是您的原始密钥)。 您需要添加一个“分区程序”以确保这些记录是基于$ i分布在还原器中的。 然后使用“GroupingComparator”按原始$ key对记录进行分组。 有可能做到这一点,但不是在一个MR中以微不足道的方式。 Are you expecting every reducer to ...
  • JobTracker Web UI为您提供了非常有用的报告,可以比较每个映射器和reducer的可用日志。 另请查看hadoop-test.jar存档中的mrbench类。 网上有大量有关Hadoop集群基准测试用法的信息,如本文所述 。 JobTracker web UI gives you very useful reports which allow to compare everything up to available logs for every mapper and reducer. Als ...
  • hadoop-streaming的-file选项仅适用于本地文件。 但请注意,其帮助文本提到-file标志已被弃用,以支持generic -files选项。 使用generic -files选项允许我们指定一个远程(hdfs / gs)文件来进行舞台。 另请注意,通用选项必须位于应用程序特定标志之前。 您的调用将变为: hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \ -files gs://bucket-name/intro_t ...
  • 我会说使用Hadoop,但不是直接使用,而是通过Clojure的Cascalog 。 这里的价值主张是Hadoop为您提供的所有内容以及出色的声明性查询语言(即使任务相对较小,也可能使Cascalog值得使用;在本地模式下使用Hadoop进行设置完全没有问题)。 最初的介绍性博客文章仍然是最好的起点(尽管现在有很好的文档 - 请参阅GitHub上的wiki):第一个是在这里 ,它最后链接到第二个。 为了让您体验它的外观,这里是第一个教程的片段(找到跟随者比他们所关注的人更老的所有“跟随”关系): (?<- ...
  • 这是除了最佳之外的一切,因为地图输出必须始终复制到另一台服务器。 但您可以简单地修改服务器上的mapred-site.xml。 mapred.tasktracker.map.tasks.maximum 4 The maximum number of map tasks that will be run simultaneously by a task tracker.
  • 在分布式Hadoop集群中,每个Map / Reduce任务都在其自己的独立JVM中运行。 因此,无法在不同JVM(甚至不同节点)上运行的不同类实例之间共享静态变量。 但是,如果要在任务之间共享一些不可变数据,可以使用Configuration类: // driver code Configuration config = Configuration.create(); config.setLong("foo.bar.somelong",1337); ... // mapper code public c ...
  • 我通过以下方式在Eclipse中开发Cassandra / Hadoop应用程序: 使用maven(m2e)为我的Eclipse项目收集和配置依赖项(Hadoop,Cassandra,Pig等) 创建测试用例(src / test / java中的类)来测试我的映射器和缩减器。 诀窍是使用扩展RecordWriter和StatusReporter的内部类动态构建上下文对象。 如果执行此操作,则在调用setup / map / cleanup或setup / reduce / cleanup之后,您可以断言正 ...