Hadoop DistributedCache分布式缓存的使用
时间:2016-05-21 14:35 来源:linux.it.net.cn 作者:IT
做项目的时候遇到一个问题,在Mapper和Reducer方法中处理目标数据时,先要去检索和匹配一个已存在的标签库,再对所处理的字段打标签。因为标签库不是很大,没必要用HBase。我的实现方法是把标签库存储成HDFS上的文件,用分布式缓存存储,这样让每个slave都能读取到这个文件。
main方法中的配置:
//分布式缓存要存储的文件路径
String cachePath[] = {
"hdfs://10.105.32.57:8020/user/ad-data/tag/tag-set.csv",
"hdfs://10.105.32.57:8020/user/ad-data/tag/TagedUrl.csv"
};
//向分布式缓存中添加文件
job.addCacheFile(new Path(cachePath[0]).toUri());
job.addCacheFile(new Path(cachePath[1]).toUri());
参考上面代码即可向分布式缓存中添加文件。
在Mapper和Reducer方法中读取分布式缓存文件:
/*
* 重写Mapper的setup方法,获取分布式缓存中的文件
*/
@Override
protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
super.setup(context);
URI[] cacheFile = context.getCacheFiles();
Path tagSetPath = new Path(cacheFile[0]);
Path tagedUrlPath = new Path(cacheFile[1]);
文件操作(如把内容读到set或map中);
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
在map()中使用读取出的数据;
}
同样,如果在Reducer中也要读取分布式缓存文件,示例如下:
/*
* 重写Reducer的setup方法,获取分布式缓存中的文件
*/
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
mos = new MultipleOutputs<Text, Text>(context);
URI[] cacheFile = context.getCacheFiles();
Path tagSetPath = new Path(cacheFile[0]);
Path tagSetPath = new Path(cacheFile[1]);
文件读取操作;
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
while(values.iterator().hasNext()){
使用读取出的数据;
}
context.write(key, new Text(sb.toString()));
}
(责任编辑:IT)
做项目的时候遇到一个问题,在Mapper和Reducer方法中处理目标数据时,先要去检索和匹配一个已存在的标签库,再对所处理的字段打标签。因为标签库不是很大,没必要用HBase。我的实现方法是把标签库存储成HDFS上的文件,用分布式缓存存储,这样让每个slave都能读取到这个文件。 main方法中的配置: //分布式缓存要存储的文件路径 String cachePath[] = { "hdfs://10.105.32.57:8020/user/ad-data/tag/tag-set.csv", "hdfs://10.105.32.57:8020/user/ad-data/tag/TagedUrl.csv" }; //向分布式缓存中添加文件 job.addCacheFile(new Path(cachePath[0]).toUri()); job.addCacheFile(new Path(cachePath[1]).toUri());
参考上面代码即可向分布式缓存中添加文件。 在Mapper和Reducer方法中读取分布式缓存文件: /* * 重写Mapper的setup方法,获取分布式缓存中的文件 */ @Override protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub super.setup(context); URI[] cacheFile = context.getCacheFiles(); Path tagSetPath = new Path(cacheFile[0]); Path tagedUrlPath = new Path(cacheFile[1]); 文件操作(如把内容读到set或map中); } @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 在map()中使用读取出的数据; } 同样,如果在Reducer中也要读取分布式缓存文件,示例如下: /* * 重写Reducer的setup方法,获取分布式缓存中的文件 */ @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); mos = new MultipleOutputs<Text, Text>(context); URI[] cacheFile = context.getCacheFiles(); Path tagSetPath = new Path(cacheFile[0]); Path tagSetPath = new Path(cacheFile[1]); 文件读取操作; } @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { while(values.iterator().hasNext()){ 使用读取出的数据; } context.write(key, new Text(sb.toString())); } (责任编辑:IT) |