Hadoop 自定义InputFormat实现自定义Split

2019-03-28 13:52|来源: 网络

上一篇文章中提到了如何进行RecordReader的重写(见 http://www.linuxidc.com/Linux/2012-04/57831.htm ),本篇文章就是来实现如何实现自定义split的大小

更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

要解决的需求:

(1)一个文本中每一行都记录了一个文件的路径,

(2)要求处理路径对应的文件,但是因为文件量比较大,所以想进行分布式处理

(3)所以就对输入的文档进行预处理,读取前N行做为一个splits,但是没有实现,因为重写FileSplit不是太容易实现,就偷懒直接定义一个split的大小是1000个字节,这样就可以将输入的文档进行分片了。

直接贴代码:

InputFormat

  1. /** 
  2. * @file LineInputFormat.java 
  3. * @brief自定义InputFormat 实现split大小的控制 
  4. * @author anbo, anbo724@gmail.com 
  5. * @version 1.0 
  6. * @date 2011-10-18 
  7. */  
  8. /* Copyright(C) 
  9. * For free 
  10. * All right reserved 
  11. * 
  12. */   
  13.   
  14.   
  15. package an.hadoop.test;  
  16.   
  17.   
  18. import java.io.IOException;  
  19. import java.util.ArrayList;  
  20. import java.util.List;  
  21.   
  22. import org.apache.commons.logging.Log;   
  23. import org.apache.commons.logging.LogFactory;  
  24. import org.apache.hadoop.fs.BlockLocation;  
  25. import org.apache.hadoop.fs.FileStatus;  
  26. import org.apache.hadoop.fs.FileSystem;  
  27. import org.apache.hadoop.fs.Path;  
  28. import org.apache.hadoop.io.LongWritable;  
  29. import org.apache.hadoop.io.Text;  
  30. import org.apache.hadoop.io.compress.CompressionCodec;  
  31. import org.apache.hadoop.io.compress.CompressionCodecFactory;  
  32. import org.apache.hadoop.mapreduce.InputFormat;  
  33. import org.apache.hadoop.mapreduce.InputSplit;  
  34. import org.apache.hadoop.mapreduce.JobContext;  
  35. import org.apache.hadoop.mapreduce.RecordReader;  
  36. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  37. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  38. import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  39. import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;  
  40.   
  41. public class LineInputFormat extends FileInputFormat<LongWritable , Text> {  
  42.       
  43.     public long mySplitSize = 1000;  
  44.       
  45.      private static final Log LOG = LogFactory.getLog(FileInputFormat.class);  
  46.   
  47.       private static final double SPLIT_SLOP = 1.1;   // 10% slop   
  48.   
  49.      @Override  
  50.       public RecordReader<LongWritable, Text>   
  51.         createRecordReader(InputSplit split,  
  52.                            TaskAttemptContext context) {  
  53.         return new LineRecordReader(); //为什么不行呢    
  54.       }  
  55.       
  56.     @Override  
  57.     protected boolean isSplitable(JobContext context, Path file) {  
  58.         CompressionCodec codec =  
  59.         new CompressionCodecFactory(context.getConfiguration()).getCodec(file);  
  60.         //return codec == null;   
  61.         return true;//要求分片   
  62.     }  
  63.       
  64.      /**  
  65.        * Generate the list of files and make them into FileSplits. 
  66.        */   
  67.     @Override  
  68.       public List<InputSplit> getSplits(JobContext job) throws IOException {  
  69.         long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  
  70.         long maxSize = getMaxSplitSize(job);  
  71.   
  72.         // generate splits   
  73.         List<InputSplit> splits = new ArrayList<InputSplit>(); //用以存放生成的split的     
  74.         for (FileStatus file: listStatus(job)) {//filestatues是文件对应的信息,具体看对应的类   
  75.           Path path = file.getPath();  
  76.           FileSystem fs = path.getFileSystem(job.getConfiguration());  
  77.           long length = file.getLen(); //得到文本的长度   
  78.           BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); //取得文件所在块的位置   
  79.           if ((length != 0) && isSplitable(job, path)) { //如果文件不为空,并且可以分片的话就进行下列操作,   
  80.             long blockSize = file.getBlockSize();//   
  81.             //long splitSize = computeSplitSize(blockSize, minSize, maxSize); //split的大小Math.max(minSize, Math.min(maxSize, blockSize));   
  82.             //可以通过调整splitSize的大小来控制对应的文件块的大小,比如设置splitSize=100,那么就可以控制成每个split的大小   
  83.             //但是问题是,我是要求按行进行处理的,虽然这样应该也可以按行进行切分了,不过却不能保证每个split对应的行数都是相等的   
  84.             //一般情况是如果文件大于64M(32M)就会使用块大小来作为split   
  85.             long splitSize = mySplitSize;  
  86.             long bytesRemaining = length; //文本的长度   
  87.               
  88.             while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {//剩下的文本长度大于split大小的SPLIT_SLOP倍数   
  89.               int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);//找到对应block块中对应的第0个字符开始,   
  90.               splits.add(new FileSplit(path, length-bytesRemaining, splitSize,    
  91.                                        blkLocations[blkIndex].getHosts()));   
  92.             //这个是形成split的代码FileSplit(文件路径,0,split大小,host)   
  93.               //原始函数为 FileSplit(Path file, long start, long length, String[] hosts) {   
  94.               //但是应该可以通过重写FileSplit来实现对应的要求   
  95.               bytesRemaining -= splitSize;  
  96.             }  
  97.               
  98.             if (bytesRemaining != 0) {  
  99.               splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,   
  100.                          blkLocations[blkLocations.length-1].getHosts()));  
  101.             }  
  102.           } else if (length != 0) {  
  103.             splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));  
  104.           } else {   
  105.             //Create empty hosts array for zero length files   
  106.             splits.add(new FileSplit(path, 0, length, new String[0]));  
  107.           }  
  108.         }  
  109.         LOG.debug("Total # of splits: " + splits.size());  
  110.         return splits;  
  111.       }  
  112.   
  113.       
  114.   
  115.       
  116.       
  117.       
  118.   
  119. }  

