Connectors#
Kafka#
Kafka With CSV Format#
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import logging
import sys
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.formats.csv import CsvRowSerializationSchema
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
# Make sure that the Kafka cluster is started and the topic 'test_csv_topic' is
# created before executing this job.
def write_to_kafka(env):
type_info = Types.ROW([Types.INT(), Types.STRING()])
ds = env.from_collection([
(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')],
type_info=type_info)
serialization_schema = CsvRowSerializationSchema.Builder(type_info).build()
kafka_producer = FlinkKafkaProducer(
topic='test_csv_topic',
serialization_schema=serialization_schema,
producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
)
# note that the output type of ds must be RowTypeInfo
ds.add_sink(kafka_producer)
env.execute()
def read_from_kafka(env):
deserialization_schema = JsonRowDeserializationSchema.Builder() \
.type_info(Types.ROW([Types.INT(), Types.STRING()])) \
.build()
kafka_consumer = FlinkKafkaConsumer(
topics='test_csv_topic',
deserialization_schema=deserialization_schema,
properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
)
kafka_consumer.set_start_from_earliest()
env.add_source(kafka_consumer).print()
env.execute()
if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.15.0.jar")
print("start writing data to kafka")
write_to_kafka(env)
print("start reading data from kafka")
read_from_kafka(env)
Kafka With Json Format#
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import logging
import sys
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema
# Make sure that the Kafka cluster is started and the topic 'test_json_topic' is
# created before executing this job.
def write_to_kafka(env):
type_info = Types.ROW([Types.INT(), Types.STRING()])
ds = env.from_collection(
[(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')],
type_info=type_info)
serialization_schema = JsonRowSerializationSchema.Builder() \
.with_type_info(type_info) \
.build()
kafka_producer = FlinkKafkaProducer(
topic='test_json_topic',
serialization_schema=serialization_schema,
producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
)
# note that the output type of ds must be RowTypeInfo
ds.add_sink(kafka_producer)
env.execute()
def read_from_kafka(env):
deserialization_schema = JsonRowDeserializationSchema.Builder() \
.type_info(Types.ROW([Types.INT(), Types.STRING()])) \
.build()
kafka_consumer = FlinkKafkaConsumer(
topics='test_json_topic',
deserialization_schema=deserialization_schema,
properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
)
kafka_consumer.set_start_from_earliest()
env.add_source(kafka_consumer).print()
env.execute()
if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.15.0.jar")
print("start writing data to kafka")
write_to_kafka(env)
print("start reading data from kafka")
read_from_kafka(env)
Kafka With Avro Format#
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import logging
import sys
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.formats.avro import AvroRowSerializationSchema, AvroRowDeserializationSchema
# Make sure that the Kafka cluster is started and the topic 'test_avro_topic' is
# created before executing this job.
def write_to_kafka(env):
ds = env.from_collection([
(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')],
type_info=Types.ROW([Types.INT(), Types.STRING()]))
serialization_schema = AvroRowSerializationSchema(
avro_schema_string="""
{
"type": "record",
"name": "TestRecord",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"}
]
}"""
)
kafka_producer = FlinkKafkaProducer(
topic='test_avro_topic',
serialization_schema=serialization_schema,
producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
)
# note that the output type of ds must be RowTypeInfo
ds.add_sink(kafka_producer)
env.execute()
def read_from_kafka(env):
deserialization_schema = AvroRowDeserializationSchema(
avro_schema_string="""
{
"type": "record",
"name": "TestRecord",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"}
]
}"""
)
kafka_consumer = FlinkKafkaConsumer(
topics='test_avro_topic',
deserialization_schema=deserialization_schema,
properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
)
kafka_consumer.set_start_from_earliest()
env.add_source(kafka_consumer).print()
env.execute()
if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///path/to/flink-sql-avro-1.15.0.jar",
"file:///path/to/flink-sql-connector-kafka-1.15.0.jar")
print("start writing data to kafka")
write_to_kafka(env)
print("start reading data from kafka")
read_from_kafka(env)
Pulsar#
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import logging
import sys
from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.pulsar import PulsarSource, PulsarSink,\
PulsarSerializationSchema, StartCursor, StopCursor, SubscriptionType, \
PulsarDeserializationSchema, DeliveryGuarantee, TopicRoutingMode
if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
PULSAR_SQL_CONNECTOR_PATH = 'file:///path/to/flink-sql-connector-pulsar-1.16.0.jar'
SERVICE_URL = 'pulsar://localhost:6650'
ADMIN_URL = 'http://localhost:8080'
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.add_jars(PULSAR_SQL_CONNECTOR_PATH)
pulsar_source = PulsarSource.builder() \
.set_service_url(SERVICE_URL) \
.set_admin_url(ADMIN_URL) \
.set_topics('ada') \
.set_start_cursor(StartCursor.latest()) \
.set_unbounded_stop_cursor(StopCursor.never()) \
.set_subscription_name('pyflink_subscription') \
.set_subscription_type(SubscriptionType.Exclusive) \
.set_deserialization_schema(
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
.set_config('pulsar.source.enableAutoAcknowledgeMessage', True) \
.set_properties({'pulsar.source.autoCommitCursorInterval': '1000'}) \
.build()
ds = env.from_source(source=pulsar_source,
watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
source_name="pulsar source")
pulsar_sink = PulsarSink.builder() \
.set_service_url(SERVICE_URL) \
.set_admin_url(ADMIN_URL) \
.set_producer_name('pyflink_producer') \
.set_topics('beta') \
.set_serialization_schema(
PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
.set_topic_routing_mode(TopicRoutingMode.ROUND_ROBIN) \
.set_config('pulsar.producer.maxPendingMessages', 1000) \
.set_properties({'pulsar.producer.batchingMaxMessages': '100'}) \
.build()
ds.sink_to(pulsar_sink).name('pulsar sink')
env.execute()
Elasticsearch#
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import logging
import sys
from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, \
Elasticsearch7SinkBuilder, FlushBackoffType, ElasticsearchEmitter
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import DeliveryGuarantee
def write_to_es6(env):
ELASTICSEARCH_SQL_CONNECTOR_PATH = \
'file:///path/to/flink-sql-connector-elasticsearch6-1.16.0.jar'
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
ds = env.from_collection(
[{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
type_info=Types.MAP(Types.STRING(), Types.STRING()))
es_sink = Elasticsearch6SinkBuilder() \
.set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
.set_hosts(['localhost:9200']) \
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
.set_bulk_flush_max_actions(1) \
.set_bulk_flush_max_size_mb(2) \
.set_bulk_flush_interval(1000) \
.set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 3, 3000) \
.set_connection_username('foo') \
.set_connection_password('bar') \
.set_connection_path_prefix('foo-bar') \
.set_connection_request_timeout(30000) \
.set_connection_timeout(31000) \
.set_socket_timeout(32000) \
.build()
ds.sink_to(es_sink).name('es6 sink')
env.execute()
def write_to_es6_dynamic_index(env):
ELASTICSEARCH_SQL_CONNECTOR_PATH = \
'file:///path/to/flink-sql-connector-elasticsearch6-1.16.0.jar'
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
ds = env.from_collection(
[{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
type_info=Types.MAP(Types.STRING(), Types.STRING()))
es_sink = Elasticsearch6SinkBuilder() \
.set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id', 'bar')) \
.set_hosts(['localhost:9200']) \
.build()
ds.sink_to(es_sink).name('es6 dynamic index sink')
env.execute()
def write_to_es7(env):
ELASTICSEARCH_SQL_CONNECTOR_PATH = \
'file:///path/to/flink-sql-connector-elasticsearch7-1.16.0.jar'
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
ds = env.from_collection(
[{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
type_info=Types.MAP(Types.STRING(), Types.STRING()))
es7_sink = Elasticsearch7SinkBuilder() \
.set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
.set_hosts(['localhost:9200']) \
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
.set_bulk_flush_max_actions(1) \
.set_bulk_flush_max_size_mb(2) \
.set_bulk_flush_interval(1000) \
.set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 3, 3000) \
.set_connection_username('foo') \
.set_connection_password('bar') \
.set_connection_path_prefix('foo-bar') \
.set_connection_request_timeout(30000) \
.set_connection_timeout(31000) \
.set_socket_timeout(32000) \
.build()
ds.sink_to(es7_sink).name('es7 sink')
env.execute()
def write_to_es7_dynamic_index(env):
ELASTICSEARCH_SQL_CONNECTOR_PATH = \
'file:///path/to/flink-sql-connector-elasticsearch7-1.16.0.jar'
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
ds = env.from_collection(
[{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
type_info=Types.MAP(Types.STRING(), Types.STRING()))
es7_sink = Elasticsearch7SinkBuilder() \
.set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id')) \
.set_hosts(['localhost:9200']) \
.build()
ds.sink_to(es7_sink).name('es7 dynamic index sink')
env.execute()
if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
print("start writing data to elasticsearch6")
write_to_es6(env)
write_to_es6_dynamic_index(env)
print("start writing data to elasticsearch7")
write_to_es7(env)
write_to_es7_dynamic_index(env)