UDTF in Hive

Hive supports UDFs, UDAFs, and UDTFs, and they make working with Hive a lot more convenient.

UDF



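A UDF is a user-defined function: it takes one or more arguments and returns a single value, much like substr or trim. UDFs are fairly simple to write: extend the UDF class and implement an evaluate method. For reference, see http://richiehu.blog.51cto.com/2093113/386112
Below is a urldecode function.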

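import org.apache.hadoop.hive.ql.exec.UDF;
import java.net.URLDecoder;

// Hive UDF that URL-decodes a string, e.g. "%E5%A4%AA%E5%8E%9F" -> "太原".
// Hive finds evaluate() by reflection, so no @Override is needed.
public final class urldecode extends UDF {

    public String evaluate(final String s) {
        if (s == null) { return null; }   // NULL in, NULL out
        return getString(s);
    }

    public static String getString(String s) {
        try {
            // Decode explicitly as UTF-8; the one-argument decode(s) is deprecated
            // and uses the platform default charset instead.
            return URLDecoder.decode(s, "UTF-8");
        } catch (Exception e) {
            // Malformed input (e.g. a truncated "%E5%A") yields an empty string
            return "";
        }
    }

    // Local test without Hive: prints 太原-三亚
    public static void main(String[] args) {
        String t = "%E5%A4%AA%E5%8E%9F-%E4%B8%89%E4%BA%9A";
        System.out.println(getString(t));
    }
}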
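Hive picks the evaluate method by reflection, so a UDF can accept different numbers of arguments simply by overloading evaluate. The sketch below is plain Java so it runs without Hive jars: the class UrlDecodeSketch does not extend UDF, and the two-argument evaluate(s, charset) overload is a hypothetical extension, not part of the urldecode function above. It also shows why the charset matters: the same escaped bytes decoded as ISO-8859-1 instead of UTF-8 come out garbled.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

// Plain-Java stand-in for a Hive UDF: in Hive this class would extend
// org.apache.hadoop.hive.ql.exec.UDF; here it does not, so it runs without Hive jars.
// UrlDecodeSketch and the two-argument overload are hypothetical, for illustration only.
class UrlDecodeSketch {

    // One argument: decode as UTF-8
    public static String evaluate(String s) {
        return evaluate(s, "UTF-8");
    }

    // Two arguments: the caller chooses the charset (hypothetical overload)
    public static String evaluate(String s, String charset) {
        if (s == null || charset == null) {
            return null;                      // NULL in, NULL out
        }
        try {
            return URLDecoder.decode(s, charset);
        } catch (UnsupportedEncodingException e) {
            return null;                      // unknown charset name
        } catch (IllegalArgumentException e) {
            return "";                        // malformed escape such as "%E5%A"
        }
    }

    public static void main(String[] args) {
        String t = "%E5%A4%AA%E5%8E%9F-%E4%B8%89%E4%BA%9A";
        System.out.println(evaluate(t));                 // 太原-三亚
        System.out.println(evaluate(t, "ISO-8859-1"));   // garbled: each byte read as one Latin-1 char
    }
}
```

Registered in Hive, the two overloads would let the same function be called with either one or two arguments; Hive matches the call to the evaluate whose parameter list fits.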

UDAF

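A UDAF is a user-defined aggregate function, like sum or avg: it takes one or more arguments from many rows and returns a single value. I haven't written one yet; for reference, see http://richiehu.blog.51cto.com/2093113/386113 .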
 
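The classic Hive UDAF API pairs a class extending UDAF with an evaluator implementing UDAFEvaluator, whose init, iterate, terminatePartial, merge, and terminate methods Hive calls while aggregating partial results on the map side and merging them on the reduce side. The sketch below only imitates that lifecycle in plain Java for avg, with no Hive classes; AvgEvaluatorSketch, PartialResult, and avgOverSplits are hypothetical names, and each "split" stands in for one map task.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java mock of the classic UDAF evaluator lifecycle, computing avg.
// In Hive the evaluator would implement org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
// here nothing from Hive is used, so it runs standalone.
class AvgEvaluatorSketch {

    // Partial state handed from a map task to the reducer
    static class PartialResult {
        double sum;
        long count;
    }

    private PartialResult state;

    // Reset the aggregation state before a new group
    public void init() { state = null; }

    // Consume one input row; NULLs are skipped, as sum/avg do
    public boolean iterate(Double value) {
        if (value != null) {
            if (state == null) state = new PartialResult();
            state.sum += value;
            state.count++;
        }
        return true;
    }

    // Emit this task's partial state
    public PartialResult terminatePartial() { return state; }

    // Fold in a partial state produced by another task
    public boolean merge(PartialResult other) {
        if (other != null) {
            if (state == null) state = new PartialResult();
            state.sum += other.sum;
            state.count += other.count;
        }
        return true;
    }

    // Final value; NULL when no non-NULL rows were seen
    public Double terminate() {
        return state == null ? null : state.sum / state.count;
    }

    // Simulate several map tasks whose partial results are merged by one reducer
    static Double avgOverSplits(List<List<Double>> splits) {
        AvgEvaluatorSketch reducer = new AvgEvaluatorSketch();
        reducer.init();
        for (List<Double> split : splits) {
            AvgEvaluatorSketch mapper = new AvgEvaluatorSketch();
            mapper.init();
            for (Double v : split) mapper.iterate(v);
            reducer.merge(mapper.terminatePartial());
        }
        return reducer.terminate();
    }

    public static void main(String[] args) {
        List<List<Double>> splits = Arrays.asList(
                Arrays.asList(1.0, 2.0, null),
                Arrays.asList(3.0, 6.0));
        System.out.println(avgOverSplits(splits)); // 3.0
    }
}
```

The partial state carries both sum and count rather than a running average, because two averages cannot be merged correctly without knowing how many rows each one covers.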

UDTF

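A UDTF addresses the case where one input row must produce multiple output rows, like the explode function. For reference, see http://www.linezing.com/blog/?p=323 . The function below takes an array column and outputs two columns: the position of each element in the array, and the element itself. Compared with explode it adds a position column, but every element is converted with toString(), so the element column is always of type String.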

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Some older Hive releases named this annotation
// org.apache.hadoop.hive.ql.exec.description (lowercase); newer ones use Description.
@Description(
        name = "explodeWithPos",
        value = "_FUNC_(a) - separates the elements of array a into multiple rows with pos as first col "
        )
public class explodeWithPos extends GenericUDTF {

    // Inspector for the input array, set once in initialize()
    private ListObjectInspector listOI = null;

    // Output row reused for every element: [position, element], both Strings
    private final Object[] forwardObj = new Object[2];

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentException("explodeWithPos() takes only one argument");
        }
        if (args[0].getCategory() != ObjectInspector.Category.LIST) {
            throw new UDFArgumentException("explodeWithPos() takes an array as a parameter");
        }
        listOI = (ListObjectInspector) args[0];

        // Output schema: col1 = position, col2 = element, both declared as String
        List<String> fieldNames = new ArrayList<String>();
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("col2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] o) throws HiveException {
        List<?> list = listOI.getList(o[0]);
        if (list == null) {
            return;  // NULL array: emit no rows
        }
        int i = 0;
        for (Object r : list) {
            forwardObj[0] = String.valueOf(i);
            // NULL elements stay NULL instead of throwing a NullPointerException
            forwardObj[1] = (r == null) ? null : r.toString();
            forward(forwardObj);
            i++;
        }
    }

    @Override
    public void close() throws HiveException {
        // Nothing to clean up
    }

    @Override
    public String toString() {
        return "explodeWithPos";
    }
}
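To check the row-expansion logic of explodeWithPos without a Hive cluster, the sketch below mimics the GenericUDTF flow in plain Java: Hive calls initialize() once, process() once per input row, and close() at the end, and each forward() call becomes one output row. MiniUDTF and ExplodeWithPosSketch are hypothetical names; the input array is passed as a plain java.util.List instead of going through a ListObjectInspector, and NULL elements are kept as NULL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for GenericUDTF: forward() collects rows in memory
abstract class MiniUDTF {
    private final List<Object[]> collected = new ArrayList<>();

    public abstract void process(Object[] args);

    public void close() { }

    // Copy the row, because the subclass reuses its output array between calls
    protected final void forward(Object[] row) {
        collected.add(row.clone());
    }

    public List<Object[]> rows() {
        return collected;
    }
}

// Same loop as explodeWithPos.process(), minus the ObjectInspector plumbing
class ExplodeWithPosSketch extends MiniUDTF {
    private final Object[] forwardObj = new Object[2];   // reused for every element

    @Override
    public void process(Object[] args) {
        List<?> list = (List<?>) args[0];
        if (list == null) return;                        // NULL array: no rows
        int i = 0;
        for (Object r : list) {
            forwardObj[0] = String.valueOf(i);           // position, as a String
            forwardObj[1] = (r == null) ? null : r.toString();
            forward(forwardObj);
            i++;
        }
    }

    public static void main(String[] args) {
        ExplodeWithPosSketch udtf = new ExplodeWithPosSketch();
        // Two input rows: an array of three elements, then a NULL array
        udtf.process(new Object[] { Arrays.asList("a", "b", "c") });
        udtf.process(new Object[] { null });
        udtf.close();
        for (Object[] row : udtf.rows()) {
            System.out.println(row[0] + "\t" + row[1]);
        }
        // prints 0\ta, 1\tb, 2\tc on separate lines; the NULL array adds nothing
    }
}
```

One input row with a three-element array turns into three output rows, which is exactly the one-row-in, many-rows-out shape that distinguishes a UDTF from a UDF.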

Happy every day!

Good good study, day day up!


2011-04-24