################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import warnings
from enum import Enum
from typing import Dict, Union, List, Optional
from pyflink.common import DeserializationSchema, ConfigOptions, Duration, SerializationSchema, \
ConfigOption
from pyflink.datastream.connectors import Source, Sink, DeliveryGuarantee
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import load_java_class
# Public API of this module: the names exported by a star-import. The list is grouped in the
# same order as the definitions below (source-side classes first, then sink-side classes).
__all__ = [
'PulsarSource',
'PulsarSourceBuilder',
'StartCursor',
'StopCursor',
'RangeGenerator',
'PulsarSink',
'PulsarSinkBuilder',
'MessageDelayer',
'TopicRoutingMode'
]
# ---- PulsarSource ----
class StartCursor(object):
    """
    Factory for the position at which a Pulsar subscription starts consuming.

    A start cursor is serialized into the split, so an implementation should be designed with
    care and is better kept free of extra internal state. This class is only used for
    SubscriptionType.Exclusive and SubscriptionType.Failover.
    """

    def __init__(self, _j_start_cursor):
        # The wrapped Java StartCursor object.
        self._j_start_cursor = _j_start_cursor

    @staticmethod
    def default_start_cursor() -> 'StartCursor':
        """
        Return the default start cursor, which is the one produced by :meth:`earliest`.
        """
        return StartCursor.earliest()

    @staticmethod
    def earliest() -> 'StartCursor':
        """
        Wrap the cursor created by the Java ``StartCursor.earliest()`` factory.
        """
        j_cursor_pkg = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor
        return StartCursor(j_cursor_pkg.StartCursor.earliest())

    @staticmethod
    def latest() -> 'StartCursor':
        """
        Wrap the cursor created by the Java ``StartCursor.latest()`` factory.
        """
        j_cursor_pkg = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor
        return StartCursor(j_cursor_pkg.StartCursor.latest())

    @staticmethod
    def from_message_id(message_id: bytes, inclusive: bool = True) -> 'StartCursor':
        """
        Find the available message id and start consuming from it. The bytes can be obtained
        with the serialize method of the pulsar Python library.

        Example:
        ::

            >>> from pulsar import MessageId
            >>> message_id_bytes = MessageId().serialize()
            >>> start_cursor = StartCursor.from_message_id(message_id_bytes)

        :param message_id: The serialized Pulsar message id.
        :param inclusive: Forwarded unchanged to the Java ``StartCursor.fromMessageId``.
        """
        jvm = get_gateway().jvm
        j_start_cursor_class = \
            jvm.org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
        # Rebuild the Java MessageId from its serialized form before handing it to the factory.
        j_message_id = jvm.org.apache.pulsar.client.api.MessageId.fromByteArray(message_id)
        j_cursor = j_start_cursor_class.fromMessageId(j_message_id, inclusive)
        return StartCursor(j_cursor)

    @staticmethod
    def from_publish_time(timestamp: int) -> 'StartCursor':
        """
        Seek the start position by using the message publish time.

        :param timestamp: The publish time passed to the Java ``StartCursor.fromPublishTime``.
        """
        j_cursor_pkg = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor
        return StartCursor(j_cursor_pkg.StartCursor.fromPublishTime(timestamp))
