Elasticsearch Pipeline Connector #
The Elasticsearch Pipeline connector can be used as the Data Sink of a pipeline to write data to Elasticsearch. This document describes how to set up the Elasticsearch Pipeline connector.
How to create Pipeline #
A pipeline that reads data from MySQL and writes it to Elasticsearch can be defined as follows:
source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: elasticsearch
  name: Elasticsearch Sink
  hosts: http://127.0.0.1:9092,http://127.0.0.1:9093

route:
  - source-table: adb.\.*
    sink-table: default_index
    description: sync adb.\.* table to default_index

pipeline:
  name: MySQL to Elasticsearch Pipeline
  parallelism: 2
Pipeline Connector Options #
Option | Required | Default | Type | Description |
---|---|---|---|---|
type | required | (none) | String | Specify which connector to use; here it should be 'elasticsearch'. |
name | optional | (none) | String | The name of the sink. |
hosts | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092,http://host_name:9093'. |
version | optional | 7 | Integer | Specify the Elasticsearch version to connect to. |
username | optional | (none) | String | The username for Elasticsearch authentication. |
password | optional | (none) | String | The password for Elasticsearch authentication. |
batch.size.max | optional | 500 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
inflight.requests.max | optional | 5 | Integer | The maximum number of concurrent requests that the sink will try to execute. |
buffered.requests.max | optional | 1000 | Integer | The maximum number of requests to keep in the in-memory buffer. |
batch.size.max.bytes | optional | 5242880 | Long | The maximum size of batch requests in bytes. |
buffer.time.max.ms | optional | 5000 | Long | The maximum time, in milliseconds, to wait for incomplete batches before flushing. |
record.size.max.bytes | optional | 10485760 | Long | The maximum size of a single record in bytes. |
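As an illustration of how the options above fit together, the following is a minimal sketch of a sink block with authentication and batching tuned away from the defaults. The credentials and the chosen values are hypothetical, and the comments describe the assumed effect of each option rather than measured behavior.

```yaml
sink:
  type: elasticsearch
  name: Elasticsearch Sink
  hosts: http://127.0.0.1:9092,http://127.0.0.1:9093
  version: 7
  # Hypothetical credentials, for illustration only.
  username: elastic
  password: changeme
  # Allow up to 1000 buffered actions per bulk request (default 500).
  batch.size.max: 1000
  # Keep the per-batch size limit at the default of 5 MiB.
  batch.size.max.bytes: 5242880
  # Flush incomplete batches after 2 seconds instead of the default 5.
  buffer.time.max.ms: 2000
  # Keep the default limit of concurrent requests.
  inflight.requests.max: 5
  # Keep the in-memory buffer larger than one batch (assumed to be a sensible ratio).
  buffered.requests.max: 2000
```

Larger batches generally trade latency for throughput, so values like these are best treated as a starting point and checked against the target cluster.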
Usage Notes #
- The written index of Elasticsearch will be the namespace.schemaName.tableName string of TableId; this can be changed using the route function of the pipeline.
- No support for automatic Elasticsearch index creation.
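The notes above can be sketched as a route block that sends tables to explicitly named indices instead of the default TableId-based names. The index names user_index and order_index are hypothetical; because the connector does not create indices, this sketch assumes both already exist in Elasticsearch before the pipeline starts.

```yaml
route:
  # All sharded user tables are merged into one pre-created index (hypothetical name).
  - source-table: bdb.user_table_[0-9]+
    sink-table: user_index
    description: sync sharded user tables to user_index
  # Order tables from the app database go to a second pre-created index (hypothetical name).
  - source-table: app.order_\.*
    sink-table: order_index
    description: sync app order tables to order_index
```

Tables that match no route rule would presumably keep the default index name derived from their TableId.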
Data Type Mapping #
Elasticsearch stores documents as JSON strings, so the data type mapping is between Flink CDC data types and JSON data types.
CDC type | JSON type | NOTE |
---|---|---|
TINYINT | NUMBER | |
SMALLINT | NUMBER | |
INT | NUMBER | |
BIGINT | NUMBER | |
FLOAT | NUMBER | |
DOUBLE | NUMBER | |
DECIMAL(p, s) | STRING | |
BOOLEAN | BOOLEAN | |
DATE | STRING | with format: date (yyyy-MM-dd), example: 2024-10-21 |
TIMESTAMP | STRING | with format: date-time (yyyy-MM-dd HH:mm:ss.SSSSSS, with UTC time zone), example: 2024-10-21 14:10:56.000000 |
TIMESTAMP_LTZ | STRING | with format: date-time (yyyy-MM-dd HH:mm:ss.SSSSSS, with UTC time zone), example: 2024-10-21 14:10:56.000000 |
CHAR(n) | STRING | |
VARCHAR(n) | STRING | |
ARRAY | ARRAY | |
MAP | STRING | |
ROW | STRING | |