> Linux集群 > Hadoop >

Hadoop序列化中的Writable接口(附部分源码)

序列化是将结构化对象为字节流以便与通过网络进行传输或者写入持久存储。反序列化指的是将字节流转为一系列结构化对象的过程。

序化在分布式数据处理的两列大领域经常出现:进程间通信和永久存储

hadoop中,节点直接的进程间通信是用远程过程调用(RPC)实现的。RPC协议将消息序列化成二进制流后发送到运城节点,远程节点接着将二进制流反序列化为原始的消息。

在Hadoop中,Writable接口定义了两个方法:一个用于将其状态写入二进制格式的DataOutput流,另一个用于从二进制格式的DataInput流读取其态。

 
1
2
3
4
5
6
7
packageorg.apache.hadoop.io;
importjava.io.DataOutput;
importjava.io.DataInput;
importjava.io.IOException;
public interface Writable {
    void write(DataOutput out)throws IOException;
    void readFields(DataInput in)throws IOException;

write和readFields分别实现了把对象序列化和反序列化的功能

让我们来看一个特别的Writable,看看可以对它进行哪些操作。我们要使用IntWritable,这是一个Java的int对象的封装。可以使用set()函数来创建和设置它的值:

 
1
2
IntWritable writable =new IntWritable();
writable.set(163);
类似地,我们也可以使用构造函数: 

 
1
IntWritable writable =newIntWritable(163);

 
为了检查IntWritable的序列化形式,我们写一个小的辅助方法,它把一个java.io.ByteArrayOutputStream封装到java.io.DataOutputStream中(java.io.DataOutput的一个实现),以此来捕获序列化的数据流中的字节: 

 
1
2
3
4
5
6
7
public static byte[] serialize(Writable writable)throws IOException {
    ByteArrayOutputStream out =new ByteArrayOutputStream();
    DataOutputStream dataOut =new DataOutputStream(out);
    writable.write(dataOut);
    dataOut.close();
    returnout.toByteArray();
}
整数用四个字节写入(我们使用JUnit 4断言): 

 
1
2
byte[] bytes = serialize(writable);
assertThat(bytes.length, is(4));


 
字节使用大端顺序写入(所以,最重要的字节写在数据流的开始处,这是由java.io.DataOutput接口规定的),我们可以使用Hadoop的StringUtils方法看到它们的十六进制表示: 
 
1
assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));

让我们再来试试反序列化。我们创建一个帮助方法来从一个字节数组读取一个Writable对象:

 
1
2
3
4
5
6
7
8
public static byte[] deserialize(Writable writable,byte[] bytes)
throws IOException {
    ByteArrayInputStream in =new ByteArrayInputStream(bytes);
    DataInputStream dataIn =new DataInputStream(in);
    writable.readFields(dataIn);
    dataIn.close();
    return bytes;
}


 
我们构造一个新的、缺值的IntWritable,然后调用deserialize()方法来读取刚写入的输出流。然后发现它的值(使用get方法检索得到)还是原来的值163: 


 
1
2
3
IntWritable newWritable =new IntWritable();
deserialize(newWritable, bytes);
assertThat(newWritable.get(), is(163));
WritableComparable 和comparator

IntWritable实现了WritableComparable接口,后者是Writable和java.lang.Comparable接口的子接口。

 
1
2
3
packageorg.apache.hadoop.io;
public interface WritableComparable<t> extends Writable, Comparable<t> {
}

类型的比较对MapReduce而言至关重要的,键和键之间的比较是在排序阶段完成。Hadoop提供的一个优化方法是从Java Comparator的RawComparator扩展

 

1
2
3
4
5
packageorg.apache.hadoop.io;
importjava.util.Comparator;
public interface RawComparator<t> extends Comparator<t> {
     public int compare(byte[] b1,ints1,intl1,byte[] b2,ints2,intl2);
}


 
1
2
3
4
5
<span></span> package java.util;
public interface Comparator<T> {
    int compare(T o1, T o2);
    boolean equals(Object obj);
}
这个接口允许执行者比较从流中读取的未被反序列化为对象的记录,从而省去了创建对象的所有开销。例如,IntWritables的comparator使用原始的compare()方法从每个字节数组的指定开始位置(S1和S2)和长度(L1和L2)读取整数b1和b2然后直接进行比较。

WritableComparator是RawComparator对WritableComparable类的一个通用实现。它提供两个主要功能。首先,它提供了一个默认的对原始compare()函数的调用,对从数据流对要比较的对象进行反序列化,然后调用对象的compare()方法。其次,它充当的是RawComparator实例的一个工厂方法(Writable方法已经注册)。例如,为获得IntWritable的comparator,我们只需使用:

 
1
RawComparator<intwritable> comparator = WritableComparator.get(IntWritable.class);

WritableComparator get方法源码:

 

 
1
2
3
4
5
6
7
8
9
10
private static HashMap<Class, WritableComparator> comparators =
   new HashMap<Class, WritableComparator>(); // registry
 
 /** Get a comparator for a {@link WritableComparable} implementation. */
 public static synchronized WritableComparator get(Class<? extends WritableComparable> c) {
   WritableComparator comparator = comparators.get(c);
   if (comparator == null)
     comparator = new WritableComparator(c, true);
   return comparator;
 }

 

comparator可以用来比较两个IntWritable:

 
1
2
3
IntWritable w1 =newIntWritable(163);
IntWritable w2 =newIntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0));

或者它们的序列化描述: 

 
1
2
3
byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length), greaterThan(0));

 
WritableComparator的compare()方法的源码:
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
   try {
     buffer.reset(b1, s1, l1);                   // parse key1
     key1.readFields(buffer);
     
     buffer.reset(b2, s2, l2);                   // parse key2
     key2.readFields(buffer);
     
   } catch (IOException e) {
     throw new RuntimeException(e);
   }
   
   return compare(key1, key2);                   // compare them
 }
 
@SuppressWarnings("unchecked")
 public int compare(WritableComparable a, WritableComparable b) {
   return a.compareTo(b);
 }

 

参考:《hadoop权威指南》

(责任编辑:IT)