The HiveModule
provides Hive built-in functions as Flink system (built-in) functions to Flink SQL and Table API users.
For detailed information, please refer to HiveModule.
String name = "myhive";
String version = "2.3.4";
tableEnv.loadModue(name, new HiveModule(version));
val name = "myhive"
val version = "2.3.4"
tableEnv.loadModue(name, new HiveModule(version));
modules:
- name: core
type: core
- name: myhive
type: hive
hive-version: 2.3.4
Users can use their existing Hive User Defined Functions in Flink.
Supported UDF types include:
Upon query planning and execution, Hive’s UDF and GenericUDF are automatically translated into Flink’s ScalarFunction, Hive’s GenericUDTF is automatically translated into Flink’s TableFunction, and Hive’s UDAF and GenericUDAFResolver2 are translated into Flink’s AggregateFunction.
To use a Hive User Defined Function, user have to
Assuming we have the following Hive functions registered in Hive Metastore:
/**
* Test simple udf. Registered under name 'myudf'
*/
public class TestHiveSimpleUDF extends UDF {
public IntWritable evaluate(IntWritable i) {
return new IntWritable(i.get());
}
public Text evaluate(Text text) {
return new Text(text.toString());
}
}
/**
* Test generic udf. Registered under name 'mygenericudf'
*/
public class TestHiveGenericUDF extends GenericUDF {
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
checkArgument(arguments.length == 2);
checkArgument(arguments[1] instanceof ConstantObjectInspector);
Object constant = ((ConstantObjectInspector) arguments[1]).getWritableConstantValue();
checkArgument(constant instanceof IntWritable);
checkArgument(((IntWritable) constant).get() == 1);
if (arguments[0] instanceof IntObjectInspector ||
arguments[0] instanceof StringObjectInspector) {
return arguments[0];
} else {
throw new RuntimeException("Not support argument: " + arguments[0]);
}
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
return arguments[0].get();
}
@Override
public String getDisplayString(String[] children) {
return "TestHiveGenericUDF";
}
}
/**
* Test split udtf. Registered under name 'mygenericudtf'
*/
public class TestHiveUDTF extends GenericUDTF {
@Override
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
checkArgument(argOIs.length == 2);
// TEST for constant arguments
checkArgument(argOIs[1] instanceof ConstantObjectInspector);
Object constant = ((ConstantObjectInspector) argOIs[1]).getWritableConstantValue();
checkArgument(constant instanceof IntWritable);
checkArgument(((IntWritable) constant).get() == 1);
return ObjectInspectorFactory.getStandardStructObjectInspector(
Collections.singletonList("col1"),
Collections.singletonList(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
}
@Override
public void process(Object[] args) throws HiveException {
String str = (String) args[0];
for (String s : str.split(",")) {
forward(s);
forward(s);
}
}
@Override
public void close() {
}
}
From Hive CLI, we can see they are registered:
hive> show functions;
OK
......
mygenericudf
myudf
myudtf
Then, users can use them in SQL as:
Flink SQL> select mygenericudf(myudf(name), 1) as a, mygenericudf(myudf(age), 1) as b, s from mysourcetable, lateral table(myudtf(name, 1)) as T(s);