知识点
相关文章
更多最近更新
更多Hadoop 自定义InputFormat实现自定义Split
2019-03-28 13:52|来源: 网络
上一篇文章中提到了如何进行RecordReader的重写(见 http://www.linuxidc.com/Linux/2012-04/57831.htm ),本篇文章就是来实现如何实现自定义split的大小
更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13
要解决的需求:
(1)一个文本中每一行都记录了一个文件的路径,
(2)要求处理路径对应的文件,但是因为文件量比较大,所以想进行分布式处理
(3)所以就对输入的文档进行预处理,读取前N行做为一个splits,但是没有实现,因为重写FileSplit不是太容易实现,就偷懒直接定义一个split的大小是1000个字节,这样就可以将输入的文档进行分片了。
直接贴代码:
InputFormat
- /**
- * @file LineInputFormat.java
- * @brief自定义InputFormat 实现split大小的控制
- * @author anbo, anbo724@gmail.com
- * @version 1.0
- * @date 2011-10-18
- */
- /* Copyright(C)
- * For free
- * All right reserved
- *
- */
- package an.hadoop.test;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.fs.BlockLocation;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.compress.CompressionCodec;
- import org.apache.hadoop.io.compress.CompressionCodecFactory;
- import org.apache.hadoop.mapreduce.InputFormat;
- import org.apache.hadoop.mapreduce.InputSplit;
- import org.apache.hadoop.mapreduce.JobContext;
- import org.apache.hadoop.mapreduce.RecordReader;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
- public class LineInputFormat extends FileInputFormat<LongWritable , Text> {
- public long mySplitSize = 1000;
- private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
- private static final double SPLIT_SLOP = 1.1; // 10% slop
- @Override
- public RecordReader<LongWritable, Text>
- createRecordReader(InputSplit split,
- TaskAttemptContext context) {
- return new LineRecordReader(); //为什么不行呢
- }
- @Override
- protected boolean isSplitable(JobContext context, Path file) {
- CompressionCodec codec =
- new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
- //return codec == null;
- return true;//要求分片
- }
- /**
- * Generate the list of files and make them into FileSplits.
- */
- @Override
- public List<InputSplit> getSplits(JobContext job) throws IOException {
- long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
- long maxSize = getMaxSplitSize(job);
- // generate splits
- List<InputSplit> splits = new ArrayList<InputSplit>(); //用以存放生成的split的
- for (FileStatus file: listStatus(job)) {//filestatues是文件对应的信息,具体看对应的类
- Path path = file.getPath();
- FileSystem fs = path.getFileSystem(job.getConfiguration());
- long length = file.getLen(); //得到文本的长度
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); //取得文件所在块的位置
- if ((length != 0) && isSplitable(job, path)) { //如果文件不为空,并且可以分片的话就进行下列操作,
- long blockSize = file.getBlockSize();//
- //long splitSize = computeSplitSize(blockSize, minSize, maxSize); //split的大小Math.max(minSize, Math.min(maxSize, blockSize));
- //可以通过调整splitSize的大小来控制对应的文件块的大小,比如设置splitSize=100,那么就可以控制成每个split的大小
- //但是问题是,我是要求按行进行处理的,虽然这样应该也可以按行进行切分了,不过却不能保证每个split对应的行数都是相等的
- //一般情况是如果文件大于64M(32M)就会使用块大小来作为split
- long splitSize = mySplitSize;
- long bytesRemaining = length; //文本的长度
- while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {//剩下的文本长度大于split大小的SPLIT_SLOP倍数
- int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);//找到对应block块中对应的第0个字符开始,
- splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
- blkLocations[blkIndex].getHosts()));
- //这个是形成split的代码FileSplit(文件路径,0,split大小,host)
- //原始函数为 FileSplit(Path file, long start, long length, String[] hosts) {
- //但是应该可以通过重写FileSplit来实现对应的要求
- bytesRemaining -= splitSize;
- }
- if (bytesRemaining != 0) {
- splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
- blkLocations[blkLocations.length-1].getHosts()));
- }
- } else if (length != 0) {
- splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
- } else {
- //Create empty hosts array for zero length files
- splits.add(new FileSplit(path, 0, length, new String[0]));
- }
- }
- LOG.debug("Total # of splits: " + splits.size());
- return splits;
- }
- }
相关问答
更多-
hadoop自定义数据类型有哪些[2022-04-03]
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class Http implements WritableComparable { public Http(){ } private String value; public Http(String value) { setValue(v ... -
hadoop的自定义数据类型有哪些[2022-06-18]
= =基本数据类型不要你自己定义,一些基本数据类型的定义格式永远无法被其他类型模仿,其他的,C++尽量使它们用起来没有区别…… -
1.为了在名称节点更改log4j.properties,可以更改/home/hadoop/log4j.properties。 2.为了更改容器日志的log4j.properties,您需要在容器jar中更改它,因为它们硬编码直接从项目资源加载文件。 2.1 ssh到奴隶(在EMR上,你也可以简单地将它添加为引导操作,所以你不需要ssh到每个节点)。 ssh到hadoop奴隶 2.2在jar资源上覆盖container-log4j.properties: jar uf /home/hadoop/share/h ...
-
创建不带输入数据的自定义生成器Hadoop InputFormat(Creating a custom Generator Hadoop InputFormat without input data)[2021-11-15]
该错误清楚地表明该文件未找到 It seems that the issue of Hadoop trying to read non-existent files stems from the InputSplit. If the InputSplit doesn't define behavior for reading the data, then Hadoop defaults to it's own method. This is solved by implementing Writable f ... -
Python的Spark自定义Hadoop配置(PySpark)?(Custom Hadoop Configuration for Spark from Python (PySpark)?)[2022-11-09]
我已经解决了这个难题,因为在线删除了修改Configuration要求,并且只是基于自定义的Hadoop配置* .xml文件集。 起初我编写了Java类,它将附加层的配置添加到org.apache.hadoop.conf.Configuration默认资源中。 它的静态初始化附加配置默认resoutces: public class Configurator { static { // We initialize needed Hadoop configuration layer ... -
从文档: 弃用的接口 ... org.apache.hadoop.mapred.InputFormat 改为使用InputFormat 。 ... 由于0.20.2的怪异弃用行为,甚至在0.20.2接口后使用实现的0.20.2建议,我挖得更深一点。 该界面仍然存在于0.21.0 ,并且已弃用标签。 在撰写本文时,我无法在主干中找到类似的界面。 From the documentation: Deprecated Interfaces ... org.apache.hadoop.mapred.InputFor ...
-
用于Excel文件的自定义InputFormat或InputReader(xls)(Custom InputFormat or InputReader for Excel files(xls))[2022-11-13]
是的,您应该创建RecordReader以从Excel文档中读取每条记录。 在该记录阅读器中,你应该使用像api这样的POI来阅读excel文档。 更准确地说,请执行以下步骤: 扩展FileInputFromat并创建自己的CustomInputFrmat并覆盖getRecordReader 。 通过扩展RecordReader创建一个CustomRecordReader ,在这里你必须编写如何从给定的filesplit生成一个键值对。 因此,首先从filesplit读取字节,然后从bufferedbyte ... -
在hadoop中,我只想在每个节点上执行自己的自定义程序(In hadoop, I just want to execute my own custom program on each node)[2024-02-07]
MapReduce和Tez作业都使用YARN(Yet Another Resource Negotiator)在所谓的容器中通过集群进行分发和执行。 您也可以自己使用YARN来运行自己的工作。 请查看Hadoop架构概述 ,以获得高级概述。 Both MapReduce and Tez jobs use YARN (Yet Another Resource Negotiator) to get distributed and executed over the cluster in so-called co ... -
看起来像write(DataOutput)方法中的错误: @Override public void write(DataOutput arg0) throws IOException { //write the size first // arg0.write(aggValues.size()); // here you're writing an int as a byte // try this instead: arg0.writeInt(aggValues.size()); // ...
-
Hadoop自定义分区程序问题(Hadoop Custom Partitioner Issue)[2023-04-05]
问题最终出现在自定义密钥(IntermediaryKey)的序列化/反序列化中。 正在阅读“useBothGUIDFlag”变量,与其本应相反。 在reducer中获取“mapred.task.partition”属性值有助于注意到已发生此交换。 具有相反“useBothGUIDFlag”值的键似乎将转到正确的reducer。 The problem ended up being in the serialization/deserialization of the custom key (Intermed ...