Hadoop0.20+ custom MultipleOutputFormat

2019-03-28 13:45|来源: 网络

Hadoop0.20.2中无法使用MultipleOutputFormat,多文件输出这个方法。尽管0.19.2中的方法老的方法org.apache.hadoop.mapred.lib.MultipleOutputFormat还是可以继续在0.20.2中使用,但是org.apache.hadoop.mapred下的方法都是标记为“已过时”,在hadoop下个版本中可能就不能使用了。hadoop 0.20.2中是推荐使用Configuration替换JobConf,而这个老的方法org.apache.hadoop.mapred.lib.MultipleOutputFormat中还是使用的JobConf,就是说还没有新的可替换API。

此外hadoop 0.20.2还只是一个中间版本,并不是所有API都升级到最新了,没有提供的API只能自己写。

重写MultipleOutputFormat需要2个类:

LineRecordWriter

MultipleOutputFormat

PartitionByFilenameOutputFormat是实验中需要自定义的每个文件各自输出结果

LineRecordWriter:

  1. package cn.xmu.dm;   
  2.   
  3. import java.io.DataOutputStream;   
  4. import java.io.IOException;   
  5. import java.io.UnsupportedEncodingException;   
  6. import org.apache.hadoop.io.NullWritable;   
  7. import org.apache.hadoop.io.Text;   
  8. import org.apache.hadoop.mapreduce.RecordWriter;   
  9. import org.apache.hadoop.mapreduce.TaskAttemptContext;   
  10. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
  11.   
  12. public class LineRecordWriter<K, V> extends RecordWriter<K, V> {   
  13.     private static final String utf8 = "UTF-8";   
  14.   
  15.     protected DataOutputStream out;   
  16.     private final byte[] keyValueSeparator;   
  17.   
  18.     public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {   
  19.         this.out = out;   
  20.         try {   
  21.             this.keyValueSeparator = keyValueSeparator.getBytes(utf8);   
  22.         } catch (UnsupportedEncodingException uee) {   
  23.             throw new IllegalArgumentException("can't find " + utf8   
  24.                     + " encoding");   
  25.         }   
  26.     }   
  27.   
  28.     public LineRecordWriter(DataOutputStream out) {   
  29.         this(out, "/t");   
  30.     }   
  31.   
  32.     private void writeObject(Object o) throws IOException {   
  33.         if (o instanceof Text) {   
  34.             Text to = (Text) o;   
  35.             out.write(to.getBytes(), 0, to.getLength());   
  36.         } else {   
  37.             out.write(o.toString().getBytes(utf8));   
  38.         }   
  39.     }   
  40.   
  41.     public synchronized void write(K key, V value) throws IOException {   
  42.         boolean nullKey = key == null || key instanceof NullWritable;   
  43.         boolean nullValue = value == null || value instanceof NullWritable;   
  44.         if (nullKey && nullValue) {   
  45.             return;   
  46.         }   
  47.         if (!nullKey) {   
  48.             writeObject(key);   
  49.         }   
  50.         if (!(nullKey || nullValue)) {   
  51.             out.write(keyValueSeparator);   
  52.         }   
  53.         if (!nullValue) {   
  54.             writeObject(value);   
  55.         }   
  56.         out.write("\r\n".getBytes());   
  57.     }   
  58.   
  59.     public synchronized void close(TaskAttemptContext context)   
  60.             throws IOException {   
  61.         out.close();   
  62.     }   
  63. }  

相关问答

更多
  • 1. 下载Hadoop源代码 2. 准备编译环境 2.1. 系统 CentOS5.5 2.2. Hadoop代码版本 hadoop-0.20.2-release 2.3. 联网 编译Hadoop 会依赖很多第三方库,但编译工具Ant 会自动从网上下载缺少的库,所以必须保证机器能够访问Internet。 2.4. java 编译Hadoop要用JDK1.6 以上 安装好之后,请设置好JAVA_HOME 环境变量。 2.5. Ant 需要使用Ant 工具来编译Hadoop, Ant 安装好之后,请设置好ANT_ ...
  • 1.为了在名称节点更改log4j.properties,可以更改/home/hadoop/log4j.properties。 2.为了更改容器日志的log4j.properties,您需要在容器jar中更改它,因为它们硬编码直接从项目资源加载文件。 2.1 ssh到奴隶(在EMR上,你也可以简单地将它添加为引导操作,所以你不需要ssh到每个节点)。 ssh到hadoop奴隶 2.2在jar资源上覆盖container-log4j.properties: jar uf /home/hadoop/share/h ...
  • Eclipse插件主要用于提交和监视作业以及与HDFS交互,与真实或“伪”群集交互。 如果你在本地模式下运行,那么我认为插件不会带来任何好处 - 因为你的工作将在一个JVM中运行。 考虑到这一点,我会说在Eclipse项目的类路径中包括最新的1.x hadoop-core。 无论如何, MultipleOutputFormat还没有被移植到新的mapreduce包中(在1.1.2或2.0.4-alpha中都没有),所以你需要自己移植它或者找另一种方式(也许是MultipleOutputs - Javadoc ...
  • 您可以根据需要向DataSet程序添加任意数量的数据接收器。 例如,在这样的程序中: ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet> data = env.readFromCsv(...); // apply MapFunction and emit data.map(new YourMapper()).writeToText("/f ...
  • MapReduce和Tez作业都使用YARN(Yet Another Resource Negotiator)在所谓的容器中通过集群进行分发和执行。 您也可以自己使用YARN来运行自己的工作。 请查看Hadoop架构概述 ,以获得高级概述。 Both MapReduce and Tez jobs use YARN (Yet Another Resource Negotiator) to get distributed and executed over the cluster in so-called co ...
  • 看起来像write(DataOutput)方法中的错误: @Override public void write(DataOutput arg0) throws IOException { //write the size first // arg0.write(aggValues.size()); // here you're writing an int as a byte // try this instead: arg0.writeInt(aggValues.size()); // ...
  • 我发现我也可以覆盖getInputFileBasedOutputFileName方法,并在那里给它子路径。 @Override protected String getInputFileBasedOutputFileName(JobConf conf, String Name) { //your logic goes here. Simply addd the sub path to the name and return } 您仍应实现generateFileNameForKeyVal ...
  • 使用MultipleOutputFormat类,输出文件名可以从键和reducer的reducer输出值中推导出来。 MultipleOutputFormat#generateFileNameForKeyValue必须在用户定义的OutputFormat类中实现。 static class MyMultipleOutputFormat extends MultipleOutputFormat { protected String generateFileNameForKeyV ...
  • 我想这取决于你对新API的意思 - 在1.1.1中至少不再这样做了 - 我想我已经记得读过整个mapred包已经过早弃用了,这在以后的版本中是不推荐使用的。 如果通过新API,你的意思是mapreduce包over mapred,那么OutputFormats本身有一个关联的OutputCommitter,它通过OutputFormat.getOutputCommitter方法获取 I guess it depends on what you mean by new API - in 1.1.1 at le ...
  • 对MultipleOutputs的支持不在0.20。 您需要使用旧版API。 它已被添加到0.21,它目前还没有发布为org.apache.hadoop.mapreduce.lib.output.MultipleOutputs。 邮件列表中的这个主题讨论了这个问题。 Support for MultipleOutputs isn't in 0.20. You will need to use the older API. It has been added into 0.21 which is curren ...