################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import warnings
from enum import Enum
from typing import Dict, List, Optional, Union

from pyflink.common import DeserializationSchema, TypeInformation, ExecutionConfig, \
    ConfigOptions, Duration, SerializationSchema, ConfigOption
from pyflink.datastream.connectors import Source, Sink, DeliveryGuarantee
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import load_java_class

# Public API of this module: source-side classes first, then sink-side classes.
__all__ = [
    # Source side.
    "PulsarSource",
    "PulsarSourceBuilder",
    "PulsarDeserializationSchema",
    "SubscriptionType",
    "StartCursor",
    "StopCursor",
    # Sink side.
    "PulsarSink",
    "PulsarSinkBuilder",
    "PulsarSerializationSchema",
    "MessageDelayer",
    "TopicRoutingMode",
]
# ---- PulsarSource ----
class PulsarDeserializationSchema(object):
    """
    A schema bridge for deserializing Pulsar's Message into a Flink-managed instance. Both
    Pulsar's self-managed schema and Flink's managed schema are supported.

    Instances wrap a Java ``PulsarDeserializationSchema`` and are normally created with the
    :meth:`flink_schema` or :meth:`flink_type_info` factory methods.
    """

    def __init__(self, _j_pulsar_deserialization_schema):
        # Handle to the Java-side schema; handed as-is to the Java source builder.
        self._j_pulsar_deserialization_schema = _j_pulsar_deserialization_schema

    @staticmethod
    def flink_schema(deserialization_schema: DeserializationSchema) \
            -> 'PulsarDeserializationSchema':
        """
        Create a PulsarDeserializationSchema from Flink's DeserializationSchema. The Pulsar
        message is consumed as a byte array and decoded with Flink's deserialization logic.

        :param deserialization_schema: The Flink DeserializationSchema used to decode the bytes.
        :return: A PulsarDeserializationSchema wrapping the Java
            ``PulsarDeserializationSchema.flinkSchema(...)`` result.
        """
        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkSchema(
            deserialization_schema._j_deserialization_schema)
        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)

    @staticmethod
    def flink_type_info(type_information: TypeInformation,
                        execution_config: Optional[ExecutionConfig] = None) \
            -> 'PulsarDeserializationSchema':
        """
        Create a PulsarDeserializationSchema by using the given TypeInformation. This method is
        only meant for messages that were written into Pulsar by using TypeInformation.

        :param type_information: The TypeInformation describing the message type.
        :param execution_config: Optional ExecutionConfig; when omitted (``None``), a newly
            constructed Java ``ExecutionConfig`` is used instead.
        :return: A PulsarDeserializationSchema wrapping the Java
            ``PulsarDeserializationSchema.flinkTypeInfo(...)`` result.
        """
        gateway = get_gateway()
        JPulsarDeserializationSchema = gateway.jvm.org.apache.flink \
            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
        if execution_config is None:
            _j_execution_config = gateway.jvm.org.apache.flink.api.common.ExecutionConfig()
        else:
            _j_execution_config = execution_config._j_execution_config
        _j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkTypeInfo(
            type_information.get_java_type_info(), _j_execution_config)
        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
class SubscriptionType(Enum):
    """
    Types of subscription supported by Pulsar.

    :data: `Exclusive`:

    There can be only 1 consumer on the same topic with the same subscription name.

    :data: `Shared`:

    Multiple consumers will be able to use the same subscription name and the messages will be
    dispatched according to a round-robin rotation between the connected consumers. In this mode,
    the consumption order is not guaranteed.

    :data: `Failover`:

    Multiple consumers will be able to use the same subscription name but only 1 consumer will
    receive the messages. If that consumer disconnects, one of the other connected consumers will
    start receiving messages. In failover mode, the consumption ordering is guaranteed. In case of
    partitioned topics, the ordering is guaranteed on a per-partition basis. The partitions
    assignments will be split across the available consumers. On each partition, at most one
    consumer will be active at a given point in time.

    :data: `Key_Shared`:

    Multiple consumers will be able to use the same subscription and all messages with the same
    key will be dispatched to only one consumer. Use ordering_key to overwrite the message key for
    message ordering.
    """

    # Member values are plain ints. Do not add a trailing comma after a value: ``Exclusive = 0,``
    # would silently make the member's value the 1-tuple ``(0,)``.
    Exclusive = 0
    Shared = 1
    Failover = 2
    Key_Shared = 3

    def _to_j_subscription_type(self):
        """
        Return the Java ``org.apache.pulsar.client.api.SubscriptionType`` constant whose name
        matches this member's name.
        """
        JSubscriptionType = get_gateway().jvm.org.apache.pulsar.client.api.SubscriptionType
        return getattr(JSubscriptionType, self.name)
