知识点
相关文章
更多最近更新
更多在Hadoop的streaming中使用自定义的inputformat和outputformat
2019-03-28 13:52|来源: 网络
在Hadoop的streaming中有一个选项是指定输入输出格式化的:
- -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.
- -outputformat TextOutputFormat(default)|JavaClassName Optional.
- jar uf ../../contrib/streaming/hadoop-streaming-1.0.1.jar org/apache/hadoop/streaming/*.class
然后在-inputformat后面就可以直接带类名了。
下面通过一个例子来说明下,实现Map的输入<key,value>,key为文件名,value为文档的整篇内容:
1.定义自己的InputFormat:
ContentRecordReder.java
- package org.apache.hadoop.streaming;
- import java.io.IOException;
- //import org.apache.commons.logging.Log;
- //import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.compress.CompressionCodecFactory;
- import org.apache.hadoop.mapred.FileSplit;
- import org.apache.hadoop.mapred.RecordReader;
- import com.sun.org.apache.commons.logging.Log;
- import com.sun.org.apache.commons.logging.LogFactory;
- public class ContentRecordReder implements RecordReader<Text,Text> {
- private static final Log LOG = LogFactory.getLog(ContentRecordReder.class.getName());
- private CompressionCodecFactory compressionCodecs = null;
- private long start;
- private long pos;
- private long end;
- private byte[] buffer;
- private String keyName;
- private FSDataInputStream fileIn;
- public ContentRecordReder(Configuration job,FileSplit split) throws IOException{
- start = split.getStart(); //从中可以看出每个文件是作为一个split的
- end = split.getLength() + start;
- final Path path = split.getPath();
- keyName = path.toString();
- LOG.info("filename in hdfs is : " + keyName);
- System.out.println("filename in hdfs is : " + keyName);
- final FileSystem fs = path.getFileSystem(job);
- fileIn = fs.open(path);
- fileIn.seek(start);
- buffer = new byte[(int)(end - start)];
- this.pos = start;
- }
- public Text createKey() {
- return new Text();
- }
- public Text createValue() {
- return new Text();
- }
- public long getPos() throws IOException{
- return pos;
- }
- public float getProgress() {
- if (start == end) {
- return 0.0f;
- } else {
- return Math.min(1.0f, (pos - start) / (float)(end - start));
- }
- }
- public boolean next(Text key, Text value) throws IOException{
- while(pos < end) {
- key.set(keyName);
- value.clear();
- fileIn.readFully(pos,buffer);
- value.set(buffer);
- LOG.info("---内容: " + value.toString());
- System.out.println("---内容: " + value.toString());
- pos += buffer.length;
- LOG.info("end is : " + end + " pos is : " + pos);
- return true;
- }
- return false;
- }
- public void close() throws IOException{
- if(fileIn != null) {
- fileIn.close();
- }
- }
- }
- package org.apache.hadoop.streaming;
- import java.io.IOException;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.compress.CompressionCodecFactory;
- import org.apache.hadoop.mapred.FileSplit;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.JobConfigurable;
- import org.apache.hadoop.mapred.Reporter;
- import org.apache.hadoop.mapred.InputSplit;
- import org.apache.hadoop.mapred.RecordReader;
- import org.apache.hadoop.mapred.FileInputFormat;
- public class ContentInputFormat extends FileInputFormat<Text,Text>{
- private long mySplitSize = 1024*1024;
- private CompressionCodecFactory compressionCodecs = null;
- public void configure(JobConf conf) {
- compressionCodecs = new CompressionCodecFactory(conf);
- }
- /**
- * @brief isSplitable 不对文件进行切分,必须对文件整体进行处理
- *
- * @param fs
- * @param file
- *
- * @return false
- */
- protected boolean isSplitable(FileSystem fs, Path file) {
- return false;
- }
- public RecordReader<Text,Text> getRecordReader(InputSplit genericSplit,
- JobConf job, Reporter reporter) throws IOException{
- reporter.setStatus(genericSplit.toString());
- ContentRecordReder contentRecordReder = new ContentRecordReder(job,(FileSplit)genericSplit);
- return (RecordReader<Text, Text>) contentRecordReder;
- }
- }
- javac -classpath ~/hadoop-1.0.1/hadoop-core-1.0.1.jar:~/hadoop-1.0.1/lib/*:./con
- tent-record-reader.jar ./*.java -Xlint:deprecation
3.并加入stream的jar包
- jar uf ../../contrib/streaming/hadoop-streaming-1.0.1.jar org/apache/hadoop/streaming/*.class
4.Mapper.cpp
- #include <iostream>
- #include <string>
- using namespace std;
- int main()
- {
- string key,value;
- char ch;
- cin>>key;
- value = "";
- while(cin>>ch&&!cin.eof()){
- value.append(1,ch);
- }
- cout<<key<<"\t"<<value<<endl;
- return 0;
- }
- #include <iostream>
- #include <map>
- using namespace std;
- int main() {
- map<string,string> wordMap;
- map<string,string>::iterator it;
- string key;
- string value;
- while(cin>>key>>value) {
- //可以在这里对value即文档做处理...
- wordMap[key] +=value;
- }
- for(it=wordMap.begin();it != wordMap.end();it++) {//输出
- cout<<it->first<<"\t"<<it->second<<endl;
- }
- return 0;
- }
- bin/hadoop jar contrib/streaming/hadoop-streaming-1.0.1.jar \
- -mapper /home/guoguo/hadoop-1.0.1/cTest/C++/Mapper \
- -file /home/guoguo/hadoop-1.0.1/cTest/C++/Mapper \
- -inputformat ContentInputFormat \
- -reducer /home/guoguo/hadoop-1.0.1/cTest/C++/Reducer \
- -file /home/guoguo/hadoop-1.0.1/cTest/C++/Reducer \
- -input input \
- -output stream-output
7.ok~
8.补充:如果文档是XML格式,可以用StreamXmlRecordReader,具体做法就是用到hadoop-streaming.jar的inputreader选项,如:
- <span style="font-size:16px;">-inputreader "StreamXmlRecordReader,begin=<Store>,end=</Store>"</span>
其中“<Store>”,“</Store>”为XML文件的开始标签和结束标签
更多Hadoop相关信息见 Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13相关问答
更多-
试着回答: 先说明一下: 1. namenode负责管理目录和文件信息,真正的文件块是存放在datanode上。 2. 每个map和reduce(即task)都是java进程,默认是有单独的jvm的,所以不可能同一个类的对象会在不同节点上。 看你的描述是把namenode,datanode和jobtracker,tasktracker有点混了。 所以: 问题1. 分块存放在datanode上 问题2.inputformat是在datanode上,确切的说是在tasktracker中。每个map和reduce ...
-
创建不带输入数据的自定义生成器Hadoop InputFormat(Creating a custom Generator Hadoop InputFormat without input data)[2021-11-15]
该错误清楚地表明该文件未找到 It seems that the issue of Hadoop trying to read non-existent files stems from the InputSplit. If the InputSplit doesn't define behavior for reading the data, then Hadoop defaults to it's own method. This is solved by implementing Writable f ... -
感谢user7610 从答案中编译并且有些经过测试的示例代码版本 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptC ...
-
Python的Spark自定义Hadoop配置(PySpark)?(Custom Hadoop Configuration for Spark from Python (PySpark)?)[2022-11-09]
我已经解决了这个难题,因为在线删除了修改Configuration要求,并且只是基于自定义的Hadoop配置* .xml文件集。 起初我编写了Java类,它将附加层的配置添加到org.apache.hadoop.conf.Configuration默认资源中。 它的静态初始化附加配置默认resoutces: public class Configurator { static { // We initialize needed Hadoop configuration layer ... -
OrcNewInputformat作为hadoop流的输入格式(OrcNewInputformat as a inputformat for hadoop streaming)[2022-04-06]
现在我的工作正常,我给错了类名。 IT is working fine now I was giving wrong classname. -
从文档: 弃用的接口 ... org.apache.hadoop.mapred.InputFormat 改为使用InputFormat 。 ... 由于0.20.2的怪异弃用行为,甚至在0.20.2接口后使用实现的0.20.2建议,我挖得更深一点。 该界面仍然存在于0.21.0 ,并且已弃用标签。 在撰写本文时,我无法在主干中找到类似的界面。 From the documentation: Deprecated Interfaces ... org.apache.hadoop.mapred.InputFor ...
-
用于Excel文件的自定义InputFormat或InputReader(xls)(Custom InputFormat or InputReader for Excel files(xls))[2022-11-13]
是的,您应该创建RecordReader以从Excel文档中读取每条记录。 在该记录阅读器中,你应该使用像api这样的POI来阅读excel文档。 更准确地说,请执行以下步骤: 扩展FileInputFromat并创建自己的CustomInputFrmat并覆盖getRecordReader 。 通过扩展RecordReader创建一个CustomRecordReader ,在这里你必须编写如何从给定的filesplit生成一个键值对。 因此,首先从filesplit读取字节,然后从bufferedbyte ... -
通过Hadoop配置对象从InputFormat返回值(Returning values from InputFormat via the Hadoop Configuration object)[2021-08-10]
使用配置是一个非常合适的解决方案(诚然,对于我不确定我理解的问题),但是一旦将作业提交给作业跟踪器,您将无法修改此值(客户端或任务端)并且期望在通信的另一侧看到变化(例如,在地图任务中设置配置值将不会持久保存到其他映射器,也不会持久保存到减速器,也不会对作业跟踪器可见)。 因此,将信息从getSplits内部传回客户端轮询循环(以查看作业何时实际完成定义输入分割)在您的示例中很好。 你使用它的更大目标或用例是什么? Using the configuration is a perfectly suitabl ... -
经过大量搜索后,我在亚马逊的一个库中找到了DynamoDBInputFormat和DynamoDBOutputFormat。 在亚马逊弹性图上减少有一个名为hive-bigbird-handler的库,它包含dynamoDB的输入和输出格式。 完整的类名是:org.apache.hadoop.hive.dynamodb.write.DynamoDBOutputFormat和org.apache.hadoop.hive.dynamodb.read.DynamoDBInputFormat 我希望这些课程对社区有 ...
-
如何在Hadoop Streaming中使用“typedbytes”或“rawbytes”?(How to use “typedbytes” or “rawbytes” in Hadoop Streaming?)[2022-11-22]
好吧,我发现了一个有效的组合,但这很奇怪。 根据文档或模仿typedbytes.py ,在本地文件系统中准备有效的typedbytes文件。 使用 hadoop jar path/to/streaming.jar loadtb path/on/HDFS.sequencefile < local/typedbytes.tb 将typedbytes包装在SequenceFile中并将其放入HDFS中,只需一步即可完成。 使用 hadoop jar path/to/streaming.jar -inputfor ...