class StopCursor(object):
    """
    Factory for the position at which a Pulsar subscription stops consuming.

    A stop cursor is serialized into the split, so an implementation should be designed with
    care and is better kept free of extra internal state.
    """

    def __init__(self, _j_stop_cursor):
        # The wrapped Java StopCursor object.
        self._j_stop_cursor = _j_stop_cursor

    @staticmethod
    def default_stop_cursor() -> 'StopCursor':
        """
        Return the default stop cursor, which is the one produced by :meth:`never`.
        """
        return StopCursor.never()

    @staticmethod
    def never() -> 'StopCursor':
        """
        Wrap the cursor created by the Java ``StopCursor.never()`` factory.
        """
        j_cursor_pkg = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor
        return StopCursor(j_cursor_pkg.StopCursor.never())

    @staticmethod
    def latest() -> 'StopCursor':
        """
        Wrap the cursor created by the Java ``StopCursor.latest()`` factory.
        """
        j_cursor_pkg = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor
        return StopCursor(j_cursor_pkg.StopCursor.latest())

    @staticmethod
    def at_event_time(timestamp: int) -> 'StopCursor':
        """
        Stop consuming when the message eventTime is greater than or equal to the specified
        timestamp.
        """
        j_cursor_pkg = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor
        return StopCursor(j_cursor_pkg.StopCursor.atEventTime(timestamp))

    @staticmethod
    def after_event_time(timestamp: int) -> 'StopCursor':
        """
        Stop consuming when the message eventTime is greater than the specified timestamp.
        """
        j_cursor_pkg = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor
        return StopCursor(j_cursor_pkg.StopCursor.afterEventTime(timestamp))

    @staticmethod
    def at_publish_time(timestamp: int) -> 'StopCursor':
        """
        Stop consuming when the message publishTime is greater than or equal to the specified
        timestamp.
        """
        j_cursor_pkg = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor
        return StopCursor(j_cursor_pkg.StopCursor.atPublishTime(timestamp))

    @staticmethod
    def after_publish_time(timestamp: int) -> 'StopCursor':
        """
        Stop consuming when the message publishTime is greater than the specified timestamp.
        """
        j_cursor_pkg = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor
        return StopCursor(j_cursor_pkg.StopCursor.afterPublishTime(timestamp))

    @staticmethod
    def at_message_id(message_id: bytes) -> 'StopCursor':
        """
        Stop consuming when the messageId is equal to or greater than the specified messageId.
        The message that is equal to the specified messageId will not be consumed. The bytes can
        be obtained with the serialize method of the pulsar Python library.

        Example:
        ::

            >>> from pulsar import MessageId
            >>> message_id_bytes = MessageId().serialize()
            >>> stop_cursor = StopCursor.at_message_id(message_id_bytes)
        """
        jvm = get_gateway().jvm
        j_stop_cursor_class = \
            jvm.org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
        # Rebuild the Java MessageId from its serialized form before handing it to the factory.
        j_message_id = jvm.org.apache.pulsar.client.api.MessageId.fromByteArray(message_id)
        return StopCursor(j_stop_cursor_class.atMessageId(j_message_id))

    @staticmethod
    def after_message_id(message_id: bytes) -> 'StopCursor':
        """
        Stop consuming when the messageId is greater than the specified messageId. The message
        that is equal to the specified messageId will be consumed. The bytes can be obtained with
        the serialize method of the pulsar Python library.

        Example:
        ::

            >>> from pulsar import MessageId
            >>> message_id_bytes = MessageId().serialize()
            >>> stop_cursor = StopCursor.after_message_id(message_id_bytes)
        """
        jvm = get_gateway().jvm
        j_stop_cursor_class = \
            jvm.org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
        # Rebuild the Java MessageId from its serialized form before handing it to the factory.
        j_message_id = jvm.org.apache.pulsar.client.api.MessageId.fromByteArray(message_id)
        return StopCursor(j_stop_cursor_class.afterMessageId(j_message_id))
