UDF
udf 就是一个自定义的 function,输入一个或多个参数,返回一个返回值,类似 substr/trim 之类。写起来比较简单,重构 UDF 类的 evaluate 方法就可以了。可以参考 http://richiehu.blog.51cto.com/2093113/386112 。
这是一个 urldecode 函数。
import org.apache.hadoop.hive.ql.exec.UDF; import java.net.URLDecoder; public final class urldecode extends UDF { public String evaluate(final String s) { if (s == null) { return null; } return getString(s); } public static String getString(String s) { String a; try { a = URLDecoder.decode(s); } catch ( Exception e) { a = ""; } return a; } public static void main(String args[]) { String t = "%E5%A4%AA%E5%8E%9F-%E4%B8%89%E4%BA%9A"; System.out.println( getString(t) ); } }
UDAF
udaf 就是自定义的聚合函数,类似 sum/avg 这类,输入多行的一个或多个参数,返回一个返回值。这个还没写过,可以参考 http://richiehu.blog.51cto.com/2093113/386113 。
UDTF
udtf 是针对输入一行,输出多行的需求来的,类似 explode 函数。可以参考 http://www.linezing.com/blog/?p=323 。
这个是输入数组字段,输出两列,一列是数组元素的位置,一列是数组元素。比 explode 多了一列位置,不过数组元素只能是 String 类型的。
import java.util.ArrayList; import java.util.List; //import org.apache.hadoop.io.Text; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.description; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; @description( name = "explodeWithPos", value = "_FUNC_(a) - separates the elements of array a into multiple rows with pos as first col " ) public class explodeWithPos extends GenericUDTF { ListObjectInspector listOI = null; @Override public void close() throws HiveException{ } @Override public StructObjectInspector initialize(ObjectInspector [] args) throws UDFArgumentException { if (args.length != 1) { throw new UDFArgumentException("explodeWithPos() takes only one argument"); } if (args[0].getCategory() != ObjectInspector.Category.LIST) { throw new UDFArgumentException("explodeWithPos() takes an array as a parameter"); } listOI = (ListObjectInspector)args[0]; ArrayList fieldNames = new ArrayList(); ArrayList fieldOIs = new ArrayList(); fieldNames.add("col1"); //fieldOIs.add(listOI.getListElementObjectInspector()); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); fieldNames.add("col2"); //fieldOIs.add(listOI.getListElementObjectInspector()); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } //Object forwardObj[] = new Object[2]; Object forwardObj[] = new String[2]; @Override public void process(Object [] o) throws HiveException { List list = listOI.getList(o[0]); if ( list == null ) { return; } int i =0; for (Object r : list) { forwardObj[0] = String.valueOf(i); forwardObj[1] = r.toString(); this.forward(forwardObj); i++; } } @Override public String toString() { return "explodeWithPos"; } }