Apache Pig UDF:加载函数
Pig的加载函数建立在Hadoop的InputFormat之上,该类是Hadoop用来读取数据的类。
InputFormat有两个用途:它确定如何在地图任务之间分割输入,并提供一个RecordReader来产生键值对,作为这些地图任务的输入。
加载函数的基类是LoadFunc。
加载函数–分类:
LoadFunc抽象类有三种主要的数据加载方法,在大多数情况下,只要扩展它们就足够了。
可以实现其他三个可选接口以实现扩展函数:
- LoadMetadata:
LoadMetadata具有处理元数据的方法。
除非加载程序与元数据系统进行交互,否则大多数执行程序都不需要实现此函数。
此接口中的getSchema()方法为加载器实现提供了一种将数据的模式传递回Pig的方式。
如果加载程序实现返回包含实类型字段的数据,则应提供描述通过getSchema()方法返回的数据的架构。
其他方法处理其他类型的元数据,例如分区键和统计信息。
如果这些方法对其他实现无效,则实现可以为这些方法返回空返回值。
LoadPushDown:
LoadPushDown有不同的方法将操作从Pig运行时推送到加载程序实现中。
当前,Pig仅调用pushProjection()方法来与加载程序通信,Pig脚本中需要这些字段的确切字段。
加载程序实现可以选择遵守还是不遵守请求。
如果加载程序实现决定遵守该请求,则应实现LoadPushDown以提高查询性能。
pushProjection():
此方法通知LoadFunc,Pig脚本中需要哪些字段。
因此,通过仅加载所需的字段,使LoadFunc能够提高性能。
pushProjection()带有一个" requiredFieldList"。
" requiredFieldList"是只读的,不能由LoadFunc更改。
" requiredFieldList"包括" requiredField"的列表,其中每个" requiredField"表示Pig脚本所需的字段,并由索引,别名,类型和子字段组成。
Pig使用列索引requiredField.index与Pig脚本要求的字段与LoadFunc进行通信。
如果必填字段是地图,Pig将传递" requiredField.subFields",其中包含Pig脚本所需的键列表。
LoadCaster:
LoadCaster具有将字节数组转换为特定类型的技术。
当需要支持从DataByteArray字段向其他类型的隐式或者显式强制转换时,加载程序实现应实现此目的。
LoadFunc抽象类是为实现加载程序而扩展的主要类。
下面说明了需要覆盖的方法:
getInputFormat():
Pig调用此方法以获取加载程序使用的InputFormat。 Pig用与MapReduce Java程序中Hadoop相同的方式来调用InputFormat中的方法。如果InputFormat是Hadoop打包的,则实现应在org.apache.hadoop.mapreduce下使用基于新API的API。如果它是自定义的InputFormat,则最好使用org.apache.hadoop.mapreduce中的新API来实现。setLocation():Pig调用此方法以将加载位置传达给加载器。加载程序需要使用此方法将相同的信息传达给核心InputFormat。Pig多次调用此方法。
prepareToRead():在此方法中,与LoadFunc提供的InputFormat相关的RecordReader被传递到LoadFunc。现在,实现中的getNext()可使用RecordReader将表示数据记录的元组返回给Pig。
getNext():getNext()的含义没有改变,由Pig运行时调用以获取数据中的下一个元组。在此方法中,实现应使用基础的RecordReader并构造元组以返回。
LoadFunc中的默认实现:
请注意,只有在需要时才应覆盖LoadFunc中的默认实现。
setUdfContextSignature():
Pig将在前端和后端调用此方法,以将唯一签名传递给Loader。签名可用于将任何信息存储到UDFContext中,而加载程序需要在前端和后端的各种方法调用之间存储这些信息。一个用例是将传递给它的RequiredFieldList存储在LoadPushDown.pushProjection(RequiredFieldList)中,以供后端使用,然后在getNext()中返回元组。 LoadFunc中的默认实现的主体为空。该方法将在其他方法之前调用。relativeToAbsolutePath():
Pig运行时将调用此方法,以允许Loader将相对加载位置转换为绝对位置。 LoadFunc中提供的默认实现为FileSystem位置处理此问题。如果加载源是其他东西,则加载器实现可以选择覆盖它。
该示例中的加载程序实现是文本数据的加载程序,行分隔符为‘’,默认行分隔符为‘’,类似于Pig中当前的PigStorage加载程序。
该实现使用现有的Hadoop支持的Inputformat TextInputFormat作为基础InputFormat。
public class SimpleTextLoader extends LoadFunc { protected RecordReader in = null; private byte fieldDel = ' '; private ArrayList<Object> mProtoTuple = null; private TupleFactory mTupleFactory = TupleFactory.getInstance(); private static final int BUFFER_SIZE = 1024; public SimpleTextLoader() { } /** * Constructs a Pig loader that uses specified character as a field delimiter. * * @param delimiter * the single byte character that is used to separate fields. * (" " is the default.) */ public SimpleTextLoader(String delimiter) { this(); if (delimiter.length() == 1) { this.fieldDel = (byte)delimiter.charAt(0); } else if (delimiter.length() > 1 & & delimiter.charAt(0) == '') { switch (delimiter.charAt(1)) { case 't': this.fieldDel = (byte)' '; break; case 'x': fieldDel = Integer.valueOf(delimiter.substring(2), 16).byteValue(); break; case 'u': this.fieldDel = Integer.valueOf(delimiter.substring(2)).byteValue(); break; default: throw new RuntimeException("Unknown delimiter " + delimiter); } } else { throw new RuntimeException("PigStorage delimeter must be a single character"); } } @Override public Tuple getNext() throws IOException { try { boolean notDone = in.nextKeyValue(); if (notDone) { return null; } Text value = (Text) in.getCurrentValue(); byte[] buf = value.getBytes(); int len = value.getLength(); int start = 0; for (int i = 0; i < len; i++) { if (buf[i] == fieldDel) { readField(buf, start, i); start = i + 1; } } //pick up the last field readField(buf, start, len); Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple); mProtoTuple = null; return t; } catch (InterruptedException e) { int errCode = 6018; String errMsg = "Error while reading input"; throw new ExecException(errMsg, errCode, PigException.REMOTE_ENVIRONMENT, e); } } private void readField(byte[] buf, int start, int end) { if (mProtoTuple == null) { mProtoTuple = new ArrayList<Object>(); } if (start == end) { //NULL value mProtoTuple.add(null); } else { mProtoTuple.add(new DataByteArray(buf, start, end)); } } @Override public InputFormat getInputFormat() { return new TextInputFormat(); } @Override public void prepareToRead(RecordReader reader, PigSplit split) { in = reader; } @Override public void setLocation(String location, Job job) throws IOException { FileInputFormat.setInputPaths(job, location); } }