Pulsar
This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.

Apache Pulsar 连接器 #

Flink 当前只提供 Apache Pulsar 数据源,用户可以使用它从 Pulsar 读取数据,并保证每条数据只被处理一次。

添加依赖 #

Pulsar Source 当前支持 Pulsar 2.9.0 之后的版本,但是 Pulsar Source 使用到了 Pulsar 的事务机制,建议在 Pulsar 2.10.0 及其之后的版本上使用 Pulsar Source 进行数据读取。

如果想要了解更多关于 Pulsar API 兼容性设计,可以阅读文档 PIP-72

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-pulsar_2.11</artifactId>
    <version>1.14.4</version>
</dependency>

Flink 的流连接器并不会放到发行文件里面一同发布,阅读此文档,了解如何将连接器添加到集群实例内。

Pulsar Source #

Pulsar Source 基于 Flink 最新的批流一体 API 进行开发。

使用示例 #

Pulsar Source 提供了 builder 类来构造 PulsarSource 实例。下面的代码实例使用 builder 类创建的实例会从 “persistent://public/default/my-topic” 的数据开始端进行消费。对应的 Pulsar Source 使用了 Exclusive(独占)的订阅方式消费消息,订阅名称为 my-subscription,并把消息体的二进制字节流以 UTF-8 的方式编码为字符串。

PulsarSource<String> pulsarSource = PulsarSource.builder()
    .setServiceUrl(serviceUrl)
    .setAdminUrl(adminUrl)
    .setStartCursor(StartCursor.earliest())
    .setTopics("my-topic")
    .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
    .setSubscriptionName("my-subscription")
    .setSubscriptionType(SubscriptionType.Exclusive)
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");

如果使用构造类构造 PulsarSource,一定要提供下面几个属性:

  • Pulsar 数据消费的地址,使用 setServiceUrl(String) 方法提供。
  • Pulsar HTTP 管理地址,使用 setAdminUrl(String) 方法提供。
  • Pulsar 订阅名称,使用 setSubscriptionName(String) 方法提供。
  • 需要消费的 Topic 或者是 Topic 下面的分区,详见指定消费的 Topic 或者 Topic 分区
  • 解码 Pulsar 消息的反序列化器,详见反序列化器

指定消费的 Topic 或者 Topic 分区 #

Pulsar Source 提供了两种订阅 Topic 或 Topic 分区的方式。

  • Topic 列表,从这个 Topic 的所有分区上消费消息,例如:

    PulsarSource.builder().setTopics("some-topic1", "some-topic2");
    
    // 从 topic "topic-a" 的 0 和 2 分区上消费
    PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2");
    
  • Topic 正则,Pulsar Source 使用给定的正则表达式匹配出所有合规的 Topic,例如:

    PulsarSource.builder().setTopicPattern("topic-*");
    

Topic 名称简写 #

从 Pulsar 2.0 之后,完整的 Topic 名称格式为 {persistent|non-persistent}://租户/命名空间/topic。但是 Pulsar Source 不需要提供 Topic 名称的完整定义,因为 Topic 类型、租户、命名空间都设置了默认值。

Topic 属性 默认值
Topic 类型 persistent
租户 public
命名空间 default

下面的表格提供了当前 Pulsar Topic 支持的简写方式:

Topic 名称简写 翻译后的 Topic 名称
my-topic persistent://public/default/my-topic
my-tenant/my-namespace/my-topic persistent://my-tenant/my-namespace/my-topic
对于 Non-persistent(非持久化)Topic,Pulsar Source 不支持简写名称。所以无法将 non-persistent://public/default/my-topic 简写成 non-persistent://my-topic

Pulsar Topic 层次结构 #

对于 Pulsar 而言,Topic 分区也是一种 Topic。Pulsar 会将一个有分区的 Topic 在内部按照分区的大小拆分成等量的无分区 Topic。

由于 Pulsar 内部的分区实际实现为一个 Topic,我们将用“分区”来指代“仅有一个分区的 Topic(Non-partitioned Topic)”和“具有多个分区的 Topic 下属的分区”。

