Apache Pig UDF:存储函数

时间:2020-02-23 14:37:38  来源:igfitidea点击:

StoreFunc抽象类具有用于存储数据的主要方法,并且在大多数情况下,对其进行扩展就足够了。
有一个可选接口可以实现以实现扩展函数:

StoreMetadata

该接口具有与元数据系统进行交互以存储架构和统计信息的方法。
该接口是可选的,仅在需要存储元数据时才应实现。

下面说明了需要在StoreFunc中覆盖的方法:

  • getOutputFormat():Pig将调用此方法以获取存储器使用的OutputFormat。 Pig将以与Map-reduce Java程序中Hadoop相同的方式和上下文来调用OutputFormat中的方法。如果OutputFormat是Hadoop打包的,则实现应在org.apache.hadoop.mapreduce下使用基于新API的API。如果是自定义OutputFormat,则应使用org.apache.hadoop.mapreduce下的新API来实现。 Pig将调用OutputFormat的checkOutputSpecs()方法以预先检查输出位置。启动作业时,此方法也将作为Hadoop调用序列的一部分被调用。因此,实现应确保可以多次调用此方法而不会产生不一致的副作用。

  • setStoreLocation():Pig调用此方法以将商店位置传达给存储者。存储器应使用此方法将相同的信息传达给基础OutputFormat。 Pig多次调用此方法。实施中应注意,多次调用此方法,并应确保不会因多次调用而产生不一致的副作用。

  • prepareToWrite():在新的API中,数据的写入是通过StoreFunc提供的OutputFormat进行的。在prepareToWrite()中,与StoreFunc提供的OutputFormat关联的RecordWriter被传递到StoreFunc。然后,实现可以在putNext()中使用RecordWriter来以RecordWriter期望的方式编写一个表示数据记录的元组。

  • putNext():putNext()的含义没有改变,由Pig运行时调用以在新API中写入下一个数据元组,这是实现将使用基础RecordWriter将元组写出的方法。

StoreFunc中的默认实现:

  • setStoreFuncUDFContextSignature():Pig会在前端和后端调用此方法,以将唯一签名传递给Storer。签名可用于将任何信息存储到UDFContext中,存储器需要在前端和后端的各种方法调用之间存储这些信息。 StoreFunc中的默认实现的主体为空。该方法将在任何其他方法之前调用。

  • relToAbsPathForStoreLocation():Pig运行时将调用此方法,以允许存储库将相对存储位置转换为绝对位置。 StoreFunc中提供了一个实现,用于基于文件系统的位置进行处理。

  • checkSchema():存储函数应实现此函数,以检查描述要写入的数据的给定架构是否可接受。 StoreFunc中的默认实现的主体为空。在对setStoreLocation()进行任何调用之前,将调用此方法。

示例实现:

该示例中的存储器实现是文本数据存储器,其中行分隔符为'','为默认字段分隔符(可以通过在构造函数中传递不同的字段分隔符来覆盖),这类似于Pig中的当前PigStorage存储器。
该实现使用现有的Hadoop支持的OutputFormat TextOutputFormat作为基础OutputFormat。

