Elasticsearch

Elasticsearch 连接器 #

此连接器提供可以向 Elasticsearch 索引请求文档操作的 sinks。 要使用此连接器,请根据 Elasticsearch 的安装版本将以下依赖之一添加到你的项目中:

Elasticsearch 版本 Maven 依赖
6.x

There is no connector (yet) available for Flink version 1.20.

7.x

There is no connector (yet) available for Flink version 1.20.

为了在 PyFlink 作业中使用 ,需要添加下列依赖:

Version PyFlink JAR
flink-connector-elasticsearch6 There is no SQL jar (yet) available for Flink version 1.20.
flink-connector-elasticsearch7 There is no SQL jar (yet) available for Flink version 1.20.
在 PyFlink 中如何添加 JAR 包依赖请参考 Python 依赖管理

请注意,流连接器目前不是二进制发行版的一部分。 有关如何将程序和用于集群执行的库一起打包,参考此文档

安装 Elasticsearch #

Elasticsearch 集群的设置可以参考此文档

Elasticsearch Sink #

下面的示例展示了如何配置并创建一个 sink:

Elasticsearch 6:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

DataStream<String> input = ...;

input.sinkTo(
    new Elasticsearch6SinkBuilder<String>()
        // 下面的设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来
        .setBulkFlushMaxActions(1)
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
        (element, context, indexer) ->
        indexer.add(createIndexRequest(element)))
        .build());

private static IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);

    return Requests.indexRequest()
        .index("my-index")
        .type("my-type")
        .id(element)
        .source(json);
}

Elasticsearch 7:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

DataStream<String> input = ...;

input.sinkTo(
    new Elasticsearch7SinkBuilder<String>()
        // 下面的设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来
        .setBulkFlushMaxActions(1)
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
        (element, context, indexer) ->
        indexer.add(createIndexRequest(element)))
        .build());

private static IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);

    return Requests.indexRequest()
        .index("my-index")
        .id(element)
        .source(json);
}

Elasticsearch 6:

import org.apache.flink.api.connector.sink.SinkWriter
import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch6SinkBuilder, RequestIndexer}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

val input: DataStream[String] = ...

input.sinkTo(
  new Elasticsearch6SinkBuilder[String]
    // 下面的设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来
    .setBulkFlushMaxActions(1)
    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => 
    indexer.add(createIndexRequest(element)))
    .build())

def createIndexRequest(element: (String)): IndexRequest = {

  val json = Map(
    "data" -> element.asInstanceOf[AnyRef]
  )

  Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
}

Elasticsearch 7:

import org.apache.flink.api.connector.sink.SinkWriter
import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch7SinkBuilder, RequestIndexer}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

val input: DataStream[String] = ...

input.sinkTo(
  new Elasticsearch7SinkBuilder[String]
    // 下面的设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来
    .setBulkFlushMaxActions(1)
    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => 
    indexer.add(createIndexRequest(element)))
    .build())

def createIndexRequest(element: (String)): IndexRequest = {

  val json = Map(
    "data" -> element.asInstanceOf[AnyRef]
  )

  Requests.indexRequest.index("my-index").`type`("my-type").source(mapAsJavaMap(json))
}

Elasticsearch 6 静态索引:

from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, ElasticsearchEmitter

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

input = ...

# 下面的 set_bulk_flush_max_actions 使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来
es6_sink = Elasticsearch6SinkBuilder() \
    .set_bulk_flush_max_actions(1) \
    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
    .set_hosts(['localhost:9200']) \
    .build()

input.sink_to(es6_sink).name('es6 sink')

Elasticsearch 6 动态索引:

from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, ElasticsearchEmitter

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

input = ...

es_sink = Elasticsearch6SinkBuilder() \
    .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id', 'bar')) \
    .set_hosts(['localhost:9200']) \
    .build()

input.sink_to(es6_sink).name('es6 dynamic index sink')

Elasticsearch 7 静态索引:

from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

input = ...