例如,在 Pulsar 的 sample 租户下面的 flink 命名空间里面创建了一个有 3 个分区的 Topic,给它起名为 simple-string。可以在 Pulsar 上看到如下的 Topic 列表:

Topic 名称 是否分区
persistent://sample/flink/simple-string
persistent://sample/flink/simple-string-partition-0
persistent://sample/flink/simple-string-partition-1
persistent://sample/flink/simple-string-partition-2

这意味着,用户可以用上面的子 Topic 去直接消费分区里面的数据,不需要再去基于上层的父 Topic 去消费全部分区的数据。例如:使用 PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2") 将会只消费 Topic sample/flink/simple-string 分区 1 和 2 里面的消息。

配置 Topic 正则表达式 #

前面提到了 Pulsar Topic 有 persistentnon-persistent 两种类型,使用正则表达式消费数据的时候,Pulsar Source 会尝试从正则表达式里面解析出消息的类型。例如:PulsarSource.builder().setTopicPattern("non-persistent://my-topic*") 会解析出 non-persistent 这个 Topic 类型。如果用户使用 Topic 名称简写的方式,Pulsar Source 会使用默认的消息类型 persistent

如果想用正则去消费 persistentnon-persistent 类型的 Topic,需要使用 RegexSubscriptionMode 定义 Topic 类型,例如:setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)

反序列化器 #

反序列化器用于解析 Pulsar 消息,Pulsar Source 使用 PulsarDeserializationSchema 来定义反序列化器。用户可以在 builder 类中使用 setDeserializationSchema(PulsarDeserializationSchema) 方法配置反序列化器。

如果用户只关心消息体的二进制字节流,并不需要其他属性来解析数据。可以直接使用预定义的 PulsarDeserializationSchema。Pulsar Source里面提供了 3 种预定义的反序列化器。

  • 使用 Pulsar 的 Schema 解析消息。
    // 基础数据类型
    PulsarDeserializationSchema.pulsarSchema(Schema);
    
    // 结构类型 (JSON, Protobuf, Avro, etc.)
    PulsarDeserializationSchema.pulsarSchema(Schema, Class);
    
    // 键值对类型
    PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class);
    
  • 使用 Flink 的 DeserializationSchema 解析消息。
    PulsarDeserializationSchema.flinkSchema(DeserializationSchema);
    
  • 使用 Flink 的 TypeInformation 解析消息。
    PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig);
    

Pulsar 的 Message<byte[]> 包含了很多 额外的属性。例如,消息的 key、消息发送时间、消息生产时间、用户在消息上自定义的键值对属性等。可以使用 Message<byte[]> 接口来获取这些属性。

如果用户需要基于这些额外的属性来解析一条消息,可以实现 PulsarDeserializationSchema 接口。并一定要确保 PulsarDeserializationSchema.getProducedType() 方法返回的 TypeInformation 是正确的结果。Flink 使用 TypeInformation 将解析出来的结果序列化传递到下游算子。

Pulsar 订阅 #

订阅是命名好的配置规则,指导消息如何投递给消费者。Pulsar Source 需要提供一个独立的订阅名称,支持 Pulsar 的四种订阅模式:

当前 Pulsar Source 里,独占灾备 的实现没有区别,如果 Flink 的一个 reader 挂了,Pulsar Source 会把所有未消费的数据交给其他的 reader 来消费数据。

默认情况下,如果没有指定订阅类型,Pulsar Source 使用共享订阅类型(SubscriptionType.Shared)。

// 名为 "my-shared" 的共享订阅
PulsarSource.builder().setSubscriptionName("my-shared");

// 名为 "my-exclusive" 的独占订阅
PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive);

如果想在 Pulsar Source 里面使用 key 共享 订阅,需要提供 RangeGenerator 实例。RangeGenerator 会生成一组消息 key 的 hash 范围,Pulsar Source 会基于给定的范围来消费数据。

Pulsar Source 也提供了一个名为 UniformRangeGenerator 的默认实现,它会基于 flink 数据源的并行度将 hash 范围均分。

