OUT - The output type of the source.
@PublicEvolving public final class PulsarSourceBuilder<OUT> extends Object
The builder class for PulsarSource, which makes it easier for users to construct a PulsarSource.
The following example shows the minimum setup to create a PulsarSource that reads the String values from a Pulsar topic.
PulsarSource<String> source = PulsarSource
.builder()
.setServiceUrl(PULSAR_BROKER_URL)
.setAdminUrl(PULSAR_BROKER_HTTP_URL)
.setSubscriptionName("flink-source-1")
.setTopics(Arrays.asList(TOPIC1, TOPIC2))
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
.build();
The service url, admin url, subscription name, topics to consume, and the record deserializer are required fields that must be set.
To specify the starting position of the PulsarSource, call setStartCursor(StartCursor).
By default, the PulsarSource runs in Boundedness.CONTINUOUS_UNBOUNDED mode and never stops until the Flink job is canceled or fails. To let the PulsarSource run in Boundedness.CONTINUOUS_UNBOUNDED mode but stop at some given offsets, call setUnboundedStopCursor(StopCursor) and disable auto partition discovery as described below. For example, the following PulsarSource stops after it has consumed up to the event time at which the Flink job was started.
To stop the connector, the user has to disable auto partition discovery, because auto partition discovery always expects new splits to arrive and therefore never exits. To disable auto partition discovery, use builder.setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1).
PulsarSource<String> source = PulsarSource
.builder()
.setServiceUrl(PULSAR_BROKER_URL)
.setAdminUrl(PULSAR_BROKER_HTTP_URL)
.setSubscriptionName("flink-source-1")
.setTopics(Arrays.asList(TOPIC1, TOPIC2))
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
.setUnboundedStopCursor(StopCursor.atEventTime(System.currentTimeMillis()))
.build();
Modifier and Type | Method and Description |
---|---|
PulsarSource<OUT> | build() - Build the PulsarSource. |
PulsarSourceBuilder<OUT> | setAdminUrl(String adminUrl) - Sets the admin endpoint for the PulsarAdmin of the PulsarSource. |
PulsarSourceBuilder<OUT> | setBoundedStopCursor(StopCursor stopCursor) - By default, the PulsarSource is set to run in Boundedness.CONTINUOUS_UNBOUNDED manner and thus never stops until the Flink job fails or is canceled. |
<T> PulsarSourceBuilder<OUT> | setConfig(ConfigOption<T> key, T value) - Set an arbitrary property for the PulsarSource and Pulsar Consumer. |
PulsarSourceBuilder<OUT> | setConfig(Configuration config) - Set arbitrary properties for the PulsarSource and Pulsar Consumer. |
PulsarSourceBuilder<OUT> | setConsumerName(String consumerName) - The consumer name is informative, and it can be used to identify a particular consumer instance from the topic stats. |
<T extends OUT> PulsarSourceBuilder<T> | setDeserializationSchema(PulsarDeserializationSchema<T> deserializationSchema) - DeserializationSchema is required for getting the Schema used to deserialize messages from Pulsar and the TypeInformation used for message serialization in Flink. |
PulsarSourceBuilder<OUT> | setRangeGenerator(RangeGenerator rangeGenerator) - Set a topic range generator for Key_Shared subscription. |
PulsarSourceBuilder<OUT> | setServiceUrl(String serviceUrl) - Sets the server URL used by the PulsarConsumer of the PulsarSource. |
PulsarSourceBuilder<OUT> | setStartCursor(StartCursor startCursor) - Specify the offsets from which the PulsarSource should start consuming by providing a StartCursor. |
PulsarSourceBuilder<OUT> | setSubscriptionName(String subscriptionName) - Sets the name for this Pulsar subscription. |
PulsarSourceBuilder<OUT> | setSubscriptionType(org.apache.pulsar.client.api.SubscriptionType subscriptionType) - SubscriptionType is the consuming behavior for Pulsar; different splits are generated depending on the given subscription type. |
PulsarSourceBuilder<OUT> | setTopicPattern(Pattern topicsPattern) - Set a topic pattern to consume from, given as a Java Pattern. |
PulsarSourceBuilder<OUT> | setTopicPattern(Pattern topicsPattern, org.apache.pulsar.client.api.RegexSubscriptionMode regexSubscriptionMode) - Set a topic pattern to consume from, given as a Java Pattern. |
PulsarSourceBuilder<OUT> | setTopicPattern(String topicsPattern) - Set a topic pattern to consume from, given as a Java regex string. |
PulsarSourceBuilder<OUT> | setTopicPattern(String topicsPattern, org.apache.pulsar.client.api.RegexSubscriptionMode regexSubscriptionMode) - Set a topic pattern to consume from, given as a Java regex string. |
PulsarSourceBuilder<OUT> | setTopics(List<String> topics) - Set a Pulsar topic list for the Flink source. |
PulsarSourceBuilder<OUT> | setTopics(String... topics) - Set a Pulsar topic list for the Flink source. |
PulsarSourceBuilder<OUT> | setUnboundedStopCursor(StopCursor stopCursor) - By default, the PulsarSource runs in Boundedness.CONTINUOUS_UNBOUNDED mode and never stops until the Flink job is canceled or fails. |
public PulsarSourceBuilder<OUT> setAdminUrl(String adminUrl)
Sets the admin endpoint for the PulsarAdmin of the PulsarSource.
adminUrl - the URL for the PulsarAdmin.
public PulsarSourceBuilder<OUT> setServiceUrl(String serviceUrl)
Sets the server URL used by the PulsarConsumer of the PulsarSource.
serviceUrl - the server URL of the Pulsar cluster.
public PulsarSourceBuilder<OUT> setSubscriptionName(String subscriptionName)
Sets the name for this Pulsar subscription.
subscriptionName - the name of the Pulsar subscription.
public PulsarSourceBuilder<OUT> setSubscriptionType(org.apache.pulsar.client.api.SubscriptionType subscriptionType)
SubscriptionType is the consuming behavior for Pulsar; different splits are generated depending on the given subscription type. Please take some time to consider which subscription type matches your application best. The default is SubscriptionType.Shared.
subscriptionType - The type of subscription.
public PulsarSourceBuilder<OUT> setTopics(String... topics)
Set a Pulsar topic list for the Flink source. Topics can be set only once in this builder, with either setTopics(java.lang.String...) or setTopicPattern(java.lang.String).
topics - The list of topics to consume messages from.
public PulsarSourceBuilder<OUT> setTopics(List<String> topics)
Set a Pulsar topic list for the Flink source. Topics can be set only once in this builder, with either setTopics(java.lang.String...) or setTopicPattern(java.lang.String).
topics - The list of topics to consume messages from.
public PulsarSourceBuilder<OUT> setTopicPattern(String topicsPattern)
Set a topic pattern to consume from, given as a Java regex string. Topics can be set only once in this builder, with either setTopics(java.lang.String...) or setTopicPattern(java.lang.String).
topicsPattern - the pattern of the topic names to consume from.
public PulsarSourceBuilder<OUT> setTopicPattern(Pattern topicsPattern)
Set a topic pattern to consume from, given as a Java Pattern. Topics can be set only once in this builder, with either setTopics(java.lang.String...) or setTopicPattern(java.lang.String).
topicsPattern - the pattern of the topic names to consume from.
public PulsarSourceBuilder<OUT> setTopicPattern(String topicsPattern, org.apache.pulsar.client.api.RegexSubscriptionMode regexSubscriptionMode)
Set a topic pattern to consume from, given as a Java regex string. Topics can be set only once in this builder, with either setTopics(java.lang.String...) or setTopicPattern(java.lang.String).
topicsPattern - the pattern of the topic names to consume from.
regexSubscriptionMode - The topic filter for the regex subscription.
public PulsarSourceBuilder<OUT> setTopicPattern(Pattern topicsPattern, org.apache.pulsar.client.api.RegexSubscriptionMode regexSubscriptionMode)
Set a topic pattern to consume from, given as a Java Pattern. Topics can be set only once in this builder, with either setTopics(java.lang.String...) or setTopicPattern(java.lang.String).
topicsPattern - the pattern of the topic names to consume from.
regexSubscriptionMode - When subscribing to topics using a regular expression, you can restrict the subscription to a certain type of topic.
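The setTopicPattern variants above accept either a java.util.regex.Pattern or a regex string. As a rough illustration of how such a pattern selects topics, the following sketch filters a list of candidate names with a plain JDK Pattern. The class TopicPatternSketch, the helper matchingTopics, and the topic names are hypothetical and exist only for this example. It also assumes that a name must match the pattern in full, which may differ from how the connector and broker actually evaluate the pattern.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Illustrative helper only; not part of the Pulsar connector. */
final class TopicPatternSketch {

    /** Returns the candidate names that the pattern matches in full (an assumption of this sketch). */
    static List<String> matchingTopics(Pattern topicsPattern, List<String> candidates) {
        return candidates.stream()
                .filter(name -> topicsPattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topic names, chosen only for this example.
        List<String> candidates = List.of(
                "persistent://public/default/orders-eu",
                "persistent://public/default/orders-us",
                "persistent://public/default/users");

        // The same regex could be handed to the builder as a String or as a compiled Pattern.
        Pattern topicsPattern = Pattern.compile("persistent://public/default/orders-.*");

        System.out.println(matchingTopics(topicsPattern, candidates));
    }
}
```

Checking a regex locally in this way can help catch a pattern that is too broad or too narrow before it is passed to the builder.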
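public PulsarSourceBuilder<OUT> setConsumerName(String consumerName)
The consumer name is informative, and it can be used to identify a particular consumer instance from the topic stats.
public PulsarSourceBuilder<OUT> setRangeGenerator(RangeGenerator rangeGenerator)
Set a topic range generator for Key_Shared subscription.
rangeGenerator - A generator that produces a set of TopicRange for a given topic.
public PulsarSourceBuilder<OUT> setStartCursor(StartCursor startCursor)
Specify the offsets from which the PulsarSource should start consuming by providing a StartCursor.
startCursor - sets the starting offsets for the Source.
public PulsarSourceBuilder<OUT> setUnboundedStopCursor(StopCursor stopCursor)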
By default, the PulsarSource runs in Boundedness.CONTINUOUS_UNBOUNDED mode and never stops until the Flink job is canceled or fails. To let the PulsarSource run in Boundedness.CONTINUOUS_UNBOUNDED mode but stop at some given offsets, call setUnboundedStopCursor(StopCursor) and disable auto partition discovery as described below.
This method differs from setBoundedStopCursor(StopCursor) in that, after setting the stopping offsets with this method, PulsarSource.getBoundedness() will still return Boundedness.CONTINUOUS_UNBOUNDED even though the source will stop at the stopping offsets specified by the StopCursor.
To stop the connector, the user has to disable auto partition discovery, because auto partition discovery always expects new splits to arrive and therefore never exits. To disable auto partition discovery, use builder.setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1).
stopCursor - The StopCursor that specifies the stopping offset.
See also: setBoundedStopCursor(StopCursor)
public PulsarSourceBuilder<OUT> setBoundedStopCursor(StopCursor stopCursor)
By default, the PulsarSource is set to run in Boundedness.CONTINUOUS_UNBOUNDED manner and thus never stops until the Flink job fails or is canceled. To let the PulsarSource run in Boundedness.BOUNDED manner and stop at some point, set a StopCursor to specify the stopping offsets for each partition. When all the partitions have reached their stopping offsets, the PulsarSource will exit.
This method differs from setUnboundedStopCursor(StopCursor) in that, after setting the stopping offsets with this method, PulsarSource.getBoundedness() will return Boundedness.BOUNDED instead of Boundedness.CONTINUOUS_UNBOUNDED.
stopCursor - the StopCursor that specifies the stopping offsets.
See also: setUnboundedStopCursor(StopCursor)
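The difference between the two stop-cursor setters is easy to miss: both make the source stop, but only one changes the reported boundedness. The toy model below restates that contract in plain Java. It is a sketch for illustration, not connector code: the class StopCursorBoundednessSketch, its nested Boundedness enum, and the parameterless setters are hypothetical stand-ins that only mirror the names used in this page.

```java
/** Toy model of the two stop-cursor setters; names mirror the builder, but this is not connector code. */
final class StopCursorBoundednessSketch {

    /** Stand-in for Flink's Boundedness enum, redefined here so the sketch is self-contained. */
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    // Default described in this page: unbounded, with no stopping offsets.
    private Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
    private boolean stopsAtCursor = false;

    /** Stops at the cursor, but the reported boundedness stays CONTINUOUS_UNBOUNDED. */
    StopCursorBoundednessSketch setUnboundedStopCursor() {
        this.stopsAtCursor = true;
        this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
        return this;
    }

    /** Stops at the cursor, and the reported boundedness becomes BOUNDED. */
    StopCursorBoundednessSketch setBoundedStopCursor() {
        this.stopsAtCursor = true;
        this.boundedness = Boundedness.BOUNDED;
        return this;
    }

    Boundedness getBoundedness() {
        return boundedness;
    }

    boolean stopsAtCursor() {
        return stopsAtCursor;
    }

    public static void main(String[] args) {
        System.out.println(new StopCursorBoundednessSketch().setUnboundedStopCursor().getBoundedness());
        System.out.println(new StopCursorBoundednessSketch().setBoundedStopCursor().getBoundedness());
    }
}
```

In both cases the model stops at the cursor; only the value reported by getBoundedness() differs.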
public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(PulsarDeserializationSchema<T> deserializationSchema)
DeserializationSchema is required for getting the Schema used to deserialize messages from Pulsar and the TypeInformation used for message serialization in Flink.
A set of implementations is predefined; use PulsarDeserializationSchema#pulsarSchema or PulsarDeserializationSchema#flinkSchema to create the desired schema.
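public <T> PulsarSourceBuilder<OUT> setConfig(ConfigOption<T> key, T value)
Set an arbitrary property for the PulsarSource and Pulsar Consumer. See PulsarSourceOptions and PulsarOptions for the available options.
Make sure each option is set only once, or is always set to the same value.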
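The "only once, or with the same value" rule can be pictured with a small stand-alone model. The sketch below is not the connector's implementation: the class SetOnceConfigSketch, its string keys, and the choice of IllegalArgumentException for a conflicting value are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Toy model of "set once, or again with the same value"; not the connector's code. */
final class SetOnceConfigSketch {

    private final Map<String, Object> values = new HashMap<>();

    /** Stores the value; a repeated call is accepted only if it supplies an equal value. */
    SetOnceConfigSketch set(String key, Object value) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(value, "value");
        Object existing = values.putIfAbsent(key, value);
        if (existing != null && !existing.equals(value)) {
            // Assumed failure mode for this sketch; the real builder may report conflicts differently.
            throw new IllegalArgumentException(
                    "Option " + key + " is already set to " + existing + ", cannot change it to " + value);
        }
        return this;
    }

    Object get(String key) {
        return values.get(key);
    }

    public static void main(String[] args) {
        SetOnceConfigSketch config = new SetOnceConfigSketch();
        // Hypothetical key name, used only in this example.
        config.set("example.discovery.interval", -1L);
        config.set("example.discovery.interval", -1L); // same value again: accepted
        System.out.println(config.get("example.discovery.interval"));
    }
}
```

Under this model, repeating a setting with the same value is harmless, while a conflicting value fails fast instead of silently overwriting the earlier one.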
key - the key of the property.
value - the value of the property.
public PulsarSourceBuilder<OUT> setConfig(Configuration config)
Set arbitrary properties for the PulsarSource and Pulsar Consumer. See PulsarSourceOptions and PulsarOptions for the available options.
config - the config to set for the PulsarSource.
public PulsarSource<OUT> build()
Build the PulsarSource.
Copyright © 2014–2023 The Apache Software Foundation. All rights reserved.