Hadoop MapReduce中的组合器

时间:2020-01-09 10:34:34  来源:igfitidea点击:

这篇文章显示了什么是Hadoop MapReduce中的组合器,以及如何使用组合器功能来减少整体MapReduce执行的整体内存,I / O和网络需求。

为什么MapReduce需要组合器

当执行MapReduce作业并且映射器开始产生输出时,在Hadoop框架中发生了许多处理,这被称为改组和排序阶段。

映射输出根据reducer的数量进行分区,这些分区也经过排序,然后写入本地磁盘。

然后,数据从运行映射的节点转移到运行缩减器的节点。由于单个化简器将从多个映射器获取其输入,因此所有来自多个映射的数据都将传输到化简器并再次合并以形成化简任务的完整输入。

如我们所见,所有这些处理都需要内存,网络带宽和I / O。这就是Hadoop中的组合器可以通过最大程度地减少发送到缩减器的数据来提供帮助的地方。

MapReduce中的合并器功能

Hadoop中的Combiner是一项优化,可以在地图端本身聚合数据。合并器函数在映射输出上运行,聚合数据(因此数据大小变小),合并器函数的输出成为reduce任务的输入。请注意,使用组合器是可选的。

大多数时候,我们也会将Reducer类用作合并器类。如果不是,那么Combiner类实现也必须扩展Reducer并实现reduce方法。

由于组合器与减速器具有相同的语义,因此输入和输出类型遵循相同的要求。在MapReduce作业中,reduce输入类型必须与地图输出类型匹配,而合并器输入类型必须与地图输出类型匹配。由于组合器的输出成为减速器的输入,因此组合器的输出类型必须与简化输入类型匹配。

例如,假设我们试图获得股票的最高价格。有两个输入拆分,由两个不同的映射处理。

拆分1 –

AAA		23
AAA		26
AAA		21
AAA		19

拆分2 –

AAA		27
AAA		28
AAA		25
AAA		24

Map-1的输出

(AAA, 23)
(AAA, 26)
(AAA, 21)
(AAA, 19)

Map-2的输出

(AAA, 27)
(AAA, 28)
(AAA, 25)
(AAA, 24)

在随机和排序阶段缩减任务完成后,将获得其输入,如下所示:

[AAA, (23, 26, 21, 19, 27, 28, 25, 24)]

和减少输出–(AAA,28)

在这里,如果我们指定与reducer相同的组合器类,则组合器将汇总各自的映射输出。

Map-1输出的合成器

(AAA, 26)

Map-2输出的合成器

(AAA, 28)

现在,reduce的输入如下:

[AAA, (26, 28)]

因此,我们可以看到如何最大程度地减少传输到reducer的数据。

如何在MapReduce作业中指定组合器

我们可以使用MapReduce驱动程序中Job类的setCombinerClass()方法来指定组合器。例如,如果Reducer类是MaxStockPriceReducer,并且我们也希望将Reducer类设置为Combiner类,则可以按照以下步骤进行操作。

job.setCombinerClass(MaxStockPriceReducer.class);

使用组合器时必须确保的一件事是:但是,将输入合并后,最终结果应该相同。

例如,如果要计算平均值,则map-1(3,4,5)和map-2(6,8)

然后减少函数将计算平均值为(3,4,5,6,8)= 5.2

带组合器
(3,4,5)的平均值= 4
(6,8)的平均值= 7

然后在归约函数中-(4,7)的平均值= 5.5

在此示例中,我们可以看到合并器的结果是不同的,因此我们必须以这样的方式编写逻辑:即使使用合并器,结果也应相同。

使用合并器的MapReduce示例

这是一个MapReduce示例,其中使用MapReduce计算每个股票代码的最高价格。输入文件具有制表符分隔的数据,包括股票代码和价格。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockPrice extends Configured implements Tool{
  // Map function
  public static class MaxStockPriceMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
      
    private final static IntWritable one = new IntWritable(1);
    private Text symbol = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      symbol.set(stringArr[0]);
      Integer price = Integer.parseInt(stringArr[1]);
      context.write(symbol, new IntWritable(price));
    }
  }
	
  // Reduce function
  public static class MaxStockPriceReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    public void reduce(Text key, Iterable values, Context context) 
        throws IOException, InterruptedException {

      int	maxValue = Integer.MIN_VALUE;
      for (IntWritable val : values) {
        maxValue = Math.max(maxValue, val.get());
      }      
      context.write(key, new IntWritable(maxValue));
    }
  }
	
  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new StockPrice(), args);
    System.exit(exitFlag);
  }
	
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Stock price");
    job.setJarByClass(getClass());
    job.setMapperClass(MaxStockPriceMapper.class);    
    job.setReducerClass(MaxStockPriceReducer.class);		
    //job.setCombinerClass(MaxStockPriceReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

如我们所见,最初对设置合并器类的行进行了注释。如果在不指定任何组合器的情况下运行此MapReduce作业,请在控制台中查看计数器。

Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=106
Reduce input records=10
Reduce output records=2
Spilled Records=20

现在,设置合并器的行已取消注释,并且相同的计数器如下所示,再次运行MapReduce作业。

Combine input records=10
Combine output records=2
Reduce input groups=2
Reduce shuffle bytes=26
Reduce input records=2
Reduce output records=2
Spilled Records=4

因此,我们可以看到组合器本身最小化了发送到reducer的数据,并且在此过程中还减少了混洗的字节。