Avro MapReduce示例

时间:2020-01-09 10:34:36  来源:igfitidea点击:

这篇文章展示了使用Avro MapReduce API的Avro MapReduce示例程序。
作为示例,使用了字数统计MapReduce程序,其中输出将是Avro数据文件。

所需的jar包

avro-mapred-1.8.2.jar

Avro字数统计MapReduce示例

由于输出是Avro文件,因此必须定义Avro架构,因此架构中将有两个字段" word"和" count"。

在代码中,我们可以看到键和值对使用AvroKey和AvroValue。同样对于输出,使用AvroKeyOutputFormat类。
要定义地图输出,并将MaReduce作业的输出AvroJob类用于作业配置。

Avro MapReduce

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroWordCount extends Configured implements Tool{
	/// Schema
  private	static final Schema AVRO_SCHEMA = new Schema.Parser().parse(
    "{\n" +
    "	\"type\":	\"record\",\n" +				
    "	\"name\":	\"WordCount\",\n" +
    "	\"doc\":	\"word count\",\n" +
    "	\"fields\":\n" + 
    "	[\n" + 
    "			{\"name\": \"word\",	\"type\":	\"string\"},\n"+ 
    "			{\"name\":	\"count\", \"type\":	\"int\"}\n"+
    "	]\n"+
    "}\n");
	
	// Map function    
  public static class AvroWordMapper extends Mapper<LongWritable, Text, AvroKey<Text>, 
      AvroValue<GenericRecord>>{
    private Text word = new Text();
    private GenericRecord record = new GenericData.Record(AVRO_SCHEMA);
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\s+");
      for (String str : stringArr) {
        word.set(str);
        // creating Avro record
        record.put("word", str);
        record.put("count", 1);
        context.write(new AvroKey<Text>(word), new AvroValue<GenericRecord>(record));
      }
    }
  }
	
	// Reduce function
  public static class AvroWordReducer extends Reducer<AvroKey<Text>, AvroValue<GenericRecord>,
      AvroKey<GenericRecord>, NullWritable>{       
    public void reduce(AvroKey<Text> key, Iterable<AvroValue<GenericRecord>> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (AvroValue<GenericRecord> value : values) {
        GenericRecord    record = value.datum();
        sum += (Integer)record.get("count");
      }
      GenericRecord record = new    GenericData.Record(AVRO_SCHEMA);
      record.put("word", key.datum());
      record.put("count", sum);
      context.write(new AvroKey<GenericRecord>(record), NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception{
    int exitFlag = ToolRunner.run(new AvroWordCount(), args);
    System.exit(exitFlag);
  }
	

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "AvroWC");
    job.setJarByClass(getClass());
    job.setMapperClass(AvroWordMapper.class);    
    job.setReducerClass(AvroWordReducer.class);
    
    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
    AvroJob.setMapOutputValueSchema(job, AVRO_SCHEMA);
    AvroJob.setOutputKeySchema(job,	AVRO_SCHEMA);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

创建jar之后,可以使用以下命令运行此Avro MapReduce程序。

hadoop jar /home/theitroad/theitroadhadoop.jar org.theitroad.AvroWordCount /user/input/count /user/out/result

该程序在只有两行的简单文本文件上执行。

This is a test file.
This is a Hadoop MapReduce program file.

可以使用avrotools.jar检查输出文件。

hadoop jar /path/to/avro-tools-1.8.2.jar tojson /user/out/result/part-r-00000.avro

{"word":"Hadoop","count":1}
{"word":"MapReduce","count":1}
{"word":"This","count":2}
{"word":"a","count":2}
{"word":"file.","count":2}
{"word":"is","count":2}
{"word":"program","count":1}
{"word":"test","count":1}