Hadoop中的预定义Mapper和Reducer类

时间:2020-01-09 10:34:34  来源:igfitidea点击:

在Hadoop框架中,有一些预定义的Mapper和Reducer类可以在所需的场景中使用。这样一来,我们无需为这些方案编写映射器或者精简器,而可以使用现成的类。

让我们看一下Hadoop中一些预定义的Mapper和Reducer类。

Hadoop中的预定义Mapper类

InverseMapper –此预定义的映射器交换键和值。因此,输入(键,值)对反转,键成为值,值成为输出(键,值)对中的键。

TokenCounterMapper –该映射器标记输入值并发出每个单词,计数为1. 因此,在此单词映射MapReduce程序的情况下编写的映射器可以由此内置映射器代替。请参阅使用TokenCounterMapper和IntSumReducer的示例字计数程序。

MultithreadedMapper –这是Mapper的多线程实现。使用此MapRunnable的Mapper实现必须是线程安全的。

ChainMapper – ChainMapper类允许在一个Map任务中使用多个Mapper类。以链接的方式调用Mapper类,第一个Mapper的输出成为第二个Mapper的输入,依此类推,直到最后一个Mapper,最后一个Mapper的输出将被写入任务的输出。

FieldSelectionMapper –此类实现一个映射器类,该类可用于以类似于Unix cut的方式执行字段选择。输入数据被视为由用户指定的分隔符分隔的字段。用户可以指定构成地图输出键的字段列表,以及构成地图输出值的字段列表。
稍后请参阅使用FieldSelectionMapper的示例。

RegexMapper – Hadoop中的预定义Mapper类从与正则表达式匹配的输入中提取文本。

Hadoop中的预定义Reducer类

IntSumReducer –此预定义的Reducer类将与特定键关联的整数值相加。

LongSumReducer –此预定义的Reducer类将对与特定键关联的长值求和。

FieldSelectionReducer –此类实现一个reducer类,该类可用于以类似于Unix cut的方式执行字段选择。输入数据被视为由用户指定的分隔符分隔的字段。用户可以指定构成缩减输出键的字段列表,以及构成缩减输出值的字段列表。这些字段是键值和值之间的并集。

ChainReducer – ChainReducer类允许在Reducer任务中的Reducer之后链接多个Mapper类。对于Reducer输出的每个记录,将以链接方式调用Mapper类。减速器的输出成为第一个映射器的输入,第一个的输出成为第二个映射器的输入,依此类推,直到最后一个映射器,最后一个映射器的输出将被写入任务的输出。

WrappedReducer-一种Reducer,用于包装给定的Reducer以允许自定义Reducer.Context实现。如果要提供Context接口的实现,则此Reducer很有用。

使用预定义的Mapper和Reducer类的示例

这是一些使用预定义的Mapper和Reducer类的示例。

使用FieldSelection映射器

在示例中,使用制表符分隔输入数据,并且我们要提取字段0作为键,字段1作为值。在这种情况下,我们可以使用FieldSelectionMapper而不是编写自己的映射器。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;
import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockPrice extends Configured implements Tool{
  // Reduce function
  public static class MaxStockPriceReducer extends Reducer<Text, Text, Text, IntWritable>{

    public void reduce(Text key, Iterable values, Context context) 
        throws IOException, InterruptedException {
    System.out.println("key -- " + key.toString());
    int	maxValue = Integer.MIN_VALUE;
    for (Text val : values) {
      System.out.println("Value -- " + val);
      if(val != null && !val.toString().equals("")) {
        maxValue = Math.max(maxValue, Integer.parseInt(val.toString()));
      }
    }    
    System.out.println("maxValue -- " + maxValue);
    context.write(key, new IntWritable(maxValue));
    }
  }
	
	
  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new StockPrice(), args);
    System.exit(exitFlag);
  }
	
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // setting the separator
    conf.set(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
    // Setting the fields that are to be extracted
    conf.set(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0:1");
    Job job = Job.getInstance(conf, "Stock price");
    job.setJarByClass(getClass());
    // setting the predefined mapper
    job.setMapperClass(FieldSelectionMapper.class);    

    job.setReducerClass(MaxStockPriceReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

使用TokenCounterMapper和IntSumReducer编写一个字数统计MapReduce程序

在Hadoop中的字数统计MapReduce程序中,我们看到了一个字数统计MR程序,其中在该程序中编写了Map和Reduce功能,但是我们可以使用预定义的Mapper和Reducer类编写一个字数统计MR程序,我们只需指定TokenCounterMapper类(预定义的Mapper类)和IntSumReducer类(预定义的Reducer类)。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SimpleWordCount extends Configured implements Tool{

  public static void main(String[] args) throws Exception{
    int exitFlag = ToolRunner.run(new SimpleWordCount(), args);
    System.exit(exitFlag);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "WC");
    job.setJarByClass(getClass());
    // Setting pre-defing mapper and reducer
    job.setMapperClass(TokenCounterMapper.class);    
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}