Hadoop MapReduce中的只进行映射作业

时间:2020-01-09 10:34:33  来源:igfitidea点击:

通常,当我们想到Hadoop中的MapReduce作业时,我们会想到映射器和缩减器都在完成其处理份额。在大多数情况下都是如此,但是我们可以在某些情况下希望在Hadoop中拥有仅映射器作业。

什么时候只需要映射map工作

当我们确实需要处理数据并将输出作为(键,值)对但不想汇总那些(键,值)对时,可以在Hadoop中选择仅地图作业。
例如,如果要使用MapReduce将文本文件转换为序列文件。在这种情况下,我们只想从文本文件中读取一行并将其写入序列文件,以便可以仅使用map方法选择MapReduce。
如果使用MapReduce将文本文件转换为拼花文件,则采用相同的方法,我们可以在Hadoop中选择仅映射器作业。

我们需要为映射器做的工作

对于仅映射程序的工作,我们只需要在代码中编写映射方法,即可进行处理。减速器的数量设置为零。
为了将减速器的数量设置为零,可以使用Job类的setNumReduceTasks()方法。因此,我们需要在MapReduce代码驱动程序的作业配置中添加以下内容。

job.setNumReduceTasks(0);

仅使用Mapper工作的好处

如前所述,如果我们只想在不进行任何聚合的情况下处理数据,那么最好只执行映射器作业,因为我们可以节省Hadoop框架内部完成的某些处理。

由于没有reducer,因此不需要洗牌和排序阶段,也不需要将数据传输到运行reducer的节点。

还要注意,在MapReduce作业中,映射阶段的输出将写入节点上的本地磁盘,而不是HDFS。在仅使用Mapper作业的情况下,将Map输出写入HDFS。

Hadoop示例中的仅进行Mapper作业

如果必须仅使用映射功能将文本文件转换为序列文件,则可以将化简器的数量设置为零。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SequenceFileWriter extends	Configured implements Tool{
  // Map function
  public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
    public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException {
      context.write(key, value);
    }
  }
  public static void main(String[] args)  throws Exception{
    int exitFlag = ToolRunner.run(new SequenceFileWriter(), args);
    System.exit(exitFlag);	   
  }
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "sfwrite");
    job.setJarByClass(SequenceFileWriter.class);
    job.setMapperClass(SFMapper.class);
    // Setting reducer to zero
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    // Compression related settings
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    int returnFlag = job.waitForCompletion(true) ? 0 : 1;
    return returnFlag;
  }
}

我们可以使用followng命令运行MapReduce作业。

$ hadoop jar /pathto/jar/theitroadhadoop.jar org.theitroad.SequenceFileWriter /user/input/count /user/output/seq

通过列出输出目录,我们可以看到已创建序列文件。

hdfs dfs -ls /user/output/seq

Found 2 items
-rw-r--r--   1 theitroad supergroup          0 2018-06-14 12:26 /user/output/seq/_SUCCESS
-rw-r--r--   1 theitroad supergroup        287 2018-06-14 12:26 /user/output/seq/part-m-00000