相关问答

更多
  • import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class Http implements WritableComparable { public Http(){ } private String value; public Http(String value) { setValue(v ...
  • = =基本数据类型不要你自己定义,一些基本数据类型的定义格式永远无法被其他类型模仿,其他的,C++尽量使它们用起来没有区别……
  • 1.为了在名称节点更改log4j.properties,可以更改/home/hadoop/log4j.properties。 2.为了更改容器日志的log4j.properties,您需要在容器jar中更改它,因为它们硬编码直接从项目资源加载文件。 2.1 ssh到奴隶(在EMR上,你也可以简单地将它添加为引导操作,所以你不需要ssh到每个节点)。 ssh到hadoop奴隶 2.2在jar资源上覆盖container-log4j.properties: jar uf /home/hadoop/share/h ...
  • 该错误清楚地表明该文件未找到 It seems that the issue of Hadoop trying to read non-existent files stems from the InputSplit. If the InputSplit doesn't define behavior for reading the data, then Hadoop defaults to it's own method. This is solved by implementing Writable f ...
  • 我已经解决了这个难题,因为在线删除了修改Configuration要求,并且只是基于自定义的Hadoop配置* .xml文件集。 起初我编写了Java类,它将附加层的配置添加到org.apache.hadoop.conf.Configuration默认资源中。 它的静态初始化附加配置默认resoutces: public class Configurator { static { // We initialize needed Hadoop configuration layer ...
  • 从文档: 弃用的接口 ... org.apache.hadoop.mapred.InputFormat 改为使用InputFormat 。 ... 由于0.20.2的怪异弃用行为,甚至在0.20.2接口后使用实现的0.20.2建议,我挖得更深一点。 该界面仍然存在于0.21.0 ,并且已弃用标签。 在撰写本文时,我无法在主干中找到类似的界面。 From the documentation: Deprecated Interfaces ... org.apache.hadoop.mapred.InputFor ...
  • 是的,您应该创建RecordReader以从Excel文档中读取每条记录。 在该记录阅读器中,你应该使用像api这样的POI来阅读excel文档。 更准确地说,请执行以下步骤: 扩展FileInputFromat并创建自己的CustomInputFrmat并覆盖getRecordReader 。 通过扩展RecordReader创建一个CustomRecordReader ,在这里你必须编写如何从给定的filesplit生成一个键值对。 因此,首先从filesplit读取字节,然后从bufferedbyte ...
  • MapReduce和Tez作业都使用YARN(Yet Another Resource Negotiator)在所谓的容器中通过集群进行分发和执行。 您也可以自己使用YARN来运行自己的工作。 请查看Hadoop架构概述 ,以获得高级概述。 Both MapReduce and Tez jobs use YARN (Yet Another Resource Negotiator) to get distributed and executed over the cluster in so-called co ...
  • 看起来像write(DataOutput)方法中的错误: @Override public void write(DataOutput arg0) throws IOException { //write the size first // arg0.write(aggValues.size()); // here you're writing an int as a byte // try this instead: arg0.writeInt(aggValues.size()); // ...
  • 问题最终出现在自定义密钥(IntermediaryKey)的序列化/反序列化中。 正在阅读“useBothGUIDFlag”变量,与其本应相反。 在reducer中获取“mapred.task.partition”属性值有助于注意到已发生此交换。 具有相反“useBothGUIDFlag”值的键似乎将转到正确的reducer。 The problem ended up being in the serialization/deserialization of the custom key (Intermed ...