IN1
- The data type of the first input data set.
IN2
- The data type of the second input data set.
OUT
- The data type of the returned data set.

@Public public abstract class TwoInputUdfOperator<IN1,IN2,OUT,O extends TwoInputUdfOperator<IN1,IN2,OUT,O>> extends TwoInputOperator<IN1,IN2,OUT,O> implements UdfOperator<O>
Base class for operators that apply a user-defined function (UDF) to two inputs (such as RichJoinFunction or RichCoGroupFunction).
This class encapsulates utilities for the UDFs, such as broadcast variables, parameterization through configuration objects, and semantic properties.
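The self-referential type parameter O in the class declaration is what lets configuration methods return the concrete operator type, so calls can be chained without losing subclass methods. The following is a minimal sketch of that pattern in plain Java. FluentBase, JoinSketch, and FluentDemo are hypothetical names used only for illustration and are not Flink classes.

```java
/** Hypothetical base type, loosely modeled on the shape of the declaration above. */
abstract class FluentBase<O extends FluentBase<O>> {
    private String name = "unnamed";

    /** Returns the concrete subtype so that subclass methods stay reachable in a chain. */
    @SuppressWarnings("unchecked")
    O name(String newName) {
        this.name = newName;
        return (O) this;
    }

    String getName() {
        return name;
    }
}

/** Hypothetical concrete operator that binds O to itself. */
final class JoinSketch extends FluentBase<JoinSketch> {
    private String hint = "none";

    JoinSketch withHint(String newHint) {
        this.hint = newHint;
        return this;
    }

    String getHint() {
        return hint;
    }
}

final class FluentDemo {
    public static void main(String[] args) {
        // name(...) is declared on the base class but returns JoinSketch,
        // so withHint(...) can follow it in the same chain.
        JoinSketch join = new JoinSketch().name("my-join").withHint("broadcast");
        System.out.println(join.getName() + " / " + join.getHint());
    }
}
```

If name(...) returned the base type instead, the call to withHint(...) would not compile without a cast.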
Fields inherited from class Operator: name, parallelism
Modifier | Constructor and Description |
---|---|
protected | TwoInputUdfOperator(DataSet<IN1> input1, DataSet<IN2> input2, TypeInformation<OUT> resultType): Creates a new operator with the two given data sets as inputs. |
Modifier and Type | Method and Description |
---|---|
protected DualInputSemanticProperties | extractSemanticAnnotationsFromUdf(Class<?> udfClass) |
protected boolean | getAnalyzedUdfSemanticsFlag() |
Map<String,DataSet<?>> | getBroadcastSets(): Gets the broadcast sets (name and data set) that have been added to the context of the UDF. |
protected abstract Function | getFunction() |
Configuration | getParameters(): Gets the configuration parameters that will be passed to the UDF's open method, AbstractRichFunction.open(Configuration). |
DualInputSemanticProperties | getSemanticProperties(): Gets the semantic properties that have been set for the user-defined functions (UDF). |
O | returns(Class<OUT> typeClass): Adds a type information hint about the return type of this operator. |
O | returns(String typeInfoString): Deprecated. Please use returns(Class) or returns(TypeHint) instead. |
O | returns(TypeHint<OUT> typeHint): Adds a type information hint about the return type of this operator. |
O | returns(TypeInformation<OUT> typeInfo): Adds a type information hint about the return type of this operator. |
protected void | setAnalyzedUdfSemanticsFlag() |
void | setSemanticProperties(DualInputSemanticProperties properties): Sets the semantic properties for the user-defined function (UDF). |
protected boolean | udfWithForwardedFieldsFirstAnnotation(Class<?> udfClass) |
protected boolean | udfWithForwardedFieldsSecondAnnotation(Class<?> udfClass) |
O | withBroadcastSet(DataSet<?> data, String name): Adds a certain data set as a broadcast set to this operator. |
O | withForwardedFieldsFirst(String... forwardedFieldsFirst): Adds semantic information about forwarded fields of the first input of the user-defined function. |
O | withForwardedFieldsSecond(String... forwardedFieldsSecond): Adds semantic information about forwarded fields of the second input of the user-defined function. |
O | withParameters(Configuration parameters): Sets the configuration parameters for the UDF. |
Methods inherited from class TwoInputOperator: getInput1, getInput1Type, getInput2, getInput2Type, translateToDataFlow
Methods inherited from class Operator: getName, getParallelism, getResultType, name, setParallelism
Methods inherited from class DataSet: aggregate, checkSameExecutionContext, clean, coGroup, collect, combineGroup, count, cross, crossWithHuge, crossWithTiny, distinct, distinct, distinct, distinct, fillInType, filter, first, flatMap, fullOuterJoin, fullOuterJoin, getExecutionEnvironment, getType, groupBy, groupBy, groupBy, iterate, iterateDelta, join, join, joinWithHuge, joinWithTiny, leftOuterJoin, leftOuterJoin, map, mapPartition, max, maxBy, min, minBy, output, partitionByHash, partitionByHash, partitionByHash, partitionByRange, partitionByRange, partitionByRange, partitionCustom, partitionCustom, partitionCustom, print, print, printOnTaskManager, printToErr, printToErr, project, rebalance, reduce, reduceGroup, rightOuterJoin, rightOuterJoin, runOperation, sortPartition, sortPartition, sortPartition, sum, union, write, write, writeAsCsv, writeAsCsv, writeAsCsv, writeAsCsv, writeAsFormattedText, writeAsFormattedText, writeAsText, writeAsText
protected TwoInputUdfOperator(DataSet<IN1> input1, DataSet<IN2> input2, TypeInformation<OUT> resultType)
input1
- The data set for the first input.
input2
- The data set for the second input.
resultType
- The type of the elements in the resulting data set.

protected abstract Function getFunction()
public O withParameters(Configuration parameters)
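Description copied from interface: UdfOperator
Sets the configuration parameters for the UDF. The parameters are passed to the UDF in the AbstractRichFunction.open(Configuration) method.
Specified by: withParameters in interface UdfOperator<O extends TwoInputUdfOperator<IN1,IN2,OUT,O>>
parameters
- The configuration parameters for the UDF.

public O withBroadcastSet(DataSet<?> data, String name)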
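Description copied from interface: UdfOperator
Adds a certain data set as a broadcast set to this operator. The broadcast set is registered under the given name and can be retrieved under that name from the runtime context via RuntimeContext.getBroadcastVariable(String).
The runtime context itself is available in all UDFs via AbstractRichFunction.getRuntimeContext().
Specified by: withBroadcastSet in interface UdfOperator<O extends TwoInputUdfOperator<IN1,IN2,OUT,O>>
data
- The data set to be broadcast.
name
- The name under which the broadcast data set is retrieved.

public O withForwardedFieldsFirst(String... forwardedFieldsFirst)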
Adds semantic information about forwarded fields of the first input of the user-defined function. Forwarded-field information declares fields that the function never modifies and that are either forwarded to the same position in the output or copied unchanged to another position in the output.
Fields that are forwarded to the same position are specified by their position. The specified position must be valid for both the input and the output data type, and the field must have the same type in both. For example, withForwardedFieldsFirst("f2") declares that the third field of a Java input tuple from the first input is copied to the third field of an output tuple.
Fields that are copied unchanged from the first input to another position in the output are declared by specifying the source field reference in the first input and the target field reference in the output. withForwardedFieldsFirst("f0->f2") denotes that the first field of the first input Java tuple is copied unchanged to the third field of the Java output tuple. When using a wildcard ("*"), ensure that the number of declared fields and their types match between the first input type and the output type.
Multiple forwarded fields can be declared in one String (withForwardedFieldsFirst("f2; f3->f0; f4")) or in separate Strings (withForwardedFieldsFirst("f2", "f3->f0", "f4")). Refer to the JavaDoc of Function or Flink's documentation for details on field references such as nested fields and wildcards.
It is not possible to override existing semantic information about forwarded fields of the first input, for example information added by a FunctionAnnotation.ForwardedFieldsFirst class annotation.
NOTE: Adding semantic information for functions is optional. Used correctly, it can help the Flink optimizer generate more efficient execution plans. Incorrect semantic information, however, can cause the optimizer to generate incorrect execution plans that compute wrong results, so add it with care.
forwardedFieldsFirst
- A list of forwarded field expressions for the first input of the function.
See also: FunctionAnnotation, FunctionAnnotation.ForwardedFieldsFirst
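To make the expression syntax concrete, the sketch below splits forwarded-field strings into source and target pairs using plain Java. It is only an illustration under simplifying assumptions: ForwardedFieldsSketch and its parse method are hypothetical and not part of Flink, it handles only flat tuple field names, and it ignores wildcards and nested fields. Flink's own parsing may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; this is not Flink's parser. */
final class ForwardedFieldsSketch {

    /** Maps each source field to its target field, e.g. "f3->f0" yields f3 -> f0. */
    static Map<String, String> parse(String... expressions) {
        Map<String, String> forwarded = new LinkedHashMap<>();
        for (String expression : expressions) {
            // One String may carry several declarations separated by ';'.
            for (String part : expression.split(";")) {
                String declaration = part.trim();
                if (declaration.isEmpty()) {
                    continue;
                }
                int arrow = declaration.indexOf("->");
                if (arrow < 0) {
                    // "f2": the field keeps its position in the output.
                    forwarded.put(declaration, declaration);
                } else {
                    // "f3->f0": the field is copied unchanged to another position.
                    String source = declaration.substring(0, arrow).trim();
                    String target = declaration.substring(arrow + 2).trim();
                    forwarded.put(source, target);
                }
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Both call styles describe the same three forwarded fields.
        System.out.println(parse("f2; f3->f0; f4"));
        System.out.println(parse("f2", "f3->f0", "f4"));
    }
}
```

Under these assumptions, the single-String form and the separate-Strings form produce the same mapping.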
public O withForwardedFieldsSecond(String... forwardedFieldsSecond)
Adds semantic information about forwarded fields of the second input of the user-defined function. Forwarded-field information declares fields that the function never modifies and that are either forwarded to the same position in the output or copied unchanged to another position in the output.
Fields that are forwarded to the same position are specified by their position. The specified position must be valid for both the input and the output data type, and the field must have the same type in both. For example, withForwardedFieldsSecond("f2") declares that the third field of a Java input tuple from the second input is copied to the third field of an output tuple.
Fields that are copied unchanged from the second input to another position in the output are declared by specifying the source field reference in the second input and the target field reference in the output. withForwardedFieldsSecond("f0->f2") denotes that the first field of the second input Java tuple is copied unchanged to the third field of the Java output tuple. When using a wildcard ("*"), ensure that the number of declared fields and their types match between the second input type and the output type.
Multiple forwarded fields can be declared in one String (withForwardedFieldsSecond("f2; f3->f0; f4")) or in separate Strings (withForwardedFieldsSecond("f2", "f3->f0", "f4")). Refer to the JavaDoc of Function or Flink's documentation for details on field references such as nested fields and wildcards.
It is not possible to override existing semantic information about forwarded fields of the second input, for example information added by a FunctionAnnotation.ForwardedFieldsSecond class annotation.
NOTE: Adding semantic information for functions is optional. Used correctly, it can help the Flink optimizer generate more efficient execution plans. Incorrect semantic information, however, can cause the optimizer to generate incorrect execution plans that compute wrong results, so add it with care.
forwardedFieldsSecond
- A list of forwarded field expressions for the second input of the function.
See also: FunctionAnnotation, FunctionAnnotation.ForwardedFieldsSecond
public O returns(Class<OUT> typeClass)
Classes can be used as type hints for non-generic types (classes without generic parameters), but not for generic types such as Tuples. For generic types, use the returns(TypeHint) method.
Use this method as follows:
DataSet<String[]> result =
data1.join(data2).where("id").equalTo("fieldX")
.with(new JoinFunctionWithNonInferrableReturnType())
.returns(String[].class);
typeClass
- The class of the returned data type.

public O returns(TypeHint<OUT> typeHint)
Use this method as follows:
DataSet<Tuple2<String, Double>> result =
data1.join(data2).where("id").equalTo("fieldX")
.with(new JoinFunctionWithNonInferrableReturnType())
.returns(new TypeHint<Tuple2<String, Double>>(){});
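The anonymous-subclass form `new TypeHint<...>(){}` matters because a Class object cannot describe generic parameters, whereas a subclass records them in its generic superclass. The sketch below shows that general Java technique with plain reflection. HintSketch and ErasureDemo are hypothetical stand-ins written for this example and are not Flink's TypeHint implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

/** Hypothetical stand-in for a type hint; this is not Flink's TypeHint class. */
abstract class HintSketch<T> {
    private final Type captured;

    protected HintSketch() {
        // An anonymous subclass keeps its generic superclass, including the
        // actual type argument for T, in its class metadata.
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.captured = superType.getActualTypeArguments()[0];
    }

    Type getCaptured() {
        return captured;
    }
}

final class ErasureDemo {
    public static void main(String[] args) {
        // A class literal can only name the raw type; List<String>.class does not exist.
        Class<?> raw = List.class;
        System.out.println(raw.getTypeName());

        // The trailing {} creates the anonymous subclass that carries List<String>.
        Type full = new HintSketch<List<String>>() {}.getCaptured();
        System.out.println(full.getTypeName());
    }
}
```

The sketch must be instantiated through a parameterized anonymous subclass; constructing it with a raw type would make the cast in the constructor fail.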
typeHint
- The type hint for the returned data type.

public O returns(TypeInformation<OUT> typeInfo)
In most cases, the methods returns(Class) and returns(TypeHint) are preferable.
typeInfo
- The type information for the returned data type.

@Deprecated @PublicEvolving public O returns(String typeInfoString)
Deprecated. Please use returns(Class) or returns(TypeHint) instead.
Type hints are important in cases where the Java compiler throws away generic type information necessary for efficient execution.
This method takes a type information string that will be parsed. A type information string can contain the following types:
- Basic types such as Integer, String, etc.
- Basic type arrays such as Integer[], String[], etc.
- Tuple types such as Tuple1<TYPE0>, Tuple2<TYPE0, TYPE1>, etc.
- Pojo types such as org.my.MyPojo<myFieldName=TYPE0,myFieldName2=TYPE1>, etc.
- Generic types such as java.lang.Class, etc.
- Custom type arrays such as org.my.CustomClass[], org.my.CustomClass$StaticInnerClass[], etc.
- Value types such as DoubleValue, StringValue, IntegerValue, etc.
- Tuple array types such as Tuple2<TYPE0,TYPE1>[], etc.
- Writable types such as Writable<org.my.CustomWritable>
- Enum types such as Enum<org.my.CustomEnum>
Example: "Tuple2<String,Tuple2<Integer,org.my.MyJob$Pojo<word=String>>>"
typeInfoString
- The type information string to be parsed.

@Internal public Map<String,DataSet<?>> getBroadcastSets()
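Description copied from interface: UdfOperator
Gets the broadcast sets (name and data set) that have been added to the context of the UDF. Broadcast sets are added to a UDF via UdfOperator.withBroadcastSet(DataSet, String).
Specified by: getBroadcastSets in interface UdfOperator<O extends TwoInputUdfOperator<IN1,IN2,OUT,O>>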
public Configuration getParameters()
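Description copied from interface: UdfOperator
Gets the configuration parameters that will be passed to the UDF's open method, AbstractRichFunction.open(Configuration). The configuration is set via the UdfOperator.withParameters(Configuration) method.
Specified by: getParameters in interface UdfOperator<O extends TwoInputUdfOperator<IN1,IN2,OUT,O>>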
@Internal public DualInputSemanticProperties getSemanticProperties()
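Description copied from interface: UdfOperator
Gets the semantic properties that have been set for the user-defined functions (UDF).
Specified by: getSemanticProperties in interface UdfOperator<O extends TwoInputUdfOperator<IN1,IN2,OUT,O>>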
@Internal public void setSemanticProperties(DualInputSemanticProperties properties)
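Sets the semantic properties for the user-defined function (UDF). The configured properties can be retrieved via UdfOperator.getSemanticProperties().
properties
- The semantic properties for the UDF.
See also: UdfOperator.getSemanticProperties()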
protected boolean getAnalyzedUdfSemanticsFlag()
protected void setAnalyzedUdfSemanticsFlag()
protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass)
protected boolean udfWithForwardedFieldsFirstAnnotation(Class<?> udfClass)
protected boolean udfWithForwardedFieldsSecondAnnotation(Class<?> udfClass)
Copyright © 2014–2017 The Apache Software Foundation. All rights reserved.