Hadoop MapReduce中的计数器
Hadoop MapReduce中的计数器有助于获取有关MapReduce作业的统计信息。使用Hadoop中的计数器,我们可以获得有关已执行工作的常规信息,例如启动的Map和缩小任务,映射输入记录,使用该信息来诊断数据是否存在问题,使用计数器提供的信息进行一些性能调整,例如我们可以从柜台获得有关溢出的记录和使用的内存的信息,利用这些信息,我们可以尝试微调工作。
Hadoop中的计数器类型
在Hadoop中,使用MapReduce作业有许多内置计数器,这些计数器在运行作业后会显示在控制台上,或者我们可以使用UI来分析这些计数器。
我们还可以具有用户定义的计数器。因此,Hadoop中有两种类型的计数器。
内置计数器
用户定义的计数器
Hadoop中的内置计数器
Hadoop中的内置计数器可以分为以下几组,这些计数器在Hadoop框架中定义为Enum。
文件系统计数器– org.apache.hadoop.mapreduce.FileSystemCounter
Map-Reduce框架计数器– org.apache.hadoop.mapreduce.TaskCounter
文件输入格式计数器– org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
文件输出格式计数器– org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
职位计数器– org.apache.hadoop.mapreduce.JobCounter
Hadoop中的文件系统计数器
读取的字节数(BYTES_READ)–显示Map和Reduce任务读取的字节数。每个文件系统都有一个单独的条目。例如,如果从本地文件系统和HDFS读取字节,则将有两个前缀为FILE:和HDFS:的条目。
写入的字节数(BYTES_WRITTEN)–显示Map和Reduce任务写入的字节数。
读取操作数(READ_OPS)–显示Map和Reduce任务的读取操作数(如打开文件)。
大型读取操作数(LARGE_READ_OPS)–显示Map和Reduce任务的大型操作数(例如通过大型目录结构)。
写入操作数(WRITE_OPS)–显示Map和Reduce任务的写入操作数(如创建文件,添加到文件)。
Map-Reduce框架计数器
Map输入记录(MAP_INPUT_RECORDS)–所有Map处理的记录数。
Map输出记录(MAP_OUTPUT_RECORDS)–所有Map发出的输出记录数。
Map跳过的记录(MAP_SKIPPED_RECORDS)–所有Map跳过的记录数。
映射输出字节(MAP_OUTPUT_BYTES)–所有映射的输出(以字节为单位)。
映射输出实例化字节(MAP_OUTPUT_MATERIALIZED_BYTES)–将输出字节写入磁盘。
输入分割字节(SPLIT_RAW_BYTES)–有关输入分割的元数据,以字节为单位。
合并输入记录(COMBINE_INPUT_RECORDS)–合并器处理的输入记录数。
合并输出记录(COMBINE_OUTPUT_RECORDS)–合并器发出的输出记录数。
精简输入组(REDUCE_INPUT_GROUPS)–所有精简器处理的关键组的数量。
减少随机播放字节(REDUCE_SHUFFLE_BYTES)–以字节为单位将映射输出复制到Reducer。
减少输入记录(REDUCE_INPUT_RECORDS)–所有减速器处理的输入记录数。
精简输出记录(REDUCE_OUTPUT_RECORDS)–所有精简器发出的输出记录数。
减少已跳过的记录(REDUCE_SKIPPED_RECORDS)– Reducer所跳过的记录数。
溢出的记录(SPILLED_RECORDS)–溢出到磁盘的记录数。
随机映射(SHUFFLED_MAPS)–复制到运行reducer的节点的映射输出文件数。
随机播放失败(FAILED_SHUFFLE)–随机播放期间失败的Map输出文件数。
合并的Map输出(MERGED_MAP_OUTPUTS)–Map输出的nmber合并以创建化简器的输入。
GC时间已用(GC_TIME_MILLIS)–垃圾回收所花费的时间。
花费的CPU时间(CPU_MILLISECONDS)–任务处理花费的CPU时间。
物理内存快照(PHYSICAL_MEMORY_BYTES)–使用的总物理内存。
虚拟内存快照(VIRTUAL_MEMORY_BYTES)–使用的虚拟内存总量。
总提交堆使用量(COMMITTED_HEAP_BYTES)–可用的堆内存总量。
Hadoop中的文件输入格式计数器
- 读取的字节数(BYTES_READ)– Map任务使用该任务使用的输入格式读取的字节数。
Hadoop中的文件输出格式计数器
- 写入的字节数(BYTES_WRITTEN)– Map和reduce任务使用用于任务的输出格式写入的字节。
Hadoop中的作业计数器
已启动的Map任务(TOTAL_LAUNCHED_MAPS)–已启动的Map任务的总数。
已启动的缩减任务(TOTAL_LAUNCHED_REDUCES)–已启动的缩减任务总数。
失败的Map任务(NUM_FAILED_MAPS)–失败的Map任务数。
缩减任务失败(NUM_FAILED_REDUCES)–缩减任务失败的数量。
已杀死的Map任务(NUM_KILLED_MAPS)–已杀死的Map任务的数量。
已终止的缩减任务(NUM_KILLED_REDUCES)–已终止的缩减任务的数量。
数据本地Map任务(DATA_LOCAL_MAPS)–在它们所处理的数据所在的同一节点上运行的Map任务的数量。
机架本地Map任务(RACK_LOCAL_MAPS)–在机架中正在处理其数据的节点上还运行的Map任务的数量。
已启动的超级任务(TOTAL_LAUNCHED_UBERTASKS)–已启动的超级任务总数。
超级任务中的Map(NUM_UBER_SUBMAPS)–作为超级任务运行的Map数。
减少超级任务(NUM_UBER_SUBREDUCES)–减少作为超级任务运行的数量。
超级任务失败(NUM_FAILED_UBERTASKS)–超级任务失败的数量。
所有Map任务花费的总时间(ms)(MILLIS_MAPS)–运行所有Map任务花费的时间。
所有缩减任务花费的总时间(ms)(MILLIS_REDUCES)–运行所有缩减任务所花费的时间。
所有Map任务花费的总vcore毫秒(VCORES_MILLIS_MAPS)–所有Map任务花费的总vcore时间。
所有缩减任务占用的总vcore毫秒(VCORES_MILLIS_REDUCES)–所有缩减任务占用的总vcore时间。
从计数器的描述中可以看到;文件系统计数器,Map-Reduce框架计数器,文件输入格式计数器,文件输出格式计数器提供有关MapReduce作业中任务的统计信息。另一方面,作业计数器提供有关整体作业的统计信息。
Hadoop中的用户定义计数器
我们还可以在Hadoop MapReduce中创建用户定义的计数器。使用计数器还有助于调试,因为我们可以创建一个计数器并将其递增以达到某种条件,然后检查计数器输出,这也将使我们了解数据是否有问题。
要创建计数器,可以使用Java枚举。枚举中的每个字段都是一个计数器名称,因为枚举是这些计数器所属的组。
用户定义计数器Hadoop MapReduce示例
例如,如果我们有关于股票代码,价格和交易数量的数据,并且想要查看缺少交易的记录,则可以在MapReduce中创建一个计数器来做到这一点。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class StockData extends Configured implements Tool{ enum Stock { TRANSACTION_MISSING } // Mapper 1 public static class StockFieldMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private Text symbol = new Text(); Integer trans = 0; public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // Splitting the line on tab String[] stringArr = value.toString().split("\t"); //Setting symbol and transaction values symbol.set(stringArr[0]); if(stringArr[2] != null && !stringArr[2].trim().equals("")) { trans = Integer.parseInt(stringArr[2]); }else { // incrementing counter context.getCounter(Stock.TRANSACTION_MISSING).increment(1); trans = 0; } context.write(symbol, new IntWritable(trans)); } } // Reduce function public static class TotalTransReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { int exitFlag = ToolRunner.run(new StockData(), args); System.exit(exitFlag); } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); Job job = Job.getInstance(conf, "Stock data"); job.setJarByClass(getClass()); job.setMapperClass(StockFieldMapper.class); job.setReducerClass(TotalTransReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } }
然后,在显示的计数器中,我们将看到类似以下内容的内容–
org.theitroad.StockData$Stock TRANSACTION_MISSING=3