Hadoop:mapreduce程序reduce输出控制

2019-03-28 14:00|来源: 网络

1,Hadoop中,reduce支持多个输出,输出的文件名也是可控的,就是继承MultipleTextOutputFormat类,重写generateFileNameForKey方法

[java]
  1. public class LzoHandleLogMr extends Configured implements Tool {  
  2.   
  3.      static class LzoHandleLogMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {  
  4.          
  5.         
  6.         public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)  
  7.                 throws IOException {  
  8.             try {  
  9.                 String[] sp = value.toString().split(",");  
  10.                 output.collect(new Text(sp[0]), value);  
  11.             }catch (Exception e) {  
  12.                e.printStackTrace();  
  13.             }         
  14.         }  
  15.   
  16.   
  17.     }  
  18.     static class LzoHandleLogReducer  extends MapReduceBase implements Reducer<Text, Text, Text, NullWritable> {  
  19.           
  20.   
  21.   
  22.         @Override  
  23.         public void reduce(Text key, Iterator<Text> values,  
  24.                 OutputCollector<Text, NullWritable> output, Reporter reporter)  
  25.                 throws IOException {  
  26.             while (values.hasNext()) {  
  27.                   output.collect(values.next(), NullWritable.get());     
  28.                }  
  29.               
  30.         }     
  31.     }  
  32.       
  33.     public static class LogNameMultipleTextOutputFormat extends MultipleTextOutputFormat<Text, NullWritable>   
  34.        {  
  35.   
  36.   
  37.         @Override  
  38.         protected String generateFileNameForKeyValue(Text key,  
  39.                 NullWritable value, String name) {  
  40.             String sp[] = key.toString().split(",");  
  41.             String filename = sp[0];  
  42.             if(sp[0].contains(".")) filename="000000000000";  
  43.             return filename;  
  44.         }  
  45.           
  46.     }  
  47.       
  48.   
  49.   
  50.     @Override  
  51.     public int run(String[] args) throws Exception {  
  52.            
  53.             JobConf jobconf = new JobConf(LzoHandleLogMr.class);  
  54.             jobconf.setMapperClass(LzoHandleLogMapper.class);  
  55.             jobconf.setReducerClass(LzoHandleLogReducer.class);  
  56.             jobconf.setOutputFormat(LogNameMultipleTextOutputFormat.class);  
  57.             jobconf.setOutputKeyClass(Text.class);  
  58.             jobconf.setNumReduceTasks(12);  
  59.               
  60.               
  61.          FileInputFormat.setInputPaths(jobconf,new Path(args[0]));  
  62.             FileOutputFormat.setOutputPath(jobconf,new Path(args[1]));  
  63.             FileOutputFormat.setCompressOutput(jobconf, true);  
  64.             FileOutputFormat.setOutputCompressorClass(jobconf, LzopCodec.class);    
  65.               
  66.             JobClient.runJob(jobconf);  
  67.           return 0;  
  68.               
  69.     }  
  70. }  

在新版本的hadoopAPI是通过Job类来设置各种参数的,但是我调用 Job.setOutputFormatClass()来使用MultipleTextOutputFormat的时候,竟然报错,原因是必须继承子org.apache.hadoop.mapreduce.OutputFormat。0.20.2比较致命的其中一个bug, 升级到0.21能解决


2, 如果同一行数据,需要同时输出至多个文件的话,我们可以使用MultipleOutputs类:

  1. public class MultiFile extends Confi gured implements Tool {  
  2.     public static class MapClass extends MapReduceBase  
  3.         implements Mapper<LongWritable, Text, NullWritable, Text> {  
  4.             private MultipleOutputs mos;  
  5.   
  6.             private OutputCollector<NullWritable, Text> collector;  
  7.             public void confi gure(JobConf conf) {  
  8.                 mos = new MultipleOutputs(conf);  
  9.             }  
  10.   
  11.             public void map(LongWritable key, Text value,  
  12.                     OutputCollector<NullWritable, Text> output,  
  13.                     Reporter reporter) throws IOException {  
  14.                 String[] arr = value.toString().split(",", -1);  
  15.                 String chrono = arr[0] + "," + arr[1] + "," + arr[2];  
  16.                 String geo = arr[0] + "," + arr[4] + "," + arr[5];  
  17.                 collector = mos.getCollector("chrono", reporter);  
  18.                 collector.collect(NullWritable.get(), new Text(chrono));  
  19.                 collector = mos.getCollector("geo", reporter);  
  20.                 collector.collect(NullWritable.get(), new Text(geo));  
  21.             }  
  22.   
  23.             public void close() throws IOException {  
  24.                 mos.close();  
  25.             }  
  26.     }  
  27.   
  28.     public int run(String[] args) throws Exception {  
  29.         Confi guration conf = getConf();  
  30.         JobConf job = new JobConf(conf, MultiFile.class);  
  31.         Path in = new Path(args[0]);  
  32.         Path out = new Path(args[1]);  
  33.         FileInputFormat.setInputPaths(job, in);  
  34.         FileOutputFormat.setOutputPath(job, out);  
  35.         job.setJobName("MultiFile");  
  36.         job.setMapperClass(MapClass.class);  
  37.         job.setInputFormat(TextInputFormat.class);  
  38.         job.setOutputKeyClass(NullWritable.class);  
  39.         job.setOutputValueClass(Text.class);  
  40.         job.setNumReduceTasks(0);  
  41.         MultipleOutputs.addNamedOutput(job,  
  42.                 "chrono",  
  43.                 TextOutputFormat.class,  
  44.                 NullWritable.class,  
  45.                 Text.class);  
  46.         MultipleOutputs.addNamedOutput(job,  
  47.                 "geo",  
  48.                 TextOutputFormat.class,  
  49.                 NullWritable.class,  
  50.                 Text.class);  
  51.         JobClient.runJob(job);  
  52.         return 0;  
  53.     }  
  54. }  

