在Hadoop的streaming中使用自定义的inputformat和outputformat

2019-03-28 13:52|来源: 网络

Hadoop的streaming中有一个选项是指定输入输出格式化的:

  1. -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.  
  2. -outputformat TextOutputFormat(default)|JavaClassName  Optional.  
但是在0.14版本之后,hadoop不再支持带多个jar包文件,所以,如果要使用自己定义的Inputformat或者Outputformat,就得将对应的class文件加入到hadoop-streaming-1.0.1.jar中去,比如:
  1. jar uf ../../contrib/streaming/hadoop-streaming-1.0.1.jar org/apache/hadoop/streaming/*.class   

然后在-inputformat后面就可以直接带类名了。

下面通过一个例子来说明下,实现Map的输入<key,value>,key为文件名,value为文档的整篇内容:

1.定义自己的InputFormat:

ContentRecordReder.java

  1. package org.apache.hadoop.streaming;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. //import org.apache.commons.logging.Log;   
  6. //import org.apache.commons.logging.LogFactory;   
  7. import org.apache.hadoop.conf.Configuration;  
  8. import org.apache.hadoop.fs.FSDataInputStream;  
  9. import org.apache.hadoop.fs.FileSystem;  
  10. import org.apache.hadoop.fs.Path;  
  11. import org.apache.hadoop.io.Text;  
  12. import org.apache.hadoop.io.compress.CompressionCodecFactory;  
  13. import org.apache.hadoop.mapred.FileSplit;  
  14. import org.apache.hadoop.mapred.RecordReader;  
  15.   
  16. import com.sun.org.apache.commons.logging.Log;  
  17. import com.sun.org.apache.commons.logging.LogFactory;  
  18.   
  19. public class ContentRecordReder implements RecordReader<Text,Text> {  
  20.     private static final Log LOG = LogFactory.getLog(ContentRecordReder.class.getName());    
  21.     private CompressionCodecFactory compressionCodecs = null;    
  22.     private long start;    
  23.     private long pos;    
  24.     private long end;    
  25.     private byte[] buffer;    
  26.     private String keyName;    
  27.     private FSDataInputStream fileIn;    
  28.         
  29.     public ContentRecordReder(Configuration job,FileSplit split) throws IOException{    
  30.         start = split.getStart(); //从中可以看出每个文件是作为一个split的     
  31.         end = split.getLength() + start;  
  32.         final Path path = split.getPath();  
  33.         keyName = path.toString();    
  34.         LOG.info("filename in hdfs is : " + keyName);    
  35.         System.out.println("filename in hdfs is : " + keyName);  
  36.         final FileSystem fs = path.getFileSystem(job);    
  37.         fileIn = fs.open(path);    
  38.         fileIn.seek(start);    
  39.         buffer = new byte[(int)(end - start)];    
  40.         this.pos = start;  
  41.   
  42.     }    
  43.     
  44.     public Text createKey() {    
  45.         return new Text();    
  46.     }    
  47.     
  48.     public Text createValue() {    
  49.         return new Text();    
  50.     }    
  51.     
  52.     public long getPos() throws IOException{    
  53.         return pos;    
  54.     }    
  55.     
  56.     public float getProgress() {    
  57.         if (start == end) {    
  58.             return 0.0f;    
  59.         } else {    
  60.             return Math.min(1.0f, (pos - start) / (float)(end - start));    
  61.         }    
  62.     }    
  63.     
  64.     public boolean next(Text key, Text value) throws IOException{    
  65.         while(pos < end) {    
  66.             key.set(keyName);    
  67.             value.clear();    
  68.             fileIn.readFully(pos,buffer);    
  69.             value.set(buffer);    
  70.             LOG.info("---内容: " + value.toString());    
  71.             System.out.println("---内容: " + value.toString());  
  72.             pos += buffer.length;    
  73.             LOG.info("end is : " + end  + " pos is : " + pos);    
  74.             return true;    
  75.         }    
  76.         return false;    
  77.     }    
  78.     
  79.     public void close() throws IOException{    
  80.         if(fileIn != null) {    
  81.             fileIn.close();    
  82.         }    
  83.             
  84.     }    
  85. }   
ContentInputFormat.java
  1. package org.apache.hadoop.streaming;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.fs.FileSystem;  
  6. import org.apache.hadoop.fs.Path;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.io.compress.CompressionCodecFactory;  
  9. import org.apache.hadoop.mapred.FileSplit;  
  10. import org.apache.hadoop.mapred.JobConf;  
  11. import org.apache.hadoop.mapred.JobConfigurable;  
  12. import org.apache.hadoop.mapred.Reporter;  
  13. import org.apache.hadoop.mapred.InputSplit;  
  14. import org.apache.hadoop.mapred.RecordReader;  
  15. import org.apache.hadoop.mapred.FileInputFormat;  
  16.   
  17. public class ContentInputFormat extends FileInputFormat<Text,Text>{  
  18.     private long mySplitSize = 1024*1024;  
  19.     private CompressionCodecFactory compressionCodecs = null;    
  20.     public void configure(JobConf conf) {    
  21.         compressionCodecs = new CompressionCodecFactory(conf);    
  22.     }  
  23.       
  24.     /**  
  25.      * @brief isSplitable 不对文件进行切分,必须对文件整体进行处理  
  26.      *  
  27.      * @param fs  
  28.      * @param file  
  29.      *  
  30.      * @return false  
  31.      */    
  32.     protected boolean isSplitable(FileSystem fs, Path file) {    
  33.         return false;   
  34.     }    
  35.     
  36.     public RecordReader<Text,Text> getRecordReader(InputSplit genericSplit,    
  37.                             JobConf job, Reporter reporter) throws IOException{    
  38.         reporter.setStatus(genericSplit.toString());    
  39.         ContentRecordReder contentRecordReder = new ContentRecordReder(job,(FileSplit)genericSplit);  
  40.         return (RecordReader<Text, Text>) contentRecordReder;  
  41.     }  
  42.   
