Hadoop的环境搭建和编写一个简单的hadoop job

2019-03-28 14:22|来源: 网络

Hadoop 入门:

0hadoop的简要介绍

google之所以能够成功,一个重要的技术就是map-reduce。map-reduce是google为大规模的、分布式数据进行处理的一种编程模式。

而本文介绍的hadoop是apache的开源map-reduce实现。本文不过多的介绍map-reduce,主要精力放在hadoop的配置和编写

一个简单的haoop程序上

hadoop服务器的安装:
hadoop是一个分布式的处理框架,本文先介绍的是一个简单的伪分布式hadoop(安装在一个linux机器上)

配置环境是Ubuntu
创建一个新文件/etc/sources.list.d/cloudera.list
把下边的内容复制到新文件:
 
deb http://archive.cloudera.com/debian intrepid-cdh3 contrib
deb-src http://archive.cloudera.com/debian intrepid-cdh3 contrib
 
然后打开teminal输入下边的命令:
$ curl -s http://archive.cloudera.com/debian/archive.key | \
sudo apt-key add - sudo apt-get update

然后,安装采用伪分布式配置的 Hadoop(所有 Hadoop 守护进程在同一个主机上运行):

$ sudo apt-get install hadoop-0.20-conf-pseudo
 

确保系统已经安装了sshd(如果没有,请先安装)。
 设置不需要密码的ssh:
    
$ sudo su -
# ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
# cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

启动hadoop:
首先对namenode进行格式化:
# hadoop-0.20 namenode -format

Hadoop 提供一些简化启动的辅助工具。这些工具分为启动(比如 start-dfs)和停止(比如 stop-dfs)两类。下面的简单脚本说明如何启动 Hadoop 节点:

# /usr/lib/hadoop-0.20/bin/start-dfs.sh
# /usr/lib/hadoop-0.20/bin/start-mapred.sh
#
 
输入命令jps可以查看守护进程是否正在运行;

编写一个hadoop程序:
作为联系,我们从网上下载一个cvs格式的数据文件:
http://earthquake.usgs.gov/research/data/pager/EXPO_CAT_2007_12.csv
cvs是以逗号进行列分割的数据文件。
使用opencvs可以很方便的处理cvs格式的数据。
opencvs可以从sourceforge上下载。
opencvs可以把一个string以逗号进行分割成一个string数组
只扩展 Hadoop 的 Mapper 类。然后我可以使用泛型来为传出键和值指定显式类。类型子句也指定了传入键和值,这对于读取文件分别是字节数和文本行数。

EarthQuakesPerDateMapper 类扩展了 Hadoop 的 Mapper 对象。它显式地将其输出键指定为一个 Text 对象,将其值指定为一个 IntWritable,这是一个 Hadoop 特定类,实质上是一个整数。还要注意,class 子句的前两个类型是 LongWritable 和 Text,分别是字节数和文本行数。

由于类定义中的类型子句,我将传入 map 方法的参数类型设置为在 context.write 子句内带有该方法的输出。如果我想指定其他内容,将会出现一个编译器问题,或 Hadoop 将输出一个错误消息,描述类型不匹配的消息。

一个mapper的实现:

public class EarthQuakesPerDateMapper extends
  Mapper<LongWritable, Text, Text, IntWritable> {
 @Override
 protected void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {

  if (key.get() > 0) {
   try {
    CSVParser parser = new CSVParser();
    String[] lines = parser.parseLine(value.toString());
    lines = new CSVParser().parseLine(lines[0]);
    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
    Date dt = formatter.parse(lines[0]);
    formatter.applyPattern("dd-MM-yyyy");

    String dtstr = formatter.format(dt);
    context.write(new Text(dtstr), new IntWritable(1));
   } catch (java.text.ParseException e) {
    // TODO Auto-generated catch block
    //e.printStackTrace();
   }
  }
 }
}
reduce 实现如下 所示。与 Hadoop 的 Mapper 一样,Reducer 被参数化了:前两个参数是传入的键类型(Text)和值类型(IntWritable),后两个参数是输出类型:键和值,这在本例中是相同的。


