1、What problem does a Bloom filter solve?
It tests whether an element belongs to a set using only a small amount of memory, at the cost of a certain false-positive rate.

2、How it works
1. Initialize a bit array A = {x1, x2, x3, …, xm} with every bit xi set to 0.
2. Map each element of the known set S into A as follows:
2.0 Choose k mutually independent hash functions h1, h2, …, hk.
2.1 Feed the element xi through these hash functions to obtain a group of indexes h1(xi), h2(xi), …, hk(xi).
2.2 Set the bits of A at those indexes to 1 (if different elements hit the same index, the bit is simply set to 1 again; the operation is idempotent).
3. To query an element x, hash it with the same hash functions chosen in step 2.0 to obtain the indexes h1(x), h2(x), …, hk(x). If every bit of A at those indexes is 1, x is considered a member of S; otherwise it is not.

An example: build a Bit Array of 5,000,000 bits (the size of the bit array and the number of keywords determine the false-positive probability). For each keyword in the set, compute 32 numbers with 32 hash functions, take each number modulo 5,000,000, and set the corresponding positions in the Bit Array to 1; we call these positions the keyword's signature. Put simply, each keyword is mapped onto 32 positions in the Bit Array, as shown in the figure below:
When a keyword needs to be looked up quickly, it is run through the same 32 hash functions and mapped to the corresponding bits of the Bit Array. If all of those bits are 1, the keyword is reported as a match (with some probability of a false positive).
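To make the principle concrete, here is a minimal, self-contained Java sketch: a BitSet plus k index functions derived by double hashing (combining two ordinary Java hash codes), which stands in for the 32 independent hash functions of the example. The class name SimpleBloomFilter and everything in it are illustrative assumptions; it is not part of any library and not the Hadoop implementation used later in this article.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.BitSet;

public class SimpleBloomFilter {
    private final BitSet bits; // the bit array A
    private final int m;       // number of bits
    private final int k;       // number of hash functions

    public SimpleBloomFilter(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // Derive the i-th index for an element by combining two base hashes
    // (a simple stand-in for k truly independent hash functions).
    private int index(byte[] data, int i) {
        int h1 = Arrays.hashCode(data);
        int h2 = new String(data, StandardCharsets.UTF_8).hashCode();
        return Math.floorMod(h1 + i * h2, m);
    }

    // Mark all k positions of the element as 1.
    public void add(String element) {
        byte[] data = element.getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < k; i++) {
            bits.set(index(data, i));
        }
    }

    // All k positions 1 => probably in the set; any 0 bit => definitely not in it.
    public boolean mightContain(String element) {
        byte[] data = element.getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < k; i++) {
            if (!bits.get(index(data, i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        SimpleBloomFilter bf = new SimpleBloomFilter(5_000_000, 32);
        bf.add("hadoop");
        System.out.println(bf.mightContain("hadoop")); // true
        System.out.println(bf.mightContain("spark"));  // false with high probability
    }
}

The two Hadoop programs below do the same job with org.apache.hadoop.util.bloom.BloomFilter: the first reads a text file from HDFS, trains a filter, and serializes it to bloom.bin; the second loads that file in a map-only job and keeps only the words that test positive.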
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class TrainingBloomfilter {

    public static int getOptimalBloomFilterSize(int numRecords, float falsePosRate) {
        int size = (int) (-numRecords * (float) Math.log(falsePosRate)
                / Math.pow(Math.log(2), 2));
        return size;
    }

    public static int getOptimalK(float numMembers, float vectorSize) {
        return (int) Math.round(vectorSize / numMembers * Math.log(2));
    }

    public static void main(String[] args) throws IOException {
        Path inputFile = new Path("/tmp/decli/user1.txt");
        int numMembers = Integer.parseInt("10");
        float falsePosRate = Float.parseFloat("0.01");
        Path bfFile = new Path("/tmp/decli/bloom.bin");

        // Calculate our vector size and optimal K value based on approximations
        int vectorSize = getOptimalBloomFilterSize(numMembers, falsePosRate);
        int nbHash = getOptimalK(numMembers, vectorSize);

        // Create new Bloom filter
        BloomFilter filter = new BloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH);

        System.out.println("Training Bloom filter of size " + vectorSize
                + " with " + nbHash + " hash functions, " + numMembers
                + " approximate number of records, and " + falsePosRate
                + " false positive rate");

        // Open file(s) for read
        String line = null;
        int numRecords = 0;
        FileSystem fs = FileSystem.get(new Configuration());
        for (FileStatus status : fs.listStatus(inputFile)) {
            BufferedReader rdr;
            // If the file is gzipped, wrap it in a GZIPInputStream
            if (status.getPath().getName().endsWith(".gz")) {
                rdr = new BufferedReader(new InputStreamReader(
                        new GZIPInputStream(fs.open(status.getPath()))));
            } else {
                rdr = new BufferedReader(new InputStreamReader(
                        fs.open(status.getPath())));
            }

            System.out.println("Reading " + status.getPath());
            while ((line = rdr.readLine()) != null) {
                filter.add(new Key(line.getBytes()));
                ++numRecords;
            }
            rdr.close();
        }

        System.out.println("Trained Bloom filter with " + numRecords + " entries.");
        System.out.println("Serializing Bloom filter to HDFS at " + bfFile);

        FSDataOutputStream strm = fs.create(bfFile);
        filter.write(strm);
        strm.flush();
        strm.close();

        System.out.println("Done training Bloom filter.");
    }
}
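For the parameters hard-coded above (numMembers = 10, falsePosRate = 0.01), the two helper methods apply the standard approximations m = -n * ln(p) / (ln 2)^2 for the bit-vector size and k = (m / n) * ln 2 for the number of hash functions, which works out to a vector of about 95 bits and 7 hash functions for this tiny example. The filtering job below then reads the serialized bloom.bin from the distributed cache in its setup() method and emits only the words that pass the membership test.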
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;

public class BloomFilteringDriver {

    public static class BloomFilteringMapper extends
            Mapper<Object, Text, Text, NullWritable> {

        private BloomFilter filter = new BloomFilter();

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            DataInputStream strm = null;
            try {
                // Get the files cached for the current job
                Path[] paths = DistributedCache.getLocalCacheFiles(context
                        .getConfiguration());
                for (Path path : paths) {
                    if (path.toString().contains("bloom.bin")) {
                        strm = new DataInputStream(new FileInputStream(
                                path.toString()));
                        // Read the serialized data into our Bloom filter
                        filter.readFields(strm);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (strm != null) {
                    strm.close();
                }
            }
        }

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Get the value for the comment
            String comment = value.toString();
            // If it is null or empty, skip this record
            if (comment == null || comment.isEmpty()) {
                return;
            }
            StringTokenizer tokenizer = new StringTokenizer(comment);
            // For each word in the comment
            while (tokenizer.hasMoreTokens()) {
                // Clean up the word
                String cleanWord = tokenizer.nextToken().replaceAll("'", "")
                        .replaceAll("[^a-zA-Z]", " ");
                // If the word is in the filter, output it
                if (cleanWord.length() > 0
                        && filter.membershipTest(new Key(cleanWord.getBytes()))) {
                    context.write(new Text(cleanWord), NullWritable.get());
                    // break;
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Usage: BloomFiltering <unused> <in> <out>");
            System.exit(1);
        }
        System.out.println("================ " + otherArgs[0]);

        // Delete the output directory if it already exists
        FileSystem.get(conf).delete(new Path(otherArgs[2]), true);

        Job job = new Job(conf, "TestBloomFiltering");
        job.setJarByClass(BloomFilteringDriver.class);
        job.setMapperClass(BloomFilteringMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

        // Ship the serialized Bloom filter to every mapper via the distributed cache
        DistributedCache.addCacheFile(new Path("/tmp/decli/bloom.bin").toUri(),
                job.getConfiguration());

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
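A hypothetical invocation of the filtering job (the jar name bloomfilter.jar, the placeholder first argument, and the output path are assumptions; the input and bloom.bin paths are the ones hard-coded above):

hadoop jar bloomfilter.jar BloomFilteringDriver xxx /tmp/decli/user1.txt /tmp/decli/result

Note that the first argument is only echoed by main() and otherwise unused, while the output directory is deleted up front so the job can be re-run.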
Test file:
user1.txt
test
test xiaowang
hadoop jar trainbloom.jar TrainingBloomfilter

Result:
root@master 192.168.120.236 ~/lijun06 >

8、For several ways of doing joins in Hadoop MapReduce, see:
http://my.oschina.net/leejun2005/blog/95186
http://my.oschina.net/leejun2005/blog/111963
9、References and recommended reading:
http://www.jiacheo.org/blog/304
A Bloom filter can be seen as an extension of a bitmap; a bitmap normally uses only a single hash function for the mapping. For details see:
http://www.cnblogs.com/pangxiaodong/archive/2011/08/14/2137748.html
http://kb.cnblogs.com/page/77440/

