如何在Hadoop中使用LZO压缩

时间:2020-01-09 10:34:35  来源:igfitidea点击:

这篇文章展示了如何在Hadoop中安装和使用LZO压缩。

安装LZO软件包

要在Ubuntu中安装LZO软件包,请使用以下命令。

sudo apt-get install liblzo2-2 liblzo2-dev

下载和打包hadoop-lzo

我们需要获得hadoop-lzo jars才能使lzo可拆分。为此,我们将需要克隆hadoop-lzo存储库并进行构建。

另一个选择是使用rpm包,我们可以从此处下载rpm包– https://code.google.com/archive/p/hadoop-gpl-packing/downloads

在这里,我展示了克隆和构建它的步骤。请参阅此URL – https://github.com/twitter/hadoop-lzo以获取进一步的了解。

打包克隆的代码也需要Maven。如果尚未安装maven,则可以使用以下命令在系统上安装maven。

$ sudo apt install maven

克隆hadoop-lzo存储库。

$ git clone https://github.com/twitter/hadoop-lzo.git

为了编译代码,并将hadoop-lzo jar更改目录构建到克隆的hadoop-lzo目录,并使用以下命令。

mvn clean
mvn install

这应该使用创建的jar创建一个目标文件夹– hadoop-lzo-0.4.21-SNAPSHOT.jar。

将LZO压缩与Hadoop结合使用的配置

由于我们将在MapReduce作业中使用LZO压缩,因此将hadoop-lzo jar复制到$ HADOOP_INSTALLATION_DIR中的/ share / hadoop / mapreduce / lib中。

sudo cp /home/theitroad/hadoop-lzo/target/hadoop-lzo-0.4.21-SNAPSHOT.jar $HADOOP_INSTALLATION_DIR/share/hadoop/mapreduce/lib

还将jar添加到Hadoop类路径。为此,在$ HADOOP_INSTALLATION_DIR / etc / hadoop / hadoop-env.sh中添加以下内容

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/theitroad/hadoop-lzo/target/hadoop-lzo-0.4.21-SNAPSHOT.jar

export JAVA_LIBRARY_PATH=/home/theitroad/hadoop-lzo/target/native/Linux-amd64-64:$HADOOP_INSTALLATION_DIR/lib/native

我们还需要更新配置文件$ HADOOP_INSTALLATION_DIR / etc / hadoop / core-site.xml,以注册LZO的外部编解码器。

<property>
  <name>io.compression.codecs</name>
  <value>org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.DefaultCodec, 
    org.apache.hadoop.io.compress.BZip2Codec, com.hadoop.compression.lzo.LzoCodec, 
    com.hadoop.compression.lzo.LzopCodec
  </value>
</property>
<property>
  <name>io.compression.codec.lzo.class</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

在Hadoop中使用LZO压缩的示例Java程序

这是一个使用LzopCodec压缩文件的Java程序。输入文件位于本地文件系统中,压缩后的输出文件存储在HDFS中。

确保已在Java构建路径中为hadoop-lzo添加了创建的外部jar。

Java代码

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;

public class LzoCompress {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    InputStream in = null;
    OutputStream out = null;
    try {
      FileSystem fs = FileSystem.get(conf);
      // Input file from local file system
      in = new BufferedInputStream(new FileInputStream("/home/theitroad/Documents/theitroad/Hadoop/Test/data.txt"));
      //Compressed Output file
      Path outFile = new Path("/user/compout/data.lzo");
      // Verification
      if (fs.exists(outFile)) {
        System.out.println("Output file already exists");
        throw new IOException("Output file already exists");
      }			
      out = fs.create(outFile);

      CompressionCodecFactory	factory	= new CompressionCodecFactory(conf);
      CompressionCodec codec = factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec");
      CompressionOutputStream	compressionOutputStream	= codec.createOutputStream(out);
      
      try {
        IOUtils.copyBytes(in, compressionOutputStream, 4096, false);
        compressionOutputStream.finish();
        
      } finally {
        IOUtils.closeStream(in);
        IOUtils.closeStream(compressionOutputStream);
      }
			
    } catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
}

