Type Parameters:
IN - Input type for SinkFunction.
TXN - Transaction to store all of the information required to handle a transaction.
CONTEXT - Context that will be shared across all invocations for the given TwoPhaseCommitSinkFunction instance. Context is created once

@PublicEvolving public abstract class TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> extends RichSinkFunction<IN> implements CheckpointedFunction, CheckpointListener
Base class for SinkFunction implementations that intend to provide exactly-once semantics. It does that by implementing a two-phase commit algorithm on top of CheckpointedFunction and CheckpointListener. Users should provide a custom TXN (transaction handle) and implement the abstract methods that handle this transaction handle.

Modifier and Type | Class and Description |
---|---|
static class | TwoPhaseCommitSinkFunction.State<TXN,CONTEXT> - State POJO class coupling pendingTransaction, context and pendingCommitTransactions. |
static class | TwoPhaseCommitSinkFunction.StateSerializer<TXN,CONTEXT> - Custom TypeSerializer for the sink state. |
static class | TwoPhaseCommitSinkFunction.StateSerializerConfigSnapshot<TXN,CONTEXT> - TypeSerializerConfigSnapshot for sink state. |
static class | TwoPhaseCommitSinkFunction.TransactionHolder<TXN> - Adds metadata (currently only the start time of the transaction) to the transaction object. |

Nested classes/interfaces inherited from interface SinkFunction: SinkFunction.Context<T>

Modifier and Type | Field and Description |
---|---|
protected LinkedHashMap<Long,TwoPhaseCommitSinkFunction.TransactionHolder<TXN>> | pendingCommitTransactions |
protected ListState<TwoPhaseCommitSinkFunction.State<TXN,CONTEXT>> | state |
protected Optional<CONTEXT> | userContext |

Constructor and Description |
---|
TwoPhaseCommitSinkFunction(TypeSerializer<TXN> transactionSerializer, TypeSerializer<CONTEXT> contextSerializer) - Use default ListStateDescriptor for internal state serialization. |

Modifier and Type | Method and Description |
---|---|
protected abstract void | abort(TXN transaction) - Abort a transaction. |
protected abstract TXN | beginTransaction() - Method that starts a new transaction. |
void | close() - Tear-down method for the user code. |
protected abstract void | commit(TXN transaction) - Commit a pre-committed transaction. |
protected TXN | currentTransaction() |
protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> | enableTransactionTimeoutWarnings(double warningRatio) - Enables logging of warnings if a transaction's elapsed time reaches a specified ratio of the transactionTimeout. |
protected void | finishRecoveringContext() |
protected Optional<CONTEXT> | getUserContext() |
protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> | ignoreFailuresAfterTransactionTimeout() - If called, the sink will only log but not propagate exceptions thrown in recoverAndCommit(Object) if the transaction is older than a specified transaction timeout. |
void | initializeState(FunctionInitializationContext context) - This method is called when the parallel function instance is created during distributed execution. |
protected Optional<CONTEXT> | initializeUserContext() |
void | invoke(IN value) - This should not be implemented by subclasses. |
void | invoke(IN value, SinkFunction.Context context) - Writes the given value to the sink. |
protected abstract void | invoke(TXN transaction, IN value, SinkFunction.Context context) - Write value within a transaction. |
void | notifyCheckpointComplete(long checkpointId) - This method is called as a notification once a distributed checkpoint has been completed. |
protected java.util.stream.Stream<Map.Entry<Long,TXN>> | pendingTransactions() |
protected abstract void | preCommit(TXN transaction) - Pre commit previously created transaction. |
protected void | recoverAndAbort(TXN transaction) - Abort a transaction that was rejected by a coordinator after a failure. |
protected void | recoverAndCommit(TXN transaction) - Invoked on recovered transactions after a failure. |
protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> | setTransactionTimeout(long transactionTimeout) - Sets the transaction timeout. |
void | snapshotState(FunctionSnapshotContext context) - This method is called when a snapshot for a checkpoint is requested. |

Methods inherited from class AbstractRichFunction: getIterationRuntimeContext, getRuntimeContext, open, setRuntimeContext

protected final LinkedHashMap<Long,TwoPhaseCommitSinkFunction.TransactionHolder<TXN>> pendingCommitTransactions
protected transient ListState<TwoPhaseCommitSinkFunction.State<TXN,CONTEXT>> state

public TwoPhaseCommitSinkFunction(TypeSerializer<TXN> transactionSerializer, TypeSerializer<CONTEXT> contextSerializer)
Use default ListStateDescriptor for internal state serialization. Helpful utilities for using this constructor are TypeInformation.of(Class), TypeHint and TypeInformation.of(TypeHint). Example:
TwoPhaseCommitSinkFunction(TypeInformation.of(new TypeHint<State<TXN, CONTEXT>>() {}));
Parameters:
transactionSerializer - TypeSerializer for the transaction type of this sink
contextSerializer - TypeSerializer for the context type of this sink

@Nonnull protected java.util.stream.Stream<Map.Entry<Long,TXN>> pendingTransactions()

protected abstract void invoke(TXN transaction, IN value, SinkFunction.Context context) throws Exception
Write value within a transaction.
Throws:
Exception

protected abstract TXN beginTransaction() throws Exception
Method that starts a new transaction.
Throws:
Exception

protected abstract void preCommit(TXN transaction) throws Exception
Pre-commit a previously created transaction. Usually the implementation involves flushing the data.
Throws:
Exception
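
protected abstract void commit(TXN transaction)
Commit a pre-committed transaction. If this method fails, the Flink application will be restarted and recoverAndCommit(Object) will be called again for the same transaction.

protected void recoverAndCommit(TXN transaction)
Invoked on recovered transactions after a failure.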
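
Because recoverAndCommit(Object) can be called again for a transaction whose commit may already have taken effect, a commit step is usually written so that repeating it is harmless. The following is a minimal standalone sketch of that idea, not Flink code. It assumes a hypothetical in-memory store in which pre-committed records are staged under a transaction id; the class IdempotentCommitSketch and its methods stage, commit and visibleRecordCount are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch (not Flink code): a hypothetical external store whose
// commit can be repeated safely for the same transaction id.
class IdempotentCommitSketch {
    private final Map<String, List<String>> staged = new HashMap<>();
    private final Map<String, List<String>> committed = new HashMap<>();

    // Pre-commit: records are staged under the transaction id but not yet visible.
    void stage(String txnId, List<String> records) {
        staged.put(txnId, new ArrayList<>(records));
    }

    // Commit: publish the staged records. Returns false when an earlier attempt
    // already published this transaction, so a retry changes nothing.
    boolean commit(String txnId) {
        if (committed.containsKey(txnId)) {
            return false;
        }
        List<String> records = staged.remove(txnId);
        if (records == null) {
            throw new IllegalStateException("Unknown transaction: " + txnId);
        }
        committed.put(txnId, records);
        return true;
    }

    // Number of records that are visible, i.e. belong to committed transactions.
    int visibleRecordCount() {
        return committed.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        IdempotentCommitSketch store = new IdempotentCommitSketch();
        store.stage("txn-1", Arrays.asList("a", "b"));
        System.out.println(store.commit("txn-1"));      // true: first attempt publishes
        System.out.println(store.commit("txn-1"));      // false: the retry is a no-op
        System.out.println(store.visibleRecordCount()); // 2: nothing was duplicated
    }
}
```

The second call stands in for a recovery attempt after a restart: it finds the transaction already published and returns without duplicating records.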

protected abstract void abort(TXN transaction)
Abort a transaction.

protected void recoverAndAbort(TXN transaction)
Abort a transaction that was rejected by a coordinator after a failure.

protected void finishRecoveringContext()
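
The transaction-handling methods above are driven by the checkpoint callbacks of the sink. The following standalone model, which is not Flink code, sketches one plausible call order under these assumptions: the open transaction is pre-committed when a snapshot is taken, pending transactions are committed once their checkpoint is reported complete, and the still-open transaction is aborted on close. State persistence and recovery are omitted, and the class name TwoPhaseCommitLifecycleSketch and its fields are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Standalone model (not Flink code) of the call order around checkpoints.
class TwoPhaseCommitLifecycleSketch {
    final List<String> log = new ArrayList<>();              // order of lifecycle calls
    final List<String> committedRecords = new ArrayList<>(); // records made visible
    // Pre-committed transactions, keyed by checkpoint id, in creation order.
    private final Map<Long, List<String>> pendingCommit = new LinkedHashMap<>();
    private List<String> current; // the open transaction: a plain record buffer

    TwoPhaseCommitLifecycleSketch() {
        current = beginTransaction();
    }

    private List<String> beginTransaction() {
        log.add("begin");
        return new ArrayList<>();
    }

    // Every record goes into the currently open transaction.
    void invoke(String value) {
        current.add(value);
    }

    // On snapshot: pre-commit the open transaction, remember it under the
    // checkpoint id, and open a new transaction for subsequent records.
    void snapshotState(long checkpointId) {
        log.add("preCommit@" + checkpointId);
        pendingCommit.put(checkpointId, current);
        current = beginTransaction();
    }

    // On completion: commit every pending transaction up to this checkpoint id.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it = pendingCommit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue; // belongs to a later checkpoint, keep it pending
            }
            committedRecords.addAll(entry.getValue());
            log.add("commit@" + entry.getKey());
            it.remove();
        }
    }

    // On close: the open transaction was never pre-committed, so it is aborted.
    void close() {
        log.add("abort");
        current = null;
    }

    public static void main(String[] args) {
        TwoPhaseCommitLifecycleSketch sink = new TwoPhaseCommitLifecycleSketch();
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState(1);
        sink.invoke("c");
        sink.notifyCheckpointComplete(1);
        sink.close();
        System.out.println(sink.log);              // [begin, preCommit@1, begin, commit@1, abort]
        System.out.println(sink.committedRecords); // [a, b]
    }
}
```

In this model the record "c" never becomes visible, because its transaction was still open when the sink closed.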

public final void invoke(IN value) throws Exception
This should not be implemented by subclasses.
Specified by: invoke in interface SinkFunction<IN>
Throws:
Exception

public final void invoke(IN value, SinkFunction.Context context) throws Exception
Description copied from interface: SinkFunction
Writes the given value to the sink. You have to override this method when implementing a SinkFunction; this is a default method for backward compatibility with the old-style method only.
Specified by: invoke in interface SinkFunction<IN>
Parameters:
value - The input record.
context - Additional context about the input record.
Throws:
Exception - This method may throw exceptions. Throwing an exception will cause the operation to fail and may trigger recovery.

public final void notifyCheckpointComplete(long checkpointId) throws Exception
Description copied from interface: CheckpointListener
This method is called as a notification once a distributed checkpoint has been completed.
Specified by: notifyCheckpointComplete in interface CheckpointListener
Parameters:
checkpointId - The ID of the checkpoint that has been completed.
Throws:
Exception
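
public void snapshotState(FunctionSnapshotContext context) throws Exception
Description copied from interface: CheckpointedFunction
This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to ensure that all state is exposed by means previously offered through FunctionInitializationContext when the Function was initialized, or offered now by FunctionSnapshotContext itself.
Specified by: snapshotState in interface CheckpointedFunction
Parameters:
context - the context for drawing a snapshot of the operator
Throws:
Exception

public void initializeState(FunctionInitializationContext context) throws Exception
Description copied from interface: CheckpointedFunction
This method is called when the parallel function instance is created during distributed execution.
Specified by: initializeState in interface CheckpointedFunction
Parameters:
context - the context for initializing the operator
Throws:
Exception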

public void close() throws Exception
Description copied from interface: RichFunction
Tear-down method for the user code. This method can be used for clean up work.
Specified by: close in interface RichFunction
Overrides: close in class AbstractRichFunction
Throws:
Exception - Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.

protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> setTransactionTimeout(long transactionTimeout)
Sets the transaction timeout.
Parameters:
transactionTimeout - The transaction timeout in ms.
See Also:
ignoreFailuresAfterTransactionTimeout(), enableTransactionTimeoutWarnings(double)
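
The transaction timeout feeds two separate checks: the warning enabled by enableTransactionTimeoutWarnings(double) and the failure handling enabled by ignoreFailuresAfterTransactionTimeout(). The following standalone sketch, which is not Flink code, shows the arithmetic these descriptions imply. The class TransactionTimeoutSketch and both method names are hypothetical, and the exact comparison operators (>= for "reaches", > for "older than") are assumptions rather than confirmed behavior.

```java
// Standalone sketch (not Flink code) of the two timeout-related checks.
class TransactionTimeoutSketch {

    // True when the elapsed time has reached warningRatio * transactionTimeout.
    // Assumption: "reaches" is modelled as >=, so a ratio of 0 always warns.
    static boolean shouldWarn(long startMs, long nowMs, long transactionTimeoutMs, double warningRatio) {
        if (warningRatio < 0 || warningRatio > 1) {
            throw new IllegalArgumentException("warningRatio must be in [0,1]");
        }
        long elapsedMs = nowMs - startMs;
        return elapsedMs >= transactionTimeoutMs * warningRatio;
    }

    // True when a commit failure during recovery would only be logged:
    // the option is enabled and the transaction is older than the timeout.
    // Assumption: "older than" is modelled as a strict comparison.
    static boolean shouldSwallowFailure(boolean ignoreAfterTimeout, long startMs, long nowMs, long transactionTimeoutMs) {
        return ignoreAfterTimeout && (nowMs - startMs) > transactionTimeoutMs;
    }

    public static void main(String[] args) {
        long timeoutMs = 60_000L; // one minute
        System.out.println(shouldWarn(0L, 30_000L, timeoutMs, 0.8));             // false: 30 s is below 48 s
        System.out.println(shouldWarn(0L, 50_000L, timeoutMs, 0.8));             // true: 50 s has reached 48 s
        System.out.println(shouldSwallowFailure(true, 0L, 61_000L, timeoutMs));  // true
        System.out.println(shouldSwallowFailure(false, 0L, 61_000L, timeoutMs)); // false
    }
}
```

In the sketch the start and current times are passed in explicitly, which keeps the checks deterministic; the documented start time of a transaction comes from System.currentTimeMillis().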

protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> ignoreFailuresAfterTransactionTimeout()
If called, the sink will only log but not propagate exceptions thrown in recoverAndCommit(Object) if the transaction is older than a specified transaction timeout. The start time of a transaction is determined by System.currentTimeMillis(). By default, failures are propagated.

protected TwoPhaseCommitSinkFunction<IN,TXN,CONTEXT> enableTransactionTimeoutWarnings(double warningRatio)
Enables logging of warnings if a transaction's elapsed time reaches a specified ratio of the transactionTimeout. If warningRatio is 0, a warning will always be logged when committing the transaction.
Parameters:
warningRatio - A value in the range [0,1].

Copyright © 2014–2019 The Apache Software Foundation. All rights reserved.