最近考虑到这样一个需求: 需要把原始的日志文件用hadoop做清洗后,按业务线输出到不同的目录下去,以供不同的部门业务线使用。 这个需求需要用到MultipleOutputFormat和MultipleOutputs来实现自定义多目录、文件的输出。 需要注意的是,在hadoop 0.21.x之前和之后的使用方式是不一样的: hadoop 0.21 之前的API 中有 org.apache.hadoop.mapred.lib.MultipleOutputFormat 和 org.apache.hadoop.mapred.lib.MultipleOutputs,而到了 0.21 之后 的API为 org.apache.hadoop.mapreduce.lib.output.MultipleOutputs , 新版的API 整合了上面旧API两个的功能,没有了MultipleOutputFormat。 本文将给出新旧两个版本的API code。 1、旧版0.21.x之前的版本: import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MultiFile extends Configured implements Tool { public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, NullWritable, Text> { @Override public void map(LongWritable key, Text value, OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException { output.collect(NullWritable.get(), value); } } // MultipleTextOutputFormat 继承自MultipleOutputFormat,实现输出文件的分类 public static class PartitionByCountryMTOF extends MultipleTextOutputFormat<NullWritable, Text> { // key is // NullWritable, // value is Text protected String generateFileNameForKeyValue(NullWritable key, Text value, String filename) { String[] arr = value.toString().split(",", -1); String country = arr[4].substring(1, 3); // 获取country的名称 return country + "/" + filename; } } // 此处不使用reducer /* * public static class Reducer extends MapReduceBase implements * org.apache.hadoop.mapred.Reducer<LongWritable, Text, NullWritable, Text> * { * * @Override public void reduce(LongWritable key, Iterator<Text> values, * OutputCollector<NullWritable, Text> output, Reporter reporter) throws * IOException { // TODO Auto-generated method stub * * } * * } */ @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); JobConf job = new JobConf(conf, MultiFile.class); Path in = new Path(args[0]); Path out = new Path(args[1]); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setJobName("MultiFile"); job.setMapperClass(MapClass.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(PartitionByCountryMTOF.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); job.setNumReduceTasks(0); JobClient.runJob(job); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new MultiFile(), args); System.exit(res); } }测试数据及结果: hadoop fs -cat /tmp/multiTest.txt 5765303,1998,14046,1996,"AD","",,1,12,42,5,59,11,1,0.4545,0,0,1,67.3636,,,, 5785566,1998,14088,1996,"AD","",,1,9,441,6,69,3,0,1,,0.6667,,4.3333,,,, 5894770,1999,14354,1997,"AD","",,1,,82,5,51,4,0,1,,0.625,,7.5,,,, 5765303,1998,14046,1996,"CN","",,1,12,42,5,59,11,1,0.4545,0,0,1,67.3636,,,, 5785566,1998,14088,1996,"CN","",,1,9,441,6,69,3,0,1,,0.6667,,4.3333,,,, 5894770,1999,14354,1997,"CN","",,1,,82,5,51,4,0,1,,0.625,,7.5,,,,
from: MultipleOutputFormat Example http://mazd1002.blog.163.com/blog/static/665749652011102553947492/
2、新版0.21.x及之后的版本: public class TestwithMultipleOutputs extends Configured implements Tool { public static class MapClass extends Mapper<LongWritable,Text,Text,IntWritable> { private MultipleOutputs<Text,IntWritable> mos; protected void setup(Context context) throws IOException,InterruptedException { mos = new MultipleOutputs<Text,IntWritable>(context); } public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); String[] tokens = line.split("-"); mos.write("MOSInt",new Text(tokens[0]), new IntWritable(Integer.parseInt(tokens[1]))); //(第一处) mos.write("MOSText", new Text(tokens[0]),tokens[2]); //(第二处) mos.write("MOSText", new Text(tokens[0]),line,tokens[0]+"/"); //(第三处)同时也可写到指定的文件或文件夹中 } protected void cleanup(Context context) throws IOException,InterruptedException { mos.close(); } } public int run(String[] args) throws Exception { Configuration conf = getConf(); Job job = new Job(conf,"word count with MultipleOutputs"); job.setJarByClass(TestwithMultipleOutputs.class); Path in = new Path(args[0]); Path out = new Path(args[1]); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setMapperClass(MapClass.class); job.setNumReduceTasks(0); MultipleOutputs.addNamedOutput(job,"MOSInt",TextOutputFormat.class,Text.class,IntWritable.class); MultipleOutputs.addNamedOutput(job,"MOSText",TextOutputFormat.class,Text.class,Text.class); System.exit(job.waitForCompletion(true)?0:1); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new TestwithMultipleOutputs(), args); System.exit(res); } } 测试的数据:
abc-1232-hdf 结果截图:(结果输出到/test/testMOSout)
PS:遇到的一个问题: 如果没有mos.close(), 程序运行中会出现异常: 12/05/21 20:12:47 WARN hdfs.DFSClient: DataStreamer Exception:
org.apache.hadoop.ipc.RemoteException:org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on from: MultipleOutputFormat和MultipleOutputshttp://www.cnblogs.com/liangzh/archive/2012/05/22/2512264.html Hadoop利用Partitioner对输出文件分类(改写partition,路由到指定的文件中) http://superlxw1234.iteye.com/blog/1495465 http://ghost-face.iteye.com/blog/1869926 更多参考&推荐阅读: 1、【Hadoop】利用MultipleOutputs,MultiOutputFormat实现以不同格式输出到多个文件 http://www.cnblogs.com/iDonal/archive/2012/08/07/2626588.html 2、cdh3u3 hadoop 0.20.2 MultipleOutputs 多输出文件初探 http://my.oschina.net/wangjiankui/blog/49521 3、使用MultipleOutputs http://blog.163.com/ecy_fu/blog/static/444512620101274344951/ 4、Hadoop reduce多个输出 http://blog.csdn.net/inte_sleeper/article/details/7042020 5、Hadoop 0.20.2中怎么使用MultipleOutputFormat实现多文件输出和完全自定义文件名 http://www.cnblogs.com/flying5/archive/2011/05/04/2078407.html 6、Hadoop OutputFormat浅析 http://zhb-mccoy.iteye.com/blog/1591635 7、others:
https://sites.google.com/site/hadoopandhive/home/how-to-write-output-to-multiple-named-files-in-hadoop-using-multipletextoutputformat 8、MultipleOutputs 官方范例 http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html 9、多数据源输入:MultipleInputs https://groups.google.com/forum/#!topic/nosql-databases/SH61smOV-mo http://bigdataprocessing.wordpress.com/2012/07/27/hadoop-hbase-mapreduce-examples/ http://hbase.apache.org/book/mapreduce.example.html 10、Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究 http://www.iteblog.com/archives/842 (一) http://www.iteblog.com/archives/848 (二) (责任编辑:IT) |