起始消费位置 #

Pulsar Source 使用 setStartCursor(StartCursor) 方法给定开始消费的位置。内置的开始消费位置有:

  • 从 Topic 里面最早的一条消息开始消费。
    StartCursor.earliest();
    
  • 从 Topic 里面最新的一条消息开始消费。
    StartCursor.latest();
    
  • 从给定的消息开始消费。
    StartCursor.fromMessageId(MessageId);
    
  • 与前者不同的是,给定的消息可以跳过,再进行消费。
    StartCursor.fromMessageId(MessageId, boolean);
    
  • 从给定的消息发布时间开始消费,这个方法因为名称容易导致误解现在已经不建议使用。你可以使用方法 StartCursor.fromPublishTime(long)
    StartCursor.fromMessageTime(long);
    
  • 从给定的消息发布时间开始消费。
    StartCursor.fromPublishTime(long);
    

每条消息都有一个固定的序列号,这个序列号在 Pulsar 上有序排列,其包含了 ledger、entry、partition 等原始信息,用于在 Pulsar 底层存储上查找到具体的消息。

Pulsar 称这个序列号为 MessageId,用户可以使用 DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex) 创建它。

边界 #

Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败或者被取消,否则将持续消费数据。用户可以使用 setBoundedStopCursor(StopCursor) 给定停止消费的位置,这种情况下会使用批的方式进行消费。使用流的方式一样可以给定停止位置,使用 setUnboundedStopCursor(StopCursor) 方法即可。

在批模式下,使用 setBoundedStopCursor(StopCursor) 来指定一个消费停止位置。

内置的停止消费位置如下:

  • 永不停止。
    StopCursor.never();
    
  • 停止于 Pulsar 启动时 Topic 里面最新的那条数据。
    StopCursor.latest();
    
  • 停止于某条消息,结果里不包含此消息。
    StopCursor.atMessageId(MessageId);
    
  • 停止于某条消息之后,结果里包含此消息。
    StopCursor.afterMessageId(MessageId);
    
  • 停止于某个给定的消息事件时间戳,比如 Message<byte[]>.getEventTime(),消费结果里不包含此时间戳的消息。
    StopCursor.atEventTime(long);
    
  • 停止于某个给定的消息事件时间戳,比如 Message<byte[]>.getEventTime(),消费结果里包含此时间戳的消息。
    StopCursor.afterEventTime(long);
    
  • 停止于某个给定的消息发布时间戳,比如 Message<byte[]>.getPublishTime(),消费结果里不包含此时间戳的消息。
    StopCursor.atPublishTime(long);
    
  • 停止于某个给定的消息发布时间戳,比如 Message<byte[]>.getPublishTime(),消费结果里包含此时间戳的消息。
    StopCursor.afterPublishTime(long);
    

Source 配置项 #

除了前面提到的配置选项,Pulsar Source 还提供了丰富的选项供 Pulsar 专家使用,在 builder 类里通过 setConfig(ConfigOption<T>, T)setConfig(Configuration) 方法给定下述的全部配置。

Pulsar Java 客户端配置项 #

Pulsar Source 使用 Java 客户端来创建消费实例,相关的配置定义于 Pulsar 的 ClientConfigurationData 内。在 PulsarOptions 选项中,定义大部分的可供用户定义的配置。

Key Default Type Description
pulsar.client.authParamMap
(none) Map Parameters for the authentication plugin.
pulsar.client.authParams
(none) String Parameters for the authentication plugin.