在Hadoop环境中执行程序

要在Hadoop环境中执行上述Java程序,我们需要在Hadoop的类路径中添加包含Java程序的.class文件的目录。

$ export HADOOP_CLASSPATH='/huser/eclipse-workspace/theitroad/bin'

我的LzoCompress.class文件位于/ huser / eclipse-workspace / theitroad / bin位置,因此我已经导出了该路径。

然后,我们可以使用以下命令运行该程序-

$ hadoop org.theitroad.LzoCompress

只是检查压缩文件占用了多少块。

hdfs fsck /user/compout/data.lzo

.Status: HEALTHY
 Total size:	417954415 B
 Total dirs:	0
 Total files:	1
 Total symlinks:		0
 Total blocks (validated):	4 (avg. block size 104488603 B)
 Minimally replicated blocks:	4 (100.0 %)

FSCK ended at Sat Mar 24 20:08:33 IST 2018 in 8 milliseconds

如我们所见,该文件足够大,可以占用4个HDFS块。这将有助于我们检查MapReduce是否能够为压缩文件创建拆分。

在Hadoop MapReduce中使用LZOCompression

让我们创建一个简单的MapReduce作业,使用创建的.lzo作为输入。为了在Hadoop MapReduce中使用LZO压缩文件作为输入,必须使用的输入格式是LzoTextInputFormat。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.hadoop.mapreduce.LzoTextInputFormat;

public class LzoWordCount extends Configured implements Tool{
  // Map function
  public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text word = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\s+");
      for (String str : stringArr) {
        word.set(str);
        context.write(word, new IntWritable(1));
      }       
    }
  }
	
  // Reduce function
  public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{		   
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
  public static void main(String[] args)  throws Exception{    
    int exitFlag = ToolRunner.run(new LzoWordCount(), args);
    System.exit(exitFlag);
  }
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "WC");
    job.setJarByClass(LzoWordCount.class);
    job.setMapperClass(MyMapper.class);    
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(LzoTextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    //job.addFileToClassPath(new Path("/home/theitroad/hadoop-lzo/target/hadoop-lzo-0.4.21-SNAPSHOT.jar")); 
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

如果运行此MapReduce作业,则可以看到仅创建了一个拆分。

$ hadoop jar /home/theitroad/Documents/theitroad/Hadoop/lzowordcount.jar org.theitroad.LzoWordCount /user/compout/data.lzo /user/output1

18/03/25 19:14:09 INFO input.FileInputFormat: Total input files to process : 1
18/03/25 19:14:10 INFO mapreduce.JobSubmitter: number of splits:1

Map任务无法拆分LZO压缩文件,因此它将整个文件用作一个输入拆分,这意味着只有一个Map任务将处理整个文件。为了使LZO文件可拆分,我们将必须运行索引器。我们可以将lzo indexer作为Java程序或者MapReduce作业运行。

将LZO Indexer作为Java程序运行

$ hadoop jar /home/theitroad/hadoop-lzo/target/hadoop-lzo-0.4.21-SNAPSHOT.jar com.hadoop.compression.lzo.LzoIndexer /user/compout/data.lzo

将lzo indexer作为MapReduce作业运行

$ hadoop jar /home/theitroad/hadoop-lzo/target/hadoop-lzo-0.4.21-SNAPSHOT.jar com.hadoop.compression.lzo.DistributedLzoIndexer /user/compout/data.lzo

无论哪种方式,它都应该创建一个.index文件(/user/compout/data.lzo.index),这意味着.lzo文件已成功建立索引并且可以立即拆分。要检查它,请再次运行MapReduce作业。

hadoop jar /home/theitroad/Documents/theitroad/Hadoop/lzowordcount.jar org.theitroad.LzoWordCount /user/compout/data.lzo /user/output2

18/03/25 19:25:22 INFO input.FileInputFormat: Total input files to process : 1
18/03/25 19:25:22 INFO mapreduce.JobSubmitter: number of splits:4

在控制台中,我们可以看到Map任务现在能够创建对应于4个HDFS块的4个输入分割。