Connectors#

Kafka#

Kafka With CSV Format#

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import logging
import sys

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.formats.csv import CsvRowSerializationSchema, CsvRowDeserializationSchema


# Make sure that the Kafka cluster is started and the topic 'test_csv_topic' is
# created before executing this job.
def write_to_kafka(env):
    type_info = Types.ROW([Types.INT(), Types.STRING()])
    ds = env.from_collection([
        (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')],
        type_info=type_info)

    serialization_schema = CsvRowSerializationSchema.Builder(type_info).build()
    kafka_producer = FlinkKafkaProducer(
        topic='test_csv_topic',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
    )

    # note that the output type of ds must be RowTypeInfo
    ds.add_sink(kafka_producer)
    env.execute()


def read_from_kafka(env):
    type_info = Types.ROW([Types.INT(), Types.STRING()])
    # read the records back with the same CSV row schema they were written with
    deserialization_schema = CsvRowDeserializationSchema.Builder(type_info).build()

    kafka_consumer = FlinkKafkaConsumer(
        topics='test_csv_topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
    )
    kafka_consumer.set_start_from_earliest()

    env.add_source(kafka_consumer).print()
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///path/to/flink-sql-connector-kafka-1.15.0.jar")

    print("start writing data to kafka")
    write_to_kafka(env)

    print("start reading data from kafka")
    read_from_kafka(env)
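FlinkKafkaProducer and FlinkKafkaConsumer are the older connector classes; from Flink 1.16 on, PyFlink also exposes the unified KafkaSource / KafkaSink API in pyflink.datastream.connectors.kafka. Below is a minimal sketch of reading the CSV topic above and writing it back out through the new API; the copy topic, group id and jar path are illustrative placeholders, and the matching flink-sql-connector-kafka jar still has to be on the classpath.

from pyflink.common import Types, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaSink, \
    KafkaOffsetsInitializer, KafkaRecordSerializationSchema
from pyflink.datastream.formats.csv import CsvRowSerializationSchema, CsvRowDeserializationSchema

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")

type_info = Types.ROW([Types.INT(), Types.STRING()])

# read the CSV records produced by the example above
source = KafkaSource.builder() \
    .set_bootstrap_servers('localhost:9092') \
    .set_topics('test_csv_topic') \
    .set_group_id('test_group_2') \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(CsvRowDeserializationSchema.Builder(type_info).build()) \
    .build()

ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka source")

# write them back out as CSV to a second topic
sink = KafkaSink.builder() \
    .set_bootstrap_servers('localhost:9092') \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic('test_csv_topic_copy')
        .set_value_serialization_schema(CsvRowSerializationSchema.Builder(type_info).build())
        .build()) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()

ds.sink_to(sink)
env.execute()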

Kafka With JSON Format#

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import logging
import sys

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema


# Make sure that the Kafka cluster is started and the topic 'test_json_topic' is
# created before executing this job.
def write_to_kafka(env):
    type_info = Types.ROW([Types.INT(), Types.STRING()])
    ds = env.from_collection(
        [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')],
        type_info=type_info)

    serialization_schema = JsonRowSerializationSchema.Builder() \
        .with_type_info(type_info) \
        .build()
    kafka_producer = FlinkKafkaProducer(
        topic='test_json_topic',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
    )

    # note that the output type of ds must be RowTypeInfo
    ds.add_sink(kafka_producer)
    env.execute()


def read_from_kafka(env):
    deserialization_schema = JsonRowDeserializationSchema.Builder() \
        .type_info(Types.ROW([Types.INT(), Types.STRING()])) \
        .build()
    kafka_consumer = FlinkKafkaConsumer(
        topics='test_json_topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
    )
    kafka_consumer.set_start_from_earliest()

    env.add_source(kafka_consumer).print()
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///path/to/flink-sql-connector-kafka-1.15.0.jar")

    print("start writing data to kafka")
    write_to_kafka(env)

    print("start reading data from kafka")
    read_from_kafka(env)
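With Types.ROW the row fields get the positional names f0, f1, ..., and those names become the keys of the JSON objects written to Kafka. If you want meaningful keys, Types.ROW_NAMED can be used instead; a small sketch (the field names 'id' and 'name' are illustrative):

from pyflink.common import Types
from pyflink.datastream.formats.json import JsonRowSerializationSchema

# name the row fields so the JSON keys are 'id' and 'name' instead of 'f0' and 'f1'
type_info = Types.ROW_NAMED(['id', 'name'], [Types.INT(), Types.STRING()])

serialization_schema = JsonRowSerializationSchema.Builder() \
    .with_type_info(type_info) \
    .build()
# a record such as (1, 'hi') should then be serialized as {"id": 1, "name": "hi"}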

Kafka With Avro Format#

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import logging
import sys

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.formats.avro import AvroRowSerializationSchema, AvroRowDeserializationSchema


# Make sure that the Kafka cluster is started and the topic 'test_avro_topic' is
# created before executing this job.
def write_to_kafka(env):
    ds = env.from_collection([
        (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')],
        type_info=Types.ROW([Types.INT(), Types.STRING()]))

    serialization_schema = AvroRowSerializationSchema(
        avro_schema_string="""
            {
                "type": "record",
                "name": "TestRecord",
                "fields": [
                    {"name": "id", "type": "int"},
                    {"name": "name", "type": "string"}
                ]
            }"""
    )

    kafka_producer = FlinkKafkaProducer(
        topic='test_avro_topic',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
    )

    # note that the output type of ds must be RowTypeInfo
    ds.add_sink(kafka_producer)
    env.execute()


def read_from_kafka(env):
    deserialization_schema = AvroRowDeserializationSchema(
        avro_schema_string="""
            {
                "type": "record",
                "name": "TestRecord",
                "fields": [
                    {"name": "id", "type": "int"},
                    {"name": "name", "type": "string"}
                ]
            }"""
    )

    kafka_consumer = FlinkKafkaConsumer(
        topics='test_avro_topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
    )
    kafka_consumer.set_start_from_earliest()

    env.add_source(kafka_consumer).print()
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///path/to/flink-sql-avro-1.15.0.jar",
                 "file:///path/to/flink-sql-connector-kafka-1.15.0.jar")

    print("start writing data to kafka")
    write_to_kafka(env)

    print("start reading data from kafka")
    read_from_kafka(env)
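The Row values are matched against the record fields declared in the Avro schema, so the Python values must fit the declared types. A field that may be missing is usually declared as an Avro union with "null"; a minimal sketch of such a schema for the serializer above (illustrative only):

from pyflink.datastream.formats.avro import AvroRowSerializationSchema

# same record as above, but 'name' is allowed to be absent
nullable_schema = """
    {
        "type": "record",
        "name": "TestRecord",
        "fields": [
            {"name": "id", "type": "int"},
            {"name": "name", "type": ["null", "string"], "default": null}
        ]
    }"""

serialization_schema = AvroRowSerializationSchema(avro_schema_string=nullable_schema)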

Pulsar#

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import logging
import sys

from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.pulsar import PulsarSource, PulsarSink, \
    PulsarSerializationSchema, StartCursor, StopCursor, SubscriptionType, \
    PulsarDeserializationSchema, DeliveryGuarantee, TopicRoutingMode

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    PULSAR_SQL_CONNECTOR_PATH = 'file:///path/to/flink-sql-connector-pulsar-1.16.0.jar'
    SERVICE_URL = 'pulsar://localhost:6650'
    ADMIN_URL = 'http://localhost:8080'

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.add_jars(PULSAR_SQL_CONNECTOR_PATH)

    # consume topic 'ada' from the latest cursor with an exclusive subscription
    pulsar_source = PulsarSource.builder() \
        .set_service_url(SERVICE_URL) \
        .set_admin_url(ADMIN_URL) \
        .set_topics('ada') \
        .set_start_cursor(StartCursor.latest()) \
        .set_unbounded_stop_cursor(StopCursor.never()) \
        .set_subscription_name('pyflink_subscription') \
        .set_subscription_type(SubscriptionType.Exclusive) \
        .set_deserialization_schema(
            PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
        .set_config('pulsar.source.enableAutoAcknowledgeMessage', True) \
        .set_properties({'pulsar.source.autoCommitCursorInterval': '1000'}) \
        .build()

    ds = env.from_source(source=pulsar_source,
                         watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
                         source_name="pulsar source")

    # publish to topic 'beta' with at-least-once delivery, routing records round-robin
    pulsar_sink = PulsarSink.builder() \
        .set_service_url(SERVICE_URL) \
        .set_admin_url(ADMIN_URL) \
        .set_producer_name('pyflink_producer') \
        .set_topics('beta') \
        .set_serialization_schema(
            PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .set_topic_routing_mode(TopicRoutingMode.ROUND_ROBIN) \
        .set_config('pulsar.producer.maxPendingMessages', 1000) \
        .set_properties({'pulsar.producer.batchingMaxMessages': '100'}) \
        .build()

    ds.sink_to(pulsar_sink).name('pulsar sink')

    env.execute()
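Any other connector option can be passed through set_config or set_properties on both builders. For example, for a Pulsar cluster that requires token authentication, the client auth options can be supplied the same way; a minimal sketch, where the plugin class is Pulsar's standard token authenticator and the token value is a placeholder:

from pyflink.common import SimpleStringSchema
from pyflink.datastream.connectors.pulsar import PulsarSource, PulsarDeserializationSchema, StartCursor

pulsar_source = PulsarSource.builder() \
    .set_service_url('pulsar://localhost:6650') \
    .set_admin_url('http://localhost:8080') \
    .set_topics('ada') \
    .set_start_cursor(StartCursor.earliest()) \
    .set_subscription_name('pyflink_subscription') \
    .set_deserialization_schema(
        PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
    .set_config('pulsar.client.authPluginClassName',
                'org.apache.pulsar.client.impl.auth.AuthenticationToken') \
    .set_config('pulsar.client.authParams', 'token:<token>') \
    .build()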

Elasticsearch#

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import logging
import sys

from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, \
    Elasticsearch7SinkBuilder, FlushBackoffType, ElasticsearchEmitter

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import DeliveryGuarantee


def write_to_es6(env):
    ELASTICSEARCH_SQL_CONNECTOR_PATH = \
        'file:///path/to/flink-sql-connector-elasticsearch6-1.16.0.jar'
    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

    ds = env.from_collection(
        [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
        type_info=Types.MAP(Types.STRING(), Types.STRING()))

    # documents go to the fixed index 'foo', the document id is taken from the 'id'
    # field, and 'bar' is the document type that Elasticsearch 6 still requires
    es_sink = Elasticsearch6SinkBuilder() \
        .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
        .set_hosts(['localhost:9200']) \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .set_bulk_flush_max_actions(1) \
        .set_bulk_flush_max_size_mb(2) \
        .set_bulk_flush_interval(1000) \
        .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 3, 3000) \
        .set_connection_username('foo') \
        .set_connection_password('bar') \
        .set_connection_path_prefix('foo-bar') \
        .set_connection_request_timeout(30000) \
        .set_connection_timeout(31000) \
        .set_socket_timeout(32000) \
        .build()

    ds.sink_to(es_sink).name('es6 sink')

    env.execute()


def write_to_es6_dynamic_index(env):
    ELASTICSEARCH_SQL_CONNECTOR_PATH = \
        'file:///path/to/flink-sql-connector-elasticsearch6-1.16.0.jar'
    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

    ds = env.from_collection(
        [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
        type_info=Types.MAP(Types.STRING(), Types.STRING()))

    # route each record to the index named by its 'name' field; the document id
    # still comes from the 'id' field and 'bar' is the Elasticsearch 6 document type
    es_sink = Elasticsearch6SinkBuilder() \
        .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id', 'bar')) \
        .set_hosts(['localhost:9200']) \
        .build()

    ds.sink_to(es_sink).name('es6 dynamic index sink')

    env.execute()


def write_to_es7(env):
    ELASTICSEARCH_SQL_CONNECTOR_PATH = \
        'file:///path/to/flink-sql-connector-elasticsearch7-1.16.0.jar'
    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

    ds = env.from_collection(
        [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
        type_info=Types.MAP(Types.STRING(), Types.STRING()))

    # same layout as the Elasticsearch 6 sink above, but Elasticsearch 7 emitters
    # no longer take a document type
    es7_sink = Elasticsearch7SinkBuilder() \
        .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
        .set_hosts(['localhost:9200']) \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .set_bulk_flush_max_actions(1) \
        .set_bulk_flush_max_size_mb(2) \
        .set_bulk_flush_interval(1000) \
        .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 3, 3000) \
        .set_connection_username('foo') \
        .set_connection_password('bar') \
        .set_connection_path_prefix('foo-bar') \
        .set_connection_request_timeout(30000) \
        .set_connection_timeout(31000) \
        .set_socket_timeout(32000) \
        .build()

    ds.sink_to(es7_sink).name('es7 sink')

    env.execute()


def write_to_es7_dynamic_index(env):
    ELASTICSEARCH_SQL_CONNECTOR_PATH = \
        'file:///path/to/flink-sql-connector-elasticsearch7-1.16.0.jar'
    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

    ds = env.from_collection(
        [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
        type_info=Types.MAP(Types.STRING(), Types.STRING()))

    # route each record to the index named by its 'name' field ('id' supplies the document id)
    es7_sink = Elasticsearch7SinkBuilder() \
        .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id')) \
        .set_hosts(['localhost:9200']) \
        .build()

    ds.sink_to(es7_sink).name('es7 dynamic index sink')

    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    print("start writing data to elasticsearch6")
    write_to_es6(env)
    write_to_es6_dynamic_index(env)

    print("start writing data to elasticsearch7")
    write_to_es7(env)
    write_to_es7_dynamic_index(env)
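The built-in ElasticsearchEmitter works on map-typed records, which is why the examples above build their streams with Types.MAP. A stream of some other type is therefore usually converted to string-keyed dicts before the sink; a short sketch under that assumption (the tuple layout and field names are illustrative):

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars('file:///path/to/flink-sql-connector-elasticsearch7-1.16.0.jar')

# start from (id, name) tuples and turn them into string-keyed dicts for the emitter
ds = env.from_collection(
    [(1, 'ada'), (2, 'luna')],
    type_info=Types.TUPLE([Types.INT(), Types.STRING()]))

docs = ds.map(
    lambda t: {'id': str(t[0]), 'name': t[1]},
    output_type=Types.MAP(Types.STRING(), Types.STRING()))

es7_sink = Elasticsearch7SinkBuilder() \
    .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
    .set_hosts(['localhost:9200']) \
    .build()

docs.sink_to(es7_sink).name('es7 sink from tuples')

env.execute()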