Example:
key1:val1,key2:val2
pulsar.client.authPluginClassName
(none) String Name of the authentication plugin.
pulsar.client.concurrentLookupRequest
5000 Integer The number of concurrent lookup requests allowed to send on each broker connection to prevent overload on the broker. It should be configured with a higher value only in case of it requires to produce or subscribe on thousands of topic using a created PulsarClient
pulsar.client.connectionTimeoutMs
10000 Integer Duration (in ms) of waiting for a connection to a broker to be established.
If the duration passes without a response from a broker, the connection attempt is dropped.
pulsar.client.connectionsPerBroker
1 Integer The maximum number of connections that the client library will open to a single broker.
By default, the connection pool will use a single connection for all the producers and consumers. Increasing this parameter may improve throughput when using many producers over a high latency connection.
pulsar.client.enableBusyWait
false Boolean Option to enable busy-wait settings.
This option will enable spin-waiting on executors and IO threads in order to reduce latency during context switches. The spinning will consume 100% CPU even when the broker is not doing any work. It is recommended to reduce the number of IO threads and BookKeeper client threads to only have fewer CPU cores busy.
pulsar.client.enableTransaction
false Boolean If transaction is enabled, start the transactionCoordinatorClient with PulsarClient.
pulsar.client.initialBackoffIntervalNanos
100000000 Long Default duration (in nanoseconds) for a backoff interval.
pulsar.client.keepAliveIntervalSeconds
30 Integer Interval (in seconds) for keeping connection between the Pulsar client and broker alive.
pulsar.client.listenerName
(none) String Configure the listenerName that the broker will return the corresponding advertisedListener.
pulsar.client.maxBackoffIntervalNanos
60000000000 Long The maximum duration (in nanoseconds) for a backoff interval.
pulsar.client.maxLookupRedirects
20 Integer The maximum number of times a lookup-request redirections to a broker.
pulsar.client.maxLookupRequest
50000 Integer The maximum number of lookup requests allowed on each broker connection to prevent overload on the broker. It should be greater than maxConcurrentLookupRequests. Requests that inside maxConcurrentLookupRequests are already sent to broker, and requests beyond maxConcurrentLookupRequests and under maxLookupRequests will wait in each client cnx.
pulsar.client.maxNumberOfRejectedRequestPerConnection
50 Integer The maximum number of rejected requests of a broker in a certain period (30s) after the current connection is closed and the client creates a new connection to connect to a different broker.
pulsar.client.memoryLimitBytes
0 Long The limit (in bytes) on the amount of direct memory that will be allocated by this client instance.
Note: at this moment this is only limiting the memory for producers. Setting this to 0 will disable the limit.
pulsar.client.numIoThreads
1 Integer The number of threads used for handling connections to brokers.
pulsar.client.numListenerThreads
1 Integer The number of threads used for handling message listeners. The listener thread pool is shared across all the consumers and readers that are using a listener model to get messages. For a given consumer, the listener is always invoked from the same thread to ensure ordering.
pulsar.client.operationTimeoutMs
30000 Integer Operation timeout (in ms). Operations such as creating producers, subscribing or unsubscribing topics are retried during this interval. If the operation is not completed during this interval, the operation will be marked as failed.
pulsar.client.proxyProtocol
SNI

Enum

Protocol type to determine the type of proxy routing when a client connects to the proxy using pulsar.client.proxyServiceUrl.

Possible values:
  • "SNI"
pulsar.client.proxyServiceUrl
(none) String Proxy-service URL when a client connects to the broker via the proxy. The client can choose the type of proxy-routing.
pulsar.client.requestTimeoutMs
60000 Integer Maximum duration (in ms) for completing a request.
pulsar.client.serviceUrl
(none) String Service URL provider for Pulsar service.
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
You can assign Pulsar protocol URLs to specific clusters and use the pulsar scheme.
  • This is an example of localhost: pulsar://localhost:6650.
  • If you have multiple brokers, the URL is as: pulsar://localhost:6550,localhost:6651,localhost:6652
  • A URL for a production Pulsar cluster is as: pulsar://pulsar.us-west.example.com:6650
  • If you use TLS authentication, the URL is as pulsar+ssl://pulsar.us-west.example.com:6651
pulsar.client.sslProvider
(none) String The name of the security provider used for SSL connections. The default value is the default security provider of the JVM.
pulsar.client.statsIntervalSeconds
60 Long Interval between each stats info.
  • Stats is activated with positive statsInterval
  • Set statsIntervalSeconds to 1 second at least.
