1. Overview
This section describes how Hadoop takes a user's finished MapReduce program and submits it to the cluster as a Job. Four Java class files are mainly involved: Job.java, JobSubmitter.java, YARNRunner.java, and ResourceMgrDelegate.java.
2. Code analysis and execution flow
1) The client runs a driver program like the one below; the implementations of the map and reduce functions are omitted here:
Job job = new Job(new Configuration());
job.setJarByClass(MyJob.class);

// Specify various job-specific parameters
job.setJobName("myjob");

job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);
/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
If the Job is still in the DEFINE state, waitForCompletion() first calls submit(). It then either calls monitorAndPrintJob() to report the progress of the Job and its Tasks, or enters its own loop, polling at a fixed interval to check whether the submitted Job has completed. Once the Job completes, it breaks out of the loop and calls isSuccessful() to return the final status.
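The interval used by the non-verbose polling branch comes from the client configuration. As a minimal sketch (the key below is the one Job.getCompletionPollInterval() reads, with a default of 5000 ms), a driver can shorten it before submitting:

Configuration conf = new Configuration();
// poll isComplete() every second instead of the default 5000 ms
conf.setInt("mapreduce.client.completion.pollinterval", 1000);
Job job = new Job(conf);
// ... configure jar, mapper, reducer, and paths as in the driver above ...
job.waitForCompletion(false);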
2) waitForCompletion() calls submit(), so we step into the submit() function:
/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit() throws IOException, InterruptedException,
                            ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
                                  ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
submit() first calls connect() to obtain the client-to-cluster protocol (ClientProtocol) and the connection information, which end up stored in a Cluster object. It then calls submitJobInternal() of the JobSubmitter class, stores the returned JobStatus, sets the job state to RUNNING, and returns immediately.
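Which ClientProtocol implementation connect() ends up holding is decided by the ClientProtocolProvider that matches mapreduce.framework.name. A minimal sketch of the effect (the Cluster construction is what connect() performs internally; the configuration key and provider classes are from the Hadoop source):

Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "yarn"); // "local" would select the LocalJobRunner
// Cluster asks each ClientProtocolProvider for a client; with "yarn" the
// YarnClientProtocolProvider answers with a YARNRunner, which is why step 4)
// below lands in YARNRunner.submitJob().
Cluster cluster = new Cluster(conf);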
3) Stepping into submitJobInternal() of the JobSubmitter class:
/**
 * Internal method for submitting jobs to the system.
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs
  checkSpecs(job);

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
                                                         job.getConfiguration());
  //configure the command line options correctly on the submitting dfs
  Configuration conf = job.getConfiguration();
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
submitJobInternal() mainly performs the following operations: it validates the job's output specification via checkSpecs(); resolves the staging area and records the submitting host name and address into the configuration; obtains a new JobID from submitClient and derives the submit directory from it; collects the delegation tokens needed for that directory; copies the job jar and auxiliary files into the submit directory (copyAndConfigureFiles); computes the input splits and stores their count as the number of map tasks (writeSplits); writes the queue administrator ACL into the job configuration; writes the final job.xml into the submit directory (writeConf); and finally calls submitClient.submitJob() to hand the job over, cleaning up the staging area if submission fails.
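The number of map tasks is decided entirely by the split computation. Below is a hedged sketch of what the new-API branch of writeSplits() boils down to (the real method additionally sorts the splits by size and has JobSplitWriter write job.split and job.splitmetainfo into the submit directory):

InputFormat<?, ?> input =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job); // one split per map task
int maps = splits.size();                       // stored under MRJobConfig.NUM_MAPS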
4) submitJob() of the YARNRunner class:
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {

  /* check if we have a hsproxy, if not, no need */
  MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
  if (hsProxy != null) {
    // JobClient will set this flag if getDelegationToken is called, if so, get
    // the delegation tokens for the HistoryServer also.
    if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
        DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
      Token hsDT = getDelegationTokenFromHS(hsProxy, new Text(
          conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
      ts.addToken(hsDT.getService(), hsDT);
    }
  }

  // Upload only in security mode: TODO
  Path applicationTokensFile =
      new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
  try {
    ts.writeTokenStorageFile(applicationTokensFile, conf);
  } catch (IOException e) {
    throw new YarnException(e);
  }

  // Construct necessary information to start the MR AM
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);

  ApplicationReport appMaster = resMgrDelegate
      .getApplicationReport(applicationId);
  String diagnostics = (appMaster == null ?
      "application report is null" : appMaster.getDiagnostics());
  if (appMaster == null
      || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
      || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
    throw new IOException("Failed to run job : " + diagnostics);
  }
  return clientCache.getClient(jobId).getJobStatus(jobId);
}
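Here submitJob() first obtains a HistoryServer delegation token when the JobClient has flagged that one is required, then writes all of the job's security tokens to the application tokens file under the submit directory. It then builds an ApplicationSubmissionContext describing how to launch the MapReduce ApplicationMaster, submits it to the ResourceManager through resMgrDelegate.submitApplication(), fails fast with the report's diagnostics if the application is missing, FAILED, or KILLED, and otherwise returns the job status fetched through the client cache.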
5) submitApplication() of the ResourceMgrDelegate class:
public ApplicationId submitApplication(
    ApplicationSubmissionContext appContext) throws IOException {
  appContext.setApplicationId(applicationId);
  SubmitApplicationRequest request =
      recordFactory.newRecordInstance(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);
  applicationsManager.submitApplication(request);
  LOG.info("Submitted application " + applicationId + " to ResourceManager" +
      " at " + rmAddress);
  return applicationId;
}
This function is straightforward: it stamps the context with the ApplicationId allocated earlier, wraps it in a SubmitApplicationRequest record, and sends it to the ResourceManager's ApplicationsManager over RPC. From this point on the job lives in YARN as an ordinary application; the ResourceManager allocates a container for the MRAppMaster, which then takes over running the map and reduce tasks.
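For reference, here is a hedged reconstruction of getNewJobID() from the same class (a sketch based on the same vintage of the YARN client API, not a verbatim quote), which shows where the applicationId field used above was populated, back in step 3) when submitClient.getNewJobID() was called:

public JobID getNewJobID() throws IOException, InterruptedException {
  GetNewApplicationRequest request =
      recordFactory.newRecordInstance(GetNewApplicationRequest.class);
  applicationId =
      applicationsManager.getNewApplication(request).getApplicationId();
  return TypeConverter.fromYarn(applicationId);
}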
