Apache Pig UDF:加载函数

时间:2020-02-23 14:37:38  来源:igfitidea点击:

Pig的加载函数建立在Hadoop的InputFormat之上,该类是Hadoop用来读取数据的类。
InputFormat有两个用途:它确定如何在地图任务之间分割输入,并提供一个RecordReader来产生键值对,作为这些地图任务的输入。
加载函数的基类是LoadFunc。

加载函数–分类:

LoadFunc抽象类有三种主要的数据加载方法,在大多数情况下,只要扩展它们就足够了。
可以实现其他三个可选接口以实现扩展函数:

  • LoadMetadata:

LoadMetadata具有处理元数据的方法。
除非加载程序与元数据系统进行交互,否则大多数执行程序都不需要实现此函数。
此接口中的getSchema()方法为加载器实现提供了一种将数据的模式传递回Pig的方式。
如果加载程序实现返回包含实类型字段的数据,则应提供描述通过getSchema()方法返回的数据的架构。
其他方法处理其他类型的元数据,例如分区键和统计信息。
如果这些方法对其他实现无效,则实现可以为这些方法返回空返回值。

  • LoadPushDown:

LoadPushDown有不同的方法将操作从Pig运行时推送到加载程序实现中。
当前,Pig仅调用pushProjection()方法来与加载程序通信,Pig脚本中需要这些字段的确切字段。
加载程序实现可以选择遵守还是不遵守请求。
如果加载程序实现决定遵守该请求,则应实现LoadPushDown以提高查询性能。

  • pushProjection():

此方法通知LoadFunc,Pig脚本中需要哪些字段。
因此,通过仅加载所需的字段,使LoadFunc能够提高性能。
pushProjection()带有一个" requiredFieldList"。
" requiredFieldList"是只读的,不能由LoadFunc更改。
" requiredFieldList"包括" requiredField"的列表,其中每个" requiredField"表示Pig脚本所需的字段,并由索引,别名,类型和子字段组成。
Pig使用列索引requiredField.index与Pig脚本要求的字段与LoadFunc进行通信。
如果必填字段是地图,Pig将传递" requiredField.subFields",其中包含Pig脚本所需的键列表。

  • LoadCaster:

LoadCaster具有将字节数组转换为特定类型的技术。
当需要支持从DataByteArray字段向其他类型的隐式或者显式强制转换时,加载程序实现应实现此目的。

LoadFunc抽象类是为实现加载程序而扩展的主要类。
下面说明了需要覆盖的方法:

  • getInputFormat():Pig调用此方法以获取加载程序使用的InputFormat。 Pig用与MapReduce Java程序中Hadoop相同的方式来调用InputFormat中的方法。如果InputFormat是Hadoop打包的,则实现应在org.apache.hadoop.mapreduce下使用基于新API的API。如果它是自定义的InputFormat,则最好使用org.apache.hadoop.mapreduce中的新API来实现。

  • setLocation():Pig调用此方法以将加载位置传达给加载器。加载程序需要使用此方法将相同的信息传达给核心InputFormat。Pig多次调用此方法。

  • prepareToRead():在此方法中,与LoadFunc提供的InputFormat相关的RecordReader被传递到LoadFunc。现在,实现中的getNext()可使用RecordReader将表示数据记录的元组返回给Pig。

  • getNext():getNext()的含义没有改变,由Pig运行时调用以获取数据中的下一个元组。在此方法中,实现应使用基础的RecordReader并构造元组以返回。

LoadFunc中的默认实现:

请注意,只有在需要时才应覆盖LoadFunc中的默认实现。

  • setUdfContextSignature():Pig将在前端和后端调用此方法,以将唯一签名传递给Loader。签名可用于将任何信息存储到UDFContext中,而加载程序需要在前端和后端的各种方法调用之间存储这些信息。一个用例是将传递给它的RequiredFieldList存储在LoadPushDown.pushProjection(RequiredFieldList)中,以供后端使用,然后在getNext()中返回元组。 LoadFunc中的默认实现的主体为空。该方法将在其他方法之前调用。

  • relativeToAbsolutePath():Pig运行时将调用此方法,以允许Loader将相对加载位置转换为绝对位置。 LoadFunc中提供的默认实现为FileSystem位置处理此问题。如果加载源是其他东西,则加载器实现可以选择覆盖它。

该示例中的加载程序实现是文本数据的加载程序,行分隔符为‘’,默认行分隔符为‘’,类似于Pig中当前的PigStorage加载程序。
该实现使用现有的Hadoop支持的Inputformat TextInputFormat作为基础InputFormat。

public class SimpleTextLoader extends LoadFunc {
protected RecordReader in = null;
private byte fieldDel = '	';
private ArrayList<Object> mProtoTuple = null;
private TupleFactory mTupleFactory = TupleFactory.getInstance();
private static final int BUFFER_SIZE = 1024;
public SimpleTextLoader() {
}
/**
* Constructs a Pig loader that uses specified character as a field delimiter.
*
* @param delimiter
*            the single byte character that is used to separate fields.
*            ("	" is the default.)
*/
public SimpleTextLoader(String delimiter) {
this();
if (delimiter.length() == 1) {
this.fieldDel = (byte)delimiter.charAt(0);
} else if (delimiter.length() >  1 & & delimiter.charAt(0) == '') {
switch (delimiter.charAt(1)) {
case 't':
this.fieldDel = (byte)'	';
break;
case 'x':
fieldDel =
Integer.valueOf(delimiter.substring(2), 16).byteValue();
break;
case 'u':
this.fieldDel =
Integer.valueOf(delimiter.substring(2)).byteValue();
break;
default:
throw new RuntimeException("Unknown delimiter " + delimiter);
}
} else {
throw new RuntimeException("PigStorage delimeter must be a single character");
}
}
@Override
public Tuple getNext() throws IOException {
try {
boolean notDone = in.nextKeyValue();
if (notDone) {
return null;
}
Text value = (Text) in.getCurrentValue();
byte[] buf = value.getBytes();
int len = value.getLength();
int start = 0;
for (int i = 0; i < len; i++) {
if (buf[i] == fieldDel) {
readField(buf, start, i);
start = i + 1;
}
}
//pick up the last field
readField(buf, start, len);
Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
mProtoTuple = null;
return t;
} catch (InterruptedException e) {
int errCode = 6018;
String errMsg = "Error while reading input";
throw new ExecException(errMsg, errCode,
PigException.REMOTE_ENVIRONMENT, e);
}
}
private void readField(byte[] buf, int start, int end) {
if (mProtoTuple == null) {
mProtoTuple = new ArrayList<Object>();
}
if (start == end) {
//NULL value
mProtoTuple.add(null);
} else {
mProtoTuple.add(new DataByteArray(buf, start, end));
}
}
@Override
public InputFormat getInputFormat() {
return new TextInputFormat();
}
@Override
public void prepareToRead(RecordReader reader, PigSplit split) {
in = reader;
}
@Override
public void setLocation(String location, Job job)
throws IOException {
FileInputFormat.setInputPaths(job, location);
}
}