
Hadoop 2.0 Code: A Brief Analysis of the Client-Side Code


1. Overview

  This article walks through how Hadoop submits a user's finished MapReduce program to the cluster in the form of a Job.

  Four Java source files are mainly involved:

In the package org.apache.hadoop.mapreduce under hadoop-mapreduce-client-core:

       Job.java, JobSubmitter.java

In the package org.apache.hadoop.mapred under hadoop-mapreduce-client-jobclient:

       YARNRunner.java, ResourceMgrDelegate.java

 

2. Code Analysis and Execution Flow

1). The user runs a driver program like the one below; the map and reduce function implementations are omitted here:

 
Job job = new Job(new Configuration());
job.setJarByClass(MyJob.class);

// Specify various job-specific parameters
job.setJobName("myjob");

job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);
 

 


2). The user's driver program calls the waitForCompletion() method in Job

 
/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
 

If the Job is still in the DEFINE state, waitForCompletion() first calls submit(). After that, it either calls monitorAndPrintJob() to monitor and report the progress of the Job and its Tasks, or enters a loop of its own, polling at a fixed interval to check whether the submitted Job has finished. Once the Job completes, it exits the loop and calls isSuccessful() to return the final status.
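The poll interval itself comes from the client configuration. As a minimal sketch (assuming Hadoop 2.x, where Job.getCompletionPollInterval() reads the key "mapreduce.client.completion.pollinterval", defaulting to 5000 ms; the class name here is hypothetical), a client can tighten the polling like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class PollIntervalExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // assumption: this key is read by Job.getCompletionPollInterval()
    // inside waitForCompletion(); the default is 5000 ms
    conf.setInt("mapreduce.client.completion.pollinterval", 1000);
    Job job = Job.getInstance(conf);
    job.setJobName("poll-interval-demo");  // configure the job as usual
  }
}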

 

3). waitForCompletion() calls submit(), so we enter the submit() method

 
/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit() throws IOException, InterruptedException,
                            ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
                                  ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
 

submit() first calls connect() to obtain the required client protocol (ClientProtocol) and connection information, which it stores in the Cluster object. It then calls submitJobInternal() in the JobSubmitter class, takes the returned status, sets the Job's state to RUNNING, and returns.
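For reference, connect() itself is short; the following is a lightly simplified rendering of the Hadoop 2.0 method (comments added, exact formatting differs in the source):

private synchronized void connect()
    throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) {
    cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
      public Cluster run() throws IOException, InterruptedException,
                                  ClassNotFoundException {
        // Cluster's constructor consults the registered
        // ClientProtocolProvider implementations; when
        // "mapreduce.framework.name" is "yarn", the
        // YarnClientProtocolProvider supplies a YARNRunner
        // as the ClientProtocol.
        return new Cluster(getConfiguration());
      }
    });
  }
}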

 

4). Entering submitJobInternal() in the JobSubmitter class

 
/**
 * Internal method for submitting jobs to the system.
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  // validate the jobs output specs
  checkSpecs(job);

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
                                                   job.getConfiguration());
  // configure the command line options correctly on the submitting dfs
  Configuration conf = job.getConfiguration();
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    copyAndConfigureFiles(job, submitJobDir);
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
 

submitJobInternal() mainly performs the following operations:

  • Check the Job's input and output specifications and other parameters, obtain the configuration and the submitting host's address, generate the JobID, and determine the submit (staging) directory, which is also where the MRAppMaster later picks up the job resources; set the necessary information along the way
  • Copy the required jar files and configuration files to the designated directory on HDFS so that every node can access them
  • Compute the number of input splits (Input Split), which determines the number of map tasks (see the sketch after this list)
  • Call submitJob() in the YARNRunner class to submit the Job, passing along the required parameters (such as the JobID)
  • Wait for the submission to return the Job's status; if submission fails, clean up the staging directory
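The split computation in the third bullet determines the parallelism of the map phase. A condensed sketch of what writeSplits() does for the new API (simplified from JobSubmitter.writeNewSplits(); sorting the splits and writing the split metadata files to the submit directory are omitted):

InputFormat<?, ?> input =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
// one map task is created per input split
conf.setInt(MRJobConfig.NUM_MAPS, splits.size());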

 

 

5). submitJob() in the YARNRunner class

 
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {

  /* check if we have a hsproxy, if not, no need */
  MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
  if (hsProxy != null) {
    // JobClient will set this flag if getDelegationToken is called, if so, get
    // the delegation tokens for the HistoryServer also.
    if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
        DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
      Token hsDT = getDelegationTokenFromHS(hsProxy, new Text(
          conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
      ts.addToken(hsDT.getService(), hsDT);
    }
  }

  // Upload only in security mode: TODO
  Path applicationTokensFile =
      new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
  try {
    ts.writeTokenStorageFile(applicationTokensFile, conf);
  } catch (IOException e) {
    throw new YarnException(e);
  }

  // Construct necessary information to start the MR AM
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);

  ApplicationReport appMaster = resMgrDelegate
      .getApplicationReport(applicationId);
  String diagnostics = (appMaster == null ?
      "application report is null" : appMaster.getDiagnostics());
  if (appMaster == null
      || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
      || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
    throw new IOException("Failed to run job : " + diagnostics);
  }
  return clientCache.getClient(jobId).getJobStatus(jobId);
}
 
  • Set the necessary configuration and initialize the Application submission context, which includes the resources the MRAppMaster needs and the command used to launch it (see the sketch after this list)
  • Call ResourceMgrDelegate's submitApplication() with the Application context to submit the Job to the ResourceManager; the call returns the generated ApplicationId (in fact, the ApplicationId was already created when the JobID was generated)
  • Finally, return the Job's current status and exit
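To make the first bullet concrete, here is a heavily simplified sketch of the kind of context createApplicationSubmissionContext() builds, using the YARN records API; the real method additionally sets up local resources, environment variables, security tokens, and the AM's memory request:

import java.util.Collections;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.util.Records;

ApplicationSubmissionContext appContext =
    Records.newRecord(ApplicationSubmissionContext.class);
appContext.setApplicationId(applicationId);  // obtained earlier from the RM
appContext.setApplicationName(conf.get(MRJobConfig.JOB_NAME, "N/A"));

ContainerLaunchContext amContainer =
    Records.newRecord(ContainerLaunchContext.class);
// the command that launches the MR ApplicationMaster in its container
amContainer.setCommands(Collections.singletonList(
    "$JAVA_HOME/bin/java"
    + " org.apache.hadoop.mapreduce.v2.app.MRAppMaster"
    + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
    + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
appContext.setAMContainerSpec(amContainer);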

 

 

6). submitApplication() in the ResourceMgrDelegate class

 
public ApplicationId submitApplication(
    ApplicationSubmissionContext appContext) throws IOException {
  appContext.setApplicationId(applicationId);
  SubmitApplicationRequest request =
      recordFactory.newRecordInstance(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);
  applicationsManager.submitApplication(request);
  LOG.info("Submitted application " + applicationId + " to ResourceManager" +
      " at " + rmAddress);
  return applicationId;
}
 

 

This method is straightforward:

  • Set the ApplicationId in the Application submission context
  • Place the Application context into the SubmitApplicationRequest to be sent
  • Finally, use Hadoop RPC to invoke submitApplication() in the ClientRMService class on the ResourceManager side, transmitting the request, which carries the Application context, to the ResourceManager (a sketch of the underlying RPC proxy follows below)
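As a rough sketch of how that RPC proxy is created (simplified; in Hadoop 2.0 the client-facing RM protocol is named ClientRMProtocol, renamed ApplicationClientProtocol in later releases):

YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = conf.getSocketAddr(
    YarnConfiguration.RM_ADDRESS,
    YarnConfiguration.DEFAULT_RM_ADDRESS,
    YarnConfiguration.DEFAULT_RM_PORT);
// applicationsManager is the proxy ResourceMgrDelegate calls through;
// on the server side the call arrives at ClientRMService.submitApplication()
ClientRMProtocol applicationsManager =
    (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, conf);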