class RangeGenerator(object):
    """
    Generates the TopicRange for a given topic. It is used for Pulsar's
    SubscriptionType#Key_Shared mode; the TopicRange is used in the KeySharedPolicy for the
    different pulsar source readers.

    If you implement this interface, make sure that each TopicRange is assigned to a specified
    source reader. Since the flink parallelism is provided, make sure the hashcode of the pulsar
    message key is evenly distributed among these topic ranges.
    """

    def __init__(self, j_range_generator):
        # The wrapped Java RangeGenerator object.
        self._j_range_generator = j_range_generator

    @staticmethod
    def full() -> 'RangeGenerator':
        """
        Default implementation for SubscriptionType#Shared, SubscriptionType#Failover and
        SubscriptionType#Exclusive subscription.
        """
        j_range_pkg = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.topic.range
        return RangeGenerator(j_range_pkg.FullRangeGenerator())

    @staticmethod
    def fixed_key(support_null_key: bool = False,
                  keys: Optional[Union[str, List[str]]] = None,
                  key_bytes: Optional[bytes] = None,
                  ordering_key_bytes: Optional[bytes] = None) -> 'RangeGenerator':
        """
        Pulsar doesn't expose the key hash range method, so an implementation is provided for the
        end-user. You can add the keys you want to consume; there is no need to provide any hash
        ranges.

        Since a key's hash isn't unique to one key, the consumed results may contain messages
        with keys other than the ones defined in this range generator. Remember to use flink's
        DataStream.filter() method.

        :param support_null_key: Some Message in Pulsar may not have Message#getOrderingKey() or
                                 Message#getKey(), use this option for consuming such messages.
        :param keys: If you set the message key by using PulsarMessageBuilder#key(String) or
                     TypedMessageBuilder#key(String), use this option for consuming such
                     messages. Either a single key or a list of keys.
        :param key_bytes: If you set the message key by using
                          TypedMessageBuilder#keyBytes(byte[]), use this option for consuming
                          such messages.
        :param ordering_key_bytes: Pulsar's ordering key is prior to the message key. If you set
                                   the ordering key by using
                                   PulsarMessageBuilder#orderingKey(byte[]) or
                                   TypedMessageBuilder#orderingKey(byte[]), use this option for
                                   consuming such messages.
        """
        j_range_pkg = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.topic.range
        j_builder = j_range_pkg.FixedKeysRangeGenerator.builder()

        if support_null_key:
            j_builder.supportNullKey()

        if keys is not None:
            # A lone string is one key, not a sequence of single-character keys.
            requested_keys = [keys] if isinstance(keys, str) else keys
            for requested_key in requested_keys:
                j_builder.key(requested_key)

        if key_bytes is not None:
            j_builder.keyBytes(key_bytes)

        if ordering_key_bytes is not None:
            j_builder.orderingKey(ordering_key_bytes)

        return RangeGenerator(j_builder.build())
class PulsarSource(Source):
    """
    The Source implementation of Pulsar. Please use a PulsarSourceBuilder to construct a
    PulsarSource. The following example shows how to create a PulsarSource emitting records of
    String type.

    Example:
    ::

        >>> source = PulsarSource.builder() \\
        ...     .set_topics([TOPIC1, TOPIC2]) \\
        ...     .set_service_url(get_service_url()) \\
        ...     .set_admin_url(get_admin_url()) \\
        ...     .set_subscription_name("test") \\
        ...     .set_deserialization_schema(SimpleStringSchema()) \\
        ...     .set_bounded_stop_cursor(StopCursor.default_stop_cursor()) \\
        ...     .build()

    See PulsarSourceBuilder for more details.
    """

    def __init__(self, j_pulsar_source):
        """
        :param j_pulsar_source: The Java PulsarSource object to wrap. Instances are normally
                                obtained through :meth:`builder` rather than created directly.
        """
        super(PulsarSource, self).__init__(source=j_pulsar_source)

    @staticmethod
    def builder() -> 'PulsarSourceBuilder':
        """
        Get a PulsarSourceBuilder to build a PulsarSource.
        """
        return PulsarSourceBuilder()
