Mapreduce中的RCFile输入RCFileInputFormat实现及其应用

2019-03-28 13:28|来源: 网络

基于旧的mapreduce的api的输入格式在hive中已有实现,在org.apache.Hadoop.hive.ql.io下,下面代码是根据其源码自己实现的新mapreduce api接口。上代码:RCFileInputFormat.java

  1. import java.io.IOException;  
  2. import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;  
  3. import org.apache.hadoop.io.LongWritable;  
  4. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  5. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  6.   
  7. /** 
  8.  * RCFileInputFormat. 
  9.  * 
  10.  * @param <K> 
  11.  * @param <V> 
  12.  */  
  13. public class RCFileInputFormat<K extends LongWritable, V extends BytesRefArrayWritable>  
  14.     extends FileInputFormat<K, V>  {  
  15.   
  16.   public RCFileInputFormat() {  
  17.   
  18.   }  
  19.   
  20.   
  21.   
  22. @SuppressWarnings("unchecked")  
  23. @Override  
  24. public org.apache.hadoop.mapreduce.RecordReader<K, V> createRecordReader(  
  25.         org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1)  
  26.         throws IOException, InterruptedException {  
  27.      return new RCFileRecordReader();  
  28. }  
  29. }  

 

  1. import java.io.IOException;
  2. import org.apache.hadoop.conf.Configuration;  
  3. import org.apache.hadoop.fs.FileSystem;  
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.hive.ql.io.RCFile;  
  6. import org.apache.hadoop.hive.ql.io.RCFile.Reader;  
  7. import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;  
  8. import org.apache.hadoop.io.LongWritable;  
  9. import org.apache.hadoop.mapreduce.InputSplit;  
  10. import org.apache.hadoop.mapreduce.RecordReader;  
  11. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  12. import org.apache.hadoop.mapreduce.lib.input.FileSplit;</P><P>/** 
  13.  * RCFileRecordReader. 
  14.  *  
  15.  * @param <K> 
  16.  * @param <V> 
  17.  */  
  18. public class RCFileRecordReader<K extends LongWritable, V extends BytesRefArrayWritable>  
  19.   extends RecordReader<LongWritable, BytesRefArrayWritable> {</P><P> private Reader in;  
  20.  private long start;  
  21.  private long end;  
  22.  private boolean more = true;  
  23.  private LongWritable key = null;  
  24.  private BytesRefArrayWritable value = null;  
  25.  protected Configuration conf;</P><P> /** 
  26.   * Return the progress within the input split. 
  27.   *  
  28.   * @return 0.0 to 1.0 of the input byte range 
  29.   */  
  30.  public float getProgress() throws IOException {  
  31.   if (end == start) {  
  32.    return 0.0f;  
  33.   } else {  
  34.    return Math.min(1.0f, (in.getPosition() - start)  
  35.      / (float) (end - start));  
  36.   }  
  37.  }
  38.  public void close() throws IOException {  
  39.   in.close();  
  40.  }
  41.  @Override  
  42.  public LongWritable getCurrentKey() throws IOException,  
  43.    InterruptedException {
  44.   return key;  
  45.  }
  46.  @Override  
  47.  public BytesRefArrayWritable getCurrentValue() throws IOException,  
  48.    InterruptedException {
  49.   return value;  
  50.  }
  51.  @Override  
  52.  public void initialize(InputSplit split, TaskAttemptContext context)  
  53.    throws IOException, InterruptedException {  
  54.   FileSplit fileSplit = (FileSplit) split;  
  55.   conf = context.getConfiguration();  
  56.   Path path = fileSplit.getPath();  
  57.   FileSystem fs = path.getFileSystem(conf);  
  58.   this.in = new RCFile.Reader(fs, path, conf);  
  59.   this.end = fileSplit.getStart() + fileSplit.getLength();
  60.   if (fileSplit.getStart() > in.getPosition()) {  
  61.    in.sync(fileSplit.getStart()); // sync to start   
  62.   }
  63.   this.start = in.getPosition();  
  64.   more = start < end;  
  65.  }
  66.  @Override  
  67.  public boolean nextKeyValue() throws IOException, InterruptedException {  
  68.   if (!more) {  
  69.    return false;  
  70.   }  
  71.   if (key == null) {  
  72.    key = new LongWritable();  
  73.   }  
  74.   if (value == null) {  
  75.    value = new BytesRefArrayWritable();  
  76.   }  
  77.   more = in.next(key);  
  78.   if (!more) {  
  79.    return false;  
  80.   }  
  81.   long lastSeenSyncPos = in.lastSeenSyncPos();  
  82.   if (lastSeenSyncPos >= end) {  
  83.    more = false;  
  84.    return more;  
  85.   }  
  86.   in.getCurrentRow(value);  
  87.   return more;  
  88.  }  
  89. }  

