Implementing Custom Multi-File Output in MapReduce
2019-03-28 13:33 | Source: Web
An ordinary MapReduce job has two phases, map and reduce. Without extra configuration, the results are written as multiple files named part-000*, with exactly one output file per reducer, and the format of the file contents cannot be chosen freely. This is inconvenient for downstream processing of the results.
In Hadoop, the reduce side can write to multiple outputs with controllable file names: extend the MultipleTextOutputFormat class and override the generateFileNameForKeyValue method. If all you need is control over the output file names, implementing your own LogNameMultipleTextOutputFormat and setting jobconf.setOutputFormat(LogNameMultipleTextOutputFormat.class) is enough, but this approach only works with the old Hadoop API (org.apache.hadoop.mapred). If you want to use the new API, customize the format of the output contents, or meet other richer requirements, you have to rewrite some of the Hadoop output classes yourself.
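A minimal sketch of the old-API route, for contrast. The class name LogNameMultipleTextOutputFormat comes from the article; the Text key/value types and the key-based naming scheme are illustrative assumptions, not the article's code.

// Old mapred API only: subclass MultipleTextOutputFormat and derive the
// file name from each key/value pair. Naming scheme below is an assumption.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class LogNameMultipleTextOutputFormat extends MultipleTextOutputFormat<Text, Text> {
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        // Route each record into a subdirectory named after its key;
        // "name" is the default leaf name (e.g. part-00000).
        return key.toString() + "/" + name;
    }
}

// In the old-API driver:
// JobConf jobconf = new JobConf(MyJob.class);
// jobconf.setOutputFormat(LogNameMultipleTextOutputFormat.class);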
First, build your own MultipleOutputFormat class that extends FileOutputFormat (note: the FileOutputFormat from the org.apache.hadoop.mapreduce.lib.output package, i.e., the new API):
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * This abstract class extends FileOutputFormat, allowing the output data to
 * be written to different output files.
 * Created on 2012-07-08
 * @author zhoulongliu
 * @param <K>
 * @param <V>
 */
public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable> extends
        FileOutputFormat<K, V> {

    // Subclasses implement generateFileNameForKeyValue (below) to supply the
    // output file name for each record.
    private MultiRecordWriter writer = null;

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    /**
     * Get the task output path (the committer's work path when available).
     * @param conf the task attempt context
     * @return the directory this task should write into
     * @throws IOException if no output path is configured
     */
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /**
     * Determine the output file name (including extension) from the key,
     * value, and configuration. Generate the output file name based on the
     * given key and the leaf file name. The default behavior is that the
     * file name does not depend on the key.
     *
     * @param key the key of the output data
     * @param value the value of the output data
     * @param conf the configuration object
     * @return generated file name
     */
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);

    /**
     * RecordWriter implementation (inner class) that dispatches each record
     * to the writer for its generated file name.
     * @author zhoulongliu
     */
    public class MultiRecordWriter extends RecordWriter<K, V> {
        /** Cache of RecordWriters, keyed by generated file name. */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        /** Output directory. */
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // Compute the output file name for this record.
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            // If no writer exists for this file name yet, create one; otherwise reuse it.
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException,
                InterruptedException {
            Configuration conf = job.getConfiguration();
            // Check whether output compression is enabled.
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                // A custom LineRecordWriter is used here (see the sketch after this class).
                recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),
                        keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                // A custom LineRecordWriter is used here (see the sketch after this class).
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}
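The listing above references a LineRecordWriter that it never defines. Hadoop ships an equivalent as a protected inner class of org.apache.hadoop.mapreduce.lib.output.TextOutputFormat, which cannot be reused from here, so it is usually copied out. Below is a minimal sketch modeled on that class: it writes "key<separator>value\n" in UTF-8 via toString(). Hadoop's own version additionally fast-paths Text keys and values by writing their raw bytes and skips NullWritable.

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class LineRecordWriter<K, V> extends RecordWriter<K, V> {

    private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = (key == null);
        boolean nullValue = (value == null);
        if (nullKey && nullValue) {
            return; // nothing to write for an empty record
        }
        if (!nullKey) {
            out.write(key.toString().getBytes(StandardCharsets.UTF_8));
        }
        if (!nullKey && !nullValue) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            out.write(value.toString().getBytes(StandardCharsets.UTF_8));
        }
        out.write(NEWLINE);
    }

    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}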
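Finally, wiring it into a job. The concrete subclass below and its first-letter naming scheme are illustrative assumptions, not part of the original article; the only requirements are implementing generateFileNameForKeyValue and registering the format with the new-API Job.

// Hypothetical subclass: send each word-count record to a file named after
// the first character of its key (e.g. "a.txt", "b.txt").
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class FirstLetterOutputFormat extends MultipleOutputFormat<Text, IntWritable> {
    @Override
    protected String generateFileNameForKeyValue(Text key, IntWritable value, Configuration conf) {
        String k = key.toString();
        return (k.isEmpty() ? "empty" : k.substring(0, 1)) + ".txt";
    }
}

// In the new-API driver (output path is illustrative):
// Job job = Job.getInstance(new Configuration(), "multi-file output");
// job.setOutputFormatClass(FirstLetterOutputFormat.class);
// FileOutputFormat.setOutputPath(job, new Path("/output/path"));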