class StartCursor(object):
    """
    A factory class for users to specify the start position of a pulsar subscription.

    Since a start cursor is serialized into the split, its implementation should be designed
    carefully; adding extra internal state to it is not recommended.

    This class would be used only for SubscriptionType.Exclusive and SubscriptionType.Failover.
    """

    def __init__(self, _j_start_cursor):
        # Handle to the Java-side StartCursor this wrapper delegates to.
        self._j_start_cursor = _j_start_cursor

    @staticmethod
    def default_start_cursor() -> 'StartCursor':
        """
        Return the default start cursor, which is the same as :meth:`earliest`.
        """
        return StartCursor.earliest()

    @staticmethod
    def earliest() -> 'StartCursor':
        """
        Create a cursor backed by the Java ``StartCursor.earliest()`` factory.
        """
        JStartCursor = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
        return StartCursor(JStartCursor.earliest())

    @staticmethod
    def latest() -> 'StartCursor':
        """
        Create a cursor backed by the Java ``StartCursor.latest()`` factory.
        """
        JStartCursor = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
        return StartCursor(JStartCursor.latest())

    @staticmethod
    def from_message_time(timestamp: int) -> 'StartCursor':
        """
        This method is designed for seeking messages by event time. But Pulsar doesn't support
        seeking by message time; instead, it seeks the position by publish time. This method is
        only kept for backward compatibility.

        .. deprecated:: 1.16
            Use :meth:`from_publish_time` instead.
        """
        # stacklevel=2 attributes the warning to the caller, like the other deprecation
        # warnings in this module.
        warnings.warn("Deprecated in 1.16, use from_publish_time() instead.", DeprecationWarning,
                      stacklevel=2)
        JStartCursor = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
        return StartCursor(JStartCursor.fromMessageTime(timestamp))

    @staticmethod
    def from_message_id(message_id: bytes, inclusive: bool = True) -> 'StartCursor':
        """
        Find the available message id and start consuming from it. Users can call the pulsar
        Python library's serialize method to convert a MessageId into bytes.

        :param message_id: The serialized Pulsar MessageId.
        :param inclusive: Passed through to the Java ``StartCursor.fromMessageId`` as its
            ``inclusive`` flag.

        Example:
        ::

            >>> from pulsar import MessageId
            >>> message_id_bytes = MessageId().serialize()
            >>> start_cursor = StartCursor.from_message_id(message_id_bytes)
        """
        JStartCursor = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
        j_message_id = get_gateway().jvm.org.apache.pulsar.client.api.MessageId \
            .fromByteArray(message_id)
        return StartCursor(JStartCursor.fromMessageId(j_message_id, inclusive))

    @staticmethod
    def from_publish_time(timestamp: int) -> 'StartCursor':
        """
        Seek the start position by using message publish time.
        """
        JStartCursor = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
        return StartCursor(JStartCursor.fromPublishTime(timestamp))
