1.概览 以下主要叙述Hadoop如何将用户写好的MR程序,以Job的形式提交 主要涉及的四个java类文件:
2.代码分析与执行逻辑过程 1).客户运行写好类下下面的程序,这里省去map和reduce的函数的实现: Job job = new Job(new Configuration()); job.setJarByClass(MyJob.class); // Specify various job-specific parameters job.setJobName("myjob"); job.setInputPath(new Path("in")); job.setOutputPath(new Path("out")); job.setMapperClass(MyJob.MyMapper.class); job.setReducerClass(MyJob.MyReducer.class); // Submit the job, then poll for progress until the job is complete job.waitForCompletion(true);
/** * Submit the job to the cluster and wait for it to finish. * @param verbose print the progress to the user * @return true if the job succeeded * @throws IOException thrown if the communication with the * <code>JobTracker</code> is lost */ public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); } if (verbose) { monitorAndPrintJob(); } else { // get the completion poll interval from the client. int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf()); while (!isComplete()) { try { Thread.sleep(completionPollIntervalMillis); } catch (InterruptedException ie) { } } } return isSuccessful(); } Job如果已经初始化好,立即调用submit()函数,之后调用monitorAndPrintJob()检查Job和Task的运行状况,或者自身进入循环,以一定的时间间隔轮询检查所提交的Job是是否执行完成。如果执行完成,跳出循环,调用isSuccessful()函数返回执行后的状态。
2).waitForCompletion()函数调用submit(),进入submit()函数 /** * Submit the job to the cluster and return immediately. * @throws IOException */ public void submit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE); setUseNewAPI(); connect(); final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { return submitter.submitJobInternal(Job.this, cluster); } }); state = JobState.RUNNING; LOG.info("The url to track the job: " + getTrackingURL()); } submit函数主要先调用connect()来获取需的调用协议(ClientProtocol)信息,连接信息,最后写入Cluster对象中,之后调用JobSubmitter类下的submitJobInternal()函数,获取其返回的状态设置JobStatus为Running,最后直接退出。
3).进入JobSubmitter类下的submitJobInternal()函数 /** * Internal method for submitting jobs to the system. */ JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException { //validate the jobs output specs checkSpecs(job); Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, job.getConfiguration()); //configure the command line options correctly on the submitting dfs Configuration conf = job.getConfiguration(); InetAddress ip = InetAddress.getLocalHost(); if (ip != null) { submitHostAddress = ip.getHostAddress(); submitHostName = ip.getHostName(); conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName); conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress); } JobID jobId = submitClient.getNewJobID(); job.setJobID(jobId); Path submitJobDir = new Path(jobStagingArea, jobId.toString()); JobStatus status = null; try { conf.set("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer"); conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString()); LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir"); // get delegation token for the dir TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { submitJobDir }, conf); populateTokenCache(conf, job.getCredentials()); copyAndConfigureFiles(job, submitJobDir); Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); // Create the splits for the job LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir)); int maps = writeSplits(job, submitJobDir); conf.setInt(MRJobConfig.NUM_MAPS, maps); LOG.info("number of splits:" + maps); // write "queue admins of the queue to which job is being submitted" // to job file. String queue = conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME); AccessControlList acl = submitClient.getQueueAdmins(queue); conf.set(toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString()); // removing jobtoken referrals before copying the jobconf to HDFS // as the tasks don't need this setting, actually they may break // because of it if present as the referral will point to a // different job. TokenCache.cleanUpTokenReferral(conf); // Write job file to submit dir writeConf(conf, submitJobFile); // // Now, actually submit the job (using the submit name) // printTokens(jobId, job.getCredentials()); status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials()); if (status != null) { return status; } else { throw new IOException("Could not launch job"); } } finally { if (status == null) { LOG.info("Cleaning up the staging area " + submitJobDir); if (jtFs != null && submitJobDir != null) jtFs.delete(submitJobDir, true); } } } Submit主要进行如下操作
4).YARNRunner类下的submitJob()函数 @Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException { /* check if we have a hsproxy, if not, no need */ MRClientProtocol hsProxy = clientCache.getInitializedHSProxy(); if (hsProxy != null) { // JobClient will set this flag if getDelegationToken is called, if so, get // the delegation tokens for the HistoryServer also. if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED, DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) { Token hsDT = getDelegationTokenFromHS(hsProxy, new Text( conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER))); ts.addToken(hsDT.getService(), hsDT); } } // Upload only in security mode: TODO Path applicationTokensFile = new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE); try { ts.writeTokenStorageFile(applicationTokensFile, conf); } catch (IOException e) { throw new YarnException(e); } // Construct necessary information to start the MR AM ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts); // Submit to ResourceManager ApplicationId applicationId = resMgrDelegate.submitApplication(appContext); ApplicationReport appMaster = resMgrDelegate .getApplicationReport(applicationId); String diagnostics = (appMaster == null ? "application report is null" : appMaster.getDiagnostics()); if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) { throw new IOException("Failed to run job : " + diagnostics); } return clientCache.getClient(jobId).getJobStatus(jobId); }
5).ResourceMgrDelegate类下的submitApplication()函数 public ApplicationId submitApplication( ApplicationSubmissionContext appContext) throws IOException { appContext.setApplicationId(applicationId); SubmitApplicationRequest request = recordFactory.newRecordInstance(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); applicationsManager.submitApplication(request); LOG.info("Submitted application " + applicationId + " to ResourceManager" + " at " + rmAddress); return applicationId; }
这个函数很简单
|