Hadoop 多表 join:map side join 范例
时间:2016-11-27 02:20 来源:linux.it.net.cn 作者:IT
在没有 pig 或者 hive 的环境下,直接在 mapreduce 中自己实现 join 是一件极其蛋疼的事情,MR中的join分为好几种,比如有最常见的 reduce side join,map side join,semi join 等。今天我们要讨论的是第 2 种:map side join,这种 join 在处理多个小表关联大表时非常有用,而 reduce join 在处理多表关联时是比较麻烦的,会造成大量的网络IO,效率低下。
1、原理:
之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。但 Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
2、环境:
本实例需要的测试文件及 hdfs 文件存放目录如下:
hadoop fs -ls /test/decli
Found 4 items
-rw-r--r-- 2 root supergroup 152 2013-03-06 02:05 /test/decli/login
drwxr-xr-x - root supergroup 0 2013-03-06 02:45 /test/decli/output
-rw-r--r-- 2 root supergroup 12 2013-03-06 02:12 /test/decli/sex
-rw-r--r-- 2 root supergroup 72 2013-03-06 02:44 /test/decli/user
测试文件内容分别为:
root@master 192.168.120.236 02:58:03 ~/test/table >
cat login # 登录表,需要判断 uid 列是否有效,并得到对应用户名、性别、访问次数
1 0 20121213
2 0 20121213
3 1 20121213
4 1 20121213
1 0 20121114
2 0 20121114
3 1 20121114
4 1 20121114
1 0 20121213
1 0 20121114
9 0 20121114
root@master 192.168.120.236 02:58:08 ~/test/table >
cat sex # 性别表
0 男
1 女
root@master 192.168.120.236 02:58:13 ~/test/table >
cat user # 用户属性表
1 张三 hubei
3 王五 tianjin
4 赵六 guangzhou
2 李四 beijing
root@master 192.168.120.236 02:58:16 ~/test/table >
测试环境 hadoop 版本:
echo $HADOOP_HOME
/work/hadoop-0.20.203.0
好了,废话少说,上代码:
3、代码:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MultiTableJoin extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
// 用于缓存 sex、user 文件中的数据
private Map<String, String> userMap = new HashMap<String, String>();
private Map<String, String> sexMap = new HashMap<String, String>();
private Text oKey = new Text();
private Text oValue = new Text();
private String[] kv;
// 此方法会在map方法执行之前执行
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
BufferedReader in = null;
try {
// 从当前作业中获取要缓存的文件
Path[] paths = DistributedCache.getLocalCacheFiles(context
.getConfiguration());
String uidNameAddr = null;
String sidSex = null;
for (Path path : paths) {
if (path.toString().contains("user")) {
in = new BufferedReader(new FileReader(path.toString()));
while (null != (uidNameAddr = in.readLine())) {
userMap.put(uidNameAddr.split("\t", -1)[0],
uidNameAddr.split("\t", -1)[1]);
}
} else if (path.toString().contains("sex")) {
in = new BufferedReader(new FileReader(path.toString()));
while (null != (sidSex = in.readLine())) {
sexMap.put(sidSex.split("\t", -1)[0], sidSex.split(
"\t", -1)[1]);
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
kv = value.toString().split("\t");
// map join: 在map阶段过滤掉不需要的数据
if (userMap.containsKey(kv[0]) && sexMap.containsKey(kv[1])) {
oKey.set(userMap.get(kv[0]) + "\t" + sexMap.get(kv[1]));
oValue.set("1");
context.write(oKey, oValue);
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
private Text oValue = new Text();
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int sumCount = 0;
for (Text val : values) {
sumCount += Integer.parseInt(val.toString());
}
oValue.set(String.valueOf(sumCount));
context.write(key, oValue);
}
}
public int run(String[] args) throws Exception {
Job job = new Job(getConf(), "MultiTableJoin");
job.setJobName("MultiTableJoin");
job.setJarByClass(MultiTableJoin.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),
args).getRemainingArgs();
// 我们把第1、2个参数的地址作为要缓存的文件路径
DistributedCache.addCacheFile(new Path(otherArgs[1]).toUri(), job
.getConfiguration());
DistributedCache.addCacheFile(new Path(otherArgs[2]).toUri(), job
.getConfiguration());
FileInputFormat.addInputPath(job, new Path(otherArgs[3]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[4]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MultiTableJoin(),
args);
System.exit(res);
}
}
运行命令:
hadoop jar MultiTableJoin.jar MultiTableJoin /test/decli/sex /test/decli/user /test/decli/login /test/decli/output
4、结果:
运行结果:
root@master 192.168.120.236 02:47:18 ~/test/table >
hadoop fs -cat /test/decli/output/*|column -t
cat: File does not exist: /test/decli/output/_logs
张三 男 4
李四 男 2
王五 女 2
赵六 女 2
root@master 192.168.120.236 02:47:26 ~/test/table >
TIPS:
更多关于 hadoop mapreduce 相关 join 介绍,请参考之前的博文:
MapReduce 中的两表 join 几种方案简介
http://my.oschina.net/leejun2005/blog/95186
本例中用到了分布式缓存,关于分布式缓存的一些特性与原理,以及注意事项,
请参考:
HDFS 原理、架构与特性介绍
(责任编辑:IT)
在没有 pig 或者 hive 的环境下,直接在 mapreduce 中自己实现 join 是一件极其蛋疼的事情,MR中的join分为好几种,比如有最常见的 reduce side join,map side join,semi join 等。今天我们要讨论的是第 2 种:map side join,这种 join 在处理多个小表关联大表时非常有用,而 reduce join 在处理多表关联时是比较麻烦的,会造成大量的网络IO,效率低下。 1、原理: 之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。但 Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下: (1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。 (2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。 2、环境: 本实例需要的测试文件及 hdfs 文件存放目录如下:
hadoop fs -ls /test/decli 测试文件内容分别为:
root@master 192.168.120.236 02:58:03 ~/test/table > 测试环境 hadoop 版本:
echo $HADOOP_HOME /work/hadoop-0.20.203.0 好了,废话少说,上代码: 3、代码:
import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MultiTableJoin extends Configured implements Tool { public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { // 用于缓存 sex、user 文件中的数据 private Map<String, String> userMap = new HashMap<String, String>(); private Map<String, String> sexMap = new HashMap<String, String>(); private Text oKey = new Text(); private Text oValue = new Text(); private String[] kv; // 此方法会在map方法执行之前执行 @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader in = null; try { // 从当前作业中获取要缓存的文件 Path[] paths = DistributedCache.getLocalCacheFiles(context .getConfiguration()); String uidNameAddr = null; String sidSex = null; for (Path path : paths) { if (path.toString().contains("user")) { in = new BufferedReader(new FileReader(path.toString())); while (null != (uidNameAddr = in.readLine())) { userMap.put(uidNameAddr.split("\t", -1)[0], uidNameAddr.split("\t", -1)[1]); } } else if (path.toString().contains("sex")) { in = new BufferedReader(new FileReader(path.toString())); while (null != (sidSex = in.readLine())) { sexMap.put(sidSex.split("\t", -1)[0], sidSex.split( "\t", -1)[1]); } } } } catch (IOException e) { e.printStackTrace(); } finally { try { if (in != null) { in.close(); } } catch (IOException e) { e.printStackTrace(); } } } public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { kv = value.toString().split("\t"); // map join: 在map阶段过滤掉不需要的数据 if (userMap.containsKey(kv[0]) && sexMap.containsKey(kv[1])) { oKey.set(userMap.get(kv[0]) + "\t" + sexMap.get(kv[1])); oValue.set("1"); context.write(oKey, oValue); } } } public static class Reduce extends Reducer<Text, Text, Text, Text> { private Text oValue = new Text(); public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int sumCount = 0; for (Text val : values) { sumCount += Integer.parseInt(val.toString()); } oValue.set(String.valueOf(sumCount)); context.write(key, oValue); } } public int run(String[] args) throws Exception { Job job = new Job(getConf(), "MultiTableJoin"); job.setJobName("MultiTableJoin"); job.setJarByClass(MultiTableJoin.class); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs(); // 我们把第1、2个参数的地址作为要缓存的文件路径 DistributedCache.addCacheFile(new Path(otherArgs[1]).toUri(), job .getConfiguration()); DistributedCache.addCacheFile(new Path(otherArgs[2]).toUri(), job .getConfiguration()); FileInputFormat.addInputPath(job, new Path(otherArgs[3])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[4])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new MultiTableJoin(), args); System.exit(res); } } 运行命令:
hadoop jar MultiTableJoin.jar MultiTableJoin /test/decli/sex /test/decli/user /test/decli/login /test/decli/output 4、结果: 运行结果:
root@master 192.168.120.236 02:47:18 ~/test/table > TIPS: 更多关于 hadoop mapreduce 相关 join 介绍,请参考之前的博文: MapReduce 中的两表 join 几种方案简介 http://my.oschina.net/leejun2005/blog/95186 本例中用到了分布式缓存,关于分布式缓存的一些特性与原理,以及注意事项, 请参考: HDFS 原理、架构与特性介绍 (责任编辑:IT) |