Hadoop MapReduce中的只进行映射作业
时间:2020-01-09 10:34:33 来源:igfitidea点击:
通常,当我们想到Hadoop中的MapReduce作业时,我们会想到映射器和缩减器都在完成其处理份额。在大多数情况下都是如此,但是我们可以在某些情况下希望在Hadoop中拥有仅映射器作业。
什么时候只需要映射map工作
当我们确实需要处理数据并将输出作为(键,值)对但不想汇总那些(键,值)对时,可以在Hadoop中选择仅地图作业。
例如,如果要使用MapReduce将文本文件转换为序列文件。在这种情况下,我们只想从文本文件中读取一行并将其写入序列文件,以便可以仅使用map方法选择MapReduce。
如果使用MapReduce将文本文件转换为拼花文件,则采用相同的方法,我们可以在Hadoop中选择仅映射器作业。
我们需要为映射器做的工作
对于仅映射程序的工作,我们只需要在代码中编写映射方法,即可进行处理。减速器的数量设置为零。
为了将减速器的数量设置为零,可以使用Job类的setNumReduceTasks()方法。因此,我们需要在MapReduce代码驱动程序的作业配置中添加以下内容。
job.setNumReduceTasks(0);
仅使用Mapper工作的好处
如前所述,如果我们只想在不进行任何聚合的情况下处理数据,那么最好只执行映射器作业,因为我们可以节省Hadoop框架内部完成的某些处理。
由于没有reducer,因此不需要洗牌和排序阶段,也不需要将数据传输到运行reducer的节点。
还要注意,在MapReduce作业中,映射阶段的输出将写入节点上的本地磁盘,而不是HDFS。在仅使用Mapper作业的情况下,将Map输出写入HDFS。
Hadoop示例中的仅进行Mapper作业
如果必须仅使用映射功能将文本文件转换为序列文件,则可以将化简器的数量设置为零。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class SequenceFileWriter extends Configured implements Tool{ // Map function public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text>{ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(key, value); } } public static void main(String[] args) throws Exception{ int exitFlag = ToolRunner.run(new SequenceFileWriter(), args); System.exit(exitFlag); } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "sfwrite"); job.setJarByClass(SequenceFileWriter.class); job.setMapperClass(SFMapper.class); // Setting reducer to zero job.setNumReduceTasks(0); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // Compression related settings FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); int returnFlag = job.waitForCompletion(true) ? 0 : 1; return returnFlag; } }
我们可以使用followng命令运行MapReduce作业。
$ hadoop jar /pathto/jar/theitroadhadoop.jar org.theitroad.SequenceFileWriter /user/input/count /user/output/seq
通过列出输出目录,我们可以看到已创建序列文件。
hdfs dfs -ls /user/output/seq Found 2 items -rw-r--r-- 1 theitroad supergroup 0 2018-06-14 12:26 /user/output/seq/_SUCCESS -rw-r--r-- 1 theitroad supergroup 287 2018-06-14 12:26 /user/output/seq/part-m-00000