public class EarthQuakesPerDateReducer extends
  Reducer<Text, IntWritable, Text, IntWritable> {
 @Override
 protected void reduce(Text key, Iterable<IntWritable> values,
   Context context) throws IOException, InterruptedException {
  int count = 0;
  for (IntWritable value : values) {
   count++;
  }
  context.write(key, new IntWritable(count));
 }
}

写好mapper和reducer之后,就可以定义一个hadoop job了。

public class EarthQuakesPerDayJob {
 public static void main(String[] args) throws Throwable {
  Job job = new Job();
  job.setJarByClass(EarthQuakesPerDayJob.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  job.setMapperClass(EarthQuakesPerDateMapper.class);
  job.setReducerClass(EarthQuakesPerDateReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

在linux上执行hadoop:
$> export HADOOP_CLASSPATH=lib/opencsv-2.3.jar
$> hadoop jar hadoop.jar in out
在程序所在目录定义一个子目录in,把刚才所下载的cvs文件放到in目录下。
in就是程序数据的输入目录,out是输出目录,注意这个out文件夹是程序建立的,不可以手动建立。
运行是会看到:
11/09/05 08:47:26 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
11/09/05 08:47:26 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
11/09/05 08:47:26 INFO input.FileInputFormat: Total input paths to process : 1
11/09/05 08:47:26 INFO mapred.JobClient: Running job: job_local_0001
11/09/05 08:47:26 INFO input.FileInputFormat: Total input paths to process : 1
11/09/05 08:47:26 INFO mapred.MapTask: io.sort.mb = 100
11/09/05 08:47:27 INFO mapred.MapTask: data buffer = 79691776/99614720
11/09/05 08:47:27 INFO mapred.MapTask: record buffer = 262144/327680
11/09/05 08:47:27 INFO mapred.JobClient:  map 0% reduce 0%
11/09/05 08:47:28 INFO mapred.MapTask: Starting flush of map output
11/09/05 08:47:28 INFO mapred.MapTask: Finished spill 0
11/09/05 08:47:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
11/09/05 08:47:28 INFO mapred.LocalJobRunner:
11/09/05 08:47:28 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
11/09/05 08:47:29 INFO mapred.LocalJobRunner:
11/09/05 08:47:29 INFO mapred.Merger: Merging 1 sorted segments
11/09/05 08:47:29 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 97887 bytes
11/09/05 08:47:29 INFO mapred.LocalJobRunner:
11/09/05 08:47:29 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
11/09/05 08:47:29 INFO mapred.LocalJobRunner:
11/09/05 08:47:29 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
11/09/05 08:47:29 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to out1
11/09/05 08:47:29 INFO mapred.LocalJobRunner: reduce > reduce
11/09/05 08:47:29 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
11/09/05 08:47:29 INFO mapred.JobClient:  map 100% reduce 100%
11/09/05 08:47:29 INFO mapred.JobClient: Job complete: job_local_0001
11/09/05 08:47:29 INFO mapred.JobClient: Counters: 12
11/09/05 08:47:29 INFO mapred.JobClient:   FileSystemCounters
11/09/05 08:47:29 INFO mapred.JobClient:     FILE_BYTES_READ=11961631
11/09/05 08:47:29 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=9370383
11/09/05 08:47:29 INFO mapred.JobClient:   Map-Reduce Framework
11/09/05 08:47:29 INFO mapred.JobClient:     Reduce input groups=142
11/09/05 08:47:29 INFO mapred.JobClient:     Combine output records=0
11/09/05 08:47:29 INFO mapred.JobClient:     Map input records=5639
11/09/05 08:47:29 INFO mapred.JobClient:     Reduce shuffle bytes=0
11/09/05 08:47:29 INFO mapred.JobClient:     Reduce output records=142
11/09/05 08:47:29 INFO mapred.JobClient:     Spilled Records=11274
11/09/05 08:47:29 INFO mapred.JobClient:     Map output bytes=86611
11/09/05 08:47:29 INFO mapred.JobClient:     Combine input records=0
11/09/05 08:47:29 INFO mapred.JobClient:     Map output records=5637
11/09/05 08:47:29 INFO mapred.JobClient:     Reduce input records=5637

运行完成后:
cd到out目录下,会看到一个part-r-00000文件。
输入命令:cat part-r-00000
可以看到hadoopjob的运行结果。

相关问答

更多
  • 没有内部命令 chomod 你取保是在Linux环境下开发的吗
  • map的JVM内存溢出,这个值默认是200M,有些小,你需要设置的大点,设置mapred.child.java.opts为512M试试
  • 查看Mahout的XmlInputFormat 。 令人遗憾的是,这是在Mahout而不是在核心发行版中。 连接的XML文件是否至少以相同的格式连接? 如果是这样,您将START_TAG_KEY和END_TAG_KEY设置为每个文件的根目录。 每个文件将在map显示为一个Text记录。 然后,您可以使用自己喜欢的Java XML解析器来完成工作。 Check out Mahout's XmlInputFormat. It's a shame that this is in Mahout and not in ...
  • 如果您使用的是java,则可以覆盖setup方法并在那里打开文件处理程序(并在cleanup其关闭)。 所有映射器都可以使用此句柄。 我假设你不是在写这里的所有地图输出,而是一些调试/统计。 使用此处理程序,您可以按照此示例中的显示进行读写( http://wiki.apache.org/hadoop/HadoopDfsReadWriteExample ) 如果您想阅读整个目录,请查看此示例https://sites.google.com/site/hadoopandhive/home/how-to-rea ...
  • 客户端将作业提交给Namenode。 Namenode查找客户端请求的数据并提供块信息。 JobTracker负责完成工作并为工作分配资源。 在案例2和案例3中 - 乔布斯失败了。 Client submits job to the Namenode. Namenode looks for the data requested by the client and gives the block information. JobTracker is responsible for the job to be ...
  • 我没有看到为mapreduce.framework.name设置了任何值,应该设置正确的值,以便它可以显示在YARN上? 你没有看到它,所以它是默认的 属性mapreduce.framework.name 默认: local 描述:用于执行MapReduce作业的运行时框架。 可以是local , classic或yarn 。 mapred-default.xml 您需要使用该属性设置为yarn来更新mapred-site.xml I dont see any value set for mapreduce ...
  • 你需要打开jobtracker( http://localhost:50030/jobtracker.jsp ) - >完成的工作:job_201310311657_0006 - >它会告诉你工作细节。 单击“失败的地图”尝试。 (失败地图下的数字) - >单击taskId - >在那里记录您可以看到完整的堆栈跟踪。 搞清楚了。 给出了启动地图任务的作业,因此i / p文件可能存在一些问题。(猜测) 粘贴作业日志以获取更多帮助。 You need to open jobtracker(http://loca ...
  • 您遇到问题,因为应用程序主服务器无法启动容器并运行该作业。 首先尝试重新启动系统,如果没有更改,则必须更改yarn-site.xml和mapred-site.xml中的内存分配。 使用基本内存设置。 使用以下链接http://www.alexjf.net/blog/distributed-systems/hadoop-yarn-installation-definitive-guide/#yarn-configuration_1 这里 这里 You are having issues because app ...
  • 看一下http://:50030或http://:50030 / jobhistory.jsp(在底部。 每个作业/任务/任务部分(地图,排序,减少)都有分析。 非常方便。 你可以编写自己的日志 - 我只是“忘记”所有的分析页面并将它们通过awk进行粗略统计。 Take a look at http://:50030 or http://:50030/jobhistory.jsp (at the bottom. There is a analysis for each Job/Task/Task-Part ...
  • 需要重新格式化namenode并重新启动守护进程。 这是我的mac osx,可能与睡眠有关。 Needed to reformat namenode and restart daemons. This was on my mac osx, probably related to sleeping.