2.编译
  1. javac -classpath ~/hadoop-1.0.1/hadoop-core-1.0.1.jar:~/hadoop-1.0.1/lib/*:./con  
  2. tent-record-reader.jar ./*.java -Xlint:deprecation

3.并加入stream的jar包

  1. jar uf ../../contrib/streaming/hadoop-streaming-1.0.1.jar org/apache/hadoop/streaming/*.class

4.Mapper.cpp

  1. #include <iostream>  
  2. #include <string>   
  3. using namespace std;  
  4.   
  5.   
  6. int main()  
  7. {  
  8.     string key,value;  
  9.     char ch;  
  10.     cin>>key;  
  11.     value = "";  
  12.     while(cin>>ch&&!cin.eof()){  
  13.         value.append(1,ch);  
  14.     }  
  15.     cout<<key<<"\t"<<value<<endl;  
  16.   
  17.     return 0;  
  18. }
5.Reducer.cpp
  1. #include <iostream>  
  2. #include <map>   
  3.   
  4. using namespace std;  
  5.   
  6. int main() {  
  7.     map<string,string> wordMap;  
  8.      map<string,string>::iterator it;  
  9.     string key;  
  10.     string value;  
  11.   
  12.     while(cin>>key>>value) {  
  13.         //可以在这里对value即文档做处理...   
  14.         wordMap[key] +=value;  
  15.     }  
  16.   
  17.     for(it=wordMap.begin();it != wordMap.end();it++) {//输出   
  18.         cout<<it->first<<"\t"<<it->second<<endl;  
  19.     }  
  20.     return 0;  
  21. }  
6.执行
  1. bin/hadoop jar contrib/streaming/hadoop-streaming-1.0.1.jar \  
  2. -mapper /home/guoguo/hadoop-1.0.1/cTest/C++/Mapper \  
  3. -file /home/guoguo/hadoop-1.0.1/cTest/C++/Mapper \  
  4. -inputformat ContentInputFormat \  
  5. -reducer /home/guoguo/hadoop-1.0.1/cTest/C++/Reducer \  
  6. -file /home/guoguo/hadoop-1.0.1/cTest/C++/Reducer \  
  7. -input input \  
  8. -output stream-output  

7.ok~ 

8.补充:如果文档是XML格式,可以用StreamXmlRecordReader,具体做法就是用到hadoop-streaming.jar的inputreader选项,如:

  1. <span style="font-size:16px;">-inputreader  "StreamXmlRecordReader,begin=<Store>,end=</Store>"</span>  

其中“<Store>”,“</Store>”为XML文件的开始标签和结束标签

更多Hadoop相关信息见 Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

相关问答

更多