# 下面的 set_bulk_flush_max_actions 使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来
es7_sink = Elasticsearch7SinkBuilder() \
    .set_bulk_flush_max_actions(1) \
    .set_emitter(ElasticsearchEmitter.static('foo', 'id')) \
    .set_hosts(['localhost:9200']) \
    .build()

input.sink_to(es7_sink).name('es7 sink')

Elasticsearch 7 动态索引:

from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

input = ...

es7_sink = Elasticsearch7SinkBuilder() \
    .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id')) \
    .set_hosts(['localhost:9200']) \
    .build()

input.sink_to(es7_sink).name('es7 dynamic index sink')

需要注意的是,该示例仅演示了对每个传入的元素执行单个索引请求。 通常,ElasticsearchSinkFunction 可用于执行多个不同类型的请求(例如 DeleteRequestUpdateRequest 等)。

在内部,Flink Elasticsearch Sink 的每个并行实例使用一个 BulkProcessor 向集群发送操作请求。 这会在元素批量发送到集群之前进行缓存。 BulkProcessor 一次执行一个批量请求,即不会存在两个并行刷新缓存的操作。

Elasticsearch Sinks 和容错 #

通过启用 Flink checkpoint,Flink Elasticsearch Sink 保证至少一次将操作请求发送到 Elasticsearch 集群。 这是通过在进行 checkpoint 时等待 BulkProcessor 中所有挂起的操作请求来实现。 这有效地保证了在触发 checkpoint 之前所有的请求被 Elasticsearch 成功确认,然后继续处理发送到 sink 的记录。

关于 checkpoint 和容错的更多详细信息,请参见容错文档

要使用具有容错特性的 Elasticsearch Sinks,需要在执行环境中启用作业拓扑的 checkpoint:

Elasticsearch 6:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // 每 5000 毫秒执行一次 checkpoint

Elasticsearch6SinkBuilder sinkBuilder = new Elasticsearch6SinkBuilder<String>()
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
    .setEmitter(
    (element, context, indexer) -> 
    indexer.add(createIndexRequest(element)));

Elasticsearch 7:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // 每 5000 毫秒执行一次 checkpoint

Elasticsearch7SinkBuilder sinkBuilder = new Elasticsearch7SinkBuilder<String>()
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
    .setEmitter(
    (element, context, indexer) -> 
    indexer.add(createIndexRequest(element)));

Elasticsearch 6:

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // 每 5000 毫秒执行一次 checkpoint

val sinkBuilder = new Elasticsearch6SinkBuilder[String]
  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
  .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
  indexer.add(createIndexRequest(element)))

Elasticsearch 7:

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // 每 5000 毫秒执行一次 checkpoint

val sinkBuilder = new Elasticsearch7SinkBuilder[String]
  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
  .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
  indexer.add(createIndexRequest(element)))

Elasticsearch 6:

env = StreamExecutionEnvironment.get_execution_environment()
# 每 5000 毫秒执行一次 checkpoint
env.enable_checkpointing(5000)

sink_builder = Elasticsearch6SinkBuilder() \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
    .set_hosts(['localhost:9200'])

Elasticsearch 7:

env = StreamExecutionEnvironment.get_execution_environment()
# 每 5000 毫秒执行一次 checkpoint
env.enable_checkpointing(5000)

sink_builder = Elasticsearch7SinkBuilder() \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
    .set_hosts(['localhost:9200'])

Using UpdateRequests with deterministic ids and the upsert method it is possible to achieve exactly-once semantics in Elasticsearch when AT_LEAST_ONCE delivery is configured for the connector.

处理失败的 Elasticsearch 请求 #

Elasticsearch 操作请求可能由于多种原因而失败,包括节点队列容量暂时已满或者要被索引的文档格式错误。 Flink Elasticsearch Sink 允许用户通过通过指定一个退避策略来重试请求。

下面是一个例子:

Elasticsearch 6:

DataStream<String> input = ...;

input.sinkTo(
    new Elasticsearch6SinkBuilder<String>()
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
        (element, context, indexer) ->
        indexer.add(createIndexRequest(element)))
        // 这里启用了一个指数退避重试策略,初始延迟为 1000 毫秒且最大重试次数为 5
        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
        .build());

