Hadoop MapReduce中的计数器

时间:2020-01-09 10:34:34  来源:igfitidea点击:

Hadoop MapReduce中的计数器有助于获取有关MapReduce作业的统计信息。使用Hadoop中的计数器,我们可以获得有关已执行工作的常规信息,例如启动的Map和缩小任务,映射输入记录,使用该信息来诊断数据是否存在问题,使用计数器提供的信息进行一些性能调整,例如我们可以从柜台获得有关溢出的记录和使用的内存的信息,利用这些信息,我们可以尝试微调工作。

Hadoop中的计数器类型

在Hadoop中,使用MapReduce作业有许多内置计数器,这些计数器在运行作业后会显示在控制台上,或者我们可以使用UI来分析这些计数器。
我们还可以具有用户定义的计数器。因此,Hadoop中有两种类型的计数器。

  • 内置计数器

  • 用户定义的计数器

Hadoop中的内置计数器

Hadoop中的内置计数器可以分为以下几组,这些计数器在Hadoop框架中定义为Enum。

  • 文件系统计数器– org.apache.hadoop.mapreduce.FileSystemCounter

  • Map-Reduce框架计数器– org.apache.hadoop.mapreduce.TaskCounter

  • 文件输入格式计数器– org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter

  • 文件输出格式计数器– org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter

  • 职位计数器– org.apache.hadoop.mapreduce.JobCounter

Hadoop中的文件系统计数器

  • 读取的字节数(BYTES_READ)–显示Map和Reduce任务读取的字节数。每个文件系统都有一个单独的条目。例如,如果从本地文件系统和HDFS读取字节,则将有两个前缀为FILE:和HDFS:的条目。

  • 写入的字节数(BYTES_WRITTEN)–显示Map和Reduce任务写入的字节数。

  • 读取操作数(READ_OPS)–显示Map和Reduce任务的读取操作数(如打开文件)。

  • 大型读取操作数(LARGE_READ_OPS)–显示Map和Reduce任务的大型操作数(例如通过大型目录结构)。

  • 写入操作数(WRITE_OPS)–显示Map和Reduce任务的写入操作数(如创建文件,添加到文件)。

Map-Reduce框架计数器

  • Map输入记录(MAP_INPUT_RECORDS)–所有Map处理的记录数。

  • Map输出记录(MAP_OUTPUT_RECORDS)–所有Map发出的输出记录数。

  • Map跳过的记录(MAP_SKIPPED_RECORDS)–所有Map跳过的记录数。

  • 映射输出字节(MAP_OUTPUT_BYTES)–所有映射的输出(以字节为单位)。

  • 映射输出实例化字节(MAP_OUTPUT_MATERIALIZED_BYTES)–将输出字节写入磁盘。

  • 输入分割字节(SPLIT_RAW_BYTES)–有关输入分割的元数据,以字节为单位。

  • 合并输入记录(COMBINE_INPUT_RECORDS)–合并器处理的输入记录数。

  • 合并输出记录(COMBINE_OUTPUT_RECORDS)–合并器发出的输出记录数。

  • 精简输入组(REDUCE_INPUT_GROUPS)–所有精简器处理的关键组的数量。

  • 减少随机播放字节(REDUCE_SHUFFLE_BYTES)–以字节为单位将映射输出复制到Reducer。

  • 减少输入记录(REDUCE_INPUT_RECORDS)–所有减速器处理的输入记录数。

  • 精简输出记录(REDUCE_OUTPUT_RECORDS)–所有精简器发出的输出记录数。

  • 减少已跳过的记录(REDUCE_SKIPPED_RECORDS)– Reducer所跳过的记录数。

  • 溢出的记录(SPILLED_RECORDS)–溢出到磁盘的记录数。

  • 随机映射(SHUFFLED_MAPS)–复制到运行reducer的节点的映射输出文件数。

  • 随机播放失败(FAILED_SHUFFLE)–随机播放期间失败的Map输出文件数。

  • 合并的Map输出(MERGED_MAP_OUTPUTS)–Map输出的nmber合并以创建化简器的输入。

  • GC时间已用(GC_TIME_MILLIS)–垃圾回收所花费的时间。

  • 花费的CPU时间(CPU_MILLISECONDS)–任务处理花费的CPU时间。

  • 物理内存快照(PHYSICAL_MEMORY_BYTES)–使用的总物理内存。

  • 虚拟内存快照(VIRTUAL_MEMORY_BYTES)–使用的虚拟内存总量。

  • 总提交堆使用量(COMMITTED_HEAP_BYTES)–可用的堆内存总量。

