Hadoop Source Code Analysis: Job Execution Flow

2019-03-28 14:18

There are several ways to launch a job; here we look at two of them (a minimal driver sketch using each follows the list):
1. JobClient.runJob(conf); // old API, driven by a JobConf
2. job.waitForCompletion(true); // new API, called on a Job
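
For orientation, a minimal driver could use either entry point, along the lines of the sketch below. This is illustrative only: SubmitDemo and the path arguments are placeholders, and mapper/reducer settings are omitted (the identity defaults apply).

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapreduce.Job;

  public class SubmitDemo {
    // Style 1: old mapred API, blocking submission via JobClient.runJob
    static void runWithJobConf(String in, String out) throws Exception {
      JobConf conf = new JobConf(SubmitDemo.class);
      conf.setJobName("old-api-demo");
      org.apache.hadoop.mapred.FileInputFormat.setInputPaths(conf, new Path(in));
      org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(conf, new Path(out));
      JobClient.runJob(conf);  // submits, then polls and prints progress until done
    }

    // Style 2: new mapreduce API, Job.waitForCompletion
    static void runWithJob(String in, String out) throws Exception {
      Job job = new Job(new Configuration(), "new-api-demo");
      job.setJarByClass(SubmitDemo.class);
      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(in));
      org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(out));
      job.waitForCompletion(true);  // true: print progress while waiting
    }
  }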

runJob takes a job configuration object, JobConf, and initializes a JobClient; the submission is ultimately carried out through that object.
  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return rj;
  }

submitJob simply delegates to the internal method submitJobInternal to perform the submission. The second approach ends up calling the same method.
  public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                                  IOException {
    try {
      return submitJobInternal(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    } catch (ClassNotFoundException cnfe) {
      throw new IOException("class not found", cnfe);
    }
  }


The boolean parameter of waitForCompletion controls whether the job's progress is printed while it runs; callers typically pass true.

  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      jobClient.monitorAndPrintJob(conf, info);
    } else {
      info.waitForCompletion();
    }
    return isSuccessful();
  }

As we can see, both approaches call monitorAndPrintJob, which polls the job's status at intervals and prints progress and log messages.
  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @param conf the job's configuration
   * @param job the job to track
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob(JobConf conf, RunningJob job) throws IOException, InterruptedException;
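
The real implementation lives in JobClient; conceptually it is a polling loop along these lines (a simplified sketch, not the actual source; the real code also reports failed tasks, fetches task completion events, and prints counters):

  import java.io.IOException;
  import org.apache.hadoop.mapred.RunningJob;

  class MonitorSketch {
    // Poll the running job at a fixed interval and print coarse progress.
    static boolean monitor(RunningJob rj) throws IOException, InterruptedException {
      while (!rj.isComplete()) {
        Thread.sleep(1000);
        System.out.printf("map %.0f%% reduce %.0f%%%n",
                          rj.mapProgress() * 100, rj.reduceProgress() * 100);
      }
      return rj.isSuccessful();  // mirrors monitorAndPrintJob's return value
    }
  }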

 
submitJobInternal submits the job to the system. The main steps are configuring the execution environment, validating the output specification, computing input splits, distributing job files, and finally performing the actual submission.
 /**
   * Internal method for submitting jobs to the system.
   * @param job the configuration to submit
   * @return a proxy object for the running job
   * @throws FileNotFoundException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws IOException
   */
  public
  RunningJob submitJobInternal(JobConf job
                               ) throws FileNotFoundException,
                                        ClassNotFoundException,
                                        InterruptedException,
                                        IOException {
    /*
     * configure the command line options correctly on the submitting dfs
     */
   
    JobID jobId = jobSubmitClient.getNewJobId();
    Path submitJobDir = new Path(getSystemDir(), jobId.toString());
    Path submitJarFile = new Path(submitJobDir, "job.jar");
    Path submitSplitFile = new Path(submitJobDir, "job.split");
    configureCommandLineOptions(job, submitJobDir, submitJarFile);
    Path submitJobFile = new Path(submitJobDir, "job.xml");
    int reduces = job.getNumReduceTasks();
    JobContext context = new JobContext(job, jobId);
   
    // Check the output specification
    if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
      org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
        ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
      output.checkOutputSpecs(context);
    } else {
      job.getOutputFormat().checkOutputSpecs(fs, job);
    }

    // Create the splits for the job
    LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
    int maps;
    if (job.getUseNewMapper()) {
      maps = writeNewSplits(context, submitSplitFile);
    } else {
      maps = writeOldSplits(job, submitSplitFile);
    }
    job.set("mapred.job.split.file", submitSplitFile.toString());
    job.setNumMapTasks(maps);
       
    // Write job file to JobTracker's fs       
    FSDataOutputStream out =
      FileSystem.create(fs, submitJobFile,
                        new FsPermission(JOB_FILE_PERMISSION));

    try {
      job.writeXml(out);
    } finally {
      out.close();
    }

    //
    // Now, actually submit the job (using the submit name)
    //
    JobStatus status = jobSubmitClient.submitJob(jobId);
    if (status != null) {
      return new NetworkedJob(status);
    } else {
      throw new IOException("Could not launch job");
    }
  }
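
After this method returns, the job's staging directory under the system directory (getSystemDir, configured by mapred.system.dir) holds three files that the JobTracker will later read:

  ${mapred.system.dir}/<job-id>/
      job.jar    user code, staged by configureCommandLineOptions
      job.split  the serialized input splits written above
      job.xml    the job's full configuration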


The jobSubmitClient object is of type JobSubmissionProtocol, an interface for communication between the JobClient and the JobTracker. Through its methods a JobClient can submit a job or query information about the current system. LocalJobRunner and JobTracker provide the two implementations. jobSubmitClient is assigned in JobClient's init method: in local mode a LocalJobRunner is used; otherwise (cluster mode) an RPC proxy to the JobTracker is created. We won't go into the RPC details here.

/**
   * Connect to the default {@link JobTracker}.
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    String tracker = conf.get("mapred.job.tracker", "local");
    if ("local".equals(tracker)) {
      this.jobSubmitClient = new LocalJobRunner(conf);
    } else {
      this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
    }       
  }
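
For reference, the parts of JobSubmissionProtocol exercised in this article look roughly as follows (an abridged excerpt trimmed to the calls we have seen; consult the full interface in org.apache.hadoop.mapred for the complete method set):

  import java.io.IOException;
  import org.apache.hadoop.ipc.VersionedProtocol;

  // Abridged: the full interface also covers killing jobs, task reports,
  // queue information, and more.
  interface JobSubmissionProtocol extends VersionedProtocol {
    JobID getNewJobId() throws IOException;               // allocate a fresh job id
    JobStatus submitJob(JobID jobId) throws IOException;  // kick off the job
    JobStatus getJobStatus(JobID jobId) throws IOException;
    ClusterStatus getClusterStatus() throws IOException;  // current system information
  }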

In local mode this resolves to LocalJobRunner's implementation:
  public JobStatus submitJob(JobID jobid) throws IOException {
    return new Job(jobid, this.conf).status;
  }

Otherwise it resolves to the JobTracker's implementation:

  /**
   * JobTracker.submitJob() kicks off a new job. 
   *
   * Create a 'JobInProgress' object, which contains both JobProfile
   * and JobStatus.  Those two sub-objects are sometimes shipped outside
   * of the JobTracker.  But JobInProgress adds info that's useful for
   * the JobTracker alone.
   */
  public synchronized JobStatus submitJob(JobID jobId) throws IOException {
    if(jobs.containsKey(jobId)) {
      //job already running, don't start twice
      return jobs.get(jobId).getStatus();
    }
   
    JobInProgress job = new JobInProgress(jobId, this, this.conf);
   
    String queue = job.getProfile().getQueueName();
    if(!(queueManager.getQueues().contains(queue))) {     
      new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
      throw new IOException("Queue \"" + queue + "\" does not exist");       
    }

    // check for access
    try {
      checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
    } catch (IOException ioe) {
       LOG.warn("Access denied for user " + job.getJobConf().getUser()
                + ". Ignoring job " + jobId, ioe);
      new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
      throw ioe;
    }

    // Check the job if it cannot run in the cluster because of invalid memory
    // requirements.
    try {
      checkMemoryRequirements(job);
    } catch (IOException ioe) {
      new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
      throw ioe;
    }

    return addJob(jobId, job);
  }

From here on we consider only local-mode execution. The core of job execution is the Job object, which extends Thread. Job is a private inner class of LocalJobRunner, which implements MapReduce's local mode.

    public Job(JobID jobid, JobConf conf) throws IOException {
      this.file = new Path(getSystemDir(), jobid + "/job.xml");
      this.id = jobid;
      this.mapoutputFile = new MapOutputFile(jobid);
      this.mapoutputFile.setConf(conf);

      this.localFile = new JobConf(conf).getLocalPath(jobDir+id+".xml");
      this.localFs = FileSystem.getLocal(conf);

      fs.copyToLocalFile(file, localFile);
      this.job = new JobConf(localFile);
      profile = new JobProfile(job.getUser(), id, file.toString(),
                               "http://localhost:8080/", job.getJobName());
      status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING);

      jobs.put(id, this);

      this.start();
    }

Once the Job is constructed, its thread starts running (note the this.start() at the end of the constructor). During execution the job is first broken into MapTasks that perform the map phase, after which a ReduceTask performs the reduce phase. See Job's run method for the details.
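
The structure of that run method is roughly as follows (a heavily condensed paraphrase, not the actual source; progress reporting, error handling, and cleanup are omitted):

  // Condensed outline of LocalJobRunner.Job.run (paraphrased):
  public void run() {
    // 1. Open the split file written by submitJobInternal
    //    ("mapred.job.split.file") and deserialize one input split per map task.
    // 2. Create a MapTask for each split and run the tasks sequentially in this
    //    thread; map output goes to local files managed by the MapOutputFile
    //    created in the constructor.
    // 3. If the job has reduce tasks, create a ReduceTask over all the map
    //    outputs and run it, producing the job's final output.
    // 4. Set this.status to SUCCEEDED or FAILED so callers polling the status
    //    object (returned by submitJob) can observe the result.
  }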