Elasticsearch 7:

DataStream<String> input = ...;

input.sinkTo(
    new Elasticsearch7SinkBuilder<String>()
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
        (element, context, indexer) ->
        indexer.add(createIndexRequest(element)))
        // 这里启用了一个指数退避重试策略,初始延迟为 1000 毫秒且最大重试次数为 5
        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
        .build());

Elasticsearch 6:

val input: DataStream[String] = ...

input.sinkTo(
  new Elasticsearch6SinkBuilder[String]
    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => 
    indexer.add(createIndexRequest(element)))
    // 这里启用了一个指数退避重试策略,初始延迟为 1000 毫秒且最大重试次数为 5
    .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
    .build())

Elasticsearch 7:

val input: DataStream[String] = ...

input.sinkTo(
  new Elasticsearch7SinkBuilder[String]
    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => 
    indexer.add(createIndexRequest(element)))
    // 这里启用了一个指数退避重试策略,初始延迟为 1000 毫秒且最大重试次数为 5
    .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
    .build())

Elasticsearch 6:

input = ...

# 这里启用了一个指数退避重试策略,初始延迟为 1000 毫秒且最大重试次数为 5
es_sink = Elasticsearch6SinkBuilder() \
    .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 5, 1000) \
    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
    .set_hosts(['localhost:9200']) \
    .build()

input.sink_to(es_sink).name('es6 sink')

Elasticsearch 7:

input = ...

# 这里启用了一个指数退避重试策略,初始延迟为 1000 毫秒且最大重试次数为 5
es7_sink = Elasticsearch7SinkBuilder() \
    .set_bulk_flush_backoff_strategy(FlushBackoffType.EXPONENTIAL, 5, 1000) \
    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
    .set_hosts(['localhost:9200']) \
    .build()

input.sink_to(es7_sink).name('es7 sink')

上面的示例 sink 重新添加由于资源受限(例如:队列容量已满)而失败的请求。对于其它类型的故障,例如文档格式错误,sink 将会失败。 如若未设置 BulkFlushBackoffStrategy (或者 FlushBackoffType.NONE),那么任何类型的错误都会导致 sink 失败。

重要提示:在失败时将请求重新添加回内部 BulkProcessor 会导致更长的 checkpoint,因为在进行 checkpoint 时,sink 还需要等待重新添加的请求被刷新。 例如,当使用 FlushBackoffType.EXPONENTIAL 时, checkpoint 会进行等待,直到 Elasticsearch 节点队列有足够的容量来处理所有挂起的请求,或者达到最大重试次数。

配置内部批量处理器 #

通过使用以下在 Elasticsearch6SinkBuilder 中提供的方法,可以进一步配置内部的 BulkProcessor 关于其如何刷新缓存操作请求的行为:

  • setBulkFlushMaxActions(int numMaxActions):刷新前最大缓存的操作数。
  • setBulkFlushMaxSizeMb(int maxSizeMb):刷新前最大缓存的数据量(以兆字节为单位)。
  • setBulkFlushInterval(long intervalMillis):刷新的时间间隔(不论缓存操作的数量或大小如何)。

还支持配置如何对暂时性请求错误进行重试:

  • setBulkFlushBackoffStrategy(FlushBackoffType flushBackoffType, int maxRetries, long delayMillis):退避延迟的类型,CONSTANT 或者 EXPONENTIAL,退避重试次数,退避重试的时间间隔。 对于常量延迟来说,此值是每次重试间的间隔。对于指数延迟来说,此值是延迟的初始值。

可以在此文档找到 Elasticsearch 的更多信息。

将 Elasticsearch 连接器打包到 Uber-Jar 中 #

建议构建一个包含所有依赖的 uber-jar (可执行的 jar),以便更好地执行你的 Flink 程序。 (更多信息参见此文档)。

或者,你可以将连接器的 jar 文件放入 Flink 的 lib/ 目录下,使其在全局范围内可用,即可用于所有的作业。

Back to top