
Hadoop Job Submission and Execution: Source Code Analysis


1  Overview

[Figure: overall Job execution flow]

2  Job Creation and Submission

2.1  Configuration

The Configuration class defines the following private member variables:

/* The first is the boolean variable quietmode, which controls the mode used when loading configuration. Reading the source makes this clear: quietmode defaults to true, i.e. configuration is loaded in quiet mode, which simply means that no log messages are emitted while the configuration files are parsed. */

private boolean quietmode = true;

private ArrayList defaultResources = new ArrayList(); // list holding the names of the default configuration files

private ArrayList<Object> resources = new ArrayList<Object>(); // every configured resource: a URL, String, Path, or InputStream

private Set<String> finalParameters = new HashSet<String>(); // names of parameters declared final, which cannot be overridden

private boolean loadDefaults = true; // whether to load the default resources

private static final WeakHashMap<Configuration, Object> REGISTRY = new WeakHashMap<Configuration, Object>(); // weak registry of live Configuration instances

private Properties properties; // every configuration item the program needs, stored as a Properties object

private Properties overlay;

/* overlay is also a Properties object. It corresponds to the finalResources list: the configuration files named there are parsed and their items are set into overlay. The key point about overlay is that when a Configuration instance is created, overlay is checked, and if it is not empty its items are added to properties. */

private ClassLoader classLoader; // the class loader

All of the class information configured in the client program, along with other runtime information, is stored in this class.
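To make these fields concrete, here is a minimal client-side sketch of how Configuration is typically used (the resource name my-site.xml is assumed for illustration): values parsed from resource files end up in properties, and get() can fall back to a supplied default when a key is absent.

import org.apache.hadoop.conf.Configuration;

public class ConfDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();  // loadDefaults = true: default resources are pulled in
    conf.addResource("my-site.xml");           // appended to the resources list (file name assumed)
    conf.set("my.custom.key", "42");           // programmatic override

    System.out.println(conf.get("my.custom.key"));           // prints 42
    System.out.println(conf.get("no.such.key", "fallback")); // prints the fallback default
  }
}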

2.2  JobClient.runJob(): starting the job and splitting the input data set

A MapReduce job uses the JobClient class to split the input data set into a batch of small splits, according to the InputFormat implementation the user specified in JobConf (obtained inside getInputFormat() via a getClass() call); one MapTask is created to process each small split. EagerTaskInitializationListener then calls JobInProgress.initTasks() to create the map and reduce tasks from the InputSplits, along with the assistant tasks, such as the clean-up task.

Note: the InputFormat interface's default implementation is the FileInputFormat class, and TextInputFormat extends FileInputFormat. FileInputFormat defines the set of operations used to split a data set.

By default, JobClient uses the TextInputFormat class and calls TextInputFormat.getSplits() (through the InputFormat interface) to generate the splits. If isSplitable() says a data file can be split, large files are broken into small FileSplits; note that a FileSplit only records the file's path in HDFS plus the offset and size of the split. This information is packaged into the jobFile and the jar, stored in HDFS, and the jobFile path is then submitted to the JobTracker for scheduling and execution.
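As a concrete illustration of the isSplitable() hook just mentioned, the following minimal sketch (the class name is illustrative) forces whole-file splits, so each input file becomes exactly one FileSplit and hence one map task:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TextInputFormat;

// Hypothetical subclass: getSplits() in FileInputFormat consults isSplitable(),
// so returning false keeps every file in a single split.
public class NonSplittableTextInputFormat extends TextInputFormat {
  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
    return false;
  }
}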

Note: Packaging: the user builds the job jar with Eclipse or the ant command.

Transfer: JobClient copies files to HDFS with the copyRemoteFiles() method:

private Path copyRemoteFiles(FileSystem jtFs, Path parentDir, Path originalPath, JobConf job, short replication) throws IOException {
    FileSystem remoteFs = null;
    remoteFs = originalPath.getFileSystem(job);
    // if the source already lives on the JobTracker's file system, no copy is needed
    if (compareFs(remoteFs, jtFs)) {
      return originalPath;
    }
    // otherwise copy it over and set the requested replication factor
    Path newPath = new Path(parentDir, originalPath.getName());
    FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, job);
    jtFs.setReplication(newPath, replication);
    return newPath;
}

Submission: what the JobTracker actually receives is the URL corresponding to the job. In JobClient, a submitJarFile is first instantiated from a Path, carrying the jar file's path information; then configureCommandLineOptions() is called, which uses job.setJar() to record the path in string form.

 

2.3  JobClient.submitJob(): submitting the job to the JobTracker

The jobFile is submitted through the RPC module. Roughly: JobClient calls the JobTracker's submitJob() method through an RPC-implemented proxy interface (the RPC proxy has been created beforehand), and the JobTracker must implement the JobSubmissionProtocol interface.

Note: createRPCProxy returns an object named jobSubmitClient that implements the JobSubmissionProtocol interface; through this object, JobClient calls submitJob() (which in turn calls submitJobInternal()) to submit the job.
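A rough sketch of that proxy creation (the local variable names are assumed, but RPC.getProxy() is the standard Hadoop IPC entry point; JobSubmissionProtocol is package-private, so this code lives inside org.apache.hadoop.mapred, mirroring JobClient's createRPCProxy):

// bind a client-side proxy to the JobTracker's address; every call on
// jobSubmitClient (e.g. submitJob) is then marshalled over IPC
InetSocketAddress addr = JobTracker.getAddress(conf); // from "mapred.job.tracker"
JobSubmissionProtocol jobSubmitClient =
    (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
        JobSubmissionProtocol.versionID, addr, conf);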

submitJobInternal() invokes JobTracker.submitJob() over RPC; submitJob() calls addJob(), which registers the job, wrapped as a JobInProgress, with the JobTracker:

private synchronized JobStatus addJob(JobID jobId, JobInProgress job) {
    totalSubmissions++;
    synchronized (jobs) {
      synchronized (taskScheduler) {
        jobs.put(job.getProfile().getJobID(), job);
        // notify every registered listener (e.g. the scheduler's queues)
        for (JobInProgressListener listener : jobInProgressListeners) {
          try {
            listener.jobAdded(job);
          } catch (IOException ioe) {
            LOG.warn("Failed to add and so skipping the job : "
                + job.getJobID() + ". Exception : " + ioe);
          }
        }
      }
    }
    myInstrumentation.submitJob(job.getJobConf(), jobId);
    return job.getStatus();
  }

The JobTracker then uses the jobFile path it received to create the series of objects related to the job (JobInProgress, TaskInProgress, and so on) that schedule and execute it.

After the JobTracker creates the job successfully, it returns a JobStatus object to the JobClient recording the job's status information, such as the running time and the fraction of map and reduce tasks completed. From this JobStatus, the JobClient creates a NetworkedJob (an implementation of RunningJob), which periodically fetches execution statistics from the JobTracker to monitor the job and print progress to the user's console.
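On the client side, that monitoring loop can be reproduced with the public RunningJob API. A minimal sketch (assuming conf is a fully configured JobConf; exception handling elided):

JobClient jc = new JobClient(conf);
RunningJob running = jc.submitJob(conf);  // returns the NetworkedJob handle
while (!running.isComplete()) {           // polls the JobTracker for fresh status
  System.out.printf("map %.0f%%  reduce %.0f%%%n",
      running.mapProgress() * 100, running.reduceProgress() * 100);
  Thread.sleep(5000);
}
System.out.println(running.isSuccessful() ? "job succeeded" : "job failed");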

Note: inside submitJob, the real submission is done by submitJobInternal, which writes three files, job.jar, job.split, and job.xml. Their location is determined by the MapReduce system directory, the mapred.system.dir property. After writing these three files, the method calls JobTracker.submitJob(job) on the master node via RPC.

3  Job Execution

Jobs are scheduled centrally by the JobTracker; the concrete Tasks are distributed to the individual TaskTracker nodes for execution.

3.1  How the JobTracker initializes the Job and Task queues

After the JobTracker starts, it initializes a number of services and internal threads to maintain the execution state and results of jobs. First, it starts the interTrackerServer, whose port is configured by the "mapred.job.tracker" parameter in the Configuration and binds to port 8012 by default. This server has two purposes: first, to receive and handle heartbeat and other requests from TaskTrackers, for which it must implement the InterTrackerProtocol interface and protocol; second, to receive and handle JobClient requests such as submitJob and killJob, for which it must also implement the JobSubmissionProtocol interface and protocol.

Next, it starts an infoServer thread running StatusHttpServer, which listens on port 50030 by default. This is a web service that gives users a web interface for querying job execution status.

The JobTracker also starts several other threads. The ExpireLaunchingTasks thread stops Tasks that fail to report progress within the timeout. The ExpireTrackers thread retires TaskTrackers that appear to have died; a TaskTracker that has not reported for a long time is no longer assigned new Tasks. The RetireJobs thread clears out jobs that completed long ago but still sit in the queues. The JobInitThread thread initializes jobs, as described in the earlier sections. The TaskCommitQueue thread schedules all of a Task's FileSystem-related operations and records Task status and other information.

3.1.1  JobTracker.submitJob(): receiving the request

When the JobTracker receives a new job request (i.e. its submitJob() function is called), it creates a JobInProgress object and uses it to manage and schedule the tasks.

JobInProgress job = new JobInProgress(jobId, this, this.conf);

When a JobInProgress is created, it initializes a series of job-related parameters: the location of the job jar (which it copies from HDFS into a temporary directory in the local file system), the map and reduce data, the job's priority, the objects that record statistics and reports, and so on.

public JobInProgress(JobID jobid, JobTracker jobtracker,
    JobConf default_conf, int rCount) throws IOException {
  this.restartCount = rCount;
  this.jobId = jobid;
  String url = "http://" + jobtracker.getJobTrackerMachine() + ":"
      + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
  this.jobtracker = jobtracker;
  this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
  this.startTime = System.currentTimeMillis();
  status.setStartTime(startTime);
  this.localFs = FileSystem.getLocal(default_conf);
  JobConf default_job_conf = new JobConf(default_conf);
  this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR + "/" + jobid + ".xml");
  this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR + "/" + jobid + ".jar");
  Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
  FileSystem fs = jobDir.getFileSystem(default_conf);
  jobFile = new Path(jobDir, "job.xml");
  fs.copyToLocalFile(jobFile, localJobFile);
  conf = new JobConf(localJobFile);
  this.priority = conf.getJobPriority();
  this.status.setJobPriority(this.priority);
  this.profile = new JobProfile(conf.getUser(), jobid, jobFile.toString(), url,
      conf.getJobName(), conf.getQueueName());
  String jarFile = conf.getJar();
  if (jarFile != null) {
    fs.copyToLocalFile(new Path(jarFile), localJarFile);
    conf.setJar(localJarFile.toString());
  }
  this.numMapTasks = conf.getNumMapTasks();
  this.numReduceTasks = conf.getNumReduceTasks();
  this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(numMapTasks + numReduceTasks + 10);
  this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
  this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
  MetricsContext metricsContext = MetricsUtil.getContext("mapred");
  this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
  this.jobMetrics.setTag("user", conf.getUser());
  this.jobMetrics.setTag("sessionId", conf.getSessionId());
  this.jobMetrics.setTag("jobName", conf.getJobName());
  this.jobMetrics.setTag("jobId", jobid.toString());
  hasSpeculativeMaps = conf.getMapSpeculativeExecution();
  hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
  this.maxLevel = jobtracker.getNumTaskCacheLevels();
  this.anyCacheLevel = this.maxLevel + 1;
  this.nonLocalMaps = new LinkedList<TaskInProgress>();
  this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
  this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
  this.nonRunningReduces = new LinkedList<TaskInProgress>();
  this.runningReduces = new LinkedHashSet<TaskInProgress>();
  this.resourceEstimator = new ResourceEstimator(this);
}

3.1.2  JobTracker.setJobPriority(): setting the priority

After the JobInProgress is created, it is first added to the jobs queue: a Map member variable named jobs manages all job objects, and a String variable named priority maintains the jobs' execution priority. The JobTracker then calls the resortPriority() function, which sorts the jobs first by priority and then by submission time, guaranteeing that the job with the highest priority and earliest submission runs first.

public synchronized void setJobPriority(JobID jobid, String priority) throws IOException {
    JobInProgress job = jobs.get(jobid);
    if (null == job) {
        LOG.info("setJobPriority(): JobId " + jobid.toString()
            + " is not a valid job");
        return;
    }
    checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
    JobPriority newPriority = JobPriority.valueOf(priority);
    setJobPriority(jobid, newPriority);
  }
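From the client's perspective, the priority that resortPriority() sorts on can be set before submission on the JobConf, or changed afterwards through the RunningJob handle; a small sketch (reusing conf and running from the earlier monitoring example, exception handling elided):

conf.setJobPriority(JobPriority.HIGH);  // before submission
// after submission; this reaches the JobTracker.setJobPriority() shown above,
// where the string is parsed with JobPriority.valueOf():
running.setJobPriority("VERY_HIGH");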

3.1.3  JobTracker.initJob(): notifying the initialization thread

The JobTracker then adds the job to the queue of jobs awaiting initialization, a List member variable named jobInitQueue, and calls notifyAll() on it, which wakes up a thread, InitJob, that initializes jobs. When InitJob receives the signal, it takes the job at the front of the queue, i.e. the one with the highest priority, and calls JobInProgress's initTasks() function to do the real initialization work.

public void initJob(JobInProgress job) {
    if (null == job) {
      LOG.info("Init on null job is not valid");
      return;
    }
    try {
      JobStatus prevStatus = (JobStatus) job.getStatus().clone();
      LOG.info("Initializing " + job.getJobID());
      job.initTasks();
      // Inform the listeners if the job state has changed
      // Note : that the job will be in PREP state.
      JobStatus newStatus = (JobStatus) job.getStatus().clone();
      if (prevStatus.getRunState() != newStatus.getRunState()) {
        JobStatusChangeEvent event =
          new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
              newStatus);
        synchronized (JobTracker.this) {
          updateJobInProgressListeners(event);
        }
      }
    } catch (KillInterruptedException kie) {
      // If job was killed during initialization, job state will be KILLED
      LOG.error("Job initialization interrupted:\n" +
          StringUtils.stringifyException(kie));
      killJob(job);
    } catch (Throwable t) {
      // If the job initialization is failed, job state will be FAILED
      LOG.error("Job initialization failed:\n" +
          StringUtils.stringifyException(t));
      failJob(job);
    }
}

3.1.4  JobInProgress.initTasks(): initializing the TaskInProgress objects

Task initialization is a little more involved. First, JobInProgress creates the monitoring objects for the maps. In initTasks() it calls JobClient's readSplitFile() to obtain the RawSplit list for the already-split input data, then creates a matching number of map-execution management objects, TaskInProgress. During this process it also records, for each RawSplit, the hosts of the DataNodes holding all of the corresponding HDFS blocks; these are obtained through FileSplit's getLocations() function when the RawSplit is created, which in turn calls DistributedFileSystem's getFileCacheHints() (the details live in the HDFS module). Of course, if the data is stored in the local file system, i.e. LocalFileSystem is used, there is only one location, "localhost".
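The locality information described above is also visible through the public InputSplit API. A short illustrative sketch (assuming job is a configured JobConf; exceptions elided):

InputFormat inputFormat = job.getInputFormat();
InputSplit[] splits = inputFormat.getSplits(job, job.getNumMapTasks());
for (InputSplit split : splits) {
  // hosts of the DataNodes holding this split's blocks
  // (just "localhost" when LocalFileSystem is used)
  String[] hosts = split.getLocations();
  System.out.println(split + " -> " + java.util.Arrays.toString(hosts));
}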

Next, JobInProgress creates the monitoring objects for the reduces, which is simpler: it creates as many as JobConf specifies, and only one reduce task by default. Reduce tasks are also monitored and scheduled by the TaskInProgress class, though with a different constructor: depending on its arguments, TaskInProgress creates either a concrete MapTask or a ReduceTask.

After creating the TaskInProgress objects, JobInProgress finally constructs a JobStatus recording that the job is running, and then calls JobHistory.JobInfo.logStarted() to log the job's execution. At this point the JobTracker's job initialization is complete; execution proceeds through a separate, asynchronous path.

public synchronized void initTasks() throws IOException, KillInterruptedException {
    if (tasksInited.get() || isComplete()) {
      return;
    }
    .......
    LOG.info("Initializing " + jobId); // log job info
    JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(),
                                    this.startTime, hasRestarted());
    // log the job priority
    setPriority(this.priority);
    String jobFile = profile.getJobFile();
    Path sysDir = new Path(this.jobtracker.getSystemDir());
    FileSystem fs = sysDir.getFileSystem(conf);
    DataInputStream splitFile =
      fs.open(new Path(conf.get("mapred.job.split.file")));
    JobClient.RawSplit[] splits;
    try {
      splits = JobClient.readSplitFile(splitFile);
    } finally {
      splitFile.close();
    }
    numMapTasks = splits.length;
    int maxTasks = jobtracker.getMaxTasksPerJob();
    if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
      throw new IOException("The number of tasks for this job " +
          (numMapTasks + numReduceTasks) + " exceeds the configured limit " + maxTasks);
    }
    jobtracker.getInstrumentation().addWaiting(getJobID(), numMapTasks + numReduceTasks);
    maps = new TaskInProgress[numMapTasks];
    for (int i = 0; i < numMapTasks; ++i) {
      inputLength += splits[i].getDataLength();
      maps[i] = new TaskInProgress(jobId, jobFile, splits[i], jobtracker, conf, this, i);
    }
    LOG.info("Input size for job " + jobId + " = " + inputLength
        + ". Number of splits = " + splits.length);
    if (numMapTasks > 0) {
      nonRunningMapCache = createCache(splits, maxLevel);
    }
    // set the launch time
    this.launchTime = System.currentTimeMillis();
    // Create reduce tasks
    this.reduces = new TaskInProgress[numReduceTasks];
    for (int i = 0; i < numReduceTasks; i++) {
      reduces[i] = new TaskInProgress(jobId, jobFile, numMapTasks, i, jobtracker, conf, this);
      nonRunningReduces.add(reduces[i]);
    }
    // Calculate the minimum number of maps to be complete before
    // we should start scheduling reduces
    completedMapsForReduceSlowstart =
        (int) Math.ceil((conf.getFloat("mapred.reduce.slowstart.completed.maps",
            DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * numMapTasks));
    // create two cleanup tips, one map and one reduce.
    cleanup = new TaskInProgress[2];
    // cleanup map tip. This map doesn't use any splits. Just assign an empty split.
    JobClient.RawSplit emptySplit = new JobClient.RawSplit();
    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks);
    cleanup[0].setJobCleanupTask();
    // cleanup reduce tip.
    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks, jobtracker, conf, this);
    cleanup[1].setJobCleanupTask();
    // create two setup tips, one map and one reduce.
    setup = new TaskInProgress[2];
    // setup map tip. This map doesn't use any split. Just assign an empty split.
    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks + 1);
    setup[0].setJobSetupTask();
    // setup reduce tip.
    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks + 1, jobtracker, conf, this);
    setup[1].setJobSetupTask();
    synchronized (jobInitKillStatus) {
      jobInitKillStatus.initDone = true;
      if (jobInitKillStatus.killed) {
        throw new KillInterruptedException("Job " + jobId + " killed in init");
      }
    }
    tasksInited.set(true);
    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, numMapTasks, numReduceTasks);
  }

3.2  How the TaskTracker executes a Task

Task execution is actually initiated by the TaskTracker. A TaskTracker communicates with the JobTracker periodically (every 10 seconds by default, the HEARTBEAT_INTERVAL constant defined in the MRConstants class), reporting the execution status of its own Tasks and receiving the JobTracker's instructions. If it finds it has new tasks to run, it starts them at this point too, i.e. when the TaskTracker calls the JobTracker's heartbeat() method; underneath, this call goes through the IPC layer's Proxy interface.

3.2.1  TaskTracker.run(): connecting to the JobTracker

During startup, the TaskTracker initializes a series of parameters and services and then tries to connect to the JobTracker service (which must implement the InterTrackerProtocol interface). If the connection drops, it keeps retrying in a loop and re-initializes all members and parameters; see the run() method for this process. Note that the listing below is the inner TaskLauncher.run() loop, which takes queued tasks and launches them once a free slot is available.

public void run() {
      while (!Thread.interrupted()) {
        try {
          TaskInProgress tip;
          synchronized (tasksToLaunch) {
            while (tasksToLaunch.isEmpty()) {
              tasksToLaunch.wait();
            }
            tip = tasksToLaunch.remove(0); // get the TIP
            LOG.info("Trying to launch : " + tip.getTask().getTaskID());
          }
          synchronized (numFreeSlots) { // wait for a slot to run
            while (numFreeSlots.get() == 0) {
              numFreeSlots.wait();
            }
            LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()
                + " and trying to launch " + tip.getTask().getTaskID());
            numFreeSlots.set(numFreeSlots.get() - 1);
            assert (numFreeSlots.get() >= 0);
          }
          synchronized (tip) {
            // to make sure that there is no kill task action for this
            if (tip.getRunState() != TaskStatus.State.UNASSIGNED &&
                tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
                tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
              // got killed externally while still in the launcher queue
              addFreeSlot();
              continue;
            }
            tip.slotTaken = true;
          }
          // got a free slot. launch the task
          startNewTask(tip);
        } catch (InterruptedException e) {
          return; // ALL DONE
        } catch (Throwable th) {
          LOG.error("TaskLauncher error " +
              StringUtils.stringifyException(th));
        }
      }
    }

3.2.2  TaskTracker.offerService(): the main loop

If the connection to the JobTracker service succeeds, the TaskTracker calls offerService() and enters its main execution loop. This loop communicates with the JobTracker once every 10 seconds, calling transmitHeartBeat() to obtain a HeartbeatResponse.

// Send the heartbeat and process the jobtracker's directives

HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

It then calls HeartbeatResponse's getActions() function to obtain all the instructions the JobTracker sent back, a TaskTrackerAction array. It iterates over the array: for a new-task instruction, a LaunchTaskAction, it calls startNewTask() to run the new task; anything else, such as a KillJobAction or KillTaskAction, is added to the tasksToCleanup queue and handled by a taskCleanupThread.

TaskTrackerAction[] actions = heartbeatResponse.getActions();
if (LOG.isDebugEnabled()) {
  LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
            heartbeatResponse.getResponseId() + " and " +
            ((actions != null) ? actions.length : 0) + " actions");
}
if (reinitTaskTracker(actions)) {
  return State.STALE;
}
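The dispatch over the actions array then looks roughly like the following (a condensed paraphrase of the logic in TaskTracker.offerService(); bookkeeping and error handling elided):

if (actions != null) {
  for (TaskTrackerAction action : actions) {
    if (action instanceof LaunchTaskAction) {
      addToTaskQueue((LaunchTaskAction) action); // eventually calls startNewTask()
    } else {
      tasksToCleanup.put(action);                // e.g. KillJobAction / KillTaskAction
    }
  }
}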

3.2.3  TaskTracker.transmitHeartBeat(): fetching the JobTracker's instructions

transmitHeartBeat()函数处理中,TaskTracker会创建一个新的TaskTrackerStatus对象记录目前任务的执行状况,然后通过IPC接口调用JobTrackerheartbeat()方法发送过去,并接受新的指令,即返回值TaskTrackerAction数组。在这个调用之前,TaskTracker会先检查目前执行的Task数目以及本地磁盘的空间使用情况等,如果可以接收新的Task则设置heartbeat()askForNewTask参数为true。操作成功后再更新相关的统计信息等。

HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, justInited, askForNewTask, heartbeatResponseId);

3.2.4  TaskTracker.startNewTask(): starting a new task

The main job of this function is to create a TaskTracker$TaskInProgress object to schedule and monitor the task and add it to the runningTasks queue. Once that is done, it calls localizeJob() to actually initialize the Task and begin execution.

private void startNewTask(TaskInProgress tip) {
    try {
      localizeJob(tip);
    } catch (Throwable e) {
      String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n"
          + StringUtils.stringifyException(e));
      LOG.warn(msg);
      tip.reportDiagnosticInfo(msg);
      try {
        tip.kill(true);
        tip.cleanup(true);
      } catch (IOException ie2) {
        LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n"
            + StringUtils.stringifyException(ie2));
      }
      // This might not be an 'Exception' - don't handle 'Error' here!
      if (e instanceof Error) {
        throw ((Error) e);
      }
    }
  }

3.2.5  TaskTracker.localizeJob(): initializing the job directory and more

This function's main job is to initialize the working directory workDir, copy the job jar from HDFS to the local file system, and call RunJar.unJar() to unpack it into the working directory. It then creates a RunningJob and adds it to the runningJobs monitoring queue via the addTaskToJob() function. When that is done, it calls launchTaskForJob() to start executing the Task.

Path localJarFile = null;
Task t = tip.getTask();
JobID jobId = t.getJobID();
Path jobFile = new Path(t.getJobFile());
Path localJobFile = lDirAlloc.getLocalPathForWrite(
    getLocalJobDir(jobId.toString()) + Path.SEPARATOR + "job.xml", jobFileSize, fConf);
RunningJob rjob = addTaskToJob(jobId, tip);

3.2.6  TaskTracker.launchTaskForJob(): launching the task

The work of starting the Task is actually done by calling TaskTracker$TaskInProgress's launchTask() function.

private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException {
    synchronized (tip) {
      tip.setJobConf(jobConf);
      tip.launchTask();
    }
  }

3.2.7  TaskTracker$TaskInProgress.launchTask(): running the task

Before running the task, it calls localizeTask() to update the jobConf file and write it to the local directory. It then creates a TaskRunner object through the Task's createRunner() method and calls its start() method, which ultimately launches the Task in a separate Java child process.

public synchronized void launchTask() throws IOException {
      if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
          this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
          this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
        localizeTask(task);
        if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
          this.taskStatus.setRunState(TaskStatus.State.RUNNING);
        }
        this.runner = task.createRunner(TaskTracker.this, this);
        this.runner.start();
        this.taskStatus.setStartTime(System.currentTimeMillis());
      } else {
        LOG.info("Not launching task: " + task.getTaskID() +
            " since it's state is " + this.taskStatus.getRunState());
      }
  }

3.2.8  MapTask.run() and ReduceTask.run()

initialize(job, getJobID(), reporter, useNewApi);
......
if (useNewApi) {
    runNewMapper(job, split, umbilical, reporter);
} else {
    runOldMapper(job, split, umbilical, reporter);
}

Two key blocks are involved here. One is initialize(), which among other things sets up the output according to the configured OutputFormat. The other is the map execution itself, starting from runNewMapper() or runOldMapper(). Taking runNewMapper() as the example: the framework first instantiates the job's Mapper class, then the job's input-format class; it uses that input format to turn this task's slice of the input data into the specified RecordReader, and finally loops over the RecordReader's records, calling the ultimate map() method for each one, i.e. the map method in our client code.
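That per-record loop is easiest to see in the new-API Mapper.run() itself, whose shape is essentially the following (simplified, not a verbatim copy of the Hadoop source):

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  // one map() call per record handed out by the RecordReader
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}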

The reduce side starts the same way as the map side, so that part is not repeated here; the difference comes afterwards. First, the reduce's input data is combined by the merge step before being operated on; the merged data then yields an iterator over key/value groups, Reducer.reduce() is called in a loop for each group, and the results are written back to the file system.
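The corresponding reduce-side loop (again the simplified shape of the new-API Reducer.run()) iterates over merged key groups rather than individual records. The ReduceTaskRunner listing that follows shows the setup and cleanup around the copied map outputs:

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  // one reduce() call per distinct key, with an Iterable over its merged values
  while (context.nextKey()) {
    reduce(context.getCurrentKey(), context.getValues(), context);
  }
  cleanup(context);
}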

public ReduceTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf) throws IOException {
    super(task, tracker, conf);
  }

  /** Assemble all of the map output files */
  public boolean prepare() throws IOException {
    if (!super.prepare()) {
      return false;
    }
    // cleanup from failures
    mapOutputFile.removeAll(getTask().getTaskID());
    return true;
  }

  /** Delete all of the temporary map output files. */
  public void close() throws IOException {
    LOG.info(getTask() + " done; removing files.");
    getTask().getProgress().setStatus("closed");
    mapOutputFile.removeAll(getTask().getTaskID());
  }
