Hadoop中的预定义Mapper和Reducer类
在Hadoop框架中,有一些预定义的Mapper和Reducer类可以在所需的场景中使用。这样一来,我们无需为这些方案编写映射器或者精简器,而可以使用现成的类。
让我们看一下Hadoop中一些预定义的Mapper和Reducer类。
Hadoop中的预定义Mapper类
InverseMapper –此预定义的映射器交换键和值。因此,输入(键,值)对反转,键成为值,值成为输出(键,值)对中的键。
TokenCounterMapper –该映射器标记输入值并发出每个单词,计数为1. 因此,在此单词映射MapReduce程序的情况下编写的映射器可以由此内置映射器代替。请参阅使用TokenCounterMapper和IntSumReducer的示例字计数程序。
MultithreadedMapper –这是Mapper的多线程实现。使用此MapRunnable的Mapper实现必须是线程安全的。
ChainMapper – ChainMapper类允许在一个Map任务中使用多个Mapper类。以链接的方式调用Mapper类,第一个Mapper的输出成为第二个Mapper的输入,依此类推,直到最后一个Mapper,最后一个Mapper的输出将被写入任务的输出。
FieldSelectionMapper –此类实现一个映射器类,该类可用于以类似于Unix cut的方式执行字段选择。输入数据被视为由用户指定的分隔符分隔的字段。用户可以指定构成地图输出键的字段列表,以及构成地图输出值的字段列表。
稍后请参阅使用FieldSelectionMapper的示例。
RegexMapper – Hadoop中的预定义Mapper类从与正则表达式匹配的输入中提取文本。
Hadoop中的预定义Reducer类
IntSumReducer –此预定义的Reducer类将与特定键关联的整数值相加。
LongSumReducer –此预定义的Reducer类将对与特定键关联的长值求和。
FieldSelectionReducer –此类实现一个reducer类,该类可用于以类似于Unix cut的方式执行字段选择。输入数据被视为由用户指定的分隔符分隔的字段。用户可以指定构成缩减输出键的字段列表,以及构成缩减输出值的字段列表。这些字段是键值和值之间的并集。
ChainReducer – ChainReducer类允许在Reducer任务中的Reducer之后链接多个Mapper类。对于Reducer输出的每个记录,将以链接方式调用Mapper类。减速器的输出成为第一个映射器的输入,第一个的输出成为第二个映射器的输入,依此类推,直到最后一个映射器,最后一个映射器的输出将被写入任务的输出。
WrappedReducer-一种Reducer,用于包装给定的Reducer以允许自定义Reducer.Context实现。如果要提供Context接口的实现,则此Reducer很有用。
使用预定义的Mapper和Reducer类的示例
这是一些使用预定义的Mapper和Reducer类的示例。
使用FieldSelection映射器
在示例中,使用制表符分隔输入数据,并且我们要提取字段0作为键,字段1作为值。在这种情况下,我们可以使用FieldSelectionMapper而不是编写自己的映射器。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper; import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionMapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class StockPrice extends Configured implements Tool{ // Reduce function public static class MaxStockPriceReducer extends Reducer<Text, Text, Text, IntWritable>{ public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { System.out.println("key -- " + key.toString()); int maxValue = Integer.MIN_VALUE; for (Text val : values) { System.out.println("Value -- " + val); if(val != null && !val.toString().equals("")) { maxValue = Math.max(maxValue, Integer.parseInt(val.toString())); } } System.out.println("maxValue -- " + maxValue); context.write(key, new IntWritable(maxValue)); } } public static void main(String[] args) throws Exception { int exitFlag = ToolRunner.run(new StockPrice(), args); System.exit(exitFlag); } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); // setting the separator conf.set(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t"); // Setting the fields that are to be extracted conf.set(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0:1"); Job job = Job.getInstance(conf, "Stock price"); job.setJarByClass(getClass()); // setting the predefined mapper job.setMapperClass(FieldSelectionMapper.class); job.setReducerClass(MaxStockPriceReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } }
使用TokenCounterMapper和IntSumReducer编写一个字数统计MapReduce程序
在Hadoop中的字数统计MapReduce程序中,我们看到了一个字数统计MR程序,其中在该程序中编写了Map和Reduce功能,但是我们可以使用预定义的Mapper和Reducer类编写一个字数统计MR程序,我们只需指定TokenCounterMapper类(预定义的Mapper类)和IntSumReducer类(预定义的Reducer类)。
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class SimpleWordCount extends Configured implements Tool{ public static void main(String[] args) throws Exception{ int exitFlag = ToolRunner.run(new SimpleWordCount(), args); System.exit(exitFlag); } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "WC"); job.setJarByClass(getClass()); // Setting pre-defing mapper and reducer job.setMapperClass(TokenCounterMapper.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } }