# Source code for pyflink.datastream.connectors.elasticsearch

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import abc
from enum import Enum
from typing import List, Union

from pyflink.datastream.connectors import Sink, DeliveryGuarantee
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import to_jarray


__all__ = ['FlushBackoffType',
           'ElasticsearchEmitter',
           'Elasticsearch6SinkBuilder',
           'Elasticsearch7SinkBuilder',
           'ElasticsearchSink']


class FlushBackoffType(Enum):
    """
    Used to control whether the sink should retry failed requests at all or with which kind of
    back off strategy.

    :data:`CONSTANT`:

    After every failure, it waits a configured time until the retries are exhausted.

    :data:`EXPONENTIAL`:

    After every failure, it waits initially the configured time and increases the waiting time
    exponentially until the retries are exhausted.

    :data:`NONE`:

    The failure is not retried.
    """

    # NOTE: the original declarations carried trailing commas (`CONSTANT = 0,`), which made
    # every member's ``value`` an accidental one-element tuple ``(0,)``. Plain ints are the
    # intended values; only ``name`` is used for the Java-side mapping, so the JVM behavior
    # is unaffected.
    CONSTANT = 0
    EXPONENTIAL = 1
    NONE = 2

    def _to_j_flush_backoff_type(self):
        # Map this member onto the Java enum constant of the same name.
        JFlushBackoffType = get_gateway().jvm \
            .org.apache.flink.connector.elasticsearch.sink.FlushBackoffType
        return getattr(JFlushBackoffType, self.name)
class ElasticsearchEmitter(object):
    """
    Emitter which is used by sinks to prepare elements for sending them to Elasticsearch.
    """

    def __init__(self, j_emitter):
        # Wrapped Java MapElasticsearchEmitter instance.
        self._j_emitter = j_emitter

    @staticmethod
    def _create(index: str, key_field: str, doc_type: str, dynamic: bool) \
            -> 'ElasticsearchEmitter':
        # Both factory methods delegate to the same Java emitter class; the boolean flag
        # selects static vs. dynamic index resolution on the Java side.
        JMapElasticsearchEmitter = get_gateway().jvm \
            .org.apache.flink.connector.elasticsearch.sink.MapElasticsearchEmitter
        j_emitter = JMapElasticsearchEmitter(index, doc_type, key_field, dynamic)
        return ElasticsearchEmitter(j_emitter)

    @staticmethod
    def static_index(index: str, key_field: str = None, doc_type: str = None) \
            -> 'ElasticsearchEmitter':
        """
        Creates an emitter with static index which is invoked on every record to convert it to
        Elasticsearch actions.
        """
        return ElasticsearchEmitter._create(index, key_field, doc_type, False)

    @staticmethod
    def dynamic_index(index_field: str, key_field: str = None, doc_type: str = None) \
            -> 'ElasticsearchEmitter':
        """
        Creates an emitter with dynamic index which is invoked on every record to convert it to
        Elasticsearch actions.
        """
        return ElasticsearchEmitter._create(index_field, key_field, doc_type, True)
