Apache Pig UDF:存储函数
StoreFunc抽象类具有用于存储数据的主要方法,并且在大多数情况下,对其进行扩展就足够了。
有一个可选接口可以实现以实现扩展函数:
StoreMetadata
该接口具有与元数据系统进行交互以存储架构和统计信息的方法。
该接口是可选的,仅在需要存储元数据时才应实现。
下面说明了需要在StoreFunc中覆盖的方法:
getOutputFormat():
Pig将调用此方法以获取存储器使用的OutputFormat。 Pig将以与Map-reduce Java程序中Hadoop相同的方式和上下文来调用OutputFormat中的方法。如果OutputFormat是Hadoop打包的,则实现应在org.apache.hadoop.mapreduce下使用基于新API的API。如果是自定义OutputFormat,则应使用org.apache.hadoop.mapreduce下的新API来实现。 Pig将调用OutputFormat的checkOutputSpecs()方法以预先检查输出位置。启动作业时,此方法也将作为Hadoop调用序列的一部分被调用。因此,实现应确保可以多次调用此方法而不会产生不一致的副作用。setStoreLocation():Pig调用此方法以将商店位置传达给存储者。存储器应使用此方法将相同的信息传达给基础OutputFormat。 Pig多次调用此方法。实施中应注意,多次调用此方法,并应确保不会因多次调用而产生不一致的副作用。
prepareToWrite():在新的API中,数据的写入是通过StoreFunc提供的OutputFormat进行的。在prepareToWrite()中,与StoreFunc提供的OutputFormat关联的RecordWriter被传递到StoreFunc。然后,实现可以在putNext()中使用RecordWriter来以RecordWriter期望的方式编写一个表示数据记录的元组。
putNext():putNext()的含义没有改变,由Pig运行时调用以在新API中写入下一个数据元组,这是实现将使用基础RecordWriter将元组写出的方法。
StoreFunc中的默认实现:
setStoreFuncUDFContextSignature():Pig会在前端和后端调用此方法,以将唯一签名传递给Storer。签名可用于将任何信息存储到UDFContext中,存储器需要在前端和后端的各种方法调用之间存储这些信息。 StoreFunc中的默认实现的主体为空。该方法将在任何其他方法之前调用。
relToAbsPathForStoreLocation():Pig运行时将调用此方法,以允许存储库将相对存储位置转换为绝对位置。 StoreFunc中提供了一个实现,用于基于文件系统的位置进行处理。
checkSchema():
存储函数应实现此函数,以检查描述要写入的数据的给定架构是否可接受。 StoreFunc中的默认实现的主体为空。在对setStoreLocation()进行任何调用之前,将调用此方法。
示例实现:
该示例中的存储器实现是文本数据存储器,其中行分隔符为'','为默认字段分隔符(可以通过在构造函数中传递不同的字段分隔符来覆盖),这类似于Pig中的当前PigStorage存储器。
该实现使用现有的Hadoop支持的OutputFormat TextOutputFormat作为基础OutputFormat。
public class SimpleTextStorer extends StoreFunc { protected RecordWriter writer = null; private byte fieldDel = ' '; private static final int BUFFER_SIZE = 1024; private static final String UTF8 = "UTF-8"; public PigStorage() { } public PigStorage(String delimiter) { this(); if (delimiter.length() == 1) { this.fieldDel = (byte)delimiter.charAt(0); } else if (delimiter.length() > 1delimiter.charAt(0) == '') { switch (delimiter.charAt(1)) { case 't': this.fieldDel = (byte)' '; break; case 'x': fieldDel = Integer.valueOf(delimiter.substring(2), 16).byteValue(); break; case 'u': this.fieldDel = Integer.valueOf(delimiter.substring(2)).byteValue(); break; default: throw new RuntimeException("Unknown delimiter " + delimiter); } } else { throw new RuntimeException("PigStorage delimeter must be a single character"); } } ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE); @Override public void putNext(Tuple f) throws IOException { int sz = f.size(); for (int i = 0; i < sz; i++) { Object field; try { field = f.get(i); } catch (ExecException ee) { throw ee; } putField(field); if (i != sz - 1) { mOut.write(fieldDel); } } Text text = new Text(mOut.toByteArray()); try { writer.write(null, text); mOut.reset(); } catch (InterruptedException e) { throw new IOException(e); } } @SuppressWarnings("unchecked") private void putField(Object field) throws IOException { //string constants for each delimiter String tupleBeginDelim = "("; String tupleEndDelim = ")"; String bagBeginDelim = "{"; String bagEndDelim = "}"; String mapBeginDelim = "["; String mapEndDelim = "]"; String fieldDelim = ","; String mapKeyValueDelim = "#"; switch (DataType.findType(field)) { case DataType.NULL: break; //just leave it empty case DataType.BOOLEAN: mOut.write(((Boolean)field).toString().getBytes()); break; case DataType.INTEGER: mOut.write(((Integer)field).toString().getBytes()); break; case DataType.LONG: mOut.write(((Long)field).toString().getBytes()); break; case DataType.FLOAT: mOut.write(((Float)field).toString().getBytes()); break; case DataType.DOUBLE: mOut.write(((Double)field).toString().getBytes()); break; case DataType.BYTEARRAY: { byte[] b = ((DataByteArray)field).get(); mOut.write(b, 0, b.length); break; } case DataType.CHARARRAY: //oddly enough, writeBytes writes a string mOut.write(((String)field).getBytes(UTF8)); break; case DataType.MAP: boolean mapHasNext = false; Map<String, Object> m = (Map<String, Object>)field; mOut.write(mapBeginDelim.getBytes(UTF8)); for(Map.Entry<String, Object> e: m.entrySet()) { if(mapHasNext) { mOut.write(fieldDelim.getBytes(UTF8)); } else { mapHasNext = true; } putField(e.getKey()); mOut.write(mapKeyValueDelim.getBytes(UTF8)); putField(e.getValue()); } mOut.write(mapEndDelim.getBytes(UTF8)); break; case DataType.TUPLE: boolean tupleHasNext = false; Tuple t = (Tuple)field; mOut.write(tupleBeginDelim.getBytes(UTF8)); for(int i = 0; i < t.size(); ++i) { if(tupleHasNext) { mOut.write(fieldDelim.getBytes(UTF8)); } else { tupleHasNext = true; } try { putField(t.get(i)); } catch (ExecException ee) { throw ee; } } mOut.write(tupleEndDelim.getBytes(UTF8)); break; case DataType.BAG: boolean bagHasNext = false; mOut.write(bagBeginDelim.getBytes(UTF8)); Iterator<Tuple> tupleIter = ((DataBag)field).iterator(); while(tupleIter.hasNext()) { if(bagHasNext) { mOut.write(fieldDelim.getBytes(UTF8)); } else { bagHasNext = true; } putField((Object)tupleIter.next()); } mOut.write(bagEndDelim.getBytes(UTF8)); break; default: { int errCode = 2108; String msg = "Could not determine data type of field: " + field; throw new ExecException(msg, errCode, PigException.BUG); } } } @Override public OutputFormat getOutputFormat() { return new TextOutputFormat<WritableComparable, Text>(); } @Override public void prepareToWrite(RecordWriter writer) { this.writer = writer; } @Override public void setStoreLocation(String location, Job job) throws IOException { job.getConfiguration().set("mapred.textoutputformat.separator", ""); FileOutputFormat.setOutputPath(job, new Path(location)); if (location.endsWith(".bz2")) { FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); } else if (location.endsWith(".gz")) { FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); } } }