import java.io.IOException;
import java.util.List;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Single {
    // Mapper: emits each child-parent pair twice so the reducer can join
    // records that share the same person as key.
    private static class SingleMapper extends
            Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String string = value.toString();
            // Skip the header line ("child parent").
            if (!string.contains("child")) {
                String[] strings = string.split(" ");
                // key = child, value = parent tagged ":1"
                context.write(new Text(strings[0]), new Text(strings[1] + ":1"));
                // key = parent, value = child tagged ":2"
                context.write(new Text(strings[1]), new Text(strings[0] + ":2"));
            }
        }
    }
    // Reducer: reduce() is invoked once per key. For each person, values tagged
    // "1" are that person's parents and values tagged "2" are that person's children.
    private static class SingleReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // left holds this person's children (grandchild candidates),
            // right holds this person's parents (grandparent candidates).
            List<String> left = Lists.newArrayList();
            List<String> right = Lists.newArrayList();
            for (Text value : values) {
                String[] strings = value.toString().split(":");
                if (strings[1].equals("1")) {
                    right.add(strings[0]);
                } else {
                    left.add(strings[0]);
                }
            }
            // Cross product: pair every grandchild with every grandparent.
            for (String lef : left) {
                for (String rig : right) {
                    context.write(new Text(lef), new Text(rig));
                }
            }
        }
    }
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration configuration = HadoopConfig.getConfiguration();
        Job job = Job.getInstance(configuration, "single-table join");
        job.setJarByClass(Single.class);
        job.setMapperClass(SingleMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setReducerClass(SingleReduce.class);
        FileInputFormat.addInputPath(job, new Path("/data"));
        FileOutputFormat.setOutputPath(job, new Path("/single"));
        job.waitForCompletion(true);
    }
}