pulsar.client.tlsAllowInsecureConnection
false Boolean Whether the Pulsar client accepts untrusted TLS certificate from the broker.
pulsar.client.tlsCiphers
List<String> A list of cipher suites. This is a named combination of authentication, encryption, MAC and the key exchange algorithm used to negotiate the security settings for a network connection using the TLS or SSL network protocol. By default all the available cipher suites are supported.
pulsar.client.tlsHostnameVerificationEnable
false Boolean Whether to enable TLS hostname verification. It allows to validate hostname verification when a client connects to the broker over TLS. It validates incoming x509 certificate and matches provided hostname (CN/SAN) with the expected broker's host name. It follows RFC 2818, 3.1. Server Identity hostname verification.
pulsar.client.tlsProtocols
List<String> The SSL protocol used to generate the SSLContext. By default, it is set TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.3, TLSv1.2 and TLSv1.1.
pulsar.client.tlsTrustCertsFilePath
(none) String Path to the trusted TLS certificate file.
pulsar.client.tlsTrustStorePassword
(none) String The store password for the key store file.
pulsar.client.tlsTrustStorePath
(none) String The location of the trust store file.
pulsar.client.tlsTrustStoreType
"JKS" String The file format of the trust store file.
pulsar.client.useKeyStoreTls
false Boolean If TLS is enabled, whether use the KeyStore type as the TLS configuration parameter. If it is set to false, it means to use the default pem type configuration.
pulsar.client.useTcpNoDelay
true Boolean Whether to use the TCP no-delay flag on the connection to disable Nagle algorithm.
No-delay features ensures that packets are sent out on the network as soon as possible, and it is critical to achieve low latency publishes. On the other hand, sending out a huge number of small packets might limit the overall throughput. Therefore, if latency is not a concern, it is recommended to set the useTcpNoDelay flag to false.
By default, it is set to true.

Pulsar 管理 API 配置项 #

管理 API 用于查询 Topic 的元数据和用正则订阅的时候的 Topic 查找,它与 Java 客户端共享大部分配置。下面列举的配置只供管理 API 使用,PulsarOptions 包含了这些配置 。

Key Default Type Description
pulsar.admin.adminUrl
(none) String The Pulsar service HTTP URL for the admin endpoint. For example, http://my-broker.example.com:8080, or https://my-broker.example.com:8443 for TLS.
pulsar.admin.autoCertRefreshTime
300000 Integer The auto cert refresh time (in ms) if Pulsar admin supports TLS authentication.
pulsar.admin.connectTimeout
60000 Integer The connection time out (in ms) for the PulsarAdmin client.
pulsar.admin.readTimeout
60000 Integer The server response read timeout (in ms) for the PulsarAdmin client for any request.
pulsar.admin.requestTimeout
300000 Integer The server request timeout (in ms) for the PulsarAdmin client for any request.

Pulsar 消费者 API 配置项 #

Pulsar 提供了消费者 API 和读者 API 两套 API 来进行数据消费,它们可用于不同的业务场景。Flink 上的 Pulsar Source 使用消费者 API 进行消费,它的配置定义于 Pulsar 的 ConsumerConfigurationData 内。Pulsar Source 将其中大部分的可供用户定义的配置定义于 PulsarSourceOptions 内。

