Hadoop 2.x MapReduce(MR V1)字数统计示例
之前我们从理论上解释了" MapReduce如何执行字数统计"。
而且,如果您不熟悉HDFS Basic命令,请浏览我的文章" Hadoop HDFS Basic Developer Commands",以获取有关如何在CloudEra环境中执行HDFS命令的一些基本知识。
在本文中,我们将使用Hadoop 2 MapReduce API开发相同的WordCounting程序,并在CloudEra Environment中对其进行测试。
MapReduce WordCounting示例
我们需要编写以下三个程序来开发和测试MapReduce WordCount示例:
- Mapper程序
- Reducer程序
- 客户程序
注意:-要开发MapReduce程序,有两种版本的MR API:
- 一种来自Hadoop 1.x(MapReduce Old API)
- 另一个来自Hadoop 2.x(MapReduce New API)
在Hadoop 2.x中,不赞成使用MapReduce Old API。
因此,我们非常关注MapReduce New API,以开发此WordCount示例。
在CloudEra环境中,他们已经提供了带有Hadoop 2.x API的Eclipse IDE设置。
因此,使用此设置来开发和测试MapReduce程序非常容易。
要开发WordCount MapReduce应用程序,请使用以下步骤:
- 打开CloudEra Environment提供的默认Eclipse IDE。 
- 我们可以使用已经创建的项目或者创建新的Java项目。 
- 为简单起见,我将使用现有的"培训" Java项目。 
 他们已经将所有必需的Hadoop 2.x Jar添加到该项目类路径中。
 准备使用Eclipse Java Project。
- 创建WordCount Mapper程序 
- 创建WordCount Reducer程序 
- 创建WordCount客户端程序以测试此应用程序 
让我们在下一部分中开始开发这三个程序。
Mapper程序
创建一个" WordCountMapper" Java类,它扩展了Mapper类,如下所示:
package com.theitroad.hadoop.mrv1.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String w = value.toString();
		context.write(new Text(w), new IntWritable(1));
	}
}
代码说明:
- 我们的WordCountMapper类已经实现了Hadoop 2 MapReduce API类" Mapper"。 
- Mapper类已通过使用通用类型定义为Mapper <LongWritable,Text,Text,IntWritable> 
此处<LongWritable,Text,Text,IntWritable>
- 前两个<LongWritable,Text>表示WordCount的Mapper程序的输入数据类型。
例如:-在我们的示例中,我们将给出一个File(大量数据,任何格式)。
映射器从该文件读取每一行,并给出一个唯一的编号,如下所示
<Unique_Long_Number, Line_Read_From_Input_File>
在Hadoop MapReduce API中,它等于<LongWritable,Text>。
- 最后两个<Text,IntWritable>表示我们的WordCount的Mapper程序的输出数据类型。
例如:-在我们的示例中,WordCount的Mapper程序给出的输出如下所示
<Unique_Word_From_Input_File, Word_Count>
In Hadoop MapReduce API, it is equal to <Text, IntWritable>.- 我们已经实现了Mapper的map()方法,并在此处提供了Mapping Function逻辑。
Reducer程序
创建一个" WordCountReducer" Java类,它扩展了Reducer类,如下所示:
package com.theitroad.hadoop.mrv1.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	
	public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable val : values) {
		sum += val.get();
		}
		context.write(key, new IntWritable(sum));
	}
	
}
代码说明:
- 我们的WordCountReducer类扩展了Hadoop 2 MapReduce API类:" Reducer"。 
- Reducer类通过使用通用类型定义为Mapper <Text,IntWritable,Text,IntWritable> 
此处<文本,可写,文本,可写>
- 前两个<Text,IntWritable>表示WordCount的Reducer程序的输入数据类型。
例如:-在我们的示例中,我们的Mapper程序将给出<Text,IntWritable>输出,它将成为Reducer程序的输入。
<Unique_Word_From_Input_File, Word_Count>
在Hadoop MapReduce API中,它等于<Text,IntWritable>。
- 最后两个<Text,IntWritable>表示WordCount的Reducer程序的输出数据类型。
例如:-在我们的示例中,WordCount的Reducer程序提供如下所示的输出
<Unique_Word_From_Input_File, Total_Word_Count>
In Hadoop MapReduce API, it is equal to <Text, IntWritable>.- 我们已经实现了Reducer的reduce()方法,并在此处提供了Reduce函数逻辑。
客户程序
使用main()方法创建一个" WordCountClient" Java类,如下所示:
package com.theitroad.hadoop.mrv1.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCountClient {
	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance(new Configuration());
		job.setJarByClass(WordCountClient.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		boolean status = job.waitForCompletion(true);
		if (status) {
			System.exit(0);
		} 
		else {
			System.exit(1);
		}
	}
}
代码说明:
- Hadoop 2 MapReduce API在" org.apache.hadoop.mapreduce"包中具有" Job"类。 
- 作业类用于创建作业(映射/减少作业)以执行我们的字数统计任务。 
- 客户程序正在使用Job Object的setter方法来设置所有MapReduce组件,例如Mapper,Reducer,输入数据类型,输出数据类型等。 
- 这些作业将执行我们的字数统计映射和精简任务。 
注意:
- 正如我们在上一篇文章中讨论的那样,MapReduce算法使用3个函数:映射函数,合并函数和归约函数。 
- 通过观察这三个程序,我们可以发现仅开发了两个功能的一件事:Map和Reduce。 
 那合并功能呢?
- 这意味着我们使用了Hadoop 2 MapReduce API中可用的默认Combine函数逻辑。 
- 我们将在我的后续文章中讨论"如何开发合并功能"。 
现在,我们已经开发了所有必需的组件(程序)。
现在该进行测试了。
测试MapReduce WordCounting示例
我们的WordCounting项目最终结构如下所示:
请使用以下步骤测试我们的MapReduce应用程序。
- 使用Eclipse IDE创建WordCount应用程序JAR文件。 
- 执行以下" hadoop"命令以运行我们的WordCounting应用程序 
语法:
hadoop jar <our-Jar-file-path> <Client-program> <Input-Path> <Output-Path>
让我们假设我们已经在Hadoop HDFS FileSytem中创建了"/ram/mrv1/output"文件夹结构。
如果不执行该操作,请浏览我以前的文章" Hadoop HDFS Basic Developer Commands"以创建它们。
例:
hadoop jar /home/cloudera/JDWordCountMapReduceApp.jar  
     com.theitroad.hadoop.mrv1.wordcount.WordCountClient 
     /ram/mrv1/NASDAQ_daily_prices_C.csv
     /ram/mrv1/output
注意:-为了简单易读,我将命令分成多行。
请在单行中键入此命令,如下所示:
通过查看该日志,我们可以观察到Map and Reduce作业如何解决我们的WordCounting问题。
- 执行以下" hadoop"命令以查看输出目录内容
hadoop fs -ls /ram/mrv1/output/
它显示"/ram/mrv1/output /"目录的内容,如下所示:
- 执行以下" hadoop"命令以查看我们的WordCounting应用程序输出
hadoop fs -cat /ram/mrv1/output/part-r-00000
此命令显示WordCounting应用程序输出。
由于我的输出文件太大,因此无法在此处显示我的文件输出。
注意:-这里我们使用了一些Hadoop HDFS命令来运行和测试WordCounting应用程序。

