
2019-03-28 13:28|来源: 网络

基于旧的mapreduce的api的输入格式在hive中已有实现,在org.apache.Hadoop.hive.ql.io下,下面代码是根据其源码自己实现的新mapreduce api接口。上代码

  1. import;  
  2. import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;  
  3. import;  
  4. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  5. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  7. /** 
  8.  * RCFileInputFormat. 
  9.  * 
  10.  * @param <K> 
  11.  * @param <V> 
  12.  */  
  13. public class RCFileInputFormat<K extends LongWritable, V extends BytesRefArrayWritable>  
  14.     extends FileInputFormat<K, V>  {  
  16.   public RCFileInputFormat() {  
  18.   }  
  22. @SuppressWarnings("unchecked")  
  23. @Override  
  24. public org.apache.hadoop.mapreduce.RecordReader<K, V> createRecordReader(  
  25.         org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1)  
  26.         throws IOException, InterruptedException {  
  27.      return new RCFileRecordReader();  
  28. }  
  29. }  


  1. import;
  2. import org.apache.hadoop.conf.Configuration;  
  3. import org.apache.hadoop.fs.FileSystem;  
  4. import org.apache.hadoop.fs.Path;  
  5. import;  
  6. import;  
  7. import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;  
  8. import;  
  9. import org.apache.hadoop.mapreduce.InputSplit;  
  10. import org.apache.hadoop.mapreduce.RecordReader;  
  11. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  12. import org.apache.hadoop.mapreduce.lib.input.FileSplit;</P><P>/** 
  13.  * RCFileRecordReader. 
  14.  *  
  15.  * @param <K> 
  16.  * @param <V> 
  17.  */  
  18. public class RCFileRecordReader<K extends LongWritable, V extends BytesRefArrayWritable>  
  19.   extends RecordReader<LongWritable, BytesRefArrayWritable> {</P><P> private Reader in;  
  20.  private long start;  
  21.  private long end;  
  22.  private boolean more = true;  
  23.  private LongWritable key = null;  
  24.  private BytesRefArrayWritable value = null;  
  25.  protected Configuration conf;</P><P> /** 
  26.   * Return the progress within the input split. 
  27.   *  
  28.   * @return 0.0 to 1.0 of the input byte range 
  29.   */  
  30.  public float getProgress() throws IOException {  
  31.   if (end == start) {  
  32.    return 0.0f;  
  33.   } else {  
  34.    return Math.min(1.0f, (in.getPosition() - start)  
  35.      / (float) (end - start));  
  36.   }  
  37.  }
  38.  public void close() throws IOException {  
  39.   in.close();  
  40.  }
  41.  @Override  
  42.  public LongWritable getCurrentKey() throws IOException,  
  43.    InterruptedException {
  44.   return key;  
  45.  }
  46.  @Override  
  47.  public BytesRefArrayWritable getCurrentValue() throws IOException,  
  48.    InterruptedException {
  49.   return value;  
  50.  }
  51.  @Override  
  52.  public void initialize(InputSplit split, TaskAttemptContext context)  
  53.    throws IOException, InterruptedException {  
  54.   FileSplit fileSplit = (FileSplit) split;  
  55.   conf = context.getConfiguration();  
  56.   Path path = fileSplit.getPath();  
  57.   FileSystem fs = path.getFileSystem(conf);  
  58. = new RCFile.Reader(fs, path, conf);  
  59.   this.end = fileSplit.getStart() + fileSplit.getLength();
  60.   if (fileSplit.getStart() > in.getPosition()) {  
  61.    in.sync(fileSplit.getStart()); // sync to start   
  62.   }
  63.   this.start = in.getPosition();  
  64.   more = start < end;  
  65.  }
  66.  @Override  
  67.  public boolean nextKeyValue() throws IOException, InterruptedException {  
  68.   if (!more) {  
  69.    return false;  
  70.   }  
  71.   if (key == null) {  
  72.    key = new LongWritable();  
  73.   }  
  74.   if (value == null) {  
  75.    value = new BytesRefArrayWritable();  
  76.   }  
  77.   more =;  
  78.   if (!more) {  
  79.    return false;  
  80.   }  
  81.   long lastSeenSyncPos = in.lastSeenSyncPos();  
  82.   if (lastSeenSyncPos >= end) {  
  83.    more = false;  
  84.    return more;  
  85.   }  
  86.   in.getCurrentRow(value);  
  87.   return more;  
  88.  }  
  89. }  



public static class Map extends   Mapper<LongWritable, BytesRefArrayWritable, Text, NullWritable> {


protected void map(LongWritable key, BytesRefArrayWritable value, Context context) throws IOException,   InterruptedException {

String top = new String(value.get(32).getBytesCopy());

byte[] channel = value.get(12).getBytesCopy();