Key Default Type Description
pulsar.consumer.ackReceiptEnabled
false Boolean Acknowledgement will return a receipt but this does not mean that the message will not be resent after getting the receipt.
pulsar.consumer.ackTimeoutMillis
0 Long The timeout (in ms) for unacknowledged messages, truncated to the nearest millisecond. The timeout needs to be greater than 1 second.
By default, the acknowledge timeout is disabled and that means that messages delivered to a consumer will not be re-delivered unless the consumer crashes.
When acknowledgement timeout being enabled, if a message is not acknowledged within the specified timeout it will be re-delivered to the consumer (possibly to a different consumer in case of a shared subscription).
pulsar.consumer.acknowledgementsGroupTimeMicros
100000 Long Group a consumer acknowledgment for a specified time (in μs). By default, a consumer uses 100μs grouping time to send out acknowledgments to a broker. If the group time is set to 0, acknowledgments are sent out immediately. A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.
pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull
false Boolean Buffering a large number of outstanding uncompleted chunked messages can bring memory pressure and it can be guarded by providing this pulsar.consumer.maxPendingChunkedMessage threshold. Once a consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acknowledging if pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull is true. Otherwise, it marks them for redelivery.
pulsar.consumer.autoUpdatePartitionsIntervalSeconds
60 Integer The interval (in seconds) of updating partitions. This only works if autoUpdatePartitions is enabled.
pulsar.consumer.consumerName
(none) String The consumer name is informative and it can be used to identify a particular consumer instance from the topic stats.
pulsar.consumer.cryptoFailureAction
FAIL

Enum

The consumer should take action when it receives a message that can not be decrypted.
  • FAIL: this is the default option to fail messages until crypto succeeds.
  • DISCARD: silently acknowledge but do not deliver messages to an application.
  • CONSUME: deliver encrypted messages to applications. It is the application's responsibility to decrypt the message.

Fail to decompress the messages.
If messages contain batch messages, a client is not be able to retrieve individual messages in batch.
The delivered encrypted message contains EncryptionContext which contains encryption and compression information in. You can use an application to decrypt the consumed message payload.

Possible values:
  • "FAIL"
  • "DISCARD"
  • "CONSUME"
pulsar.consumer.deadLetterPolicy.deadLetterTopic
(none) String Name of the dead topic where the failed messages are sent.
pulsar.consumer.deadLetterPolicy.maxRedeliverCount
0 Integer The maximum number of times that a message are redelivered before being sent to the dead letter queue.
pulsar.consumer.deadLetterPolicy.retryLetterTopic
(none) String Name of the retry topic where the failed messages are sent.
pulsar.consumer.expireTimeOfIncompleteChunkedMessageMillis
60000 Long If a producer fails to publish all the chunks of a message, the consumer can expire incomplete chunks if the consumer cannot receive all chunks in expire times (default 1 hour, in ms).
pulsar.consumer.maxPendingChunkedMessage
10 Integer The consumer buffers chunk messages into memory until it receives all the chunks of the original message. While consuming chunk-messages, chunks from the same message might not be contiguous in the stream and they might be mixed with other messages' chunks. So, consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers are publishing messages on the topic concurrently or publishers failed to publish all chunks of the messages.
For example, there are M1-C1, M2-C1, M1-C2, M2-C2 messages.Messages M1-C1 and M1-C2 belong to the M1 original message while M2-C1 and M2-C2 belong to the M2 message.
Buffering a large number of outstanding uncompleted chunked messages can bring memory pressure and it can be guarded by providing this pulsar.consumer.maxPendingChunkedMessage threshold. Once, a consumer reaches this threshold, it drops the outstanding unchunked messages by silently acknowledging or asking the broker to redeliver messages later by marking it unacknowledged. This behavior can be controlled by the pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull option.
pulsar.consumer.maxTotalReceiverQueueSizeAcrossPartitions
50000 Integer The maximum total receiver queue size across partitions.
This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value.
pulsar.consumer.negativeAckRedeliveryDelayMicros
60000000 Long Delay (in μs) to wait before redelivering messages that failed to be processed.
When an application uses Consumer.negativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.
pulsar.consumer.poolMessages
false Boolean Enable pooling of messages and the underlying data buffers.
pulsar.consumer.priorityLevel
0 Integer Priority level for a consumer to which a broker gives more priorities while dispatching messages in the shared subscription type.
The broker follows descending priorities. For example, 0=max-priority, 1, 2,...
In shared subscription mode, the broker first dispatches messages to the consumers on the highest priority level if they have permits. Otherwise, the broker considers consumers on the next priority level.

