Class BeamPythonFunctionRunner
- java.lang.Object
  - org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner
- All Implemented Interfaces:
AutoCloseable, PythonFunctionRunner
- Direct Known Subclasses:
BeamDataStreamPythonFunctionRunner, BeamTablePythonFunctionRunner
@Internal public abstract class BeamPythonFunctionRunner extends Object implements PythonFunctionRunner
A BeamPythonFunctionRunner used to execute Python functions.
Field Summary
Fields
Modifier and Type | Field | Description
protected FlinkFnApi.CoderInfoDescriptor | inputCoderDescriptor |
protected static org.slf4j.Logger | LOG |
protected org.apache.beam.sdk.fn.data.FnDataReceiver<org.apache.beam.sdk.util.WindowedValue<byte[]>> | mainInputReceiver | The receiver which forwards the input elements to a remote environment for processing.
protected FlinkFnApi.CoderInfoDescriptor | outputCoderDescriptor |
protected LinkedBlockingQueue<Tuple2<String,byte[]>> | resultBuffer | Buffers the Python function execution results that have not yet been processed.
protected Map<String,FlinkFnApi.CoderInfoDescriptor> | sideOutputCoderDescriptors |
-
Constructor Summary
Constructors
Constructor | Description
BeamPythonFunctionRunner(String taskName, ProcessPythonEnvironmentManager environmentManager, FlinkMetricContainer flinkMetricContainer, KeyedStateBackend<?> keyedStateBackend, OperatorStateBackend operatorStateBackend, TypeSerializer<?> keySerializer, TypeSerializer<?> namespaceSerializer, TimerRegistration timerRegistration, MemoryManager memoryManager, double managedMemoryFraction, FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor, FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor, Map<String,FlinkFnApi.CoderInfoDescriptor> sideOutputCoderDescriptors) |
-
Method Summary
All Methods | Instance Methods | Abstract Methods | Concrete Methods
Modifier and Type | Method | Description
protected abstract void | buildTransforms(org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder componentsBuilder) |
void | close() | Tears down the Python function runner.
org.apache.beam.runners.fnexecution.control.JobBundleFactory | createJobBundleFactory(org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.Struct pipelineOptions) |
void | flush() | Forces the processing of the current bundle of elements to finish.
protected abstract Optional<org.apache.beam.model.pipeline.v1.RunnerApi.Coder> | getOptionalTimerCoderProto() |
protected abstract List<org.apache.beam.runners.core.construction.graph.TimerReference> | getTimers(org.apache.beam.model.pipeline.v1.RunnerApi.Components components) |
void | notifyNoMoreResults() | Interrupts the progress of takeResult.
void | open(ReadableConfig config) | Prepares the Python function runner, e.g., by preparing the Python execution environment.
Tuple3<String,byte[],Integer> | pollResult() | Retrieves the Python function result.
void | process(byte[] data) | Executes the Python function with the input byte array.
void | processTimer(byte[] timerData) | Sends the triggered timer to the Python function.
protected void | startBundle() |
Tuple3<String,byte[],Integer> | takeResult() | Retrieves the Python function result, waiting if necessary until an element becomes available.
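The public methods above are normally driven in a fixed order: open, process for each element, flush at a bundle boundary, drain the buffered results, then close. The sketch below models only that call order with a hypothetical in-memory `SketchRunner` that upper-cases its input instead of talking to a Python worker; it uses nothing but the JDK and is not Flink's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.LinkedBlockingQueue;

class LifecycleDemo {
    public static void main(String[] args) {
        List<String> outputs = new ArrayList<>();
        // try-with-resources works because the runner is AutoCloseable
        try (SketchRunner runner = new SketchRunner()) {
            runner.open();
            for (String s : new String[] {"a", "b"}) {
                runner.process(s.getBytes(StandardCharsets.UTF_8));
            }
            runner.flush(); // finish the bundle so its results are buffered
            byte[] result;
            while ((result = runner.pollResult()) != null) { // null means the buffer is empty
                outputs.add(new String(result, StandardCharsets.UTF_8));
            }
        } // close() runs here
        System.out.println(outputs); // prints [A, B]
    }
}

/** Hypothetical in-memory stand-in; upper-cases input instead of calling Python. */
class SketchRunner implements AutoCloseable {
    private final List<byte[]> pending = new ArrayList<>();
    private final LinkedBlockingQueue<byte[]> resultBuffer = new LinkedBlockingQueue<>();
    private boolean opened;

    void open() {
        opened = true;
    }

    void process(byte[] data) {
        if (!opened) {
            throw new IllegalStateException("open() must be called first");
        }
        pending.add(data);
    }

    /** "Executes" all pending elements and buffers their results. */
    void flush() {
        for (byte[] in : pending) {
            String out = new String(in, StandardCharsets.UTF_8).toUpperCase(Locale.ROOT);
            resultBuffer.add(out.getBytes(StandardCharsets.UTF_8));
        }
        pending.clear();
    }

    byte[] pollResult() {
        return resultBuffer.poll();
    }

    @Override
    public void close() {
        opened = false;
    }
}
```

Results become visible only after flush, which mirrors the documented contract that flush finishes the current bundle.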
Field Detail
-
LOG
protected static final org.slf4j.Logger LOG
-
inputCoderDescriptor
protected final FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor
-
outputCoderDescriptor
protected final FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor
-
sideOutputCoderDescriptors
protected final Map<String,FlinkFnApi.CoderInfoDescriptor> sideOutputCoderDescriptors
-
resultBuffer
@VisibleForTesting protected transient LinkedBlockingQueue<Tuple2<String,byte[]>> resultBuffer
Buffers the Python function execution results that have not yet been processed.
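Because resultBuffer is a LinkedBlockingQueue, one thread can add results while another removes them without extra locking. The sketch assumes (this page does not say so) that results are produced on a different thread from the one that consumes them. `AbstractMap.SimpleImmutableEntry` stands in for Flink's `Tuple2<String, byte[]>`, and the tag `"out"` and class `ResultBufferDemo` are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

class ResultBufferDemo {
    // Stand-in for LinkedBlockingQueue<Tuple2<String, byte[]>>
    static final LinkedBlockingQueue<Map.Entry<String, byte[]>> resultBuffer =
            new LinkedBlockingQueue<>();

    /** Drains exactly n results, blocking on take() until each one arrives. */
    static List<String> consume(int n) throws InterruptedException {
        List<String> seen = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Map.Entry<String, byte[]> r = resultBuffer.take();
            seen.add(r.getKey() + ":" + new String(r.getValue(), StandardCharsets.UTF_8));
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical receiver thread standing in for whatever delivers Python output
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                resultBuffer.add(new SimpleImmutableEntry<>(
                        "out", ("r" + i).getBytes(StandardCharsets.UTF_8)));
            }
        });
        producer.start();
        List<String> seen = consume(3); // task thread blocks until results arrive
        producer.join();
        System.out.println(seen); // prints [out:r0, out:r1, out:r2]
    }
}
```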
-
mainInputReceiver
@VisibleForTesting protected transient org.apache.beam.sdk.fn.data.FnDataReceiver<org.apache.beam.sdk.util.WindowedValue<byte[]>> mainInputReceiver
The receiver which forwards the input elements to a remote environment for processing.
Constructor Detail
-
BeamPythonFunctionRunner
public BeamPythonFunctionRunner(String taskName, ProcessPythonEnvironmentManager environmentManager, @Nullable FlinkMetricContainer flinkMetricContainer, @Nullable KeyedStateBackend<?> keyedStateBackend, @Nullable OperatorStateBackend operatorStateBackend, @Nullable TypeSerializer<?> keySerializer, @Nullable TypeSerializer<?> namespaceSerializer, @Nullable TimerRegistration timerRegistration, MemoryManager memoryManager, double managedMemoryFraction, FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor, FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor, Map<String,FlinkFnApi.CoderInfoDescriptor> sideOutputCoderDescriptors)
Method Detail
-
open
public void open(ReadableConfig config) throws Exception
Description copied from interface: PythonFunctionRunner
Prepares the Python function runner, e.g., by preparing the Python execution environment.
- Specified by:
open in interface PythonFunctionRunner
- Throws:
Exception
-
close
public void close() throws Exception
Description copied from interface: PythonFunctionRunner
Tears down the Python function runner.
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface PythonFunctionRunner
- Throws:
Exception
-
process
public void process(byte[] data) throws Exception
Description copied from interface: PythonFunctionRunner
Executes the Python function with the input byte array.
- Specified by:
process in interface PythonFunctionRunner
- Parameters:
data - the byte array data.
- Throws:
Exception
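This page does not spell out how process relates to startBundle and mainInputReceiver. A common arrangement, assumed here rather than taken from the source, is to start a bundle lazily when the first element arrives and then hand each byte array to the main input receiver. The sketch models mainInputReceiver with a `java.util.function.Consumer`; `LazyBundleRunner` and `finishBundle` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ProcessDemo {
    public static void main(String[] args) {
        LazyBundleRunner runner = new LazyBundleRunner();
        runner.process(new byte[] {1});
        runner.process(new byte[] {2, 3});
        System.out.println(runner.bundlesStarted + " " + runner.forwarded.size()); // prints 1 2
    }
}

/** Hypothetical sketch: starts a bundle on the first element, then forwards bytes. */
class LazyBundleRunner {
    int bundlesStarted = 0;
    final List<byte[]> forwarded = new ArrayList<>();
    private boolean bundleStarted = false;
    // Stand-in for mainInputReceiver (Beam's FnDataReceiver<WindowedValue<byte[]>>)
    private Consumer<byte[]> mainInputReceiver;

    void startBundle() {
        bundlesStarted++;
        mainInputReceiver = forwarded::add; // a real runner would open a remote bundle here
        bundleStarted = true;
    }

    void process(byte[] data) {
        if (!bundleStarted) {
            startBundle();
        }
        mainInputReceiver.accept(data);
    }

    void finishBundle() {
        bundleStarted = false; // the next process() call opens a new bundle
    }
}
```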
-
processTimer
public void processTimer(byte[] timerData) throws Exception
Description copied from interface: PythonFunctionRunner
Sends the triggered timer to the Python function.
- Specified by:
processTimer in interface PythonFunctionRunner
- Throws:
Exception
-
startBundle
@VisibleForTesting protected void startBundle()
-
pollResult
public Tuple3<String,byte[],Integer> pollResult() throws Exception
Description copied from interface: PythonFunctionRunner
Retrieves the Python function result.
- Specified by:
pollResult in interface PythonFunctionRunner
- Returns:
the head of the Python function result buffer, or null if the result buffer is empty. f0 is the String stored alongside the result in resultBuffer, f1 is the byte array buffer that stores the Python function result, and f2 is the length of that byte array.
- Throws:
Exception
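The return contract of pollResult (null when the buffer is empty, otherwise a String, the result bytes, and their length) can be derived from a Tuple2-style buffer with a non-blocking `poll()`. In the sketch, `Triple` is a hypothetical stand-in for Flink's `Tuple3<String, byte[], Integer>`, `Map.Entry` stands in for `Tuple2`, and `PollingBuffer` is likewise illustrative only.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

class PollDemo {
    public static void main(String[] args) {
        PollingBuffer buf = new PollingBuffer();
        System.out.println(buf.pollResult()); // prints null (buffer empty)
        buf.resultBuffer.add(new SimpleImmutableEntry<>("out", "abc".getBytes(StandardCharsets.UTF_8)));
        Triple t = buf.pollResult();
        System.out.println(t.f0 + " " + t.f2); // prints out 3
    }
}

/** Hypothetical stand-in for Flink's Tuple3<String, byte[], Integer>. */
class Triple {
    final String f0;
    final byte[] f1;
    final int f2;

    Triple(String f0, byte[] f1, int f2) {
        this.f0 = f0;
        this.f1 = f1;
        this.f2 = f2;
    }
}

class PollingBuffer {
    final LinkedBlockingQueue<Map.Entry<String, byte[]>> resultBuffer = new LinkedBlockingQueue<>();

    /** Non-blocking: returns null when nothing is buffered. */
    Triple pollResult() {
        Map.Entry<String, byte[]> head = resultBuffer.poll();
        if (head == null) {
            return null;
        }
        // f2 carries the length of the result bytes
        return new Triple(head.getKey(), head.getValue(), head.getValue().length);
    }
}
```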
-
takeResult
public Tuple3<String,byte[],Integer> takeResult() throws Exception
Description copied from interface: PythonFunctionRunner
Retrieves the Python function result, waiting if necessary until an element becomes available.
- Specified by:
takeResult in interface PythonFunctionRunner
- Returns:
the head of the Python function result buffer. f0 is the String stored alongside the result in resultBuffer, f1 is the byte array buffer that stores the Python function result, and f2 is the length of that byte array.
- Throws:
Exception
-
flush
public void flush() throws Exception
Description copied from interface: PythonFunctionRunner
Forces the processing of the current bundle of elements to finish. It flushes the data cached in the data buffer for processing and retrieves the state mutations (if any) made by the Python function. The call blocks until all of the outputs produced by this bundle have been received.
- Specified by:
flush in interface PythonFunctionRunner
- Throws:
Exception
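The blocking guarantee of flush ("blocks until all of the outputs produced by this bundle have been received") can be illustrated by tracking in-flight work and waiting on each piece. In this sketch a JDK thread pool stands in for the remote Python worker; `FlushingRunner` and squaring integers are hypothetical and only demonstrate the waiting behavior, not how Flink communicates with Python.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

class FlushDemo {
    public static void main(String[] args) throws Exception {
        FlushingRunner runner = new FlushingRunner();
        for (int i = 1; i <= 5; i++) {
            runner.process(i);
        }
        runner.flush(); // returns only after all five results are buffered
        System.out.println(runner.resultBuffer.size()); // prints 5
        runner.close();
    }
}

/** Hypothetical sketch: a thread pool stands in for the remote Python worker. */
class FlushingRunner implements AutoCloseable {
    final LinkedBlockingQueue<Integer> resultBuffer = new LinkedBlockingQueue<>();
    private final ExecutorService worker = Executors.newFixedThreadPool(2);
    private final List<Future<?>> inFlight = new ArrayList<>();

    void process(int x) {
        // Results arrive asynchronously, possibly out of order
        inFlight.add(worker.submit(() -> resultBuffer.add(x * x)));
    }

    /** Blocks until every output of the current bundle has been received. */
    void flush() throws Exception {
        for (Future<?> f : inFlight) {
            f.get(); // waits; also rethrows worker failures wrapped in ExecutionException
        }
        inFlight.clear();
    }

    @Override
    public void close() {
        worker.shutdown();
    }
}
```

Without the wait in flush, the buffer size printed after flush could be anywhere from 0 to 5.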
-
notifyNoMoreResults
public void notifyNoMoreResults()
Interrupts the progress of takeResult.
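This page states only that notifyNoMoreResults interrupts takeResult, not how. One way to get that effect with a LinkedBlockingQueue, assumed here rather than confirmed by the source, is to enqueue a sentinel value that a blocked `take()` then returns. `SentinelBuffer` and `NO_MORE` are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;

class NotifyDemo {
    public static void main(String[] args) throws Exception {
        SentinelBuffer buf = new SentinelBuffer();
        Thread consumer = new Thread(() -> {
            try {
                byte[] r = buf.takeResult(); // blocks: nothing has been produced yet
                System.out.println(r == SentinelBuffer.NO_MORE ? "woken by notify" : "got data");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        Thread.sleep(100); // give the consumer time to block (demo only)
        buf.notifyNoMoreResults();
        consumer.join();
    }
}

/** Hypothetical sketch: a sentinel element unblocks a waiting take(). */
class SentinelBuffer {
    // Compared by identity, so it cannot be confused with a real empty result
    static final byte[] NO_MORE = new byte[0];
    final LinkedBlockingQueue<byte[]> resultBuffer = new LinkedBlockingQueue<>();

    byte[] takeResult() throws InterruptedException {
        return resultBuffer.take();
    }

    void notifyNoMoreResults() {
        resultBuffer.add(NO_MORE);
    }
}
```

A sentinel avoids `Thread.interrupt()`, which would also disturb any other blocking call the consumer thread happens to be making.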
-
buildTransforms
protected abstract void buildTransforms(org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder componentsBuilder)
-
getTimers
protected abstract List<org.apache.beam.runners.core.construction.graph.TimerReference> getTimers(org.apache.beam.model.pipeline.v1.RunnerApi.Components components)
-
getOptionalTimerCoderProto
protected abstract Optional<org.apache.beam.model.pipeline.v1.RunnerApi.Coder> getOptionalTimerCoderProto()
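buildTransforms, getTimers and getOptionalTimerCoderProto are abstract hooks that the subclasses (BeamDataStreamPythonFunctionRunner, BeamTablePythonFunctionRunner) must supply. The sketch shows the general template-method shape this suggests: a concrete base method assembles parts contributed by subclasses. Strings stand in for the Beam RunnerApi protos, and every class name plus `describePipeline` is hypothetical, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class TemplateDemo {
    public static void main(String[] args) {
        System.out.println(new StatelessSketch().describePipeline()); // prints [map] timers=0 timerCoder=none
        System.out.println(new TimerSketch().describePipeline()); // prints [process] timers=1 timerCoder=timer-coder
    }
}

/** Hypothetical base: a concrete method assembles parts supplied by subclass hooks. */
abstract class PipelineSketch {
    protected abstract void buildTransforms(List<String> componentsBuilder);

    protected abstract List<String> getTimers(List<String> components);

    protected abstract Optional<String> getOptionalTimerCoderProto();

    final String describePipeline() {
        List<String> components = new ArrayList<>();
        buildTransforms(components); // subclass contributes its transforms
        List<String> timers = getTimers(components);
        String coder = getOptionalTimerCoderProto().orElse("none");
        return components + " timers=" + timers.size() + " timerCoder=" + coder;
    }
}

/** A runner without timers: empty timer list, no timer coder. */
class StatelessSketch extends PipelineSketch {
    @Override
    protected void buildTransforms(List<String> b) { b.add("map"); }

    @Override
    protected List<String> getTimers(List<String> c) { return List.of(); }

    @Override
    protected Optional<String> getOptionalTimerCoderProto() { return Optional.empty(); }
}

/** A runner that registers one timer and therefore needs a timer coder. */
class TimerSketch extends PipelineSketch {
    @Override
    protected void buildTransforms(List<String> b) { b.add("process"); }

    @Override
    protected List<String> getTimers(List<String> c) { return List.of("timer"); }

    @Override
    protected Optional<String> getOptionalTimerCoderProto() { return Optional.of("timer-coder"); }
}
```

Returning `Optional.empty()` from the timer-coder hook is a natural way to model a runner that uses no timers.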
-
createJobBundleFactory
@VisibleForTesting public org.apache.beam.runners.fnexecution.control.JobBundleFactory createJobBundleFactory(org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.Struct pipelineOptions) throws Exception
- Throws:
Exception