class PulsarSourceBuilder(object):
    """
    The builder class for PulsarSource to make it easier for the users to construct a PulsarSource.

    The following example shows the minimum setup to create a PulsarSource that reads the String
    values from a Pulsar topic.

    Example:
    ::

        >>> source = PulsarSource.builder() \\
        ...     .set_service_url(PULSAR_BROKER_URL) \\
        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
        ...     .set_subscription_name("flink-source-1") \\
        ...     .set_topics([TOPIC1, TOPIC2]) \\
        ...     .set_deserialization_schema(SimpleStringSchema()) \\
        ...     .build()

    The service url, admin url, subscription name, topics to consume, and the record deserializer
    are required fields that must be set.

    To specify the starting position of PulsarSource, one can call set_start_cursor(StartCursor).

    By default the PulsarSource runs in Boundedness.CONTINUOUS_UNBOUNDED mode and never stops
    until the Flink job is canceled or fails. To let the PulsarSource run in
    Boundedness.CONTINUOUS_UNBOUNDED but stop at some given offsets, one can call
    set_unbounded_stop_cursor(StopCursor). To run in Boundedness.BOUNDED mode instead, one can
    call set_bounded_stop_cursor(StopCursor).

    For example the following PulsarSource is bounded by the publish time taken when the builder
    code runs.

    Example:
    ::

        >>> source = PulsarSource.builder() \\
        ...     .set_service_url(PULSAR_BROKER_URL) \\
        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
        ...     .set_subscription_name("flink-source-1") \\
        ...     .set_topics([TOPIC1, TOPIC2]) \\
        ...     .set_deserialization_schema(SimpleStringSchema()) \\
        ...     .set_bounded_stop_cursor(StopCursor.at_publish_time(int(time.time() * 1000))) \\
        ...     .build()
    """

    def __init__(self):
        JPulsarSource = \
            get_gateway().jvm.org.apache.flink.connector.pulsar.source.PulsarSource
        # Every setter below forwards to this Java builder and returns ``self`` for chaining.
        self._j_pulsar_source_builder = JPulsarSource.builder()

    def set_admin_url(self, admin_url: str) -> 'PulsarSourceBuilder':
        """
        Sets the admin endpoint for the PulsarAdmin of the PulsarSource.
        """
        self._j_pulsar_source_builder.setAdminUrl(admin_url)
        return self

    def set_service_url(self, service_url: str) -> 'PulsarSourceBuilder':
        """
        Sets the server's link for the PulsarConsumer of the PulsarSource.
        """
        self._j_pulsar_source_builder.setServiceUrl(service_url)
        return self

    def set_subscription_name(self, subscription_name: str) -> 'PulsarSourceBuilder':
        """
        Sets the name for this pulsar subscription.
        """
        self._j_pulsar_source_builder.setSubscriptionName(subscription_name)
        return self

    def set_topics(self, topics: Union[str, List[str]]) -> 'PulsarSourceBuilder':
        """
        Set a pulsar topic list for flink source. Some topic may not exist currently, consuming this
        non-existed topic wouldn't throw any exception. But the best solution is just consuming by
        using a topic regex. You can set topics once either with set_topics or set_topic_pattern
        in this builder.

        :param topics: A single topic name or a list of topic names. A tuple or set of names is
                       accepted as well and is converted to a list.
        """
        if isinstance(topics, (tuple, set, frozenset)):
            # Pass a flat list of names to the Java builder instead of a one-element list
            # that wraps the whole collection.
            topics = list(topics)
        elif not isinstance(topics, list):
            topics = [topics]
        self._j_pulsar_source_builder.setTopics(topics)
        return self

    def set_topic_pattern(self, topic_pattern: str) -> 'PulsarSourceBuilder':
        """
        Set a topic pattern to consume from the java regex str. You can set topics once either with
        set_topics or set_topic_pattern in this builder.
        """
        self._j_pulsar_source_builder.setTopicPattern(topic_pattern)
        return self

    def set_consumer_name(self, consumer_name: str) -> 'PulsarSourceBuilder':
        """
        The consumer name is informative, and it can be used to identify a particular consumer
        instance from the topic stats.

        .. versionadded:: 1.17.2
        """
        self._j_pulsar_source_builder.setConsumerName(consumer_name)
        return self

    def set_range_generator(self, range_generator: 'RangeGenerator') -> 'PulsarSourceBuilder':
        """
        Set a topic range generator for consuming a sub set of keys.

        :param range_generator: A generator which would generate a set of TopicRange for given
                                topic.

        .. versionadded:: 1.17.2
        """
        self._j_pulsar_source_builder.setRangeGenerator(range_generator._j_range_generator)
        return self

    def set_start_cursor(self, start_cursor: 'StartCursor') -> 'PulsarSourceBuilder':
        """
        Specify from which offsets the PulsarSource should start consuming by providing a
        StartCursor.
        """
        self._j_pulsar_source_builder.setStartCursor(start_cursor._j_start_cursor)
        return self

    def set_unbounded_stop_cursor(self, stop_cursor: 'StopCursor') -> 'PulsarSourceBuilder':
        """
        By default the PulsarSource is set to run in Boundedness.CONTINUOUS_UNBOUNDED manner and
        thus never stops until the Flink job fails or is canceled. To let the PulsarSource run as a
        streaming source but still stop at some point, one can set a StopCursor to specify the
        stopping offsets for each partition. When all the partitions have reached their stopping
        offsets, the PulsarSource will then exit.

        This method is different from set_bounded_stop_cursor(StopCursor) in that after setting
        the stopping offsets with this method, PulsarSource.getBoundedness() will still return
        Boundedness.CONTINUOUS_UNBOUNDED even though it will stop at the stopping offsets specified
        by the StopCursor.
        """
        self._j_pulsar_source_builder.setUnboundedStopCursor(stop_cursor._j_stop_cursor)
        return self

    def set_bounded_stop_cursor(self, stop_cursor: 'StopCursor') -> 'PulsarSourceBuilder':
        """
        By default the PulsarSource is set to run in Boundedness.CONTINUOUS_UNBOUNDED manner and
        thus never stops until the Flink job fails or is canceled. To let the PulsarSource run in
        Boundedness.BOUNDED manner and stop at some point, one can set a StopCursor to specify
        the stopping offsets for each partition. When all the partitions have reached their stopping
        offsets, the PulsarSource will then exit.

        This method is different from set_unbounded_stop_cursor(StopCursor) in that after setting
        the stopping offsets with this method, PulsarSource.getBoundedness() will return
        Boundedness.BOUNDED instead of Boundedness.CONTINUOUS_UNBOUNDED.
        """
        self._j_pulsar_source_builder.setBoundedStopCursor(stop_cursor._j_stop_cursor)
        return self

    def set_deserialization_schema(self, deserialization_schema: 'DeserializationSchema') \
            -> 'PulsarSourceBuilder':
        """
        Sets the :class:`~pyflink.common.serialization.DeserializationSchema` for deserializing the
        value of Pulsars message.

        :param deserialization_schema: the :class:`DeserializationSchema` to use for
                                       deserialization.
        :return: this PulsarSourceBuilder.
        """
        self._j_pulsar_source_builder.setDeserializationSchema(
            deserialization_schema._j_deserialization_schema)
        return self

    def set_authentication(self,
                           auth_plugin_class_name: str,
                           auth_params_string: Union[str, Dict[str, str]]) \
            -> 'PulsarSourceBuilder':
        """
        Configure the authentication provider to use in the Pulsar client instance.

        :param auth_plugin_class_name: Name of the Authentication-Plugin you want to use.
        :param auth_params_string: String which represents parameters for the Authentication-Plugin,
                                   e.g., "key1:val1,key2:val2". A dict of parameter names to
                                   values is accepted as well.

        .. versionadded:: 1.17.2
        """
        if isinstance(auth_params_string, str):
            self._j_pulsar_source_builder.setAuthentication(
                auth_plugin_class_name, auth_params_string)
        else:
            # Copy the dict into a java.util.HashMap and pass that map to the Java builder.
            j_auth_params_map = get_gateway().jvm.java.util.HashMap()
            for k, v in auth_params_string.items():
                j_auth_params_map.put(k, v)
            self._j_pulsar_source_builder.setAuthentication(
                auth_plugin_class_name, j_auth_params_map)
        return self

    def set_config(self, key: Union[str, 'ConfigOption'], value) -> 'PulsarSourceBuilder':
        """
        Set arbitrary properties for the PulsarSource and PulsarConsumer. The valid keys can be
        found in PulsarSourceOptions and PulsarOptions.

        Make sure the option could be set only once or with same value.

        :param key: The option key as a string. Passing a ConfigOption is deprecated and emits a
                    DeprecationWarning.
        :param value: The value to set for the option.
        """
        if isinstance(key, ConfigOption):
            warnings.warn("set_config(key: ConfigOption, value) is deprecated. "
                          "Use set_config(key: str, value) instead.",
                          DeprecationWarning, stacklevel=2)
            j_config_option = key._j_config_option
        else:
            # A plain string key is turned into a string-typed option without default value.
            j_config_option = \
                ConfigOptions.key(key).string_type().no_default_value()._j_config_option
        self._j_pulsar_source_builder.setConfig(j_config_option, value)
        return self

    def set_config_with_dict(self, config: Dict) -> 'PulsarSourceBuilder':
        """
        Deprecated alias of :meth:`set_properties`; emits a DeprecationWarning and delegates to
        it.

        Set arbitrary properties for the PulsarSource and PulsarConsumer. The valid keys can be
        found in PulsarSourceOptions and PulsarOptions.
        """
        warnings.warn("set_config_with_dict is deprecated. Use set_properties instead.",
                      DeprecationWarning, stacklevel=2)
        self.set_properties(config)
        return self

    def set_properties(self, config: Dict) -> 'PulsarSourceBuilder':
        """
        Set arbitrary properties for the PulsarSource and PulsarConsumer. The valid keys can be
        found in PulsarSourceOptions and PulsarOptions.

        :param config: The properties to set; they are converted into a Flink Configuration via
                       ``Configuration.fromMap``.
        """
        JConfiguration = get_gateway().jvm.org.apache.flink.configuration.Configuration
        self._j_pulsar_source_builder.setConfig(JConfiguration.fromMap(config))
        return self

    def build(self) -> 'PulsarSource':
        """
        Build the PulsarSource.
        """
        return PulsarSource(self._j_pulsar_source_builder.build())