class StopCursor(object):
    """
    A factory class for users to specify the stop position of a pulsar subscription.

    Since a stop cursor is serialized into the split, its implementation should be designed
    carefully; adding extra internal state to it is not recommended.
    """

    def __init__(self, _j_stop_cursor):
        # Handle to the Java-side StopCursor this wrapper delegates to.
        self._j_stop_cursor = _j_stop_cursor

    @staticmethod
    def _j_stop_cursor_class():
        # Java StopCursor type whose static factory methods build every cursor below.
        return get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor

    @staticmethod
    def _to_j_message_id(message_id: bytes):
        # Turn serialized Pulsar MessageId bytes into a Java MessageId.
        j_message_id_class = get_gateway().jvm.org.apache.pulsar.client.api.MessageId
        return j_message_id_class.fromByteArray(message_id)

    @staticmethod
    def default_stop_cursor() -> 'StopCursor':
        """
        Return the default stop cursor, which is the same as :meth:`never`.
        """
        return StopCursor.never()

    @staticmethod
    def never() -> 'StopCursor':
        """
        Create a cursor backed by the Java ``StopCursor.never()`` factory.
        """
        j_factory = StopCursor._j_stop_cursor_class()
        return StopCursor(j_factory.never())

    @staticmethod
    def latest() -> 'StopCursor':
        """
        Create a cursor backed by the Java ``StopCursor.latest()`` factory.
        """
        j_factory = StopCursor._j_stop_cursor_class()
        return StopCursor(j_factory.latest())

    @staticmethod
    def at_event_time(timestamp: int) -> 'StopCursor':
        """
        Stop consuming once a message's eventTime is greater than or equal to ``timestamp``.
        """
        j_factory = StopCursor._j_stop_cursor_class()
        return StopCursor(j_factory.atEventTime(timestamp))

    @staticmethod
    def after_event_time(timestamp: int) -> 'StopCursor':
        """
        Stop consuming once a message's eventTime is strictly greater than ``timestamp``.
        """
        j_factory = StopCursor._j_stop_cursor_class()
        return StopCursor(j_factory.afterEventTime(timestamp))

    @staticmethod
    def at_publish_time(timestamp: int) -> 'StopCursor':
        """
        Stop consuming once a message's publishTime is greater than or equal to ``timestamp``.
        """
        j_factory = StopCursor._j_stop_cursor_class()
        return StopCursor(j_factory.atPublishTime(timestamp))

    @staticmethod
    def after_publish_time(timestamp: int) -> 'StopCursor':
        """
        Stop consuming once a message's publishTime is strictly greater than ``timestamp``.
        """
        j_factory = StopCursor._j_stop_cursor_class()
        return StopCursor(j_factory.afterPublishTime(timestamp))

    @staticmethod
    def at_message_id(message_id: bytes) -> 'StopCursor':
        """
        Stop consuming when the messageId is equal to or greater than the given messageId. The
        message whose id equals the given one is NOT consumed. Users can call the pulsar Python
        library's serialize method to convert a MessageId into bytes.

        Example:
        ::

            >>> from pulsar import MessageId
            >>> message_id_bytes = MessageId().serialize()
            >>> stop_cursor = StopCursor.at_message_id(message_id_bytes)
        """
        j_factory = StopCursor._j_stop_cursor_class()
        j_message_id = StopCursor._to_j_message_id(message_id)
        return StopCursor(j_factory.atMessageId(j_message_id))

    @staticmethod
    def after_message_id(message_id: bytes) -> 'StopCursor':
        """
        Stop consuming when the messageId is greater than the given messageId. The message whose
        id equals the given one IS consumed. Users can call the pulsar Python library's
        serialize method to convert a MessageId into bytes.

        Example:
        ::

            >>> from pulsar import MessageId
            >>> message_id_bytes = MessageId().serialize()
            >>> stop_cursor = StopCursor.after_message_id(message_id_bytes)
        """
        j_factory = StopCursor._j_stop_cursor_class()
        j_message_id = StopCursor._to_j_message_id(message_id)
        return StopCursor(j_factory.afterMessageId(j_message_id))
class PulsarSource(Source):
    """
    The Source implementation of Pulsar. Please use a PulsarSourceBuilder to construct a
    PulsarSource. The following example shows how to create a PulsarSource emitting records of
    String type.

    Example:
    ::

        >>> source = PulsarSource.builder() \\
        ...     .set_topics([TOPIC1, TOPIC2]) \\
        ...     .set_service_url(get_service_url()) \\
        ...     .set_admin_url(get_admin_url()) \\
        ...     .set_subscription_name("test") \\
        ...     .set_deserialization_schema(
        ...         PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
        ...     .set_bounded_stop_cursor(StopCursor.default_stop_cursor()) \\
        ...     .build()

    See PulsarSourceBuilder for more details.
    """

    def __init__(self, j_pulsar_source):
        # Wraps the Java PulsarSource produced by PulsarSourceBuilder.build().
        super(PulsarSource, self).__init__(source=j_pulsar_source)

    @staticmethod
    def builder() -> 'PulsarSourceBuilder':
        """
        Get a PulsarSourceBuilder to build a PulsarSource.
        """
        return PulsarSourceBuilder()
