This connector provides sinks that can request document actions against an
Elasticsearch index. To use this connector, add one
of the following dependencies to your project, depending on the version
of the Elasticsearch installation:
| Maven Dependency | Supported since | Elasticsearch version |
| ---------------- | --------------- | --------------------- |
| flink-connector-elasticsearch_2.11 | 1.0.0 | 1.x |
| flink-connector-elasticsearch2_2.11 | 1.0.0 | 2.x |
| flink-connector-elasticsearch5_2.11 | 1.3.0 | 5.x |
| flink-connector-elasticsearch6_2.11 | 1.6.0 | 6 and later versions |
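For example, to use the connector for Elasticsearch 6 and later, add the following to your pom.xml (substitute the Flink version you are using):

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
  <version><!-- your Flink version --></version>
</dependency>
```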
Note that the streaming connectors are currently not part of the binary
distribution. See here for information
about how to package the program with the libraries for cluster execution.
Installing Elasticsearch
Instructions for setting up an Elasticsearch cluster can be found
here.
Make sure to set and remember a cluster name. This must be set when
creating an ElasticsearchSink for requesting document actions against your cluster.
Elasticsearch Sink
The ElasticsearchSink uses a TransportClient (before 6.x) or RestHighLevelClient (starting with 6.x) to communicate with an
Elasticsearch cluster.
The example below shows how to configure and create a sink:
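The original documentation shows variants per Elasticsearch version; below is a minimal sketch for the 6.x connector, assuming a local node at 127.0.0.1:9200 and using the illustrative index and type names "my-index" / "my-type":

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

// one HTTP host per Elasticsearch node to connect to
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

// use a builder to create the sink; index and type names are illustrative
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        private IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);

            return Requests.indexRequest()
                    .index("my-index")
                    .type("my-type")
                    .source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
);

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build());
```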
For Elasticsearch versions that still use the now deprecated TransportClient to communicate
with the Elasticsearch cluster (i.e., versions 5.x and below), note how a Map of Strings
is used to configure the ElasticsearchSink. This config map will be forwarded
directly when creating the internally used TransportClient.
The configuration keys are documented in the Elasticsearch documentation
here.
Especially important is the cluster.name parameter that must correspond to
the name of your cluster.
For Elasticsearch 6.x and above, the RestHighLevelClient is used internally for cluster communication.
By default, the connector uses the default configurations for the REST client. To customize
the configuration of the REST client, users can provide a RestClientFactory implementation when
setting up the ElasticsearchSink.Builder that builds the sink.
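A sketch of such a factory, assuming the esSinkBuilder from the example above; the header name and value are purely illustrative:

```java
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;

esSinkBuilder.setRestClientFactory(restClientBuilder -> {
    // attach a default header to every request issued by the low-level REST client
    restClientBuilder.setDefaultHeaders(
            new Header[] { new BasicHeader("X-My-Header", "my-value") });
});
```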
Also note that the example only demonstrates performing a single index
request for each incoming element. Generally, the ElasticsearchSinkFunction
can be used to perform multiple requests of different types (e.g.,
DeleteRequest, UpdateRequest, etc.), as sketched below.
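For instance, a sink function's process method might emit several actions per element; createIndexRequest is the helper from the earlier sketch, and elementId is a hypothetical helper that derives a document id from the element:

```java
import org.elasticsearch.action.delete.DeleteRequest;

@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
    // remove a previous version of the document, then index the new one
    indexer.add(new DeleteRequest("my-index", "my-type", elementId(element)));
    indexer.add(createIndexRequest(element));
}
```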
Internally, each parallel instance of the Flink Elasticsearch Sink uses
a BulkProcessor to send action requests to the cluster.
The BulkProcessor buffers elements before sending them in bulk to the cluster, and
executes bulk requests one at a time, i.e., no two flushes of the buffered
actions will ever be in progress at the same time.
Elasticsearch Sinks and Fault Tolerance
With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guarantees
at-least-once delivery of action requests to Elasticsearch clusters. It does
so by waiting for all pending action requests in the BulkProcessor at
checkpoint time. This effectively assures that all requests issued before the
checkpoint was triggered have been successfully acknowledged by Elasticsearch before
the sink proceeds to process more records.
More details on checkpoints and fault tolerance are in the fault tolerance docs.
To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
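For example:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// trigger a checkpoint every 5000 milliseconds
env.enableCheckpointing(5000);
```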
NOTE: Users can disable flushing if they wish to do so, by calling
disableFlushOnCheckpoint() on the created ElasticsearchSink. Be aware
that this essentially means the sink will no longer provide any strong
delivery guarantees, even with checkpointing enabled for the topology.
Communication using Embedded Node (only for Elasticsearch 1.x)
For Elasticsearch versions 1.x, communication using an embedded node is
also supported. See here
for information about the differences between communicating with Elasticsearch
with an embedded node and a TransportClient.
Below is an example of how to create an ElasticsearchSink that uses an
embedded node instead of a TransportClient:
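A sketch for the 1.x connector, reusing the DataStream<String> input from the earlier example; the index, type, and cluster names are placeholders:

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

Map<String, String> config = new HashMap<>();
// flush after every element; otherwise the requests would be buffered
config.put("bulk.flush.max.actions", "1");
// must match the name of your cluster
config.put("cluster.name", "my-cluster-name");

input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
    @Override
    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }
}));
```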
The difference is that now we do not need to provide a list of addresses
of Elasticsearch nodes.
Handling Failing Elasticsearch Requests
Elasticsearch action requests may fail due to a variety of reasons, including
temporarily saturated node queue capacity or malformed documents to be indexed.
The Flink Elasticsearch Sink allows the user to specify how request
failures are handled, by simply implementing an ActionRequestFailureHandler and
providing it to the constructor.
Below is an example:
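A sketch of such a handler for the pre-6.x constructor, assuming the config map, a transportAddresses list, and an elasticsearchSinkFunction are set up as in the earlier examples:

```java
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    elasticsearchSinkFunction, // e.g. the sink function from the earlier sketch
    new ActionRequestFailureHandler() {
        @Override
        public void onFailure(ActionRequest action,
                Throwable failure,
                int restStatusCode,
                RequestIndexer indexer) throws Throwable {

            if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                // full queue; re-add the request for later processing
                indexer.add(action);
            } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
                // malformed document; simply drop the request without failing the sink
            } else {
                // for all other failures, fail the sink;
                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure;
            }
        }
    }));
```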
The above example lets the sink re-add requests that failed due to
queue capacity saturation and drop requests with malformed documents, without
failing the sink. For all other failures, the sink will fail. If an ActionRequestFailureHandler
is not provided to the constructor, the sink will fail for any kind of error.
Note that onFailure is called only for failures that still occur after the
internal BulkProcessor has finished all backoff retry attempts.
By default, the BulkProcessor retries up to a maximum of 8 attempts with
an exponential backoff. For more information on the behaviour of the
internal BulkProcessor and how to configure it, please see the following section.
By default, if a failure handler is not provided, the sink uses a
NoOpFailureHandler that simply fails for all kinds of exceptions. The
connector also provides a RetryRejectedExecutionFailureHandler implementation
that always re-adds requests that have failed due to queue capacity saturation.
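For example, to plug in the provided handler (other arguments as in the sketch above):

```java
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

input.addSink(new ElasticsearchSink<>(
    config, transportAddresses, elasticsearchSinkFunction,
    new RetryRejectedExecutionFailureHandler()));
```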
IMPORTANT: Re-adding requests back to the internal BulkProcessor
on failures will lead to longer checkpoints, as the sink will also
need to wait for the re-added requests to be flushed when checkpointing.
For example, when using RetryRejectedExecutionFailureHandler, checkpoints
will need to wait until Elasticsearch node queues have enough capacity for
all the pending requests. This also means that if re-added requests never
succeed, the checkpoint will never finish.
Failure handling for Elasticsearch 1.x: For Elasticsearch 1.x, it
is not feasible to match on the type of the failure, because the exact type
cannot be retrieved through the older Java client APIs (the types will
be general Exceptions that differ only in the
failure message). In this case, it is recommended to match on the
provided REST status code.
Configuring the Internal Bulk Processor
The internal BulkProcessor can be further configured for how
buffered action requests are flushed, by setting the following values in
the provided Map<String, String>:
bulk.flush.max.actions: Maximum number of actions to buffer before flushing.
bulk.flush.max.size.mb: Maximum size of data (in megabytes) to buffer before flushing.
bulk.flush.interval.ms: Interval at which to flush regardless of the amount or size of buffered actions.
For versions 2.x and above, configuring how temporary request errors are
retried is also supported:
bulk.flush.backoff.enable: Whether or not to perform retries with backoff delay for a flush
if one or more of its actions failed due to a temporary EsRejectedExecutionException.
bulk.flush.backoff.type: The type of backoff delay, either CONSTANT or EXPONENTIAL.
bulk.flush.backoff.delay: The amount of delay for backoff. For constant backoff, this
is simply the delay between each retry. For exponential backoff, this is the initial base delay.
bulk.flush.backoff.retries: The number of backoff retries to attempt.
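For example, a configuration sketch with purely illustrative values:

```java
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name"); // pre-6.x TransportClient setting

// flush after 500 buffered actions, 5 MB of buffered data, or 60 seconds, whichever comes first
config.put("bulk.flush.max.actions", "500");
config.put("bulk.flush.max.size.mb", "5");
config.put("bulk.flush.interval.ms", "60000");

// retry flushes rejected with a temporary EsRejectedExecutionException (2.x and above)
config.put("bulk.flush.backoff.enable", "true");
config.put("bulk.flush.backoff.type", "EXPONENTIAL");
config.put("bulk.flush.backoff.delay", "50");
config.put("bulk.flush.backoff.retries", "8");
```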
More information about Elasticsearch can be found here.
Packaging the Elasticsearch Connector into an Uber-Jar
For the execution of your Flink program, it is recommended to build a
so-called uber-jar (executable jar) containing all your dependencies
(see here for further information).
Alternatively, you can put the connector’s jar file into Flink’s lib/ folder to make it available
system-wide, i.e. for all jobs being run.