> Linux集群 > Hadoop >

hadoop-FileSystem文件写入数据

FileSystem类有一系列创建文件的方法。最简单的是给拟创建的文件指定一个路径对象,然后返回一个用来写的输出流:

 
1
public FSDataOutputStream create(Path f) throws IOException


 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
  
 /**
 * Opens an FSDataOutputStream at the indicated Path. Files are overwritten
 * by default.
 */
 public FSDataOutputStream create(Path f) throws IOException {
 return create(f, true);
 }
 /**
   * Opens an FSDataOutputStream at the indicated Path.
   * 
   * overwrite 是否强制覆盖已有的文件
   */
  public FSDataOutputStream create(Path f, boolean overwrite)
    throws IOException {
    return create(f, overwrite, 
                  getConf().getInt("io.file.buffer.size", 4096),
                  getDefaultReplication(),
                  getDefaultBlockSize());
  }
  
  /**
   * Opens an FSDataOutputStream at the indicated Path.
   * @param f the file name to open
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used. 写入文件时的缓冲大小
   * @param replication(复制) required block replication for the file.
   * @param blockSize - 文件块大小 
   */
  public FSDataOutputStream create(Path f, 
                                   boolean overwrite,
                                   int bufferSize,
                                   short replication,
                                   long blockSize
                                   ) throws IOException {
    return create(f, overwrite, bufferSize, replication, blockSize, null);
  }
 
 
  /**
   * Opens an FSDataOutputStream at the indicated Path with write-progress
   * reporting.
   * @param f the file name to open
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file. 
   */
  public FSDataOutputStream create(Path f,
                                            boolean overwrite,
                                            int bufferSize,
                                            short replication,
                                            long blockSize,
                                            Progressable progress
                                            ) throws IOException {
    return this.create(f, FsPermission.getDefault(),
        overwrite, bufferSize, replication, blockSize, progress);
  }
  
  /**
   * Opens an FSDataOutputStream at the indicated Path with write-progress
   * reporting.
   * @param f the file name to open
   * @param permission
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file.
   * @param blockSize
   * @param progress
   * @throws IOException
   * @see #setPermission(Path, FsPermission)
   * 
   * 这个方法有重载的版本允许我们指定是否强制覆盖已有的文件、文件副本数量、
   * 写入文件时的缓冲大小、文件块大小以及文件许可
   */
  public abstract FSDataOutputStream create(Path f,
      FsPermission permission,
      boolean overwrite,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress) throws IOException;

这个create()方法有重载的版本允许我们指定是否强制覆盖已有的文件、文件副本数量、写入文件时的缓冲大小、文件块大小以及文件许可。

参数默认:overwrite 默认强制覆盖已有的文件

      bufferSize 写入文件时的缓冲大小4096

    replication 默认的是1

    blockSize 32MB

    Progressable 默认的为null

    permission文件权限 是777

注意:create()方法为需要写入的文件而创建的父目录可能原先并不存在。虽然这样很方便,但有时并不希望这样。如果我们想在父目录不存在时不执行写入,就必须在调用exists()首先检查父目录是否存在。

还有一个用于传递回调接口的重载方法Progressable,如此一来,我们所写的应用就会被告知数据写入数据节点的进度:

 
1
2
3
package org.apache.hadoop.util; 
ublic interface Progressable { 
      public void progress();
新建文件的另一种方法是使用append()在一个已有文件中追加(也有一些其他重载版本):

 
1
public FSDataOutputStream append(Path f) throws IOException

 

这个操作允许一个写入者打开已有文件并在其末尾写入数据。有了这个API,会产生无边界文件的应用,以日志文件为例,就可以在重启后在已有文件上继续写入。此添加操作是可选的,并不是所有Hadoop文件系统都有实现。HDFS支持添加,但S3就不支持了。

例3-4展示了如何将本地文件复制到Hadoop文件系统。我们在每次Hadoop调用progress()方法时,也就是在每64 KB数据包写入数据节点管道后打印一个句号来展示整个过程。(注意,这个动作并不是API指定的,因此在Hadoop后面的版本中大多被改变了。API仅仅是让我们注意到"发生了一些事"。)

例3-4:将本地文件复制到Hadoop文件系统并显示进度

// vv FileCopyWithProgress
public class FileCopyWithProgress {
  public static void main(String[] args) throws Exception {
    String localSrc = args[0];
    String dst = args[1];
    
    InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
    
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(dst), conf);
    OutputStream out = fs.create(new Path(dst), new Progressable() {
      public void progress() {
        System.out.print(".");
      }
    });
    
    IOUtils.copyBytes(in, out, 4096, true);
  }
}
// ^^ FileCopyWithProgress

 
 
 
 
 

典型用途:

  % hadoop FileCopyWithProgress input/docs/1400-8.txt hdfs://localhost/user/tom/1400-8.txt  

目前,其他Hadoop文件系统在写入时都不会调用progress()。通过后面几章的描述,我们会感到进度之于MapReduce应用的重要性。

FSDataOutputStream

FSDataInputStream类是关于从HDFS读数据的,而FSDataOutputStream类是向HDFS中写数据的

FileSystem中的create()方法返回了一个FSDataOutputStream,与FSDataInputStream类似,它也有一个查询文件当前位置的方法:

package org.apache.hadoop.fs;  
 
public class FSDataOutputStream extends  DataOutputStream implements Syncable {  
  public void write(int b) throws IOException {
      out.write(b);
      position++;
      if (statistics != null) {
        statistics.incrementBytesWritten(1);
      }
    }

  public long getPos() throws IOException {  
    // implementation elided  
  }      
   // implementation elided  
} 

但是,与FSDataInputStream不同,FSDataOutputStream不允许定位。这是因为HDFS只允许对一个打开的文件顺序写入,或向一个已有文件添加。换句话说,它不支持除文件尾部的其他位置的写入,这样一来,写入时的定位就没有什么意义。

 

(责任编辑:IT)