Hadoop:The Definitive Guid 总结 Chapter 8 MapReduce的特性
时间:2014-12-30 23:28 来源:linux.it.net.cn 作者:IT
1.计数器
计数器是一种收集Job统计的有效手段,用于质量控制或应用级统计。计数器的应用使得获取统计数据比使用日志文件获取数据更加容易。
1).内置计数器
Hadoop的内置计数器用来描述Job的各项指标,例如已处理的字节数和记录数,输入数据量和输出数据量。
内置计数器被分为几类(group):
实际上每类都包含Task计数器和Job计数器
A.Task计数器
Task计数器用来收集整个执行过程中Task所生成的信息,这些结果的通过Job里面所有Task聚集(aggregate)统计出来,例如:MAP_INPUT_RECORDS是用来统计所有map读入的记录数目,Take计数器功能同样也被Task Attempt支持,所生成的信息经过TaskTracker传送到JobTracker(注意,在Yarn中信息流的不同)。下面显示部分内置的Task计数器:
B.Job计数器
JobTracker和Yarn中的AM都支持Job计数器功能,所以所生成的信息不用在hadoop网络中传输。Job计数器的数据是Job级别的,不会随着Task运行变化,例如:TOTAL_LAUNCHED_MAPS统计的是整个Job运行过程被启动的map的个数。
下图是Job计数器
2).用户定义Java计数器
MapReduce允许用户编写数量不限的全局自定义计数器,在mapper和reducer中都可以编写,这些计数器由一个Java枚举(enum)类型来定义,以便对计数器分组。MapReduce将在Job结束时候统计所有mapper和reducer中的计数器,并生成结果,以下面的气温统计程序为例,这个程序实现了同时统计气温最高值、气温缺失值的记录和不规范的字段和质量代码:
public class MaxTemperatureWithCounters extends Configured implements Tool { enum Temperature {
MISSING, MALFORMED
} static class MaxTemperatureMapperWithCounters extends Mapper<LongWritable, Text, Text, IntWritable> { private NcdcRecordParser parser = new NcdcRecordParser();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
parser.parse(value); if (parser.isValidTemperature()) { int airTemperature = parser.getAirTemperature();
context.write(new Text(parser.getYear()), new IntWritable(
airTemperature));
} else if (parser.isMalformedTemperature()) {
System.err.println("Ignoring possibly corrupt input: " + value);
context.getCounter(Temperature.MALFORMED).increment(1);
} else if (parser.isMissingTemperature()) {
context.getCounter(Temperature.MISSING).increment(1);
} // dynamic counter context.getCounter("TemperatureQuality", parser.getQuality())
.increment(1);
}
}
@Override public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1;
}
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(MaxTemperatureMapperWithCounters.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class); return job.waitForCompletion(true) ? 0 : 1;
} public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureWithCounters(), args);
System.exit(exitCode);
}
}
A.动态计数器
在上面的实例程序中还使用到了动态计数器,这种计数器不由Java枚举类型定义,在上面的例子用到Reporter对象的incrCounter()方法有两个String类型的输入参数,分别代表数组和计数器名称:
public void incrCounter(String group, String counter, long amount);
其实相比之下,枚举类型易于使用而且相对安全,适合大多数Job使用。
B.计数器名称的命名
Hadoop提供了资源捆绑的方法来修改计数器的现实名称,使计数器的名称具有可读性,也可以创建以Java枚举类型一个属性文件(用下划线分割嵌套类型),并且将其与包含该枚举类型的顶级类放在一个目录,然后用其属性值来控制计数器的显示名称,例如属性文件MaxTemperatureWithCounters_Temperature.properties的内容如下:
CounterGroupName=Air Temperature Records
MISSING.name=Missing
MALFORMED.name=Malformed
C.获取计数器
Hadoop中,用户也可以使用Java API获取计数器的值,Java API支持在Job运行期间就能获取计数器的值如下例,统计气温信息缺失记录所占的比例:
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class MissingTemperatureFields extends Configured implements Tool {
@Override public int run(String[] args) throws Exception { if (args.length != 1) {
JobBuilder.printUsage(this, "<job ID>"); return -1;
}
String jobID = args[0];
JobClient jobClient = new JobClient(new JobConf(getConf()));
RunningJob job = jobClient.getJob(JobID.forName(jobID)); if (job == null) {
System.err.printf("No job with ID %s found.\n", jobID); return -1;
} if (!job.isComplete()) {
System.err.printf("Job %s is not complete.\n", jobID); return -1;
}
Counters counters = job.getCounters(); long missing = counters
.getCounter(MaxTemperatureWithCounters.Temperature.MISSING); long total = counters.getCounter(Task.Counter.MAP_INPUT_RECORDS);
System.out.printf("Records with missing temperature fields: %.2f%%\n", 100.0 * missing / total); return 0;
} public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MissingTemperatureFields(), args);
System.exit(exitCode);
}
}
在这个程序使用到的计数器Java API函数,RunningJob下的生成的Counters类下的getCounter()方法会返回一个Counters对象,其中包含了这个作业的所有计数器。
在新的Hadoop Java API中,Job下(而不的RunningJob)的Counters类中的findCounter()方法也返回一个Counters对象,下面程序调用findCounter()获取内置的Input Records 计数器的值,即通过组名称和计数器名称访问这个计数器。
Cluster cluster = new Cluster(getConf());
Job job = cluster.getJob(JobID.forName(jobID));
Counters counters = job.getCounters(); long missing = counters.findCounter(
MaxTemperatureWithCounters.Temperature.MISSING).getValue(); long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
3).用户定义Streaming计数器
Streaming MapReduce程序通过向标准错误流发送一行特殊格式的信息来增加计数器的值,格式如下:
eporter:counter:group,counter,amount
Python向Missing计数器的值增加1的代码:
sys.stderr.write("reporter:counter:Temperature,Missing,1\n")
2.排序
尽管应用程序本身可能不需要排序的功能,但是排序仍然是MapReduce的核心技术,以下以处理天气数据来说明MapReduce的排序
1).准备
在线面代码中,用顺序文件存储数据,IntWritable键代表气温(并且正确排序),其Text值就是数据行,map创建一个块压缩的顺序文件
public class SortDataPreprocessor extends Configured implements Tool { static class CleanerMapper extends Mapper<LongWritable, Text, IntWritable, Text> { private NcdcRecordParser parser = new NcdcRecordParser();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
parser.parse(value); if (parser.isValidTemperature()) {
context.write(new IntWritable(parser.getAirTemperature()),
value);
}
}
}
@Override public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1;
}
job.setMapperClass(CleanerMapper.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK); return job.waitForCompletion(true) ? 0 : 1;
} public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SortDataPreprocessor(), args);
System.exit(exitCode);
}
}
2).部分排序
下面程序利用IntWritable键对顺序文件排序
public class SortByTemperatureUsingHashPartitioner extends Configured implements Tool {
@Override public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1;
}
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class); //----------------------------------------------------------------------------- SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK); //----------------------------------------------------------------------------- return job.waitForCompletion(true) ? 0 : 1;
} public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run( new SortByTemperatureUsingHashPartitioner(), args);
System.exit(exitCode);
}
}
关于控制排列顺序:
key的排序顺序是由RawComparator决定,规则如下:
A.属性mapred.output.key.comparator.class配置的话,在调用Job类下的setSortComparatorClass()方法后,使用前面属性的实例化后的类
B.否则key的类型必须是WritableComparable的子类,并且key在comparator注册以便使用
C.如果key没有在comparator注册,则只用RawComparator将key进行反序列化成一个对象,谈后由WritableComparable的compareTo()方法进行处理操作。
关于应用:基于分区的MapFile查找技术
以按key执行查找操作为例,在多个文件情况效率更高,以下面三段程序为例
MapReduce程序对一个顺序文件排序输出MapFile
public class SortByTemperatureToMapFile extends Configured implements Tool {
@Override public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1;
}
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(IntWritable.class); //------------------------------------------------------------------------- job.setOutputFormatClass(MapFileOutputFormat.class); //------------------------------------------------------------------------- SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK); //------------------------------------------------------------------------- return job.waitForCompletion(true) ? 0 : 1;
} public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SortByTemperatureToMapFile(), args);
System.exit(exitCode);
}
}
从MapFile集合获取符合指定key的第一项记录,其中程序getReaders()方法打开MapFile.Reader实例,getEntry()方法使用partitioner找到包含key的Reader实例,如果getEntry()返回null,表明没有找到匹配key,否则返回匹配数据。
public class LookupRecordByTemperature extends Configured implements Tool {
@Override public int run(String[] args) throws Exception { if (args.length != 2) {
JobBuilder.printUsage(this, "<path> <key>"); return -1;
}
Path path = new Path(args[0]);
IntWritable key = new IntWritable(Integer.parseInt(args[1])); //----------------------------------------------------------------------- Reader[] readers = MapFileOutputFormat.getReaders(path, getConf());
Partitioner<IntWritable, Text> partitioner = new HashPartitioner<IntWritable, Text>();
Text val = new Text();
Writable entry = MapFileOutputFormat.getEntry(readers, partitioner,
key, val); //----------------------------------------------------------------------- if (entry == null) {
System.err.println("Key not found: " + key); return -1;
}
NcdcRecordParser parser = new NcdcRecordParser();
parser.parse(val.toString());
System.out.printf("%s\t%s\n", parser.getStationId(), parser.getYear()); return 0;
} public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new LookupRecordByTemperature(), args);
System.exit(exitCode);
}
}
我们还可以直接用reader数组来获得包含指定key的所有记录,readers数组按分区排序,因而针对一个key的reader,均使用MapReduce Job中的同一个partitioner
找到reader之后,可通过MapFile的get()方法获取第一条包含key的记录,接着,循环调用next()获取下一条记录,知道key改变为止,下面的程序从一个MapFile集合中获取包含指定key的所有记录:
public class LookupRecordsByTemperature extends Configured implements Tool {
@Override public int run(String[] args) throws Exception { if (args.length != 2) {
JobBuilder.printUsage(this, "<path> <key>"); return -1;
}
Path path = new Path(args[0]);
IntWritable key = new IntWritable(Integer.parseInt(args[1])); //--------------------------------------------------------------------- Reader[] readers = MapFileOutputFormat.getReaders(path, getConf());
Partitioner<IntWritable, Text> partitioner = new HashPartitioner<IntWritable, Text>();
Text val = new Text();
Reader reader = readers[partitioner.getPartition(key, val,
readers.length)];
Writable entry = reader.get(key, val); if (entry == null) {
System.err.println("Key not found: " + key); return -1;
} //--------------------------------------------------------------------- NcdcRecordParser parser = new NcdcRecordParser();
IntWritable nextKey = new IntWritable(); do {
parser.parse(val.toString());
System.out.printf("%s\t%s\n", parser.getStationId(),
parser.getYear());
} while (reader.next(nextKey, val) && key.equals(nextKey)); return 0;
} public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new LookupRecordsByTemperature(), args);
System.exit(exitCode);
}
}
3).全排序
Hadoop全局排序思路,首先,创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序文件,主要的思路是使用一个partitioner来描述全局排序的输出,其中关键点在于如何划分各个分区,理想情况下,各分区所含记录数应该大致相等,使Job的总运行时间不会受制于个别的reduce。
获得数据分布信息意味着可以建立一系列分布更均匀的分区,通过对key空间进行采样,就可较为均匀地划分数据集。采样的核心思想是只查一小部分key,获得key的近似分布,并由此构建分区,为了实现采样,Hadoop内置了若干采样器。
InputSampler类实现Sampler接口如下:
public interface Sampler<K, V> {
K[] getSample(InputFormat<K, V> inf, Job job) throws IOException,
InterruptedException;
}
这个借口由InputSampler类的静态方法writePartitionFile()调用,目的创建一个顺序文件来存储定义分区的key:
public static <K, V> void writePartitionFile(Job job, Sampler<K, V> sampler)throws IOException, ClassNotFoundException, InterruptedException;
顺序文件被TotalOrderPartitioner使用,为排序作业创建分区,见如下程序,该MapReduce程序利用TotalOrderPartitioner根据IntWritable key对顺序文件进行全局排序,该程序还使用了RandomSampler,以指定的采样率均匀从一个数据集中选择样本。
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured implements Tool {
@Override public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1;
}
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
job.setPartitionerClass(TotalOrderPartitioner.class);
InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>( 0.1, 10000, 10);
InputSampler.writePartitionFile(job, sampler); // Add to DistributedCache Configuration conf = job.getConfiguration();
String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
URI partitionUri = new URI(partitionFile + "#"
+ TotalOrderPartitioner.DEFAULT_PATH);
DistributedCache.addCacheFile(partitionUri, conf);
DistributedCache.createSymlink(conf); return job.waitForCompletion(true) ? 0 : 1;
} public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run( new SortByTemperatureUsingTotalOrderPartitioner(), args);
System.exit(exitCode);
}
}
其他采样器,例如SplitSampler只采样一个分片中前n个记录,IntervalSampler以一定的间隔定期从划分中选择key,因此对于已排好序的数据来说是一个很好的选择。
4).辅助排序
MapReduce框架在记录到达reducer之前按照key对记录排序,但因为partition只保证每一个reducer接受一个key的所有记录,而在这一个分区之内,key所对应的value并没有被排序,所以当需要对值也进行排序的时候,就需要一定的技巧和方法。
下面给出对记录值的排序方法
-
定义自然key包括自然key的组合key
-
key的comparator根据组合key对记录进行排序,即同时利用自然key和自然value进行排序
-
针对组合key的partitioner和分组comparator在进行分区和分组事均只考虑自然key
示例程序,通过对key中的value(气温)进行排序来找出最高气温,程序的mapper中利用IntPair类定义了一个代表年份和气温的组合key,因为都是根据组合key去排序,无需mapper和reducer中的value,所用使用NullWritable即可,程序中用setGroupingComparatorClass()定义一个Comparator来按年份对key排序。
public class MaxTemperatureUsingSecondarySort extends Configured implements Tool { static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> { private NcdcRecordParser parser = new NcdcRecordParser();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
parser.parse(value); if (parser.isValidTemperature()) {
context.write( new IntPair(parser.getYearInt(), parser
.getAirTemperature()), NullWritable.get());
}
}
} static class MaxTemperatureReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {
@Override protected void reduce(IntPair key, Iterable<NullWritable> values,
Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
} public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
@Override public int getPartition(IntPair key, NullWritable value, int numPartitions) { // multiply by 127 to perform some mixing return Math.abs(key.getFirst() * 127) % numPartitions;
}
} public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(IntPair.class, true);
}
@Override public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2; int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst()); if (cmp != 0) { return cmp;
} return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); // reverse }
} public static class GroupComparator extends WritableComparator { protected GroupComparator() { super(IntPair.class, true);
}
@Override public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2; return IntPair.compare(ip1.getFirst(), ip2.getFirst());
}
}
@Override public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1;
}
job.setMapperClass(MaxTemperatureMapper.class);
job.setPartitionerClass(FirstPartitioner.class);
job.setSortComparatorClass(KeyComparator.class);
job.setGroupingComparatorClass(GroupComparator.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(IntPair.class);
job.setOutputValueClass(NullWritable.class); return job.waitForCompletion(true) ? 0 : 1;
} public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(),
args);
System.exit(exitCode);
}
}
Streaming也支持辅助排序方法,代码见权威指南。
3.连接
Hadoop支持大型数据的连接操作,操作分为两种map端连接和reduce端连接,其中reduce连接较为常用
1).map端连接
在的数据到达map函数之前,map端的连接操作在大量输入数据之间就开始进行连接操作。各个输入map的数据必须先进行分区,并且均按相同的key排序,同一个key的所有记录会放在同一分区内,例如:
map端的连接对拥有相同reducer数目,相同的key,输出文件不可分的几个Job进行连接操作。一般实现map端的连接操作要用org.apache.hadoop.mapreduce.join中的CompositeInputFormat类的连接表达式等实现,关于实现细节参见包文档。
2).reduce端连接
reduce端连接相比map不要求输入数据集符合特定结构,但是因为数据集要经过shuffle,reduce端连接的效率要低一些,基本思路:mapper为各个记录标记源,并且使用连接key作为map输出key,使key相同的记录放在同一reducer,这个过程中需要前面设计到的技术---多输入和辅助排序。
下面几段程序完整的实现了reduce端对气象站的记录进行连接操作,说明:程序中使用TextPair类构建组合key,包括气象站ID和"标记",标记是一个虚拟的字段,目的在于对记录排序,使气象站记录比天气记录先到达,简单的方法是:对于气象站记录"标记"值为0,对于天气记录“标记”值为1
A.这个mapper类用于reduce端连接中标记气象站的记录
public class JoinStationMapper extends Mapper<LongWritable, Text, TextPair, Text> { private NcdcStationMetadataParser parser = new NcdcStationMetadataParser();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if (parser.parse(value)) {
context.write(new TextPair(parser.getStationId(), "0"), new Text(
parser.getStationName()));
}
}
}
B.这个mapper类用于reduce端连接中标记天气记录
public class JoinRecordMapper extends Mapper<LongWritable, Text, TextPair, Text> { private NcdcRecordParser parser = new NcdcRecordParser();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
parser.parse(value);
context.write(new TextPair(parser.getStationId(), "1"), value);
}
}
C.reducer用于连接已标记的气象站记录和天气记录
public class JoinReducer extends Reducer<TextPair, Text, Text, Text> {
@Override protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Iterator<Text> iter = values.iterator();
Text stationName = new Text(iter.next()); while (iter.hasNext()) {
Text record = iter.next();
Text outValue = new Text(stationName.toString() + "\t"
+ record.toString());
context.write(key.getFirst(), outValue);
}
}
}
D.对天气记录和气象站记录名称执行连接操作
public class JoinRecordWithStationName extends Configured implements Tool { public static class KeyPartitioner extends Partitioner<TextPair, Text> {
@Override public int getPartition(TextPair key, Text value, int numPartitions) { return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
@Override public int run(String[] args) throws Exception { if (args.length != 3) {
JobBuilder
.printUsage(this, "<ncdc input> <station input> <output>"); return -1;
}
Job job = new Job(getConf(), "Join weather records with station names");
job.setJarByClass(getClass());
Path ncdcInputPath = new Path(args[0]);
Path stationInputPath = new Path(args[1]);
Path outputPath = new Path(args[2]);
MultipleInputs.addInputPath(job, ncdcInputPath, TextInputFormat.class,
JoinRecordMapper.class);
MultipleInputs.addInputPath(job, stationInputPath,
TextInputFormat.class, JoinStationMapper.class);
FileOutputFormat.setOutputPath(job, outputPath);
job.setPartitionerClass(KeyPartitioner.class);
job.setGroupingComparatorClass(TextPair.FirstComparator.class);
job.setMapOutputKeyClass(TextPair.class);
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(Text.class); return job.waitForCompletion(true) ? 0 : 1;
} public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new JoinRecordWithStationName(), args);
System.exit(exitCode);
}
}
执行效果如下:
011990-99999 SIHCCAJAVRI 0067011990999991950051507004+68750...
011990-99999 SIHCCAJAVRI 0043011990999991950051512004+68750...
011990-99999 SIHCCAJAVRI 0043011990999991950051518004+68750...
012650-99999 TYNSET-HANSMOEN 0043012650999991949032412004+62300...
012650-99999 TYNSET-HANSMOEN 0043012650999991949032418004+62300...
4.从属数据分布
从属数据(side data,权威指南翻译成"边数据")是Job所需的额外的只读数据,以辅助处理主要数据集,所要解决的是如何让所有的map和reduce高效的使用从属数据。
1).应用Job配置
(这段似乎和从属数据没太大关系)可以用Configuration下setter方法设置Job的key/value对,当需要向Task传送少量的元数据时就非常有用。
在Task运行期间,可以通过Context中的getConfiguration()方法获得所要的数据,如果要处理复杂的对象,可以选用Hadoop提供的Stringifer类,DefaultStringifier使用Hadoop的序列化框架来序列化对象
2).分布式缓存
Hadoop的分布式缓存机制能够在任务运行过程中及时地将文件和存档复制到任务节点以供使用,在一个Job中,通常只需复制各个文件到一个节点一次,以节约网络宽带。
A.工作机制
当一个Job开始运行的时候,首先Hadoop将由-files和-archives选项所指定的文件复制到TaskTracker的文件系统(一般是HDFS),然后,在Task运行之前,TaskTracker将文件从JobTracker的文件系统中复制到本地磁盘---缓存---这样Task就能访问这个文件,这样对于Task来说这个文件就已经本地化了。
TaskTracker维护一个关联缓存文件的计数器来统计缓存中的文件的使用情况,当一个Task运行时,关联Task所使用的文件的计数器会自动增加1,当Task运行完毕,这个计数器会相应的减去1,当个计数器为0的时候,表明已经没用Task使用该文件,可以从缓存中删掉
这些缓存文件放在${mapred.local.dir}/taskTracker/archive,同时这些文件以符号链的方式指向任务的工作目录,对于用户不用细究。
B.应用
使用工具GenericOptionsParser,用户可以用-file选项指定待分发文件,文件包含以逗号隔开的URL列表,使用-archives选项向自己的Task中复制存档(archives,感觉就是压缩)文件((JAR文件, ZIP文件, tar文件和gzipped tar 问价),这些文件释放到Task节点上,-libjars选项会把JAR文件添加到mapper和reducer任务的类路径上
以下为只用实例:
以下指令显示如何使用分布式缓存来共享元数据文件,以得到气象站名称:
% hadoop jar hadoop-examples.jar MaxTemperatureByStationNameUsingDistributedCacheFile \
-files input/ncdc/metadata/stations-fixed-width.txt input/ncdc/all output
MaxTemperatureByStationNameUsingDistributedCacheFile的实现:
public class MaxTemperatureByStationNameUsingDistributedCacheFile extends Configured implements Tool { static class StationTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private NcdcRecordParser parser = new NcdcRecordParser();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
parser.parse(value); if (parser.isValidTemperature()) {
context.write(new Text(parser.getStationId()), new IntWritable(
parser.getAirTemperature()));
}
}
} static class MaxTemperatureReducerWithStationLookup extends Reducer<Text, IntWritable, Text, IntWritable> { private NcdcStationMetadata metadata; //--------------------------------------------------------------------- @Override protected void setup(Context context) throws IOException,
InterruptedException {
metadata = new NcdcStationMetadata();
metadata.initialize(new File("stations-fixed-width.txt"));
} //--------------------------------------------------------------------- @Override protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
String stationName = metadata.getStationName(key.toString()); int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(new Text(stationName), new IntWritable(maxValue));
}
}
@Override public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1;
}
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(StationTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducerWithStationLookup.class); return job.waitForCompletion(true) ? 0 : 1;
} public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run( new MaxTemperatureByStationNameUsingDistributedCacheFile(),
args);
System.exit(exitCode);
}
}
该程序在reducer的configure()的方法中用文件的原始名称来获取缓存文件,该文件的路径与任务的工作目录相同
关于distributed cache API
大多数应用程序不需要distributed cache API,然而一些程序可能还是需要distributed cache 更加高级的功能
distributed cache API包括两种分功能函数:一种函数用来将数据放入缓存(在Job类中);一种函数用来从缓存收集数据的(在JobContext类中)
第一种函数,如下图:
余下的还有一个函数createSymlink(),当要将文件进行本地化到时候,这个函数用来为运行的Job中所有文件建立符号链接,符号链接的名字可以根据文件URI的fragment标示符设定,例如:hdfs://namenode/foo/bar#myfile 可以表示链接到Task的工作目录myfile文件中,如果没有fragment标示符,那只能用工具GenericOptionsParser自动给将文件添加到distributed cache(分布式缓存)中。
第二种函数是在JobContext类下,当向从distributed cache中访问文件,可以在map或者reduce的task代码中调用这类函数,如下:
public Path[] getLocalCacheFiles() throws IOException; public Path[] getLocalCacheArchives() throws IOException; public Path[] getFileClassPaths(); public Path[] getArchiveClassPaths();
如果distributed cache下的文件在Task的工作目录下有符号链接,就可以依据文件名访问这个已经本地化的文件,也可以用getLocalCacheFiles()和getLocalCacheArchive()函数从缓存中只得到了这个文件和archive的引用,另外可以通过getFileClassPaths()和getArchiveClassPaths()函数得到已经添加到Task的classpath中的文件和archive.
注意这些文件都是以本地Path对象存在,为了读取文件,可以用本地的FileSystem实例,用getLocal函数来获取文件,下面是重写上面 部分程序:
@Override protected void setup(Context context) throws IOException, InterruptedException {
metadata = new NcdcStationMetadata();
Path[] localPaths = context.getLocalCacheFiles(); if (localPaths.length == 0) { throw new FileNotFoundException("Distributed cache file not found.");
}
File localFile = new File(localPaths[0].toString());
metadata.initialize(localFile);
}
5.MapReduce类库
Hadoop还为mapper和reducer提供了一个常用的函数库,如下图,可参考相关Java文档
转载地址:http://www.cnblogs.com/biyeymyhjob/archive/2012/08/12/2634252.html (责任编辑:IT)
1.计数器 计数器是一种收集Job统计的有效手段,用于质量控制或应用级统计。计数器的应用使得获取统计数据比使用日志文件获取数据更加容易。 1).内置计数器 Hadoop的内置计数器用来描述Job的各项指标,例如已处理的字节数和记录数,输入数据量和输出数据量。 内置计数器被分为几类(group):
实际上每类都包含Task计数器和Job计数器 A.Task计数器 Task计数器用来收集整个执行过程中Task所生成的信息,这些结果的通过Job里面所有Task聚集(aggregate)统计出来,例如:MAP_INPUT_RECORDS是用来统计所有map读入的记录数目,Take计数器功能同样也被Task Attempt支持,所生成的信息经过TaskTracker传送到JobTracker(注意,在Yarn中信息流的不同)。下面显示部分内置的Task计数器:
B.Job计数器 JobTracker和Yarn中的AM都支持Job计数器功能,所以所生成的信息不用在hadoop网络中传输。Job计数器的数据是Job级别的,不会随着Task运行变化,例如:TOTAL_LAUNCHED_MAPS统计的是整个Job运行过程被启动的map的个数。 下图是Job计数器
2).用户定义Java计数器 MapReduce允许用户编写数量不限的全局自定义计数器,在mapper和reducer中都可以编写,这些计数器由一个Java枚举(enum)类型来定义,以便对计数器分组。MapReduce将在Job结束时候统计所有mapper和reducer中的计数器,并生成结果,以下面的气温统计程序为例,这个程序实现了同时统计气温最高值、气温缺失值的记录和不规范的字段和质量代码: public class MaxTemperatureWithCounters extends Configured implements Tool { enum Temperature { MISSING, MALFORMED } static class MaxTemperatureMapperWithCounters extends Mapper<LongWritable, Text, Text, IntWritable> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { parser.parse(value); if (parser.isValidTemperature()) { int airTemperature = parser.getAirTemperature(); context.write(new Text(parser.getYear()), new IntWritable( airTemperature)); } else if (parser.isMalformedTemperature()) { System.err.println("Ignoring possibly corrupt input: " + value); context.getCounter(Temperature.MALFORMED).increment(1); } else if (parser.isMissingTemperature()) { context.getCounter(Temperature.MISSING).increment(1); } // dynamic counter context.getCounter("TemperatureQuality", parser.getQuality()) .increment(1); } } @Override public int run(String[] args) throws Exception { Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1; } job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(MaxTemperatureMapperWithCounters.class); job.setCombinerClass(MaxTemperatureReducer.class); job.setReducerClass(MaxTemperatureReducer.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureWithCounters(), args); System.exit(exitCode); } }
A.动态计数器 在上面的实例程序中还使用到了动态计数器,这种计数器不由Java枚举类型定义,在上面的例子用到Reporter对象的incrCounter()方法有两个String类型的输入参数,分别代表数组和计数器名称: public void incrCounter(String group, String counter, long amount); 其实相比之下,枚举类型易于使用而且相对安全,适合大多数Job使用。
B.计数器名称的命名 Hadoop提供了资源捆绑的方法来修改计数器的现实名称,使计数器的名称具有可读性,也可以创建以Java枚举类型一个属性文件(用下划线分割嵌套类型),并且将其与包含该枚举类型的顶级类放在一个目录,然后用其属性值来控制计数器的显示名称,例如属性文件MaxTemperatureWithCounters_Temperature.properties的内容如下:
C.获取计数器 Hadoop中,用户也可以使用Java API获取计数器的值,Java API支持在Job运行期间就能获取计数器的值如下例,统计气温信息缺失记录所占的比例: import org.apache.hadoop.conf.Configured; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class MissingTemperatureFields extends Configured implements Tool { @Override public int run(String[] args) throws Exception { if (args.length != 1) { JobBuilder.printUsage(this, "<job ID>"); return -1; } String jobID = args[0]; JobClient jobClient = new JobClient(new JobConf(getConf())); RunningJob job = jobClient.getJob(JobID.forName(jobID)); if (job == null) { System.err.printf("No job with ID %s found.\n", jobID); return -1; } if (!job.isComplete()) { System.err.printf("Job %s is not complete.\n", jobID); return -1; } Counters counters = job.getCounters(); long missing = counters .getCounter(MaxTemperatureWithCounters.Temperature.MISSING); long total = counters.getCounter(Task.Counter.MAP_INPUT_RECORDS); System.out.printf("Records with missing temperature fields: %.2f%%\n", 100.0 * missing / total); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MissingTemperatureFields(), args); System.exit(exitCode); } } 在这个程序使用到的计数器Java API函数,RunningJob下的生成的Counters类下的getCounter()方法会返回一个Counters对象,其中包含了这个作业的所有计数器。
在新的Hadoop Java API中,Job下(而不的RunningJob)的Counters类中的findCounter()方法也返回一个Counters对象,下面程序调用findCounter()获取内置的Input Records 计数器的值,即通过组名称和计数器名称访问这个计数器。 Cluster cluster = new Cluster(getConf()); Job job = cluster.getJob(JobID.forName(jobID)); Counters counters = job.getCounters(); long missing = counters.findCounter( MaxTemperatureWithCounters.Temperature.MISSING).getValue(); long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
3).用户定义Streaming计数器 Streaming MapReduce程序通过向标准错误流发送一行特殊格式的信息来增加计数器的值,格式如下:
Python向Missing计数器的值增加1的代码: sys.stderr.write("reporter:counter:Temperature,Missing,1\n")
2.排序 尽管应用程序本身可能不需要排序的功能,但是排序仍然是MapReduce的核心技术,以下以处理天气数据来说明MapReduce的排序 1).准备 在线面代码中,用顺序文件存储数据,IntWritable键代表气温(并且正确排序),其Text值就是数据行,map创建一个块压缩的顺序文件 public class SortDataPreprocessor extends Configured implements Tool { static class CleanerMapper extends Mapper<LongWritable, Text, IntWritable, Text> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { parser.parse(value); if (parser.isValidTemperature()) { context.write(new IntWritable(parser.getAirTemperature()), value); } } } @Override public int run(String[] args) throws Exception { Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1; } job.setMapperClass(CleanerMapper.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); job.setNumReduceTasks(0); job.setOutputFormatClass(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SortDataPreprocessor(), args); System.exit(exitCode); } }
2).部分排序 下面程序利用IntWritable键对顺序文件排序 public class SortByTemperatureUsingHashPartitioner extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1; } job.setInputFormatClass(SequenceFileInputFormat.class); job.setOutputKeyClass(IntWritable.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); //----------------------------------------------------------------------------- SequenceFileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); //----------------------------------------------------------------------------- return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run( new SortByTemperatureUsingHashPartitioner(), args); System.exit(exitCode); } }
关于控制排列顺序: key的排序顺序是由RawComparator决定,规则如下: A.属性mapred.output.key.comparator.class配置的话,在调用Job类下的setSortComparatorClass()方法后,使用前面属性的实例化后的类 B.否则key的类型必须是WritableComparable的子类,并且key在comparator注册以便使用 C.如果key没有在comparator注册,则只用RawComparator将key进行反序列化成一个对象,谈后由WritableComparable的compareTo()方法进行处理操作。
关于应用:基于分区的MapFile查找技术 以按key执行查找操作为例,在多个文件情况效率更高,以下面三段程序为例 MapReduce程序对一个顺序文件排序输出MapFile public class SortByTemperatureToMapFile extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1; } job.setInputFormatClass(SequenceFileInputFormat.class); job.setOutputKeyClass(IntWritable.class); //------------------------------------------------------------------------- job.setOutputFormatClass(MapFileOutputFormat.class); //------------------------------------------------------------------------- SequenceFileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); //------------------------------------------------------------------------- return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SortByTemperatureToMapFile(), args); System.exit(exitCode); } }
从MapFile集合获取符合指定key的第一项记录,其中程序getReaders()方法打开MapFile.Reader实例,getEntry()方法使用partitioner找到包含key的Reader实例,如果getEntry()返回null,表明没有找到匹配key,否则返回匹配数据。 public class LookupRecordByTemperature extends Configured implements Tool { @Override public int run(String[] args) throws Exception { if (args.length != 2) { JobBuilder.printUsage(this, "<path> <key>"); return -1; } Path path = new Path(args[0]); IntWritable key = new IntWritable(Integer.parseInt(args[1])); //----------------------------------------------------------------------- Reader[] readers = MapFileOutputFormat.getReaders(path, getConf()); Partitioner<IntWritable, Text> partitioner = new HashPartitioner<IntWritable, Text>(); Text val = new Text(); Writable entry = MapFileOutputFormat.getEntry(readers, partitioner, key, val); //----------------------------------------------------------------------- if (entry == null) { System.err.println("Key not found: " + key); return -1; } NcdcRecordParser parser = new NcdcRecordParser(); parser.parse(val.toString()); System.out.printf("%s\t%s\n", parser.getStationId(), parser.getYear()); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new LookupRecordByTemperature(), args); System.exit(exitCode); } }
我们还可以直接用reader数组来获得包含指定key的所有记录,readers数组按分区排序,因而针对一个key的reader,均使用MapReduce Job中的同一个partitioner 找到reader之后,可通过MapFile的get()方法获取第一条包含key的记录,接着,循环调用next()获取下一条记录,知道key改变为止,下面的程序从一个MapFile集合中获取包含指定key的所有记录: public class LookupRecordsByTemperature extends Configured implements Tool { @Override public int run(String[] args) throws Exception { if (args.length != 2) { JobBuilder.printUsage(this, "<path> <key>"); return -1; } Path path = new Path(args[0]); IntWritable key = new IntWritable(Integer.parseInt(args[1])); //--------------------------------------------------------------------- Reader[] readers = MapFileOutputFormat.getReaders(path, getConf()); Partitioner<IntWritable, Text> partitioner = new HashPartitioner<IntWritable, Text>(); Text val = new Text(); Reader reader = readers[partitioner.getPartition(key, val, readers.length)]; Writable entry = reader.get(key, val); if (entry == null) { System.err.println("Key not found: " + key); return -1; } //--------------------------------------------------------------------- NcdcRecordParser parser = new NcdcRecordParser(); IntWritable nextKey = new IntWritable(); do { parser.parse(val.toString()); System.out.printf("%s\t%s\n", parser.getStationId(), parser.getYear()); } while (reader.next(nextKey, val) && key.equals(nextKey)); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new LookupRecordsByTemperature(), args); System.exit(exitCode); } }
3).全排序 Hadoop全局排序思路,首先,创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序文件,主要的思路是使用一个partitioner来描述全局排序的输出,其中关键点在于如何划分各个分区,理想情况下,各分区所含记录数应该大致相等,使Job的总运行时间不会受制于个别的reduce。 获得数据分布信息意味着可以建立一系列分布更均匀的分区,通过对key空间进行采样,就可较为均匀地划分数据集。采样的核心思想是只查一小部分key,获得key的近似分布,并由此构建分区,为了实现采样,Hadoop内置了若干采样器。 InputSampler类实现Sampler接口如下: public interface Sampler<K, V> { K[] getSample(InputFormat<K, V> inf, Job job) throws IOException, InterruptedException; } 这个借口由InputSampler类的静态方法writePartitionFile()调用,目的创建一个顺序文件来存储定义分区的key: public static <K, V> void writePartitionFile(Job job, Sampler<K, V> sampler)throws IOException, ClassNotFoundException, InterruptedException;
顺序文件被TotalOrderPartitioner使用,为排序作业创建分区,见如下程序,该MapReduce程序利用TotalOrderPartitioner根据IntWritable key对顺序文件进行全局排序,该程序还使用了RandomSampler,以指定的采样率均匀从一个数据集中选择样本。
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1; } job.setInputFormatClass(SequenceFileInputFormat.class); job.setOutputKeyClass(IntWritable.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); job.setPartitionerClass(TotalOrderPartitioner.class); InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>( 0.1, 10000, 10); InputSampler.writePartitionFile(job, sampler); // Add to DistributedCache Configuration conf = job.getConfiguration(); String partitionFile = TotalOrderPartitioner.getPartitionFile(conf); URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH); DistributedCache.addCacheFile(partitionUri, conf); DistributedCache.createSymlink(conf); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run( new SortByTemperatureUsingTotalOrderPartitioner(), args); System.exit(exitCode); } } 其他采样器,例如SplitSampler只采样一个分片中前n个记录,IntervalSampler以一定的间隔定期从划分中选择key,因此对于已排好序的数据来说是一个很好的选择。
4).辅助排序 MapReduce框架在记录到达reducer之前按照key对记录排序,但因为partition只保证每一个reducer接受一个key的所有记录,而在这一个分区之内,key所对应的value并没有被排序,所以当需要对值也进行排序的时候,就需要一定的技巧和方法。 下面给出对记录值的排序方法
示例程序,通过对key中的value(气温)进行排序来找出最高气温,程序的mapper中利用IntPair类定义了一个代表年份和气温的组合key,因为都是根据组合key去排序,无需mapper和reducer中的value,所用使用NullWritable即可,程序中用setGroupingComparatorClass()定义一个Comparator来按年份对key排序。 public class MaxTemperatureUsingSecondarySort extends Configured implements Tool { static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { parser.parse(value); if (parser.isValidTemperature()) { context.write( new IntPair(parser.getYearInt(), parser .getAirTemperature()), NullWritable.get()); } } } static class MaxTemperatureReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> { @Override protected void reduce(IntPair key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> { @Override public int getPartition(IntPair key, NullWritable value, int numPartitions) { // multiply by 127 to perform some mixing return Math.abs(key.getFirst() * 127) % numPartitions; } } public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst()); if (cmp != 0) { return cmp; } return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); // reverse } } public static class GroupComparator extends WritableComparator { protected GroupComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; return IntPair.compare(ip1.getFirst(), ip2.getFirst()); } } @Override public int run(String[] args) throws Exception { Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1; } job.setMapperClass(MaxTemperatureMapper.class); job.setPartitionerClass(FirstPartitioner.class); job.setSortComparatorClass(KeyComparator.class); job.setGroupingComparatorClass(GroupComparator.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(IntPair.class); job.setOutputValueClass(NullWritable.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args); System.exit(exitCode); } }
Streaming也支持辅助排序方法,代码见权威指南。
3.连接 Hadoop支持大型数据的连接操作,操作分为两种map端连接和reduce端连接,其中reduce连接较为常用 1).map端连接 在的数据到达map函数之前,map端的连接操作在大量输入数据之间就开始进行连接操作。各个输入map的数据必须先进行分区,并且均按相同的key排序,同一个key的所有记录会放在同一分区内,例如:
map端的连接对拥有相同reducer数目,相同的key,输出文件不可分的几个Job进行连接操作。一般实现map端的连接操作要用org.apache.hadoop.mapreduce.join中的CompositeInputFormat类的连接表达式等实现,关于实现细节参见包文档。
2).reduce端连接 reduce端连接相比map不要求输入数据集符合特定结构,但是因为数据集要经过shuffle,reduce端连接的效率要低一些,基本思路:mapper为各个记录标记源,并且使用连接key作为map输出key,使key相同的记录放在同一reducer,这个过程中需要前面设计到的技术---多输入和辅助排序。 下面几段程序完整的实现了reduce端对气象站的记录进行连接操作,说明:程序中使用TextPair类构建组合key,包括气象站ID和"标记",标记是一个虚拟的字段,目的在于对记录排序,使气象站记录比天气记录先到达,简单的方法是:对于气象站记录"标记"值为0,对于天气记录“标记”值为1 A.这个mapper类用于reduce端连接中标记气象站的记录 public class JoinStationMapper extends Mapper<LongWritable, Text, TextPair, Text> { private NcdcStationMetadataParser parser = new NcdcStationMetadataParser(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if (parser.parse(value)) { context.write(new TextPair(parser.getStationId(), "0"), new Text( parser.getStationName())); } } }
B.这个mapper类用于reduce端连接中标记天气记录 public class JoinRecordMapper extends Mapper<LongWritable, Text, TextPair, Text> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { parser.parse(value); context.write(new TextPair(parser.getStationId(), "1"), value); } }
C.reducer用于连接已标记的气象站记录和天气记录 public class JoinReducer extends Reducer<TextPair, Text, Text, Text> { @Override protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Iterator<Text> iter = values.iterator(); Text stationName = new Text(iter.next()); while (iter.hasNext()) { Text record = iter.next(); Text outValue = new Text(stationName.toString() + "\t" + record.toString()); context.write(key.getFirst(), outValue); } } }
D.对天气记录和气象站记录名称执行连接操作 public class JoinRecordWithStationName extends Configured implements Tool { public static class KeyPartitioner extends Partitioner<TextPair, Text> { @Override public int getPartition(TextPair key, Text value, int numPartitions) { return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; } } @Override public int run(String[] args) throws Exception { if (args.length != 3) { JobBuilder .printUsage(this, "<ncdc input> <station input> <output>"); return -1; } Job job = new Job(getConf(), "Join weather records with station names"); job.setJarByClass(getClass()); Path ncdcInputPath = new Path(args[0]); Path stationInputPath = new Path(args[1]); Path outputPath = new Path(args[2]); MultipleInputs.addInputPath(job, ncdcInputPath, TextInputFormat.class, JoinRecordMapper.class); MultipleInputs.addInputPath(job, stationInputPath, TextInputFormat.class, JoinStationMapper.class); FileOutputFormat.setOutputPath(job, outputPath); job.setPartitionerClass(KeyPartitioner.class); job.setGroupingComparatorClass(TextPair.FirstComparator.class); job.setMapOutputKeyClass(TextPair.class); job.setReducerClass(JoinReducer.class); job.setOutputKeyClass(Text.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new JoinRecordWithStationName(), args); System.exit(exitCode); } } 执行效果如下:
4.从属数据分布 从属数据(side data,权威指南翻译成"边数据")是Job所需的额外的只读数据,以辅助处理主要数据集,所要解决的是如何让所有的map和reduce高效的使用从属数据。 1).应用Job配置 (这段似乎和从属数据没太大关系)可以用Configuration下setter方法设置Job的key/value对,当需要向Task传送少量的元数据时就非常有用。 在Task运行期间,可以通过Context中的getConfiguration()方法获得所要的数据,如果要处理复杂的对象,可以选用Hadoop提供的Stringifer类,DefaultStringifier使用Hadoop的序列化框架来序列化对象 2).分布式缓存 Hadoop的分布式缓存机制能够在任务运行过程中及时地将文件和存档复制到任务节点以供使用,在一个Job中,通常只需复制各个文件到一个节点一次,以节约网络宽带。 A.工作机制 当一个Job开始运行的时候,首先Hadoop将由-files和-archives选项所指定的文件复制到TaskTracker的文件系统(一般是HDFS),然后,在Task运行之前,TaskTracker将文件从JobTracker的文件系统中复制到本地磁盘---缓存---这样Task就能访问这个文件,这样对于Task来说这个文件就已经本地化了。 TaskTracker维护一个关联缓存文件的计数器来统计缓存中的文件的使用情况,当一个Task运行时,关联Task所使用的文件的计数器会自动增加1,当Task运行完毕,这个计数器会相应的减去1,当个计数器为0的时候,表明已经没用Task使用该文件,可以从缓存中删掉 这些缓存文件放在${mapred.local.dir}/taskTracker/archive,同时这些文件以符号链的方式指向任务的工作目录,对于用户不用细究。
B.应用 使用工具GenericOptionsParser,用户可以用-file选项指定待分发文件,文件包含以逗号隔开的URL列表,使用-archives选项向自己的Task中复制存档(archives,感觉就是压缩)文件((JAR文件, ZIP文件, tar文件和gzipped tar 问价),这些文件释放到Task节点上,-libjars选项会把JAR文件添加到mapper和reducer任务的类路径上 以下为只用实例: 以下指令显示如何使用分布式缓存来共享元数据文件,以得到气象站名称:
MaxTemperatureByStationNameUsingDistributedCacheFile的实现: public class MaxTemperatureByStationNameUsingDistributedCacheFile extends Configured implements Tool { static class StationTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { parser.parse(value); if (parser.isValidTemperature()) { context.write(new Text(parser.getStationId()), new IntWritable( parser.getAirTemperature())); } } } static class MaxTemperatureReducerWithStationLookup extends Reducer<Text, IntWritable, Text, IntWritable> { private NcdcStationMetadata metadata; //--------------------------------------------------------------------- @Override protected void setup(Context context) throws IOException, InterruptedException { metadata = new NcdcStationMetadata(); metadata.initialize(new File("stations-fixed-width.txt")); } //--------------------------------------------------------------------- @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { String stationName = metadata.getStationName(key.toString()); int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(new Text(stationName), new IntWritable(maxValue)); } } @Override public int run(String[] args) throws Exception { Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1; } job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(StationTemperatureMapper.class); job.setCombinerClass(MaxTemperatureReducer.class); job.setReducerClass(MaxTemperatureReducerWithStationLookup.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run( new MaxTemperatureByStationNameUsingDistributedCacheFile(), args); System.exit(exitCode); } } 该程序在reducer的configure()的方法中用文件的原始名称来获取缓存文件,该文件的路径与任务的工作目录相同
关于distributed cache API 大多数应用程序不需要distributed cache API,然而一些程序可能还是需要distributed cache 更加高级的功能 distributed cache API包括两种分功能函数:一种函数用来将数据放入缓存(在Job类中);一种函数用来从缓存收集数据的(在JobContext类中) 第一种函数,如下图:
余下的还有一个函数createSymlink(),当要将文件进行本地化到时候,这个函数用来为运行的Job中所有文件建立符号链接,符号链接的名字可以根据文件URI的fragment标示符设定,例如:hdfs://namenode/foo/bar#myfile 可以表示链接到Task的工作目录myfile文件中,如果没有fragment标示符,那只能用工具GenericOptionsParser自动给将文件添加到distributed cache(分布式缓存)中。
第二种函数是在JobContext类下,当向从distributed cache中访问文件,可以在map或者reduce的task代码中调用这类函数,如下: public Path[] getLocalCacheFiles() throws IOException; public Path[] getLocalCacheArchives() throws IOException; public Path[] getFileClassPaths(); public Path[] getArchiveClassPaths(); 如果distributed cache下的文件在Task的工作目录下有符号链接,就可以依据文件名访问这个已经本地化的文件,也可以用getLocalCacheFiles()和getLocalCacheArchive()函数从缓存中只得到了这个文件和archive的引用,另外可以通过getFileClassPaths()和getArchiveClassPaths()函数得到已经添加到Task的classpath中的文件和archive. 注意这些文件都是以本地Path对象存在,为了读取文件,可以用本地的FileSystem实例,用getLocal函数来获取文件,下面是重写上面 部分程序: @Override protected void setup(Context context) throws IOException, InterruptedException { metadata = new NcdcStationMetadata(); Path[] localPaths = context.getLocalCacheFiles(); if (localPaths.length == 0) { throw new FileNotFoundException("Distributed cache file not found."); } File localFile = new File(localPaths[0].toString()); metadata.initialize(localFile); }
5.MapReduce类库 Hadoop还为mapper和reducer提供了一个常用的函数库,如下图,可参考相关Java文档
|