实现MapReduce多文件自定义输出

2019-03-28 13:33|来源: 网络

普通maprduce中通常是有map和reduce两个阶段,在不做设置的情况下,计算结果会以part-000*输出成多个文件,并且输出的文件数量和reduce数量一样,文件内容格式也不能随心所欲。这样不利于后续结果处理。

Hadoop中,reduce支持多个输出,输出的文件名也是可控的,就是继承MultipleTextOutputFormat类,重写generateFileNameForKey方法。如果只是想做到输出结果的文件名可控,实现自己的LogNameMultipleTextOutputFormat类,设置jobconf.setOutputFormat(LogNameMultipleTextOutputFormat.class);就可以了,但是这种方式只限于使用旧版本的hadoop api.如果想采用新版本的api接口或者自定义输出内容的格式等等更多的需求,那么就要自己动手重写一些hadoop api了。

首先需要构造一个自己的MultipleOutputFormat类实现FileOutputFormat类(注意是org.apache.hadoop.mapreduce.lib.output包的FileOutputFormat)

  1. import java.io.DataOutputStream;  
  2. import java.io.IOException;  
  3. import java.util.HashMap;  
  4. import java.util.Iterator;  
  5.   
  6.   
  7. import org.apache.hadoop.conf.Configuration;  
  8. import org.apache.hadoop.fs.FSDataOutputStream;  
  9. import org.apache.hadoop.fs.Path;  
  10. import org.apache.hadoop.io.Writable;  
  11. import org.apache.hadoop.io.WritableComparable;  
  12. import org.apache.hadoop.io.compress.CompressionCodec;  
  13. import org.apache.hadoop.io.compress.GzipCodec;  
  14. import org.apache.hadoop.mapreduce.OutputCommitter;  
  15. import org.apache.hadoop.mapreduce.RecordWriter;  
  16. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;  
  18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  19. import org.apache.hadoop.util.ReflectionUtils;  
  20.   
  21.   
  22. /** 
  23.  * This abstract class extends the FileOutputFormat, allowing to write the 
  24.  * output data to different output files. There are three basic use cases for 
  25.  * this class.  
  26.  * Created on 2012-07-08 
  27.  * @author zhoulongliu 
  28.  * @param <K> 
  29.  * @param <V> 
  30.  */  
  31. public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable> extends  
  32.         FileOutputFormat<K, V> {  
  33.   
  34.   
  35.    //接口类,需要在调用程序中实现generateFileNameForKeyValue来获取文件名   
  36.     private MultiRecordWriter writer = null;  
  37.   
  38.   
  39.     public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {  
  40.         if (writer == null) {  
  41.             writer = new MultiRecordWriter(job, getTaskOutputPath(job));  
  42.         }  
  43.         return writer;  
  44.     }  
  45.   
  46.   
  47.     /** 
  48.      * get task output path 
  49.      * @param conf 
  50.      * @return 
  51.      * @throws IOException 
  52.      */  
  53.     private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {  
  54.         Path workPath = null;  
  55.         OutputCommitter committer = super.getOutputCommitter(conf);  
  56.         if (committer instanceof FileOutputCommitter) {  
  57.             workPath = ((FileOutputCommitter) committer).getWorkPath();  
  58.         } else {  
  59.             Path outputPath = super.getOutputPath(conf);  
  60.             if (outputPath == null) {  
  61.                 throw new IOException("Undefined job output-path");  
  62.             }  
  63.             workPath = outputPath;  
  64.         }  
  65.         return workPath;  
  66.     }  
  67.   
  68.   
  69.     /** 
  70.      * 通过key, value, conf来确定输出文件名(含扩展名) Generate the file output file name based 
  71.      * on the given key and the leaf file name. The default behavior is that the 
  72.      * file name does not depend on the key. 
  73.      *  
  74.      * @param key the key of the output data 
  75.      * @param name the leaf file name 
  76.      * @param conf the configure object 
  77.      * @return generated file name 
  78.      */  
  79.     protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);  
  80.   
  81.   
  82.    /** 
  83.     * 实现记录写入器RecordWriter类 
  84.     * (内部类) 
  85.     * @author zhoulongliu 
  86.     * 
  87.     */  
  88.     public class MultiRecordWriter extends RecordWriter<K, V> {  
  89.         /** RecordWriter的缓存 */  
  90.         private HashMap<String, RecordWriter<K, V>> recordWriters = null;  
  91.         private TaskAttemptContext job = null;  
  92.         /** 输出目录 */  
  93.         private Path workPath = null;  
  94.   
  95.   
  96.         public MultiRecordWriter(TaskAttemptContext job, Path workPath) {  
  97.             super();  
  98.             this.job = job;  
  99.             this.workPath = workPath;  
  100.             recordWriters = new HashMap<String, RecordWriter<K, V>>();  
  101.         }  
  102.   
  103.   
  104.         @Override  
  105.         public void close(TaskAttemptContext context) throws IOException, InterruptedException {  
  106.             Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();  
  107.             while (values.hasNext()) {  
  108.                 values.next().close(context);  
  109.             }  
  110.             this.recordWriters.clear();  
  111.         }  
  112.   
  113.   
  114.         @Override  
  115.         public void write(K key, V value) throws IOException, InterruptedException {  
  116.             // 得到输出文件名   
  117.             String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());  
  118.            //如果recordWriters里没有文件名,那么就建立。否则就直接写值。   
  119.             RecordWriter<K, V> rw = this.recordWriters.get(baseName);  
  120.             if (rw == null) {  
  121.                 rw = getBaseRecordWriter(job, baseName);  
  122.                 this.recordWriters.put(baseName, rw);  
  123.             }  
  124.             rw.write(key, value);  
  125.         }  
  126.   
  127.   
  128.         // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}   
  129.         private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException,  
  130.                 InterruptedException {  
  131.             Configuration conf = job.getConfiguration();  
  132.            //查看是否使用解码器     
  133.             boolean isCompressed = getCompressOutput(job);  
  134.             String keyValueSeparator = ",";  
  135.             RecordWriter<K, V> recordWriter = null;  
  136.             if (isCompressed) {  
  137.                 Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);  
  138.                 CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);  
  139.                 Path file = new Path(workPath, baseName + codec.getDefaultExtension());  
  140.                 FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);  
  141.                 //这里我使用的自定义的OutputFormat    
  142.                 recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),  
  143.                         keyValueSeparator);  
  144.             } else {  
  145.                 Path file = new Path(workPath, baseName);  
  146.                 FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);  
  147.                 //这里我使用的自定义的OutputFormat    
  148.                 recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);  
  149.             }  
  150.             return recordWriter;  
  151.         }  
  152.     }  
  153.   
  154.   
  155. }  

相关问答

更多