Class CollectSinkFunction<IN>

  • Type Parameters:
    IN - type of results to be written into the sink.
    All Implemented Interfaces:
    Serializable, Function, RichFunction, CheckpointListener, CheckpointedFunction, SinkFunction<IN>

    @Internal
    public class CollectSinkFunction<IN>
    extends RichSinkFunction<IN>
    implements CheckpointedFunction, CheckpointListener
    A sink function that collects query results and sends them back to the client.

    This sink works by limiting the number of results buffered in it (can be configured) so that when the buffer is full, it back-pressures the job until the client consumes some results.

    NOTE: When using this sink, make sure that its parallelism is 1, and make sure that it is used in a StreamTask.
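
    The back-pressure behaviour described above can be pictured with a minimal sketch. It is not the sink's actual code: the class name BoundedResultBuffer, the limit maxBufferedBytes, and all method names are hypothetical, and results are modelled as plain byte arrays. The producer blocks in put while the buffer is full and resumes once the consumer drains some results.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of a size-limited result buffer; not Flink's implementation.
class BoundedResultBuffer {
    private final Deque<byte[]> buffer = new ArrayDeque<>();
    private final long maxBufferedBytes; // assumed configurable limit
    private long bufferedBytes = 0;

    BoundedResultBuffer(long maxBufferedBytes) {
        this.maxBufferedBytes = maxBufferedBytes;
    }

    // Adds the result if it fits; returns false when the buffer is full.
    // A single oversized result is still accepted into an empty buffer,
    // otherwise the producer could never make progress.
    synchronized boolean tryPut(byte[] serializedResult) {
        boolean full = bufferedBytes > 0
                && bufferedBytes + serializedResult.length > maxBufferedBytes;
        if (full) {
            return false;
        }
        buffer.addLast(serializedResult);
        bufferedBytes += serializedResult.length;
        return true;
    }

    // Blocking variant: this is where the producer gets back-pressured.
    synchronized void put(byte[] serializedResult) throws InterruptedException {
        while (!tryPut(serializedResult)) {
            wait();
        }
    }

    // Called when the client consumes results; frees space and wakes the producer.
    synchronized List<byte[]> drain(int maxResults) {
        List<byte[]> out = new ArrayList<>();
        while (out.size() < maxResults && !buffer.isEmpty()) {
            byte[] result = buffer.removeFirst();
            bufferedBytes -= result.length;
            out.add(result);
        }
        notifyAll();
        return out;
    }

    synchronized long bufferedBytes() {
        return bufferedBytes;
    }
}
```

    In this sketch the limit is counted in bytes, which is an assumption chosen to mirror the maxBytesPerBatch constructor parameter; a count-based limit would work the same way.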

    Communication Protocol Explanation

    We maintain the following variables in this communication protocol:

    1. version: This variable will be set to a random value when the sink opens. Client discovers that the sink has restarted if this variable is different.
    2. offset: This indicates that client has successfully received the results before this offset. Sink can safely throw these results away.
    3. lastCheckpointedOffset: This is the value of offset when the checkpoint happens. This value will be restored from the checkpoint and set back to offset when the sink restarts. Clients who need exactly-once semantics need to rely on this value for the position to revert when a failover happens.

    Client will put version and offset into the request, indicating which version it believes is current and how many results it has received so far.

    Sink will check the validity of the request. If version mismatches or offset is smaller than expected, sink will send back the current version and lastCheckpointedOffset with an empty result list.

    If the request is valid, sink prepares some results starting from offset and sends them back to the client with lastCheckpointedOffset. If there are currently no results starting from offset, sink will not wait but will instead send back an empty result list.
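
    The sink-side checks above can be sketched as follows. This is an illustration under stated assumptions, not the sink's actual code: SinkProtocolSketch, Response, handle, add, and checkpoint are hypothetical names, results are plain strings, and the batch limit is counted in results rather than bytes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the sink-side request handling; not Flink's implementation.
class SinkProtocolSketch {
    static final class Response {
        final String version;
        final long lastCheckpointedOffset;
        final List<String> results;

        Response(String version, long lastCheckpointedOffset, List<String> results) {
            this.version = version;
            this.lastCheckpointedOffset = lastCheckpointedOffset;
            this.results = results;
        }
    }

    private final String version;         // fixed for the lifetime of this sink instance
    private final int maxResultsPerBatch; // assumed limit, counted in results
    private final List<String> buffer = new ArrayList<>(); // results at positions >= offset
    private long offset = 0;              // results before this were acknowledged and dropped
    private long lastCheckpointedOffset = 0;

    SinkProtocolSketch(String version, int maxResultsPerBatch) {
        this.version = version;
        this.maxResultsPerBatch = maxResultsPerBatch;
    }

    void add(String result) {
        buffer.add(result);
    }

    // Records the value of offset at checkpoint time.
    void checkpoint() {
        lastCheckpointedOffset = offset;
    }

    Response handle(String requestVersion, long requestOffset) {
        // Invalid request: answer with the current version and an empty result list.
        if (!version.equals(requestVersion) || requestOffset < offset) {
            return new Response(version, lastCheckpointedOffset, new ArrayList<>());
        }
        // The client has received everything before requestOffset, so drop it.
        int acked = (int) Math.min(requestOffset - offset, buffer.size());
        buffer.subList(0, acked).clear();
        offset += acked;
        // Send what is available right now; never wait for more results.
        int n = Math.min(maxResultsPerBatch, buffer.size());
        return new Response(
                version, lastCheckpointedOffset, new ArrayList<>(buffer.subList(0, n)));
    }
}
```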

    For a client that wants exactly-once semantics, when receiving the response, the client will check for the following conditions:

    1. If the version mismatches, client knows that sink has restarted. It will throw away all uncheckpointed results after lastCheckpointedOffset.
    2. If lastCheckpointedOffset increases, client knows that a checkpoint has happened. It can now move all results before this offset to a user-visible buffer.
    3. If the response also contains new results, client will now move these new results into uncheckpointed buffer.

    Note that because

    1. the user can only see results before a lastCheckpointedOffset, and
    2. the client goes back to the latest lastCheckpointedOffset when the sink restarts,

    the client never throws away results in the user-visible buffer. This communication protocol therefore achieves exactly-once semantics.
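
    The three client-side conditions above can be sketched as follows. This is a minimal illustration, not the actual client: ExactlyOnceClientSketch and its methods are hypothetical names, results are plain strings, the empty string is an assumed placeholder version before the first response, and the user-visible buffer is never consumed, so its size doubles as the visible offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the exactly-once client logic; not Flink's implementation.
class ExactlyOnceClientSketch {
    private String version = ""; // assumed placeholder until the first response arrives
    private final List<String> userVisible = new ArrayList<>();
    private final List<String> uncheckpointed = new ArrayList<>();

    // Offset to put into the next request: the number of results received so far.
    long nextRequestOffset() {
        return userVisible.size() + uncheckpointed.size();
    }

    String version() {
        return version;
    }

    List<String> userVisible() {
        return userVisible;
    }

    List<String> uncheckpointed() {
        return uncheckpointed;
    }

    void onResponse(String responseVersion, long lastCheckpointedOffset, List<String> results) {
        // Number of buffered results that lie before lastCheckpointedOffset.
        long checkpointedCount = Math.max(0, lastCheckpointedOffset - userVisible.size());

        // 1. Version mismatch: the sink restarted, so drop everything
        //    after lastCheckpointedOffset.
        if (!version.equals(responseVersion)) {
            version = responseVersion;
            int keep = (int) Math.min(checkpointedCount, uncheckpointed.size());
            uncheckpointed.subList(keep, uncheckpointed.size()).clear();
        }

        // 2. A checkpoint covers more results: make them user-visible.
        int toPublish = (int) Math.min(checkpointedCount, uncheckpointed.size());
        List<String> checkpointed = uncheckpointed.subList(0, toPublish);
        userVisible.addAll(checkpointed);
        checkpointed.clear();

        // 3. New results go into the uncheckpointed buffer.
        uncheckpointed.addAll(results);
    }
}
```

    Results only ever move from the uncheckpointed buffer to the user-visible one, and a restart only truncates the uncheckpointed buffer, which is the property the argument above relies on.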

    In order not to block job finishing/cancelling, if there are still results in sink's buffer when job terminates, these results will be sent back to client through accumulators.

    See Also:
    Serialized Form
    • Constructor Detail

      • CollectSinkFunction

        public CollectSinkFunction​(TypeSerializer<IN> serializer,
                                   long maxBytesPerBatch,
                                   String accumulatorName)
    • Method Detail

      • initializeState

        public void initializeState​(FunctionInitializationContext context)
                             throws Exception
        Description copied from interface: CheckpointedFunction
        This method is called when the parallel function instance is created during distributed execution. Functions typically set up their state storing data structures in this method.
        Specified by:
        initializeState in interface CheckpointedFunction
        Parameters:
        context - the context for initializing the operator
        Throws:
        Exception - Thrown, if state could not be created or restored.
      • open

        public void open​(OpenContext openContext)
                  throws Exception
        Description copied from interface: RichFunction
        Initialization method for the function. It is called before the actual working methods (like map or join) and thus suitable for one time setup work. For functions that are part of an iteration, this method will be invoked at the beginning of each iteration superstep.

        The openContext object passed to the function can be used for configuration and initialization. The openContext contains some necessary information that was configured on the function in the program composition.

        
         public class MyFilter extends RichFilterFunction<String> {
        
             private String searchString;
        
             public void open(OpenContext openContext) {
                 // initialize the value of searchString
             }
        
             public boolean filter(String value) {
                 return value.equals(searchString);
             }
         }
         
        Specified by:
        open in interface RichFunction
        Overrides:
        open in class AbstractRichFunction
        Parameters:
        openContext - The context containing information about the context in which the function is opened.
        Throws:
        Exception - Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.
      • invoke

        public void invoke​(IN value,
                           SinkFunction.Context context)
                    throws Exception
        Description copied from interface: SinkFunction
        Writes the given value to the sink. This function is called for every record.

        You have to override this method when implementing a SinkFunction; it is a default method only for backward compatibility with the old-style method.

        Specified by:
        invoke in interface SinkFunction<IN>
        Parameters:
        value - The input record.
        context - Additional context about the input record.
        Throws:
        Exception - This method may throw exceptions. Throwing an exception will cause the operation to fail and may trigger recovery.
      • close

        public void close()
                   throws Exception
        Description copied from interface: RichFunction
        Tear-down method for the user code. It is called after the last call to the main working methods (e.g. map or join). For functions that are part of an iteration, this method will be invoked after each iteration superstep.

        This method can be used for clean up work.

        Specified by:
        close in interface RichFunction
        Overrides:
        close in class AbstractRichFunction
        Throws:
        Exception - Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.
      • accumulateFinalResults

        public void accumulateFinalResults()
                                    throws Exception
        Throws:
        Exception
      • notifyCheckpointComplete

        public void notifyCheckpointComplete​(long checkpointId)
        Description copied from interface: CheckpointListener
        Notifies the listener that the checkpoint with the given checkpointId completed and was committed.

        These notifications are "best effort", meaning they can sometimes be skipped. To behave properly, implementers need to follow the "Checkpoint Subsuming Contract". Please see the class-level JavaDocs for details.

        Please note that checkpoints may generally overlap, so you cannot assume that the notifyCheckpointComplete() call is always for the latest prior checkpoint (or snapshot) that was taken on the function/operator implementing this interface. It might be for a checkpoint that was triggered earlier. Properly implementing the "Checkpoint Subsuming Contract" (see above) handles this situation correctly as well.

        Please note that throwing exceptions from this method will not cause the completed checkpoint to be revoked. Throwing exceptions will typically cause task/job failure and trigger recovery.
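
        One way to tolerate skipped and out-of-order notifications is to treat a notification for a given checkpoint as covering every pending checkpoint with an equal or smaller ID. The sketch below illustrates that reading of the contract; it is not how this class implements the method. SubsumingListenerSketch, snapshot, and the per-checkpoint offset are hypothetical.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of a listener that tolerates skipped or late notifications.
class SubsumingListenerSketch {
    // Value recorded at snapshot time, keyed by checkpoint ID.
    private final NavigableMap<Long, Long> pendingOffsets = new TreeMap<>();
    private long committedOffset = 0;

    void snapshot(long checkpointId, long offset) {
        pendingOffsets.put(checkpointId, offset);
    }

    void notifyCheckpointComplete(long checkpointId) {
        // All pending checkpoints up to and including checkpointId are subsumed.
        NavigableMap<Long, Long> subsumed = pendingOffsets.headMap(checkpointId, true);
        if (subsumed.isEmpty()) {
            // Late notification for a checkpoint that was already covered.
            return;
        }
        committedOffset = Math.max(committedOffset, subsumed.lastEntry().getValue());
        subsumed.clear(); // the view is backed by pendingOffsets, so this removes them
    }

    long committedOffset() {
        return committedOffset;
    }

    int pendingCount() {
        return pendingOffsets.size();
    }
}
```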

        Specified by:
        notifyCheckpointComplete in interface CheckpointListener
        Parameters:
        checkpointId - The ID of the checkpoint that has been completed.
      • notifyCheckpointAborted

        public void notifyCheckpointAborted​(long checkpointId)
        Description copied from interface: CheckpointListener
        This method is called as a notification once a distributed checkpoint has been aborted.

        Important: The fact that a checkpoint has been aborted does NOT mean that the data and artifacts produced between the previous checkpoint and the aborted checkpoint are to be discarded. The expected behavior is as if this checkpoint was never triggered in the first place, and the next successful checkpoint simply covers a longer time span. See the "Checkpoint Subsuming Contract" in the class-level JavaDocs for details.

        These notifications are "best effort", meaning they can sometimes be skipped.

        This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract") means it is mainly useful for earlier cleanups of auxiliary resources. One example is to pro-actively clear a local per-checkpoint state cache upon checkpoint failure.
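
        The cache example mentioned above might look like the following sketch. PerCheckpointCacheSketch and its fields are hypothetical, and the cached value is a plain string. Because abort notifications are best effort, the sketch also clears stale entries when a later checkpoint completes, so nothing depends on the abort call arriving.

```java
import java.util.TreeMap;

// Hypothetical sketch of early cleanup of a per-checkpoint cache.
class PerCheckpointCacheSketch {
    // Auxiliary data kept per checkpoint; it holds no results.
    private final TreeMap<Long, String> cache = new TreeMap<>();

    void snapshot(long checkpointId, String auxiliaryData) {
        cache.put(checkpointId, auxiliaryData);
    }

    // Early cleanup: only the auxiliary entry of the aborted checkpoint is dropped.
    void notifyCheckpointAborted(long checkpointId) {
        cache.remove(checkpointId);
    }

    // Fallback cleanup in case an abort notification was skipped.
    void notifyCheckpointComplete(long checkpointId) {
        cache.headMap(checkpointId, true).clear();
    }

    int size() {
        return cache.size();
    }
}
```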

        Specified by:
        notifyCheckpointAborted in interface CheckpointListener
        Parameters:
        checkpointId - The ID of the checkpoint that has been aborted.
      • setOperatorEventGateway

        public void setOperatorEventGateway​(OperatorEventGateway eventGateway)