Hadoop Source Code Analysis: Job Execution Flow
2019-03-28 14:18 | Source: Web
There are many ways to run a job; this article looks at two of them (a minimal driver sketch follows the list):
1. JobClient.runJob(conf); // old mapred API, takes a JobConf
2. job.waitForCompletion(true); // new mapreduce API, called on a Job
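For orientation, here is a hedged driver sketch showing where each of the two calls sits. The MyDriver class and the input/output paths are illustrative (not from the article), and no mapper or reducer is configured, so the identity classes would run:

// Approach 1: old mapred API, blocking submission through JobClient.runJob
JobConf conf = new JobConf(MyDriver.class);              // MyDriver is a hypothetical driver class
conf.setJobName("runjob-demo");
FileInputFormat.setInputPaths(conf, new Path("input"));  // org.apache.hadoop.mapred.FileInputFormat
FileOutputFormat.setOutputPath(conf, new Path("output-old"));
JobClient.runJob(conf);                                  // blocks until the job finishes

// Approach 2: new mapreduce API, blocking submission through Job.waitForCompletion
Job job = new Job(new Configuration(), "waitforcompletion-demo");
job.setJarByClass(MyDriver.class);
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path("input"));
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path("output-new"));
boolean succeeded = job.waitForCompletion(true);         // true = print progress while waiting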
runJob takes a job configuration object (JobConf) and initializes a JobClient; the actual submission ultimately goes through that JobClient object.
public static RunningJob runJob(JobConf job) throws IOException {
  JobClient jc = new JobClient(job);
  RunningJob rj = jc.submitJob(job);
  try {
    if (!jc.monitorAndPrintJob(job, rj)) {
      throw new IOException("Job failed!");
    }
  } catch (InterruptedException ie) {
    Thread.currentThread().interrupt();
  }
  return rj;
}
The submitJob method simply delegates to the internal method submitJobInternal to perform the submission. The second approach ends up in the same method as well.
public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                                IOException {
  try {
    return submitJobInternal(job);
  } catch (InterruptedException ie) {
    throw new IOException("interrupted", ie);
  } catch (ClassNotFoundException cnfe) {
    throw new IOException("class not found", cnfe);
  }
}
The verbose parameter of waitForCompletion controls whether the job's progress and log output are printed continuously while it runs; callers normally pass true.
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    jobClient.monitorAndPrintJob(conf, info);
  } else {
    info.waitForCompletion();
  }
  return isSuccessful();
}
As we can see, both approaches end up calling monitorAndPrintJob, which periodically polls the job's status and prints log output.
/**
* Monitor a job and print status in real-time as progress is made and tasks
* fail.
* @param conf the job's configuration
* @param job the job to track
* @return true if the job succeeded
* @throws IOException if communication to the JobTracker fails
*/
public boolean monitorAndPrintJob(JobConf conf, RunningJob job) throws IOException, InterruptedException;
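Internally this amounts to a polling loop over the RunningJob handle. A hedged sketch of that pattern, using the rj returned by submitJob in the runJob excerpt above (the one-second interval and the System.out printing are illustrative, not the actual implementation):

while (!rj.isComplete()) {
  // mapProgress()/reduceProgress() return fractions between 0 and 1
  System.out.printf("map %.0f%%  reduce %.0f%%%n",
                    rj.mapProgress() * 100, rj.reduceProgress() * 100);
  Thread.sleep(1000);
}
System.out.println(rj.isSuccessful() ? "Job succeeded" : "Job failed");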
submitJobInternal is where the job is actually submitted to the system. Its main steps are: set up the submission environment, check the output specification, compute the input splits, upload the job files, and finally perform the real submission.
/**
 * Internal method for submitting jobs to the system.
 * @param job the configuration to submit
 * @return a proxy object for the running job
 * @throws FileNotFoundException
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws IOException
 */
public
RunningJob submitJobInternal(JobConf job
                             ) throws FileNotFoundException,
                                      ClassNotFoundException,
                                      InterruptedException,
                                      IOException {
  /*
   * configure the command line options correctly on the submitting dfs
   */
  JobID jobId = jobSubmitClient.getNewJobId();
  Path submitJobDir = new Path(getSystemDir(), jobId.toString());
  Path submitJarFile = new Path(submitJobDir, "job.jar");
  Path submitSplitFile = new Path(submitJobDir, "job.split");
  configureCommandLineOptions(job, submitJobDir, submitJarFile);
  Path submitJobFile = new Path(submitJobDir, "job.xml");
  int reduces = job.getNumReduceTasks();
  JobContext context = new JobContext(job, jobId);

  // Check the output specification
  if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
    org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
      ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
    output.checkOutputSpecs(context);
  } else {
    job.getOutputFormat().checkOutputSpecs(fs, job);
  }

  // Create the splits for the job
  LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
  int maps;
  if (job.getUseNewMapper()) {
    maps = writeNewSplits(context, submitSplitFile);
  } else {
    maps = writeOldSplits(job, submitSplitFile);
  }
  job.set("mapred.job.split.file", submitSplitFile.toString());
  job.setNumMapTasks(maps);

  // Write job file to JobTracker's fs
  FSDataOutputStream out =
    FileSystem.create(fs, submitJobFile,
                      new FsPermission(JOB_FILE_PERMISSION));
  try {
    job.writeXml(out);
  } finally {
    out.close();
  }

  //
  // Now, actually submit the job (using the submit name)
  //
  JobStatus status = jobSubmitClient.submitJob(jobId);
  if (status != null) {
    return new NetworkedJob(status);
  } else {
    throw new IOException("Could not launch job");
  }
}
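The output check above is why a job fails right away when its output directory already exists: the old-API FileOutputFormat.checkOutputSpecs throws if the path is present. A hedged driver-side sketch of the usual workaround (the path is illustrative):

Path outDir = new Path("/tmp/demo-output");   // illustrative output path
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outDir)) {
  fs.delete(outDir, true);                    // remove stale output so checkOutputSpecs passes
}
FileOutputFormat.setOutputPath(conf, outDir);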
The jobSubmitClient field has type JobSubmissionProtocol, an interface used for communication between the JobClient and the JobTracker. Through it the JobClient can submit a job and query information about the current system. LocalJobRunner and JobTracker provide the two implementations. jobSubmitClient is assigned in JobClient's init method: in local mode a LocalJobRunner is used; otherwise, in cluster mode, an RPC proxy is created. The details of the RPC path are not covered here.
/**
 * Connect to the default {@link JobTracker}.
 * @param conf the job configuration.
 * @throws IOException
 */
public void init(JobConf conf) throws IOException {
  String tracker = conf.get("mapred.job.tracker", "local");
  if ("local".equals(tracker)) {
    this.jobSubmitClient = new LocalJobRunner(conf);
  } else {
    this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
  }
}
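In other words, the single property mapred.job.tracker decides which implementation backs jobSubmitClient. A hedged sketch (the host and port are illustrative):

JobConf conf = new JobConf();
conf.set("mapred.job.tracker", "local");           // job runs in-process through LocalJobRunner
// conf.set("mapred.job.tracker", "master:9001");  // job is submitted to a JobTracker over RPC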
In local mode, LocalJobRunner's submitJob is called:
public JobStatus submitJob(JobID jobid) throws IOException {
  return new Job(jobid, this.conf).status;
}
Otherwise, JobTracker's submitJob is called:
/**
 * JobTracker.submitJob() kicks off a new job.
 *
 * Create a 'JobInProgress' object, which contains both JobProfile
 * and JobStatus. Those two sub-objects are sometimes shipped outside
 * of the JobTracker. But JobInProgress adds info that's useful for
 * the JobTracker alone.
 */
public synchronized JobStatus submitJob(JobID jobId) throws IOException {
  if (jobs.containsKey(jobId)) {
    // job already running, don't start twice
    return jobs.get(jobId).getStatus();
  }

  JobInProgress job = new JobInProgress(jobId, this, this.conf);
  String queue = job.getProfile().getQueueName();
  if (!(queueManager.getQueues().contains(queue))) {
    new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
    throw new IOException("Queue \"" + queue + "\" does not exist");
  }

  // check for access
  try {
    checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
  } catch (IOException ioe) {
    LOG.warn("Access denied for user " + job.getJobConf().getUser()
             + ". Ignoring job " + jobId, ioe);
    new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
    throw ioe;
  }

  // Check the job if it cannot run in the cluster because of invalid memory
  // requirements.
  try {
    checkMemoryRequirements(job);
  } catch (IOException ioe) {
    new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
    throw ioe;
  }

  return addJob(jobId, job);
}
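The queue check above rejects a submission whose queue name is not among the queues configured on the JobTracker side (which, to my understanding, come from the "mapred.queue.names" property; "default" is the usual out-of-the-box queue). A hedged client-side sketch:

// setQueueName records the job's target queue ("mapred.job.queue.name") in its configuration
conf.setQueueName("default");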
For now we consider only job execution in local mode. The core of the run is the Job object, which extends Thread; Job is a private inner class of LocalJobRunner, the class that implements MapReduce's local mode.
public Job(JobID jobid, JobConf conf) throws IOException {
  this.file = new Path(getSystemDir(), jobid + "/job.xml");
  this.id = jobid;
  this.mapoutputFile = new MapOutputFile(jobid);
  this.mapoutputFile.setConf(conf);
  this.localFile = new JobConf(conf).getLocalPath(jobDir + id + ".xml");
  this.localFs = FileSystem.getLocal(conf);
  fs.copyToLocalFile(file, localFile);
  this.job = new JobConf(localFile);
  profile = new JobProfile(job.getUser(), id, file.toString(),
                           "http://localhost:8080/", job.getJobName());
  status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING);
  jobs.put(id, this);
  this.start();
}
Once the Job is created, its thread starts running: the job is first broken into MapTasks that perform the map phase, and their output is then processed by ReduceTasks performing the reduce phase. See Job's run method for the details; a small word-count sketch of the map and reduce sides follows.
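To make the map-then-reduce flow concrete, here is a minimal old-API word count, the classic textbook example rather than code taken from LocalJobRunner; the class names are illustrative:

public static class TokenMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {
  private final static IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();
  // Each MapTask feeds the records of its input split through this method.
  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output,
                  Reporter reporter) throws IOException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      output.collect(word, ONE);
    }
  }
}

public static class SumReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
  // Each ReduceTask receives one key together with all of its map-side values.
  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output,
                     Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}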