Avro MapReduce示例
时间:2020-01-09 10:34:36 来源:igfitidea点击:
这篇文章展示了使用Avro MapReduce API的Avro MapReduce示例程序。
作为示例,使用了字数统计MapReduce程序,其中输出将是Avro数据文件。
所需的jar包
avro-mapred-1.8.2.jar
Avro字数统计MapReduce示例
由于输出是Avro文件,因此必须定义Avro架构,因此架构中将有两个字段" word"和" count"。
在代码中,我们可以看到键和值对使用AvroKey和AvroValue。同样对于输出,使用AvroKeyOutputFormat类。
要定义地图输出,并将MaReduce作业的输出AvroJob类用于作业配置。
Avro MapReduce
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class AvroWordCount extends Configured implements Tool{
/// Schema
private static final Schema AVRO_SCHEMA = new Schema.Parser().parse(
"{\n" +
" \"type\": \"record\",\n" +
" \"name\": \"WordCount\",\n" +
" \"doc\": \"word count\",\n" +
" \"fields\":\n" +
" [\n" +
" {\"name\": \"word\", \"type\": \"string\"},\n"+
" {\"name\": \"count\", \"type\": \"int\"}\n"+
" ]\n"+
"}\n");
// Map function
public static class AvroWordMapper extends Mapper<LongWritable, Text, AvroKey<Text>,
AvroValue<GenericRecord>>{
private Text word = new Text();
private GenericRecord record = new GenericData.Record(AVRO_SCHEMA);
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// Splitting the line on spaces
String[] stringArr = value.toString().split("\s+");
for (String str : stringArr) {
word.set(str);
// creating Avro record
record.put("word", str);
record.put("count", 1);
context.write(new AvroKey<Text>(word), new AvroValue<GenericRecord>(record));
}
}
}
// Reduce function
public static class AvroWordReducer extends Reducer<AvroKey<Text>, AvroValue<GenericRecord>,
AvroKey<GenericRecord>, NullWritable>{
public void reduce(AvroKey<Text> key, Iterable<AvroValue<GenericRecord>> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (AvroValue<GenericRecord> value : values) {
GenericRecord record = value.datum();
sum += (Integer)record.get("count");
}
GenericRecord record = new GenericData.Record(AVRO_SCHEMA);
record.put("word", key.datum());
record.put("count", sum);
context.write(new AvroKey<GenericRecord>(record), NullWritable.get());
}
}
public static void main(String[] args) throws Exception{
int exitFlag = ToolRunner.run(new AvroWordCount(), args);
System.exit(exitFlag);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "AvroWC");
job.setJarByClass(getClass());
job.setMapperClass(AvroWordMapper.class);
job.setReducerClass(AvroWordReducer.class);
AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setMapOutputValueSchema(job, AVRO_SCHEMA);
AvroJob.setOutputKeySchema(job, AVRO_SCHEMA);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
}
创建jar之后,可以使用以下命令运行此Avro MapReduce程序。
hadoop jar /home/theitroad/theitroadhadoop.jar org.theitroad.AvroWordCount /user/input/count /user/out/result
该程序在只有两行的简单文本文件上执行。
This is a test file. This is a Hadoop MapReduce program file.
可以使用avrotools.jar检查输出文件。
hadoop jar /path/to/avro-tools-1.8.2.jar tojson /user/out/result/part-r-00000.avro
{"word":"Hadoop","count":1}
{"word":"MapReduce","count":1}
{"word":"This","count":2}
{"word":"a","count":2}
{"word":"file.","count":2}
{"word":"is","count":2}
{"word":"program","count":1}
{"word":"test","count":1}

