Apache Pulsar SQL Connector
Scan Source: Unbounded | Scan Source: Bounded | Sink: Streaming Append Mode
The Pulsar connector allows for reading data from and writing data into Pulsar topics.
Dependencies
The Pulsar connector is not part of the binary distribution. To use it for cluster execution, you must link it with your job; see the Flink documentation on connector dependencies for details.
How to create a Pulsar table
The example below shows how to create a Pulsar table:
```sql
CREATE TABLE PulsarTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'pulsar',
  'topics' = 'user_behavior',
  'service-url' = 'pulsar://my-broker.com:6650'
);
```
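Once created, the table can be queried like any other Flink table. The sketch below assumes the `PulsarTable` definition above. The `'buy'` filter value is a hypothetical `behavior` value chosen for illustration and is not defined by the connector.

```sql
-- Read user events from the user_behavior topic through PulsarTable.
-- 'buy' is a hypothetical behavior value used only for illustration.
SELECT user_id, item_id, ts
FROM PulsarTable
WHERE behavior = 'buy';
```

If no stop option is configured, expect this query to keep running as new messages arrive. This is an assumption based on the connector supporting an unbounded scan source.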
Connector Options
| Key | Default | Type | Description |
|---|---|---|---|
| explicit | true | Boolean | Indicates whether the table is an explicit Flink table. |
| key.fields | | List<String> | An explicit list of physical columns from the table schema that are decoded from/encoded into the key bytes of a Pulsar message. By default, this list is empty, so no message key is defined. |
| key.format | (none) | String | Defines the format identifier for decoding/encoding the key bytes of a Pulsar message. The identifier is used to discover a suitable format factory. |
| service-url | (none) | String | Service URL provider for the Pulsar service. To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL, which uses the `pulsar` scheme (for example, `pulsar://my-broker.com:6650`). You can assign Pulsar protocol URLs to specific clusters. |
| sink.custom-topic-router | (none) | String | (Optional) The class name of a custom topic router used in the [Pulsar DataStream sink connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-sink). If this option is set, the `sink.topic-routing-mode` option is ignored. |
| sink.message-delay-interval | 0 ms | Duration | (Optional) The delayed-delivery interval for messages, used in the [Pulsar DataStream sink connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-sink). |
| sink.topic-routing-mode | round-robin | Enum | (Optional) The topic routing mode. Available options are `round-robin` and `message-key-hash`. The default is `round-robin`. To use a custom topic router instead, set the `sink.custom-topic-router` option, which then determines the partition for each message. |
| source.start.message-id | (none) | String | (Optional) The message ID that specifies the starting point for consuming from the source. Use `earliest`, `latest`, or a message ID in the form `ledgerId:entryId:partitionId`, such as `12:2:-1`. This option takes precedence over `source.start.publish-time`. |
| source.start.publish-time | (none) | Long | (Optional) The publish timestamp that specifies the starting point from which the [Pulsar DataStream source connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source) consumes data. The `source.start.message-id` option takes precedence over this one. |
| source.stop.after-message-id | (none) | String | (Optional) A message ID that specifies the stop position for the unbounded SQL source. The given message is included in the consumed results. Pass a message ID in the form `ledgerId:entryId:partitionId`, such as `12:2:-1`. |
| source.stop.at-message-id | (none) | String | (Optional) A message ID that specifies the stop cursor for the unbounded SQL source. Use `never`, `latest`, or a message ID in the form `ledgerId:entryId:partitionId`, such as `12:2:-1`. |
| source.stop.at-publish-time | (none) | Long | (Optional) A publish timestamp that specifies the stop cursor for the unbounded SQL source. |
| source.subscription-name | (none) | String | The subscription name of the consumer used by the runtime [Pulsar DataStream source connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source). This option is required for constructing the consumer. |
| source.subscription-type | Exclusive | Enum | The [subscription type](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-subscriptions) used by the [Pulsar DataStream source connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source). Currently, only the `Exclusive` and `Shared` subscription types are supported. |
| topics | (none) | List<String> | The topic name(s) the table reads data from. This can be a single topic name or a list of topic names separated by a semicolon (`;`), such as `topic-1;topic-2`. When a list of topics is configured, ensure that all the topics share the same schema, because a Flink table requires a fixed schema. |
| value.format | (none) | String | Defines the format identifier for decoding/encoding value data. The identifier is used to discover a suitable format factory. |
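The key options combine as follows. In this minimal sketch, `user_id` is decoded from (and encoded into) the Pulsar message key, and the message value is handled by `value.format`. The table name `PulsarKeyedTable` is hypothetical. The sketch also assumes that the `json` format is available on the classpath.

```sql
-- Hypothetical table: user_id is mapped to the Pulsar message key.
CREATE TABLE PulsarKeyedTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'user_behavior',
  'service-url' = 'pulsar://my-broker.com:6650',
  'key.format' = 'json',      -- assumed: json format is on the classpath
  'key.fields' = 'user_id',   -- List<String>: separate multiple columns with ';'
  'value.format' = 'json'
);
```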
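The start and stop options can be combined to read a fixed range of a topic. In this sketch, the table name `PulsarRangeTable` and the timestamp value are hypothetical. The table above only says that `source.stop.at-publish-time` is a `Long`, so treating it as epoch milliseconds is an assumption. If `source.start.publish-time` were also set, `source.start.message-id` would take precedence over it.

```sql
-- Hypothetical table: start at the earliest message and stop at a publish time.
CREATE TABLE PulsarRangeTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'user_behavior',
  'service-url' = 'pulsar://my-broker.com:6650',
  'value.format' = 'json',
  'source.start.message-id' = 'earliest',
  -- hypothetical timestamp; epoch milliseconds is an assumption
  'source.stop.at-publish-time' = '1700000000000'
);
```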
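To read several topics through one shared subscription, set `topics` to a semicolon-separated list and choose a subscription type. In this sketch, the table name and the subscription name `flink-sql-sub` are hypothetical. The sketch assumes that `topic-1` and `topic-2` share the same schema, as the `topics` option requires.

```sql
-- Hypothetical table reading two topics with a Shared subscription.
CREATE TABLE PulsarMultiTopicTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'topic-1;topic-2',                 -- topics must share one schema
  'service-url' = 'pulsar://my-broker.com:6650',
  'value.format' = 'json',
  'source.subscription-name' = 'flink-sql-sub', -- hypothetical name
  'source.subscription-type' = 'Shared'         -- Exclusive (default) or Shared
);
```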
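On the sink side, the routing options decide which topic each row is written to. The sketch below defines a hypothetical sink table over two topics, routes rows by message-key hash (with `user_id` as the key), and delays delivery by an illustrative 10 seconds. It then copies rows from the `PulsarTable` defined earlier. The table name, topic names, and delay value are assumptions for illustration.

```sql
-- Hypothetical sink table: rows are routed across two topics by key hash.
CREATE TABLE PulsarSinkTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'topic-1;topic-2',
  'service-url' = 'pulsar://my-broker.com:6650',
  'key.format' = 'json',
  'key.fields' = 'user_id',
  'value.format' = 'json',
  'sink.topic-routing-mode' = 'message-key-hash',
  'sink.message-delay-interval' = '10 s'   -- illustrative delay value
);

-- Write the physical columns of PulsarTable into the sink table.
INSERT INTO PulsarSinkTable
SELECT user_id, item_id, behavior
FROM PulsarTable;
```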