Apache Pig UDF:Eval,聚合(Aggregate)和过滤器(Filter)函数
Apache Pig提供了对用户定义函数(UDF)的广泛支持,作为指定自定义处理的一种方式。
Pig UDF当前可以三种语言执行:Java,Python,JavaScript和Ruby。
为Java函数提供了最广泛的支持。
Java UDF可以通过多种方式调用。
最简单的UDF可以仅扩展EvalFunc,它仅需要实现exec函数。
每个Eval UDF必须实现这一点。
此外,如果函数是代数的,则可以实现代数接口以显着提高查询性能。
UDF在Pig中的重要性:
Pig允许用户通过UDF将现有的运算符与他们自己的代码或者其他人的代码结合在一起。
Pig的优势在于它可以让用户通过UDF将其运算符与他们自己的代码或者其他人的代码结合在一起。
从0.7版开始,所有UDF必须以Java编写,并以Java类的形式实现。
通过编写Java类并通知Pig有关JAR文件,可以更轻松地向Pig添加新的UDF。
Pig本身带有一些UDF。
在0.8版之前,这是一个非常有限的集合,仅包含标准SQL聚合函数和其他一些集合函数。
在0.8中,添加了许多标准的字符串处理,数学和复杂类型的UDF。
什么是Piggybank?
Piggybank是与Pig一起发布的用户提供的UDF的集合。
Piggy JAR不包含Piggybank UDF,因此我们必须在脚本中手动注册它们。
我们也可以编写自己的UDF或者使用其他用户编写的UDF。
Eval函数
UDF类扩展了EvalFunc类,该类是所有Eval函数的基础。
所有评估函数都扩展了Java类‘org.apache.pig.EvalFunc。
‘使用UDF的返回类型(在这种情况下为Java字符串)对其进行参数化。
该类的核心方法是" exec"。
代码的第一行表示该函数是myudfs包的一部分。
它获取一条记录并返回一个结果,该结果将为通过执行管道的每条记录调用。
它采用一个元组,其中包含脚本作为输入传递给UDF的所有字段。
然后,它返回参数化EvalFunc的类型。
在每个输入元组上调用此函数。
该函数的输入是一个元组,其中输入参数的顺序是它们在Pig脚本中传递给该函数的顺序。
在下面显示的示例中,该函数将字符串作为输入。
以下函数将字符串从小写转换为大写。
现在已经实现了该函数,需要对其进行编译并将其包含在JAR中。
package myudfs; import java.io.IOException; import org.apache.pig.EvalFunc; import org.apache.pig.data.Tuple; public class UPPER extends EvalFunc<String> { public String exec(Tuple input) throws IOException { if (input == null || input.size() == 0) return null; try{ String str = (String)input.get(0); return str.toUpperCase(); }catch(Exception e){ throw new IOException("Caught exception processing input row ", e); } } }
聚合(Aggregate)函数:
集合函数是Eval函数的另一种常见类型。
聚合函数通常应用于分组数据。
Aggregate函数将一个包放入并返回标量值。
许多聚合函数的一个有趣且有价值的函数是可以以分布式方式递增地计算它们。
在Hadoop世界中,这意味着部分计算可以由Map和Combiner完成,最终结果可以由Reducer计算。
确保如此实现代数的聚合函数非常重要。
这种类型的示例包括内置的COUNT,MIN,MAX和AVERAGE。
COUNT是代数函数的一个示例,我们可以计算数据子集中元素的数量,然后对这些计数求和以产生最终输出。
让我们看一下COUNT函数的实现:
public class COUNT extends EvalFunc<Long> implements Algebraic{ public Long exec(Tuple input) throws IOException {return count(input);} public String getInitial() {return Initial.class.getName();} public String getIntermed() {return Intermed.class.getName();} public String getFinal() {return Final.class.getName();} static public class Initial extends EvalFunc<Tuple> { public Tuple exec(Tuple input) throws IOException {return TupleFactory.getInstance().newTuple(count(input));} } static public class Intermed extends EvalFunc<Tuple> { public Tuple exec(Tuple input) throws IOException {return TupleFactory.getInstance().newTuple(sum(input));} } static public class Final extends EvalFunc<Long> { public Tuple exec(Tuple input) throws IOException {return sum(input);} } static protected Long count(Tuple input) throws ExecException { Object values = input.get(0); if (values instanceof DataBag) return ((DataBag)values).size(); else if (values instanceof Map) return new Long(((Map)values).size()); } static protected Long sum(Tuple input) throws ExecException, NumberFormatException { DataBag values = (DataBag)input.get(0); long sum = 0; for (Iterator (Tuple) it = values.iterator(); it.hasNext();) { Tuple t = it.next(); sum += (Long)t.get(0); } return sum; } }
COUNT实现了代数接口,如下所示:
public interface Algebraic{ public String getInitial(); public String getIntermed(); public String getFinal(); }
为了使函数成为代数,它需要实现代数接口,该接口由从EvalFunc派生的三个类的定义组成。
约定是,一次调用Initial类的execfunction并将其传递给原始输入元组。
它的输出是一个包含部分结果的元组。
Intermed类的exec函数可以被调用零次或者多次,并将包含由Initial类或者Intermed类的先前调用产生的部分结果的元组作为其输入,并生成具有另一个部分结果的元组。
最后,将调用Final类的exec函数,并以标量类型给出最终结果。
过滤(Filter)函数:
筛选器函数是返回布尔值的Eval函数。
它可以在任何适合布尔表达式的地方使用,包括FILTER运算符或者Bincond表达式。
Apache Pig完全不支持布尔值,因此Filter函数不能出现在诸如" Foreach"之类的语句中,该语句的结果输出到另一个运算符。
但是,可以在过滤器语句中使用过滤器函数。
下面的示例实现IsEmpty函数:
import java.io.IOException; import java.util.Map; import org.apache.pig.FilterFunc; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.data.DataType; /** * Determine whether a bag or map is empty. */ public class IsEmpty extends FilterFunc { @Override public Boolean exec(Tuple input) throws IOException { try { Object values = input.get(0); if (values instanceof DataBag) return ((DataBag)values).size() == 0; else if (values instanceof Map) return ((Map)values).size() == 0; else { int errCode = 2102; String msg = "Cannot test a " + DataType.findTypeName(values) + " for emptiness."; throw new ExecException(msg, errCode, PigException.BUG); } } catch (ExecException ee) { throw ee; } } }