This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.
Elasticsearch SQL 连接器 #
Sink: Batch Sink: Streaming Append & Upsert Mode
Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中。本文档描述运行 SQL 查询时如何设置 Elasticsearch 连接器。
连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。
如果 DDL 中没有定义主键,那么连接器只能工作在 append 模式,只能与外部系统交换 INSERT 消息。
依赖 #
In order to use the Elasticsearch connector the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
Elasticsearch version | Maven dependency | SQL Client JAR |
---|---|---|
6.x |
|
Download |
7.x and later versions |
|
Download |
Elasticsearch 连接器不是二进制发行版的一部分,请查阅这里了解如何在集群运行中引用 Elasticsearch 连接器。
如何创建 Elasticsearch 表 #
以下示例展示了如何创建 Elasticsearch sink 表:
CREATE TABLE myUserTable (
user_id STRING,
user_name STRING,
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'users'
);
连接器参数 #
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector |
必选 | (none) | String | 指定要使用的连接器,有效值为:
|
hosts |
必选 | (none) | String | 要连接到的一台或多台 Elasticsearch 主机,例如 'http://host_name:9092;http://host_name:9093' 。 |
index |
必选 | (none) | String | Elasticsearch 中每条记录的索引。可以是一个静态索引(例如 'myIndex' )或一个动态索引(例如 'index-{log_ts|yyyy-MM-dd}' )。
更多详细信息,请参见下面的动态索引部分。 |
document-type |
6.x 版本中必选 | (none) | String | Elasticsearch 文档类型。在 elasticsearch-7 中不再需要。 |
document-id.key-delimiter |
可选 | _ | String | 复合键的分隔符(默认为"_"),例如,指定为"$"将导致文档 ID 为"KEY1$KEY2$KEY3"。 |
username |
可选 | (none) | String | 用于连接 Elasticsearch 实例的用户名。请注意,Elasticsearch 没有预绑定安全特性,但你可以通过如下指南启用它来保护 Elasticsearch 集群。 |
password |
可选 | (none) | String | 用于连接 Elasticsearch 实例的密码。如果配置了username ,则此选项也必须配置为非空字符串。 |
failure-handler |
optional | fail | String | 对 Elasticsearch 请求失败情况下的失败处理策略。有效策略为:
|
sink.flush-on-checkpoint |
optional | true | Boolean | 在进行 checkpoint 时是否保证刷出缓冲区中的数据。如果关闭这一选项,在进行checkpoint时 sink 将不再为所有进行 中的请求等待 Elasticsearch 的执行完成确认。因此,在这种情况下 sink 将不对至少一次的请求的一致性提供任何保证。 |
sink.bulk-flush.max-actions |
可选 | 1000 | Integer | 每个批量请求的最大缓冲操作数。
可以设置为'0' 来禁用它。
|
sink.bulk-flush.max-size |
可选 | 2mb | MemorySize | 每个批量请求的缓冲操作在内存中的最大值。单位必须为 MB。
可以设置为'0' 来禁用它。
|
sink.bulk-flush.interval |
可选 | 1s | Duration | flush 缓冲操作的间隔。
可以设置为'0' 来禁用它。注意,'sink.bulk-flush.max-size' 和'sink.bulk-flush.max-actions' 都设置为'0' 的这种 flush 间隔设置允许对缓冲操作进行完全异步处理。
|
sink.bulk-flush.backoff.strategy |
可选 | DISABLED | String | 指定在由于临时请求错误导致任何 flush 操作失败时如何执行重试。有效策略为:
|
sink.bulk-flush.backoff.max-retries |
可选 | (none) | Integer | 最大回退重试次数。 |
sink.bulk-flush.backoff.delay |
可选 | (none) | Duration | 每次退避尝试之间的延迟。对于 CONSTANT 退避策略,该值是每次重试之间的延迟。对于 EXPONENTIAL 退避策略,该值是初始的延迟。 |
connection.path-prefix |
可选 | (none) | String | 添加到每个 REST 通信中的前缀字符串,例如,'/v1' 。 |
connection.request-timeout |
可选 | (none) | Duration | 从连接管理器请求连接的超时时间。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。 |
connection.timeout |
可选 | (none) | Duration | 建立请求的超时时间 。超时时间必须大于或者等于 0 ,如果设置为 0 则是无限超时。 |
socket.timeout |
可选 | (none) | Duration | 等待数据的 socket 的超时时间 (SO_TIMEOUT)。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。 |
format |
可选 | json | String | Elasticsearch 连接器支持指定格式。该格式必须生成一个有效的 json 文档。
默认使用内置的 'json' 格式。更多详细信息,请参阅 JSON Format 页面。
|
特性 #
Key 处理 #
Elasticsearch sink 可以根据是否定义了一个主键来确定是在 upsert 模式还是 append 模式下工作。 如果定义了主键,Elasticsearch sink 将以 upsert 模式工作,该模式可以消费包含 UPDATE/DELETE 消息的查询。 如果未定义主键,Elasticsearch sink 将以 append 模式工作,该模式只能消费包含 INSERT 消息的查询。
在 Elasticsearch 连接器中,主键用于计算 Elasticsearch 的文档 id,文档 id 为最多 512 字节且不包含空格的字符串。
Elasticsearch 连接器通过使用 document-id.key-delimiter
指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段,为每一行记录生成一个文档 ID 字符串。
某些类型不允许作为主键字段,因为它们没有对应的字符串表示形式,例如,BYTES
,ROW
,ARRAY
,MAP
等。
如果未指定主键,Elasticsearch 将自动生成文档 id。
有关 PRIMARY KEY 语法的更多详细信息,请参见 CREATE TABLE DDL。
动态索引 #
Elasticsearch sink 同时支持静态索引和动态索引。
如果你想使用静态索引,则 index
选项值应为纯字符串,例如 'myusers'
,所有记录都将被写入到 “myusers” 索引中。
如果你想使用动态索引,你可以使用 {field_name}
来引用记录中的字段值来动态生成目标索引。
你也可以使用 '{field_name|date_format_string}'
将 TIMESTAMP/DATE/TIME
类型的字段值转换为 date_format_string
指定的格式。
date_format_string
与 Java 的 DateTimeFormatter 兼容。
例如,如果选项值设置为 'myusers-{log_ts|yyyy-MM-dd}'
,则 log_ts
字段值为 2020-03-27 12:25:55
的记录将被写入到 “myusers-2020-03-27” 索引中。
你也可以使用 '{now()|date_format_string}'
将当前的系统时间转换为 date_format_string
指定的格式。now()
对应的时间类型是 TIMESTAMP_WITH_LTZ
。
在将系统时间格式化为字符串时会使用 session 中通过 table.local-time-zone
中配置的时区。 使用 NOW()
, now()
, CURRENT_TIMESTAMP
, current_timestamp
均可以。
注意: 使用当前系统时间生成的动态索引时, 对于 changelog 的流,无法保证同一主键对应的记录能产生相同的索引名, 因此使用基于系统时间的动态索引,只能支持 append only 的流。
数据类型映射 #
Elasticsearch 将文档存储在 JSON 字符串中。因此数据类型映射介于 Flink 数据类型和 JSON 数据类型之间。
Flink 为 Elasticsearch 连接器使用内置的 'json'
格式。更多类型映射的详细信息,请参阅 JSON Format 页面。