Type Parameters:
IN - Input type for SinkFunction.
TXN - Transaction to store all of the information required to handle a transaction.
CONTEXT - Context that will be shared across all invocations for the given TwoPhaseCommitSinkFunction instance. Context is created once.

@PublicEvolving
public abstract class TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT>
extends RichSinkFunction<IN>
implements CheckpointedFunction, CheckpointListener
This is a recommended base class for all SinkFunction implementations that intend to provide exactly-once semantics. It does so by implementing the two-phase commit algorithm on top of CheckpointedFunction and CheckpointListener. Users provide a custom TXN (transaction handle) type and implement the abstract methods that handle this transaction handle.

Modifier and Type | Class | Description |
---|---|---|
static class | TwoPhaseCommitSinkFunction.State<TXN,CONTEXT> | State POJO class coupling pendingTransaction, context and pendingCommitTransactions. |
static class | TwoPhaseCommitSinkFunction.StateSerializer<TXN,CONTEXT> | Custom TypeSerializer for the sink state. |
static class | TwoPhaseCommitSinkFunction.StateSerializerConfigSnapshot<TXN,CONTEXT> | Deprecated. This snapshot class is no longer in use and is maintained only for backwards compatibility purposes. It is fully replaced by TwoPhaseCommitSinkFunction.StateSerializerSnapshot. |
static class | TwoPhaseCommitSinkFunction.StateSerializerSnapshot<TXN,CONTEXT> | Snapshot for the TwoPhaseCommitSinkFunction.StateSerializer. |
static class | TwoPhaseCommitSinkFunction.TransactionHolder<TXN> | Adds metadata (currently only the start time of the transaction) to the transaction object. |
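To make the lifecycle concrete, here is a dependency-free Java sketch. It does not use Flink: MiniTwoPhaseCommitSink and ListSink are hypothetical classes that only mirror the shape of beginTransaction, invoke, preCommit, commit and abort, with checkpoint IDs passed in directly instead of through FunctionSnapshotContext and without SinkFunction.Context. Records go into the open transaction; a checkpoint pre-commits it and opens a new one; completing that checkpoint commits it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of the two-phase-commit lifecycle (simplified signatures).
abstract class MiniTwoPhaseCommitSink<IN, TXN> {
    // Pre-committed transactions keyed by the checkpoint ID that sealed them,
    // kept in insertion order, which here is also checkpoint order.
    protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
    private TXN currentTransaction;

    protected abstract TXN beginTransaction();
    protected abstract void invoke(TXN transaction, IN value);
    protected abstract void preCommit(TXN transaction);
    protected abstract void commit(TXN transaction);
    protected abstract void abort(TXN transaction);

    void open() { currentTransaction = beginTransaction(); }

    // Every record is written into the currently open transaction.
    void invoke(IN value) { invoke(currentTransaction, value); }

    // Phase 1: on a checkpoint, pre-commit the open transaction and start a new one.
    void snapshotState(long checkpointId) {
        preCommit(currentTransaction);
        pendingCommitTransactions.put(checkpointId, currentTransaction);
        currentTransaction = beginTransaction();
    }

    // Phase 2: once the checkpoint completes, commit every pending transaction up to it.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, TXN>> it = pendingCommitTransactions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TXN> entry = it.next();
            if (entry.getKey() > checkpointId) {
                break; // later checkpoints are not covered yet
            }
            commit(entry.getValue());
            it.remove();
        }
    }

    // The still-open transaction is abandoned on shutdown in this sketch.
    void close() { abort(currentTransaction); }
}

// Example sink: each transaction buffers strings in memory; commit publishes them.
class ListSink extends MiniTwoPhaseCommitSink<String, List<String>> {
    final List<String> committed = new ArrayList<>();

    @Override protected List<String> beginTransaction() { return new ArrayList<>(); }
    @Override protected void invoke(List<String> txn, String value) { txn.add(value); }
    @Override protected void preCommit(List<String> txn) { /* nothing to flush in memory */ }
    @Override protected void commit(List<String> txn) { committed.addAll(txn); }
    @Override protected void abort(List<String> txn) { txn.clear(); }
}
```

Data written before checkpoint 1 becomes visible only after notifyCheckpointComplete(1); a record written afterwards stays in the new open transaction, which close() aborts in this sketch.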
Nested classes/interfaces inherited from interface SinkFunction: SinkFunction.Context
Modifier and Type | Field |
---|---|
protected LinkedHashMap<Long,TwoPhaseCommitSinkFunction.TransactionHolder<TXN>> | pendingCommitTransactions |
protected ListState<TwoPhaseCommitSinkFunction.State<TXN,CONTEXT>> | state |
protected Optional<CONTEXT> | userContext |
Constructor | Description |
---|---|
TwoPhaseCommitSinkFunction(TypeSerializer<TXN> transactionSerializer, TypeSerializer<CONTEXT> contextSerializer) | Use default ListStateDescriptor for internal state serialization. |
Modifier and Type | Method | Description |
---|---|---|
protected abstract void | abort(TXN transaction) | Abort a transaction. |
protected abstract TXN | beginTransaction() | Method that starts a new transaction. |
void | close() | Tear-down method for the user code. |
protected abstract void | commit(TXN transaction) | Commit a pre-committed transaction. |
protected TXN | currentTransaction() | |
protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> | enableTransactionTimeoutWarnings(double warningRatio) | Enables logging of warnings if a transaction's elapsed time reaches a specified ratio of the transactionTimeout. |
protected void | finishRecoveringContext(Collection<TXN> handledTransactions) | Callback for subclasses which is called after restoring (each) user context. |
protected Optional<CONTEXT> | getUserContext() | |
protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> | ignoreFailuresAfterTransactionTimeout() | If called, the sink will only log but not propagate exceptions thrown in recoverAndCommit(Object) if the transaction is older than a specified transaction timeout. |
void | initializeState(FunctionInitializationContext context) | This method is called when the parallel function instance is created during distributed execution. |
protected Optional<CONTEXT> | initializeUserContext() | |
void | invoke(IN value) | This should not be implemented by subclasses. |
void | invoke(IN value, SinkFunction.Context context) | Writes the given value to the sink. |
protected abstract void | invoke(TXN transaction, IN value, SinkFunction.Context context) | Write value within a transaction. |
void | notifyCheckpointAborted(long checkpointId) | This method is called as a notification once a distributed checkpoint has been aborted. |
void | notifyCheckpointComplete(long checkpointId) | Notifies the listener that the checkpoint with the given checkpointId completed and was committed. |
protected java.util.stream.Stream<Map.Entry<Long,TXN>> | pendingTransactions() | |
protected abstract void | preCommit(TXN transaction) | Pre-commit a previously created transaction. |
protected void | recoverAndAbort(TXN transaction) | Abort a transaction that was rejected by a coordinator after a failure. |
protected void | recoverAndCommit(TXN transaction) | Invoked on recovered transactions after a failure. |
protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> | setTransactionTimeout(long transactionTimeout) | Sets the transaction timeout. |
void | snapshotState(FunctionSnapshotContext context) | This method is called when a snapshot for a checkpoint is requested. |
Methods inherited from class AbstractRichFunction: getIterationRuntimeContext, getRuntimeContext, open, setRuntimeContext
protected final LinkedHashMap<Long,TwoPhaseCommitSinkFunction.TransactionHolder<TXN>> pendingCommitTransactions
protected transient ListState<TwoPhaseCommitSinkFunction.State<TXN,CONTEXT>> state
public TwoPhaseCommitSinkFunction(TypeSerializer<TXN> transactionSerializer, TypeSerializer<CONTEXT> contextSerializer)
Use default ListStateDescriptor for internal state serialization. Helpful utilities for using this constructor are TypeInformation.of(Class), TypeHint and TypeInformation.of(TypeHint).
Example, in which MyTransaction and MyContext are placeholder types and each serializer is created with TypeInformation.createSerializer(ExecutionConfig):
super(TypeInformation.of(MyTransaction.class).createSerializer(new ExecutionConfig()), TypeInformation.of(new TypeHint<MyContext>() {}).createSerializer(new ExecutionConfig()));
Parameters:
transactionSerializer - TypeSerializer for the transaction type of this sink
contextSerializer - TypeSerializer for the context type of this sink

@Nonnull
protected java.util.stream.Stream<Map.Entry<Long,TXN>> pendingTransactions()
protected abstract void invoke(TXN transaction, IN value, SinkFunction.Context context) throws Exception
Write value within a transaction.
Throws: Exception

protected abstract TXN beginTransaction() throws Exception
Method that starts a new transaction.
Throws: Exception

protected abstract void preCommit(TXN transaction) throws Exception
Pre-commit a previously created transaction. Usually, the implementation involves flushing the data.
Throws: Exception
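One common way to realize these methods is a file-based transaction: beginTransaction() opens a temporary file, invoke() appends to it, preCommit() flushes and closes it, commit() atomically moves it into place, and abort() deletes it. The sketch assumes local java.nio.file paths on one filesystem; FileTransaction and FileTransactionOps are hypothetical names, not Flink classes. A real TXN stored in Flink state would have to be serializable by the transactionSerializer, so it would keep only the paths rather than an open writer.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical transaction handle: records go to a temp file that is only
// moved to its final location on commit.
class FileTransaction {
    final Path tempFile;
    final Path targetFile;
    final BufferedWriter writer; // simplification: not serializable state

    FileTransaction(Path tempFile, Path targetFile) throws IOException {
        this.tempFile = tempFile;
        this.targetFile = targetFile;
        this.writer = Files.newBufferedWriter(tempFile, StandardCharsets.UTF_8);
    }
}

// Hypothetical operations corresponding to the abstract methods of the sink.
class FileTransactionOps {
    private final Path stagingDir;
    private final Path targetDir;
    private int counter = 0;

    FileTransactionOps(Path stagingDir, Path targetDir) {
        this.stagingDir = stagingDir;
        this.targetDir = targetDir;
    }

    // beginTransaction(): open a fresh temp file in the staging directory.
    FileTransaction beginTransaction() throws IOException {
        String name = "part-" + (counter++) + ".txt";
        return new FileTransaction(stagingDir.resolve(name), targetDir.resolve(name));
    }

    // invoke(txn, value): append one record per line to the temp file.
    void invoke(FileTransaction txn, String value) throws IOException {
        txn.writer.write(value);
        txn.writer.newLine();
    }

    // preCommit(txn): flush and close, so commit only has to publish the file.
    void preCommit(FileTransaction txn) throws IOException {
        txn.writer.flush();
        txn.writer.close();
    }

    // commit(txn): publish the pre-committed file with an atomic rename.
    void commit(FileTransaction txn) throws IOException {
        Files.move(txn.tempFile, txn.targetFile, StandardCopyOption.ATOMIC_MOVE);
    }

    // abort(txn): discard the temp file (closing an already closed writer is a no-op).
    void abort(FileTransaction txn) throws IOException {
        txn.writer.close();
        Files.deleteIfExists(txn.tempFile);
    }
}
```

Keeping the expensive work (writing and flushing) in invoke and preCommit leaves commit with a single rename, which fits the expectation that committing an already pre-committed transaction should be cheap and reliable.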
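protected abstract void commit(TXN transaction)
Commit a pre-committed transaction. If this method fails, the Flink application will be restarted and recoverAndCommit(Object) will be called again for the same transaction.

protected void recoverAndCommit(TXN transaction)
Invoked on recovered transactions after a failure.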
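Because a failed commit leads to recoverAndCommit(Object) being called again for the same transaction, commit effectively has to tolerate being applied twice. The following sketch models a restore: ExternalStore and RecoverySketch are hypothetical, commit is made idempotent by remembering committed transaction IDs, and the restore order (recoverAndCommit for each pending-commit transaction, recoverAndAbort for the transaction that was still open, then finishRecoveringContext with everything handled) is an assumption of this sketch built from the callbacks documented on this page.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical external system that remembers committed transaction IDs,
// so committing the same transaction twice has no additional effect.
class ExternalStore {
    final Set<String> committedIds = new HashSet<>();
    final List<String> visibleData = new ArrayList<>();

    void commit(String txnId, List<String> data) {
        // Idempotent: Set.add returns false if this ID was already committed.
        if (committedIds.add(txnId)) {
            visibleData.addAll(data);
        }
    }
}

// Hypothetical restore routine using recovery callbacks named after the documented ones.
class RecoverySketch {
    static final class Txn {
        final String id;
        final List<String> data;
        Txn(String id, List<String> data) { this.id = id; this.data = data; }
    }

    final ExternalStore store;
    final List<String> aborted = new ArrayList<>();
    Collection<Txn> finished;

    RecoverySketch(ExternalStore store) { this.store = store; }

    void commit(Txn txn) { store.commit(txn.id, txn.data); }
    void abort(Txn txn) { aborted.add(txn.id); }

    // Recovery delegates to commit/abort in this sketch.
    void recoverAndCommit(Txn txn) { commit(txn); }
    void recoverAndAbort(Txn txn) { abort(txn); }
    void finishRecoveringContext(Collection<Txn> handled) { finished = handled; }

    // This sketch calls finishRecoveringContext unconditionally.
    void restore(List<Txn> pendingCommitTransactions, Txn pendingTransaction) {
        List<Txn> handled = new ArrayList<>();
        for (Txn txn : pendingCommitTransactions) {
            recoverAndCommit(txn); // may repeat a commit that already happened
            handled.add(txn);
        }
        recoverAndAbort(pendingTransaction); // was never pre-committed
        handled.add(pendingTransaction);
        finishRecoveringContext(handled);
    }
}
```

In the test scenario, the transaction committed just before the failure is not applied a second time, and the transaction that was still open is aborted.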
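protected abstract void abort(TXN transaction)
Abort a transaction.

protected void recoverAndAbort(TXN transaction)
Abort a transaction that was rejected by a coordinator after a failure.

protected void finishRecoveringContext(Collection<TXN> handledTransactions)
Callback for subclasses which is called after restoring (each) user context.
Parameters:
handledTransactions - transactions which were already committed or aborted and do not need further handling

public final void invoke(IN value) throws Exception
This should not be implemented by subclasses.
Specified by: invoke in interface SinkFunction<IN>
Throws: Exception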
public final void invoke(IN value, SinkFunction.Context context) throws Exception
Description copied from interface: SinkFunction
Writes the given value to the sink. You have to override this method when implementing a SinkFunction; this is a default method for backward compatibility with the old-style method only.
Specified by: invoke in interface SinkFunction<IN>
Parameters:
value - The input record.
context - Additional context about the input record.
Throws:
Exception - This method may throw exceptions. Throwing an exception will cause the operation to fail and may trigger recovery.

public final void notifyCheckpointComplete(long checkpointId) throws Exception
Description copied from interface: CheckpointListener
Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
These notifications are "best effort", meaning they can sometimes be skipped. To behave properly, implementers need to follow the "Checkpoint Subsuming Contract". Please see the class-level JavaDocs of CheckpointListener for details.
Please note that checkpoints may generally overlap, so you cannot assume that the notifyCheckpointComplete() call is always for the latest prior checkpoint (or snapshot) that was taken on the function/operator implementing this interface. It might be for a checkpoint that was triggered earlier. Implementing the "Checkpoint Subsuming Contract" (see above) properly handles this situation as well.
Please note that throwing exceptions from this method will not cause the completed checkpoint to be revoked. Throwing exceptions will typically cause task/job failure and trigger recovery.
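The subsuming behaviour can be illustrated with pending transactions keyed the way pendingCommitTransactions is, by checkpoint ID in a LinkedHashMap. SubsumingCommitter is a hypothetical, Flink-free class: completing checkpoint N commits every pending transaction whose ID is at most N, so a skipped notification for an earlier checkpoint loses nothing, and a late notification for an older checkpoint finds nothing left to do.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of subsuming commits over pending transactions.
class SubsumingCommitter {
    // Pre-committed transactions in checkpoint order.
    private final LinkedHashMap<Long, String> pending = new LinkedHashMap<>();
    final List<String> committed = new ArrayList<>();

    void preCommitted(long checkpointId, String txn) {
        pending.put(checkpointId, txn);
    }

    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, String>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            // Any transaction sealed at or before the completed checkpoint is subsumed by it.
            if (entry.getKey() <= checkpointId) {
                committed.add(entry.getValue());
                it.remove();
            }
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```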
Specified by: notifyCheckpointComplete in interface CheckpointListener
Parameters:
checkpointId - The ID of the checkpoint that has been completed.
Throws:
Exception - This method can propagate exceptions, which leads to a failure/recovery for the task. Note that this will NOT lead to the checkpoint being revoked.

public void notifyCheckpointAborted(long checkpointId)
Description copied from interface: CheckpointListener
This method is called as a notification once a distributed checkpoint has been aborted.
Important: The fact that a checkpoint has been aborted does NOT mean that the data and artifacts produced between the previous checkpoint and the aborted checkpoint are to be discarded. The expected behavior is as if this checkpoint was never triggered in the first place, and the next successful checkpoint simply covers a longer time span. See the "Checkpoint Subsuming Contract" in the class-level JavaDocs of CheckpointListener for details.
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract"), means it is mainly useful for earlier cleanup of auxiliary resources. One example is to proactively clear a local per-checkpoint state cache upon checkpoint failure.
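A sketch of what such cleanup can look like, assuming a hypothetical AbortAwareSink that keeps a per-checkpoint cache of record counts: notifyCheckpointAborted only drops the cache entry, while the transaction pre-committed for the aborted checkpoint stays pending and is committed together with the next successful checkpoint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: an aborted checkpoint must not discard data.
class AbortAwareSink {
    private final LinkedHashMap<Long, String> pendingCommit = new LinkedHashMap<>();
    // Auxiliary per-checkpoint cache (here: record counts); safe to drop early.
    final Map<Long, Integer> perCheckpointCache = new HashMap<>();
    final List<String> committed = new ArrayList<>();

    void snapshot(long checkpointId, String txn, int recordCount) {
        pendingCommit.put(checkpointId, txn);
        perCheckpointCache.put(checkpointId, recordCount);
    }

    // Only auxiliary resources are released; the pre-committed transaction stays pending.
    void notifyCheckpointAborted(long checkpointId) {
        perCheckpointCache.remove(checkpointId);
    }

    // The next completed checkpoint subsumes the aborted one and commits both transactions.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, String>> it = pendingCommit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            if (entry.getKey() <= checkpointId) {
                committed.add(entry.getValue());
                perCheckpointCache.remove(entry.getKey());
                it.remove();
            }
        }
    }
}
```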
Specified by: notifyCheckpointAborted in interface CheckpointListener
Parameters:
checkpointId - The ID of the checkpoint that has been aborted.

public void snapshotState(FunctionSnapshotContext context) throws Exception
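Description copied from interface: CheckpointedFunction
This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to ensure that all state is exposed by means previously offered through FunctionInitializationContext when the Function was initialized, or offered now by FunctionSnapshotContext itself.
Specified by: snapshotState in interface CheckpointedFunction
Parameters:
context - the context for drawing a snapshot of the operator
Throws:
Exception - Thrown, if state could not be created or restored.

public void initializeState(FunctionInitializationContext context) throws Exception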
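Description copied from interface: CheckpointedFunction
This method is called when the parallel function instance is created during distributed execution.
Specified by: initializeState in interface CheckpointedFunction
Parameters:
context - the context for initializing the operator
Throws:
Exception - Thrown, if state could not be created or restored.

public void close() throws Exception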
Description copied from interface: RichFunction
Tear-down method for the user code. This method can be used for clean-up work.
Specified by: close in interface RichFunction
Overrides: close in class AbstractRichFunction
Throws:
Exception - Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.

protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> setTransactionTimeout(long transactionTimeout)
Sets the transaction timeout.
Parameters:
transactionTimeout - The transaction timeout in ms.
See Also:
ignoreFailuresAfterTransactionTimeout(), enableTransactionTimeoutWarnings(double)
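The timeout options documented here (setTransactionTimeout, ignoreFailuresAfterTransactionTimeout, enableTransactionTimeoutWarnings) can be modelled as a small policy object. TransactionTimeoutPolicy is hypothetical and not Flink code: it takes start and current times as arguments for testability (the documentation says the real start time comes from System.currentTimeMillis()), treats a negative warning ratio as "warnings disabled" purely as a convention of this sketch, warns once the elapsed time reaches warningRatio * transactionTimeout, and swallows a recoverAndCommit failure only when ignoring is enabled and the transaction is older than the timeout.

```java
// Hypothetical model of the documented timeout options; not Flink's implementation.
class TransactionTimeoutPolicy {
    private final long transactionTimeoutMs;
    // Convention of this sketch: a negative ratio disables warnings.
    private final double warningRatio;
    private final boolean ignoreFailuresAfterTimeout;

    TransactionTimeoutPolicy(long transactionTimeoutMs, double warningRatio, boolean ignoreFailuresAfterTimeout) {
        if (warningRatio > 1.0) {
            throw new IllegalArgumentException("warningRatio must be in [0,1]");
        }
        this.transactionTimeoutMs = transactionTimeoutMs;
        this.warningRatio = warningRatio;
        this.ignoreFailuresAfterTimeout = ignoreFailuresAfterTimeout;
    }

    // Warn when elapsed time reaches warningRatio * timeout; a ratio of 0 always warns.
    boolean shouldWarnOnCommit(long startTimeMs, long nowMs) {
        if (warningRatio < 0) {
            return false;
        }
        return (nowMs - startTimeMs) >= warningRatio * transactionTimeoutMs;
    }

    // On a failed recovery commit: only log if ignoring is enabled and the
    // transaction is older than the timeout; otherwise propagate (the default).
    void onRecoverAndCommitFailure(long startTimeMs, long nowMs, RuntimeException failure) {
        long elapsedMs = nowMs - startTimeMs;
        if (ignoreFailuresAfterTimeout && elapsedMs > transactionTimeoutMs) {
            System.err.println("Ignoring commit failure of timed-out transaction: " + failure.getMessage());
            return;
        }
        throw failure;
    }
}
```

In an actual subclass, the documented methods return the sink itself, so they can be chained, for example setTransactionTimeout(60_000).enableTransactionTimeoutWarnings(0.8).ignoreFailuresAfterTransactionTimeout().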
protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> ignoreFailuresAfterTransactionTimeout()
If called, the sink will only log but not propagate exceptions thrown in recoverAndCommit(Object) if the transaction is older than a specified transaction timeout. The start time of a transaction is determined by System.currentTimeMillis(). By default, failures are propagated.

protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> enableTransactionTimeoutWarnings(double warningRatio)
Enables logging of warnings if a transaction's elapsed time reaches a specified ratio of the transactionTimeout. If warningRatio is 0, a warning will always be logged when committing the transaction.
Parameters:
warningRatio - A value in the range [0,1].

Copyright © 2014–2021 The Apache Software Foundation. All rights reserved.