# ---- PulsarSink ----
class TopicRoutingMode(Enum):
    """
    The routing policy for choosing the desired topic by the given message.

    :data:`ROUND_ROBIN`:

    The producer will publish messages across all partitions in a round-robin fashion to achieve
    maximum throughput. Please note that round-robin is not done per individual message but
    rather it's set to the same boundary of batching delay, to ensure batching is effective.

    :data:`MESSAGE_KEY_HASH`:

    If no key is provided, the partitioned producer will randomly pick one single topic partition
    and publish all the messages into that partition. If a key is provided on the message, the
    partitioned producer will hash the key and assign the message to a particular partition.

    :data:`CUSTOM`:

    Use custom topic router implementation that will be called to determine the partition for a
    particular message.
    """

    ROUND_ROBIN = 0
    MESSAGE_KEY_HASH = 1
    CUSTOM = 2

    def _to_j_topic_routing_mode(self):
        """
        Return the attribute of the Java TopicRoutingMode class named like this member.
        """
        j_router_pkg = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.sink.writer.router
        return getattr(j_router_pkg.TopicRoutingMode, self.name)
class MessageDelayer(object):
    """
    A delayer for the Pulsar broker passing the sent message to the downstream consumer. This
    only works in :data:`SubscriptionType.Shared` subscription.

    Read delayed message delivery
    https://pulsar.apache.org/docs/en/next/concepts-messaging/#delayed-message-delivery for better
    understanding this feature.
    """

    def __init__(self, _j_message_delayer):
        # The wrapped Java MessageDelayer object.
        self._j_message_delayer = _j_message_delayer

    @staticmethod
    def never() -> 'MessageDelayer':
        """
        All the messages should be consumed immediately.
        """
        j_delayer_pkg = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.sink.writer.delayer
        return MessageDelayer(j_delayer_pkg.MessageDelayer.never())

    @staticmethod
    def fixed(duration: Duration) -> 'MessageDelayer':
        """
        All the messages should be consumed in a fixed duration.

        :param duration: The delay; its wrapped Java duration is passed to the Java
                         ``MessageDelayer.fixed``.
        """
        j_delayer_pkg = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.sink.writer.delayer
        j_delayer = j_delayer_pkg.MessageDelayer.fixed(duration._j_duration)
        return MessageDelayer(j_delayer)
