Elasticsearch
This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

Elasticsearch Pipeline Connector #

Elasticsearch Pipeline 连接器可以用作 Pipeline 的 Data Sink, 将数据写入 Elasticsearch。 本文档介绍如何设置 Elasticsearch Pipeline 连接器。

How to create Pipeline #

从 MySQL 读取数据同步到 Elasticsearch 的 Pipeline 可以定义如下:

source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: elasticsearch
  name: Elasticsearch Sink
  hosts: http://127.0.0.1:9092,http://127.0.0.1:9093
  
route:
  - source-table: adb.\.*
    sink-table: default_index
    description: sync adb.\.* table to default_index

pipeline:
  name: MySQL to Elasticsearch Pipeline
  parallelism: 2

Pipeline Connector Options #

Option Required Default Type Description
type required (none) String 指定要使用的连接器, 这里需要设置成 'elasticsearch'.
name optional (none) String Sink 的名称。
hosts required (none) String 要连接到的一台或多台 Elasticsearch 主机,例如: 'http://host_name:9092,http://host_name:9093'.
version optional 7 Integer 指定要使用的连接器,有效值为:
  • 6: 连接到 Elasticsearch 6.x 的集群。
  • 7: 连接到 Elasticsearch 7.x 的集群。
  • 8: 连接到 Elasticsearch 8.x 的集群。
username optional (none) String 用于连接 Elasticsearch 实例认证的用户名。
password optional (none) String 用于连接 Elasticsearch 实例认证的密码。
batch.size.max optional 500 Integer 每个批量请求的最大缓冲操作数。 可以设置为'0'来禁用它。
inflight.requests.max optional 5 Integer 连接器将尝试执行的最大并发请求数。
buffered.requests.max optional 1000 Integer 每个批量请求的内存缓冲区中保留的最大请求数。
batch.size.max.bytes optional 5242880 Long 每个批量请求的缓冲操作在内存中的最大值(以byte为单位)。
buffer.time.max.ms optional 5000 Long 每个批量请求的缓冲 flush 操作的间隔(以ms为单位)。
record.size.max.bytes optional 10485760 Long 单个记录的最大大小(以byte为单位)。

Usage Notes #

  • 写入 Elasticsearch 的 index 默认为与上游表同名字符串,可以通过 pipeline 的 route 功能进行修改。

  • 如果写入 Elasticsearch 的 index 不存在,不会被默认创建。

Data Type Mapping #

Elasticsearch 将文档存储在 JSON 字符串中,数据类型之间的映射关系如下表所示:

CDC type JSON type NOTE
TINYINT NUMBER
SMALLINT NUMBER
INT NUMBER
BIGINT NUMBER
FLOAT NUMBER
DOUBLE NUMBER
DECIMAL(p, s) STRING
BOOLEAN BOOLEAN
DATE STRING with format: date (yyyy-MM-dd), example: 2024-10-21
TIMESTAMP STRING with format: date-time (yyyy-MM-dd HH:mm:ss.SSSSSS, with UTC time zone), example: 2024-10-21 14:10:56.000000
TIMESTAMP_LTZ STRING with format: date-time (yyyy-MM-dd HH:mm:ss.SSSSSS, with UTC time zone), example: 2024-10-21 14:10:56.000000
CHAR(n) STRING
VARCHAR(n) STRING
ARRAY ARRAY
MAP STRING
ROW STRING

Back to top