Hadoop 2.x MapReduce(MR V1)字数统计示例
之前我们从理论上解释了" MapReduce如何执行字数统计"。
而且,如果您不熟悉HDFS Basic命令,请浏览我的文章" Hadoop HDFS Basic Developer Commands",以获取有关如何在CloudEra环境中执行HDFS命令的一些基本知识。
在本文中,我们将使用Hadoop 2 MapReduce API开发相同的WordCounting程序,并在CloudEra Environment中对其进行测试。
MapReduce WordCounting示例
我们需要编写以下三个程序来开发和测试MapReduce WordCount示例:
- Mapper程序
- Reducer程序
- 客户程序
注意:-要开发MapReduce程序,有两种版本的MR API:
- 一种来自Hadoop 1.x(MapReduce Old API)
- 另一个来自Hadoop 2.x(MapReduce New API)
在Hadoop 2.x中,不赞成使用MapReduce Old API。
因此,我们非常关注MapReduce New API,以开发此WordCount示例。
在CloudEra环境中,他们已经提供了带有Hadoop 2.x API的Eclipse IDE设置。
因此,使用此设置来开发和测试MapReduce程序非常容易。
要开发WordCount MapReduce应用程序,请使用以下步骤:
打开CloudEra Environment提供的默认Eclipse IDE。
我们可以使用已经创建的项目或者创建新的Java项目。
为简单起见,我将使用现有的"培训" Java项目。
他们已经将所有必需的Hadoop 2.x Jar添加到该项目类路径中。
准备使用Eclipse Java Project。创建WordCount Mapper程序
创建WordCount Reducer程序
创建WordCount客户端程序以测试此应用程序
让我们在下一部分中开始开发这三个程序。
Mapper程序
创建一个" WordCountMapper" Java类,它扩展了Mapper类,如下所示:
package com.theitroad.hadoop.mrv1.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String w = value.toString(); context.write(new Text(w), new IntWritable(1)); } }
代码说明:
我们的WordCountMapper类已经实现了Hadoop 2 MapReduce API类" Mapper"。
Mapper类已通过使用通用类型定义为Mapper <LongWritable,Text,Text,IntWritable>
此处<LongWritable,Text,Text,IntWritable>
- 前两个<LongWritable,Text>表示WordCount的Mapper程序的输入数据类型。
例如:-在我们的示例中,我们将给出一个File(大量数据,任何格式)。
映射器从该文件读取每一行,并给出一个唯一的编号,如下所示
<Unique_Long_Number, Line_Read_From_Input_File>
在Hadoop MapReduce API中,它等于<LongWritable,Text>。
- 最后两个<Text,IntWritable>表示我们的WordCount的Mapper程序的输出数据类型。
例如:-在我们的示例中,WordCount的Mapper程序给出的输出如下所示
<Unique_Word_From_Input_File, Word_Count>
In Hadoop MapReduce API, it is equal to <Text, IntWritable>.- 我们已经实现了Mapper的map()方法,并在此处提供了Mapping Function逻辑。
Reducer程序
创建一个" WordCountReducer" Java类,它扩展了Reducer类,如下所示:
package com.theitroad.hadoop.mrv1.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } }
代码说明:
我们的WordCountReducer类扩展了Hadoop 2 MapReduce API类:" Reducer"。
Reducer类通过使用通用类型定义为Mapper <Text,IntWritable,Text,IntWritable>
此处<文本,可写,文本,可写>
- 前两个<Text,IntWritable>表示WordCount的Reducer程序的输入数据类型。
例如:-在我们的示例中,我们的Mapper程序将给出<Text,IntWritable>输出,它将成为Reducer程序的输入。
<Unique_Word_From_Input_File, Word_Count>
在Hadoop MapReduce API中,它等于<Text,IntWritable>。
- 最后两个<Text,IntWritable>表示WordCount的Reducer程序的输出数据类型。
例如:-在我们的示例中,WordCount的Reducer程序提供如下所示的输出
<Unique_Word_From_Input_File, Total_Word_Count>
In Hadoop MapReduce API, it is equal to <Text, IntWritable>.- 我们已经实现了Reducer的reduce()方法,并在此处提供了Reduce函数逻辑。
客户程序
使用main()方法创建一个" WordCountClient" Java类,如下所示:
package com.theitroad.hadoop.mrv1.wordcount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class WordCountClient { public static void main(String[] args) throws Exception { Job job = Job.getInstance(new Configuration()); job.setJarByClass(WordCountClient.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean status = job.waitForCompletion(true); if (status) { System.exit(0); } else { System.exit(1); } } }
代码说明:
Hadoop 2 MapReduce API在" org.apache.hadoop.mapreduce"包中具有" Job"类。
作业类用于创建作业(映射/减少作业)以执行我们的字数统计任务。
客户程序正在使用Job Object的setter方法来设置所有MapReduce组件,例如Mapper,Reducer,输入数据类型,输出数据类型等。
这些作业将执行我们的字数统计映射和精简任务。
注意:
正如我们在上一篇文章中讨论的那样,MapReduce算法使用3个函数:映射函数,合并函数和归约函数。
通过观察这三个程序,我们可以发现仅开发了两个功能的一件事:Map和Reduce。
那合并功能呢?这意味着我们使用了Hadoop 2 MapReduce API中可用的默认Combine函数逻辑。
我们将在我的后续文章中讨论"如何开发合并功能"。
现在,我们已经开发了所有必需的组件(程序)。
现在该进行测试了。
测试MapReduce WordCounting示例
我们的WordCounting项目最终结构如下所示:
请使用以下步骤测试我们的MapReduce应用程序。
使用Eclipse IDE创建WordCount应用程序JAR文件。
执行以下" hadoop"命令以运行我们的WordCounting应用程序
语法:
hadoop jar <our-Jar-file-path> <Client-program> <Input-Path> <Output-Path>
让我们假设我们已经在Hadoop HDFS FileSytem中创建了"/ram/mrv1/output"文件夹结构。
如果不执行该操作,请浏览我以前的文章" Hadoop HDFS Basic Developer Commands"以创建它们。
例:
hadoop jar /home/cloudera/JDWordCountMapReduceApp.jar com.theitroad.hadoop.mrv1.wordcount.WordCountClient /ram/mrv1/NASDAQ_daily_prices_C.csv /ram/mrv1/output
注意:-为了简单易读,我将命令分成多行。
请在单行中键入此命令,如下所示:
通过查看该日志,我们可以观察到Map and Reduce作业如何解决我们的WordCounting问题。
- 执行以下" hadoop"命令以查看输出目录内容
hadoop fs -ls /ram/mrv1/output/
它显示"/ram/mrv1/output /"目录的内容,如下所示:
- 执行以下" hadoop"命令以查看我们的WordCounting应用程序输出
hadoop fs -cat /ram/mrv1/output/part-r-00000
此命令显示WordCounting应用程序输出。
由于我的输出文件太大,因此无法在此处显示我的文件输出。
注意:-这里我们使用了一些Hadoop HDFS命令来运行和测试WordCounting应用程序。