class PulsarSourceBuilder(object):
    """
    The builder class for PulsarSource to make it easier for the users to construct a
    PulsarSource.

    The following example shows the minimum setup to create a PulsarSource that reads the String
    values from a Pulsar topic.

    Example:
    ::

        >>> source = PulsarSource.builder() \\
        ...     .set_service_url(PULSAR_BROKER_URL) \\
        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
        ...     .set_subscription_name("flink-source-1") \\
        ...     .set_topics([TOPIC1, TOPIC2]) \\
        ...     .set_deserialization_schema(
        ...         PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
        ...     .build()

    The service url, admin url, subscription name, topics to consume, and the record deserializer
    are required fields that must be set.

    To specify the starting position of PulsarSource, one can call set_start_cursor(StartCursor).

    By default the PulsarSource runs in a Boundedness.CONTINUOUS_UNBOUNDED mode and never stops
    until the Flink job is canceled or fails. To let the PulsarSource stay
    Boundedness.CONTINUOUS_UNBOUNDED but stop at some given positions, one can call
    set_unbounded_stop_cursor(StopCursor). To run it in Boundedness.BOUNDED mode instead, one
    can call set_bounded_stop_cursor(StopCursor).

    For example, the following PulsarSource runs in bounded mode and stops consuming once it
    reaches a message whose publish time is greater than or equal to the time the source was
    built.

    Example:
    ::

        >>> source = PulsarSource.builder() \\
        ...     .set_service_url(PULSAR_BROKER_URL) \\
        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
        ...     .set_subscription_name("flink-source-1") \\
        ...     .set_topics([TOPIC1, TOPIC2]) \\
        ...     .set_deserialization_schema(
        ...         PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
        ...     .set_bounded_stop_cursor(StopCursor.at_publish_time(int(time.time() * 1000))) \\
        ...     .build()
    """

    def __init__(self):
        JPulsarSource = \
            get_gateway().jvm.org.apache.flink.connector.pulsar.source.PulsarSource
        # Every setter below delegates to this Java builder and returns self for chaining.
        self._j_pulsar_source_builder = JPulsarSource.builder()

    def set_admin_url(self, admin_url: str) -> 'PulsarSourceBuilder':
        """
        Sets the admin endpoint for the PulsarAdmin of the PulsarSource.
        """
        self._j_pulsar_source_builder.setAdminUrl(admin_url)
        return self

    def set_service_url(self, service_url: str) -> 'PulsarSourceBuilder':
        """
        Sets the server's link for the PulsarConsumer of the PulsarSource.
        """
        self._j_pulsar_source_builder.setServiceUrl(service_url)
        return self

    def set_subscription_name(self, subscription_name: str) -> 'PulsarSourceBuilder':
        """
        Sets the name for this pulsar subscription.
        """
        self._j_pulsar_source_builder.setSubscriptionName(subscription_name)
        return self

    def set_subscription_type(self, subscription_type: SubscriptionType) -> 'PulsarSourceBuilder':
        """
        SubscriptionType is the consuming behavior for pulsar; different splits are generated
        depending on the given subscription type. Please take some time to consider which
        subscription type matches your application best. Default is SubscriptionType.Shared.
        """
        self._j_pulsar_source_builder.setSubscriptionType(
            subscription_type._to_j_subscription_type())
        return self

    def set_topics(self, topics: Union[str, List[str]]) -> 'PulsarSourceBuilder':
        """
        Set a pulsar topic list for the flink source. Some topics may not exist yet; consuming
        a non-existent topic doesn't throw any exception. But the best solution is to consume by
        using a topic regex. You can set topics only once, either with set_topics or
        set_topic_pattern, in this builder.

        :param topics: A single topic name, or a list (or any other iterable, such as a tuple)
            of topic names.
        """
        # A bare string is one topic; any other iterable is materialized as a list of topics.
        topics = [topics] if isinstance(topics, str) else list(topics)
        self._j_pulsar_source_builder.setTopics(topics)
        return self

    def set_topics_pattern(self, topics_pattern: str) -> 'PulsarSourceBuilder':
        """
        Set a topic pattern to consume from the java regex str. You can set topics only once,
        either with set_topics or set_topic_pattern, in this builder.

        .. deprecated::
            Use :meth:`set_topic_pattern` instead.
        """
        warnings.warn("set_topics_pattern is deprecated. Use set_topic_pattern instead.",
                      DeprecationWarning, stacklevel=2)
        self._j_pulsar_source_builder.setTopicPattern(topics_pattern)
        return self

    def set_topic_pattern(self, topic_pattern: str) -> 'PulsarSourceBuilder':
        """
        Set a topic pattern to consume from the java regex str. You can set topics only once,
        either with set_topics or set_topic_pattern, in this builder.
        """
        self._j_pulsar_source_builder.setTopicPattern(topic_pattern)
        return self

    def set_start_cursor(self, start_cursor: StartCursor) -> 'PulsarSourceBuilder':
        """
        Specify from which offsets the PulsarSource should start consuming by providing a
        StartCursor.
        """
        self._j_pulsar_source_builder.setStartCursor(start_cursor._j_start_cursor)
        return self

    def set_unbounded_stop_cursor(self, stop_cursor: StopCursor) -> 'PulsarSourceBuilder':
        """
        By default the PulsarSource is set to run in Boundedness.CONTINUOUS_UNBOUNDED manner and
        thus never stops until the Flink job fails or is canceled. To let the PulsarSource run as
        a streaming source but still stop at some point, one can set a StopCursor to specify the
        stopping offsets for each partition. When all the partitions have reached their stopping
        offsets, the PulsarSource will then exit.

        This method differs from set_bounded_stop_cursor(StopCursor) in that, after setting the
        stopping offsets with this method, PulsarSource.getBoundedness() will still return
        Boundedness.CONTINUOUS_UNBOUNDED even though it will stop at the stopping offsets
        specified by the StopCursor.
        """
        self._j_pulsar_source_builder.setUnboundedStopCursor(stop_cursor._j_stop_cursor)
        return self

    def set_bounded_stop_cursor(self, stop_cursor: StopCursor) -> 'PulsarSourceBuilder':
        """
        By default the PulsarSource is set to run in Boundedness.CONTINUOUS_UNBOUNDED manner and
        thus never stops until the Flink job fails or is canceled. To let the PulsarSource run in
        Boundedness.BOUNDED manner and stop at some point, one can set a StopCursor to specify
        the stopping offsets for each partition. When all the partitions have reached their
        stopping offsets, the PulsarSource will then exit.

        This method differs from set_unbounded_stop_cursor(StopCursor) in that, after setting
        the stopping offsets with this method, PulsarSource.getBoundedness() will return
        Boundedness.BOUNDED instead of Boundedness.CONTINUOUS_UNBOUNDED.
        """
        self._j_pulsar_source_builder.setBoundedStopCursor(stop_cursor._j_stop_cursor)
        return self

    def set_deserialization_schema(self,
                                   pulsar_deserialization_schema: PulsarDeserializationSchema) \
            -> 'PulsarSourceBuilder':
        """
        The deserialization schema is required for getting the Schema used to deserialize
        messages from pulsar and the TypeInformation used for message serialization in flink.

        Use PulsarDeserializationSchema.flink_type_info or
        PulsarDeserializationSchema.flink_schema to create the desired schema.
        """
        self._j_pulsar_source_builder.setDeserializationSchema(
            pulsar_deserialization_schema._j_pulsar_deserialization_schema)
        return self

    def set_config(self, key: Union[str, ConfigOption], value) -> 'PulsarSourceBuilder':
        """
        Set an arbitrary property for the PulsarSource and PulsarConsumer. The valid keys can be
        found in PulsarSourceOptions and PulsarOptions.

        Make sure the option is set only once, or always with the same value.

        :param key: The option key. Passing a ConfigOption is deprecated; pass its string key.
        :param value: The option value, passed through to the Java builder.
        """
        if isinstance(key, ConfigOption):
            warnings.warn("set_config(key: ConfigOption, value) is deprecated. "
                          "Use set_config(key: str, value) instead.",
                          DeprecationWarning, stacklevel=2)
            j_config_option = key._j_config_option
        else:
            # String keys are wrapped in a string-typed option without a default value.
            j_config_option = \
                ConfigOptions.key(key).string_type().no_default_value()._j_config_option
        self._j_pulsar_source_builder.setConfig(j_config_option, value)
        return self

    def set_config_with_dict(self, config: Dict) -> 'PulsarSourceBuilder':
        """
        Set arbitrary properties for the PulsarSource and PulsarConsumer. The valid keys can be
        found in PulsarSourceOptions and PulsarOptions.

        .. deprecated::
            Use :meth:`set_properties` instead.
        """
        warnings.warn("set_config_with_dict is deprecated. Use set_properties instead.",
                      DeprecationWarning, stacklevel=2)
        self.set_properties(config)
        return self

    def set_properties(self, config: Dict) -> 'PulsarSourceBuilder':
        """
        Set arbitrary properties for the PulsarSource and PulsarConsumer. The valid keys can be
        found in PulsarSourceOptions and PulsarOptions.

        :param config: Mapping converted with the Java ``Configuration.fromMap``.
        """
        JConfiguration = get_gateway().jvm.org.apache.flink.configuration.Configuration
        self._j_pulsar_source_builder.setConfig(JConfiguration.fromMap(config))
        return self

    def build(self) -> 'PulsarSource':
        """
        Build the PulsarSource.
        """
        return PulsarSource(self._j_pulsar_source_builder.build())
