Class CollectSinkFunction<IN>
- java.lang.Object
-
- org.apache.flink.api.common.functions.AbstractRichFunction
-
- org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction<IN>
-
- org.apache.flink.streaming.api.operators.collect.CollectSinkFunction<IN>
-
- Type Parameters:
IN - type of results to be written into the sink.
- All Implemented Interfaces:
Serializable, Function, RichFunction, CheckpointListener, CheckpointedFunction, SinkFunction<IN>
@Internal public class CollectSinkFunction<IN> extends RichSinkFunction<IN> implements CheckpointedFunction, CheckpointListener
A sink function that collects query results and sends them back to the client.
This sink limits the number of results it buffers (the limit is configurable), so that when the buffer is full, it back-pressures the job until the client consumes some results.
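The buffering behaviour described above can be illustrated with a minimal sketch. It assumes a count-based limit and uses a standard `ArrayBlockingQueue`; the class `BoundedResultBuffer` and its methods are hypothetical names for illustration and are not part of `CollectSinkFunction`, whose real buffer may be organised differently.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical bounded buffer; not Flink's actual implementation. */
class BoundedResultBuffer {
    private final BlockingQueue<String> queue;

    BoundedResultBuffer(int maxBufferedResults) {
        this.queue = new ArrayBlockingQueue<>(maxBufferedResults);
    }

    /** Blocks the producing thread while the buffer is full, which is what back-pressure amounts to. */
    void add(String result) throws InterruptedException {
        queue.put(result);
    }

    /** Non-blocking variant, included only so that fullness can be observed without blocking. */
    boolean tryAdd(String result) {
        return queue.offer(result);
    }

    /** Called on behalf of the client; frees one slot so a blocked producer can continue. */
    String poll() {
        return queue.poll();
    }
}
```

With a capacity of 2, a third `tryAdd` is rejected until `poll` removes a result; a blocking `add` would wait at the same point.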
NOTE: When using this sink, make sure that its parallelism is 1 and that it is used in a StreamTask.
Communication Protocol Explanation
We maintain the following variables in this communication protocol:
- version: This variable is set to a random value when the sink opens. The client discovers that the sink has restarted when this value differs from the one it last saw.
- offset: This indicates that the client has successfully received all results before this offset. The sink can safely throw these results away.
- lastCheckpointedOffset: This is the value of offset when the checkpoint happens. It is restored from the checkpoint and assigned back to offset when the sink restarts. Clients that need exactly-once semantics rely on this value as the position to revert to when a failover happens.
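As a rough illustration of how the three variables relate, here is a simplified sketch. The class `SinkProtocolState` and its methods are hypothetical, the use of `UUID` for the random version is an assumption, and the checkpoint handling is collapsed into a single `snapshot()` call for brevity.

```java
import java.util.UUID;

/** Hypothetical holder for the three protocol variables; not Flink's actual implementation. */
class SinkProtocolState {
    String version;               // regenerated every time the sink opens
    long offset;                  // the client has received all results before this offset
    long lastCheckpointedOffset;  // value of offset at the last checkpoint

    /** Called when the sink (re)starts; restoredOffset is null when there is no checkpoint. */
    void open(Long restoredOffset) {
        version = UUID.randomUUID().toString();
        lastCheckpointedOffset = (restoredOffset == null) ? 0L : restoredOffset;
        offset = lastCheckpointedOffset; // revert to the checkpointed position
    }

    /** Called when a valid request acknowledges everything before newOffset. */
    void acknowledge(long newOffset) {
        offset = Math.max(offset, newOffset);
    }

    /** Called when a checkpoint happens; returns the value that would be persisted. */
    long snapshot() {
        lastCheckpointedOffset = offset;
        return lastCheckpointedOffset;
    }
}
```

After a restart, `offset` falls back to the checkpointed value and `version` changes, which is how a client can tell that results after `lastCheckpointedOffset` may be produced again.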
The client puts version and offset into each request, indicating what it believes the current version to be and how many results it has received.
The sink checks the validity of the request. If version does not match or offset is smaller than expected, the sink sends back the current version and lastCheckpointedOffset with an empty result list.
If the request is valid, the sink prepares some results starting from offset and sends them back to the client together with lastCheckpointedOffset. If there are currently no results starting from offset, the sink does not wait but instead sends back an empty result list.
A client that wants exactly-once semantics checks the following conditions when it receives the response:
- If the version mismatches, the client knows that the sink has restarted. It throws away all uncheckpointed results after lastCheckpointedOffset.
- If lastCheckpointedOffset increases, the client knows that a checkpoint has happened. It can now move all results before this offset to a user-visible buffer.
- If the response also contains new results, the client moves these new results into the uncheckpointed buffer.
Because
- the user can only see results before a lastCheckpointedOffset, and
- the client goes back to the latest lastCheckpointedOffset when the sink restarts,
the client never throws away results in the user-visible buffer. This communication protocol therefore achieves exactly-once semantics.
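The request/response rules above can be sketched as a small in-memory simulation. Everything here is hypothetical: `SketchResponse`, `SketchSink`, and `SketchClient` are illustrative names, results are plain strings, offsets are assumed to start at 0, and the client is assumed to issue one request at a time. It is not the code Flink uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical response: the sink's version, its checkpointed offset, and a batch of results. */
class SketchResponse {
    final String version;
    final long lastCheckpointedOffset;
    final List<String> results;

    SketchResponse(String version, long lastCheckpointedOffset, List<String> results) {
        this.version = version;
        this.lastCheckpointedOffset = lastCheckpointedOffset;
        this.results = results;
    }
}

/** Hypothetical sink side: buffer.get(0) is the result at position `offset`. */
class SketchSink {
    final String version;
    final int maxBatchSize;
    long offset;
    long lastCheckpointedOffset;
    final List<String> buffer = new ArrayList<>();

    SketchSink(String version, long restoredOffset, int maxBatchSize) {
        this.version = version;
        this.offset = restoredOffset;
        this.lastCheckpointedOffset = restoredOffset;
        this.maxBatchSize = maxBatchSize;
    }

    SketchResponse handleRequest(String requestVersion, long requestOffset) {
        // Invalid request: wrong version, or an offset smaller than expected.
        if (!version.equals(requestVersion) || requestOffset < offset) {
            return new SketchResponse(version, lastCheckpointedOffset, Collections.emptyList());
        }
        // Everything before requestOffset has been received: throw it away.
        int acknowledged = (int) Math.min(requestOffset - offset, buffer.size());
        buffer.subList(0, acknowledged).clear();
        offset += acknowledged;
        // Do not wait for more data: return whatever is buffered, possibly nothing.
        int batchSize = Math.min(maxBatchSize, buffer.size());
        List<String> batch = new ArrayList<>(buffer.subList(0, batchSize));
        return new SketchResponse(version, lastCheckpointedOffset, batch);
    }
}

/** Hypothetical client side with exactly-once visibility. */
class SketchClient {
    String version = "";      // no version seen yet
    long visibleOffset = 0;   // number of results already user-visible
    final List<String> uncheckpointed = new ArrayList<>();
    final List<String> userVisible = new ArrayList<>();

    long nextRequestOffset() {
        return visibleOffset + uncheckpointed.size();
    }

    void handleResponse(SketchResponse response) {
        // How many buffered results lie before lastCheckpointedOffset.
        long covered = response.lastCheckpointedOffset - visibleOffset;
        int checkpointed = (int) Math.max(0, Math.min(covered, uncheckpointed.size()));
        if (!version.equals(response.version)) {
            // Sink restarted: drop uncheckpointed results after lastCheckpointedOffset.
            version = response.version;
            uncheckpointed.subList(checkpointed, uncheckpointed.size()).clear();
        }
        // A checkpoint covers the head of the buffer: make it user-visible.
        List<String> head = uncheckpointed.subList(0, checkpointed);
        userVisible.addAll(head);
        head.clear();
        visibleOffset += checkpointed;
        // New results stay uncheckpointed until a later checkpoint covers them.
        uncheckpointed.addAll(response.results);
    }
}
```

A round trip is `client.handleResponse(sink.handleRequest(client.version, client.nextRequestOffset()))`. In this sketch, results only reach `userVisible` once a response reports a `lastCheckpointedOffset` that covers them, so a restart can only discard entries from `uncheckpointed`.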
In order not to block the job from finishing or being cancelled, if there are still results in the sink's buffer when the job terminates, these results are sent back to the client through accumulators.
- See Also:
- Serialized Form
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction
SinkFunction.Context
-
-
Constructor Summary
Constructors
CollectSinkFunction(TypeSerializer<IN> serializer, long maxBytesPerBatch, String accumulatorName)
-
Method Summary
- void accumulateFinalResults()
- void close(): Tear-down method for the user code.
- static Tuple2<Long,CollectCoordinationResponse> deserializeAccumulatorResult(byte[] serializedAccResults)
- void initializeState(FunctionInitializationContext context): This method is called when the parallel function instance is created during distributed execution.
- void invoke(IN value, SinkFunction.Context context): Writes the given value to the sink.
- void notifyCheckpointAborted(long checkpointId): This method is called as a notification once a distributed checkpoint has been aborted.
- void notifyCheckpointComplete(long checkpointId): Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
- void open(OpenContext openContext): Initialization method for the function.
- static byte[] serializeAccumulatorResult(long offset, String version, long lastCheckpointedOffset, List<byte[]> buffer)
- void setOperatorEventGateway(OperatorEventGateway eventGateway)
- void snapshotState(FunctionSnapshotContext context): This method is called when a snapshot for a checkpoint is requested.
-
Methods inherited from class org.apache.flink.api.common.functions.AbstractRichFunction
getIterationRuntimeContext, getRuntimeContext, setRuntimeContext
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction
finish, invoke, writeWatermark
-
-
-
-
Constructor Detail
-
CollectSinkFunction
public CollectSinkFunction(TypeSerializer<IN> serializer, long maxBytesPerBatch, String accumulatorName)
-
-
Method Detail
-
initializeState
public void initializeState(FunctionInitializationContext context) throws Exception
Description copied from interface: CheckpointedFunction
This method is called when the parallel function instance is created during distributed execution. Functions typically set up their state storing data structures in this method.
- Specified by: initializeState in interface CheckpointedFunction
- Parameters: context - the context for initializing the operator
- Throws: Exception - Thrown, if state could not be created or restored.
-
snapshotState
public void snapshotState(FunctionSnapshotContext context) throws Exception
Description copied from interface: CheckpointedFunction
This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to ensure that all state is exposed by means previously offered through FunctionInitializationContext when the Function was initialized, or offered now by FunctionSnapshotContext itself.
- Specified by: snapshotState in interface CheckpointedFunction
- Parameters: context - the context for drawing a snapshot of the operator
- Throws: Exception - Thrown, if state could not be created or restored.
-
open
public void open(OpenContext openContext) throws Exception
Description copied from interface: RichFunction
Initialization method for the function. It is called before the actual working methods (like map or join) and is thus suitable for one-time setup work. For functions that are part of an iteration, this method will be invoked at the beginning of each iteration superstep.
The openContext object passed to the function can be used for configuration and initialization. The openContext contains necessary information that was configured on the function in the program composition.
public class MyFilter extends RichFilterFunction<String> {
    private String searchString;

    public void open(OpenContext openContext) {
        // initialize the value of searchString
    }

    public boolean filter(String value) {
        return value.equals(searchString);
    }
}
- Specified by: open in interface RichFunction
- Overrides: open in class AbstractRichFunction
- Parameters: openContext - The context containing information about the context in which the function is opened.
- Throws: Exception - Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.
-
invoke
public void invoke(IN value, SinkFunction.Context context) throws Exception
Description copied from interface: SinkFunction
Writes the given value to the sink. This function is called for every record.
You have to override this method when implementing a SinkFunction; it is a default method only for backward compatibility with the old-style method.
- Specified by: invoke in interface SinkFunction<IN>
- Parameters: value - The input record. context - Additional context about the input record.
- Throws: Exception - This method may throw exceptions. Throwing an exception will cause the operation to fail and may trigger recovery.
-
close
public void close() throws Exception
Description copied from interface: RichFunction
Tear-down method for the user code. It is called after the last call to the main working methods (e.g. map or join). For functions that are part of an iteration, this method will be invoked after each iteration superstep.
This method can be used for clean up work.
- Specified by: close in interface RichFunction
- Overrides: close in class AbstractRichFunction
- Throws: Exception - Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.
-
notifyCheckpointComplete
public void notifyCheckpointComplete(long checkpointId)
Description copied from interface: CheckpointListener
Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
These notifications are "best effort", meaning they can sometimes be skipped. To behave properly, implementers need to follow the "Checkpoint Subsuming Contract". Please see the class-level JavaDocs for details.
Please note that checkpoints may generally overlap, so you cannot assume that the notifyCheckpointComplete() call is always for the latest prior checkpoint (or snapshot) that was taken on the function/operator implementing this interface. It might be for a checkpoint that was triggered earlier. Implementing the "Checkpoint Subsuming Contract" (see above) handles this situation correctly as well.
Please note that throwing exceptions from this method will not cause the completed checkpoint to be revoked. Throwing exceptions will typically cause task/job failure and trigger recovery.
- Specified by: notifyCheckpointComplete in interface CheckpointListener
- Parameters: checkpointId - The ID of the checkpoint that has been completed.
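One way to tolerate skipped or out-of-order notifications is to treat a completed checkpoint as covering every earlier pending one. The sketch below illustrates that idea under stated assumptions: `SubsumingCommitter`, `snapshot`, and the string payloads are hypothetical, and the exact wording of the "Checkpoint Subsuming Contract" lives in the CheckpointListener class-level JavaDocs, not here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical listener that commits pending work per checkpoint; not Flink's implementation. */
class SubsumingCommitter {
    // Pending work keyed by the checkpoint that captured it, ordered by checkpoint ID.
    final NavigableMap<Long, String> pending = new TreeMap<>();
    final List<String> committed = new ArrayList<>();

    /** Records work that becomes committable once the given checkpoint (or a later one) completes. */
    void snapshot(long checkpointId, String pendingWork) {
        pending.put(checkpointId, pendingWork);
    }

    /** Commits the work of this checkpoint and of all earlier checkpoints still pending. */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, String> subsumed = pending.headMap(checkpointId, true);
        committed.addAll(subsumed.values());
        subsumed.clear(); // the view writes through, removing the entries from pending
    }
}
```

If the notification for checkpoint 1 is skipped and only checkpoint 2 is reported, both entries are committed; a late notification for checkpoint 1 then finds nothing left to do.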
-
notifyCheckpointAborted
public void notifyCheckpointAborted(long checkpointId)
Description copied from interface: CheckpointListener
This method is called as a notification once a distributed checkpoint has been aborted.
Important: The fact that a checkpoint has been aborted does NOT mean that the data and artifacts produced between the previous checkpoint and the aborted checkpoint are to be discarded. The expected behavior is as if this checkpoint was never triggered in the first place, and the next successful checkpoint simply covers a longer time span. See the "Checkpoint Subsuming Contract" in the class-level JavaDocs for details.
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract") means it is mainly useful for earlier cleanups of auxiliary resources. One example is to pro-actively clear a local per-checkpoint state cache upon checkpoint failure.
- Specified by: notifyCheckpointAborted in interface CheckpointListener
- Parameters: checkpointId - The ID of the checkpoint that has been aborted.
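The cache-clearing example mentioned above might look like the following sketch. `PerCheckpointCache` and its fields are hypothetical; the cache is assumed to hold only auxiliary, re-creatable data, so that removing an entry never discards results.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical auxiliary cache keyed by checkpoint ID; not part of Flink. */
class PerCheckpointCache {
    final Map<Long, byte[]> cache = new HashMap<>();

    /** Stores scratch data prepared while taking the given checkpoint. */
    void put(long checkpointId, byte[] scratch) {
        cache.put(checkpointId, scratch);
    }

    /** Frees the scratch data early; harmless if the notification is skipped or repeated. */
    void notifyCheckpointAborted(long checkpointId) {
        cache.remove(checkpointId);
    }
}
```

Because the notification is best effort, a design like this would still need another cleanup path, for example dropping stale entries when a later checkpoint completes.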
-
setOperatorEventGateway
public void setOperatorEventGateway(OperatorEventGateway eventGateway)
-
serializeAccumulatorResult
@VisibleForTesting public static byte[] serializeAccumulatorResult(long offset, String version, long lastCheckpointedOffset, List<byte[]> buffer) throws IOException
- Throws: IOException
-
deserializeAccumulatorResult
public static Tuple2<Long,CollectCoordinationResponse> deserializeAccumulatorResult(byte[] serializedAccResults) throws IOException
- Throws: IOException
-
-