如何在Hadoop中链接MapReduce作业

时间:2020-01-09 10:34:34  来源:igfitidea点击:

在许多情况下,我们希望创建一系列MapReduce作业以完全转换和处理数据。这比将所有内容放在一个MapReduce作业中并使之变得非常复杂要好。
实际上,我们可以通过各种来源获取数据,也可以使用各种应用程序序列。这可以通过使用Oozie创建工作流程来完成,但这是另一篇文章的主题。在本文中,我们将介绍如何使用ChainMapper和ChainReducer在Hadoop中链接MapReduce作业。

Hadoop中的ChainMapper

ChainMapper是Hadoop中预定义的MapReduce类之一。 ChainMapper类允许我们在一个Map任务中使用多个Mapper类。以链接的方式调用Mapper类,第一个Mapper的输出成为第二个Mapper的输入,依此类推,直到最后一个Mapper,最后一个Mapper的输出将被写入任务的输出。
我们可以使用addMapper()方法将映射器添加到ChainMapper。

Hadoop中的ChainReducer

ChainReducer类允许在Reducer任务中的Reducer之后链接多个Mapper类。对于Reducer输出的每个记录,将以链接方式调用Mapper类。减速器的输出成为第一个映射器的输入,第一个的输出成为第二个映射器的输入,依此类推,直到最后一个映射器,最后一个映射器的输出将被写入任务的输出。

要将Mapper类添加到链式减速器中,可以使用addMapper()方法。
要将Reducer类设置为链式作业,可以使用setReducer()方法。

链接MapReduce作业

使用ChainMapper和ChainReducer类,可以组成看起来像 [MAP+ / REDUCE MAP*] 的MapReduce作业。

当我们使用链接的MapReduce时,可以组合使用以下内容:

  • 一个或者多个映射器

  • 单个Reducer

  • 零个或者多个映射器(可选,仅在使用链接的reducer时使用)

使用链接的MapReduce作业时,来自映射器或者reducer的数据存储(并使用)在内存中,而不是存储在磁盘上,这会在很大程度上减少磁盘IO。

MapReduce链接示例

每天都有以下格式的股票数据,包括股票代码,价格和交易。

AAA		23	5677
BBB		23	12800
aaa		26	23785
.....
.....

在数据符号中并不总是大写。因此有两个映射器,首先提取相关字段(符号和事务)。在第二个映射器中,符号被转换为大写。

然后是一个减少器,它增加了每个交易品种的交易量。然后在reduce任务中,有一个InverseMapper可以反转键,值对。请注意,InverseMapper是Hadoop框架中的预定义Mapper类,这就是为什么示例代码中没有实现的原因。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockTrans extends Configured implements Tool{
  // Mapper 1
  public static class StockFieldMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text symbol = new Text();
    
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      //Setting symbol and transaction values
      symbol.set(stringArr[0]);
      Integer trans = Integer.parseInt(stringArr[2]);
      context.write(symbol, new IntWritable(trans));
    }
  }
	
  // Mapper 2
  public static class UpperCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable>{
    public void map(Text key, IntWritable value, Context context) 
        throws IOException, InterruptedException {
    
      String symbol = key.toString().toUpperCase();       
      context.write(new Text(symbol), value);
    }
  }
	
  // Reduce function
  public static class TotalTransReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
    public void reduce(Text key, Iterable values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }      
      context.write(key, new IntWritable(sum));
    }
  }	

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new StockTrans(), args);
    System.exit(exitFlag);
  }
	
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Stock transactio");
    job.setJarByClass(getClass());
    // MapReduce chaining
    Configuration map1Conf = new Configuration(false);
    ChainMapper.addMapper(job, StockFieldMapper.class, LongWritable.class, Text.class,
        Text.class, IntWritable.class,  map1Conf);
    
    Configuration map2Conf = new Configuration(false);
    ChainMapper.addMapper(job, UpperCaseMapper.class, Text.class, IntWritable.class,
           Text.class, IntWritable.class, map2Conf);
    
    Configuration reduceConf = new Configuration(false);		
    ChainReducer.setReducer(job, TotalTransReducer.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, reduceConf);

    ChainReducer.addMapper(job, InverseMapper.class, Text.class, IntWritable.class,
        IntWritable.class, Text.class, null);
     
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

创建jar包后运行此代码。

hadoop jar /home/theitroad/Documents/theitroad/theitroadhadoop.jar org.theitroad.StockTrans /user/input/StockTrans.txt /user/output/stock

输出量

hdfs dfs -cat /user/output/stock/part-r-00000

50483	AAA
180809	BBB