# ---- PulsarSink ----
class PulsarSerializationSchema(object):
    """
    Describes how records are serialized before being written into Pulsar.

    Instances wrap a Java ``PulsarSerializationSchema``; create them with :meth:`flink_schema`.
    """

    def __init__(self, _j_pulsar_serialization_schema):
        # Handle to the Java-side schema; handed as-is to the Java sink builder.
        self._j_pulsar_serialization_schema = _j_pulsar_serialization_schema

    @staticmethod
    def flink_schema(serialization_schema: SerializationSchema) \
            -> 'PulsarSerializationSchema':
        """
        Build a PulsarSerializationSchema on top of Flink's SerializationSchema. Records are
        serialized into a byte array and sent to Pulsar with Schema#BYTES.

        :param serialization_schema: The Flink SerializationSchema that produces the bytes.
        :return: A wrapper around the Java ``PulsarSerializationSchema.flinkSchema(...)`` result.
        """
        j_schema_class = get_gateway().jvm.org.apache.flink \
            .connector.pulsar.sink.writer.serializer.PulsarSerializationSchema
        j_flink_schema = serialization_schema._j_serialization_schema
        return PulsarSerializationSchema(j_schema_class.flinkSchema(j_flink_schema))
class TopicRoutingMode(Enum):
    """
    The routing policy used to choose the destination topic for a given message.

    :data: `ROUND_ROBIN`:

    The producer publishes messages across all partitions in a round-robin fashion to achieve
    maximum throughput. Note that round-robin is not applied per individual message; it follows
    the batching-delay boundary so that batching stays effective.

    :data: `MESSAGE_KEY_HASH`:

    If no key is provided, the partitioned producer randomly picks one single topic partition
    and publishes all the messages into that partition. If a key is provided on the message,
    the partitioned producer hashes the key and assigns the message to a particular partition.

    :data: `CUSTOM`:

    Use a custom topic router implementation that will be called to determine the partition
    for a particular message.
    """

    ROUND_ROBIN = 0
    MESSAGE_KEY_HASH = 1
    CUSTOM = 2

    def _to_j_topic_routing_mode(self):
        """
        Return the Java ``TopicRoutingMode`` constant whose name matches this member's name.
        """
        j_jvm = get_gateway().jvm
        j_routing_modes = j_jvm.org.apache.flink.connector.pulsar.sink.writer.router \
            .TopicRoutingMode
        return getattr(j_routing_modes, self.name)
