1 public class GroupComparator implements RawComparator<MyBinaryKey> { 2 3 @Override 4 public int compare(MyBinaryKey o1, MyBinaryKey o2) { 5 return o1.toString().compareTo(o2.toString()); 6 } 7 8 @Override 9 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 10 return WritableComparator.compareBytes(b1, s1, Long.SIZE / 8 + Integer.SIZE / 8 * 3, b2, s2, Long.SIZE / 8 + Integer.SIZE / 8 * 3); 11 } 12 13 } 14 15 public abstract class UVBinaryKey extends BinaryComparable implements WritableComparable<BinaryComparable>{ 16 //根据需要添加属性; 17 @Override 18 public void readFields(DataInput in) throws IOException { 19 20 } 21 22 @Override 23 public byte[] getBytes() { 24 25 } 26 27 } 28 29 public class MyPartitioner extends Partitioner<MyBinaryKey, NullWritable> { 30 31 /** 32 * 根据uv/ip取模分区,保证相同uv/ip落在同一分区 33 */ 34 @Override 35 public int getPartition(MyBinaryKey key, NullWritable value, int numPartitions) { 36 37 int k=0; 38 for(byte b : key.getAttr()){ 39 k+=b&0xff; 40 } 41 return k%numPartitions; 42 } 43 44 } 45 46 47 48 job.setMapOutputKeyClass(UVBinaryKey.class); 49 job.setGroupingComparatorClass(GroupComparator.class); 50 job.setPartitionerClass(MyPartitioner.class); 51 52 map 略 1 combiner(根据需要添加) 2 reduce中的实现: 3 @Override 4 protected void reduce(UVBinaryKey key, Iterable<NullWritable> values, Context context) 5 throws IOException, 6 InterruptedException { 7 long count = 0; 8 byte[] tbsign = null; 9 for (NullWritable nullWritable : values) { 10 byte[] attr = key.getAttr(); 11 if (tbsign == null) { 12 tbsign = attr; 13 count++; 14 } 15 if (tbsign != null) { 16 if (tbsign.length != attr.length) { 17 count++; 18 tbsign = attr; 19 } else { 20 for (int i = 0; i < tbsign.length; i++) { 21 if (tbsign[i] != attr[i]) { 22 count++; 23 tbsign = attr; 24 break; 25 } 26 } 27 } 28 } 29 30 } 31 StringBuffer out = new StringBuffer(); 32 out.append(new String(key.getCity())) 33 .append(Constants.FIELDS_TERMINATED).append(count); 34 context.write(new Text(out.toString()), NullWritable.get()); 35 36 }
(责任编辑:IT) |