How to Read and Write SequenceFile in Hadoop
This post shows how to read and write a SequenceFile in Hadoop using the Java API and using Hadoop MapReduce, and how to provide compression options for a SequenceFile.
Java Program to Write a SequenceFile
SequenceFile provides a static createWriter() method to create a writer, which is used to write a SequenceFile in Hadoop. The createWriter method has many overloaded variants (many of them now deprecated); the one used here is the following.
public static org.apache.hadoop.io.SequenceFile.Writer createWriter(Configuration conf, org.apache.hadoop.io.SequenceFile.Writer.Option... opts) throws IOException
Java code
import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;

public class SFWrite {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    int i = 0;
    try {
      FileSystem fs = FileSystem.get(conf);
      // input file in local file system
      File file = new File("/home/theitroad/Documents/theitroad/Hadoop/Test/data.txt");
      // Path for output file
      Path outFile = new Path(args[0]);
      IntWritable key = new IntWritable();
      Text value = new Text();
      SequenceFile.Writer writer = null;
      try {
        writer = SequenceFile.createWriter(conf, Writer.file(outFile),
            Writer.keyClass(key.getClass()), Writer.valueClass(value.getClass()),
            Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()));
        for (String line : FileUtils.readLines(file)) {
          key.set(i++);
          value.set(line);
          writer.append(key, value);
        }
      } finally {
        if (writer != null) {
          writer.close();
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
The program also specifies a compression option, and the compression codec used is GzipCodec.
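For reference, a minimal sketch of the other compression choices accepted by Writer.compression(), reusing the variable names from the program above (only the BLOCK variant with GzipCodec is what the program actually uses):

// Record compression: each value is compressed individually
writer = SequenceFile.createWriter(conf, Writer.file(outFile),
    Writer.keyClass(key.getClass()), Writer.valueClass(value.getClass()),
    Writer.compression(SequenceFile.CompressionType.RECORD, new GzipCodec()));

// No compression for keys or values
writer = SequenceFile.createWriter(conf, Writer.file(outFile),
    Writer.keyClass(key.getClass()), Writer.valueClass(value.getClass()),
    Writer.compression(SequenceFile.CompressionType.NONE));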
Executing the Program in the Hadoop Environment
To execute the above Java program in the Hadoop environment, the directory containing the program's .class file needs to be added to Hadoop's classpath.

export HADOOP_CLASSPATH='/huser/eclipse-workspace/theitroad/bin'

My SFWrite.class file is located at /huser/eclipse-workspace/theitroad/bin, so I have exported that path.
Then the program can be run with the following command -
$ hadoop org.theitroad.SFWrite /user/output/item.seq

18/03/22 12:10:21 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
18/03/22 12:10:21 INFO compress.CodecPool: Got brand-new compressor [.gz]
/user/output/item.seq is the output path in HDFS.
If we try to display the file content in HDFS, it will not be readable, because SequenceFile is a binary file format. That brings us to the second part, how to read a SequenceFile.
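As a side note, the hadoop fs -text command (unlike -cat) understands the SequenceFile format and common compression codecs, so a command along these lines should print the (key, value) pairs in readable form:

$ hadoop fs -text /user/output/item.seq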
Java Program to Read a SequenceFile
To read a SequenceFile in Hadoop, we need to get an instance of SequenceFile.Reader, which can read any of the SequenceFile formats produced by the writer.
Using this reader instance we can iterate over the records with the next() method; the variant of next() used here takes both a key and a value as arguments of type Writable and assigns the next (key, value) pair read from the sequence file to these variables.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;

public class SFRead {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    try {
      Path inFile = new Path(args[0]);
      SequenceFile.Reader reader = null;
      try {
        IntWritable key = new IntWritable();
        Text value = new Text();
        reader = new SequenceFile.Reader(conf, Reader.file(inFile), Reader.bufferSize(4096));
        while (reader.next(key, value)) {
          System.out.println("Key " + key + " Value " + value);
        }
      } finally {
        if (reader != null) {
          reader.close();
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
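Assuming the .class file is on HADOOP_CLASSPATH as described earlier, the reader can be run the same way as the writer, passing the SequenceFile written above as the argument:

$ hadoop org.theitroad.SFRead /user/output/item.seq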
Writing a SequenceFile Using a MapReduce Job
We can also write a sequence file in Hadoop using a MapReduce job. That is useful when we have a large file and want to take advantage of parallel processing.
The MapReduce job in this case is quite simple; we do not even need a reduce task, and the map task just writes the (key, value) pairs.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SequenceFileWriter extends Configured implements Tool {

  // Map function
  public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new SequenceFileWriter(), args);
    System.exit(exitFlag);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "sfwrite");
    job.setJarByClass(SequenceFileWriter.class);
    job.setMapperClass(SFMapper.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Compression related settings
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    int returnFlag = job.waitForCompletion(true) ? 0 : 1;
    return returnFlag;
  }
}
In the MapReduce job for writing a SequenceFile, the more important parts are the job settings specified for output and compression.
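For reference, these are the relevant lines from the run() method above:

// Write the output in SequenceFile format
job.setOutputFormatClass(SequenceFileOutputFormat.class);

// Compress the output with Gzip, using block compression
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);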
Reading a SequenceFile Using a MapReduce Job
If we want to read a sequence file using a MapReduce job, the code will be very similar to the code for writing one.
The one major change is in the input and output formats.
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SequenceFileReader extends Configured implements Tool {

  // Map function
  public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new SequenceFileReader(), args);
    System.exit(exitFlag);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "sfread");
    job.setJarByClass(SequenceFileReader.class);
    job.setMapperClass(SFMapper.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    int returnFlag = job.waitForCompletion(true) ? 0 : 1;
    return returnFlag;
  }
}
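As a hypothetical example of running the two MapReduce jobs (the jar name, package name and HDFS paths are illustrative, assuming both classes are packaged into a jar):

$ hadoop jar sequencefile.jar org.theitroad.SequenceFileWriter /user/input /user/seq_output
$ hadoop jar sequencefile.jar org.theitroad.SequenceFileReader /user/seq_output /user/text_output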