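1. What problem does a Bloom filter solve?
It decides whether an element belongs to a set while using only a small amount of memory. The price is a certain error rate: it can report a false positive, but never a false negative.

2. How it works
1. Initialize a bit array A = {x1, x2, x3, …, xm} in which every bit x1, x2, x3, …, xm starts at 0.
2. Map every element of the known set S into A as follows:
2.0 Choose k mutually independent hash functions h1, h2, …, hk.
2.1 Hash the element s with each of these functions to obtain a group of index values h1(s), h2(s), …, hk(s).
2.2 Set the bits of A at those indexes to 1. (If different elements hit the same index, the bit is simply set to 1 again; this is an idempotent operation.)
3. To test an element x, hash it with the functions chosen in 2.0 to obtain the index values h1(x), h2(x), …, hk(x). If the bits of A at all of these positions are 1, x is considered to belong to S (this may be a false positive); otherwise x definitely does not belong to S.

Example:
Create a Bit Array with a capacity of 5 million bits (the size of the Bit Array and the number of keywords determine the false-positive probability). Run each keyword in the set through 32 hash functions to compute 32 numbers, take each of the 32 numbers modulo 5 million, and set the corresponding positions in the Bit Array to 1; we call these the keyword's feature values. Simply put, each keyword is mapped to 32 positions in the Bit Array, see the figure below: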
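To look up a keyword quickly, run it through the same 32 hash functions and map the results to the corresponding bits of the Bit Array. If all of those bits are 1, the keyword is considered a match (with some probability of a false positive).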
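The steps above can be sketched in a few lines of plain Java. This is only an illustration, not the Hadoop `BloomFilter` class: the class name `SimpleBloomFilter` and its methods are made up for this example, and instead of k truly independent hash functions it derives the k indexes from two base hashes (`String.hashCode()` and a 32-bit FNV-1a) as `h1 + i * h2`, which is an assumed simplification. The helper `expectedFalsePositiveRate` uses the usual approximation p ≈ (1 - e^(-kn/m))^k for n inserted keys, m bits, and k hash functions.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

/** Minimal Bloom filter sketch (hypothetical class, not the Hadoop one). */
class SimpleBloomFilter {
    private final BitSet bits; // the bit array A, all zeros initially
    private final int m;       // number of bits in A
    private final int k;       // number of hash functions

    SimpleBloomFilter(int m, int k) {
        if (m <= 0 || k <= 0) {
            throw new IllegalArgumentException("m and k must be positive");
        }
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // 32-bit FNV-1a over the UTF-8 bytes; used as the second base hash.
    private static int fnv1a(String key) {
        int h = 0x811c9dc5;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 16777619;
        }
        return h;
    }

    // i-th index: (h1 + i * h2) mod m. Assumption: this stands in for
    // the k independent hash functions described in the text.
    private int index(String key, int i) {
        long h1 = key.hashCode();
        long h2 = fnv1a(key);
        return (int) Math.floorMod(h1 + i * h2, (long) m);
    }

    // Step 2: set the k bits that belong to this key (idempotent).
    void add(String key) {
        for (int i = 0; i < k; i++) {
            bits.set(index(key, i));
        }
    }

    // Step 3: the key may be present only if all k bits are 1.
    boolean mightContain(String key) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(index(key, i))) {
                return false; // definitely not in the set
            }
        }
        return true; // in the set, or a false positive
    }

    // Number of bits currently set to 1.
    int bitsSet() {
        return bits.cardinality();
    }

    // Approximate false-positive probability after n insertions.
    static double expectedFalsePositiveRate(long n, long m, int k) {
        return Math.pow(1.0 - Math.exp(-(double) k * n / m), k);
    }

    public static void main(String[] args) {
        // 5 million bits and 32 hashes, as in the example above.
        SimpleBloomFilter filter = new SimpleBloomFilter(5_000_000, 32);
        filter.add("hadoop");
        System.out.println("hadoop: " + filter.mightContain("hadoop"));
        System.out.println("spark:  " + filter.mightContain("spark"));
        // 100,000 keywords is an assumed figure, chosen only for illustration.
        System.out.println("estimated false-positive rate: "
                + expectedFalsePositiveRate(100_000, 5_000_000, 32));
    }
}
```

A key that was added always tests positive, because the same k bits are recomputed and they were all set. The reverse does not hold, which is why the lookup method is named `mightContain` rather than `contains`.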
    // TrainingBloomfilter.java: builds the Bloom filter and writes it to HDFS
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.zip.GZIPInputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.bloom.BloomFilter;
    import org.apache.hadoop.util.bloom.Key;
    import org.apache.hadoop.util.hash.Hash;

    public class TrainingBloomfilter {

        // Bit-vector size for n records and false-positive rate p: m = -n * ln(p) / (ln 2)^2
        public static int getOptimalBloomFilterSize(int numRecords, float falsePosRate) {
            int size = (int) (-numRecords * (float) Math.log(falsePosRate)
                    / Math.pow(Math.log(2), 2));
            return size;
        }

        // Number of hash functions for n members and an m-bit vector: k = (m / n) * ln 2
        public static int getOptimalK(float numMembers, float vectorSize) {
            return (int) Math.round(vectorSize / numMembers * Math.log(2));
        }

        public static void main(String[] args) throws IOException {
            Path inputFile = new Path("/tmp/decli/user1.txt");
            int numMembers = Integer.parseInt("10");
            float falsePosRate = Float.parseFloat("0.01");
            Path bfFile = new Path("/tmp/decli/bloom.bin");

            // Calculate our vector size and optimal K value based on approximations
            int vectorSize = getOptimalBloomFilterSize(numMembers, falsePosRate);
            int nbHash = getOptimalK(numMembers, vectorSize);

            // Create new Bloom filter
            BloomFilter filter = new BloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH);

            System.out.println("Training Bloom filter of size " + vectorSize
                    + " with " + nbHash + " hash functions, " + numMembers
                    + " approximate number of records, and " + falsePosRate
                    + " false positive rate");

            // Open file for read
            String line = null;
            int numRecords = 0;
            FileSystem fs = FileSystem.get(new Configuration());
            for (FileStatus status : fs.listStatus(inputFile)) {
                BufferedReader rdr;
                // If file is gzipped, wrap it in a GZIPInputStream
                if (status.getPath().getName().endsWith(".gz")) {
                    rdr = new BufferedReader(new InputStreamReader(
                            new GZIPInputStream(fs.open(status.getPath()))));
                } else {
                    rdr = new BufferedReader(new InputStreamReader(
                            fs.open(status.getPath())));
                }

                System.out.println("Reading " + status.getPath());
                // Every line of the input is added to the filter as one key
                while ((line = rdr.readLine()) != null) {
                    filter.add(new Key(line.getBytes()));
                    ++numRecords;
                }
                rdr.close();
            }

            System.out.println("Trained Bloom filter with " + numRecords + " entries.");
            System.out.println("Serializing Bloom filter to HDFS at " + bfFile);

            FSDataOutputStream strm = fs.create(bfFile);
            filter.write(strm);
            strm.flush();
            strm.close();

            System.out.println("Done training Bloom filter.");
        }
    }

    // BloomFilteringDriver.java: map-only job that keeps the words found in the filter
    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.bloom.BloomFilter;
    import org.apache.hadoop.util.bloom.Key;

    public class BloomFilteringDriver {

        public static class BloomFilteringMapper extends
                Mapper<Object, Text, Text, NullWritable> {

            private BloomFilter filter = new BloomFilter();

            @Override
            protected void setup(Context context) throws IOException,
                    InterruptedException {
                try {
                    // Get the files cached for the current job
                    Path[] paths = DistributedCache.getLocalCacheFiles(
                            context.getConfiguration());
                    if (paths == null) {
                        return; // nothing was cached; the filter stays empty
                    }
                    for (Path path : paths) {
                        if (path.toString().contains("bloom.bin")) {
                            DataInputStream strm = new DataInputStream(
                                    new FileInputStream(path.toString()));
                            // Read into our Bloom filter.
                            filter.readFields(strm);
                            strm.close();
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Get the value for the comment
                String comment = value.toString();

                // If it is null or empty, skip this record
                if (comment == null || comment.isEmpty()) {
                    return;
                }

                StringTokenizer tokenizer = new StringTokenizer(comment);
                // For each word in the comment
                while (tokenizer.hasMoreTokens()) {
                    // Clean up the words
                    String cleanWord = tokenizer.nextToken().replaceAll("'", "")
                            .replaceAll("[^a-zA-Z]", " ");

                    // If the word is in the filter, output it (the break is
                    // disabled, so every matching word is emitted)
                    if (cleanWord.length() > 0
                            && filter.membershipTest(new Key(cleanWord.getBytes()))) {
                        context.write(new Text(cleanWord), NullWritable.get());
                        // break;
                    }
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args)
                    .getRemainingArgs();
            // Three arguments are expected: otherArgs[0] is only printed,
            // otherArgs[1] is the input path and otherArgs[2] the output path.
            if (otherArgs.length != 3) {
                System.err.println("Usage: BloomFiltering <arg0> <in> <out>");
                System.exit(1);
            }
            System.out.println("================ " + otherArgs[0]);

            // Remove the output directory if it already exists
            FileSystem.get(conf).delete(new Path(otherArgs[2]), true);

            Job job = new Job(conf, "TestBloomFiltering");
            job.setJarByClass(BloomFilteringDriver.class);
            job.setMapperClass(BloomFilteringMapper.class);
            job.setNumReduceTasks(0); // map-only job
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

            // Ship the serialized Bloom filter to every mapper
            DistributedCache.addCacheFile(
                    new Path("/tmp/decli/bloom.bin").toUri(),
                    job.getConfiguration());

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

Test file: user1.txt
test
test xiaowang
hadoop jar trainbloom.jar TrainingBloomfilter
Result:
root@master 192.168.120.236 ~/lijun06 >
8. For several ways to implement joins in Hadoop MapReduce, see:
http://my.oschina.net/leejun2005/blog/95186
http://my.oschina.net/leejun2005/blog/111963
9. References and recommended reading:
http://www.jiacheo.org/blog/304
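A Bloom filter can be seen as an extension of the bitmap; the difference is that a bitmap generally uses only one hash function for the mapping. For details, see:
http://www.cnblogs.com/pangxiaodong/archive/2011/08/14/2137748.html
http://kb.cnblogs.com/page/77440/
(Editor: IT)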