当前位置: > Linux集群 > Hadoop >

Mapreduce实例-分组排重(group by distinct)

时间:2015-05-11 02:59 来源:linux.it.net.cn 作者:IT

 1 public class GroupComparator implements RawComparator<MyBinaryKey> {
 2  
 3  @Override
 4  public int compare(MyBinaryKey o1, MyBinaryKey o2) {
 5   return o1.toString().compareTo(o2.toString());
 6  }
 7 
 8  @Override
 9  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
10   return WritableComparator.compareBytes(b1, s1, Long.SIZE / 8 + Integer.SIZE / 8 * 3, b2, s2,  Long.SIZE / 8 + Integer.SIZE / 8 * 3);
11  }
12 
13 }
14 
15 public abstract class UVBinaryKey  extends BinaryComparable implements WritableComparable<BinaryComparable>{
16  //根据需要添加属性;
17   @Override
18  public void readFields(DataInput in) throws IOException {
19 
20 } 
21 
22 @Override
23  public byte[] getBytes() {
24 
25 }
26 
27 }
28 
29 public class MyPartitioner extends Partitioner<MyBinaryKey, NullWritable> {
30 
31  /**
32   * 根据uv/ip取模分区,保证相同uv/ip落在同一分区
33   */
34  @Override
35  public int getPartition(MyBinaryKey key, NullWritable value, int numPartitions) {
36   
37   int k=0;
38   for(byte b : key.getAttr()){
39    k+=b&0xff;
40   }
41   return k%numPartitions;
42  }
43 
44 }
45 
46 
47 
// Register the concrete key class: the partitioner and grouping comparator are both
// typed on MyBinaryKey, and the map output key class must be instantiable and match
// the class of the keys the mapper emits (UVBinaryKey is abstract).
// NOTE(review): assumes MyBinaryKey is the concrete subclass of UVBinaryKey - confirm.
job.setMapOutputKeyClass(MyBinaryKey.class);
// Group reduce input by the key prefix only, so one reduce() call sees every
// attribute value that shares that prefix.
job.setGroupingComparatorClass(GroupComparator.class);
// Send equal uv/ip attribute values to the same partition.
job.setPartitionerClass(MyPartitioner.class);
51 
map 阶段的实现从略。
combiner 可根据需要添加。
reduce 中的实现如下:
 3        @Override
 4         protected void reduce(UVBinaryKey key, Iterable<NullWritable> values, Context context)
 5                 throws IOException,
 6                 InterruptedException {
 7             long count = 0;
 8             byte[] tbsign = null;
 9             for (NullWritable nullWritable : values) {
10                 byte[] attr = key.getAttr();
11                 if (tbsign == null) {
12                     tbsign = attr;
13                     count++;
14                 }
15                 if (tbsign != null) {
16                     if (tbsign.length != attr.length) {
17                         count++;
18                         tbsign = attr;
19                     } else {
20                         for (int i = 0; i < tbsign.length; i++) {
21                             if (tbsign[i] != attr[i]) {
22                                 count++;
23                                 tbsign = attr;
24                                 break;
25                             }
26                         }
27                     }
28                 }
29 
30             }
31             StringBuffer out = new StringBuffer();
32             out.append(new String(key.getCity()))
33                     .append(Constants.FIELDS_TERMINATED).append(count);
34             context.write(new Text(out.toString()), NullWritable.get());
35 
36         }

 



(责任编辑:IT)
------分隔线----------------------------