MapReduce如何在Hadoop中工作
在WordCount MapReduce程序中,我们已经看到了如何用Java编写MapReduce程序,创建一个jar并运行它。创建MapReduce作业需要做很多事情,而Hadoop框架在内部也要进行很多处理。在这篇文章中,我们将以字数统计MapReduce程序为例,详细了解MapReduce在Hadoop内部的工作方式。
什么是MapReduce
Hadoop MapReduce是一个框架,用于通过在节点集群上并行处理小块数据来编写可并行处理大量数据的应用程序。该框架可确保以可靠,容错的方式进行此分布式处理。
映射并缩小
Hadoop中的MapReduce作业包含两个阶段-
- Map阶段–它具有Mapper类,该类具有开发人员指定的map函数。 Map阶段的输入和输出是(键,值)对。当我们将必须处理的文件复制到HDFS时,它会分成独立的块。 Hadoop框架为每个块创建一个映射任务,并且这些映射任务并行运行。
- Reduce阶段它具有一个Reducer类,该类具有开发人员指定的reduce函数。 Reduce阶段的输入和输出也是(键,值)对。经过Hadoop框架进一步处理(称为排序和混排)后,Map阶段的输出将成为reduce阶段的输入。因此,Map阶段的输出是中间输出,并且通过Reduce阶段对其进行处理以生成最终输出。
由于map和reduce函数的输入和输出都是键,值对,因此,如果我们说map的输入为(K1,V1)而output为(K2,V2),则map函数的输入和输出将具有以下形式:
(K1,V1)->列表(K2,V2)
map函数的中间输出在输入以归约函数之前需要在框架中进行一些进一步的处理,即所谓的shuffle和sort阶段。 reduce函数的一般形式可以描述如下:
(K2,列表(V2))->列表(K3,V3)
在此请注意,reduce输入的类型与map输出的类型匹配。
MapReduce范例说明
让我们以Word计数MapReduce代码为例,看看在Map和Reduce阶段都发生了什么,以及MapReduce在Hadoop中如何工作。
当我们将输入文本文件放入HDFS时,它将被拆分为大块数据。为了简单起见,假设我们在文件中有两行,并且将其分为两部分,每一部分都有一行。
如果文本文件包含以下两行,
This is a test file This is a Hadoop MapReduce program file
然后将有两个分割,两个地图任务将获得这两个分割作为输入。
Mapper类
// Map function public static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // Splitting the line on spaces String[] stringArr = value.toString().split("\s+"); for (String str : stringArr) { word.set(str); context.write(word, one); } } }
在Mapper类中,我们可以看到它具有四个参数,前两个用于指定map函数的输入,另一个用于指定map函数的输出。
在此字计数程序中,输入键值对如下所示:
key- 行开始处的文件中的关键字节偏移量。
Value–行的内容。
我们假设会有两个分割区(每个分割区有一行文件),并且有两个地图任务,例如Map-1和Map-2,因此对Map-1和Map-2的输入如下。
Map-1– (0, This is a test file)
Map-2– (0, This is a Hadoop MapReduce program file)
映射功能的逻辑是在空格处分割线,并将每个单词写入上下文,值为1.
因此,Map-1的输出如下:
(This, 1)
(is, 1)
( a, 1)
(test, 1)
(file, 1)
而Map-2的输出如下:
(This, 1)
(is, 1)
(a, 1)
(Hadoop, 1)
(MapReduce, 1)
(program, 1)
(file, 1)
Reducer类
// Reduce function public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
在Reducer类中,又有四个参数,其中两个用于reduce函数的输入类型,而两个用于输出类型。
请注意,reduce函数的输入类型必须与map函数的输出类型匹配。
Map的中间输出将在混洗阶段中由Hadoop框架进一步处理,在此阶段中,将根据键对这些中间输出进行排序和分组,以减少内部处理的输入,如下所示:
[Hadoop, (1)]
[MapReduce, (1)]
[This, (1, 1)]
[a, (1, 1)]
[file, (1, 1)]
[is, (1, 1)]
[program, (1)]
[test, (1)]
我们可以看到reduce函数的输入形式为(键,列表(值))。在reduce函数的逻辑中,对于每个键值对,迭代值列表并添加值。那将是最终的输出。
Hadoop 1
MapReduce 1
This 2
a 2
file. 2
is 2
program 1
test 1