class ElasticsearchSinkBuilderBase(abc.ABC):
    """
    Base builder to construct a :class:`ElasticsearchSink`.
    """

    @abc.abstractmethod
    def __init__(self):
        # Concrete subclasses replace this with the version-specific Java builder.
        self._j_elasticsearch_sink_builder = None

    @abc.abstractmethod
    def get_http_host_class(self):
        """
        Gets the org.apache.http.HttpHost class which path is different in different
        Elasticsearch version.
        """
        pass

    def set_emitter(self, emitter: ElasticsearchEmitter) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the emitter which is invoked on every record to convert it to Elasticsearch actions.

        :param emitter: The emitter to process records into Elasticsearch actions.
        """
        self._j_elasticsearch_sink_builder.setEmitter(emitter._j_emitter)
        return self

    def set_hosts(self, hosts: Union[str, List[str]]) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the hosts where the Elasticsearch cluster nodes are reachable.
        """
        # A single host string is accepted as a convenience; normalize it to a list first.
        host_list = hosts if isinstance(hosts, list) else [hosts]
        JHttpHost = self.get_http_host_class()
        j_host_array = to_jarray(JHttpHost, [JHttpHost.create(host) for host in host_list])
        self._j_elasticsearch_sink_builder.setHosts(j_host_array)
        return self

    def set_delivery_guarantee(self, delivery_guarantee: DeliveryGuarantee) \
            -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the wanted DeliveryGuarantee. The default delivery guarantee is
        DeliveryGuarantee#NONE.
        """
        j_guarantee = delivery_guarantee._to_j_delivery_guarantee()
        self._j_elasticsearch_sink_builder.setDeliveryGuarantee(j_guarantee)
        return self

    def set_bulk_flush_max_actions(self, num_max_actions: int) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to
        disable it. The default flush size is 1000.
        """
        self._j_elasticsearch_sink_builder.setBulkFlushMaxActions(num_max_actions)
        return self

    def set_bulk_flush_max_size_mb(self, max_size_mb: int) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the maximum size of buffered actions, in mb, per bulk request. You can pass -1 to
        disable it.
        """
        self._j_elasticsearch_sink_builder.setBulkFlushMaxSizeMb(max_size_mb)
        return self

    def set_bulk_flush_interval(self, interval_millis: int) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.
        """
        self._j_elasticsearch_sink_builder.setBulkFlushInterval(interval_millis)
        return self

    def set_bulk_flush_backoff_strategy(self,
                                        flush_backoff_type: FlushBackoffType,
                                        max_retries: int,
                                        delay_millis: int) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the type of back off to use when flushing bulk requests. The default bulk flush
        back off type is FlushBackoffType#NONE.

        Sets the amount of delay between each backoff attempt when flushing bulk requests,
        in milliseconds.

        Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
        """
        self._j_elasticsearch_sink_builder.setBulkFlushBackoffStrategy(
            flush_backoff_type._to_j_flush_backoff_type(), max_retries, delay_millis)
        return self

    def set_connection_username(self, username: str) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the username used to authenticate the connection with the Elasticsearch cluster.
        """
        self._j_elasticsearch_sink_builder.setConnectionUsername(username)
        return self

    def set_connection_password(self, password: str) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the password used to authenticate the connection with the Elasticsearch cluster.
        """
        self._j_elasticsearch_sink_builder.setConnectionPassword(password)
        return self

    def set_connection_path_prefix(self, prefix: str) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets a prefix which used for every REST communication to the Elasticsearch cluster.
        """
        self._j_elasticsearch_sink_builder.setConnectionPathPrefix(prefix)
        return self

    def set_connection_request_timeout(self, timeout: int) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the timeout for requesting the connection of the Elasticsearch cluster from the
        connection manager.
        """
        self._j_elasticsearch_sink_builder.setConnectionRequestTimeout(timeout)
        return self

    def set_connection_timeout(self, timeout: int) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the timeout for establishing a connection of the Elasticsearch cluster.
        """
        self._j_elasticsearch_sink_builder.setConnectionTimeout(timeout)
        return self

    def set_socket_timeout(self, timeout: int) -> 'ElasticsearchSinkBuilderBase':
        """
        Sets the timeout for waiting for data or, put differently, a maximum period inactivity
        between two consecutive data packets.
        """
        self._j_elasticsearch_sink_builder.setSocketTimeout(timeout)
        return self

    def build(self) -> 'ElasticsearchSink':
        """
        Constructs the ElasticsearchSink with the properties configured this builder.
        """
        return ElasticsearchSink(self._j_elasticsearch_sink_builder.build())
class Elasticsearch6SinkBuilder(ElasticsearchSinkBuilderBase):
    """
    Builder to construct an Elasticsearch 6 compatible ElasticsearchSink.

    The following example shows the minimal setup to create a ElasticsearchSink that submits
    actions on checkpoint or the default number of actions was buffered (1000).

    Example:
    ::

        >>> sink = Elasticsearch6SinkBuilder() \\
        ...     .set_hosts('localhost:9200') \\
        ...     .set_emitter(ElasticsearchEmitter.static_index("user", "key_col")) \\
        ...     .build()
    """

    def __init__(self):
        # Instantiate the ES6-specific Java builder directly (no super().__init__ —
        # the abstract base only zeroes this same attribute).
        self._j_elasticsearch_sink_builder = get_gateway().jvm \
            .org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder()

    def get_http_host_class(self):
        # The HttpHost class is shaded under an ES6-specific package path.
        return get_gateway().jvm.org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost
class Elasticsearch7SinkBuilder(ElasticsearchSinkBuilderBase):
    """
    Builder to construct an Elasticsearch 7 compatible ElasticsearchSink.

    The following example shows the minimal setup to create a ElasticsearchSink that submits
    actions on checkpoint or the default number of actions was buffered (1000).

    Example:
    ::

        >>> sink = Elasticsearch7SinkBuilder() \\
        ...     .set_hosts('localhost:9200') \\
        ...     .set_emitter(ElasticsearchEmitter.dynamic_index("index_col", "key_col")) \\
        ...     .build()
    """

    def __init__(self):
        # Instantiate the ES7-specific Java builder directly (no super().__init__ —
        # the abstract base only zeroes this same attribute).
        self._j_elasticsearch_sink_builder = get_gateway().jvm \
            .org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder()

    def get_http_host_class(self):
        # The HttpHost class is shaded under an ES7-specific package path.
        return get_gateway().jvm.org.apache.flink.elasticsearch7.shaded.org.apache.http.HttpHost
class ElasticsearchSink(Sink):
    """
    Flink Sink to insert or update data in an Elasticsearch index. The sink supports the
    following delivery guarantees.

    DeliveryGuarantee.NONE does not provide any guarantees: actions are flushed to
    Elasticsearch only depending on the configurations of the bulk processor. In case of a
    failure, it might happen that actions are lost if the bulk processor still has buffered
    actions.

    DeliveryGuarantee.AT_LEAST_ONCE on a checkpoint the sink will wait until all buffered
    actions are flushed to and acknowledged by Elasticsearch. No actions will be lost but
    actions might be sent to Elasticsearch multiple times when Flink restarts. These
    additional requests may cause inconsistent data in ElasticSearch right after the restart,
    but eventually everything will be consistent again.
    """

    def __init__(self, j_elasticsearch_sink):
        # Thin wrapper: hand the already-built Java sink to the generic Sink base.
        super(ElasticsearchSink, self).__init__(sink=j_elasticsearch_sink)