
Apache Pulsar SQL Connector #

Scan Source: Unbounded Scan Source: Bounded Sink: Streaming Append Mode

The Pulsar connector allows for reading data from and writing data into Pulsar topics.

Dependencies #

The Pulsar connector is not part of the binary distribution. See how to link with it for cluster execution here.

How to create a Pulsar table #

The example below shows how to create a Pulsar table:

CREATE TABLE PulsarTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'pulsar',
  'topics' = 'user_behavior',
  'service-url' = 'pulsar://my-broker.com:6650'
)
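
The example above only wires the table to a broker and a topic. In practice you usually also set a subscription name (the options table below notes that source.subscription-name is required for constructing the consumer), a starting cursor, and a format for decoding the message value. A sketch of such a fuller definition, assuming a JSON payload; the subscription name is a hypothetical example value:

CREATE TABLE PulsarTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'pulsar',
  'topics' = 'user_behavior',
  'service-url' = 'pulsar://my-broker.com:6650',
  'source.subscription-name' = 'flink-sql-subscription',  -- hypothetical subscription name
  'source.start.message-id' = 'earliest',
  'value.format' = 'json'                                  -- assumes the JSON format is on the classpath
)

Once registered, the table is read and written with regular Flink SQL, for example SELECT user_id, behavior FROM PulsarTable.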

Connector Options #

Key Default Type Description
explicit
true Boolean Indicates whether the table is an explicit Flink table.
key.fields
(none) List<String> An explicit list of physical columns from the table schema that are decoded/encoded from the key bytes of a Pulsar message. By default, this list is empty and thus a key is undefined.
key.format
(none) String Defines the format identifier for decoding/encoding the key bytes of a Pulsar message. The identifier is used to discover a suitable format factory.
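
As an illustration, key.format and key.fields can be combined to decode one column of the schema from the message key instead of the value. The sketch below is for illustration only: it assumes the raw and json formats are available, that the key carries a single BIGINT user id, and the subscription name is again a hypothetical value:

CREATE TABLE KeyedPulsarTable (
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'user_behavior',
  'service-url' = 'pulsar://my-broker.com:6650',
  'source.subscription-name' = 'flink-sql-subscription',  -- hypothetical subscription name
  'key.format' = 'raw',                                    -- assumed key encoding
  'key.fields' = 'user_id',                                -- this column is decoded from the key bytes
  'value.format' = 'json'
)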
service-url
(none) String Service URL provider for Pulsar service.
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
You can assign Pulsar protocol URLs to specific clusters and use the Pulsar scheme.
  • An example for localhost: pulsar://localhost:6650.
  • If you have multiple brokers, the URL looks like: pulsar://localhost:6650,localhost:6651,localhost:6652
  • A URL for a production Pulsar cluster looks like: pulsar://pulsar.us-west.example.com:6650
  • If you use TLS authentication, the URL looks like: pulsar+ssl://pulsar.us-west.example.com:6651
sink.custom-topic-router
(none) String (Optional) the custom topic router class URL that is used in the [Pulsar DataStream sink connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-sink). If this option is provided, the sink.topic-routing-mode option will be ignored.
sink.message-delay-interval
0 ms Duration (Optional) the delayed message delivery interval that is used in the [Pulsar DataStream sink connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-sink).
sink.topic-routing-mode
round-robin Enum (Optional) the topic routing mode. Available options are round-robin and message-key-hash. By default, it is set to round-robin. If you want to use a custom topic router, use the sink.custom-topic-router option to determine the partition for a particular message. An example configuration follows the list of possible values below.

Possible values:
  • "round-robin": The producer will publish messages across all partitions in a round-robin fashion to achieve maximum throughput. Please note that round-robin is not done per individual message but rather it's set to the same boundary of pulsar.producer.batchingMaxMessages, to ensure batching is effective.
  • "message-key-hash": If no key is provided, The partitioned producer will randomly pick one single topic partition and publish all the messages into that partition. If a key is provided on the message, the partitioned producer will hash the key and assign the message to a particular partition.
  • "custom": Use custom TopicRouter implementation that will be called to determine the partition for a particular message.
source.start.message-id
(none) String (Optional) Message id that is used to specify a consuming starting point for the source. Use earliest, latest or pass in a message id representation in ledgerId:entryId:partitionId, such as 12:2:-1. This option takes precedence over source.start.publish-time.
source.start.publish-time
(none) Long (Optional) Publish timestamp that is used to specify a starting point for the [Pulsar DataStream source connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source) to consume data. The source.start.message-id option takes precedence over this one.
source.stop.after-message-id
(none) String (Optional) Message id used to specify a stop position for the unbounded SQL source; the given message is included in the consuming result. Pass in a message id representation in "ledgerId:entryId:partitionId", such as "12:2:-1".
source.stop.at-message-id
(none) String (Optional) Message id used to specify a stop cursor for the unbounded SQL source. Use "never", "latest" or pass in a message id representation in "ledgerId:entryId:partitionId", such as "12:2:-1".
source.stop.at-publish-time
(none) Long (Optional) Publish timestamp used to specify a stop cursor for the unbounded SQL source.
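
For example, the start and stop cursors can be combined so that the otherwise unbounded SQL source reads a fixed slice of a topic. The message id below is the illustrative value from the descriptions above, and the subscription name is again hypothetical:

CREATE TABLE PulsarSliceTable (
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'user_behavior',
  'service-url' = 'pulsar://my-broker.com:6650',
  'source.subscription-name' = 'flink-sql-subscription',  -- hypothetical subscription name
  'source.start.message-id' = '12:2:-1',                  -- ledgerId:entryId:partitionId, illustrative value
  'source.stop.at-message-id' = 'latest',
  'value.format' = 'json'
)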
source.subscription-name
(none) String The subscription name of the consumer that is used by the runtime [Pulsar DataStream source connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source). This argument is required for constructing the consumer.
source.subscription-type
Exclusive Enum The [subscription type](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-subscriptions) that is supported by the [Pulsar DataStream source connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source). Currently, only the Exclusive and Shared subscription types are supported. An example configuration follows the list of possible values below.

Possible values:
  • "Exclusive"
  • "Shared"
  • "Failover"
  • "Key_Shared"
topics
(none) List<String> Topic name(s) the table reads data from. It can be a single topic name or a list of topic names separated by a semicolon (;), like topic-1;topic-2. When a list of topics is configured, make sure that all topics share the same schema, because a Flink table needs a fixed schema.
value.format
(none) String Defines the format identifier for decoding/encoding value data. The identifier is used to discover a suitable format factory.