> Linux集群 > Hadoop >

hadoop-Mapper分析

 
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
* Licensed to the Apache Software Foundation (ASF) under one
 
package org.apache.hadoop.mapreduce;
 
import java.io.IOException;
 
/**
 * Maps input key/value pairs to a set of intermediate key/value pairs. 
 *
 * <p>Maps are the individual tasks which transform input records into a
 * intermediate records. The transformed intermediate records need not be of
 * the same type as the input records. A given input pair may map to zero or
 * many output pairs.</p>
 *
 * <p>The Hadoop Map-Reduce framework spawns one map task for each
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * <code>Mapper</code> implementations can access the {@link Configuration} for
 * the job via the {@link JobContext#getConfiguration()}.
 *
 * <p>The framework first calls
 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
 * {@link #map(Object, Object, Context)}
 * for each key/value pair in the <code>InputSplit</code>. Finally
 * {@link #cleanup(Context)} is called.</p>
 *
 * <p>All intermediate values associated with a given output key are
 * subsequently grouped by the framework, and passed to a {@link Reducer} to 
 * determine the final output. Users can control the sorting and grouping by
 * specifying two key {@link RawComparator} classes.</p>
 *
 * <p>The <code>Mapper</code> outputs are partitioned per
 * <code>Reducer</code>. Users can control which keys (and hence records) go to
 * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
 *
 * <p>Users can optionally specify a <code>combiner</code>, via
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
 * intermediate outputs, which helps to cut down the amount of data transferred
 * from the <code>Mapper</code> to the <code>Reducer</code>.
 *
 * <p>Applications can specify if and how the intermediate
 * outputs are to be compressed and which {@link CompressionCodec}s are to be
 * used via the <code>Configuration</code>.</p>
 * 
 * <p>If the job has zero
 * reduces then the output of the <code>Mapper</code> is directly written
 * to the {@link OutputFormat} without sorting by keys.</p>
 *
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class TokenCounterMapper
 *     extends Mapper<Object, Text, Text, IntWritable>{
 *   
 *   private final static IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *  
 *   public void map(Object key, Text value, Context context) throws IOException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.collect(word, one);
 *     }
 *   }
 * }
 * </pre></blockquote></p>
 *
 * <p>Applications may override the {@link #run(Context)} method to exert
 * greater control on map processing e.g. multi-threaded <code>Mapper</code>s
 * etc.</p>
 *
 * @see InputFormat
 * @see JobContext
 * @see Partitioner 
 * @see Reducer
 */
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
 
  public class Context
    extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RecordReader<KEYIN,VALUEIN> reader,
                   RecordWriter<KEYOUT,VALUEOUT> writer,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   InputSplit split) throws IOException, InterruptedException {
      super(conf, taskid, reader, writer, committer, reporter, split);
    }
  }
  
  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }
 
  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value,
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }
 
  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  
  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }
}

Mapper的四个方法是setup,map,cleanup和run。其中,setup和cleanup用于管理Mapper生命周期中的资源,setup在完成Mapper构造,即将开始执行map动作前调用,cleanup则在所有的map动作完成后被调用。方法map用于对一次输入的key/value对进行map动作。run方法执行了上面描述的过程,它调用setup,让后迭代所有的key/value对,进行map,最后调用cleanup。

org.apache.hadoop.mapreduce.lib.map中实现了Mapper的三个子类,分别是InverseMapper(将输入<key, value> map为输出<value, key>),MultithreadedMapper(多线程执行map方法)和TokenCounterMapper(对输入的value分解为token并计数)。其中最复杂的是MultithreadedMapper,我们就以它为例,来分析Mapper的实现。

InverseMapper源代码:

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
 * Licensed to the Apache Software Foundation (ASF) under one
 
 
package org.apache.hadoop.mapreduce.lib.map;
 
 
import java.io.IOException;
 
 
/** A {@link Mapper} that swaps keys and values. */
public class InverseMapper<K, V> extends Mapper<K,V,V,K> {
 
 
  /** The inverse function.  Input keys and values are swapped.*/
  @Override
  public void map(K key, V value, Context context
                  ) throws IOException, InterruptedException {
    context.write(value, key);
  }
  
}
 
 
 
 
 
 <div>
 
 
 
 
 
 
 </div>

TokenCountMapper源代码:

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
 * Licensed to the Apache Software Foundation (ASF) under one
 
 
package org.apache.hadoop.mapreduce.lib.map;
 
 
import java.io.IOException;
 
 
/**
 * Tokenize the input values and emit each word with a count of 1.
 */
public class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable>{
    
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  
  @Override
  public void map(Object key, Text value, Context context
                  ) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}
 
 
 
 
 
 <div>
 
  
  
  
  
  <span style="font-size:9pt;line-height:1.5;"></span>
 
 
 
 
 
 </div>

MultithreadedMapper会启动多个线程执行另一个Mapper的map方法,它会启动mapred.map.multithreadedrunner.threads(配置项)个线程执行Mapper:mapred.map.multithreadedrunner.class(配置项)。MultithreadedMapper重写了基类Mapper的run方法,启动N个线程(对应的类为MapRunner)执行mapred.map.multithreadedrunner.class(我们称为目标Mapper)的run方法(就是说,目标Mapper的setup和cleanup会被执行多次)。目标Mapper共享同一份InputSplit,这就意味着,对InputSplit的数据读必须线程安全。为此,MultithreadedMapper引入了内部类SubMapRecordReader,SubMapRecordWriter,SubMapStatusReporter,分别继承自RecordReader,RecordWriter和StatusReporter,它们通过互斥访问MultithreadedMapper的Mapper.Context,实现了对同一份InputSplit的线程安全访问,为Mapper提供所需的Context。这些类的实现方法都很简单。

 


(责任编辑:IT)