Hadoop 2.x MapReduce(MR V1)字数统计示例

时间:2020-02-23 14:33:29  来源:igfitidea点击:

之前我们从理论上解释了" MapReduce如何执行字数统计"。

而且,如果您不熟悉HDFS Basic命令,请浏览我的文章" Hadoop HDFS Basic Developer Commands",以获取有关如何在CloudEra环境中执行HDFS命令的一些基本知识。

在本文中,我们将使用Hadoop 2 MapReduce API开发相同的WordCounting程序,并在CloudEra Environment中对其进行测试。

MapReduce WordCounting示例

我们需要编写以下三个程序来开发和测试MapReduce WordCount示例:

  • Mapper程序
  • Reducer程序
  • 客户程序

注意:-要开发MapReduce程序,有两种版本的MR API:

  • 一种来自Hadoop 1.x(MapReduce Old API)
  • 另一个来自Hadoop 2.x(MapReduce New API)

在Hadoop 2.x中,不赞成使用MapReduce Old API。
因此,我们非常关注MapReduce New API,以开发此WordCount示例。

在CloudEra环境中,他们已经提供了带有Hadoop 2.x API的Eclipse IDE设置。
因此,使用此设置来开发和测试MapReduce程序非常容易。

要开发WordCount MapReduce应用程序,请使用以下步骤:

  • 打开CloudEra Environment提供的默认Eclipse IDE。

  • 我们可以使用已经创建的项目或者创建新的Java项目。

  • 为简单起见,我将使用现有的"培训" Java项目。
    他们已经将所有必需的Hadoop 2.x Jar添加到该项目类路径中。
    准备使用Eclipse Java Project。

  • 创建WordCount Mapper程序

  • 创建WordCount Reducer程序

  • 创建WordCount客户端程序以测试此应用程序

让我们在下一部分中开始开发这三个程序。

Mapper程序

创建一个" WordCountMapper" Java类,它扩展了Mapper类,如下所示:

package com.theitroad.hadoop.mrv1.wordcount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String w = value.toString();
		context.write(new Text(w), new IntWritable(1));
	}

}

代码说明:

  • 我们的WordCountMapper类已经实现了Hadoop 2 MapReduce API类" Mapper"。

  • Mapper类已通过使用通用类型定义为Mapper <LongWritable,Text,Text,IntWritable>

此处<LongWritable,Text,Text,IntWritable>

  • 前两个<LongWritable,Text>表示WordCount的Mapper程序的输入数据类型。

例如:-在我们的示例中,我们将给出一个File(大量数据,任何格式)。
映射器从该文件读取每一行,并给出一个唯一的编号,如下所示

<Unique_Long_Number, Line_Read_From_Input_File>

在Hadoop MapReduce API中,它等于<LongWritable,Text>。

  • 最后两个<Text,IntWritable>表示我们的WordCount的Mapper程序的输出数据类型。

例如:-在我们的示例中,WordCount的Mapper程序给出的输出如下所示

<Unique_Word_From_Input_File, Word_Count>

In Hadoop MapReduce API, it is equal to <Text, IntWritable>.- 我们已经实现了Mapper的map()方法,并在此处提供了Mapping Function逻辑。

Reducer程序

创建一个" WordCountReducer" Java类,它扩展了Reducer类,如下所示:

package com.theitroad.hadoop.mrv1.wordcount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	
	public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable val : values) {
		sum += val.get();
		}
		context.write(key, new IntWritable(sum));
	}
	
}

代码说明:

  • 我们的WordCountReducer类扩展了Hadoop 2 MapReduce API类:" Reducer"。

  • Reducer类通过使用通用类型定义为Mapper <Text,IntWritable,Text,IntWritable>

此处<文本,可写,文本,可写>

  • 前两个<Text,IntWritable>表示WordCount的Reducer程序的输入数据类型。

例如:-在我们的示例中,我们的Mapper程序将给出<Text,IntWritable>输出,它将成为Reducer程序的输入。

<Unique_Word_From_Input_File, Word_Count>

在Hadoop MapReduce API中,它等于<Text,IntWritable>。

  • 最后两个<Text,IntWritable>表示WordCount的Reducer程序的输出数据类型。

例如:-在我们的示例中,WordCount的Reducer程序提供如下所示的输出

<Unique_Word_From_Input_File, Total_Word_Count>

In Hadoop MapReduce API, it is equal to <Text, IntWritable>.- 我们已经实现了Reducer的reduce()方法,并在此处提供了Reduce函数逻辑。

客户程序

使用main()方法创建一个" WordCountClient" Java类,如下所示:

package com.theitroad.hadoop.mrv1.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountClient {

	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance(new Configuration());
		job.setJarByClass(WordCountClient.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		boolean status = job.waitForCompletion(true);
		if (status) {
			System.exit(0);
		} 
		else {
			System.exit(1);
		}
	}

}

代码说明:

  • Hadoop 2 MapReduce API在" org.apache.hadoop.mapreduce"包中具有" Job"类。

  • 作业类用于创建作业(映射/减少作业)以执行我们的字数统计任务。

  • 客户程序正在使用Job Object的setter方法来设置所有MapReduce组件,例如Mapper,Reducer,输入数据类型,输出数据类型等。

  • 这些作业将执行我们的字数统计映射和精简任务。

注意:

  • 正如我们在上一篇文章中讨论的那样,MapReduce算法使用3个函数:映射函数,合并函数和归约函数。

  • 通过观察这三个程序,我们可以发现仅开发了两个功能的一件事:Map和Reduce。
    那合并功能呢?

  • 这意味着我们使用了Hadoop 2 MapReduce API中可用的默认Combine函数逻辑。

  • 我们将在我的后续文章中讨论"如何开发合并功能"。

现在,我们已经开发了所有必需的组件(程序)。
现在该进行测试了。

测试MapReduce WordCounting示例

我们的WordCounting项目最终结构如下所示:

请使用以下步骤测试我们的MapReduce应用程序。

  • 使用Eclipse IDE创建WordCount应用程序JAR文件。

  • 执行以下" hadoop"命令以运行我们的WordCounting应用程序

语法:

hadoop jar <our-Jar-file-path> <Client-program>  <Input-Path> <Output-Path>

让我们假设我们已经在Hadoop HDFS FileSytem中创建了"/ram/mrv1/output"文件夹结构。
如果不执行该操作,请浏览我以前的文章" Hadoop HDFS Basic Developer Commands"以创建它们。

例:

hadoop jar /home/cloudera/JDWordCountMapReduceApp.jar  
     com.theitroad.hadoop.mrv1.wordcount.WordCountClient 
     /ram/mrv1/NASDAQ_daily_prices_C.csv
     /ram/mrv1/output

注意:-为了简单易读,我将命令分成多行。
请在单行中键入此命令,如下所示:

通过查看该日志,我们可以观察到Map and Reduce作业如何解决我们的WordCounting问题。

  • 执行以下" hadoop"命令以查看输出目录内容
hadoop fs -ls /ram/mrv1/output/

它显示"/ram/mrv1/output /"目录的内容,如下所示:

  • 执行以下" hadoop"命令以查看我们的WordCounting应用程序输出
hadoop fs -cat /ram/mrv1/output/part-r-00000

此命令显示WordCounting应用程序输出。
由于我的输出文件太大,因此无法在此处显示我的文件输出。

注意:-这里我们使用了一些Hadoop HDFS命令来运行和测试WordCounting应用程序。