Data Pipeline

Definition #

Since events in Flink CDC flow from the upstream to the downstream in a pipeline manner, the whole ETL task is referred to as a Data Pipeline.

Parameters #

A pipeline corresponds to a chain of operators in Flink.
To describe a Data Pipeline, the following parts are required:

- source
- sink
- pipeline

The following parts are optional:

- route
- transform
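Putting these parts together, a pipeline definition has the overall shape sketched below. This is only an outline: it assumes the MySQL source and Doris sink used on this page, the pipeline name is a placeholder, and each block additionally needs its connector-specific options.

   source:        # required: where change events are captured from
     type: mysql  # plus connector-specific options such as hostname and tables

   sink:          # required: where change events are written to
     type: doris  # plus connector-specific options such as fenodes

   # transform:   # optional: per-table projection and filtering rules
   # route:       # optional: mapping from source tables to sink tables

   pipeline:      # required: job-level settings
     name: my-pipeline
     parallelism: 1

The two examples in the next section fill in this skeleton, first with only the required parts and then with the optional ones.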

Example #

Only required #

We could use the following YAML file to define a concise Data Pipeline that synchronizes all tables under the MySQL app_db database to Doris:

   source:
     type: mysql
     hostname: localhost
     port: 3306
     username: root
     password: 123456
     tables: app_db.\.*

   sink:
     type: doris
     fenodes: 127.0.0.1:8030
     username: root
     password: ""

   pipeline:
     name: Sync MySQL Database to Doris
     parallelism: 2
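Note that the tables option above is a pattern rather than an explicit list: app_db.\.* matches every table under app_db, which is what lets this single definition cover the whole database. If only a few tables should be captured, the source block can name them explicitly instead; the sketch below assumes the connector accepts a comma-separated table list, and the table names are taken from the route example further down.

   source:
     type: mysql
     hostname: localhost
     port: 3306
     username: root
     password: 123456
     # capture only selected tables instead of every table in app_db
     tables: app_db.orders, app_db.shipments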

With optional #

We could use the following YAML file to define a more complete Data Pipeline that synchronizes all tables under the MySQL app_db database to Doris, writing them into the target database ods_db with the target table name prefix ods_:

   source:
     type: mysql
     hostname: localhost
     port: 3306
     username: root
     password: 123456
     tables: app_db.\.*

   sink:
     type: doris
     fenodes: 127.0.0.1:8030
     username: root
     password: ""

   transform:
     - source-table: adb.web_order01
       projection: \*, format('%S', product_name) as product_name
       filter: addone(id) > 10 AND order_id > 100
       description: project fields and filter
     - source-table: adb.web_order02
       projection: \*, format('%S', product_name) as product_name
       filter: addone(id) > 20 AND order_id > 200
       description: project fields and filter

   route:
     - source-table: app_db.orders
       sink-table: ods_db.ods_orders
     - source-table: app_db.shipments
       sink-table: ods_db.ods_shipments
     - source-table: app_db.products
       sink-table: ods_db.ods_products

   pipeline:
     name: Sync MySQL Database to Doris
     parallelism: 2
     user-defined-function:
       - name: addone
         classpath: com.example.functions.AddOneFunctionClass
       - name: format
         classpath: com.example.functions.FormatFunctionClass

Pipeline Configurations #

The following configuration options are supported at the Data Pipeline level:

| Parameter | Meaning | Optional/Required |
|-----------|---------|-------------------|
| name | The name of the pipeline, which will be submitted to the Flink cluster as the job name. | optional |
| parallelism | The global parallelism of the pipeline. Defaults to 1. | optional |
| local-time-zone | The local time zone, which defines the current session time zone id. | optional |
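
All of these options live under the pipeline block of the definition file. A small sketch, reusing the values from the examples above; the local-time-zone value is only an illustration:

   pipeline:
     name: Sync MySQL Database to Doris   # submitted to the Flink cluster as the job name
     parallelism: 2                       # global parallelism of the pipeline
     local-time-zone: UTC                 # session time zone id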