Problem: we have the following data file, city.txt, with the columns (id, city, value).
cat city.txt

1. This is the group-TopK problem that comes up all the time in real business work. Let's first see how Pig solves it:

a = load '/data/city.txt' using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
c = foreach b {
        c1 = order a by value desc;   -- sort each city's rows by value, descending
        c2 = limit c1 2;              -- keep only the top 2 rows per city
        generate group, c2.value;
    };
d = stream c through `sed 's/[(){}]//g'`;  -- strip the tuple/bag punctuation for readability
dump d;

(bj,600,300)
(sh,900,400)
(wh,500,200)

For comparison, the same script without the limit returns every value per city:

a = load '/data/city.txt' using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
c = foreach b {
        c1 = order a by value desc;
        generate group, c1.value;
    };
d = stream c through `sed 's/[(){}]//g'`;
dump d;

(bj,600,300,100)
(sh,900,400,200)
(wh,500,200,100)

2. Now let's see how Hive handles the same group-TopK problem.

HQL is essentially SQL, but it is still missing quite a few features: it is not as powerful as native SQL, and in some respects it lags behind Pig as well. In plain SQL the grouped TopK could be written as a correlated subquery:

select * from city a where 2 > (select count(1) from city where cname = a.cname and value > a.value) distribute by a.cname sort by a.cname, a.value desc;

(see http://my.oschina.net/leejun2005/blog/78904)

This statement, however, is rejected with a syntax error in HQL, so we fall back on a Hive UDF: sort by city and value, number the rows within each city, and then use a where clause to drop the rows whose per-city counter reaches k. On to the code:

(1) Define the UDF:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public final class Rank extends UDF {
    private int counter;
    private String last_key;

    public int evaluate(final String key) {
        // reset the counter whenever the group key changes;
        // rows must arrive sorted by key for this to be correct
        if (!key.equalsIgnoreCase(this.last_key)) {
            this.counter = 0;
            this.last_key = key;
        }
        return this.counter++;
    }
}

(2) Register the UDF, create the table, load the data and run the query (the intermediate numbering produced by rank() is illustrated right after the result below):

add jar Rank.jar;
create temporary function rank as 'com.example.hive.udf.Rank';

create table city(id int, cname string, value int) row format delimited fields terminated by ' ';
LOAD DATA LOCAL INPATH 'city.txt' OVERWRITE INTO TABLE city;

select cname, value from (
    select cname, rank(cname) csum, value from (
        select id, cname, value from city distribute by cname sort by cname, value desc
    ) a
) b
where csum < 2;

(3) Result:
bj	600
bj	300
sh	900
sh	400
wh	500
wh	200
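To see why "where csum < 2" keeps exactly the top 2, it helps to picture what the inner query b produces. The rows below are not output from the original post, just a reconstruction from the data shown earlier: distribute by cname sort by cname, value desc delivers each city's rows to the reducer contiguously and already sorted, so rank(cname) restarts at 0 at every new city (the ordering of the city groups themselves may vary):

cname  csum  value
bj     0     600
bj     1     300
bj     2     100
sh     0     900
sh     1     400
sh     2     200
wh     0     500
wh     1     200
wh     2     100

The outer filter csum < 2 then keeps only the first two rows of each city.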
As you can see, Hive is a bit more cumbersome than Pig here, but as Hive keeps maturing it may well end up even more concise than Pig one day.
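In fact, later Hive releases added windowing functions. Assuming a version with that support (this is a sketch, not part of the original post), the whole custom-UDF detour collapses into a built-in row_number() call:

select cname, value
from (
    select cname, value,
           row_number() over (partition by cname order by value desc) as rn
    from city
) t
where rn <= 2;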
REF: getting the top N values per group in Hive: http://baiyunl.iteye.com/blog/1466343
3. Finally, let's look at a native MapReduce implementation:
import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GroupTopK {
    // This MR job extracts, for every age group, the 3 largest ids.
    // Test data generated by the script at: http://my.oschina.net/leejun2005/blog/76631

    public static class GroupTopKMapper extends
            Mapper<LongWritable, Text, IntWritable, LongWritable> {
        IntWritable outKey = new IntWritable();
        LongWritable outValue = new LongWritable();
        String[] valArr = null;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            valArr = value.toString().split("\t");
            outKey.set(Integer.parseInt(valArr[2]));   // age (int), 3rd column
            outValue.set(Long.parseLong(valArr[0]));   // id (long), 1st column
            context.write(outKey, outValue);
        }
    }

    public static class GroupTopKReducer extends
            Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
        LongWritable outValue = new LongWritable();

        public void reduce(IntWritable key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            // Keep a sliding top-3: whenever the set grows past 3 elements,
            // evict the smallest one (TreeSet.first()). Duplicate ids would
            // collapse in the set, which is fine because ids are unique.
            TreeSet<Long> idTreeSet = new TreeSet<Long>();
            for (LongWritable val : values) {
                idTreeSet.add(val.get());
                if (idTreeSet.size() > 3) {
                    idTreeSet.remove(idTreeSet.first());
                }
            }
            for (Long id : idTreeSet) {
                outValue.set(id);
                context.write(key, outValue);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        // As invoked below (hadoop jar GroupTopK.jar GroupTopK <in> <out>),
        // three tokens reach main(): the class-name token plus the two paths,
        // which is why the paths are read from indices 1 and 2.
        if (otherArgs.length != 3) {
            System.err.println("Usage: GroupTopK <in> <out>");
            System.exit(2);
        }
        System.out.println(otherArgs.length);
        System.out.println(otherArgs[0]);
        System.out.println(otherArgs[1]);

        Job job = new Job(conf, "GroupTopK");
        job.setJarByClass(GroupTopK.class);
        job.setMapperClass(GroupTopKMapper.class);
        job.setReducerClass(GroupTopKReducer.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Run it:

hadoop jar GroupTopK.jar GroupTopK /tmp/decli/record_new.txt /tmp/1

Result:
hadoop fs -cat /tmp/1/part-r-00000
......

Data verification:
awk '$3==0{print $1}' record_new.txt | sort -nr | head -3

This pulls the ids of the age-0 group, sorts them in descending order and takes the first 3; the result matches the MR output, so the job is correct. (A check that covers every group at once is sketched after the link below.)

Note: the test data was generated by the following script:
http://my.oschina.net/leejun2005/blog/76631
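If you want to verify every age group rather than just age 0, something along these lines should do it. This is a rough sketch, not from the original post; it assumes the same tab-separated layout the mapper expects (id in column 1, age in column 3) and prints each age's top 3 ids in descending order:

sort -k3,3n -k1,1nr record_new.txt |
awk '$3 != prev { n = 0; prev = $3 }
     n++ < 3    { print $3 "\t" $1 }'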
PS: if Hive is roughly the SQL of Hadoop, then Pig is closer to PL/SQL stored procedures: you write the program more freely and can express more elaborate logic. Pig can even call static Java methods directly via reflection; see the earlier Pig posts for details, and a small sketch of that feature follows the links below. Finally, a few Hive UDAF links for anyone who wants to dig further:
Hive UDAF and UDTF: getting the top values after a group by: http://blog.csdn.net/liuzhoulong/article/details/7789183
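On the reflection point above: Pig ships with invoker UDFs (InvokeForString, InvokeForInt and friends) that wrap an arbitrary public static Java method. A minimal sketch, assuming Pig 0.8 or later, with a made-up input path and URL decoding chosen purely as an example:

DEFINE UrlDecode InvokeForString('java.net.URLDecoder.decode', 'String String');

raw     = LOAD '/data/encoded_urls.txt' AS (encoded:chararray);   -- hypothetical input file
decoded = FOREACH raw GENERATE UrlDecode(encoded, 'UTF-8');
DUMP decoded;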