应用方式:

job.setInputFormatClass(RCFileInputFormat.class);

public static class Map extends   Mapper<LongWritable, BytesRefArrayWritable, Text, NullWritable> {

@Override

protected void map(LongWritable key, BytesRefArrayWritable value, Context context) throws IOException,   InterruptedException {

String top = new String(value.get(32).getBytesCopy());

byte[] channel = value.get(12).getBytesCopy();

相关问答

更多
  • 您仍然可以使用MultipleInputs并传入非空路径。 它不需要指向有效的位置仍然可以工作,它只是不能为空。 我想这没关系。 You can still use MultipleInputs and just pass in a non null Path. It doesn't need to point to a valid location to still work, it just can't be null. This is ok I suppose.
  • Hadoop检查文件扩展名以检测压缩文件。 Hadoop支持的压缩类型有:gzip,bzip2和LZO。 您不需要采取任何其他操作来使用这些类型的压缩来提取文件; Hadoop为您处理它。 因此,您只需要为文本文件编写逻辑,并将包含.gz文件的目录作为输入传递。 但是gzip文件的问题在于它们不是可拆分的,假设你有每个5GB的gzip文件,那么每个映射器将处理整个5GB文件,而不是使用默认的块大小。 Hadoop checks the file extension to detect compressed ...
  • 您可以使用聚合框架 import pymongo conn = pymongo.MongoClient() db = conn.test col = db.collection for doc in col.aggregate([{'$unwind': '$impressions'}, {'$match': {'impressions.service': 'furniture'}}, {'$group': {'_id': '$impressions.id', 'impressions ...
  • 您需要将db.runCommand()的返回文档捕获到变量中,然后在脚本中检查其ok值 - 然后可以抛出错误或打印输出等。 print("it is shown"); var res = db.runCommand( { mapReduce: "mycol", map: function(){ print(not_exists); }, reduce: ...
  • 经过一番挖掘,我找到了解决方案。 这里的关键是不使用RCFile.Reader而是使用RCFileRecordReader 。 这是我最终得到的,也适用于打开多个文件: try { ...
  • scope参数“指定可在地图中访问的全局变量,减少和完成功能”( 来源 )。 这意味着你可以传递你可以在JavaScript的JavaScript代码中使用的变量, reduce和finalize 。 因此, scope不会过滤任何东西。 然而, query参数过滤类似于db.collection.find()的集合中的文档,然后mapReduce仅适用于这些集合条目。 scope示例 这是来自mapReduce 文档的示例的修改版本。 考虑使用以下命令创建的集合: db.orders.insert({ ...
  • 您可以将job1的输出作为输入链接到job2。 inputdir1 - > outputdir1 - > outputdir2 ... - > outputdir9 - > outputdir10 You can just chain the output of job1 as the input to job2. inputdir1 -> outputdir1 -> outputdir2 ... -> outputdir9 -> outputdir10
  • 1)当文件扩展名为.gz时,通常isSplitable返回false。 要么 2)您可以编写自己的InputFormat覆盖isSplitable。 要么 3)不要试图让isSplitable返回false。 而是将文件的块大小设置为大于文件大小: hadoop fs -D fs.local.block.size = 1000000000 -put local_name remote_location 1) Normally isSplitable returns false when your file ...
  • 这是我发现的: 准备管道 files_list=self.get_files_list() pipeline=FilesPipeline(mapper_files=files_list) pipeline.start(queue_name='proc-files') 管道定义 class FilesPipeline(base_handler.PipelineBase):def run(self,mapper_files = []): output_blobstore_ids = yield map ...
  • 这是安全的,因为Spark不使用MapReduce作为处理引擎,但它直接与YARN交互以提交操作。 因此,当您使用Spark时,没有安排MapReduce作业,但您有一个Spark应用程序和Spark作业。 It's safe because Spark doesn't use MapReduce as processing engine, but it interacts directly with YARN to submit operations. Thus, when you use Spark, ...