Mapreduce实例-分组排重(group by distinct)
时间:2015-05-11 02:59 来源:linux.it.net.cn 作者:IT
1 public class GroupComparator implements RawComparator<MyBinaryKey> {
2
3 @Override
4 public int compare(MyBinaryKey o1, MyBinaryKey o2) {
5 return o1.toString().compareTo(o2.toString());
6 }
7
8 @Override
9 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
10 return WritableComparator.compareBytes(b1, s1, Long.SIZE / 8 + Integer.SIZE / 8 * 3, b2, s2, Long.SIZE / 8 + Integer.SIZE / 8 * 3);
11 }
12
13 }
14
// Skeleton of the binary key type used as the map output key in this example.
// The listing only shows placeholders: the method bodies below are left empty.
15 public abstract class UVBinaryKey extends BinaryComparable implements WritableComparable<BinaryComparable>{
16 // Add fields as needed.
17 @Override
// Placeholder: deserialization of the key's fields is not shown in the listing.
18 public void readFields(DataInput in) throws IOException {
19
20 }
21
22 @Override
// NOTE(review): body elided in the listing; as written this method has no
// return statement and will not compile until it is implemented.
23 public byte[] getBytes() {
24
25 }
26
27 }
28
29 public class MyPartitioner extends Partitioner<MyBinaryKey, NullWritable> {
30
31 /**
32 * 根据uv/ip取模分区,保证相同uv/ip落在同一分区
33 */
34 @Override
35 public int getPartition(MyBinaryKey key, NullWritable value, int numPartitions) {
36
37 int k=0;
38 for(byte b : key.getAttr()){
39 k+=b&0xff;
40 }
41 return k%numPartitions;
42 }
43
44 }
45
46
47
// Register the map output key class, the grouping comparator and the partitioner on the job.
// NOTE(review): UVBinaryKey is declared abstract in this listing, while GroupComparator and
// MyPartitioner are typed on MyBinaryKey - confirm which concrete key class the job should use.
48 job.setMapOutputKeyClass(UVBinaryKey.class);
49 job.setGroupingComparatorClass(GroupComparator.class);
50 job.setPartitionerClass(MyPartitioner.class);
51
52 map 略
1 combiner(根据需要添加)
2 reduce中的实现:
3 @Override
4 protected void reduce(UVBinaryKey key, Iterable<NullWritable> values, Context context)
5 throws IOException,
6 InterruptedException {
7 long count = 0;
8 byte[] tbsign = null;
9 for (NullWritable nullWritable : values) {
10 byte[] attr = key.getAttr();
11 if (tbsign == null) {
12 tbsign = attr;
13 count++;
14 }
15 if (tbsign != null) {
16 if (tbsign.length != attr.length) {
17 count++;
18 tbsign = attr;
19 } else {
20 for (int i = 0; i < tbsign.length; i++) {
21 if (tbsign[i] != attr[i]) {
22 count++;
23 tbsign = attr;
24 break;
25 }
26 }
27 }
28 }
29
30 }
31 StringBuffer out = new StringBuffer();
32 out.append(new String(key.getCity()))
33 .append(Constants.FIELDS_TERMINATED).append(count);
34 context.write(new Text(out.toString()), NullWritable.get());
35
36 }
(责任编辑:IT)
1 public class GroupComparator implements RawComparator<MyBinaryKey> { 2 3 @Override 4 public int compare(MyBinaryKey o1, MyBinaryKey o2) { 5 return o1.toString().compareTo(o2.toString()); 6 } 7 8 @Override 9 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 10 return WritableComparator.compareBytes(b1, s1, Long.SIZE / 8 + Integer.SIZE / 8 * 3, b2, s2, Long.SIZE / 8 + Integer.SIZE / 8 * 3); 11 } 12 13 } 14 15 public abstract class UVBinaryKey extends BinaryComparable implements WritableComparable<BinaryComparable>{ 16 //根据需要添加属性; 17 @Override 18 public void readFields(DataInput in) throws IOException { 19 20 } 21 22 @Override 23 public byte[] getBytes() { 24 25 } 26 27 } 28 29 public class MyPartitioner extends Partitioner<MyBinaryKey, NullWritable> { 30 31 /** 32 * 根据uv/ip取模分区,保证相同uv/ip落在同一分区 33 */ 34 @Override 35 public int getPartition(MyBinaryKey key, NullWritable value, int numPartitions) { 36 37 int k=0; 38 for(byte b : key.getAttr()){ 39 k+=b&0xff; 40 } 41 return k%numPartitions; 42 } 43 44 } 45 46 47 48 job.setMapOutputKeyClass(UVBinaryKey.class); 49 job.setGroupingComparatorClass(GroupComparator.class); 50 job.setPartitionerClass(MyPartitioner.class); 51 52 map 略 1 combiner(根据需要添加) 2 reduce中的实现: 3 @Override 4 protected void reduce(UVBinaryKey key, Iterable<NullWritable> values, Context context) 5 throws IOException, 6 InterruptedException { 7 long count = 0; 8 byte[] tbsign = null; 9 for (NullWritable nullWritable : values) { 10 byte[] attr = key.getAttr(); 11 if (tbsign == null) { 12 tbsign = attr; 13 count++; 14 } 15 if (tbsign != null) { 16 if (tbsign.length != attr.length) { 17 count++; 18 tbsign = attr; 19 } else { 20 for (int i = 0; i < tbsign.length; i++) { 21 if (tbsign[i] != attr[i]) { 22 count++; 23 tbsign = attr; 24 break; 25 } 26 } 27 } 28 } 29 30 } 31 StringBuffer out = new StringBuffer(); 32 out.append(new String(key.getCity())) 33 .append(Constants.FIELDS_TERMINATED).append(count); 34 context.write(new 
Text(out.toString()), NullWritable.get()); 35 36 }
(责任编辑:IT) |