class PulsarSink(Sink):
    """
    The Sink implementation of Pulsar. Please use a PulsarSinkBuilder to construct a
    PulsarSink. The following example shows how to create a PulsarSink receiving records of
    String type.

    Example:
    ::

        >>> sink = PulsarSink.builder() \\
        ...     .set_service_url(PULSAR_BROKER_URL) \\
        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
        ...     .set_topics(topic) \\
        ...     .set_serialization_schema(SimpleStringSchema()) \\
        ...     .build()

    The sink supports all delivery guarantees described by DeliveryGuarantee.

    DeliveryGuarantee#NONE does not provide any guarantees: messages may be lost in
    case of issues on the Pulsar broker and messages may be duplicated in case of a Flink
    failure.

    DeliveryGuarantee#AT_LEAST_ONCE: the sink will wait for all outstanding records in
    the Pulsar buffers to be acknowledged by the Pulsar producer on a checkpoint. No messages
    will be lost in case of any issue with the Pulsar brokers but messages may be duplicated
    when Flink restarts.

    DeliveryGuarantee#EXACTLY_ONCE: in this mode the PulsarSink will write all messages
    in a Pulsar transaction that will be committed to Pulsar on a checkpoint. Thus, no
    duplicates will be seen in case of a Flink restart. However, this delays record writing
    effectively until a checkpoint is written, so adjust the checkpoint duration accordingly.
    Additionally, it is highly recommended to set the Pulsar transaction timeout much larger
    than the maximum checkpoint duration plus the maximum restart duration, or data loss may
    happen when Pulsar expires an uncommitted transaction.

    See PulsarSinkBuilder for more details.
    """

    def __init__(self, j_pulsar_sink):
        # Hand the wrapped Java PulsarSink to the generic Sink wrapper.
        super().__init__(sink=j_pulsar_sink)

    @staticmethod
    def builder() -> 'PulsarSinkBuilder':
        """
        Get a PulsarSinkBuilder to build a PulsarSink.
        """
        sink_builder = PulsarSinkBuilder()
        return sink_builder
