public class ComplexInvertIndex {
/**
 * Record reader that pairs each input line with the name of the file it was read from.
 *
 * <p>All line reading is delegated to a wrapped {@link LineRecordReader}. The key of every
 * record is the file name taken from the split's path; the value is the line currently
 * held by the wrapped reader.
 */
private static class FileNameRecordReader extends RecordReader<Text, Text> {
    // Wrapped reader that does the actual line-by-line reading of the split.
    LineRecordReader lineRecordReader = new LineRecordReader();
    // File name taken from the split's path; assigned in initialize().
    String fileName;

    /**
     * Initializes the wrapped line reader and remembers the split's file name.
     *
     * @param split   the split to read; cast to {@link FileSplit} to obtain its path
     * @param context the task attempt context handed on to the wrapped reader
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        lineRecordReader.initialize(split, context);
        FileSplit fileSplit = (FileSplit) split;
        fileName = fileSplit.getPath().getName();
    }

    /** Advances the wrapped reader and reports whether another line is available. */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        boolean hasNext = lineRecordReader.nextKeyValue();
        return hasNext;
    }

    /** Returns a fresh {@link Text} holding the file name recorded in initialize(). */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        Text currentKey = new Text(fileName);
        return currentKey;
    }

    /** Returns the line currently held by the wrapped reader. */
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        Text currentLine = lineRecordReader.getCurrentValue();
        return currentLine;
    }

    /** Reports the progress of the wrapped reader. */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        float progress = lineRecordReader.getProgress();
        return progress;
    }

    /** Closes the wrapped reader. */
    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }
}
/**
 * Input format whose records are (file name, line) pairs, produced by
 * {@link FileNameRecordReader}.
 */
private static class FileNameInputFormat extends
        FileInputFormat<Text, Text> {
    /**
     * Creates the record reader for the given split.
     *
     * <p>The reader is returned un-initialized on purpose: the MapReduce framework calls
     * {@code RecordReader.initialize(split, context)} itself before the split is used.
     * Initializing here as well would make the wrapped line reader open the split a
     * second time and leave the first stream unclosed.
     *
     * @param split   the split the reader will be asked to read
     * @param context the task attempt context
     * @return a new, not yet initialized {@link FileNameRecordReader}
     */
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException,
            InterruptedException {
        return new FileNameRecordReader();
    }
}
/**
 * Mapper that emits one ("word#fileName", 1) pair for every space-separated word of a line.
 *
 * <p>The input key is the file name and the input value is one line of that file.
 */
private static class ComplexInvertIndexMapper extends
        Mapper<Text, Text, Text, IntWritable> {
    // Reused output objects; each is written out before the next token overwrites it.
    private final Text outKey = new Text();
    private final IntWritable one = new IntWritable(1);

    /**
     * Splits the line on single spaces and writes a count of 1 for each non-empty word.
     *
     * @param key     name of the file the line belongs to
     * @param value   one line of text
     * @param context sink for the ("word#fileName", 1) pairs
     */
    @Override
    protected void map(Text key, Text value,
            Mapper<Text, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String fileName = key.toString();
        for (String token : value.toString().split(" ")) {
            // Blank lines and leading/repeated spaces produce empty tokens; skipping them
            // keeps the empty string from being indexed as a word.
            if (token.isEmpty()) {
                continue;
            }
            outKey.set(token + "#" + fileName);
            context.write(outKey, one);
        }
    }
}
/**
 * Combiner that adds up the counts received for one "word#fileName" key and forwards
 * the key with the total.
 */
private static class ComplexInvertIndexCombiner extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * Sums all values of the key, writes (key, total) and prints the key followed by
     * the total to standard output.
     *
     * @param key     the composite key being combined
     * @param values  the counts emitted for that key
     * @param context sink for the combined pair
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable count : values) {
            total = total + count.get();
        }
        IntWritable combined = new IntWritable(total);
        context.write(key, combined);
        // Trace line: key text immediately followed by the total, no separator.
        String trace = key.toString() + total;
        System.out.println(trace);
    }
}
//把key的前面字段聚合,排序 private static class InvertIndexPartitioner extends
HashPartitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
String[] strs = key.toString().split("#");
return super.getPartition(new Text(strs[0]), value, numReduceTasks);
}
}
/**
 * Reducer that builds, for every word, the list of " (fileName,count) " entries and
 * writes one (word, entries) record per word when the task finishes.
 *
 * <p>All entries are buffered in memory until {@link #cleanup} runs.
 */
private static class ComplexInvertIndexReduce extends
        Reducer<Text, IntWritable, Text, Text> {
    // Word -> concatenated " (fileName,count) " entries. Deliberately an instance field:
    // a static map would be shared by every reducer instance created in the same JVM,
    // letting entries from one task show up in another task's output.
    private final Map<String, StringBuilder> postings = new HashMap<String, StringBuilder>();

    /**
     * Sums the counts of one "word#fileName" key and appends the resulting
     * " (fileName,count) " entry to the word's buffer.
     *
     * @param key     composite key; text before the first '#' is the word, the rest is
     *                the file name
     * @param values  partial counts for that word/file pair
     * @param context unused here; output is written in {@link #cleanup}
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String composite = key.toString();
        // Split on the first '#' only, so a file name that itself contains '#' is kept
        // whole; a key without any separator is treated as a word with an empty file name.
        int separator = composite.indexOf('#');
        String word = separator < 0 ? composite : composite.substring(0, separator);
        String doc = separator < 0 ? "" : composite.substring(separator + 1);

        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }

        StringBuilder entries = postings.get(word);
        if (entries == null) {
            entries = new StringBuilder();
            postings.put(word, entries);
        }
        entries.append(" (").append(doc).append(",").append(sum).append(") ");
    }

    /**
     * Writes one (word, entries) record for every buffered word.
     *
     * @param context sink for the final index records
     */
    @Override
    protected void cleanup(
            Reducer<Text, IntWritable, Text, Text>.Context context)
            throws IOException, InterruptedException {
        for (Map.Entry<String, StringBuilder> entry : postings.entrySet()) {
            context.write(new Text(entry.getKey()), new Text(entry.getValue().toString()));
        }
        postings.clear();
    }
}
/**
 * Configures and runs the inverted-index job, then exits with status 0 on success
 * and 1 on failure.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) throws IOException,
        ClassNotFoundException, InterruptedException {
    Configuration configuration = HadoopConfig.getConfiguration();
    Job job = Job.getInstance(configuration, "复杂倒排索引");
    job.setJarByClass(ComplexInvertIndex.class);

    // Input: (file name, line) records.
    job.setInputFormatClass(FileNameInputFormat.class);

    // Map side: ("word#fileName", 1) pairs, pre-summed by the combiner.
    job.setMapperClass(ComplexInvertIndexMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setCombinerClass(ComplexInvertIndexCombiner.class);

    // Reduce side: partitioned by InvertIndexPartitioner, final output is (Text, Text).
    job.setReducerClass(ComplexInvertIndexReduce.class);
    job.setPartitionerClass(InvertIndexPartitioner.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path("/data"));
    // NOTE(review): "/ouputdata" looks like a typo for "/outputdata"; left unchanged
    // because existing consumers may read from this path - confirm before renaming.
    FileOutputFormat.setOutputPath(job, new Path("/ouputdata"));

    // Wait for the job exactly once; a second waitForCompletion call would only
    // re-monitor and re-report the already finished job.
    boolean succeeded = job.waitForCompletion(true);
    System.exit(succeeded ? 0 : 1);
}