Class BatchTask<S extends Function,OT>
- java.lang.Object
-
- org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
-
- org.apache.flink.runtime.operators.BatchTask<S,OT>
-
- All Implemented Interfaces:
CheckpointableTask, CoordinatedTask, TaskInvokable, TaskContext<S,OT>
- Direct Known Subclasses:
AbstractIterativeTask
public class BatchTask<S extends Function,OT> extends AbstractInvokable implements TaskContext<S,OT>
The base class for all batch tasks. Encapsulates common behavior and implements the main life-cycle of the user code.
-
-
Field Summary
Modifier and Type | Field | Description
protected Map<String,Accumulator<?,?>> | accumulatorMap | The accumulator map used in the RuntimeContext.
protected MutableReader<?>[] | broadcastInputReaders | The input readers for the configured broadcast variables for this task.
protected TypeSerializerFactory<?>[] | broadcastInputSerializers | The serializers for the broadcast input data types.
protected ArrayList<ChainedDriver<?,?>> | chainedTasks | A list of chained drivers, if there are any.
protected TaskConfig | config | The task configuration with the setup parameters.
protected Driver<S,OT> | driver | The driver that invokes the user code (the stub implementation).
protected List<RecordWriter<?>> | eventualOutputs | The output writers for the data that this task forwards to the next task.
protected TypeComparator<?>[] | inputComparators | The comparators for the central driver.
protected MutableObjectIterator<?>[] | inputIterators | The input readers, wrapped in iterators.
protected MutableReader<?>[] | inputReaders | The input readers of this task.
protected MutableObjectIterator<?>[] | inputs | The inputs to the operator.
protected TypeSerializerFactory<?>[] | inputSerializers | The serializers for the input data type.
protected int[] | iterativeBroadcastInputs | The indices of the iterative broadcast inputs.
protected int[] | iterativeInputs | The indices of the iterative inputs.
protected CloseableInputProvider<?>[] | localStrategies | The local strategies that are applied on the inputs.
protected static org.slf4j.Logger | LOG |
protected Collector<OT> | output | The collector that forwards the user code's results.
protected SpillingResettableMutableObjectIterator<?>[] | resettableInputs | The resettable inputs in the case where no temp barrier is needed.
protected boolean | running | The flag that tags the task as still running.
protected DistributedRuntimeUDFContext | runtimeUdfContext | The udf's runtime context.
protected S | stub | The instantiated user code of this task's main operator (driver).
protected TempBarrier<?>[] | tempBarriers | The optional temp barriers on the inputs for deadlock breaking.
-
Constructor Summary
Constructor | Description
BatchTask(Environment environment) | Create an Invokable task and set its environment.
-
Method Summary
Modifier and Type | Method | Description
void | cancel() | This method is called when a task is canceled either as a result of a user abort or an execution failure.
static void | cancelChainedTasks(List<ChainedDriver<?,?>> tasks) | Cancels all tasks via their ChainedDriver.cancelTask() method.
static void | clearReaders(MutableReader<?>[] readers) |
static void | clearWriters(List<RecordWriter<?>> writers) |
static void | closeChainedTasks(List<ChainedDriver<?,?>> tasks, AbstractInvokable parent) | Closes all chained tasks, in the order in which they are stored in the list.
protected void | closeLocalStrategiesAndCaches() |
static void | closeUserCode(Function stub) | Closes the given stub using its RichFunction.close() method.
static String | constructLogString(String message, String taskName, AbstractInvokable parent) | Utility function that composes a string for logging purposes.
protected MutableObjectIterator<?> | createInputIterator(MutableReader<?> inputReader, TypeSerializerFactory<?> serializerFactory) |
DistributedRuntimeUDFContext | createRuntimeContext(OperatorMetricGroup metrics) |
protected void | excludeFromReset(int inputNum) |
String | formatLogString(String message) |
AbstractInvokable | getContainingTask() |
<X> TypeComparator<X> | getDriverComparator(int index) |
<X> MutableObjectIterator<X> | getInput(int index) |
<X> TypeSerializerFactory<X> | getInputSerializer(int index) |
IOManager | getIOManager() |
protected Collector<OT> | getLastOutputCollector() |
TaskConfig | getLastTasksConfig() |
MemoryManager | getMemoryManager() |
OperatorMetricGroup | getMetricGroup() |
protected int | getNumTaskInputs() |
Collector<OT> | getOutputCollector() |
static <T> Collector<T> | getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl, List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs) | Creates the Collector for the given task, as described by the given configuration.
S | getStub() |
TaskConfig | getTaskConfig() |
TaskManagerRuntimeInfo | getTaskManagerInfo() |
protected void | initBroadcastInputReaders() | Creates the record readers for the extra broadcast inputs as configured by TaskConfig.getNumBroadcastInputs().
protected void | initBroadcastInputsSerializers(int numBroadcastInputs) | Creates all the serializers and iterators for the broadcast inputs.
protected void | initialize() |
protected void | initInputReaders() | Creates the record readers for the number of inputs as defined by getNumTaskInputs().
protected void | initInputsSerializersAndComparators(int numInputs, int numComparators) | Creates all the serializers and comparators.
protected void | initLocalStrategies(int numInputs) | NOTE: This method must be invoked after initInputReaders() and initInputsSerializersAndComparators(int, int).
protected void | initOutputs() | Creates a writer for each output.
static <T> Collector<T> | initOutputs(AbstractInvokable containingTask, UserCodeClassLoader cl, TaskConfig config, List<ChainedDriver<?,?>> chainedTasksTarget, List<RecordWriter<?>> eventualOutputs, ExecutionConfig executionConfig, Map<String,Accumulator<?,?>> accumulatorMap) | Creates a writer for each output.
protected S | initStub(Class<? super S> stubSuperClass) |
static <T> T | instantiateUserCode(TaskConfig config, ClassLoader cl, Class<? super T> superClass) | Instantiates a user code class from its definition in the task configuration.
void | invoke() | The main work method.
static void | logAndThrowException(Exception ex, AbstractInvokable parent) | Prints an error message and throws the given exception.
static void | openChainedTasks(List<ChainedDriver<?,?>> tasks, AbstractInvokable parent) | Opens all chained tasks, in the order in which they are stored in the list.
static void | openUserCode(Function stub, Configuration parameters) | Opens the given stub using its RichFunction.open(OpenContext) method.
protected <X> void | readAndSetBroadcastInput(int inputNum, String bcVarName, DistributedRuntimeUDFContext context, int superstep) |
protected void | releaseBroadcastVariables(String bcVarName, int superstep, DistributedRuntimeUDFContext context) |
protected void | resetAllInputs() |
protected void | run() |
protected void | setLastOutputCollector(Collector<OT> newOutputCollector) |
-
Methods inherited from class org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
abortCheckpointOnBarrier, cleanUp, dispatchOperatorEvent, getCurrentNumberOfSubtasks, getEnvironment, getExecutionConfig, getIndexInSubtaskGroup, getJobConfiguration, getTaskConfiguration, getUserCodeClassLoader, isUsingNonBlockingInput, maybeInterruptOnCancel, notifyCheckpointAbortAsync, notifyCheckpointCompleteAsync, notifyCheckpointSubsumedAsync, restore, triggerCheckpointAsync, triggerCheckpointOnBarrier
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.runtime.operators.TaskContext
getExecutionConfig, getUserCodeClassLoader
-
-
-
-
Field Detail
-
LOG
protected static final org.slf4j.Logger LOG
-
driver
protected volatile Driver<S,OT> driver
The driver that invokes the user code (the stub implementation). The central driver in this task (further drivers may be chained behind this driver).
-
stub
protected S stub
The instantiated user code of this task's main operator (driver). May be null if the operator has no udf.
-
runtimeUdfContext
protected DistributedRuntimeUDFContext runtimeUdfContext
The udf's runtime context.
-
output
protected Collector<OT> output
The collector that forwards the user code's results. May forward to a channel or to chained drivers within this task.
-
eventualOutputs
protected List<RecordWriter<?>> eventualOutputs
The output writers for the data that this task forwards to the next task. The last driver (the central driver if no chained drivers exist, otherwise the last chained driver) writes its output to these writers.
-
inputReaders
protected MutableReader<?>[] inputReaders
The input readers of this task.
-
broadcastInputReaders
protected MutableReader<?>[] broadcastInputReaders
The input readers for the configured broadcast variables for this task.
-
inputIterators
protected MutableObjectIterator<?>[] inputIterators
The input readers, wrapped in iterators. These are the inputs before the local strategies and any later steps are applied.
-
iterativeInputs
protected int[] iterativeInputs
The indices of the iterative inputs. Empty, if the task is not iterative.
-
iterativeBroadcastInputs
protected int[] iterativeBroadcastInputs
The indices of the iterative broadcast inputs. Empty, if none of the inputs is iterative.
-
localStrategies
protected volatile CloseableInputProvider<?>[] localStrategies
The local strategies that are applied on the inputs.
-
tempBarriers
protected volatile TempBarrier<?>[] tempBarriers
The optional temp barriers on the inputs for deadlock breaking. They are optionally resettable.
-
resettableInputs
protected volatile SpillingResettableMutableObjectIterator<?>[] resettableInputs
The resettable inputs in the case where no temp barrier is needed.
-
inputs
protected MutableObjectIterator<?>[] inputs
The inputs to the operator. They return the readers' data after the application of the local strategy and the temp-table barrier.
-
inputSerializers
protected TypeSerializerFactory<?>[] inputSerializers
The serializers for the input data type.
-
broadcastInputSerializers
protected TypeSerializerFactory<?>[] broadcastInputSerializers
The serializers for the broadcast input data types.
-
inputComparators
protected TypeComparator<?>[] inputComparators
The comparators for the central driver.
-
config
protected TaskConfig config
The task configuration with the setup parameters.
-
chainedTasks
protected ArrayList<ChainedDriver<?,?>> chainedTasks
A list of chained drivers, if there are any.
-
running
protected volatile boolean running
The flag that tags the task as still running. Checked periodically to abort processing.
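The pattern behind such a flag can be sketched as follows. This is a minimal illustration, not Flink's code: `CancellableLoop`, `process`, and the use of `Iterator<String>` as input are hypothetical names chosen for the example.

```java
import java.util.Iterator;

// Hypothetical stand-in for a task; this is not Flink's BatchTask.
class CancellableLoop {
    // volatile so that a write from the canceling thread becomes visible to the worker thread
    private volatile boolean running = true;

    // Consumes records until the input is exhausted or the flag is cleared.
    // Returns the number of records consumed.
    int process(Iterator<String> input) {
        int processed = 0;
        while (running && input.hasNext()) {
            input.next();
            processed++;
        }
        return processed;
    }

    // Called (typically from another thread) to request that processing stops.
    void cancel() {
        running = false;
    }
}
```

The flag is checked once per record, so a cancel request takes effect at the next record boundary rather than immediately.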
-
accumulatorMap
protected Map<String,Accumulator<?,?>> accumulatorMap
The accumulator map used in the RuntimeContext.
-
-
Constructor Detail
-
BatchTask
public BatchTask(Environment environment)
Create an Invokable task and set its environment.
- Parameters:
environment
- The environment assigned to this invokable.
-
-
Method Detail
-
invoke
public void invoke() throws Exception
The main work method.
- Specified by:
invoke
in interface TaskInvokable
- Specified by:
invoke
in class AbstractInvokable
- Throws:
Exception
-
cancel
public void cancel() throws Exception
Description copied from interface: TaskInvokable
This method is called when a task is canceled either as a result of a user abort or an execution failure. It can be overridden to shut down the user code properly.
- Specified by:
cancel
in interface TaskInvokable
- Overrides:
cancel
in class AbstractInvokable
- Throws:
Exception
-
readAndSetBroadcastInput
protected <X> void readAndSetBroadcastInput(int inputNum, String bcVarName, DistributedRuntimeUDFContext context, int superstep) throws IOException
- Throws:
IOException
-
releaseBroadcastVariables
protected void releaseBroadcastVariables(String bcVarName, int superstep, DistributedRuntimeUDFContext context)
-
closeLocalStrategiesAndCaches
protected void closeLocalStrategiesAndCaches()
-
getLastOutputCollector
protected Collector<OT> getLastOutputCollector()
- Returns:
- the last output collector in the collector chain
-
setLastOutputCollector
protected void setLastOutputCollector(Collector<OT> newOutputCollector)
Sets the last output Collector of the collector chain of this BatchTask.
In case of chained tasks, the output collector of the last ChainedDriver is set. Otherwise it is the single collector of the BatchTask.
- Parameters:
newOutputCollector
- new output collector to set as last collector
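The two cases described above can be sketched like this. The types are hypothetical stand-ins: `Consumer<String>` plays the role of `Collector<OT>`, and `ChainedStage` plays the role of a chained driver; the real method operates on Flink's own classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a chained driver that owns an output collector.
class ChainedStage {
    Consumer<String> out;
}

// Hypothetical stand-in for the task.
class CollectorChain {
    Consumer<String> output;                               // the task's own collector
    final List<ChainedStage> chained = new ArrayList<>();  // chained stages, in order

    void setLastOutputCollector(Consumer<String> newCollector) {
        if (chained.isEmpty()) {
            // no chained stages: the task's single collector is the last one
            output = newCollector;
        } else {
            // otherwise only the collector of the last chained stage is replaced
            chained.get(chained.size() - 1).out = newCollector;
        }
    }
}
```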
-
getLastTasksConfig
public TaskConfig getLastTasksConfig()
-
initInputReaders
protected void initInputReaders() throws Exception
Creates the record readers for the number of inputs as defined by getNumTaskInputs(). This method requires that the task configuration, the driver, and the user-code class loader are set.
- Throws:
Exception
-
initBroadcastInputReaders
protected void initBroadcastInputReaders() throws Exception
Creates the record readers for the extra broadcast inputs as configured by TaskConfig.getNumBroadcastInputs(). This method requires that the task configuration, the driver, and the user-code class loader are set.
- Throws:
Exception
-
initInputsSerializersAndComparators
protected void initInputsSerializersAndComparators(int numInputs, int numComparators)
Creates all the serializers and comparators.
-
initBroadcastInputsSerializers
protected void initBroadcastInputsSerializers(int numBroadcastInputs)
Creates all the serializers and iterators for the broadcast inputs.
-
initLocalStrategies
protected void initLocalStrategies(int numInputs) throws Exception
NOTE: This method must be invoked after initInputReaders() and initInputsSerializersAndComparators(int, int).
- Throws:
Exception
-
excludeFromReset
protected void excludeFromReset(int inputNum)
-
createInputIterator
protected MutableObjectIterator<?> createInputIterator(MutableReader<?> inputReader, TypeSerializerFactory<?> serializerFactory)
-
getNumTaskInputs
protected int getNumTaskInputs()
-
initOutputs
protected void initOutputs() throws Exception
Creates a writer for each output. Creates an OutputCollector which forwards its input to all writers. The output collector applies the configured shipping strategies for each writer.
- Throws:
Exception
-
createRuntimeContext
public DistributedRuntimeUDFContext createRuntimeContext(OperatorMetricGroup metrics)
-
getTaskConfig
public TaskConfig getTaskConfig()
- Specified by:
getTaskConfig
in interface TaskContext<S,OT>
-
getTaskManagerInfo
public TaskManagerRuntimeInfo getTaskManagerInfo()
- Specified by:
getTaskManagerInfo
in interface TaskContext<S,OT>
-
getMemoryManager
public MemoryManager getMemoryManager()
- Specified by:
getMemoryManager
in interface TaskContext<S,OT>
-
getIOManager
public IOManager getIOManager()
- Specified by:
getIOManager
in interface TaskContext<S,OT>
-
getOutputCollector
public Collector<OT> getOutputCollector()
- Specified by:
getOutputCollector
in interface TaskContext<S,OT>
-
getContainingTask
public AbstractInvokable getContainingTask()
- Specified by:
getContainingTask
in interface TaskContext<S,OT>
-
formatLogString
public String formatLogString(String message)
- Specified by:
formatLogString
in interface TaskContext<S,OT>
-
getMetricGroup
public OperatorMetricGroup getMetricGroup()
- Specified by:
getMetricGroup
in interface TaskContext<S,OT>
-
getInput
public <X> MutableObjectIterator<X> getInput(int index)
- Specified by:
getInput
in interface TaskContext<S,OT>
-
getInputSerializer
public <X> TypeSerializerFactory<X> getInputSerializer(int index)
- Specified by:
getInputSerializer
in interface TaskContext<S,OT>
-
getDriverComparator
public <X> TypeComparator<X> getDriverComparator(int index)
- Specified by:
getDriverComparator
in interface TaskContext<S,OT>
-
constructLogString
public static String constructLogString(String message, String taskName, AbstractInvokable parent)
Utility function that composes a string for logging purposes. The string includes the given message, the given name of the task and the index in its subtask group as well as the number of instances that exist in its subtask group.
- Parameters:
message
- The main message for the log.
taskName
- The name of the task.
parent
- The task that contains the code producing the message.
- Returns:
- The string for logging.
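A string with the four ingredients listed above could be composed as in the sketch below. The separators and the 1-based display of the subtask index are assumptions made for illustration; the page does not specify the exact format, and `LogStrings.construct` is a hypothetical helper that takes the two numbers directly instead of reading them from the parent task.

```java
// Hypothetical helper; the separators and the 1-based index are assumptions about the format.
class LogStrings {
    static String construct(String message, String taskName, int subtaskIndex, int numSubtasks) {
        // subtaskIndex is 0-based here and shown 1-based, e.g. "(1/4)"
        return message + ": " + taskName + " (" + (subtaskIndex + 1) + "/" + numSubtasks + ")";
    }
}
```

For example, `LogStrings.construct("Start task code", "Map (mapper)", 0, 4)` yields `Start task code: Map (mapper) (1/4)`.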
-
logAndThrowException
public static void logAndThrowException(Exception ex, AbstractInvokable parent) throws Exception
Prints an error message and throws the given exception. If the exception is of the type ExceptionInChainedStubException, then the chain of contained exceptions is followed until an exception of a different type is found.
- Parameters:
ex
- The exception to be thrown.
parent
- The parent task, whose information is included in the log message.
- Throws:
Exception
- Always thrown.
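The unwrapping step can be illustrated with a small sketch. `WrappedStubException` is a hypothetical stand-in for `ExceptionInChainedStubException`, the task is reduced to a name, and logging is reduced to a line on standard error; none of this is the actual Flink implementation.

```java
// Hypothetical wrapper type standing in for ExceptionInChainedStubException.
class WrappedStubException extends RuntimeException {
    final Exception wrapped;

    WrappedStubException(Exception wrapped) {
        super(wrapped);
        this.wrapped = wrapped;
    }
}

class Unwrap {
    // Follows the chain of wrappers until an exception of a different type is found.
    static Exception unwrap(Exception ex) {
        Exception current = ex;
        while (current instanceof WrappedStubException) {
            current = ((WrappedStubException) current).wrapped;
        }
        return current;
    }

    // Logs (here: prints to stderr) and then always throws the unwrapped exception.
    static void logAndThrow(Exception ex, String taskName) throws Exception {
        Exception root = unwrap(ex);
        System.err.println("Error in task " + taskName + ": " + root.getMessage());
        throw root;
    }
}
```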
-
getOutputCollector
public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl, List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs) throws Exception
Creates the Collector for the given task, as described by the given configuration. The output collector contains the writers that forward the data to the different tasks that the given task is connected to. Each writer applies the partitioning as described in the configuration.
- Parameters:
task
- The task that the output collector is created for.
config
- The configuration describing the output shipping strategies.
cl
- The classloader used to load user defined types.
eventualOutputs
- The output writers that this task forwards to the next task for each output.
outputOffset
- The offset at which to start getting the writers for the outputs.
numOutputs
- The number of outputs described in the configuration.
- Returns:
- The OutputCollector that data produced in this task is submitted to.
- Throws:
Exception
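The relationship between one collector and several writers, each with its own partitioning, might look roughly like the sketch below. `SketchWriter` and `FanOutCollector` are hypothetical classes; the channel selector is an illustrative stand-in for a configured shipping strategy and does not mirror Flink's writer API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical writer: routes each record to one of its channels using its own strategy.
class SketchWriter<T> {
    final List<List<T>> channels = new ArrayList<>();
    private final ToIntFunction<T> channelSelector;

    SketchWriter(int numChannels, ToIntFunction<T> channelSelector) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
        this.channelSelector = channelSelector;
    }

    void emit(T record) {
        // floorMod keeps the channel index valid even for negative selector values
        int channel = Math.floorMod(channelSelector.applyAsInt(record), channels.size());
        channels.get(channel).add(record);
    }
}

// Hypothetical collector: hands every record to every writer.
class FanOutCollector<T> {
    private final List<SketchWriter<T>> writers;

    FanOutCollector(List<SketchWriter<T>> writers) {
        this.writers = writers;
    }

    void collect(T record) {
        for (SketchWriter<T> writer : writers) {
            writer.emit(record);
        }
    }
}
```

Keeping the partitioning inside each writer means the collector itself does not need to know how many downstream tasks exist or how records are distributed among them.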
-
initOutputs
public static <T> Collector<T> initOutputs(AbstractInvokable containingTask, UserCodeClassLoader cl, TaskConfig config, List<ChainedDriver<?,?>> chainedTasksTarget, List<RecordWriter<?>> eventualOutputs, ExecutionConfig executionConfig, Map<String,Accumulator<?,?>> accumulatorMap) throws Exception
Creates a writer for each output. Creates an OutputCollector which forwards its input to all writers. The output collector applies the configured shipping strategy.
- Throws:
Exception
-
openUserCode
public static void openUserCode(Function stub, Configuration parameters) throws Exception
Opens the given stub using its RichFunction.open(OpenContext) method. If the open call produces an exception, a new exception with a standard error message is created, using the encountered exception as its cause.
- Parameters:
stub
- The user code instance to be opened.
parameters
- The parameters supplied to the user code.
- Throws:
Exception
- Thrown, if the user code's open method produces an exception.
-
closeUserCode
public static void closeUserCode(Function stub) throws Exception
Closes the given stub using its RichFunction.close() method. If the close call produces an exception, a new exception with a standard error message is created, using the encountered exception as its cause.
- Parameters:
stub
- The user code instance to be closed.
- Throws:
Exception
- Thrown, if the user code's close method produces an exception.
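The wrapping behavior described for openUserCode and closeUserCode can be sketched for the open case as follows. `Openable` is a hypothetical stand-in for a rich function's life-cycle interface, and the message text is an assumption rather than the wording Flink uses.

```java
// Hypothetical stand-in for a user function with a life-cycle method.
interface Openable {
    void open() throws Exception;
}

class UserCodeLifecycle {
    // Opens the stub if it has a life-cycle method. Any failure is wrapped in a new
    // exception that carries a standard message and the original as its cause.
    static void openUserCode(Object stub) throws Exception {
        if (!(stub instanceof Openable)) {
            return; // plain functions have nothing to open
        }
        try {
            ((Openable) stub).open();
        } catch (Throwable t) {
            // the message text is illustrative only
            throw new Exception(
                    "The user defined 'open()' method caused an exception: " + t.getMessage(), t);
        }
    }
}
```

A close counterpart would follow the same shape, calling the close method and wrapping its failures in the same way.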
-
openChainedTasks
public static void openChainedTasks(List<ChainedDriver<?,?>> tasks, AbstractInvokable parent) throws Exception
Opens all chained tasks, in the order in which they are stored in the list. The opening process creates a standardized log info message.
- Parameters:
tasks
- The tasks to be opened.
parent
- The parent task, used to obtain parameters to include in the log message.
- Throws:
Exception
- Thrown, if the opening encounters an exception.
-
closeChainedTasks
public static void closeChainedTasks(List<ChainedDriver<?,?>> tasks, AbstractInvokable parent) throws Exception
Closes all chained tasks, in the order in which they are stored in the list. The closing process creates a standardized log info message.
- Parameters:
tasks
- The tasks to be closed.
parent
- The parent task, used to obtain parameters to include in the log message.
- Throws:
Exception
- Thrown, if the closing encounters an exception.
-
cancelChainedTasks
public static void cancelChainedTasks(List<ChainedDriver<?,?>> tasks)
Cancels all tasks via their ChainedDriver.cancelTask() method. Any occurring exception and error is suppressed, such that the canceling method of every task is invoked in all cases.
- Parameters:
tasks
- The tasks to be canceled.
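The "suppress and continue" behavior can be illustrated with a short sketch. `Cancellable` is a hypothetical stand-in for `ChainedDriver`; only the loop structure is the point here.

```java
import java.util.List;

// Hypothetical stand-in for a chained driver's cancel hook.
interface Cancellable {
    void cancelTask();
}

class CancelAll {
    // Calls cancelTask() on every element. A failure in one call is swallowed
    // so that the remaining tasks are still canceled.
    static void cancelAll(List<? extends Cancellable> tasks) {
        for (Cancellable task : tasks) {
            try {
                task.cancelTask();
            } catch (Throwable t) {
                // intentionally ignored: catching Throwable covers exceptions and errors
            }
        }
    }
}
```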
-
instantiateUserCode
public static <T> T instantiateUserCode(TaskConfig config, ClassLoader cl, Class<? super T> superClass)
Instantiates a user code class from its definition in the task configuration. The class is instantiated without arguments using the nullary constructor. Instantiation will fail if this constructor does not exist or is not public.
- Type Parameters:
T
- The generic type of the user code class.
- Parameters:
config
- The task configuration containing the class description.
cl
- The class loader to be used to load the class.
superClass
- The super class that the user code class extends or implements, for type checking.
- Returns:
- An instance of the user code class.
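Reflective instantiation with a type check can be sketched with standard Java reflection. In this illustration the class is identified by a plain class name, which is an assumption: the real method reads the class description from the TaskConfig. `Instantiation.instantiate` is a hypothetical helper.

```java
class Instantiation {
    // Loads the named class, checks it against the expected super type, and
    // creates an instance through its public no-argument constructor.
    static <T> T instantiate(String className, ClassLoader cl, Class<? super T> superClass) {
        try {
            Class<?> clazz = Class.forName(className, true, cl);
            if (!superClass.isAssignableFrom(clazz)) {
                throw new RuntimeException(
                        className + " is not a subtype of " + superClass.getName());
            }
            // getConstructor() only finds public constructors, so a missing or
            // non-public nullary constructor ends up in the catch block below
            @SuppressWarnings("unchecked")
            T instance = (T) clazz.getConstructor().newInstance();
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not instantiate " + className, e);
        }
    }
}
```

As a usage example, `Instantiation.<CharSequence>instantiate("java.lang.StringBuilder", ClassLoader.getSystemClassLoader(), CharSequence.class)` returns a new `StringBuilder`, while requesting the same class with `Runnable.class` as the super type fails the type check.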
-
clearWriters
public static void clearWriters(List<RecordWriter<?>> writers)
-
clearReaders
public static void clearReaders(MutableReader<?>[] readers)
-
-