public class SimpleTextStorer extends StoreFunc {
protected RecordWriter writer = null;
private byte fieldDel = '	';
private static final int BUFFER_SIZE = 1024;
private static final String UTF8 = "UTF-8";
public PigStorage() {
}
public PigStorage(String delimiter) {
this();
if (delimiter.length() == 1) {
this.fieldDel = (byte)delimiter.charAt(0);
} else if (delimiter.length() > 1delimiter.charAt(0) == '') {
switch (delimiter.charAt(1)) {
case 't':
this.fieldDel = (byte)'	';
break;
case 'x':
fieldDel =
Integer.valueOf(delimiter.substring(2), 16).byteValue();
break;
case 'u':
this.fieldDel =
Integer.valueOf(delimiter.substring(2)).byteValue();
break;
default:
throw new RuntimeException("Unknown delimiter " + delimiter);
}
} else {
throw new RuntimeException("PigStorage delimeter must be a single character");
}
}
ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE);
@Override
public void putNext(Tuple f) throws IOException {
int sz = f.size();
for (int i = 0; i < sz; i++) {
Object field;
try {
field = f.get(i);
} catch (ExecException ee) {
throw ee;
}
putField(field);
if (i != sz - 1) {
mOut.write(fieldDel);
}
}
Text text = new Text(mOut.toByteArray());
try {
writer.write(null, text);
mOut.reset();
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@SuppressWarnings("unchecked")
private void putField(Object field) throws IOException {
//string constants for each delimiter
String tupleBeginDelim = "(";
String tupleEndDelim = ")";
String bagBeginDelim = "{";
String bagEndDelim = "}";
String mapBeginDelim = "[";
String mapEndDelim = "]";
String fieldDelim = ",";
String mapKeyValueDelim = "#";
switch (DataType.findType(field)) {
case DataType.NULL:
break; //just leave it empty
case DataType.BOOLEAN:
mOut.write(((Boolean)field).toString().getBytes());
break;
case DataType.INTEGER:
mOut.write(((Integer)field).toString().getBytes());
break;
case DataType.LONG:
mOut.write(((Long)field).toString().getBytes());
break;
case DataType.FLOAT:
mOut.write(((Float)field).toString().getBytes());
break;
case DataType.DOUBLE:
mOut.write(((Double)field).toString().getBytes());
break;
case DataType.BYTEARRAY: {
byte[] b = ((DataByteArray)field).get();
mOut.write(b, 0, b.length);
break;
}
case DataType.CHARARRAY:
//oddly enough, writeBytes writes a string
mOut.write(((String)field).getBytes(UTF8));
break;
case DataType.MAP:
boolean mapHasNext = false;
Map<String, Object> m = (Map<String, Object>)field;
mOut.write(mapBeginDelim.getBytes(UTF8));
for(Map.Entry<String, Object> e: m.entrySet()) {
if(mapHasNext) {
mOut.write(fieldDelim.getBytes(UTF8));
} else {
mapHasNext = true;
}
putField(e.getKey());
mOut.write(mapKeyValueDelim.getBytes(UTF8));
putField(e.getValue());
}
mOut.write(mapEndDelim.getBytes(UTF8));
break;
case DataType.TUPLE:
boolean tupleHasNext = false;
Tuple t = (Tuple)field;
mOut.write(tupleBeginDelim.getBytes(UTF8));
for(int i = 0; i < t.size(); ++i) {
if(tupleHasNext) {
mOut.write(fieldDelim.getBytes(UTF8));
} else {
tupleHasNext = true;
}
try {
putField(t.get(i));
} catch (ExecException ee) {
throw ee;
}
}
mOut.write(tupleEndDelim.getBytes(UTF8));
break;
case DataType.BAG:
boolean bagHasNext = false;
mOut.write(bagBeginDelim.getBytes(UTF8));
Iterator<Tuple> tupleIter = ((DataBag)field).iterator();
while(tupleIter.hasNext()) {
if(bagHasNext) {
mOut.write(fieldDelim.getBytes(UTF8));
} else {
bagHasNext = true;
}
putField((Object)tupleIter.next());
}
mOut.write(bagEndDelim.getBytes(UTF8));
break;
default: {
int errCode = 2108;
String msg = "Could not determine data type of field: " + field;
throw new ExecException(msg, errCode, PigException.BUG);
}
}
}
@Override
public OutputFormat getOutputFormat() {
return new TextOutputFormat<WritableComparable, Text>();
}
@Override
public void prepareToWrite(RecordWriter writer) {
this.writer = writer;
}
@Override
public void setStoreLocation(String location, Job job) throws IOException {
job.getConfiguration().set("mapred.textoutputformat.separator", "");
FileOutputFormat.setOutputPath(job, new Path(location));
if (location.endsWith(".bz2")) {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job,  BZip2Codec.class);
}  else if (location.endsWith(".gz")) {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
}
}
}