Hadoop中的预定义Mapper和Reducer类
在Hadoop框架中,有一些预定义的Mapper和Reducer类可以在所需的场景中使用。这样一来,我们无需为这些方案编写映射器或者精简器,而可以使用现成的类。
让我们看一下Hadoop中一些预定义的Mapper和Reducer类。
Hadoop中的预定义Mapper类
InverseMapper –此预定义的映射器交换键和值。因此,输入(键,值)对反转,键成为值,值成为输出(键,值)对中的键。
TokenCounterMapper –该映射器标记输入值并发出每个单词,计数为1. 因此,在此单词映射MapReduce程序的情况下编写的映射器可以由此内置映射器代替。请参阅使用TokenCounterMapper和IntSumReducer的示例字计数程序。
MultithreadedMapper –这是Mapper的多线程实现。使用此MapRunnable的Mapper实现必须是线程安全的。
ChainMapper – ChainMapper类允许在一个Map任务中使用多个Mapper类。以链接的方式调用Mapper类,第一个Mapper的输出成为第二个Mapper的输入,依此类推,直到最后一个Mapper,最后一个Mapper的输出将被写入任务的输出。
FieldSelectionMapper –此类实现一个映射器类,该类可用于以类似于Unix cut的方式执行字段选择。输入数据被视为由用户指定的分隔符分隔的字段。用户可以指定构成地图输出键的字段列表,以及构成地图输出值的字段列表。
稍后请参阅使用FieldSelectionMapper的示例。
RegexMapper – Hadoop中的预定义Mapper类从与正则表达式匹配的输入中提取文本。
Hadoop中的预定义Reducer类
IntSumReducer –此预定义的Reducer类将与特定键关联的整数值相加。
LongSumReducer –此预定义的Reducer类将对与特定键关联的长值求和。
FieldSelectionReducer –此类实现一个reducer类,该类可用于以类似于Unix cut的方式执行字段选择。输入数据被视为由用户指定的分隔符分隔的字段。用户可以指定构成缩减输出键的字段列表,以及构成缩减输出值的字段列表。这些字段是键值和值之间的并集。
ChainReducer – ChainReducer类允许在Reducer任务中的Reducer之后链接多个Mapper类。对于Reducer输出的每个记录,将以链接方式调用Mapper类。减速器的输出成为第一个映射器的输入,第一个的输出成为第二个映射器的输入,依此类推,直到最后一个映射器,最后一个映射器的输出将被写入任务的输出。
WrappedReducer-一种Reducer,用于包装给定的Reducer以允许自定义Reducer.Context实现。如果要提供Context接口的实现,则此Reducer很有用。
使用预定义的Mapper和Reducer类的示例
这是一些使用预定义的Mapper和Reducer类的示例。
使用FieldSelection映射器
在示例中,使用制表符分隔输入数据,并且我们要提取字段0作为键,字段1作为值。在这种情况下,我们可以使用FieldSelectionMapper而不是编写自己的映射器。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;
import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class StockPrice extends Configured implements Tool{
// Reduce function
public static class MaxStockPriceReducer extends Reducer<Text, Text, Text, IntWritable>{
public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
System.out.println("key -- " + key.toString());
int maxValue = Integer.MIN_VALUE;
for (Text val : values) {
System.out.println("Value -- " + val);
if(val != null && !val.toString().equals("")) {
maxValue = Math.max(maxValue, Integer.parseInt(val.toString()));
}
}
System.out.println("maxValue -- " + maxValue);
context.write(key, new IntWritable(maxValue));
}
}
public static void main(String[] args) throws Exception {
int exitFlag = ToolRunner.run(new StockPrice(), args);
System.exit(exitFlag);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
// setting the separator
conf.set(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
// Setting the fields that are to be extracted
conf.set(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0:1");
Job job = Job.getInstance(conf, "Stock price");
job.setJarByClass(getClass());
// setting the predefined mapper
job.setMapperClass(FieldSelectionMapper.class);
job.setReducerClass(MaxStockPriceReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
}
使用TokenCounterMapper和IntSumReducer编写一个字数统计MapReduce程序
在Hadoop中的字数统计MapReduce程序中,我们看到了一个字数统计MR程序,其中在该程序中编写了Map和Reduce功能,但是我们可以使用预定义的Mapper和Reducer类编写一个字数统计MR程序,我们只需指定TokenCounterMapper类(预定义的Mapper类)和IntSumReducer类(预定义的Reducer类)。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SimpleWordCount extends Configured implements Tool{
public static void main(String[] args) throws Exception{
int exitFlag = ToolRunner.run(new SimpleWordCount(), args);
System.exit(exitFlag);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "WC");
job.setJarByClass(getClass());
// Setting pre-defing mapper and reducer
job.setMapperClass(TokenCounterMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
}

