Hadoop自定义SdfTextInputFormat用在streaming中

2019-03-28 13:47|来源: 网络

本人在(Hadoop streaming中指定自定义的inputformat java类 http://www.linuxidc.com/Linux/2012-04/57830.htm)中写了自定义InputFormat的制作。

可是直接加入后一直得不到想要的结果。

查看源码发现:

  1. PipeMapper.java  
  2.   
  3.         if (!this.ignoreKey) {  
  4.           write(key);  
  5.           clientOut_.write(getInputSeparator());  
  6.         }  
  7.         write(value);  
  8.         clientOut_.write('\n');  
  9.         if(skipping) {  
  10.           //flush the streams on every record input if running in skip mode   
  11.           //so that we don't buffer other records surrounding a bad record.   
  12.           clientOut_.flush();  
  13.         }  
只有TextInputFormat时,streaming才默认只处理Value,其他inputFormat,key和value都要处理,改写PipeMapper.java

并重新生成streaming.jar只需要在MANIFEST.MF中指定主类就可以。

  1. MANIFEST.MF:  
  2. Manifest-Version: 1.0  
  3. Ant-Version: Apache Ant 1.7.1  
  4. Created-By: 20.1-b02 (Sun Microsystems Inc.)  
  5. Main-Class: org.apache.hadoop.streaming.HadoopStreaming  
 
  1. jar cvfm  jar/hadoop-streaming-1.0.0.jar MANIFEST.MF -C classes/ .  

更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

相关问答

更多
  • 在select后跟你的自定义函数就可以。 mysql中用select调用自带的now()函数: mysql> select now(); 然后mysql就会返回当前的时间。
  • 您可以在配置中设置mapreduce.input.fileinputformat.split.maxsize ,以告诉映射器您应该获得5MB的数据。 You can set mapreduce.input.fileinputformat.split.maxsize in your configuration in bytes to tell the mapper you should get 5MB of data.
  • 通过源代码挖掘后,我发现如何添加我自己的自定义指标。 它需要3件事情: 创建我自己的自定义源 。 有点像这样 在spark metrics.properties文件中启用Jmx接收器。 我使用的具体行是: *.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink ,它为所有实例启用JmxSink 在SparkEnv指标系统中注册我的自定义源。 如何做的一个例子可以在这里看到 - 我之前看过这个链接,但错过了注册部分,这使我无法真正看到JVisualVM中的 ...
  • 1.为了在名称节点更改log4j.properties,可以更改/home/hadoop/log4j.properties。 2.为了更改容器日志的log4j.properties,您需要在容器jar中更改它,因为它们硬编码直接从项目资源加载文件。 2.1 ssh到奴隶(在EMR上,你也可以简单地将它添加为引导操作,所以你不需要ssh到每个节点)。 ssh到hadoop奴隶 2.2在jar资源上覆盖container-log4j.properties: jar uf /home/hadoop/share/h ...
  • MediaPlayer不支持您尝试做的事情(在无限增长的文件中播放音频)。 相反,请考虑自己解码音频并将原始PCM数据发送到AudioTrack 。 这项工作要多得多,但AudioTrack是逐步播放音频数据流的最简单方法。 What you're trying to do (play the audio in a file that keeps growing indefinitely) is not supported by MediaPlayer. Instead, look into decodin ...
  • 是CDH 5.0.0包含Hadoop 2.3。 Hadoop 2.4.0在路线图上,听起来像CDH 5.x可用。 最好。 Yes CDH 5.0.0 contains Hadoop 2.3. Hadoop 2.4.0 is on the roadmap and sounds like it will be available for CDH 5.x. Best.
  • MapReduce和Tez作业都使用YARN(Yet Another Resource Negotiator)在所谓的容器中通过集群进行分发和执行。 您也可以自己使用YARN来运行自己的工作。 请查看Hadoop架构概述 ,以获得高级概述。 Both MapReduce and Tez jobs use YARN (Yet Another Resource Negotiator) to get distributed and executed over the cluster in so-called co ...
  • 看起来像write(DataOutput)方法中的错误: @Override public void write(DataOutput arg0) throws IOException { //write the size first // arg0.write(aggValues.size()); // here you're writing an int as a byte // try this instead: arg0.writeInt(aggValues.size()); // ...
  • 问题最终出现在自定义密钥(IntermediaryKey)的序列化/反序列化中。 正在阅读“useBothGUIDFlag”变量,与其本应相反。 在reducer中获取“mapred.task.partition”属性值有助于注意到已发生此交换。 具有相反“useBothGUIDFlag”值的键似乎将转到正确的reducer。 The problem ended up being in the serialization/deserialization of the custom key (Intermed ...
  • 我想这取决于你对新API的意思 - 在1.1.1中至少不再这样做了 - 我想我已经记得读过整个mapred包已经过早弃用了,这在以后的版本中是不推荐使用的。 如果通过新API,你的意思是mapreduce包over mapred,那么OutputFormats本身有一个关联的OutputCommitter,它通过OutputFormat.getOutputCommitter方法获取 I guess it depends on what you mean by new API - in 1.1.1 at le ...