public class ComplexInvertIndex {
								
									  
								
									    private static class FileNameRecordReader extends RecordReader<Text, Text> {
								
									  
								
									        LineRecordReader lineRecordReader = new LineRecordReader();
								
									        String fileName;
								
									  
								
									        @Override
								
									        public void initialize(InputSplit split, TaskAttemptContext context)
								
									                throws IOException, InterruptedException {
								
									            lineRecordReader.initialize(split, context);
								
									            fileName = ((FileSplit) split).getPath().getName();
								
									        }
								
									  
								
									        @Override
								
									        public boolean nextKeyValue() throws IOException, InterruptedException {
								
									            return lineRecordReader.nextKeyValue();
								
									        }
								
									  
								
									        @Override
								
									        public Text getCurrentKey() throws IOException, InterruptedException {
								
									            return new Text(fileName);
								
									        }
								
									  
								
									        @Override
								
									        public Text getCurrentValue() throws IOException, InterruptedException {
								
									            return lineRecordReader.getCurrentValue();
								
									        }
								
									  
								
									        @Override
								
									        public float getProgress() throws IOException, InterruptedException {
								
									            return lineRecordReader.getProgress();
								
									        }
								
									  
								
									        @Override
								
									        public void close() throws IOException {
								
									            lineRecordReader.close();
								
									        }
								
									  
								
									    }
								
									  
								
									    private static class FileNameInputFormat extends
								
									            FileInputFormat<Text, Text> {
								
									  
								
									        @Override
								
									        public RecordReader<Text, Text> createRecordReader(InputSplit split,
								
									                TaskAttemptContext context) throws IOException,
								
									                InterruptedException {
								
									            FileNameRecordReader fileNameRecordReader = new FileNameRecordReader();
								
									            fileNameRecordReader.initialize(split, context);
								
									            return fileNameRecordReader;
								
									        }
								
									  
								
									    }
								
									  
								
									    private static class ComplexInvertIndexMapper extends
								
									            Mapper<Text, Text, Text, IntWritable> {
								
									  
								
									        @Override
								
									        protected void map(Text key, Text value,
								
									                Mapper<Text, Text, Text, IntWritable>.Context context)
								
									                throws IOException, InterruptedException {
								
									  
								
									            String[] strs = value.toString().split(" ");
								
									            for (String string : strs) {
								
									                context.write(new Text( string+"#"+key.toString() ),new IntWritable(1));
								
									            }
								
									  
								
									        }
								
									  
								
									    }
								
									  
								
									    private static class ComplexInvertIndexCombiner extends
								
									            Reducer<Text, IntWritable, Text, IntWritable> {
								
									  
								
									        @Override
								
									        protected void reduce(Text key, Iterable<IntWritable> values,
								
									                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
								
									                throws IOException, InterruptedException {
								
									  
								
									            int sum = 0;
								
									            for (IntWritable value : values) {
								
									                sum += value.get();
								
									            }
								
									            context.write(key,new IntWritable(sum));
								
									            System.out.println(key.toString() + sum +"");
								
									        }
								
									  
								
									    }
								
									  
								
									    //把key的前面字段聚合,排序     private static class InvertIndexPartitioner extends
								
									            HashPartitioner<Text, IntWritable> {
								
									  
								
									        @Override
								
									        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
								
									            String[] strs = key.toString().split("#");
								
									            return super.getPartition(new Text(strs[0]), value, numReduceTasks);
								
									        }
								
									  
								
									    }                
								
									  
								
									    private static class ComplexInvertIndexReduce extends
								
									            Reducer<Text, IntWritable, Text, Text> {
								
									  
								
									        static Map<String, String> map = new HashMap<String, String>();
								
									          
								
									        @Override
								
									        protected void reduce(Text key, Iterable<IntWritable> values,
								
									                Reducer<Text, IntWritable, Text, Text>.Context context)
								
									                throws IOException, InterruptedException {
								
									  
								
									            String[] strings = key.toString().split("#");
								
									            String word = strings[0];
								
									            String doc = strings[1];
								
									            int sum = 0;
								
									            for(IntWritable value : values){
								
									                sum = sum + value.get();
								
									            }
								
									            if( map.get(word) == null ){
								
									                map.put(word," ("+doc+","+sum+") ");
								
									            }else{
								
									                map.put(word,map.get(word)+" ("+doc+","+sum+") ");
								
									            }
								
									             
								
									        }
								
									          
								
									        @Override
								
									        protected void cleanup(
								
									                Reducer<Text, IntWritable, Text, Text>.Context context)
								
									                throws IOException, InterruptedException {
								
									            for(String key:map.keySet()){
								
									                context.write(new Text(key), new Text(map.get(key)));
								
									            }
								
									        }
								
									  
								
									    }
								
									  
								
									    public static void main(String[] args)throws IOException,
								
									    ClassNotFoundException, InterruptedException{
								
									  
								
									        Configuration configuration = HadoopConfig.getConfiguration();
								
									        Job job = Job.getInstance(configuration, "复杂倒排索引");
								
									        job.setJarByClass(ComplexInvertIndex.class);
								
									        job.setInputFormatClass(FileNameInputFormat.class);
								
									        job.setMapperClass(ComplexInvertIndexMapper.class);
								
									        job.setMapOutputKeyClass(Text.class);
								
									        job.setMapOutputValueClass(IntWritable.class);
								
									        job.setCombinerClass(ComplexInvertIndexCombiner.class);
								
									        job.setReducerClass(ComplexInvertIndexReduce.class);
								
									        job.setPartitionerClass(InvertIndexPartitioner.class);
								
									        job.setOutputKeyClass(Text.class);
								
									        job.setOutputValueClass(Text.class);
								
									          
								
									        FileInputFormat.addInputPath(job, new Path("/data"));
								
									        FileOutputFormat.setOutputPath(job, new Path("/ouputdata"));
								
									        job.waitForCompletion(true);
								
									        System.exit(job.waitForCompletion(true) ? 0 : 1);
								
									          
								
									    }