class MessageDelayer(object):
    """
    A delayer that lets the Pulsar broker hold a sent message before passing it to the
    downstream consumer. This only works with a :data:`SubscriptionType.Shared` subscription.

    See delayed message delivery
    https://pulsar.apache.org/docs/en/next/concepts-messaging/#delayed-message-delivery for a
    better understanding of this feature.
    """

    def __init__(self, _j_message_delayer):
        # Handle to the Java-side MessageDelayer this wrapper delegates to.
        self._j_message_delayer = _j_message_delayer

    @staticmethod
    def _j_message_delayer_class():
        # Java MessageDelayer type whose static factories build the delayers below.
        return get_gateway().jvm \
            .org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer

    @staticmethod
    def never() -> 'MessageDelayer':
        """
        All the messages should be consumed immediately.
        """
        j_factory = MessageDelayer._j_message_delayer_class()
        return MessageDelayer(j_factory.never())

    @staticmethod
    def fixed(duration: Duration) -> 'MessageDelayer':
        """
        All the messages should be consumed after a fixed duration.

        :param duration: The delay, passed to the Java factory as its underlying Java Duration.
        """
        j_factory = MessageDelayer._j_message_delayer_class()
        return MessageDelayer(j_factory.fixed(duration._j_duration))
class PulsarSink(Sink):
    """
    The Sink implementation of Pulsar. Please use a PulsarSinkBuilder to construct a
    PulsarSink. The following example shows how to create a PulsarSink receiving records of
    String type.

    Example:
    ::

        >>> sink = PulsarSink.builder() \\
        ...     .set_service_url(PULSAR_BROKER_URL) \\
        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
        ...     .set_topics(topic) \\
        ...     .set_serialization_schema(
        ...         PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
        ...     .build()

    The sink supports all delivery guarantees described by DeliveryGuarantee.

    DeliveryGuarantee#NONE does not provide any guarantees: messages may be lost in case of
    issues on the Pulsar broker and messages may be duplicated in case of a Flink failure.

    DeliveryGuarantee#AT_LEAST_ONCE: the sink will wait for all outstanding records in the
    Pulsar buffers to be acknowledged by the Pulsar producer on a checkpoint. No messages will
    be lost in case of any issue with the Pulsar brokers, but messages may be duplicated when
    Flink restarts.

    DeliveryGuarantee#EXACTLY_ONCE: in this mode the PulsarSink will write all messages in a
    Pulsar transaction that will be committed to Pulsar on a checkpoint. Thus, no duplicates
    will be seen in case of a Flink restart. However, this effectively delays record writing
    until a checkpoint is written, so adjust the checkpoint duration accordingly. Additionally,
    it is highly recommended to set the Pulsar transaction timeout well above the maximum
    checkpoint duration plus the maximum restart duration, or data loss may happen when Pulsar
    expires an uncommitted transaction.

    See PulsarSinkBuilder for more details.
    """

    def __init__(self, j_pulsar_sink):
        # Wraps the Java PulsarSink produced by PulsarSinkBuilder.build().
        super().__init__(sink=j_pulsar_sink)

    @staticmethod
    def builder() -> 'PulsarSinkBuilder':
        """
        Get a PulsarSinkBuilder to build a PulsarSink.
        """
        new_builder = PulsarSinkBuilder()
        return new_builder
