Hadoop 里执行 MapReduce 任务的几种常见方式

2019-03-28 13:05|来源: 网络

说明:

测试文件:

echo -e "aa\tbb \tcc\nbb\tcc\tdd" > 3.txt

Hadoop fs -put 3.txt /tmp/3.txt

全文的例子均以该文件做测试用例,统计单词出现的次数(WordCount)。

1、原生态的方式:java 源码编译打包成jar包后,由 hadoop 脚本调度执行,举例:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

 public static class TokenizerMapper extends
   Mapper<Object, Text, Text, IntWritable> {
  /** 
        * LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,
        * 这些类实现了WritableComparable接口, 
        * 都能够被串行化从而便于在分布式环境中进行数据交换,
        * 你可以将它们分别视为long,int,String 的替代品。 
        */ 
  // IntWritable one 相当于 java 原生类型 int 1
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
   // 每行记录都会调用 map 方法处理,此处是每行都被分词
   StringTokenizer itr = new StringTokenizer(value.toString());
   while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    // 输出每个词及其出现的次数 1,类似 <word1,1><word2,1><word1,1>
    context.write(word, one);
   }
  }
 }

 public static class IntSumReducer extends
   Reducer<Text, IntWritable, Text, IntWritable> {
  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values,
    Context context) throws IOException, InterruptedException {
   // key 相同的键值对会被分发到同一个 reduce中处理
   // 例如 <word1,<1,1>>在 reduce1 中处理,而<word2,<1>> 会在 reduce2 中处理
   int sum = 0;
   // 相同的key(单词)的出现次数会被 sum 累加
   for (IntWritable val : values) {
    sum += val.get();
   }
   result.set(sum);
   // 1个 reduce 处理完1 个键值对后,会输出其 key(单词)对应的结果(出现次数)
   context.write(key, result);
  }
 }

 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  // 多队列hadoop集群中,设置使用的队列
  conf.set("mapred.job.queue.name", "regular");
  // 之所以此处不直接用 argv[1] 这样的,是为了排除掉运行时的集群属性参数,例如队列参数,
  // 得到用户输入的纯参数,如路径信息等
  String[] otherArgs = new GenericOptionsParser(conf, args)
    .getRemainingArgs();
  if (otherArgs.length != 2) {
   System.err.println("Usage: wordcount <in> <out>");
   System.exit(2);
  }
  Job job = new Job(conf, "word count");
  job.setJarByClass(WordCount.class);
  // map、reduce 输入输出类
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  // 输入输出路径
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  // 多子job的类中,可以保证各个子job串行执行
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

执行:

bin/hadoop jar /tmp/wordcount.jar WordCount /tmp/3.txt /tmp/5

结果:

hadoop fs -cat /tmp/5/*
aa      1
bb      2
cc      2
dd      1

参考资料:

Hadoop - Map/Reduce 通过WordCount例子的变化来了解新版hadoop接口的变化 http://www.linuxidc.com/Linux/2013-04/82868.htm

Hadoop示例程序WordCount运行及详解 http://www.linuxidc.com/Linux/2013-04/82871.htm

官方的 wordcount v1.0 例子 http://hadoop.apache.org/docs/r1.1.1/mapred_tutorial.html#Example%3A+WordCount+v1.0

相关问答

更多
  • 1.jpg 集群上执行使用Hadoop jar命令具体参考下面hadoop集群,如何运行Java jar包---如何运行mapreduce程序Eclipse运行时这样的:run as application或则run as hadoop详细参考下面hadoop开发方式总结及操作指导搭建Eclipse下运行Mapreduce代码的环境
  • 你参考下这个吧eclipse中开发Hadoop2.x的Map/Reduce项目汇总
  • 本教程提到: 下载Hadoop-core-1.2.1.jar,用于编译和执行MapReduce程序。 访问以下链接http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1下载jar。 所以在这里你可以找到不同版本的所有罐子 This tutorial mentions : Download Hadoop-core-1.2.1.jar, which is used to compile and execute the MapRe ...
  • 我刚从Praveen Sripati收到了一封关于hadoop引用的电子邮件,我将它粘贴在这里: 在复制阶段,数据是否同时存在于map和reduce任务中? 地图输出何时清除? 以下内容来自Hadoop - The Definitive Guide 一旦第一个reducer检索到它们,主机就不会从磁盘中删除映射输出,因为reducer可能随后失败。 相反,他们会等到他们被告知由作业跟踪器(或应用程序主人)删除它们,这是在作业完成之后。 这非常重要,地图输出仍然在磁盘上! 在我的情况下有点不幸。 5) 然后减 ...
  • 如果一切正确,在main()方法中你写了类似的东西: FileInputFormat.addInputPath() FileOutputFormat.setOutputPath() 告诉Hadoop目录在哪里找到你的两个输入文件以及在哪里写出计算结果。 当作业开始时,Hadoop开始读取它在输入目录中找到的文件,并调用mapper的map()方法将文件的每一行(当时一个)作为参数传递给它。 在计算结束时,当reducer发出其数据时,Hadoop将把结果写入指定输出目录中的一个(或多个)文件中。 因此 ...
  • 你的jar路径和你的Class名字在你的工作命令中是错误的 您写了/home/ciro/Scrivania/BDABI/wordcount/WordCountj.jar而不是/home/ciro/Scrivania/BDABI/wordcount/org/myorg/WordCountj.jar和wordcount而不是WordCount Your jar path and your Class name are wrong in your job command You wrote /home/ciro/ ...
  • 我想这可以帮到你: http : //blogs.msdn.com/b/avkashchauhan/archive/2012/03/29/how-to-chain-multiple-mapreduce-jobs-in-hadoop.aspx 您还可以查看ChainMapper( http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/lib/ChainMapper.html ),具体取决于您要实现的目标。 但是,这仅适用于一个re ...
  • 这是一种明智的做法吗? 除了维持不同工作逻辑的耦合之外,它没有任何本质上的错误。 我相信它会为你节省一些磁盘I / O,如果你的磁盘是你的进程的瓶颈,这可能是一个胜利(在小型集群上可能就是这种情况)。 还有更好的选择吗? 编写一个有点框架的Mapper和Reducer可能是谨慎的,它们都接受作为配置参数的引用,它们应该推迟实际映射和减少的类。 这可以解决前面提到的代码耦合(也许你已经想过这个)。 它有一些可怕的缺点吗? 我唯一能想到的是,如果其中一个任务的映射逻辑未能及时完成其工作,则调度程序可以启动另一个 ...
  • 什么是SMA(简单移动平均线)? 一种简单的算术移动平均线,通过将保证金的收盘价加上若干时间段然后将该总和除以时间段数来计算。 例如,在上述例子中,收盘价格为:37.14(2008-02-29),38.32(2008-03-01),38.00(2008-03-02),38.71(2008-03-03),38.37( 2008-03-04),36.60(2008-03-05)。 所以2008-03-02的3天SMA是(37.14 + 38.32 + 38.00)/ 3 = 37.82 2008-02-29没有 ...