在HDFS中写入文件的Java程序

时间:2020-01-09 10:34:32  来源:igfitidea点击:

这篇文章显示了一个Java程序,该程序使用Hadoop FileSystem API在HDFS中写入文件。

使用Java在HDFS中写入文件的步骤如下:

  • FileSystem是文件系统的抽象,其中HDFS是其中的一种实现。因此,我们将必须使用get方法获取FileSystem的实例(在本例中为HDFS)。
  • 在程序中,我们可以看到get()方法将Configuration作为参数。配置对象具有从配置文件(即从中获取文件系统的core-site.xml)读取的所有与配置相关的信息。
  • 在HDFS中,Path对象代表完整文件路径。
  • 使用FileSystem的create()方法可以创建文件,该方法返回FSDataOutputStream。

Java程序写入HDFS中的文件

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSFileWrite {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    try {
      FileSystem fs = FileSystem.get(conf);
      // Hadoop DFS Path - Input & Output file
      Path inFile = new Path(args[0]);
      Path outFile = new Path(args[1]);
      // Verification
      if (!fs.exists(inFile)) {
        System.out.println("Input file not found");
        throw new IOException("Input file not found");
      }
      if (fs.exists(outFile)) {
        System.out.println("Output file already exists");
        throw new IOException("Output file already exists");
      }
    
      // open and read from file
      FSDataInputStream in = fs.open(inFile);
      // Create file to write
      FSDataOutputStream out = fs.create(outFile);
			
      byte buffer[] = new byte[256];
      try {
        int bytesRead = 0;
        while ((bytesRead = in.read(buffer)) > 0) {
          out.write(buffer, 0, bytesRead);
          }
      } catch (IOException e) {
        System.out.println("Error while copying file");
      } finally {
        in.close();
        out.close();
      }			
    } catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
}

在上面的程序中,如果输入文件位于本地文件系统中,则输入和输出文件都位于HDFS中,那么我们可以使用BufferedInputStream创建输入流,如下所示:

InputStream in = new BufferedInputStream(new FileInputStream("/local_file_path/file_name"));

在Hadoop环境中执行程序

要在Hadoop环境中执行上述Java程序,我们需要在Hadoop的类路径中添加包含Java程序的.class文件的目录。

export HADOOP_CLASSPATH='/huser/eclipse-workspace/theitroad/bin'

我的HDFSFileWrite.class文件位于/ huser / eclipse-workspace / theitroad / bin位置,因此我已经导出了该路径。

然后,我们可以通过提供从中读取数据的输入文件的路径和向其写入内容的输出文件的路径来运行程序。

hadoop org.theitroad.HDFSFileWrite /user/input/test/aa.txt /user/input/test/write.txt

通过使用ls HDFS命令,可以验证是否创建了文件。

hdfs dfs -ls /user/input/test/

-rw-r--r-- 1 theitroad supergroup 10 2018-01-18 14:55 /user/input/test/write.txt

使用IOUtils类编写HDFS文件

Hadoop框架提供了IOUtils类,该类具有许多与I / O相关的便捷方法。我们可以使用它来将字节从输入流复制到输出流。

Java程序写入HDFS文件

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HDFSFileWrite {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    FSDataInputStream in = null;
    FSDataOutputStream out = null;
    try {
      FileSystem fs = FileSystem.get(conf);
      // Hadoop DFS Path - Input & Output file
      Path inFile = new Path(args[0]);
      Path outFile = new Path(args[1]);
      // Verification
      if (!fs.exists(inFile)) {
        System.out.println("Input file not found");
        throw new IOException("Input file not found");
      }
      if (fs.exists(outFile)) {
        System.out.println("Output file already exists");
        throw new IOException("Output file already exists");
      }
      try {
        // open and read from file
        in = fs.open(inFile);
        // Create file to write
        out = fs.create(outFile);
        IOUtils.copyBytes(in, out, 512, false);
        
      } finally {
        IOUtils.closeStream(in);
        IOUtils.closeStream(out);
      }      
    } catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
}