class PulsarSinkBuilder(object):
    """
    The builder class for PulsarSink to make it easier for the users to construct a PulsarSink.

    The following example shows the minimum setup to create a PulsarSink that writes String
    values to Pulsar topics.

    Example:
    ::

        >>> sink = PulsarSink.builder() \\
        ...     .set_service_url(PULSAR_BROKER_URL) \\
        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
        ...     .set_topics([TOPIC1, TOPIC2]) \\
        ...     .set_serialization_schema(
        ...         PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
        ...     .build()

    The service url, admin url, and the record serializer are required fields that must be set.
    If you don't set the topics, make sure you have provided a custom TopicRouter. Otherwise,
    you must provide the topics to produce.

    To specify the delivery guarantee of PulsarSink, one can call
    set_delivery_guarantee(DeliveryGuarantee). The default delivery guarantee is
    DeliveryGuarantee.NONE, which doesn't promise consistency when writing messages into
    Pulsar.

    Example:
    ::

        >>> sink = PulsarSink.builder() \\
        ...     .set_service_url(PULSAR_BROKER_URL) \\
        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
        ...     .set_topics([TOPIC1, TOPIC2]) \\
        ...     .set_serialization_schema(
        ...         PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
        ...     .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE) \\
        ...     .build()
    """

    def __init__(self):
        JPulsarSink = get_gateway().jvm.org.apache.flink.connector.pulsar.sink.PulsarSink
        # Every setter below delegates to this Java builder and returns self for chaining.
        self._j_pulsar_sink_builder = JPulsarSink.builder()

    def set_admin_url(self, admin_url: str) -> 'PulsarSinkBuilder':
        """
        Sets the admin endpoint for the PulsarAdmin of the PulsarSink.
        """
        self._j_pulsar_sink_builder.setAdminUrl(admin_url)
        return self

    def set_service_url(self, service_url: str) -> 'PulsarSinkBuilder':
        """
        Sets the server's link for the PulsarProducer of the PulsarSink.
        """
        self._j_pulsar_sink_builder.setServiceUrl(service_url)
        return self

    def set_producer_name(self, producer_name: str) -> 'PulsarSinkBuilder':
        """
        The producer name is informative, and it can be used to identify a particular producer
        instance from the topic stats.
        """
        self._j_pulsar_sink_builder.setProducerName(producer_name)
        return self

    def set_topics(self, topics: Union[str, List[str]]) -> 'PulsarSinkBuilder':
        """
        Set a pulsar topic list for the flink sink. Some topics may not exist yet; writing to a
        non-existent topic doesn't throw any exception.

        :param topics: A single topic name, or a list (or any other iterable, such as a tuple)
            of topic names.
        """
        # A bare string is one topic; any other iterable is materialized as a list of topics.
        topics = [topics] if isinstance(topics, str) else list(topics)
        self._j_pulsar_sink_builder.setTopics(topics)
        return self

    def set_delivery_guarantee(self, delivery_guarantee: DeliveryGuarantee) -> 'PulsarSinkBuilder':
        """
        Sets the wanted DeliveryGuarantee. The default delivery guarantee is
        DeliveryGuarantee.NONE.
        """
        self._j_pulsar_sink_builder.setDeliveryGuarantee(
            delivery_guarantee._to_j_delivery_guarantee())
        return self

    def set_topic_routing_mode(self, topic_routing_mode: TopicRoutingMode) -> 'PulsarSinkBuilder':
        """
        Set a routing mode for choosing the right topic partition to send messages to.
        """
        self._j_pulsar_sink_builder.setTopicRoutingMode(
            topic_routing_mode._to_j_topic_routing_mode())
        return self

    def set_topic_router(self, topic_router_class_name: str) -> 'PulsarSinkBuilder':
        """
        Use a custom topic router instead of the predefined topic routing.

        :param topic_router_class_name: Fully qualified name of the Java topic router class. The
            class is loaded and instantiated through its no-argument constructor.
        """
        j_topic_router = load_java_class(topic_router_class_name).newInstance()
        self._j_pulsar_sink_builder.setTopicRouter(j_topic_router)
        return self

    def set_serialization_schema(self, pulsar_serialization_schema: PulsarSerializationSchema) \
            -> 'PulsarSinkBuilder':
        """
        Sets the PulsarSerializationSchema that transforms incoming records to bytes.
        """
        self._j_pulsar_sink_builder.setSerializationSchema(
            pulsar_serialization_schema._j_pulsar_serialization_schema)
        return self

    def delay_sending_message(self, message_delayer: MessageDelayer) -> 'PulsarSinkBuilder':
        """
        Set a message delayer to enable Pulsar's delayed message delivery.
        """
        self._j_pulsar_sink_builder.delaySendingMessage(message_delayer._j_message_delayer)
        return self

    def set_config(self, key: str, value) -> 'PulsarSinkBuilder':
        """
        Set an arbitrary property for the PulsarSink and Pulsar Producer. The valid keys can be
        found in PulsarSinkOptions and PulsarOptions.

        Make sure the option is set only once, or always with the same value.
        """
        # The key is wrapped in a string-typed option without a default value.
        j_config_option = ConfigOptions.key(key).string_type().no_default_value()._j_config_option
        self._j_pulsar_sink_builder.setConfig(j_config_option, value)
        return self

    def set_properties(self, config: Dict) -> 'PulsarSinkBuilder':
        """
        Set arbitrary properties for the PulsarSink and Pulsar Producer. The valid keys can be
        found in PulsarSinkOptions and PulsarOptions.

        :param config: Mapping converted with the Java ``Configuration.fromMap``.
        """
        JConfiguration = get_gateway().jvm.org.apache.flink.configuration.Configuration
        self._j_pulsar_sink_builder.setConfig(JConfiguration.fromMap(config))
        return self

    def build(self) -> 'PulsarSink':
        """
        Build the PulsarSink.
        """
        return PulsarSink(self._j_pulsar_sink_builder.build())