Flink’s Cassandra sinks are created with the static CassandraSink.addSink(DataStream input) method.
This method returns a CassandraSinkBuilder, which offers methods to further configure the sink, and finally `build()` the sink instance.
The following configuration methods can be used:
setQuery(String query)
Sets the upsert query that is executed for every record the sink receives.
The query is internally treated as a CQL statement.
DO set the upsert query when processing Tuple data types.
DO NOT set the query when processing POJO data types.
setClusterBuilder()
Sets the cluster builder that is used to configure the connection to Cassandra with more sophisticated settings such as consistency level and retry policy.
setHost(String host[, int port])
Simple version of setClusterBuilder() that takes the host/port information needed to connect to Cassandra instances.
setMapperOptions(MapperOptions options)
Sets the mapper options that are used to configure the DataStax ObjectMapper.
enableWriteAheadLog([CheckpointCommitter committer])
Allows exactly-once processing for non-deterministic algorithms.
build()
Finalizes the configuration and constructs the CassandraSink instance.
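As a rough illustration of how these builder methods combine, the fragment below attaches a sink to a Tuple stream and uses setClusterBuilder() to set a consistency level. It is a sketch, not a complete job: the class name SinkConfigSketch, the method attachSink, the contact point 127.0.0.1 and the choice of QUORUM are assumptions made for illustration, and the code needs the Flink Cassandra connector and the DataStax Java driver on the classpath.

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.QueryOptions;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class SinkConfigSketch {

    /** Attaches a Cassandra sink to an existing (word, count) stream (hypothetical helper). */
    public static void attachSink(DataStream<Tuple2<String, Long>> result) throws Exception {
        CassandraSink.addSink(result)
            // Tuple input: an upsert query is required.
            .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
            // Used instead of setHost() when more than host/port has to be configured.
            .setClusterBuilder(new ClusterBuilder() {
                @Override
                protected Cluster buildCluster(Cluster.Builder builder) {
                    return builder
                        .addContactPoint("127.0.0.1") // assumed contact point
                        .withQueryOptions(
                            new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM))
                        .build();
                }
            })
            .build();
    }
}
```

setHost() and setClusterBuilder() both configure the connection, so only one of the two is used here.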
Write-ahead Log
A checkpoint committer stores additional information about completed checkpoints
in some resource. This information is used to prevent a full replay of the last
completed checkpoint in case of a failure.
You can use a CassandraCommitter to store this information in a separate table in Cassandra.
Note that this table will NOT be cleaned up by Flink.
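The fragment below sketches how a CassandraCommitter might be passed to the sink builder through enableWriteAheadLog(). The class name WriteAheadLogSketch, the method attachSink and the contact point are assumptions for illustration; the sketch also assumes that checkpointing has already been enabled on the execution environment and that the connector dependencies are on the classpath.

```java
import com.datastax.driver.core.Cluster;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class WriteAheadLogSketch {

    /** Attaches a write-ahead-log sink to an existing Tuple stream (hypothetical helper). */
    public static void attachSink(DataStream<Tuple2<String, Long>> result) throws Exception {
        // The same connection settings are reused for the sink and for the committer table.
        ClusterBuilder clusterBuilder = new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                return builder.addContactPoint("127.0.0.1").build(); // assumed contact point
            }
        };

        CassandraSink.addSink(result)
            .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
            .setClusterBuilder(clusterBuilder)
            // Checkpoint information is stored in a separate Cassandra table by the committer.
            .enableWriteAheadLog(new CassandraCommitter(clusterBuilder))
            .build();
    }
}
```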
Flink can provide exactly-once guarantees if the query is idempotent (meaning it can be applied multiple
times without changing the result) and checkpointing is enabled. In case of a failure the failed
checkpoint will be replayed completely.
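To make the idempotency requirement concrete, here is a small self-contained sketch that replays the same batch of records against an in-memory map standing in for the table. It does not use Flink or Cassandra; the class UpsertReplayDemo and its methods are hypothetical and only model the difference between an overwrite by primary key and a non-idempotent increment.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a table keyed by its primary key. */
class UpsertReplayDemo {

    /** Idempotent write: like an upsert, it overwrites the row for the given key. */
    static void upsert(Map<String, Long> table, String word, long count) {
        table.put(word, count);
    }

    /** Non-idempotent write: every replay adds to the stored value again. */
    static void increment(Map<String, Long> table, String word, long delta) {
        table.merge(word, delta, Long::sum);
    }

    /** Applies the same batch several times, as a replay after a failure would. */
    static Map<String, Long> replay(Map<String, Long> batch, int attempts, boolean idempotent) {
        Map<String, Long> table = new HashMap<>();
        for (int attempt = 0; attempt < attempts; attempt++) {
            for (Map.Entry<String, Long> record : batch.entrySet()) {
                if (idempotent) {
                    upsert(table, record.getKey(), record.getValue());
                } else {
                    increment(table, record.getKey(), record.getValue());
                }
            }
        }
        return table;
    }
}
```

Replaying a batch containing flink=3 twice leaves flink=3 with the upsert, but flink=6 with the increment, which is why only the idempotent variant tolerates a replayed checkpoint.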
Furthermore, for non-deterministic programs the write-ahead log has to be enabled. For such a program
the replayed checkpoint may be completely different than the previous attempt, which may leave the
database in an inconsistent state since part of the first attempt may already be written.
The write-ahead log guarantees that the replayed checkpoint is identical to the first attempt.
Note that enabling this feature will have an adverse impact on latency.
Note: The write-ahead log functionality is currently experimental. In many cases it is sufficient to use the connector without enabling it. Please report problems to the development mailing list.
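The idea behind the write-ahead log and the checkpoint committer can be modelled in a few lines of plain Java. The sketch below is a deliberately simplified, hypothetical model (the class WriteAheadSinkSketch and all of its methods are invented for illustration) and is not Flink's actual implementation: records are buffered, written to the "database" only once a checkpoint is complete, and a committed checkpoint id prevents the same checkpoint from being written twice.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of a write-ahead sink; not Flink's implementation. */
class WriteAheadSinkSketch {
    private final List<String> database = new ArrayList<>(); // stands in for the Cassandra table
    private final List<String> log = new ArrayList<>();      // records buffered since the last checkpoint
    private long lastCommittedCheckpoint = -1;                // what a checkpoint committer would persist

    /** Buffers a record instead of writing it to the database right away. */
    void invoke(String record) {
        log.add(record);
    }

    /** Returns a copy of the buffered records, as checkpointed state would hold them. */
    List<String> snapshot() {
        return new ArrayList<>(log);
    }

    /** Replaces the buffer with restored state, as recovery after a failure would. */
    void restore(List<String> state) {
        log.clear();
        log.addAll(state);
    }

    /** Writes the buffered records once the checkpoint is known to be complete. */
    void notifyCheckpointComplete(long checkpointId) {
        if (checkpointId > lastCommittedCheckpoint) {
            database.addAll(log);
            lastCommittedCheckpoint = checkpointId;
        }
        // Either way the buffered records are no longer needed.
        log.clear();
    }

    List<String> database() {
        return new ArrayList<>(database);
    }
}
```

In this model, a first attempt that buffers records and then fails leaves the database untouched, so a replay that produces different records cannot mix with leftovers from the first attempt. The price is that nothing becomes visible before the checkpoint completes, which matches the latency impact noted above.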
Checkpointing and Fault Tolerance
With checkpointing enabled, the Cassandra sink guarantees at-least-once delivery of action requests to the Cassandra instance.
The Cassandra sinks currently support both Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use of these streaming data types, please refer to Supported Data Types. We show two implementations based on SocketWindowWordCount, for POJO and Tuple data types respectively.
In all these examples, we assume that the associated keyspace example and table wordcount have been created.
Cassandra Sink Example for Streaming Tuple Data Type
When storing results with the Java/Scala Tuple data type to a Cassandra sink, a CQL upsert statement must be set (via setQuery("stmt")) to persist each record back to the database. With the upsert query cached as a PreparedStatement, each Tuple element is bound to a parameter of the statement.
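A sketch of such a job, modelled on SocketWindowWordCount, might look as follows. The class name, the socket host and port, the window size and the job name are assumptions for illustration, and the windowing calls assume a Flink 1.x DataStream API; the keyspace example and the table wordcount (with word as its primary key) are assumed to exist.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.util.Collector;

public class TupleWordCountToCassandra {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read lines from a socket (assumed host and port).
        DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");

        // Split lines into words and count them per 5-second window.
        DataStream<Tuple2<String, Long>> result = text
            .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                    for (String word : value.toLowerCase().split("\\s")) {
                        // Skip empty words, since word is assumed to be the primary key.
                        if (!word.isEmpty()) {
                            out.collect(new Tuple2<>(word, 1L));
                        }
                    }
                }
            })
            .keyBy(value -> value.f0)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .sum(1);

        // Tuple fields f0 and f1 are bound to the two '?' parameters in order.
        CassandraSink.addSink(result)
            .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
            .setHost("127.0.0.1")
            .build();

        env.execute("Tuple word count to Cassandra");
    }
}
```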
Cassandra Sink Example for Streaming POJO Data Type
A POJO data type can be streamed and the same POJO entity stored back to Cassandra. The POJO implementation needs to follow the DataStax Java Driver Manual to annotate the class, as each field of the entity is mapped to an associated column of the designated table using the DataStax Java Driver com.datastax.driver.mapping.Mapper class.
The mapping of each table column can be defined through annotations placed on a field declaration in the POJO class. For details of the mapping, please refer to the CQL documentation on Definition of Mapped Classes and CQL Data types.
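The fragment below sketches what an annotated POJO and the matching sink configuration might look like. The class names PojoSinkSketch and WordCount, the method attachSink, the host and the saveNullFields option are assumptions for illustration; the annotations come from the DataStax Java driver mapping module, which has to be on the classpath together with the Flink Cassandra connector.

```java
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

public class PojoSinkSketch {

    /** POJO mapped to the assumed table example.wordcount. */
    @Table(keyspace = "example", name = "wordcount")
    public static class WordCount {

        @Column(name = "word")
        private String word = "";

        @Column(name = "count")
        private long count = 0;

        public WordCount() {}

        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        public String getWord() { return word; }
        public void setWord(String word) { this.word = word; }
        public long getCount() { return count; }
        public void setCount(long count) { this.count = count; }
    }

    /** Attaches a Cassandra sink to an existing POJO stream (hypothetical helper). */
    public static void attachSink(DataStream<WordCount> result) throws Exception {
        CassandraSink.addSink(result)
            // No setQuery() here: the statement is derived from the annotations.
            .setHost("127.0.0.1")
            .setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
            .build();
    }
}
```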