如何在Hadoop中链接MapReduce作业
在许多情况下,我们希望创建一系列MapReduce作业以完全转换和处理数据。这比将所有内容放在一个MapReduce作业中并使之变得非常复杂要好。
实际上,我们可以通过各种来源获取数据,也可以使用各种应用程序序列。这可以通过使用Oozie创建工作流程来完成,但这是另一篇文章的主题。在本文中,我们将介绍如何使用ChainMapper和ChainReducer在Hadoop中链接MapReduce作业。
Hadoop中的ChainMapper
ChainMapper是Hadoop中预定义的MapReduce类之一。 ChainMapper类允许我们在一个Map任务中使用多个Mapper类。以链接的方式调用Mapper类,第一个Mapper的输出成为第二个Mapper的输入,依此类推,直到最后一个Mapper,最后一个Mapper的输出将被写入任务的输出。
我们可以使用addMapper()方法将映射器添加到ChainMapper。
Hadoop中的ChainReducer
ChainReducer类允许在Reducer任务中的Reducer之后链接多个Mapper类。对于Reducer输出的每个记录,将以链接方式调用Mapper类。减速器的输出成为第一个映射器的输入,第一个的输出成为第二个映射器的输入,依此类推,直到最后一个映射器,最后一个映射器的输出将被写入任务的输出。
要将Mapper类添加到链式减速器中,可以使用addMapper()方法。
要将Reducer类设置为链式作业,可以使用setReducer()方法。
链接MapReduce作业
使用ChainMapper和ChainReducer类,可以组成看起来像 [MAP+ / REDUCE MAP*] 的MapReduce作业。
当我们使用链接的MapReduce时,可以组合使用以下内容:
一个或者多个映射器
单个Reducer
零个或者多个映射器(可选,仅在使用链接的reducer时使用)
使用链接的MapReduce作业时,来自映射器或者reducer的数据存储(并使用)在内存中,而不是存储在磁盘上,这会在很大程度上减少磁盘IO。
MapReduce链接示例
每天都有以下格式的股票数据,包括股票代码,价格和交易。
AAA 23 5677 BBB 23 12800 aaa 26 23785 ..... .....
在数据符号中并不总是大写。因此有两个映射器,首先提取相关字段(符号和事务)。在第二个映射器中,符号被转换为大写。
然后是一个减少器,它增加了每个交易品种的交易量。然后在reduce任务中,有一个InverseMapper可以反转键,值对。请注意,InverseMapper是Hadoop框架中的预定义Mapper类,这就是为什么示例代码中没有实现的原因。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.chain.ChainMapper; import org.apache.hadoop.mapreduce.lib.chain.ChainReducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.map.InverseMapper; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class StockTrans extends Configured implements Tool{ // Mapper 1 public static class StockFieldMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private Text symbol = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // Splitting the line on tab String[] stringArr = value.toString().split("\t"); //Setting symbol and transaction values symbol.set(stringArr[0]); Integer trans = Integer.parseInt(stringArr[2]); context.write(symbol, new IntWritable(trans)); } } // Mapper 2 public static class UpperCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable>{ public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException { String symbol = key.toString().toUpperCase(); context.write(new Text(symbol), value); } } // Reduce function public static class TotalTransReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { int exitFlag = ToolRunner.run(new StockTrans(), args); System.exit(exitFlag); } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "Stock transactio"); job.setJarByClass(getClass()); // MapReduce chaining Configuration map1Conf = new Configuration(false); ChainMapper.addMapper(job, StockFieldMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, map1Conf); Configuration map2Conf = new Configuration(false); ChainMapper.addMapper(job, UpperCaseMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, map2Conf); Configuration reduceConf = new Configuration(false); ChainReducer.setReducer(job, TotalTransReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, reduceConf); ChainReducer.addMapper(job, InverseMapper.class, Text.class, IntWritable.class, IntWritable.class, Text.class, null); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } }
创建jar包后运行此代码。
hadoop jar /home/theitroad/Documents/theitroad/theitroadhadoop.jar org.theitroad.StockTrans /user/input/StockTrans.txt /user/output/stock
输出量
hdfs dfs -cat /user/output/stock/part-r-00000 50483 AAA 180809 BBB