public class BatchTask<S extends Function,OT> extends AbstractInvokable implements TaskContext<S,OT>
Modifier and Type | Field and Description |
---|---|
protected Map<String,Accumulator<?,?>> |
accumulatorMap
The accumulator map used in the RuntimeContext.
|
protected MutableReader<?>[] |
broadcastInputReaders
The input readers for the configured broadcast variables for this task.
|
protected TypeSerializerFactory<?>[] |
broadcastInputSerializers
The serializers for the broadcast input data types.
|
protected ArrayList<ChainedDriver<?,?>> |
chainedTasks
A list of chained drivers, if there are any.
|
protected TaskConfig |
config
The task configuration with the setup parameters.
|
protected Driver<S,OT> |
driver
The driver that invokes the user code (the stub implementation).
|
protected List<RecordWriter<?>> |
eventualOutputs
The output writers for the data that this task forwards to the next task.
|
protected TypeComparator<?>[] |
inputComparators
The comparators for the central driver.
|
protected MutableObjectIterator<?>[] |
inputIterators
The input readers, each wrapped in an iterator.
|
protected MutableReader<?>[] |
inputReaders
The input readers of this task.
|
protected MutableObjectIterator<?>[] |
inputs
The inputs to the operator.
|
protected TypeSerializerFactory<?>[] |
inputSerializers
The serializers for the input data type.
|
protected int[] |
iterativeBroadcastInputs
The indices of the iterative broadcast inputs.
|
protected int[] |
iterativeInputs
The indices of the iterative inputs.
|
protected CloseableInputProvider<?>[] |
localStrategies
The local strategies that are applied on the inputs.
|
protected static org.slf4j.Logger |
LOG |
protected Collector<OT> |
output
The collector that forwards the user code's results.
|
protected SpillingResettableMutableObjectIterator<?>[] |
resettableInputs
The resettable inputs in the case where no temp barrier is needed.
|
protected boolean |
running
The flag that tags the task as still running.
|
protected DistributedRuntimeUDFContext |
runtimeUdfContext
The udf's runtime context.
|
protected S |
stub
The instantiated user code of this task's main operator (driver).
|
protected TempBarrier<?>[] |
tempBarriers
The optional temp barriers on the inputs for dead-lock breaking.
|
Constructor and Description |
---|
BatchTask(Environment environment)
Create an Invokable task and set its environment.
|
Modifier and Type | Method and Description |
---|---|
void |
cancel()
This method is called when a task is canceled either as a result of a user abort or an
execution failure.
|
static void |
cancelChainedTasks(List<ChainedDriver<?,?>> tasks)
Cancels all tasks via their
ChainedDriver.cancelTask() method. |
static void |
clearReaders(MutableReader<?>[] readers) |
static void |
clearWriters(List<RecordWriter<?>> writers) |
static void |
closeChainedTasks(List<ChainedDriver<?,?>> tasks,
AbstractInvokable parent)
Closes all chained tasks, in the order in which they are stored in the list.
|
protected void |
closeLocalStrategiesAndCaches() |
static void |
closeUserCode(Function stub)
Closes the given stub using its
RichFunction.close() method. |
static String |
constructLogString(String message,
String taskName,
AbstractInvokable parent)
Utility function that composes a string for logging purposes.
|
protected MutableObjectIterator<?> |
createInputIterator(MutableReader<?> inputReader,
TypeSerializerFactory<?> serializerFactory) |
DistributedRuntimeUDFContext |
createRuntimeContext(OperatorMetricGroup metrics) |
protected void |
excludeFromReset(int inputNum) |
String |
formatLogString(String message) |
AbstractInvokable |
getContainingTask() |
<X> TypeComparator<X> |
getDriverComparator(int index) |
<X> MutableObjectIterator<X> |
getInput(int index) |
<X> TypeSerializerFactory<X> |
getInputSerializer(int index) |
IOManager |
getIOManager() |
protected Collector<OT> |
getLastOutputCollector() |
TaskConfig |
getLastTasksConfig() |
MemoryManager |
getMemoryManager() |
OperatorMetricGroup |
getMetricGroup() |
protected int |
getNumTaskInputs() |
Collector<OT> |
getOutputCollector() |
static <T> Collector<T> |
getOutputCollector(AbstractInvokable task,
TaskConfig config,
ClassLoader cl,
List<RecordWriter<?>> eventualOutputs,
int outputOffset,
int numOutputs)
Creates the
Collector for the given task, as described by the given configuration. |
S |
getStub() |
TaskConfig |
getTaskConfig() |
TaskManagerRuntimeInfo |
getTaskManagerInfo() |
protected void |
initBroadcastInputReaders()
Creates the record readers for the extra broadcast inputs as configured by
TaskConfig.getNumBroadcastInputs(). |
protected void |
initBroadcastInputsSerializers(int numBroadcastInputs)
Creates all the serializers and iterators for the broadcast inputs.
|
protected void |
initialize() |
protected void |
initInputReaders()
Creates the record readers for the number of inputs as defined by
getNumTaskInputs(). |
protected void |
initInputsSerializersAndComparators(int numInputs,
int numComparators)
Creates all the serializers and comparators.
|
protected void |
initLocalStrategies(int numInputs)
NOTE: This method must be invoked after the invocation of
initInputReaders() and
initInputsSerializersAndComparators(int, int)! |
protected void |
initOutputs()
Creates a writer for each output.
|
static <T> Collector<T> |
initOutputs(AbstractInvokable containingTask,
UserCodeClassLoader cl,
TaskConfig config,
List<ChainedDriver<?,?>> chainedTasksTarget,
List<RecordWriter<?>> eventualOutputs,
ExecutionConfig executionConfig,
Map<String,Accumulator<?,?>> accumulatorMap)
Creates a writer for each output.
|
protected S |
initStub(Class<? super S> stubSuperClass) |
static <T> T |
instantiateUserCode(TaskConfig config,
ClassLoader cl,
Class<? super T> superClass)
Instantiates a user code class from its definition in the task configuration.
|
void |
invoke()
The main work method.
|
static void |
logAndThrowException(Exception ex,
AbstractInvokable parent)
Prints an error message and throws the given exception.
|
static void |
openChainedTasks(List<ChainedDriver<?,?>> tasks,
AbstractInvokable parent)
Opens all chained tasks, in the order in which they are stored in the list.
|
static void |
openUserCode(Function stub,
Configuration parameters)
Opens the given stub using its
RichFunction.open(Configuration) method. |
protected <X> void |
readAndSetBroadcastInput(int inputNum,
String bcVarName,
DistributedRuntimeUDFContext context,
int superstep) |
protected void |
releaseBroadcastVariables(String bcVarName,
int superstep,
DistributedRuntimeUDFContext context) |
protected void |
resetAllInputs() |
protected void |
run() |
protected void |
setLastOutputCollector(Collector<OT> newOutputCollector)
Sets the last output Collector of the collector chain of this BatchTask.
|
Methods inherited from class AbstractInvokable: abortCheckpointOnBarrier, cleanUp, dispatchOperatorEvent, getCurrentNumberOfSubtasks, getEnvironment, getExecutionConfig, getIndexInSubtaskGroup, getJobConfiguration, getTaskConfiguration, getUserCodeClassLoader, isUsingNonBlockingInput, maybeInterruptOnCancel, notifyCheckpointAbortAsync, notifyCheckpointCompleteAsync, notifyCheckpointSubsumedAsync, restore, triggerCheckpointAsync, triggerCheckpointOnBarrier
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface TaskContext: getExecutionConfig, getUserCodeClassLoader
protected static final org.slf4j.Logger LOG
protected volatile Driver<S,OT> driver
protected S stub
protected DistributedRuntimeUDFContext runtimeUdfContext
protected Collector<OT> output
protected List<RecordWriter<?>> eventualOutputs
protected MutableReader<?>[] inputReaders
protected MutableReader<?>[] broadcastInputReaders
protected MutableObjectIterator<?>[] inputIterators
protected int[] iterativeInputs
protected int[] iterativeBroadcastInputs
protected volatile CloseableInputProvider<?>[] localStrategies
protected volatile TempBarrier<?>[] tempBarriers
protected volatile SpillingResettableMutableObjectIterator<?>[] resettableInputs
protected MutableObjectIterator<?>[] inputs
protected TypeSerializerFactory<?>[] inputSerializers
protected TypeSerializerFactory<?>[] broadcastInputSerializers
protected TypeComparator<?>[] inputComparators
protected TaskConfig config
protected ArrayList<ChainedDriver<?,?>> chainedTasks
protected volatile boolean running
protected Map<String,Accumulator<?,?>> accumulatorMap
public BatchTask(Environment environment)
environment - The environment assigned to this invokable.
public void invoke() throws Exception
invoke in interface TaskInvokable
invoke in class AbstractInvokable
Exception
public void cancel() throws Exception
This method is called when a task is canceled either as a result of a user abort or an execution failure.
cancel in interface TaskInvokable
cancel in class AbstractInvokable
Exception
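The field list declares running as a volatile boolean that "tags the task as still running", and cancel() is the documented entry point for an abort. The following is a minimal sketch of how such a flag can stop a work loop cooperatively. The class RunningFlagSketch and its run(), cancel() and cancelStopsWorker() methods are hypothetical stand-ins and are not Flink's implementation.

```java
class RunningFlagSketch {
    // Mirrors the documented 'protected volatile boolean running' field:
    // volatile makes a write from the canceling thread visible to the worker.
    private volatile boolean running = true;

    // Hypothetical work loop, not Flink's actual run(): it checks the flag
    // between units of work and returns once the flag has been cleared.
    void run() {
        while (running) {
            Thread.onSpinWait(); // stand-in for processing one record
        }
    }

    // Hypothetical cancel(): only clears the flag; the loop exits on its own.
    void cancel() {
        running = false;
    }

    // Starts a worker, cancels it, and reports whether the worker terminated.
    static boolean cancelStopsWorker() {
        RunningFlagSketch task = new RunningFlagSketch();
        Thread worker = new Thread(task::run);
        worker.start();
        task.cancel();
        try {
            worker.join(5000); // wait up to five seconds for the loop to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped after cancel: " + cancelStopsWorker());
    }
}
```

Without volatile, the worker thread would not be guaranteed to observe the write made by the canceling thread, which is presumably why the field is declared that way.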
protected <X> void readAndSetBroadcastInput(int inputNum, String bcVarName, DistributedRuntimeUDFContext context, int superstep) throws IOException
IOException
protected void releaseBroadcastVariables(String bcVarName, int superstep, DistributedRuntimeUDFContext context)
protected void closeLocalStrategiesAndCaches()
protected Collector<OT> getLastOutputCollector()
protected void setLastOutputCollector(Collector<OT> newOutputCollector)
Sets the last output Collector of the collector chain of this BatchTask. In case of chained tasks, the output collector of the last ChainedDriver is set. Otherwise it is the single collector of the BatchTask.
newOutputCollector - new output collector to set as last collector
public TaskConfig getLastTasksConfig()
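The rule documented for setLastOutputCollector(Collector) above (replace the collector of the last chained driver if a chain exists, otherwise the task's own collector) can be sketched as follows. Collector, ChainedStage and Task here are simplified, hypothetical stand-ins for the Flink types and carry only the state needed to show the rule.

```java
import java.util.ArrayList;
import java.util.List;

class LastCollectorSketch {
    /** Hypothetical stand-in for Flink's Collector. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Hypothetical stand-in for a ChainedDriver: it only holds its output collector. */
    static final class ChainedStage<T> {
        Collector<T> output;
    }

    /** Hypothetical stand-in for the task: a main output plus an optional chain. */
    static final class Task<T> {
        final List<ChainedStage<T>> chainedTasks = new ArrayList<>();
        Collector<T> output;

        void setLastOutputCollector(Collector<T> newOutputCollector) {
            int numChained = chainedTasks.size();
            if (numChained == 0) {
                // No chain: the task's single collector is the last collector.
                output = newOutputCollector;
            } else {
                // With a chain: only the last stage's collector is replaced.
                chainedTasks.get(numChained - 1).output = newOutputCollector;
            }
        }
    }

    public static void main(String[] args) {
        Task<String> task = new Task<>();
        task.chainedTasks.add(new ChainedStage<>());
        task.chainedTasks.add(new ChainedStage<>());
        Collector<String> sink = record -> System.out.println("collected " + record);
        task.setLastOutputCollector(sink);
        // The new collector sits on the last stage; the first stage is untouched.
        task.chainedTasks.get(1).output.collect("a");
    }
}
```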
protected void initInputReaders() throws Exception
Creates the record readers for the number of inputs as defined by getNumTaskInputs(). This method requires that the task configuration, the driver, and the user-code class loader are set.
Exception
protected void initBroadcastInputReaders() throws Exception
Creates the record readers for the extra broadcast inputs as configured by TaskConfig.getNumBroadcastInputs(). This method requires that the task configuration, the driver, and the user-code class loader are set.
Exception
protected void initInputsSerializersAndComparators(int numInputs, int numComparators)
protected void initBroadcastInputsSerializers(int numBroadcastInputs)
protected void initLocalStrategies(int numInputs) throws Exception
NOTE: This method must be invoked after the invocation of initInputReaders() and initInputsSerializersAndComparators(int, int)!
Exception
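The ordering note on initLocalStrategies(int) is a precondition that the documentation states but that a subclass can easily violate. As an illustration only, the sketch below shows one way to make such an ordering explicit with guard flags. InitOrderSketch, its flags and the IllegalStateException are assumptions made for this example; nothing in this page says that BatchTask performs such a check.

```java
class InitOrderSketch {
    // Hypothetical guard flags; they are not fields of BatchTask.
    private boolean readersReady;
    private boolean serializersReady;
    private boolean strategiesReady;

    void initInputReaders() {
        readersReady = true;
    }

    void initInputsSerializersAndComparators() {
        serializersReady = true;
    }

    // Refuses to run before both prerequisite steps have completed.
    void initLocalStrategies() {
        if (!readersReady || !serializersReady) {
            throw new IllegalStateException(
                    "initLocalStrategies requires readers and serializers to be initialized first");
        }
        strategiesReady = true;
    }

    boolean isReady() {
        return strategiesReady;
    }

    public static void main(String[] args) {
        InitOrderSketch task = new InitOrderSketch();
        task.initInputReaders();
        task.initInputsSerializersAndComparators();
        task.initLocalStrategies();
        System.out.println("local strategies initialized: " + task.isReady());
    }
}
```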
protected void excludeFromReset(int inputNum)
protected MutableObjectIterator<?> createInputIterator(MutableReader<?> inputReader, TypeSerializerFactory<?> serializerFactory)
protected int getNumTaskInputs()
protected void initOutputs() throws Exception
Exception
public DistributedRuntimeUDFContext createRuntimeContext(OperatorMetricGroup metrics)
public TaskConfig getTaskConfig()
getTaskConfig in interface TaskContext<S extends Function,OT>
public TaskManagerRuntimeInfo getTaskManagerInfo()
getTaskManagerInfo in interface TaskContext<S extends Function,OT>
public MemoryManager getMemoryManager()
getMemoryManager in interface TaskContext<S extends Function,OT>
public IOManager getIOManager()
getIOManager in interface TaskContext<S extends Function,OT>
public Collector<OT> getOutputCollector()
getOutputCollector in interface TaskContext<S extends Function,OT>
public AbstractInvokable getContainingTask()
getContainingTask in interface TaskContext<S extends Function,OT>
public String formatLogString(String message)
formatLogString in interface TaskContext<S extends Function,OT>
public OperatorMetricGroup getMetricGroup()
getMetricGroup in interface TaskContext<S extends Function,OT>
public <X> MutableObjectIterator<X> getInput(int index)
getInput in interface TaskContext<S extends Function,OT>
public <X> TypeSerializerFactory<X> getInputSerializer(int index)
getInputSerializer in interface TaskContext<S extends Function,OT>
public <X> TypeComparator<X> getDriverComparator(int index)
getDriverComparator in interface TaskContext<S extends Function,OT>
public static String constructLogString(String message, String taskName, AbstractInvokable parent)
message - The main message for the log.
taskName - The name of the task.
parent - The task that contains the code producing the message.
public static void logAndThrowException(Exception ex, AbstractInvokable parent) throws Exception
Prints an error message and throws the given exception. If the exception is of the type ExceptionInChainedStubException, then the chain of contained exceptions is followed until an exception of a different type is found.
ex - The exception to be thrown.
parent - The parent task, whose information is included in the log message.
Exception - Always thrown.
public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl, List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs) throws Exception
Creates the Collector for the given task, as described by the given configuration. The output collector contains the writers that forward the data to the different tasks that the given task is connected to. Each writer applies the partitioning as described in the configuration.
task - The task that the output collector is created for.
config - The configuration describing the output shipping strategies.
cl - The classloader used to load user defined types.
eventualOutputs - The output writers that this task forwards to the next task for each output.
outputOffset - The offset at which to start getting the writers for the outputs.
numOutputs - The number of outputs described in the configuration.
Exception
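The description above says that the returned collector contains one writer per connected task and that outputOffset and numOutputs select which writers are used. A rough sketch of that fan-out idea follows. It assumes the writers sit in one indexable list and leaves out partitioning and serialization entirely. Collector, RecordingWriter and outputCollector are hypothetical names for this example, not the Flink API.

```java
import java.util.ArrayList;
import java.util.List;

class FanOutCollectorSketch {
    /** Hypothetical stand-in for Flink's Collector. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Hypothetical writer that records what it was asked to emit. */
    static final class RecordingWriter<T> {
        final List<T> emitted = new ArrayList<>();

        void emit(T record) {
            emitted.add(record);
        }
    }

    // Builds a collector over writers [outputOffset, outputOffset + numOutputs).
    // Assumption: every selected writer receives every record (no partitioning).
    static <T> Collector<T> outputCollector(
            List<RecordingWriter<T>> writers, int outputOffset, int numOutputs) {
        List<RecordingWriter<T>> selected =
                new ArrayList<>(writers.subList(outputOffset, outputOffset + numOutputs));
        return record -> {
            for (RecordingWriter<T> writer : selected) {
                writer.emit(record);
            }
        };
    }

    public static void main(String[] args) {
        List<RecordingWriter<String>> writers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            writers.add(new RecordingWriter<>());
        }
        Collector<String> collector = outputCollector(writers, 1, 2);
        collector.collect("a");
        for (int i = 0; i < writers.size(); i++) {
            System.out.println("writer " + i + " emitted " + writers.get(i).emitted);
        }
    }
}
```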
public static <T> Collector<T> initOutputs(AbstractInvokable containingTask, UserCodeClassLoader cl, TaskConfig config, List<ChainedDriver<?,?>> chainedTasksTarget, List<RecordWriter<?>> eventualOutputs, ExecutionConfig executionConfig, Map<String,Accumulator<?,?>> accumulatorMap) throws Exception
Exception
public static void openUserCode(Function stub, Configuration parameters) throws Exception
Opens the given stub using its RichFunction.open(Configuration) method. If the open call produces an exception, a new exception with a standard error message is created, using the encountered exception as its cause.
stub - The user code instance to be opened.
parameters - The parameters supplied to the user code.
Exception - Thrown, if the user code's open method produces an exception.
public static void closeUserCode(Function stub) throws Exception
Closes the given stub using its RichFunction.close() method. If the close call produces an exception, a new exception with a standard error message is created, using the encountered exception as its cause.
stub - The user code instance to be closed.
Exception - Thrown, if the user code's close method produces an exception.
public static void openChainedTasks(List<ChainedDriver<?,?>> tasks, AbstractInvokable parent) throws Exception
tasks - The tasks to be opened.
parent - The parent task, used to obtain parameters to include in the log message.
Exception - Thrown, if the opening encounters an exception.
public static void closeChainedTasks(List<ChainedDriver<?,?>> tasks, AbstractInvokable parent) throws Exception
tasks - The tasks to be closed.
parent - The parent task, used to obtain parameters to include in the log message.
Exception - Thrown, if the closing encounters an exception.
public static void cancelChainedTasks(List<ChainedDriver<?,?>> tasks)
Cancels all tasks via their ChainedDriver.cancelTask() method. Any exception or error that occurs is suppressed, so that the canceling method of every task is invoked in all cases.
tasks - The tasks to be canceled.
public static <T> T instantiateUserCode(TaskConfig config, ClassLoader cl, Class<? super T> superClass)
T - The generic type of the user code class.
config - The task configuration containing the class description.
cl - The class loader to be used to load the class.
superClass - The super class that the user code class extends or implements, for type checking.
public static void clearWriters(List<RecordWriter<?>> writers)
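The parameters of instantiateUserCode above describe a load, type-check and instantiate sequence: a class is resolved through the given class loader and checked against superClass before an instance is returned. The sketch below illustrates that sequence with plain reflection. It assumes the class is identified by name, whereas the documented method reads the class description from a TaskConfig. InstantiateSketch.instantiate and its error messages are hypothetical.

```java
class InstantiateSketch {
    // Loads className through cl, verifies it against superClass, and
    // creates an instance through the public no-argument constructor.
    static <T> T instantiate(String className, ClassLoader cl, Class<? super T> superClass) {
        try {
            Class<?> clazz = Class.forName(className, true, cl);
            // Type check first, so that a mismatch fails with a clear message
            // instead of a ClassCastException at the call site.
            if (!superClass.isAssignableFrom(clazz)) {
                throw new RuntimeException(
                        "The class '" + className + "' is not a subtype of '"
                                + superClass.getName() + "'.");
            }
            @SuppressWarnings("unchecked")
            T instance = (T) clazz.getDeclaredConstructor().newInstance();
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not instantiate '" + className + "'.", e);
        }
    }

    public static void main(String[] args) {
        ClassLoader cl = ClassLoader.getSystemClassLoader();
        CharSequence text = instantiate("java.lang.StringBuilder", cl, CharSequence.class);
        System.out.println("created a " + text.getClass().getName());
    }
}
```

Passing the expected supertype explicitly lets the check run before any unchecked cast, which is the role the superClass parameter plays in the documented signature.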
public static void clearReaders(MutableReader<?>[] readers)
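Two of the static helpers documented above have error-handling rules that are easier to see in code: logAndThrowException follows a chain of ExceptionInChainedStubException wrappers until it reaches an exception of a different type, and cancelChainedTasks suppresses every failure so that each task's cancel method still runs. The sketch below illustrates both rules with hypothetical stand-ins (ChainedStubException, Cancelable, unwrap, cancelAll). It omits logging and does not reproduce the Flink classes.

```java
import java.util.Arrays;
import java.util.List;

class ChainedHelpersSketch {
    /** Hypothetical stand-in for ExceptionInChainedStubException. */
    static final class ChainedStubException extends RuntimeException {
        private final Exception wrapped;

        ChainedStubException(Exception wrapped) {
            super(wrapped);
            this.wrapped = wrapped;
        }

        Exception getWrappedException() {
            return wrapped;
        }
    }

    /** Hypothetical stand-in for the cancel side of a ChainedDriver. */
    interface Cancelable {
        void cancelTask();
    }

    // Follows the wrapper chain until an exception of a different type is found.
    static Exception unwrap(Exception ex) {
        Exception current = ex;
        while (current instanceof ChainedStubException) {
            current = ((ChainedStubException) current).getWrappedException();
        }
        return current;
    }

    // Invokes cancelTask() on every task; failures never stop the loop.
    static void cancelAll(List<? extends Cancelable> tasks) {
        for (Cancelable task : tasks) {
            try {
                task.cancelTask();
            } catch (Throwable t) {
                // Deliberately suppressed so the remaining tasks are still canceled.
            }
        }
    }

    public static void main(String[] args) {
        Exception root = new IllegalStateException("user code failed");
        Exception nested = new ChainedStubException(new ChainedStubException(root));
        System.out.println("unwrapped to root: " + (unwrap(nested) == root));

        Cancelable failing = () -> { throw new RuntimeException("cancel failed"); };
        Cancelable reporting = () -> System.out.println("second task canceled");
        cancelAll(Arrays.asList(failing, reporting));
    }
}
```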
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.