当前位置: > Linux集群 > Hadoop >

MapReduce实例-Top K

时间:2015-05-11 02:58来源:linux.it.net.cn 作者:IT

 1 public class TopK extends Configured implements Tool {
 2 
 3     public static class TopKMapper extends Mapper<Object, Text, NullWritable, LongWritable> {
 4 
 5         public static final int K = 100;
 6         private TreeMap<Long, Long> tm = new TreeMap<Long, Long>();
 7 
 8         @Override
 9         protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
10            try {
11                 long k = Integer.parseInt(value.toString().substring(0, 9));
12                 tm.put(k, k);
13                 if (tm.size() > K) {
14                     tm.remove(tm.firstKey());
15                 }
16             } catch (Exception e) {
17                 context.getCounter("TopK", "errorlog").increment(1);
18             }
19         }
20 
21         @Override
22         protected void cleanup(Context context) throws IOException, InterruptedException {
23             for (Long text : tm.values()) {
24                 context.write(NullWritable.get(), new LongWritable(text));
25             }
26         }
27     }
28 
29     public static class TopKReducer extends Reducer<NullWritable, LongWritable, NullWritable, LongWritable> {
30 
31         public static final int K = 100;
32         private TreeMap<Long, Long> mt = new TreeMap<Long, Long>();
33 
34         @Override
35         protected void reduce(NullWritable key, Iterable<LongWritable> values, Context context)
36                 throws IOException, InterruptedException {
37             for (LongWritable value : values) {
38                 mt.put(value.get(), value.get());
39                 if (mt.size() > K) {
40                     mt.remove(mt.firstKey());
41                 }
42             }
43             for (Long val : mt.descendingKeySet()) {
44                 context.write(NullWritable.get(), new LongWritable(val));
45             }
46         }
47 
48     }
49 
50     @Override
51     public int run(String[] args) throws Exception {
52         Configuration conf = getConf();
53         Job job = new Job(conf, "TopKNum");
54         job.setOutputKeyClass(NullWritable.class);
55         job.setOutputValueClass(LongWritable.class);
56         job.setMapperClass(TopKMapper.class);
57         job.setReducerClass(TopKReducer.class);
58         job.setJarByClass(TopK.class);
59         FileInputFormat.setInputPaths(job, new Path(args[0]));
60         FileOutputFormat.setOutputPath(job, new Path(args[1]));
61         job.setInputFormatClass(TextInputFormat.class);
62         job.setOutputFormatClass(TextOutputFormat.class);
63 
64         return job.waitForCompletion(true) ? 0 : 1;
65     }
66 
67     
68     public static void main(String[] args) throws IOException, InterruptedException {
69         try {
70             if (args.length < 2) {
71                 System.err.println("ERROR: Parameter format length ");
72                 System.exit(0);
73             }
74             int ret = ToolRunner.run(new TopK(), args);
75             System.exit(ret);
76         } catch (Exception e) {
77             e.printStackTrace();
78         }
79     }
80 }

上面的程序求的是最大的100个数。如果要求最小的100个数,把map中的tm.remove(tm.firstKey());改为tm.remove(tm.lastKey());,并把reduce中的mt.remove(mt.firstKey());改为mt.remove(mt.lastKey());即可。



(责任编辑:IT)
------分隔线----------------------------