Example 1
If a subscription has consumer A with priorityLevel 0 and consumer B with priorityLevel 1, then the broker only dispatches messages to consumer A until it runs out permits and then starts dispatching messages to consumer B.
Example 2
Consumer Priority, Level, Permits C1, 0, 2 C2, 0, 1 C3, 0, 1 C4, 1, 2 C5, 1, 1
The order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.
pulsar.consumer.properties
Map A name or value property of this consumer. properties is application defined metadata attached to a consumer. When getting a topic stats, associate this metadata with the consumer stats for easier identification.
pulsar.consumer.readCompacted
false Boolean If enabling readCompacted, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.
A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.
Only enabling readCompacted on subscriptions to persistent topics, which have a single active consumer (like failure or exclusive subscriptions).
Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a PulsarClientException.
pulsar.consumer.receiverQueueSize
1000 Integer Size of a consumer's receiver queue.
For example, the number of messages accumulated by a consumer before an application calls Receive.
A value higher than the default value increases consumer throughput, though at the expense of more memory utilization.
pulsar.consumer.replicateSubscriptionState
false Boolean If replicateSubscriptionState is enabled, a subscription state is replicated to geo-replicated clusters.
pulsar.consumer.retryEnable
false Boolean If enabled, the consumer will automatically retry messages.
pulsar.consumer.subscriptionMode
Durable

Enum

Select the subscription mode to be used when subscribing to the topic.
  • Durable: Make the subscription to be backed by a durable cursor that will retain messages and persist the current position.
  • NonDurable: Lightweight subscription mode that doesn't have a durable cursor associated


Possible values:
  • "Durable"
  • "NonDurable"
pulsar.consumer.subscriptionName
(none) String Specify the subscription name for this consumer. This argument is required when constructing the consumer.
pulsar.consumer.subscriptionType
Shared

Enum

Subscription type.

Four subscription types are available:
  • Exclusive
  • Failover
  • Shared
  • Key_Shared


Possible values:
  • "Exclusive"
  • "Shared"
  • "Failover"
  • "Key_Shared"
pulsar.consumer.tickDurationMillis
1000 Long Granularity (in ms) of the ack-timeout redelivery.
A greater (for example, 1 hour) tickDurationMillis reduces the memory overhead to track messages.

Pulsar Source配置项 #

下述配置主要用于性能调优或者是控制消息确认的行为。如非必要,可以不用强制配置。

Key Default Type Description
pulsar.source.autoCommitCursorInterval
5000 Long This option is used only when the user disables the checkpoint and uses Exclusive or Failover subscription. We would automatically commit the cursor using the given period (in ms).
pulsar.source.enableAutoAcknowledgeMessage
false Boolean Flink commits the consuming position with pulsar transactions on checkpoint. However, if you have disabled the Flink checkpoint or disabled transaction for your Pulsar cluster, ensure that you have set this option to true.
The source would use pulsar client's internal mechanism and commit cursor in two ways.
  • For Key_Shared and Shared subscription, the cursor would be committed once the message is consumed.
  • For Exclusive and Failover subscription, the cursor would be committed in a given interval.
pulsar.source.maxFetchRecords
100 Integer The maximum number of records to fetch to wait when polling. A longer time increases throughput but also latency. A fetch batch might be finished earlier because of pulsar.source.maxFetchTime.
pulsar.source.maxFetchTime
10000 Long The maximum time (in ms) to wait when fetching records. A longer time increases throughput but also latency. A fetch batch might be finished earlier because of pulsar.source.maxFetchRecords.
pulsar.source.partitionDiscoveryIntervalMs
30000 Long The interval (in ms) for the Pulsar source to discover the new partitions. A non-positive value disables the partition discovery.
pulsar.source.transactionTimeoutMillis
10800000 Long This option is used in Shared or Key_Shared subscription. You should configure this option when you do not enable the pulsar.source.enableAutoAcknowledgeMessage option.
The value (in ms) should be greater than the checkpoint interval.
pulsar.source.verifyInitialOffsets
WARN_ON_MISMATCH

Enum

Upon (re)starting the source, check whether the expected message can be read. If failure is enabled, the application fails. Otherwise, it logs a warning. A possible solution is to adjust the retention settings in Pulsar or ignoring the check result.

