知识点
相关文章
更多最近更新
更多Hadoop map reduce 过程获取环境变量
2019-03-28 13:33|来源: 网络
Hadoop任务执行过程中,在每一个map节点或者reduce节点能获取一下环境变量,利用这些变量可以为特殊的需求服务,例如:获取当前map节点处理的数据文件的路径。
hadoop是java实现的,利用java可以很方便的获取相关环境变量,其内部包含在Context和MRJobConfig中(hadoop版本不一样,可能会有区别,我的hadoop是0.21)。
举例:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class MergeDaysMapper extends Mapper<LongWritable, Text, Text, Text> {
private String inputFile = null;
public void setup(Context context)
{
System.err.println("[STARTS TO GET PARAMETERS OF THIS JOB]");
Path input = ((FileSplit)context.getInputSplit()).getPath();
inputFile = input.toString();
System.err.println("Input: "+ input.toString());
System.out.println("Input: "+ input.getName());
System.out.println("MAP_INPUT_FILE: " + MRJobConfig.MAP_INPUT_FILE);
System.out.println("MAP_INPUT_PATH:"+ MRJobConfig.MAP_INPUT_PATH);
System.out.println("MAP_INPUT_START:"+ MRJobConfig.MAP_INPUT_START);
System.err.println("JOB_NAME" + MRJobConfig.JOB_NAME);
System.out.println("[FINISHED GETTING PARAMETERS OF THIS JOB]");
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
if (null == inputFile)
context.write(new Text("key"), new Text("inputFile"));
else
context.write(new Text("key"), new Text(inputFile));
}
}
同时,在streaming任务中也有同样的需求,需要获取相关环境变量,查过别人的资料,如下:
{{
streaming框架通过设置环境变量的方式给mapper、reducer程序传递配置信息。常用的环境变量如下:
HADOOP_HOME |
计算节点上配置的Hadoop路径 |
LD_LIBRARY_PATH |
计算节点上加载库文件的路径列表 |
PWD |
当前工作目录 |
dfs_block_size |
当前设置的HDFS文件块大小 |
map_input_file |
mapper正在处理的输入文件路径 |
mapred_job_id |
作业ID |
mapred_job_name |
作业名 |
mapred_tip_id |
当前任务的第几次重试 |
mapred_task_id |
任务ID |
mapred_task_is_map |
当前任务是否为map |
mapred_output_dir |
计算输出路径 |
mapred_map_tasks |
计算的map任务数 |
mapred_reduce_tasks |
计算的reduce任务数 |
}}
自己测试了一下,不对,又是版本问题,查了http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#How+do+I+get+the+JobConf+variables+in+a+streaming+job%27s+mapper%2Freducer%3F
解决如下:
Name | Type | Description |
---|---|---|
mapreduce.job.id | String | The job id |
mapreduce.job.jar | String | job.jar location in job directory |
mapreduce.job.local.dir | String | The job specific shared scratch space |
mapreduce.task.id | String | The task id |
mapreduce.task.attempt.id | String | The task attempt id |
mapreduce.task.ismap | boolean | Is this a map task |
mapreduce.task.partition | int | The id of the task within the job |
mapreduce.map.input.file | String | The filename that the map is reading from |
mapreduce.map.input.start | long | The offset of the start of the map input split |
mapreduce.map.input.length | long | The number of bytes in the map input split |
mapreduce.task.output.dir | String | The task's temporary output directory |
例子:
#!/bin/sh
while read line
do
echo "$line"
echo $mapreduce_map_input_file
done
测试通过
相关问答
更多-
在各个slave(datanode)上面有会有Map和Reduce执行代码。 在Job提交时,会打包该job的配置文件类文件,jar文件等,拷 贝到各个datanode上面,做本地执行的。
-
hadoop hive中map与reduce的一些疑问[2022-05-03]
找到离存数据最近的一台机器运行和这个数据相关的map任务,reduce是按照你整理出的key有多少个来决定的。一个机器很难说,处理的快的处理多一点,保持所有机器使用平衡。 上面你都自己写了20个map,和文件大小个数有关,和数据条数无关。 要看你选择的输入格式是什么,默认是行偏移量,然后由你编写map函数,指定key和value是什么。相同的key整合起来传给reduce,由reduce进行下一步处理,最后输出到指定的地方。 -
如何确定 Hadoop map和reduce的个数[2022-02-20]
map的数量 map的数量通常是由hadoop集群的DFS块大小确定的,也就是输入文件的总块数,正常的map数量的并行规模大致是每一个Node是10~100个,对于CPU消耗较小的作业可以设置Map数量为300个左右,但是由于hadoop的每一个任务在初始化时需要一定的时间,因此比较合理的情况是每个map执行的时间至少超过1分钟。具体的数据分片是这样的,InputFormat在默认情况下会根据hadoop集群的DFS块大小进行分片,每一个分片会由一个map任务来进行处理,当然用户还是可以通过参数mapred ... -
你是否期望每个reducer能够在完全相同的映射数据上工作? 但至少“钥匙”应该是不同的,因为它决定了哪个减速器要去。 您可以在mapper中多次输出输出,并将其输出为密钥(其中$ i代表第i个缩减器,$ key是您的原始密钥)。 您需要添加一个“分区程序”以确保这些记录是基于$ i分布在还原器中的。 然后使用“GroupingComparator”按原始$ key对记录进行分组。 有可能做到这一点,但不是在一个MR中以微不足道的方式。 Are you expecting every reducer to ...
-
JobTracker Web UI为您提供了非常有用的报告,可以比较每个映射器和reducer的可用日志。 另请查看hadoop-test.jar存档中的mrbench类。 网上有大量有关Hadoop集群基准测试用法的信息,如本文所述 。 JobTracker web UI gives you very useful reports which allow to compare everything up to available logs for every mapper and reducer. Als ...
-
hadoop-streaming的-file选项仅适用于本地文件。 但请注意,其帮助文本提到-file标志已被弃用,以支持generic -files选项。 使用generic -files选项允许我们指定一个远程(hdfs / gs)文件来进行舞台。 另请注意,通用选项必须位于应用程序特定标志之前。 您的调用将变为: hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \ -files gs://bucket-name/intro_t ...
-
我会说使用Hadoop,但不是直接使用,而是通过Clojure的Cascalog 。 这里的价值主张是Hadoop为您提供的所有内容以及出色的声明性查询语言(即使任务相对较小,也可能使Cascalog值得使用;在本地模式下使用Hadoop进行设置完全没有问题)。 最初的介绍性博客文章仍然是最好的起点(尽管现在有很好的文档 - 请参阅GitHub上的wiki):第一个是在这里 ,它最后链接到第二个。 为了让您体验它的外观,这里是第一个教程的片段(找到跟随者比他们所关注的人更老的所有“跟随”关系): (?<- ...
-
这是除了最佳之外的一切,因为地图输出必须始终复制到另一台服务器。 但您可以简单地修改服务器上的mapred-site.xml。
mapred.tasktracker.map.tasks.maximum 4 The maximum number of map tasks that will be run simultaneously by a task tracker. 在分布式Hadoop集群中,每个Map / Reduce任务都在其自己的独立JVM中运行。 因此,无法在不同JVM(甚至不同节点)上运行的不同类实例之间共享静态变量。 但是,如果要在任务之间共享一些不可变数据,可以使用Configuration类: // driver code Configuration config = Configuration.create(); config.setLong("foo.bar.somelong",1337); ... // mapper code public c ...我通过以下方式在Eclipse中开发Cassandra / Hadoop应用程序: 使用maven(m2e)为我的Eclipse项目收集和配置依赖项(Hadoop,Cassandra,Pig等) 创建测试用例(src / test / java中的类)来测试我的映射器和缩减器。 诀窍是使用扩展RecordWriter和StatusReporter的内部类动态构建上下文对象。 如果执行此操作,则在调用setup / map / cleanup或setup / reduce / cleanup之后,您可以断言正 ...