在Hadoop中以gzip格式压缩文件的Java程序

时间:2020-01-09 10:34:35  来源:igfitidea点击:

在这篇文章中,我们将看到一个Java程序,该程序显示了如何在Hadoop中使用gzip格式压缩文件。

压缩格式gzip不支持拆分,因此尽管压缩文件仍可以存储为单独的HDFS块(默认大小为128 MB),但MapReduce作业将无法创建输入拆分。

Java程序使用gzip格式压缩文件

必须用于gzip的Hadoop压缩编解码器是" org.apache.hadoop.io.compress.GzipCodec"。

要获得该编解码器,请使用CompressionCodecFactory类的getCodecByClassName方法。

要创建CompressionOutputStream,请使用编解码器类的createOutputStream(OutputStream out)方法。我们将使用CompressionOutputStream将压缩形式的文件数据写入流中。

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;

public class GzipCompress {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    InputStream in = null;
    OutputStream out = null;
    try {
      FileSystem fs = FileSystem.get(conf);
      // Input file from local file system
      in = new BufferedInputStream(new FileInputStream("/home/theitroad/Documents/theitroad/Hadoop/Test/data.txt"));
      //Compressed Output file
      Path outFile = new Path("/user/compout/test.gz");
      // Verification
      if (fs.exists(outFile)) {
        System.out.println("Output file already exists");
        throw new IOException("Output file already exists");
      }			
      out = fs.create(outFile);
			
      // For gzip compression
      CompressionCodecFactory	factory	= new CompressionCodecFactory(conf);
      CompressionCodec codec = factory.getCodecByClassName("org.apache.hadoop.io.compress.GzipCodec");
      CompressionOutputStream	compressionOutputStream	= codec.createOutputStream(out);      
      try {
        IOUtils.copyBytes(in, compressionOutputStream, 4096, false);
        compressionOutputStream.finish();
        
      } finally {
        IOUtils.closeStream(in);
        IOUtils.closeStream(compressionOutputStream);
      }
			
    } catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
}

在Hadoop环境中执行程序

要在Hadoop环境中执行上述Java程序,我们需要在Hadoop的类路径中添加包含Java程序的.class文件的目录。

导出HADOOP_CLASSPATH ='/ huser / eclipse-workspace / theitroad / bin'

我的GzipCompress.class文件位于/ huser / eclipse-workspace / theitroad / bin位置,因此我已导出该路径。

然后,我们可以使用以下命令运行该程序-

$ hadoop org.theitroad.GzipCompress

18/03/11 12:59:49 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
18/03/11 12:59:49 INFO compress.CodecPool: Got brand-new compressor [.gz]

程序中使用的输入文件足够大,可以确保即使压缩文件大小超过128 MB,也可以确保将其作为两个单独的块存储在HDFS中。

我们可以使用hdfs fsck命令进行检查。

$ hdfs fsck /user/compout/test.gz

.Status: HEALTHY
 Total size:	233963084 B
 Total dirs:	0
 Total files:	1
 Total symlinks:		0
 Total blocks (validated):	2 (avg. block size 116981542 B)

FSCK ended at Wed Mar 14 21:07:46 IST 2018 in 6 milliseconds

由于gzip不支持拆分,因此使用此压缩文件作为MapReduce作业的输入将意味着将仅为Map任务创建一个拆分。

要测试创建了多少输入拆分,请将此压缩的gzip文件作为wordcount MapReduce程序的输入。

$ hadoop jar /home/theitroad/Documents/theitroad/Hadoop/wordcount.jar org.theitroad.WordCount /user/compout/test.gz /user/output3

18/03/11 13:09:23 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/03/11 13:09:23 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/03/11 13:09:23 INFO input.FileInputFormat: Total input files to process : 1
18/03/11 13:09:24 INFO mapreduce.JobSubmitter: number of splits:1

正如我们在控制台mapreduce.JobSubmitter:上显示的这一行中看到的那样:分割数:1即使有两个HDFS块也只能为MapReduce作业创建一个输入分割,因为无法压缩gzip压缩文件。