@PublicEvolving public interface Procedure
The behavior of a Procedure can be defined by implementing a custom call method. A call method must be declared public, must not be static, and must be named call. Call methods can also be overloaded by implementing multiple methods named call.
Currently, users cannot register their own procedures directly; a custom Procedure can only be provided by a Catalog. To provide a Procedure, the Catalog must implement Catalog.getProcedure(ObjectPath).
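The call-method rules above can be made concrete with a JDK-only sketch that picks out valid call methods by reflection. This is not Flink's extraction code. ProcedureContext and Procedure are hypothetical empty stand-ins for the Flink types, and RollbackProcedure and CallMethodRules.findCallMethods are illustrative names.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-ins for Flink's Procedure and ProcedureContext, so the sketch runs on the JDK alone.
interface ProcedureContext {}
interface Procedure {}

// A sample procedure with two valid overloads plus two methods named call that do NOT qualify.
class RollbackProcedure implements Procedure {
    public String[] call(ProcedureContext ctx, String table) {
        return new String[] {table};
    }

    public String[] call(ProcedureContext ctx, String table, Long snapshotId) {
        return new String[] {table + "@" + snapshotId};
    }

    // Rejected: static.
    public static String[] call(ProcedureContext ctx) {
        return new String[0];
    }

    // Rejected: not public.
    private String[] call(ProcedureContext ctx, Integer ignored) {
        return new String[0];
    }
}

class CallMethodRules {
    // Returns the methods that satisfy the documented rules: public, non-static, named "call".
    static List<Method> findCallMethods(Class<?> procedureClass) {
        List<Method> result = new ArrayList<>();
        for (Method m : procedureClass.getDeclaredMethods()) {
            int mod = m.getModifiers();
            if (m.getName().equals("call") && Modifier.isPublic(mod) && !Modifier.isStatic(mod)) {
                result.add(m);
            }
        }
        // Sort by parameter count so the output order is deterministic.
        result.sort(Comparator.comparingInt(Method::getParameterCount));
        return result;
    }

    public static void main(String[] args) {
        for (Method m : findCallMethods(RollbackProcedure.class)) {
            System.out.println(m.getParameterCount() + " parameters");
        }
    }
}
```

Running main lists only the two public, non-static overloads (2 and 3 parameters); the static and private call methods are skipped.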
When calling a stored procedure, Flink always passes an org.apache.flink.table.procedure.ProcedureContext, which currently provides the StreamExecutionEnvironment, as the first parameter of the call method. The custom call method must therefore accept org.apache.flink.table.procedure.ProcedureContext as its first parameter; the remaining parameters of the call method are the actual parameters of the stored procedure.
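The calling convention can be sketched with plain reflection: the caller prepends the context to the SQL arguments and invokes a call overload with the matching arity. ProcedureContext here is a hypothetical one-method stand-in (Flink's real interface exposes the execution environment instead), and ProcedureInvoker.invoke and EchoProcedure are illustrative names. Matching overloads by arity alone is a simplification; a real runtime would also have to match argument types.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical stand-in for Flink's ProcedureContext; it only carries a label.
interface ProcedureContext {
    String name();
}

// Illustrative procedure whose SQL-visible parameters are (STRING, BIGINT).
class EchoProcedure {
    public String[] call(ProcedureContext ctx, String table, Long snapshotId) {
        return new String[] {ctx.name() + ":" + table + ":" + snapshotId};
    }
}

class ProcedureInvoker {
    // Prepends the context to the user-supplied arguments and invokes the first public,
    // non-static call method whose arity matches and whose first parameter is a context.
    static Object invoke(Object procedure, ProcedureContext ctx, Object... sqlArgs) throws Exception {
        Object[] fullArgs = new Object[sqlArgs.length + 1];
        fullArgs[0] = ctx;
        System.arraycopy(sqlArgs, 0, fullArgs, 1, sqlArgs.length);
        for (Method m : procedure.getClass().getMethods()) {
            if (m.getName().equals("call")
                    && !Modifier.isStatic(m.getModifiers())
                    && m.getParameterCount() == fullArgs.length
                    && ProcedureContext.class.isAssignableFrom(m.getParameterTypes()[0])) {
                return m.invoke(procedure, fullArgs);
            }
        }
        throw new NoSuchMethodException("no call method accepts " + sqlArgs.length + " argument(s)");
    }

    public static void main(String[] args) throws Exception {
        ProcedureContext ctx = () -> "ctx";
        String[] out = (String[]) invoke(new EchoProcedure(), ctx, "t", 1001L);
        System.out.println(out[0]); // prints "ctx:t:1001"
    }
}
```

The user passes only ("t", 1001L); the context is always injected as argument zero, mirroring how the stored procedure's own parameters start at the second position of call.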
By default, input and output data types are automatically extracted using reflection. The input arguments are derived from one or more call() methods. If the reflective information is not sufficient, it can be supplemented with DataTypeHint and ProcedureHint annotations. When ProcedureHint is used to hint input arguments, it should only hint the arguments starting from the second one, because the first argument is always the ProcedureContext, which does not need a data type hint.
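A minimal sketch of the reflective step, assuming only the Java parameter classes are inspected: it reads a call method's parameters and drops the leading context, which is why hints only need to cover arguments from the second position on. ProcedureContext, RollbackProcedure, and SignatureExtractor.argumentTypes are hypothetical stand-ins, and the mapping from Java classes to SQL types is left out.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's ProcedureContext.
interface ProcedureContext {}

class RollbackProcedure {
    public String[] call(ProcedureContext ctx, String tableName, Long snapshotId) {
        return new String[0];
    }
}

class SignatureExtractor {
    // Returns the simple names of the SQL-visible argument types, i.e. every parameter
    // after the leading ProcedureContext. Flink maps such Java classes to SQL data types
    // (e.g. String to STRING, Long to BIGINT); this sketch stops at the Java class.
    static List<String> argumentTypes(Method callMethod) {
        Class<?>[] params = callMethod.getParameterTypes();
        if (params.length == 0 || !ProcedureContext.class.isAssignableFrom(params[0])) {
            throw new IllegalArgumentException("first parameter must be a ProcedureContext");
        }
        List<String> names = new ArrayList<>();
        for (int i = 1; i < params.length; i++) {
            names.add(params[i].getSimpleName());
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        Method m = RollbackProcedure.class.getMethod(
                "call", ProcedureContext.class, String.class, Long.class);
        System.out.println(argumentTypes(m)); // prints [String, Long]
    }
}
```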
Note: The return type of the call() method should always be T[], where T can be an atomic type, a Row, or a POJO. An atomic type is implicitly wrapped into a row consisting of one field. A DataTypeHint for the output data type describes T, the element type, rather than the array.
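The row-wrapping rule for atomic results can be illustrated with a small JDK-only sketch. ResultRows.SimpleRow is a hypothetical stand-in for Flink's Row, and POJO results are not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ResultRows {
    // Hypothetical stand-in for Flink's Row: a fixed list of field values.
    static final class SimpleRow {
        final List<Object> fields;

        SimpleRow(Object... fields) {
            this.fields = Arrays.asList(fields);
        }

        @Override
        public String toString() {
            return fields.toString();
        }
    }

    // Converts the T[] returned by a call method into rows: an element that is already
    // a row is kept as-is, while any other (atomic) element becomes a one-field row.
    static List<SimpleRow> toRows(Object[] callResult) {
        List<SimpleRow> rows = new ArrayList<>();
        for (Object value : callResult) {
            rows.add(value instanceof SimpleRow ? (SimpleRow) value : new SimpleRow(value));
        }
        return rows;
    }

    public static void main(String[] args) {
        String[] atomic = {"previous_snapshot_id: 1000", "current_snapshot_id: 1001"};
        System.out.println(toRows(atomic));
        // prints [[previous_snapshot_id: 1000], [current_snapshot_id: 1001]]
    }
}
```

A String[] result therefore behaves like a single-column table with one row per array element.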
The following pseudocode examples show how to write a stored procedure:
// A stored procedure that rewrites the data files of an Iceberg table. It accepts a STRING
// and returns an array of explicit ROW< STRING, STRING >.
// loadTable, planScanTask, caseSensitive(), fileIO(), encryptionManager(), replaceDataFiles,
// parallelism and startingSnapshotId are placeholders for Iceberg-specific logic and state
// that this pseudocode does not define.
class IcebergRewriteDataFilesProcedure implements Procedure {
    public @DataTypeHint("ROW< rewritten_data_files_count STRING, added_data_files_count STRING >")
    Row[] call(ProcedureContext procedureContext, String tableName) {
        // plan the table scan that drives the rewrite
        Table table = loadTable(tableName);
        List<CombinedScanTask> combinedScanTasks = planScanTask(table);
        // rewrite the files according to the planned tasks
        StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
        DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
        RowDataRewriter rowDataRewriter =
                new RowDataRewriter(table, caseSensitive(), fileIO(), encryptionManager());
        List<DataFile> addedDataFiles;
        try {
            addedDataFiles = rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
        } catch (Exception e) {
            throw new RuntimeException("Rewrite data file error.", e);
        }
        // replace the current files with the rewritten ones
        List<DataFile> currentDataFiles = combinedScanTasks.stream()
                .flatMap(tasks -> tasks.files().stream().map(FileScanTask::file))
                .collect(Collectors.toList());
        replaceDataFiles(currentDataFiles, addedDataFiles, startingSnapshotId);
        // return the rewrite result; the counts are converted to strings
        // to match the STRING fields declared in the DataTypeHint
        return new Row[] {
            Row.of(String.valueOf(currentDataFiles.size()), String.valueOf(addedDataFiles.size()))
        };
    }
}
// A stored procedure that accepts < STRING, BIGINT > and
// returns an array of STRING without a data type hint.
// loadTable is a placeholder for catalog-specific table loading.
class RollbackToSnapShotProcedure implements Procedure {
    public String[] call(ProcedureContext procedureContext, String tableName, Long snapshotId) {
        Table table = loadTable(tableName);
        Long previousSnapshotId = table.currentSnapshot().snapshotId();
        table.manageSnapshots().rollbackTo(snapshotId).commit();
        return new String[] {
            "previous_snapshot_id: " + previousSnapshotId,
            "current_snapshot_id: " + snapshotId
        };
    }
}
In terms of the API, a stored procedure can be used as follows:
// for SQL users
TableEnvironment tEnv = ...
tEnv.executeSql("CALL rollback_to_snapshot('t', 1001)");
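For a CALL statement like the one above to work, some catalog has to hand back a procedure registered under the name rollback_to_snapshot. The sketch below imitates only that lookup role with a hypothetical in-memory map keyed by database and procedure name. InMemoryProcedureCatalog, its methods, the placeholder RollbackToSnapshotProcedure, and the db database name are illustrative; this is not Flink's Catalog API, whose getProcedure takes an ObjectPath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for Flink's Procedure and ProcedureContext types.
interface ProcedureContext {}
interface Procedure {}

// Placeholder procedure; its body is irrelevant to the lookup being illustrated.
class RollbackToSnapshotProcedure implements Procedure {
    public String[] call(ProcedureContext ctx, String tableName, Long snapshotId) {
        return new String[] {"current_snapshot_id: " + snapshotId};
    }
}

// Hypothetical in-memory catalog that only mimics the lookup role of
// Catalog.getProcedure(ObjectPath); keys have the form "database.procedure".
class InMemoryProcedureCatalog {
    private final Map<String, Procedure> procedures = new HashMap<>();

    void registerProcedure(String database, String name, Procedure procedure) {
        procedures.put(database + "." + name, procedure);
    }

    // Returns the procedure registered under database.name, or an empty Optional.
    Optional<Procedure> getProcedure(String database, String name) {
        return Optional.ofNullable(procedures.get(database + "." + name));
    }

    public static void main(String[] args) {
        InMemoryProcedureCatalog catalog = new InMemoryProcedureCatalog();
        catalog.registerProcedure("db", "rollback_to_snapshot", new RollbackToSnapshotProcedure());
        System.out.println(catalog.getProcedure("db", "rollback_to_snapshot").isPresent()); // prints true
        System.out.println(catalog.getProcedure("db", "expire_snapshots").isPresent()); // prints false
    }
}
```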
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.