class PulsarSinkBuilder(object):
    """
    The builder class for PulsarSink to make it easier for the users to construct a PulsarSink.

    The following example shows the minimum setup to create a PulsarSink that writes the String
    values to a Pulsar topic.

    Example:
    ::

        >>> sink = PulsarSink.builder() \\
        ...     .set_service_url(PULSAR_BROKER_URL) \\
        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
        ...     .set_topics([TOPIC1, TOPIC2]) \\
        ...     .set_serialization_schema(SimpleStringSchema()) \\
        ...     .build()

    The service url, admin url, and the record serializer are required fields that must be set. If
    you don't set the topics, make sure you have provided a custom TopicRouter. Otherwise,
    you must provide the topics to produce.

    To specify the delivery guarantees of PulsarSink, one can call
    set_delivery_guarantee(DeliveryGuarantee). The default value of the delivery guarantee is
    DeliveryGuarantee#NONE, and it wouldn't promise the consistence when write the message into
    Pulsar.

    Example:
    ::

        >>> sink = PulsarSink.builder() \\
        ...     .set_service_url(PULSAR_BROKER_URL) \\
        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
        ...     .set_topics([TOPIC1, TOPIC2]) \\
        ...     .set_serialization_schema(SimpleStringSchema()) \\
        ...     .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE) \\
        ...     .build()
    """

    def __init__(self):
        JPulsarSink = get_gateway().jvm.org.apache.flink.connector.pulsar.sink.PulsarSink
        # Every setter below forwards to this Java builder and returns ``self`` for chaining.
        self._j_pulsar_sink_builder = JPulsarSink.builder()

    def set_admin_url(self, admin_url: str) -> 'PulsarSinkBuilder':
        """
        Sets the admin endpoint for the PulsarAdmin of the PulsarSink.
        """
        self._j_pulsar_sink_builder.setAdminUrl(admin_url)
        return self

    def set_service_url(self, service_url: str) -> 'PulsarSinkBuilder':
        """
        Sets the server's link for the PulsarProducer of the PulsarSink.
        """
        self._j_pulsar_sink_builder.setServiceUrl(service_url)
        return self

    def set_producer_name(self, producer_name: str) -> 'PulsarSinkBuilder':
        """
        The producer name is informative, and it can be used to identify a particular producer
        instance from the topic stats.
        """
        self._j_pulsar_sink_builder.setProducerName(producer_name)
        return self

    def set_topics(self, topics: Union[str, List[str]]) -> 'PulsarSinkBuilder':
        """
        Set a pulsar topic list for flink sink. Some topic may not exist currently, write to this
        non-existed topic wouldn't throw any exception.

        :param topics: A single topic name or a list of topic names. A tuple or set of names is
                       accepted as well and is converted to a list.
        """
        if isinstance(topics, (tuple, set, frozenset)):
            # Pass a flat list of names to the Java builder instead of a one-element list
            # that wraps the whole collection.
            topics = list(topics)
        elif not isinstance(topics, list):
            topics = [topics]
        self._j_pulsar_sink_builder.setTopics(topics)
        return self

    def set_delivery_guarantee(self, delivery_guarantee: 'DeliveryGuarantee') \
            -> 'PulsarSinkBuilder':
        """
        Sets the wanted DeliveryGuarantee. The default delivery guarantee is
        DeliveryGuarantee#NONE.
        """
        self._j_pulsar_sink_builder.setDeliveryGuarantee(
            delivery_guarantee._to_j_delivery_guarantee())
        return self

    def set_topic_routing_mode(self, topic_routing_mode: 'TopicRoutingMode') \
            -> 'PulsarSinkBuilder':
        """
        Set a routing mode for choosing right topic partition to send messages.
        """
        self._j_pulsar_sink_builder.setTopicRoutingMode(
            topic_routing_mode._to_j_topic_routing_mode())
        return self

    def set_topic_router(self, topic_router_class_name: str) -> 'PulsarSinkBuilder':
        """
        Use a custom topic router instead of the predefined topic routing.

        :param topic_router_class_name: Name of the Java router class. The class is loaded and
                                        instantiated through ``newInstance()``.
        """
        j_topic_router = load_java_class(topic_router_class_name).newInstance()
        self._j_pulsar_sink_builder.setTopicRouter(j_topic_router)
        return self

    def set_serialization_schema(self, serialization_schema: 'SerializationSchema') \
            -> 'PulsarSinkBuilder':
        """
        Sets the SerializationSchema of the PulsarSinkBuilder.
        """
        self._j_pulsar_sink_builder.setSerializationSchema(
            serialization_schema._j_serialization_schema)
        return self

    def set_authentication(self,
                           auth_plugin_class_name: str,
                           auth_params_string: Union[str, Dict[str, str]]) \
            -> 'PulsarSinkBuilder':
        """
        Configure the authentication provider to use in the Pulsar client instance.

        :param auth_plugin_class_name: Name of the Authentication-Plugin you want to use.
        :param auth_params_string: String which represents parameters for the Authentication-Plugin,
                                   e.g., "key1:val1,key2:val2". A dict of parameter names to
                                   values is accepted as well.

        .. versionadded:: 1.17.2
        """
        if isinstance(auth_params_string, str):
            self._j_pulsar_sink_builder.setAuthentication(
                auth_plugin_class_name, auth_params_string)
        else:
            # Copy the dict into a java.util.HashMap and pass that map to the Java builder.
            j_auth_params_map = get_gateway().jvm.java.util.HashMap()
            for k, v in auth_params_string.items():
                j_auth_params_map.put(k, v)
            self._j_pulsar_sink_builder.setAuthentication(
                auth_plugin_class_name, j_auth_params_map)
        return self

    def delay_sending_message(self, message_delayer: 'MessageDelayer') -> 'PulsarSinkBuilder':
        """
        Set a message delayer for enabling Pulsar message delay delivery.
        """
        self._j_pulsar_sink_builder.delaySendingMessage(message_delayer._j_message_delayer)
        return self

    def set_config(self, key: str, value) -> 'PulsarSinkBuilder':
        """
        Set an arbitrary property for the PulsarSink and Pulsar Producer. The valid keys can be
        found in PulsarSinkOptions and PulsarOptions.

        Make sure the option could be set only once or with same value.

        :param key: The option key; it is turned into a string-typed option without default
                    value.
        :param value: The value to set for the option.
        """
        j_config_option = ConfigOptions.key(key).string_type().no_default_value()._j_config_option
        self._j_pulsar_sink_builder.setConfig(j_config_option, value)
        return self

    def set_properties(self, config: Dict) -> 'PulsarSinkBuilder':
        """
        Set arbitrary properties for the PulsarSink and Pulsar Producer. The valid keys can be
        found in PulsarSinkOptions and PulsarOptions.

        :param config: The properties to set; they are converted into a Flink Configuration via
                       ``Configuration.fromMap``.
        """
        JConfiguration = get_gateway().jvm.org.apache.flink.configuration.Configuration
        self._j_pulsar_sink_builder.setConfig(JConfiguration.fromMap(config))
        return self

    def build(self) -> 'PulsarSink':
        """
        Build the PulsarSink.
        """
        return PulsarSink(self._j_pulsar_sink_builder.build())