Hadoop中的文件输入格式计数器

  • 读取的字节数(BYTES_READ)– Map任务使用该任务使用的输入格式读取的字节数。

Hadoop中的文件输出格式计数器

  • 写入的字节数(BYTES_WRITTEN)– Map和reduce任务使用用于任务的输出格式写入的字节。

Hadoop中的作业计数器

  • 已启动的Map任务(TOTAL_LAUNCHED_MAPS)–已启动的Map任务的总数。

  • 已启动的缩减任务(TOTAL_LAUNCHED_REDUCES)–已启动的缩减任务总数。

  • 失败的Map任务(NUM_FAILED_MAPS)–失败的Map任务数。

  • 缩减任务失败(NUM_FAILED_REDUCES)–缩减任务失败的数量。

  • 已杀死的Map任务(NUM_KILLED_MAPS)–已杀死的Map任务的数量。

  • 已终止的缩减任务(NUM_KILLED_REDUCES)–已终止的缩减任务的数量。

  • 数据本地Map任务(DATA_LOCAL_MAPS)–在它们所处理的数据所在的同一节点上运行的Map任务的数量。

  • 机架本地Map任务(RACK_LOCAL_MAPS)–在机架中正在处理其数据的节点上还运行的Map任务的数量。

  • 已启动的超级任务(TOTAL_LAUNCHED_UBERTASKS)–已启动的超级任务总数。

  • 超级任务中的Map(NUM_UBER_SUBMAPS)–作为超级任务运行的Map数。

  • 减少超级任务(NUM_UBER_SUBREDUCES)–减少作为超级任务运行的数量。

  • 超级任务失败(NUM_FAILED_UBERTASKS)–超级任务失败的数量。

  • 所有Map任务花费的总时间(ms)(MILLIS_MAPS)–运行所有Map任务花费的时间。

  • 所有缩减任务花费的总时间(ms)(MILLIS_REDUCES)–运行所有缩减任务所花费的时间。

  • 所有Map任务花费的总vcore毫秒(VCORES_MILLIS_MAPS)–所有Map任务花费的总vcore时间。

  • 所有缩减任务占用的总vcore毫秒(VCORES_MILLIS_REDUCES)–所有缩减任务占用的总vcore时间。

从计数器的描述中可以看到;文件系统计数器,Map-Reduce框架计数器,文件输入格式计数器,文件输出格式计数器提供有关MapReduce作业中任务的统计信息。另一方面,作业计数器提供有关整体作业的统计信息。

Hadoop中的用户定义计数器

我们还可以在Hadoop MapReduce中创建用户定义的计数器。使用计数器还有助于调试,因为我们可以创建一个计数器并将其递增以达到某种条件,然后检查计数器输出,这也将使我们了解数据是否有问题。

要创建计数器,可以使用Java枚举。枚举中的每个字段都是一个计数器名称,因为枚举是这些计数器所属的组。

用户定义计数器Hadoop MapReduce示例

例如,如果我们有关于股票代码,价格和交易数量的数据,并且想要查看缺少交易的记录,则可以在MapReduce中创建一个计数器来做到这一点。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockData extends Configured implements Tool{
  enum Stock {
    TRANSACTION_MISSING
  }
  // Mapper 1
  public static class StockFieldMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text symbol = new Text();
    Integer trans = 0;
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      //Setting symbol and transaction values
      symbol.set(stringArr[0]);
      if(stringArr[2] != null && !stringArr[2].trim().equals("")) {
        trans = Integer.parseInt(stringArr[2]);
      }else {
        // incrementing counter
        context.getCounter(Stock.TRANSACTION_MISSING).increment(1);
        trans = 0;
      }      
        context.write(symbol, new IntWritable(trans));
     }
  }
	
  // Reduce function
  public static class TotalTransReducer extends Reducer<Text, IntWritable, Text, IntWritable>{    
    public void reduce(Text key, Iterable values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }      
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new StockData(), args);
    System.exit(exitFlag);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "Stock data");
    job.setJarByClass(getClass());
    job.setMapperClass(StockFieldMapper.class);    
    job.setReducerClass(TotalTransReducer.class);	 
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

然后,在显示的计数器中,我们将看到类似以下内容的内容–

org.theitroad.StockData$Stock
	TRANSACTION_MISSING=3