@Internal public class JdbcXaSinkFunction<T> extends AbstractRichFunction implements CheckpointedFunction, CheckpointListener, SinkFunction<T>, AutoCloseable, InputTypeConfigurable
Each parallel subtask has its own transactions, independent of other subtasks. Therefore, consistency is only guaranteed within partitions.
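One way to picture per-subtask isolation is to make every transaction id encode the subtask that owns it. The following is only an illustration, not the connector's XidGenerator. The class SubtaskXid, its format id and its byte layout are all hypothetical. The layout puts the job id, subtask index and checkpoint id in the global transaction id. The class implements the standard javax.transaction.xa.Xid interface from the JDK.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import javax.transaction.xa.Xid;

// Hypothetical Xid layout, assumed for illustration; not the connector's XidGenerator.
final class SubtaskXid implements Xid {
    private static final int FORMAT_ID = 0x464C; // arbitrary format id (assumption)
    private final byte[] gtrid;
    private final byte[] bqual;

    private SubtaskXid(byte[] gtrid, byte[] bqual) {
        this.gtrid = gtrid;
        this.bqual = bqual;
    }

    /** Builds an id that is unique to one subtask and one checkpoint of one job. */
    static SubtaskXid of(String jobId, int subtaskIndex, long checkpointId) {
        byte[] job = jobId.getBytes(StandardCharsets.UTF_8);
        int size = job.length + Integer.BYTES + Long.BYTES;
        if (size > Xid.MAXGTRIDSIZE) {
            throw new IllegalArgumentException("job id too long for an XA global transaction id");
        }
        // Global transaction id: job id + subtask index + checkpoint id.
        byte[] gtrid = ByteBuffer.allocate(size)
                .put(job)
                .putInt(subtaskIndex)
                .putLong(checkpointId)
                .array();
        // Non-empty branch qualifier carrying the subtask index.
        byte[] bqual = ByteBuffer.allocate(Integer.BYTES).putInt(subtaskIndex).array();
        return new SubtaskXid(gtrid, bqual);
    }

    @Override public int getFormatId() { return FORMAT_ID; }
    @Override public byte[] getGlobalTransactionId() { return gtrid.clone(); }
    @Override public byte[] getBranchQualifier() { return bqual.clone(); }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SubtaskXid)) return false;
        SubtaskXid other = (SubtaskXid) o;
        return Arrays.equals(gtrid, other.gtrid) && Arrays.equals(bqual, other.bqual);
    }

    @Override
    public int hashCode() { return 31 * Arrays.hashCode(gtrid) + Arrays.hashCode(bqual); }
}
```

No two subtasks can produce the same id. Each subtask can therefore prepare, commit or roll back its own transactions without coordinating with the others. Nothing ties their outcomes together, either.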
XA uses a two-phase commit protocol, which solves the consistency problem but leaves some issues open. The following table summarizes the effects of failures during transaction state transitions and ways to mitigate them:
Transition | Methods | What happens if transition lost | Ways to mitigate |
---|---|---|---|
none > started, started > ended | open(), snapshotState() | Database eventually discards these transactions | |
ended > prepared | snapshotState() | Database keeps these transactions prepared forever ("in-doubt" state) | |
prepared > committed | open(), notifyCheckpointComplete() | Upon job recovery, the restored state contains transactions that were already committed; or the JM may call notifyCheckpointComplete again after recovery. Committing such a transaction again results in an XAER_NOTA error. | Distinguish between transactions created during this run and transactions restored from state, and ignore XAER_NOTA for the latter. |
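The mitigation in the last row can be sketched as follows. This is a minimal, hypothetical illustration, not the connector's code. CommitStep, Committer and Prepared are invented names. Committer stands in for XAResource.commit, and transactions are identified by plain strings. Only XAException and its XAER_NOTA error code come from the JDK's javax.transaction.xa package.

```java
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;

// Hypothetical commit step for the "prepared > committed" transition.
final class CommitStep {
    /** Stand-in for XAResource.commit(xid, false); an assumption, not the connector's API. */
    interface Committer {
        void commit(String txId) throws XAException;
    }

    /** A prepared transaction and whether it was restored from checkpointed state. */
    static final class Prepared {
        final String txId;
        final boolean restored;

        Prepared(String txId, boolean restored) {
            this.txId = txId;
            this.restored = restored;
        }
    }

    /**
     * Commits every prepared transaction. XAER_NOTA ("unknown XID") is tolerated only
     * for restored transactions, which may have been committed before the failure.
     * Returns the ids that were skipped for that reason.
     */
    static List<String> commitAll(List<Prepared> prepared, Committer committer) throws XAException {
        List<String> skipped = new ArrayList<>();
        for (Prepared p : prepared) {
            try {
                committer.commit(p.txId);
            } catch (XAException e) {
                if (p.restored && e.errorCode == XAException.XAER_NOTA) {
                    skipped.add(p.txId); // most likely committed in an earlier attempt
                } else {
                    throw e; // a transaction from this run must still be known to the database
                }
            }
        }
        return skipped;
    }
}
```

The `restored` flag keeps the rule narrow. An unknown XID from the current run is still treated as a real error, rather than being silently ignored.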
Nested classes/interfaces inherited from interface SinkFunction: SinkFunction.Context
Constructor | Description |
---|---|
JdbcXaSinkFunction(JdbcOutputFormat<T,T,JdbcBatchStatementExecutor<T>> outputFormat, XaFacade xaFacade, XidGenerator xidGenerator, org.apache.flink.connector.jdbc.xa.XaSinkStateHandler stateHandler, JdbcExactlyOnceOptions options, org.apache.flink.connector.jdbc.xa.XaGroupOps xaGroupOps) | Creates a JdbcXaSinkFunction. |
JdbcXaSinkFunction(String sql, JdbcStatementBuilder<T> statementBuilder, XaFacade xaFacade, JdbcExecutionOptions executionOptions, JdbcExactlyOnceOptions options) | Creates a JdbcXaSinkFunction. |
Modifier and Type | Method | Description |
---|---|---|
void | close() | Tear-down method for the user code. |
void | initializeState(FunctionInitializationContext context) | This method is called when the parallel function instance is created during distributed execution. |
void | invoke(T value, SinkFunction.Context context) | Writes the given value to the sink. |
void | notifyCheckpointComplete(long checkpointId) | Notifies the listener that the checkpoint with the given checkpointId completed and was committed. |
void | open(Configuration configuration) | Initialization method for the function. |
void | setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) | Method that is called on an OutputFormat when it is passed to the DataSet's output method. |
void | snapshotState(FunctionSnapshotContext context) | This method is called when a snapshot for a checkpoint is requested. |
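Methods inherited from class AbstractRichFunction: getIterationRuntimeContext, getRuntimeContext, setRuntimeContext
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface CheckpointListener: notifyCheckpointAborted
Methods inherited from interface SinkFunction: finish, invoke, writeWatermark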
public JdbcXaSinkFunction(String sql, JdbcStatementBuilder<T> statementBuilder, XaFacade xaFacade, JdbcExecutionOptions executionOptions, JdbcExactlyOnceOptions options)
Creates a JdbcXaSinkFunction. All parameters must be serializable.
Note: the JdbcExecutionOptions maxRetries setting must be strictly set to 0 for this sink to work properly and not produce duplicates. See issue FLINK-22311 for details.
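The maxRetries note can be illustrated with a deliberately simplified model. RetryModel and its methods are hypothetical. This is not necessarily the exact failure sequence analysed in FLINK-22311. It only shows one way duplicates can arise when a partially applied batch is re-executed inside one still-open transaction. The rows written before the failure stay in that transaction, and the retry writes them again.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model; not the connector's batching code.
final class RetryModel {
    /** Rows written so far inside the single open transaction. */
    final List<String> txRows = new ArrayList<>();
    private int failuresLeft;

    RetryModel(int failuresLeft) {
        this.failuresLeft = failuresLeft;
    }

    /** Writes the batch row by row; fails right after the first row while failuresLeft > 0. */
    private void executeBatch(List<String> batch) {
        for (int i = 0; i < batch.size(); i++) {
            txRows.add(batch.get(i));
            if (i == 0 && failuresLeft > 0) {
                failuresLeft--;
                throw new IllegalStateException("simulated transient failure");
            }
        }
    }

    /** Flushes with up to maxRetries retries, all within the same transaction. */
    void flush(List<String> batch, int maxRetries) {
        for (int attempt = 0; ; attempt++) {
            try {
                executeBatch(batch);
                return;
            } catch (IllegalStateException e) {
                if (attempt >= maxRetries) {
                    throw e; // give up and let the failure propagate
                }
            }
        }
    }

    /** Guard mirroring the documented requirement: maxRetries must be exactly 0. */
    static void requireNoRetries(int maxRetries) {
        if (maxRetries != 0) {
            throw new IllegalArgumentException("maxRetries must be 0 for the XA sink, got " + maxRetries);
        }
    }
}
```

With one retry, the batch ["a", "b"] leaves ["a", "a", "b"] in the transaction, and a later commit would make the duplicate row durable. With maxRetries set to 0, the first failure propagates instead of being retried inside the transaction.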
Parameters:
xaFacade - XaFacade to manage XA transactions
public JdbcXaSinkFunction(JdbcOutputFormat<T,T,JdbcBatchStatementExecutor<T>> outputFormat, XaFacade xaFacade, XidGenerator xidGenerator, org.apache.flink.connector.jdbc.xa.XaSinkStateHandler stateHandler, JdbcExactlyOnceOptions options, org.apache.flink.connector.jdbc.xa.XaGroupOps xaGroupOps)
Creates a JdbcXaSinkFunction. All parameters must be serializable.
Parameters:
outputFormat - JdbcOutputFormat to write records with
xaFacade - XaFacade to manage XA transactions
xidGenerator - XidGenerator to generate new transaction ids
public void initializeState(FunctionInitializationContext context) throws Exception
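Description copied from interface: CheckpointedFunction
This method is called when the parallel function instance is created during distributed execution.
Specified by: initializeState in interface CheckpointedFunction
Parameters:
context - the context for initializing the operator
Throws:
Exception - Thrown if state could not be created or restored.
public void open(Configuration configuration) throws Exception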
Description copied from interface: RichFunction
Initialization method for the function. The configuration object passed to the function can be used for configuration and initialization. The configuration contains all parameters that were configured on the function in the program composition. For example:
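import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

public class MyFilter extends RichFilterFunction<String> {
    private String searchString;

    @Override
    public void open(Configuration parameters) {
        // Read the search string once, before any records are filtered.
        this.searchString = parameters.getString("foo", "");
    }

    @Override
    public boolean filter(String value) {
        return value.equals(searchString);
    }
}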
By default, this method does nothing.
Specified by: open in interface RichFunction
Overrides: open in class AbstractRichFunction
Parameters:
configuration - The configuration containing the parameters attached to the contract.
Throws:
Exception - Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.
See Also:
Configuration
public void snapshotState(FunctionSnapshotContext context) throws Exception
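Description copied from interface: CheckpointedFunction
This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to ensure that all state is exposed by means previously offered through FunctionInitializationContext when the Function was initialized, or offered now by FunctionSnapshotContext itself.
Specified by: snapshotState in interface CheckpointedFunction
Parameters:
context - the context for drawing a snapshot of the operator
Throws:
Exception - Thrown if state could not be created or restored.
public void notifyCheckpointComplete(long checkpointId)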
Description copied from interface: CheckpointListener
Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
These notifications are "best effort", meaning they can sometimes be skipped. To behave properly, implementers need to follow the "Checkpoint Subsuming Contract". Please see the class-level JavaDocs of CheckpointListener for details.
Please note that checkpoints may generally overlap, so you cannot assume that the notifyCheckpointComplete() call is always for the latest prior checkpoint (or snapshot) that was taken on the function/operator implementing this interface. It might be for a checkpoint that was triggered earlier. Implementing the "Checkpoint Subsuming Contract" (see above) handles this situation correctly as well.
Please note that throwing exceptions from this method will not cause the completed checkpoint to be revoked. Throwing exceptions will typically cause task/job failure and trigger recovery.
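The transitions driven by snapshotState() (ended > prepared) and notifyCheckpointComplete() (prepared > committed) can be combined with the subsuming contract. A minimal sketch follows. PreparedLedger and its methods are hypothetical. The actual sink manages this through components such as XaSinkStateHandler and XaFacade, whose internals are not shown here. Committing is simulated by appending to a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical bookkeeping for prepared transactions, keyed by checkpoint id.
final class PreparedLedger {
    private final NavigableMap<Long, List<String>> preparedByCheckpoint = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    /** Snapshot side: the current transaction has been ended and prepared for this checkpoint. */
    void onSnapshot(long checkpointId, String txId) {
        preparedByCheckpoint.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(txId);
    }

    /** Notification side: commit everything up to and including checkpointId (subsuming contract). */
    void onCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> subsumed = preparedByCheckpoint.headMap(checkpointId, true);
        for (List<String> txIds : subsumed.values()) {
            committed.addAll(txIds); // stand-in for an XA commit, in checkpoint order
        }
        subsumed.clear(); // clearing the view also removes the entries from the backing map
    }

    List<String> committed() {
        return committed;
    }

    int pendingCount() {
        return preparedByCheckpoint.size();
    }
}
```

Keying by checkpoint id and committing the whole head of the map makes a skipped or late notification harmless. A notification for checkpoint 2 also commits the transactions of checkpoint 1. A late notification for checkpoint 1 after that finds nothing left to commit.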
Specified by: notifyCheckpointComplete in interface CheckpointListener
Parameters:
checkpointId - The ID of the checkpoint that has been completed.
public void invoke(T value, SinkFunction.Context context) throws IOException
Description copied from interface: SinkFunction
Writes the given value to the sink.
You have to override this method when implementing a SinkFunction; this is a default method for backward compatibility with the old-style method only.
Specified by: invoke in interface SinkFunction<T>
Parameters:
value - The input record.
context - Additional context about the input record.
Throws:
IOException
public void close() throws Exception
Description copied from interface: RichFunction
Tear-down method for the user code. This method can be used for clean-up work.
Specified by: close in interface AutoCloseable
Specified by: close in interface RichFunction
Overrides: close in class AbstractRichFunction
Throws:
Exception - Implementations may forward exceptions, which are caught by the runtime. When the runtime catches an exception, it aborts the task and lets the fail-over logic decide whether to retry the task execution.
public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig)
Description copied from interface: InputTypeConfigurable
Method that is called on an OutputFormat when it is passed to the DataSet's output method. May be used to configure the output format based on the data type.
Specified by: setInputType in interface InputTypeConfigurable
Parameters:
type - The data type of the input.
executionConfig - The execution config for this parallel execution.
Copyright © 2014–2023 The Apache Software Foundation. All rights reserved.