Elasticsearch Pipeline Connector

The Elasticsearch Pipeline connector can be used as the Data Sink of a pipeline to write data to Elasticsearch. This document describes how to set up the Elasticsearch Pipeline connector.

How to Create a Pipeline

A pipeline that reads data from MySQL and writes it to Elasticsearch can be defined as follows:

source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: elasticsearch
  name: Elasticsearch Sink
  hosts: http://127.0.0.1:9092,http://127.0.0.1:9093

route:
  - source-table: adb.\.*
    sink-table: default_index
    description: sync adb.\.* table to default_index

pipeline:
  name: MySQL to Elasticsearch Pipeline
  parallelism: 2

Pipeline Connector Options

Option | Required | Default | Type | Description
type | required | (none) | String | The connector to use; here it must be 'elasticsearch'.
name | optional | (none) | String | The name of the sink.
hosts | required | (none) | String | One or more Elasticsearch hosts to connect to, separated by commas, e.g. 'http://host_name:9092,http://host_name:9093'.
version | optional | 7 | Integer | The Elasticsearch major version to connect to. Valid values: 6 (Elasticsearch 6.x cluster), 7 (Elasticsearch 7.x cluster), 8 (Elasticsearch 8.x cluster).
username | optional | (none) | String | The username for Elasticsearch authentication.
password | optional | (none) | String | The password for Elasticsearch authentication.
batch.size.max | optional | 500 | Integer | The maximum number of buffered actions per bulk request. Can be set to '0' to disable it.
inflight.requests.max | optional | 5 | Integer | The maximum number of concurrent requests that the sink will try to execute.
buffered.requests.max | optional | 1000 | Integer | The maximum number of requests to keep in the in-memory buffer.
batch.size.max.bytes | optional | 5242880 | Long | The maximum size of a batch request, in bytes.
buffer.time.max.ms | optional | 5000 | Long | The maximum time, in milliseconds, to wait for an incomplete batch before flushing it.
record.size.max.bytes | optional | 10485760 | Long | The maximum size of a single record, in bytes.
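
As an illustration of how the optional settings combine, the following sink block is a minimal sketch that enables authentication and adjusts batching. It uses only the option names listed above; the credentials and the tuning values are placeholders chosen for this example, not recommended settings.

```yaml
sink:
  type: elasticsearch
  name: Elasticsearch Sink
  hosts: http://127.0.0.1:9092,http://127.0.0.1:9093
  version: 8                    # target an Elasticsearch 8.x cluster (default is 7)
  username: elastic             # placeholder credentials, replace with your own
  password: changeme
  batch.size.max: 200           # illustrative value: fewer actions per bulk request than the default 500
  buffered.requests.max: 2000   # illustrative value: larger in-memory buffer than the default 1000
  buffer.time.max.ms: 2000      # illustrative value: flush incomplete batches after 2 seconds
```

Options that are left out keep the defaults shown in the list above.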

Usage Notes

  • By default, the Elasticsearch index that is written to is named after the TableId string, in the form namespace.schemaName.tableName. This can be changed using the route function of the pipeline.

  • Automatic Elasticsearch index creation is not supported.
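
The two notes above can be combined in a route block such as the following sketch. The table name adb.orders and the index names orders_index and user_index are hypothetical and used only for illustration; since indices are not created automatically, both are assumed to exist in Elasticsearch before the pipeline starts.

```yaml
route:
  # Write one table to an explicitly named index instead of the TableId-based default.
  - source-table: adb.orders            # hypothetical table name
    sink-table: orders_index            # hypothetical index, assumed to be created beforehand
    description: sync adb.orders to orders_index
  # Write all tables matching a pattern to a single index.
  - source-table: bdb.user_table_[0-9]+
    sink-table: user_index              # hypothetical index, assumed to be created beforehand
    description: sync sharded user tables to user_index
```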

Data Type Mapping

Elasticsearch stores each document as a JSON string, so the data type mapping is between Flink CDC data types and JSON data types.

CDC type | JSON type | Note
TINYINT | NUMBER |
SMALLINT | NUMBER |
INT | NUMBER |
BIGINT | NUMBER |
FLOAT | NUMBER |
DOUBLE | NUMBER |
DECIMAL(p, s) | STRING |
BOOLEAN | BOOLEAN |
DATE | STRING | Format: date (yyyy-MM-dd), example: 2024-10-21
TIMESTAMP | STRING | Format: date-time (yyyy-MM-dd HH:mm:ss.SSSSSS, with UTC time zone), example: 2024-10-21 14:10:56.000000
TIMESTAMP_LTZ | STRING | Format: date-time (yyyy-MM-dd HH:mm:ss.SSSSSS, with UTC time zone), example: 2024-10-21 14:10:56.000000
CHAR(n) | STRING |
VARCHAR(n) | STRING |
ARRAY | ARRAY |
MAP | STRING |
ROW | STRING |
