Solving the Group Top-K Problem with Pig, Hive, and MapReduce
Date: 2016-11-27 02:31  Source: linux.it.net.cn  Author: IT
Problem:
We have the following data file city.txt with the columns (id, city, value):
cat city.txt
1 wh 500
2 bj 600
3 wh 100
4 sh 400
5 wh 200
6 bj 100
7 sh 200
8 bj 300
9 sh 900
We need to group the data by city and then take, from each group, the two records with the largest value.
1. This is the group Top-K problem that comes up all the time in real business scenarios. First, let's see how Pig solves it:
a = load '/data/city.txt' using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
c = foreach b {c1=order a by value desc; c2=limit c1 2; generate group,c2.value;};
d = stream c through `sed 's/[(){}]//g'`;
dump d;
Result (the stream through sed strips the bag/tuple brackets, so each group's values print as a flat list):
(bj,600,300)
(sh,900,400)
(wh,500,200)
In fact, the following variant of the same code also reproduces the functionality of MySQL's group_concat function (a rough MySQL equivalent is sketched after the result below):
a = load '/data/city.txt' using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
c = foreach b {c1=order a by value desc; generate group,c1.value;};
d = stream c through `sed 's/[(){}]//g'`;
dump d;
Result:
(bj,600,300,100)
(sh,900,400,200)
(wh,500,200,100)
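For comparison, assuming the same rows were loaded into a MySQL table city(id, cname, value) (the table and column names here are assumptions, mirroring the Hive table used later), the group_concat version would look roughly like this:
-- hedged sketch: MySQL table city(id, cname, value) holding the same rows
SELECT cname, GROUP_CONCAT(value ORDER BY value DESC) AS value_list
FROM city
GROUP BY cname;
This should produce one comma-separated row per city, e.g. bj -> 600,300,100.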
2. Next, let's look at how Hive handles the group Top-K problem:
HQL (HiveQL) is essentially very close to SQL, but it is still missing quite a few features: it is not as powerful as native SQL, and in some respects it lags behind Pig as well. So how would this kind of group Top-K problem be written in SQL? For example:
select * from city a where
2>(select count(1) from city where cname=a.cname and value>a.value)
distribute by a.cname sort by a.cname,a.value desc;
(Original post: http://my.oschina.net/leejun2005/blog/78904)
However, this query fails with a syntax error in HQL, so we have to fall back on a Hive UDF-based approach:
Sort by city and value, keep a running counter per city, and finally use a WHERE clause so that only rows whose per-city counter is below k survive.
OK, on to the code:
(1) Define the UDF:
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
public final class Rank extends UDF {
    private int counter;        // running 0-based row counter within the current key
    private String last_key;    // the previous key this UDF instance has seen

    public int evaluate(final String key) {
        // reset the counter whenever the key (here, the city) changes;
        // this relies on the rows arriving distributed and sorted by that key
        if (!key.equalsIgnoreCase(this.last_key)) {
            this.counter = 0;
            this.last_key = key;
        }
        return this.counter++;
    }
}
(2) Register the jar, create the table, load the data, and run the query:
add jar Rank.jar;
create temporary function rank as 'com.example.hive.udf.Rank';
create table city(id int,cname string,value int) row format delimited fields terminated by ' ';
LOAD DATA LOCAL INPATH 'city.txt' OVERWRITE INTO TABLE city;
select cname, value from (
select cname,rank(cname) csum,value from (
select id, cname, value from city distribute by cname sort by cname,value desc
)a
)b where csum < 2;
(3) Result:
bj 600
bj 300
sh 900
sh 400
wh 500
wh 200
As you can see, Hive is a bit more cumbersome here than Pig, but as Hive keeps maturing it may well end up being the more concise of the two.
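For reference, Hive 0.11 and later ship analytic window functions, so on a newer Hive the same result can be obtained without a custom UDF. A minimal sketch against the same city table:
-- hedged sketch: requires Hive 0.11+ window functions
select cname, value
from (
  select cname, value,
         row_number() over (partition by cname order by value desc) as rn
  from city
) t
where rn <= 2;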
REF: Getting the top N values per group in Hive:
http://baiyunl.iteye.com/blog/1466343
3. Finally, let's look at the raw MapReduce version:
import java.io.IOException;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class GroupTopK {
// This MR job extracts, for each age group, the 3 largest ids
// The test data was generated by this script: http://my.oschina.net/leejun2005/blog/76631
public static class GroupTopKMapper extends
Mapper<LongWritable, Text, IntWritable, LongWritable> {
IntWritable outKey = new IntWritable();
LongWritable outValue = new LongWritable();
String[] valArr = null;
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
valArr = value.toString().split("\t");
outKey.set(Integer.parseInt(valArr[2]));// age int
outValue.set(Long.parseLong(valArr[0]));// id long
context.write(outKey, outValue);
}
}
public static class GroupTopKReducer extends
Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
LongWritable outValue = new LongWritable();
public void reduce(IntWritable key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
// a TreeSet keeps the ids sorted in ascending order (and de-duplicated)
TreeSet<Long> idTreeSet = new TreeSet<Long>();
for (LongWritable val : values) {
idTreeSet.add(val.get());
// once the set holds more than 3 ids, drop the smallest so only the 3 largest survive
if (idTreeSet.size() > 3) {
idTreeSet.remove(idTreeSet.first());
}
}
for (Long id : idTreeSet) {
outValue.set(id);
context.write(key, outValue);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
System.out.println(otherArgs.length);
System.out.println(otherArgs[0]);
System.out.println(otherArgs[1]);
// note: otherArgs[0] is the class name passed on the command line (see the
// hadoop jar invocation below), so the input/output paths are otherArgs[1] and [2]
if (otherArgs.length != 3) {
System.err.println("Usage: GroupTopK <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "GroupTopK");
job.setJarByClass(GroupTopK.class);
job.setMapperClass(GroupTopKMapper.class);
job.setReducerClass(GroupTopKReducer.class);
job.setNumReduceTasks(1);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
hadoop jar GroupTopK.jar GroupTopK /tmp/decli/record_new.txt /tmp/1
Result:
hadoop fs -cat /tmp/1/part-r-00000
0 12869695
0 12869971
0 12869976
1 12869813
1 12869870
1 12869951
......
Verify against the raw data:
awk '$3==0{print $1}' record_new.txt|sort -nr|head -3
12869976
12869971
12869695
As you can see, the results check out.
Note: the test data was generated by the following script:
http://my.oschina.net/leejun2005/blog/76631
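One note on the code above: the group size is hard-coded as 3 in the reducer. A minimal sketch of making it configurable through the job Configuration is shown below; the property name grouptopk.k and the class name are assumptions, not part of the original code:
import java.io.IOException;
import java.util.TreeSet;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class ConfigurableGroupTopKReducer extends
        Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
    private int k = 3;                                  // default when the property is absent
    private final LongWritable outValue = new LongWritable();
    @Override
    protected void setup(Context context) {
        // read K once per reduce task instead of hard-coding it
        k = context.getConfiguration().getInt("grouptopk.k", 3);
    }
    @Override
    public void reduce(IntWritable key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        TreeSet<Long> idTreeSet = new TreeSet<Long>();
        for (LongWritable val : values) {
            idTreeSet.add(val.get());
            if (idTreeSet.size() > k) {
                idTreeSet.remove(idTreeSet.first());    // drop the current smallest id
            }
        }
        for (Long id : idTreeSet) {
            outValue.set(id);
            context.write(key, outValue);
        }
    }
}
The driver would then set the value, e.g. conf.setInt("grouptopk.k", 3), or the value could be passed as -D grouptopk.k=3 on the command line, which GenericOptionsParser already picks up.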
PS:
If Hive is the SQL of the Hadoop world, then Pig is more like PL/SQL stored procedures: you write programs more freely and can express more complex logic.
Pig can even call static methods of Java classes directly via reflection (a small sketch follows); for details, see the earlier Pig posts.
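For instance, Pig's built-in dynamic invokers (InvokeForString, InvokeForInt, and friends) wrap a static Java method without a hand-written UDF. A minimal sketch along the lines of the Pig documentation (the input file name is just a placeholder):
DEFINE UrlDecode InvokeForString('java.net.URLDecoder.decode', 'String String');
encoded = LOAD 'encoded_strings.txt' AS (e:chararray);
decoded = FOREACH encoded GENERATE UrlDecode(e, 'UTF-8');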
To close, here are a few Hive UDAF links for anyone interested:
Getting top values after GROUP BY with Hive UDAF and UDTF: http://blog.csdn.net/liuzhoulong/article/details/7789183
A custom Hive UDAF that concatenates multiple rows of strings into one: http://blog.sina.com.cn/s/blog_6ff05a2c0100tjw4.html
Writing a Hive UDAF: http://www.fuzhijie.me/?p=118
Hive UDAF development: http://richiehu.blog.51cto.com/2093113/386113