Possible values:
  • "FAIL_ON_MISMATCH"
  • "WARN_ON_MISMATCH"

动态分区发现 #

为了能在启动 Flink 任务之后还能发现在 Pulsar 上扩容的分区或者是新创建的 Topic,Pulsar Source 提供了动态分区发现机制。该机制不需要重启 Flink 任务。对选项 PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS 设置一个正整数即可启用。

// 10 秒查询一次分区信息
PulsarSource.builder()
        .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);
默认情况下,Pulsar 启用动态分区发现,查询间隔为 30 秒。用户可以给定一个负数,将该功能禁用。如果使用批的方式消费数据,将无法启用该功能。

事件时间和水位线 #

默认情况下,Pulsar Source 使用 Pulsar 的 Message<byte[]> 里面的时间作为解析结果的时间戳。用户可以使用 WatermarkStrategy 来自行解析出想要的消息时间,并向下游传递对应的水位线。

env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy");

这篇文档详细讲解了如何定义 WatermarkStrategy

消息确认 #

一旦在 Topic 上创建了订阅,消息便会存储在 Pulsar 里。即使没有消费者,消息也不会被丢弃。只有当 Pulsar Source 同 Pulsar 确认此条消息已经被消费,该消息才以某种机制会被移除。Pulsar Source 支持四种订阅方式,它们的消息确认方式也大不相同。

独占和灾备订阅下的消息确认 #

独占灾备 订阅下,Pulsar Source 使用累进式确认方式。确认某条消息已经被处理时,其前面消息会自动被置为已读。Pulsar Source 会在 Flink 完成检查点时将对应时刻消费的消息置为已读,以此来保证 Pulsar 状态与 Flink 状态一致。

如果用户没有在 Flink 上启用检查点,Pulsar Source 可以使用周期性提交来将消费状态提交给 Pulsar,使用配置 PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL 来进行定义。

需要注意的是,此种场景下,Pulsar Source 并不依赖于提交到 Pulsar 的状态来做容错。消息确认只是为了能在 Pulsar 端看到对应的消费处理情况。

共享和 key 共享订阅下的消息确认 #

共享key 共享 需要依次确认每一条消息,所以 Pulsar Source 在 Pulsar 事务里面进行消息确认,然后将事务提交到 Pulsar。

首先需要在 Pulsar 的 borker.conf 文件里面启用事务:

transactionCoordinatorEnabled=true

Pulsar Source 创建的事务的默认超时时间为 3 小时,请确保这个时间大于 Flink 检查点的间隔。用户可以使用 PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS 来设置事务的超时时间。

如果用户无法启用 Pulsar 的事务,或者是因为项目禁用了检查点,需要将 PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE 选项设置为 true,消息从 Pulsar 消费后会被立刻置为已读。Pulsar Source 无法保证此种场景下的消息一致性。

Pulsar Source 在 Pulsar 上使用日志的形式记录某个事务下的消息确认,为了更好的性能,请缩短 Flink 做检查点的间隔。

升级至最新的连接器 #

常见的升级步骤,请参阅升级应用程序和 Flink 版本。Pulsar 连接器没有在 Flink 端存储消费的状态,所有的消费信息都推送到了 Pulsar。所以需要注意下面的事项:

  • 不要同时升级 Pulsar 连接器和 Pulsar 服务端的版本。
  • 使用最新版本的 Pulsar 客户端来消费消息。

问题诊断 #

使用 Flink 和 Pulsar 交互时如果遇到问题,由于 Flink 内部实现只是基于 Pulsar 的 Java 客户端管理 API 而开发的。

用户遇到的问题可能与 Flink 无关,请先升级 Pulsar 的版本、Pulsar 客户端的版本,或者修改 Pulsar 的配置、Pulsar 连接器的配置来尝试解决问题。

在 Java 11 上使用不稳定 #

Pulsar connector 在 Java 11 中有一些尚未修复的问题。我们当前推荐在 Java 8 环境中运行Pulsar connector.

Back to top