这个类维护了一个<name, OutputCollector>的map。我们可以在job配置里添加collector,然后在reduce方法中,取得对应的collector并调用collector.write即可。

相关问答

更多
  • 一、 首先要知道此前提 转载 若在windows的Eclipse工程中直接启动mapreduc程序,需要先把hadoop集群的配置目录下的xml都拷贝到src目录下,让程序自动读取集群的地址后去进行分布式运行(您也可以自己写java代码去设置job的configuration属性)。
  • 都可以,简单的直接用txt打开java文件,写好后打包成class文件,就可以运行了。你看他原来在哪里放class文件的,你就放在那里
  • 您可以编写一个扩展TextOutputFormat并覆盖getRecordWriter()方法的自定义类。 在其中,您可以将输出写入本地fs。 You can write a custom class that extends TextOutputFormat and override getRecordWriter() method. Within that, you can write the output to a local fs.
  • 所以我在你提到的最后一条评论中做了这个方法,Amar。 我不知道它是否有效,我的HDFS还有其他问题,但在我忘记之前让我们为了文明而放在这里我的发现: http://archive.cloudera.com/cdh/3/hadoop-0.20.2+228/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html MultipleOutputs不代替FormatOutputFormat。 使用FormatOutputFormat定义一个输出 ...
  • 您可以编写可以作为映射器值发出的自定义类型。 但无论你想要作为值发出什么,都必须实现可写接口。 你可以这样做: public class MyObj implements WritableComparable{ private String date; private Double balance; public String getDate() { return date;} public Double getBalance() { return bala ...
  • 您尝试使用的文件位于本地Linux文件系统上,而不是HDFS上。 所以使用可以使用简单的命令,如: cat /home/Kumar/DEV/Eclipse/eclipse/Workspace/MyFirstMapReduce/Files/input/file1 如果MapReduce程序需要HDFS上的输入文件,则将相同的文件放在作业中的HDFS和HDFS路径上。 您可以使用此命令将文件放在hdfs上: hadoop fs -put
  • 当作业被提交执行时,Mapreduce框架会维护计数器。 这些计数器向用户显示,以便了解作业统计数据并查看基准和性能分析。 你的工作输出显示了一些计数器。 关于计数器的权威指南第8章有一个很好的解释,我建议你检查一次。 要解释您要求的物品, 1)所有地图花费的总时间 - 以毫秒为单位运行地图任务所花费的总时间。 包括以推测方式开始的任务(推测意味着在等待指定时间后运行失败或缓慢的作业,在哀叹条件中,推测作业意味着重新运行任何特定的地图任务)。 2)所有减少所花费的总时间 - 以毫秒为单位运行减少任务所花费的 ...
  • 我不确定它是哪个配置属性但是当我从集群中获取配置并创建一个配置对象时,它工作得很好。 I am not exactly sure which configuration property it was but when I took the cofiguration from cluster and create a Configuration objects out of that than it works just fine.
  • 这有点违反直觉,但它实际上已在API文档中记录 - Hadoop重用键/值,如果你想保留它们,你应该克隆它们。 It's somewhat counter-intuitive, but it's actually documented in the API docs -- Hadoop reuses the keys / values, you should clone them if you want to keep them around.
  • MapReduce的作用可以称为“执行引擎”。 Pig作为一个系统正在将Pig Latin命令转换为一个或多个MR Jobs。 Pig本身没有能力运行它 - 它将这项工作委托给Hadoop。 我会在编译器和操作系统之间建立类比。 OS执行时编译器创建程序。 在这个比喻中,Pig是编译器,Hadoop是OS。 猪做的更多 - 它运行作业,监视它们等等。所以除了编译器之外,它可以被视为“shell”。 在我的理解中,从以下角度看,Pig不是100%编译器 - 它不会根据命令编译MR作业。 它传递有关应该对已存在 ...