This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.
Elasticsearch
Elasticsearch Pipeline Connector #
Elasticsearch Pipeline 连接器可以用作 Pipeline 的 Data Sink, 将数据写入 Elasticsearch。 本文档介绍如何设置 Elasticsearch Pipeline 连接器。
How to create Pipeline #
从 MySQL 读取数据同步到 Elasticsearch 的 Pipeline 可以定义如下:
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: elasticsearch
name: Elasticsearch Sink
hosts: http://127.0.0.1:9092,http://127.0.0.1:9093
route:
- source-table: adb.\.*
sink-table: default_index
description: sync adb.\.* table to default_index
pipeline:
name: MySQL to Elasticsearch Pipeline
parallelism: 2
Pipeline Connector Options #
Option | Required | Default | Type | Description |
---|---|---|---|---|
type | required | (none) | String | 指定要使用的连接器, 这里需要设置成 'elasticsearch' . |
name | optional | (none) | String | Sink 的名称。 |
hosts | required | (none) | String | 要连接到的一台或多台 Elasticsearch 主机,例如: 'http://host_name:9092,http://host_name:9093'. |
version | optional | 7 | Integer | 指定要使用的连接器,有效值为:
|
username | optional | (none) | String | 用于连接 Elasticsearch 实例认证的用户名。 |
password | optional | (none) | String | 用于连接 Elasticsearch 实例认证的密码。 |
batch.size.max | optional | 500 | Integer | 每个批量请求的最大缓冲操作数。 可以设置为'0'来禁用它。 |
inflight.requests.max | optional | 5 | Integer | 连接器将尝试执行的最大并发请求数。 |
buffered.requests.max | optional | 1000 | Integer | 每个批量请求的内存缓冲区中保留的最大请求数。 |
batch.size.max.bytes | optional | 5242880 | Long | 每个批量请求的缓冲操作在内存中的最大值(以byte为单位)。 |
buffer.time.max.ms | optional | 5000 | Long | 每个批量请求的缓冲 flush 操作的间隔(以ms为单位)。 |
record.size.max.bytes | optional | 10485760 | Long | 单个记录的最大大小(以byte为单位)。 |
Usage Notes #
-
写入 Elasticsearch 的 index 默认为与上游表同名字符串,可以通过 pipeline 的 route 功能进行修改。
-
如果写入 Elasticsearch 的 index 不存在,不会被默认创建。
Data Type Mapping #
Elasticsearch 将文档存储在 JSON 字符串中,数据类型之间的映射关系如下表所示:
CDC type | JSON type | NOTE |
---|---|---|
TINYINT | NUMBER | |
SMALLINT | NUMBER | |
INT | NUMBER | |
BIGINT | NUMBER | |
FLOAT | NUMBER | |
DOUBLE | NUMBER | |
DECIMAL(p, s) | STRING | |
BOOLEAN | BOOLEAN | |
DATE | STRING | with format: date (yyyy-MM-dd), example: 2024-10-21 |
TIMESTAMP | STRING | with format: date-time (yyyy-MM-dd HH:mm:ss.SSSSSS, with UTC time zone), example: 2024-10-21 14:10:56.000000 |
TIMESTAMP_LTZ | STRING | with format: date-time (yyyy-MM-dd HH:mm:ss.SSSSSS, with UTC time zone), example: 2024-10-21 14:10:56.000000 |
CHAR(n) | STRING | |
VARCHAR(n) | STRING | |
ARRAY | ARRAY | |
MAP | STRING | |
ROW | STRING |