Pulsar

Apache Pulsar 连接器 #

Flink 当前提供 Apache Pulsar Source 和 Sink 连接器,用户可以使用它从 Pulsar 读取数据,并保证每条数据只被处理一次。

添加依赖 #

当前支持 Pulsar 2.10.0 及其之后的版本,建议在总是将 Pulsar 升级至最新版。如果想要了解更多关于 Pulsar API 兼容性设计,可以阅读文档 PIP-72

There is no connector (yet) available for Flink version 1.20.

为了在 PyFlink 作业中使用 ,需要添加下列依赖:

Version PyFlink JAR
flink-connector-pulsar There is no SQL jar (yet) available for Flink version 1.20.
在 PyFlink 中如何添加 JAR 包依赖请参考 Python 依赖管理

Flink 的流连接器并不会放到发行文件里面一同发布,阅读此文档,了解如何将连接器添加到集群实例内。

Pulsar Source #

Pulsar Source 基于 Flink 最新的批流一体 API 进行开发。

使用示例 #

Pulsar Source 提供了 builder 类来构造 PulsarSource 实例。下面的代码实例使用 builder 类创建的实例会从 “persistent://public/default/my-topic” 的数据开始端进行消费。对应的 Pulsar Source 使用了 Exclusive(独占)的订阅方式消费消息,订阅名称为 my-subscription,并把消息体的二进制字节流以 UTF-8 的方式编码为字符串。

PulsarSource<String> source = PulsarSource.builder()
    .setServiceUrl(serviceUrl)
    .setAdminUrl(adminUrl)
    .setStartCursor(StartCursor.earliest())
    .setTopics("my-topic")
    .setDeserializationSchema(new SimpleStringSchema())
    .setSubscriptionName("my-subscription")
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
pulsar_source = PulsarSource.builder() \
    .set_service_url('pulsar://localhost:6650') \
    .set_admin_url('http://localhost:8080') \
    .set_start_cursor(StartCursor.earliest()) \
    .set_topics("my-topic") \
    .set_deserialization_schema(SimpleStringSchema()) \
    .set_subscription_name('my-subscription') \
    .build()

env.from_source(source=pulsar_source,
                watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
                source_name="pulsar source")

如果使用构造类构造 PulsarSource,一定要提供下面几个属性:

  • Pulsar 数据消费的地址,使用 setServiceUrl(String) 方法提供。
  • Pulsar HTTP 管理地址,使用 setAdminUrl(String) 方法提供。
  • Pulsar 订阅名称,使用 setSubscriptionName(String) 方法提供。
  • 需要消费的 Topic 或者是 Topic 下面的分区,详见指定消费的 Topic 或者 Topic 分区
  • 解码 Pulsar 消息的反序列化器,详见反序列化器

指定消费的 Topic 或者 Topic 分区 #

Pulsar Source 提供了两种订阅 Topic 或 Topic 分区的方式。

  • Topic 列表,从这个 Topic 的所有分区上消费消息,例如:

    PulsarSource.builder().setTopics("some-topic1", "some-topic2");
    
    // 从 topic "topic-a" 的 0 和 2 分区上消费
    PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2");
    
    PulsarSource.builder().set_topics(["some-topic1", "some-topic2"])
    
    # 从 topic "topic-a" 的 0 和 2 分区上消费
    PulsarSource.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2"])
    

  • Topic 正则,Pulsar Source 使用给定的正则表达式匹配出所有合规的 Topic,例如:

    PulsarSource.builder().setTopicPattern("topic-.*");
    
    PulsarSource.builder().set_topic_pattern("topic-.*")
    

Topic 名称简写 #

从 Pulsar 2.0 之后,完整的 Topic 名称格式为 {persistent|non-persistent}://租户/命名空间/topic。但是 Pulsar Source 不需要提供 Topic 名称的完整定义,因为 Topic 类型、租户、命名空间都设置了默认值。

Topic 属性 默认值
Topic 类型 persistent
租户 public
命名空间 default

下面的表格提供了当前 Pulsar Topic 支持的简写方式:

Topic 名称简写 翻译后的 Topic 名称
my-topic persistent://public/default/my-topic
my-tenant/my-namespace/my-topic persistent://my-tenant/my-namespace/my-topic
对于 Non-persistent(非持久化)Topic,Pulsar Source 不支持简写名称。所以无法将 non-persistent://public/default/my-topic 简写成 non-persistent://my-topic

Pulsar Topic 层次结构 #

对于 Pulsar 而言,Topic 分区也是一种 Topic。Pulsar 会将一个有分区的 Topic 在内部按照分区的大小拆分成等量的无分区 Topic。

由于 Pulsar 内部的分区实际实现为一个 Topic,我们将用“分区”来指代“仅有一个分区的 Topic(Non-partitioned Topic)”和“具有多个分区的 Topic 下属的分区”。

例如,在 Pulsar 的 sample 租户下面的 flink 命名空间里面创建了一个有 3 个分区的 Topic,给它起名为 simple-string。可以在 Pulsar 上看到如下的 Topic 列表:

Topic 名称 是否分区
persistent://sample/flink/simple-string
persistent://sample/flink/simple-string-partition-0
persistent://sample/flink/simple-string-partition-1
persistent://sample/flink/simple-string-partition-2

这意味着,用户可以用上面的子 Topic 去直接消费分区里面的数据,不需要再去基于上层的父 Topic 去消费全部分区的数据。例如:使用 PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2") 将会只消费 Topic sample/flink/simple-string 分区 1 和 2 里面的消息。

配置 Topic 正则表达式 #

Pulsar source 只支持使用正则表达式在某个 tenant 下的某个 namespace 内订阅一组 Topic。Topic 有 persistentnon-persistent 两种类型,但 Topic 类型并不从正则里面获取。即使你使用类似 PulsarSource.builder().setTopicPattern("non-persistent://public/default/my-topic.*") 的表达式去订阅消息,我们同样会基于 persistentnon-persistent 类型的 Topic 去过滤出名称满足正则 public/default/my-topic.* 的 Topic。

如果只想订阅 non-persistent 类型的 Topic,你需要给定 RegexSubscriptionModeRegexSubscriptionMode.NonPersistentOnly,例如:setTopicPattern("topic-.*", RegexSubscriptionMode.NonPersistentOnly)。如果表达式类似 setTopicPattern("topic-.*", RegexSubscriptionMode.PersistentOnly),将只会基于 persistent 的 Topic 进行正则过滤。

正则表达式需要遵循对应的命名格式,只有 Topic 名称部分可以使用正则表达式。举例来说,如果你的正则表达式为 some-topic-\d,我们将会在 public 租户下的 default 命名空间去过滤 Topic。如果正则表达式为 flink/sample/topic-.*,我们将会在 flink 租户下的 sample 命名空间去过滤 Topic。

当前刚刚发布的 2.11.0 版本的 Pulsar 无法正确地返回 non-persistent 类型的 Topic。在 2.11.0 的 Pulsar 上无法使用正则订阅 non-persistent 类型的 Topic。

查阅此链接获取相关问题的上下文:https://github.com/apache/pulsar/issues/19316

反序列化器 #

反序列化器用于解析 Pulsar 消息,Pulsar Source 使用 PulsarDeserializationSchema 来定义反序列化器。用户可以在 builder 类中使用 setDeserializationSchema(PulsarDeserializationSchema) 方法配置反序列化器。

如果用户只关心消息体的二进制字节流,并不需要其他属性来解析数据。可以直接使用预定义的 PulsarDeserializationSchema。Pulsar Source里面提供了 3 种预定义的反序列化器。

  • 使用 Pulsar 的 Schema 解析消息。如果使用 KeyValue 或者 Struct 类型的 Schema, 那么 Pulsar 的 Schema 将不会含有类型类信息, 但 PulsarSchemaTypeInformation 需要通过传入类型类信息来构造。因此我们提供的 API 支持用户传入类型信息。
    // 基础数据类型
    PulsarSourceBuilder.setDeserializationSchema(Schema);
    
    // 结构类型 (JSON, Protobuf, Avro, etc.)
    PulsarSourceBuilder.setDeserializationSchema(Schema, Class);
    
    // 键值对类型
    PulsarSourceBuilder.setDeserializationSchema(Schema, Class, Class);
    
  • 使用 Flink 的 DeserializationSchema 解析消息。
    PulsarSourceBuilder.setDeserializationSchema(DeserializationSchema);
    
  • 使用 Flink 的 TypeInformation 解析消息。
    PulsarSourceBuilder.setDeserializationSchema(TypeInformation, ExecutionConfig);
    

Pulsar 的 Message<byte[]> 包含了很多 额外的属性。例如,消息的 key、消息发送时间、消息生产时间、用户在消息上自定义的键值对属性等。可以使用 Message<byte[]> 接口来获取这些属性。

如果用户需要基于这些额外的属性来解析一条消息,可以实现 PulsarDeserializationSchema 接口。并一定要确保 PulsarDeserializationSchema.getProducedType() 方法返回的 TypeInformation 是正确的结果。Flink 使用 TypeInformation 将解析出来的结果序列化传递到下游算子。

在 Source 中启用 Schema Evolution #

同时使用 Pulsar 的 Schema 以及在 builder 中指定 PulsarSourceBuilder.enableSchemaEvolution() 可以启用 Schema evolution 特性。该特性会使用 Pulsar Broker 端提供的 Schema 版本兼容性检测以及 Schema 版本演进。下列示例展示了如何启用 Schema Evolution。

Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);

PulsarSource<SomePojo> source = PulsarSource.builder()
    ...
    .setDeserializationSchema(schema, SomePojo.class)
    .enableSchemaEvolution()
    .build();

如果使用 Pulsar 原生的 Schema 来反序列化消息却不启用 Schema Evolution 特性,我们将会跳过 Schema 兼容性检查,解析一些消息时可能会遇到未知的错误。

使用动态 Schema #

Pulsar 提供了 Schema.AUTO_CONSUME() 这个 Schema 来消费消息。它支持所有的 Topic,并常常用于消费 Topic 中存在多种 Schema 的情况。使用此 Schema 消费时,Pulsar 会将消息解析为 GenericRecord

但是 PulsarSourceBuilder.setDeserializationSchema(Schema) 等方法并不支持 Schema.AUTO_CONSUME() 类型。所以,我们提供了 GenericRecordDeserializer 接口来让用户定义如何解析 GenericRecord。在 PulsarSourceBuilder.setDeserializationSchema(GenericRecordDeserializer) 方法内予以指定即可。

GenericRecordDeserializer<SomePojo> deserializer = ...
PulsarSource<SomePojo> source = PulsarSource.builder()
    ...
    .setDeserializationSchema(deserializer)
    .build();
当前动态 Schema 只支持 AVRO,JSON 和 Protobuf 类型的消息解析。

定义 RangeGenerator #

如果想在 Pulsar Source 里面使用 Key_Shared 订阅,需要提供 RangeGenerator 实例。RangeGenerator 会生成一组消息 key 的 hash 范围,Pulsar Source 会基于给定的范围来消费数据。

由于 Pulsar 并未提供 Key 的 Hash 计算方法,所以我们在 Flink 中提供了名为 FixedKeysRangeGenerator 的实现,你可以在 builder 中依次提供需要消费的 Key 内容即可。但需要注意的是,Pulsar 的 Key Hash 值并不对应唯一的一个 Key,所以如果你只想消费某几个 Key 的消息,还需要在后面的代码中使用 DataStream.filter() 方法来过滤出对应的消息。

起始消费位置 #

Pulsar Source 使用 setStartCursor(StartCursor) 方法给定开始消费的位置。内置的开始消费位置有:

  • 从 Topic 里面最早的一条消息开始消费。

    StartCursor.earliest();
    
    StartCursor.earliest()
    

  • 从 Topic 里面最新的一条消息开始消费。

    StartCursor.latest();
    
    StartCursor.latest()
    

  • 从给定的消息开始消费。

    StartCursor.fromMessageId(MessageId);
    
    StartCursor.from_message_id(message_id)
    

  • 与前者不同的是,给定的消息可以跳过,再进行消费。

    StartCursor.fromMessageId(MessageId, boolean);
    
    StartCursor.from_message_id(message_id, boolean)
    

  • 从给定的消息发布时间开始消费,这个方法因为名称容易导致误解现在已经不建议使用。你可以使用方法 StartCursor.fromPublishTime(long)

    StartCursor.fromMessageTime(long);
    
    StartCursor.from_message_time(int)
    

  • 从给定的消息发布时间开始消费。

    StartCursor.fromPublishTime(long);
    
    StartCursor.from_publish_time(int)
    

StartCursor 仅用于创建订阅的初始位置,消费初始位置的优先级顺序为:检查点 > 已有订阅里的起始位置 > StartCursor。但有时候,用户想一直使用 StartCursor 给定的位置作为起始消费的位置。此时,你可以通过启用 pulsar.source.resetSubscriptionCursor 配置项,并不基于 checkpoint 对应的文件启动程序来实现对应的诉求。需要注意的是,checkpoint 中给定的消费位置永远是最高优先级的。

每条消息都有一个固定的序列号,这个序列号在 Pulsar 上有序排列,其包含了 ledger、entry、partition 等原始信息,用于在 Pulsar 底层存储上查找到具体的消息。

Pulsar 称这个序列号为 MessageId,用户可以使用 DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex) 创建它。

边界 #

Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败或者被取消,否则将持续消费数据。用户可以使用 setBoundedStopCursor(StopCursor) 给定停止消费的位置,这种情况下会使用批的方式进行消费。使用流的方式一样可以给定停止位置,使用 setUnboundedStopCursor(StopCursor) 方法即可。

在批模式下,使用 setBoundedStopCursor(StopCursor) 来指定一个消费停止位置。

内置的停止消费位置如下:

  • 永不停止。

    StopCursor.never();
    
    StopCursor.never()
    

  • 停止于 Pulsar 启动时 Topic 里面最新的那条数据。

    StopCursor.latest();
    
    StopCursor.latest()
    

  • 停止于某条消息,结果里不包含此消息。

    StopCursor.atMessageId(MessageId);
    
    StopCursor.at_message_id(message_id)
    

  • 停止于某条消息之后,结果里包含此消息。

    StopCursor.afterMessageId(MessageId);
    
    StopCursor.after_message_id(message_id)
    

  • 停止于某个给定的消息事件时间戳,比如 Message<byte[]>.getEventTime(),消费结果里不包含此时间戳的消息。

    StopCursor.atEventTime(long);
    
    StopCursor.at_event_time(int)
    

  • 停止于某个给定的消息事件时间戳,比如 Message<byte[]>.getEventTime(),消费结果里包含此时间戳的消息。

    StopCursor.afterEventTime(long);
    
    StopCursor.after_event_time(int)
    

  • 停止于某个给定的消息发布时间戳,比如 Message<byte[]>.getPublishTime(),消费结果里不包含此时间戳的消息。

    StopCursor.atPublishTime(long);
    
    StopCursor.at_publish_time(int)
    

  • 停止于某个给定的消息发布时间戳,比如 Message<byte[]>.getPublishTime(),消费结果里包含此时间戳的消息。

    StopCursor.afterPublishTime(long);
    
    StopCursor.after_publish_time(int)
    

Source 配置项 #

除了前面提到的配置选项,Pulsar Source 还提供了丰富的选项供 Pulsar 专家使用,在 builder 类里通过 setConfig(ConfigOption<T>, T)setConfig(Configuration) 方法给定下述的全部配置。

Pulsar Java 客户端配置项 #

Pulsar Source 使用 Java 客户端来创建消费实例,相关的配置定义于 Pulsar 的 ClientConfigurationData 内。在 PulsarOptions 选项中,定义大部分的可供用户定义的配置。

Key Default Type Description
pulsar.client.authParamMap
(none) Map Parameters for the authentication plugin.
pulsar.client.authParams
(none) String Parameters for the authentication plugin.

Example:
key1:val1,key2:val2
pulsar.client.authPluginClassName
(none) String Name of the authentication plugin.
pulsar.client.concurrentLookupRequest
5000 Integer The number of concurrent lookup requests allowed to send on each broker connection to prevent overload on the broker. It should be configured with a higher value only in case of it requires to produce or subscribe on thousands of topic using a created PulsarClient
pulsar.client.connectionMaxIdleSeconds
180 Integer Release the connection if it is not used for more than [connectionMaxIdleSeconds] seconds. If [connectionMaxIdleSeconds] < 0, disabled the feature that auto release the idle connections
pulsar.client.connectionTimeoutMs
10000 Integer Duration (in ms) of waiting for a connection to a broker to be established.
If the duration passes without a response from a broker, the connection attempt is dropped.
pulsar.client.connectionsPerBroker
1 Integer The maximum number of connections that the client library will open to a single broker.
By default, the connection pool will use a single connection for all the producers and consumers. Increasing this parameter may improve throughput when using many producers over a high latency connection.
pulsar.client.dnsLookupBindAddress
(none) String The Pulsar client dns lookup bind address, default behavior is bind on 0.0.0.0:0 The default port is 0 which means random port. The max allowed port is 65535. The bind address should in host:port format.
pulsar.client.enableBusyWait
false Boolean Option to enable busy-wait settings.
This option will enable spin-waiting on executors and IO threads in order to reduce latency during context switches. The spinning will consume 100% CPU even when the broker is not doing any work. It is recommended to reduce the number of IO threads and BookKeeper client threads to only have fewer CPU cores busy.
pulsar.client.enableTransaction
false Boolean If transaction is enabled, start the transactionCoordinatorClient with PulsarClient.
pulsar.client.initialBackoffIntervalNanos
100000000 Long Default duration (in nanoseconds) for a backoff interval.
pulsar.client.keepAliveIntervalSeconds
30 Integer Interval (in seconds) for keeping connection between the Pulsar client and broker alive.
pulsar.client.listenerName
(none) String Configure the listenerName that the broker will return the corresponding advertisedListener.
pulsar.client.lookupTimeoutMs
-1 Integer Client lookup timeout (in milliseconds).
Lookup operations have a different load pattern to other operations. They can be handled by any broker, are not proportional to throughput, and are harmless to retry. Given this, it makes sense to allow them to retry longer than normal operation, especially if they experience a timeout.
By default, this is set to match operation timeout. This is to maintain legacy behaviour. However, in practice it should be set to 5-10x the operation timeout.
pulsar.client.maxBackoffIntervalNanos
60000000000 Long The maximum duration (in nanoseconds) for a backoff interval.
pulsar.client.maxLookupRedirects
20 Integer The maximum number of times a lookup-request redirections to a broker.
pulsar.client.maxLookupRequest
50000 Integer The maximum number of lookup requests allowed on each broker connection to prevent overload on the broker. It should be greater than pulsar.client.concurrentLookupRequest. Requests that inside pulsar.client.concurrentLookupRequest are already sent to broker, and requests beyond pulsar.client.concurrentLookupRequest and under maxLookupRequests will wait in each client cnx.
pulsar.client.maxNumberOfRejectedRequestPerConnection
50 Integer The maximum number of rejected requests of a broker in a certain period (30s) after the current connection is closed and the client creates a new connection to connect to a different broker.
pulsar.client.memoryLimitBytes
67108864 Long The limit (in bytes) on the amount of direct memory that will be allocated by this client instance.
Setting this to 0 will disable the limit.
pulsar.client.numIoThreads
1 Integer The number of threads used for handling connections to brokers.
pulsar.client.numListenerThreads
1 Integer The number of threads used for handling message listeners. The listener thread pool is shared across all the consumers and readers that are using a listener model to get messages. For a given consumer, the listener is always invoked from the same thread to ensure ordering.
pulsar.client.operationTimeoutMs
30000 Integer Operation timeout (in ms). Operations such as creating producers, subscribing or unsubscribing topics are retried during this interval. If the operation is not completed during this interval, the operation will be marked as failed.
pulsar.client.proxyProtocol
SNI

Enum

Protocol type to determine the type of proxy routing when a client connects to the proxy using pulsar.client.proxyServiceUrl.

Possible values:
  • "SNI"
pulsar.client.proxyServiceUrl
(none) String Proxy-service URL when a client connects to the broker via the proxy. The client can choose the type of proxy-routing.
pulsar.client.requestTimeoutMs
60000 Integer Maximum duration (in ms) for completing a request. This config option is not supported before Pulsar 2.8.1
pulsar.client.serviceUrl
(none) String Service URL provider for Pulsar service.
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
You can assign Pulsar protocol URLs to specific clusters and use the Pulsar scheme.
  • This is an example of localhost: pulsar://localhost:6650.
  • If you have multiple brokers, the URL is as: pulsar://localhost:6550,localhost:6651,localhost:6652
  • A URL for a production Pulsar cluster is as: pulsar://pulsar.us-west.example.com:6650
  • If you use TLS authentication, the URL is as pulsar+ssl://pulsar.us-west.example.com:6651
pulsar.client.socks5ProxyAddress
(none) String Address of SOCKS5 proxy. It should in host:port format.
pulsar.client.socks5ProxyPassword
(none) String Password of SOCKS5 proxy.
pulsar.client.socks5ProxyUsername
(none) String User name of SOCKS5 proxy.
pulsar.client.sslProvider
(none) String The name of the security provider used for SSL connections. The default value is the default security provider of the JVM.
pulsar.client.statsIntervalSeconds
60 Long Interval between each stats info.
  • Stats is activated with positive statsInterval
  • Set statsIntervalSeconds to 1 second at least.
pulsar.client.tlsAllowInsecureConnection
false Boolean Whether the Pulsar client accepts untrusted TLS certificate from the broker.
pulsar.client.tlsCertificateFilePath
(none) String Path to the TLS certificate file.
pulsar.client.tlsCiphers
List<String> A list of cipher suites. This is a named combination of authentication, encryption, MAC and the key exchange algorithm used to negotiate the security settings for a network connection using the TLS or SSL network protocol. By default all the available cipher suites are supported.
pulsar.client.tlsHostnameVerificationEnable
false Boolean Whether to enable TLS hostname verification. It allows to validate hostname verification when a client connects to the broker over TLS. It validates incoming x509 certificate and matches provided hostname (CN/SAN) with the expected broker's host name. It follows RFC 2818, 3.1. Server Identity hostname verification.
pulsar.client.tlsKeyFilePath
(none) String Path to the TLS key file.
pulsar.client.tlsKeyStorePassword
(none) String The store password for the key store file.
pulsar.client.tlsKeyStorePath
(none) String The location of the key store file.
pulsar.client.tlsKeyStoreType
"JKS" String The file format of the key store file.
pulsar.client.tlsProtocols
List<String> The SSL protocol used to generate the SSLContext. By default, it is set TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.3, TLSv1.2 and TLSv1.1.
pulsar.client.tlsTrustCertsFilePath
(none) String Path to the trusted TLS certificate file.
pulsar.client.tlsTrustStorePassword
(none) String The store password for the key store file.
pulsar.client.tlsTrustStorePath
(none) String The location of the trust store file.
pulsar.client.tlsTrustStoreType
"JKS" String The file format of the trust store file.
pulsar.client.useKeyStoreTls
false Boolean If TLS is enabled, whether use the KeyStore type as the TLS configuration parameter. If it is set to false, it means to use the default pem type configuration.
pulsar.client.useTcpNoDelay
true Boolean Whether to use the TCP no-delay flag on the connection to disable Nagle algorithm.
No-delay features ensures that packets are sent out on the network as soon as possible, and it is critical to achieve low latency publishes. On the other hand, sending out a huge number of small packets might limit the overall throughput. Therefore, if latency is not a concern, it is recommended to set this option to false.
By default, it is set to true.

Pulsar 管理 API 配置项 #

管理 API 用于查询 Topic 的元数据和用正则订阅的时候的 Topic 查找,它与 Java 客户端共享大部分配置。下面列举的配置只供管理 API 使用,PulsarOptions 包含了这些配置 。

Key Default Type Description
pulsar.admin.adminUrl
(none) String The Pulsar service HTTP URL for the admin endpoint. For example, http://my-broker.example.com:8080, or https://my-broker.example.com:8443 for TLS.
pulsar.admin.autoCertRefreshTime
300000 Integer The auto cert refresh time (in ms) if Pulsar admin supports TLS authentication.
pulsar.admin.connectTimeout
60000 Integer The connection time out (in ms) for the PulsarAdmin client.
pulsar.admin.readTimeout
60000 Integer The server response read timeout (in ms) for the PulsarAdmin client for any request.
pulsar.admin.requestRates
5 Integer It will add ratelimit for PulsarAdmin metadata requests, stands for requests per second.
pulsar.admin.requestRetries
5 Integer For PulsarAdmin request, it will retry until we get a success response, fail if we exhausted retry count.
pulsar.admin.requestTimeout
300000 Integer The server request timeout (in ms) for the PulsarAdmin client for any request.
pulsar.admin.requestWaitMillis
3000 Long For PulsarAdmin request, We will sleep the given time before retrying the failed request.

Pulsar 消费者 API 配置项 #

Pulsar 提供了消费者 API 和读者 API 两套 API 来进行数据消费,它们可用于不同的业务场景。Flink 上的 Pulsar Source 使用消费者 API 进行消费,它的配置定义于 Pulsar 的 ConsumerConfigurationData 内。Pulsar Source 将其中大部分的可供用户定义的配置定义于 PulsarSourceOptions 内。

Key Default Type Description
pulsar.consumer.ackReceiptEnabled
false Boolean Acknowledgement will return a receipt but this does not mean that the message will not be resent after getting the receipt.
pulsar.consumer.ackTimeoutMillis
0 Long The timeout (in ms) for unacknowledged messages, truncated to the nearest millisecond. The timeout needs to be greater than 1 second.
By default, the acknowledge timeout is disabled and that means that messages delivered to a consumer will not be re-delivered unless the consumer crashes.
When acknowledgement timeout being enabled, if a message is not acknowledged within the specified timeout it will be re-delivered to the consumer.
pulsar.consumer.acknowledgementsGroupTimeMicros
100000 Long Group a consumer acknowledgment for a specified time (in μs). By default, a consumer uses 100μs grouping time to send out acknowledgments to a broker. If the group time is set to 0, acknowledgments are sent out immediately. A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.
pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull
false Boolean Buffering a large number of outstanding uncompleted chunked messages can bring memory pressure and it can be guarded by providing this pulsar.consumer.maxPendingChunkedMessage threshold. Once a consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acknowledging if pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull is true. Otherwise, it marks them for redelivery.
pulsar.consumer.autoScaledReceiverQueueSizeEnabled
true Boolean This option is enabled by default. The consumer receiver queue size is initialized with 1, and will double itself until it reaches the value set by pulsar.consumer.receiverQueueSize.
The feature should be able to reduce client memory usage.
pulsar.consumer.consumerName
(none) String The consumer name is informative and it can be used to identify a particular consumer instance from the topic stats.
pulsar.consumer.cryptoFailureAction
FAIL

Enum

The consumer should take action when it receives a message that can not be decrypted.
  • FAIL: this is the default option to fail messages until crypto succeeds.
  • DISCARD: silently acknowledge but do not deliver messages to an application.
  • CONSUME: deliver encrypted messages to applications. It is the application's responsibility to decrypt the message.

Fail to decompress the messages.
If messages contain batch messages, a client is not be able to retrieve individual messages in batch.
The delivered encrypted message contains EncryptionContext which contains encryption and compression information in. You can use an application to decrypt the consumed message payload.

Possible values:
  • "FAIL"
  • "DISCARD"
  • "CONSUME"
pulsar.consumer.deadLetterPolicy.deadLetterTopic
(none) String Name of the dead topic where the failed messages are sent.
pulsar.consumer.deadLetterPolicy.maxRedeliverCount
(none) Integer The maximum number of times that a message are redelivered before being sent to the dead letter queue.
pulsar.consumer.deadLetterPolicy.retryLetterTopic
(none) String Name of the retry topic where the failed messages are sent.
pulsar.consumer.expireTimeOfIncompleteChunkedMessageMillis
60000 Long If a producer fails to publish all the chunks of a message, the consumer can expire incomplete chunks if the consumer cannot receive all chunks in expire times (default 1 hour, in ms).
pulsar.consumer.maxPendingChunkedMessage
10 Integer The consumer buffers chunk messages into memory until it receives all the chunks of the original message. While consuming chunk-messages, chunks from the same message might not be contiguous in the stream and they might be mixed with other messages' chunks. So, consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers are publishing messages on the topic concurrently or publishers failed to publish all chunks of the messages.
For example, there are M1-C1, M2-C1, M1-C2, M2-C2 messages.Messages M1-C1 and M1-C2 belong to the M1 original message while M2-C1 and M2-C2 belong to the M2 message.
Buffering a large number of outstanding uncompleted chunked messages can bring memory pressure and it can be guarded by providing this pulsar.consumer.maxPendingChunkedMessage threshold. Once, a consumer reaches this threshold, it drops the outstanding unchunked messages by silently acknowledging or asking the broker to redeliver messages later by marking it unacknowledged. This behavior can be controlled by the pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull option.
pulsar.consumer.maxTotalReceiverQueueSizeAcrossPartitions
50000 Integer The maximum total receiver queue size across partitions.
This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value.
pulsar.consumer.negativeAckRedeliveryDelayMicros
60000000 Long Delay (in μs) to wait before redelivering messages that failed to be processed.
When an application uses Consumer.negativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.
pulsar.consumer.poolMessages
false Boolean Enable pooling of messages and the underlying data buffers.
pulsar.consumer.priorityLevel
0 Integer Priority level for a consumer to which a broker gives more priorities while dispatching messages in the subscription.
The broker follows descending priorities. For example, 0=max-priority, 1, 2,...

Example 1
If a subscription has consumer A with priorityLevel 0 and consumer B with priorityLevel 1, then the broker only dispatches messages to consumer A until it runs out permits and then starts dispatching messages to consumer B.
Example 2
Consumer Priority, Level, Permits C1, 0, 2 C2, 0, 1 C3, 0, 1 C4, 1, 2 C5, 1, 1
The order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.
pulsar.consumer.properties
Map A name or value property of this consumer. properties is application defined metadata attached to a consumer. When getting a topic stats, associate this metadata with the consumer stats for easier identification.
pulsar.consumer.readCompacted
false Boolean If enabling readCompacted, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.
A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.
Only enabling readCompacted on subscriptions to persistent topics, which have a single active consumer (like failure or exclusive subscriptions).
Attempting to enable it on subscriptions to non-persistent topics leads to a subscription call throwing a PulsarClientException.
pulsar.consumer.receiverQueueSize
1000 Integer Size of a consumer's receiver queue.
For example, the number of messages accumulated by a consumer before an application calls Receive.
A value higher than the default value increases consumer throughput, though at the expense of more memory utilization.
pulsar.consumer.replicateSubscriptionState
false Boolean If replicateSubscriptionState is enabled, a subscription state is replicated to geo-replicated clusters.
pulsar.consumer.retryEnable
false Boolean If enabled, the consumer will automatically retry messages.
pulsar.consumer.subscriptionMode
Durable

Enum

Select the subscription mode to be used when subscribing to the topic.
  • Durable: Make the subscription to be backed by a durable cursor that will retain messages and persist the current position.
  • NonDurable: Lightweight subscription mode that doesn't have a durable cursor associated


Possible values:
  • "Durable"
  • "NonDurable"
pulsar.consumer.subscriptionName
(none) String Specify the subscription name for this consumer. This argument is required when constructing the consumer.
pulsar.consumer.subscriptionProperties
(none) Map Subscription properties is an optional attribute, which can be set when subscribing to topic. These properties cannot be modified. We can only delete the subscription and create it again.
pulsar.consumer.tickDurationMillis
1000 Long Granularity (in ms) of the ack-timeout redelivery.
A greater (for example, 1 hour) tickDurationMillis reduces the memory overhead to track messages.

Pulsar Source配置项 #

下述配置主要用于性能调优或者是控制消息确认的行为。如非必要,可以不用强制配置。

Key Default Type Description
pulsar.source.allowKeySharedOutOfOrderDelivery
false Boolean If enabled, it will relax the ordering requirement, allowing the broker to send out-of-order messages in case of failures. This will make it faster for new consumers to join without being stalled by an existing slow consumer.
In this case, a single consumer will still receive all the keys, but they may be coming in different orders.
pulsar.source.autoCommitCursorInterval
5000 Long This option is used only when the user disables the checkpoint and uses Exclusive or Failover subscription. We would automatically commit the cursor using the given period (in ms).
pulsar.source.enableAutoAcknowledgeMessage
false Boolean Flink commits the consuming position with pulsar transactions on checkpoint. However, if you have disabled the Flink checkpoint or disabled transaction for your Pulsar cluster, ensure that you have set this option to true.
The source would use pulsar client's internal mechanism and commit cursor in a given interval.
pulsar.source.enableMetrics
true Boolean The metrics from Pulsar Consumer are only exposed if you enable this option.You should set the pulsar.client.statsIntervalSeconds to a positive value if you enable this option.
pulsar.source.enableSchemaEvolution
false Boolean If you enable this option and use PulsarSourceBuilder.setDeserializationSchema(Schema), we would consume and deserialize the message by using Pulsar's Schema interface with extra schema evolution check.
pulsar.source.fetchOneMessageTime
(none) Integer The time (in ms) for fetching one message from Pulsar. If time exceed and no message returned from Pulsar. We would consider there is no record at the current topic partition and stop fetching until next switch.
It's not configured by default. We will use the remaining time in pulsar.source.maxFetchTime by default, which may cause a long wait in small message rates. Add this option in source builder avoiding waiting too long.
pulsar.source.maxFetchRecords
100 Integer The maximum number of records to fetch to wait when polling. A longer time increases throughput but also latency. A fetch batch might be finished earlier because of pulsar.source.maxFetchTime.
pulsar.source.maxFetchTime
10000 Long The maximum time (in ms) to wait when fetching records. A longer time increases throughput but also latency. A fetch batch might be finished earlier because of pulsar.source.maxFetchRecords.
pulsar.source.partitionDiscoveryIntervalMs
300000 Long The interval (in ms) for the Pulsar source to discover the new partitions. A non-positive value disables the partition discovery.
pulsar.source.resetSubscriptionCursor
false Boolean The StartCursor in connector is used to create the initial subscription. Enable this option will reset the start cursor in subscription by using StartCursor everytime you start the application without the checkpoint.
pulsar.source.verifyInitialOffsets
WARN_ON_MISMATCH

Enum

Upon (re)starting the source, check whether the expected message can be read. If failure is enabled, the application fails. Otherwise, it logs a warning. A possible solution is to adjust the retention settings in Pulsar or ignoring the check result.

Possible values:
  • "FAIL_ON_MISMATCH": Fail the consuming from Pulsar when we don't find the related cursor.
  • "WARN_ON_MISMATCH": Print a warn message and start consuming from the valid offset.

动态分区发现 #

为了能在启动 Flink 任务之后还能发现在 Pulsar 上扩容的分区或者是新创建的 Topic,Pulsar Source 提供了动态分区发现机制。该机制不需要重启 Flink 任务。对选项 PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS 设置一个正整数即可启用。

// 10 秒查询一次分区信息
PulsarSource.builder()
        .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);
# 10 秒查询一次分区信息
PulsarSource.builder()
    .set_config("pulsar.source.partitionDiscoveryIntervalMs", 10000)
  • 默认情况下,Pulsar 启用分区发现,查询间隔为 5 分钟。用户可以给定一个负数,将该功能禁用。如果使用批的方式消费数据,将无法启用该功能。
  • 如果需要禁用分区发现功能,你需要将查询间隔设置为负值。
  • 在 bounded 消费模式下,即使将分区发现的查询间隔设置为正值,也会被禁用。

事件时间和水位线 #

默认情况下,Pulsar Source 使用 Pulsar 的 Message<byte[]> 里面的时间作为解析结果的时间戳。用户可以使用 WatermarkStrategy 来自行解析出想要的消息时间,并向下游传递对应的水位线。

env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy");
env.from_source(pulsar_source, CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy")

这篇文档详细讲解了如何定义 WatermarkStrategy

消息确认 #

一旦在 Topic 上创建了订阅,消息便会存储在 Pulsar 里。即使没有消费者,消息也不会被丢弃。只有当 Flink 同 Pulsar 确认此条消息已经被消费,该消息才以某种机制会被移除。

我们使用 独占 作为默认的订阅模式。此订阅下,Pulsar Source 使用累进式确认方式。确认某条消息已经被处理时,其前面消息会自动被置为已读。Pulsar Source 会在 Flink 完成检查点时将对应时刻消费的消息置为已读,以此来保证 Pulsar 状态与 Flink 状态一致。

如果用户没有在 Flink 上启用检查点,Pulsar Source 可以使用周期性提交来将消费状态提交给 Pulsar,使用配置 PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL 来进行定义。

需要注意的是,此种场景下,Pulsar Source 并不依赖于提交到 Pulsar 的状态来做容错。消息确认只是为了能在 Pulsar 端看到对应的消费处理情况。

Pulsar Sink #

Pulsar Sink 连接器可以将经过 Flink 处理后的数据写入一个或多个 Pulsar Topic 或者 Topic 下的某些分区。

Pulsar Sink 基于 Flink 最新的 Sink API 实现。

如果想要使用旧版的使用 SinkFunction 接口实现的 Sink 连接器,可以使用 StreamNative 维护的 pulsar-flink

使用示例 #

Pulsar Sink 使用 builder 类来创建 PulsarSink 实例。

下面示例展示了如何通过 Pulsar Sink 以“至少一次”的语义将字符串类型的数据发送给 topic1。

DataStream<String> stream = ...

PulsarSink<String> sink = PulsarSink.builder()
    .setServiceUrl(serviceUrl)
    .setAdminUrl(adminUrl)
    .setTopics("topic1")
    .setSerializationSchema(new SimpleStringSchema())
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();

stream.sinkTo(sink);
stream = ...

pulsar_sink = PulsarSink.builder() \
    .set_service_url('pulsar://localhost:6650') \
    .set_admin_url('http://localhost:8080') \
    .set_topics("topic1") \
    .set_serialization_schema(SimpleStringSchema()) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()

stream.sink_to(pulsar_sink)

下列为创建一个 PulsarSink 实例必需的属性:

  • Pulsar 数据消费的地址,使用 setServiceUrl(String) 方法提供。
  • Pulsar HTTP 管理地址,使用 setAdminUrl(String) 方法提供。
  • 需要发送到的 Topic 或者是 Topic 下面的分区,详见指定写入的 Topic 或者 Topic 分区
  • 编码 Pulsar 消息的序列化器,详见序列化器

在创建 PulsarSink 时,建议使用 setProducerName(String) 来指定 PulsarSink 内部使用的 Pulsar 生产者名称。这样方便在数据监控页面找到对应的生产者监控指标。

指定写入的 Topic 或者 Topic 分区 #

PulsarSink 指定写入 Topic 的方式和 Pulsar Source 指定消费的 Topic 或者 Topic 分区的方式类似。PulsarSink 支持以 mixin 风格指定写入的 Topic 或分区。因此,可以指定一组 Topic 或者分区或者是两者都有。

// Topic "some-topic1" 和 "some-topic2"
PulsarSink.builder().setTopics("some-topic1", "some-topic2")

// Topic "topic-a" 的分区 0 和 2
PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")

// Topic "topic-a" 以及 Topic "some-topic2" 分区 0 和 2
PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")
# Topic "some-topic1" 和 "some-topic2"
PulsarSink.builder().set_topics(["some-topic1", "some-topic2"])

# Topic "topic-a" 的分区 0 和 2
PulsarSink.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2"])

# Topic "topic-a" 以及 Topic "some-topic2" 分区 0 和 2
PulsarSink.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2", "some-topic2"])

动态分区发现默认处于开启状态,这意味着 PulsarSink 将会周期性地从 Pulsar 集群中查询 Topic 的元数据来获取可能有的分区数量变更信息。使用 PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL 配置项来指定查询的间隔时间。

可以选择实现 TopicRouter 接口来自定义消息路由策略。此外,阅读 Topic 名称简写将有助于理解 Pulsar 的分区在 Pulsar 连接器中的配置方式。

如果在 PulsarSink 中同时指定了某个 Topic 和其下属的分区,那么 PulsarSink 将会自动将两者合并,仅使用外层的 Topic。

举个例子,如果通过 PulsarSink.builder().setTopics("some-topic1", "some-topic1-partition-0") 来指定写入的 Topic,那么其结果等价于 PulsarSink.builder().setTopics("some-topic1")

基于消息实例的动态 Topic 指定 #

除了前面说的一开始就指定 Topic 或者是 Topic 分区,你还可以在程序启动后基于消息内容动态指定 Topic,只需要实现 TopicRouter 接口即可。使用 PulsarSinkContext.topicMetadata(String) 方法来查询某个 Topic 在 Pulsar 上有多少个分区,查询结果会缓存并在 PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL 毫秒之后失效。

此方法同样支持将消息写入一个不存在的 Topic,可以在 TopicRouter 内返回想要创建的 Topic,连接器将会尝试创建。

如果使用 Topic 自动创建功能,需要在 Pulsar 的 broker.conf 配置文件内配置 allowAutoTopicCreation=true 来启用对应的功能。

broker.conf 配置文件的 allowAutoTopicCreationType 选项可以控制自动创建的 Topic 的类型。

  • non-partitioned: 默认配置,创建的 Topic 没有分区,并且不可以手动创建分区。
  • partitioned: 创建的 Topic 将按照 defaultNumPartitions 选项定义的个数创建对应的分区。

序列化器 #

序列化器(PulsarSerializationSchema)负责将 Flink 中的每条记录序列化成 byte 数组,并通过网络发送至指定的写入 Topic。和 Pulsar Source 类似的是,序列化器同时支持使用基于 Flink 的 SerializationSchema 接口实现序列化器和使用 Pulsar 原生的 Schema 类型实现的序列化器。不过序列化器并不支持 Pulsar 的 Schema.AUTO_PRODUCE_BYTES()

如果不需要指定 Message 接口中提供的 key 或者其他的消息属性,可以从上述 2 种预定义的 PulsarSerializationSchema 实现中选择适合需求的一种使用。

  • 使用 Pulsar 的 Schema 来序列化 Flink 中的数据。
    // 原始数据类型
    PulsarSinkBuilder.setSerializationSchema(Schema)
    
    // 有结构数据类型(JSON、Protobuf、Avro 等)
    PulsarSinkBuilder.setSerializationSchema(Schema, Class)
    
    // 键值对类型
    PulsarSinkBuilder.setSerializationSchema(Schema, Class, Class)
    
  • 使用 Flink 的 SerializationSchema 来序列化数据。
    PulsarSinkBuilder.setSerializationSchema(SerializationSchema)
    

在 Sink 中启用 Schema Evolution #

同时使用 Pulsar 的 Schema 以及在 builder 中指定 PulsarSinkBuilder.enableSchemaEvolution() 可以启用 Schema evolution 特性。该特性会使用 Pulsar Broker 端提供的 Schema 版本兼容性检测以及 Schema 版本演进。下列示例展示了如何启用 Schema Evolution。

Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);

PulsarSink<String> sink = PulsarSink.builder()
    ...
    .setSerializationSchema(schema, SomePojo.class)
    .enableSchemaEvolution()
    .build();

如果想要使用 Pulsar 原生的 Schema 序列化消息而不需要 Schema Evolution 特性,那么写入的 Topic 会使用 Schema.BYTES 作为消息的 Schema。Pulsar 并不会存储 Schema.BYTES,所以通过此方式写入的 Topic 可能不存在 Schema 信息,对应 Topic 的消费者需要自己负责反序列化的工作。

例如,如果使用 PulsarSinkBuilder.setSerializationSchema(Schema.STRING) 而不使用 PulsarSinkBuilder.enableSchemaEvolution()。那么在写入 Topic 中所记录的消息 Schema 将会是 Schema.BYTES

PulsarMessage<byte[]> 类型的消息的校验 #

Pulsar 的 topic 至少会包含一种 Schema,Schema.BYTES 是默认的 Schema 类型并常作为没有 Schema 的 topic 的 Schema 类型。使用 Schema.BYTES 发送消息将会跳过类型检测,这意味着使用 SerializationSchema 和没有启用 Schema evolution 的 Schema 所发送的消息并不安全。

可以启用 pulsar.sink.validateSinkMessageBytes 选项来让链接器使用 Pulsar 提供的 Schema.AUTO_PRODUCE_BYTES() 发送消息。它会在发送字节数组消息时额外进行校验,与 topic 上最新版本的 Schema 进行对比,保证消息内容能正确。

但是,并非所有的 Pulsar 的 Schema 都支持校验字符串,所以在默认情况下我们禁用了此选项。可以按需启用。

自定义序列化器 #

可以通过继承 PulsarSerializationSchema 接口来实现自定义的序列化逻辑。接口需要返回一个类型为 PulsarMessage 的消息,此类型实例无法被直接创建,连接器提供了构造方法并定义了三种消息类型的构建。

  • 使用 Pulsar 的 Scheme 来构建消息,常用于你知道 topic 对应的 schema 是什么的时候。我们会检查你提供的 Schema 在 topic 上是否兼容。
    PulsarMessage.builder(Schema<M> schema, M message)
        ...
        .build();
    
  • 创建一个消息类型为字节数组的消息。默认情况下不进行 Schema 检查。
    PulsarMessage.builder(byte[] bytes)
        ...
        .build();
    
  • 创建一个消息体为空的墓碑消息。墓碑 是一种特殊的消息,并在 Pulsar 中提供了支持。
    PulsarMessage.builder()
        ...
        .build();
    

消息路由策略 #

在 Pulsar Sink 中,消息路由发生在于分区之间,而非上层 Topic。对于给定 Topic 的情况,路由算法会首先会查询出 Topic 之上所有的分区信息,并在这些分区上实现消息的路由。Pulsar Sink 默认提供 2 种路由策略的实现。

  • KeyHashTopicRouter:使用消息的 key 对应的哈希值来取模计算出消息对应的 Topic 分区。

    使用此路由可以将具有相同 key 的消息发送至同一个 Topic 分区。消息的 key 可以在自定义 PulsarSerializationSchema 时,在 serialize() 方法内使用 PulsarMessageBuilder.key(String key) 来予以指定。

    如果消息没有包含 key,此路由策略将从 Topic 分区中随机选择一个发送。

    可以使用 MessageKeyHash.JAVA_HASH 或者 MessageKeyHash.MURMUR3_32_HASH 两种不同的哈希算法来计算消息 key 的哈希值。使用 PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH 配置项来指定想要的哈希算法。

  • RoundRobinRouter:轮换使用用户给定的 Topic 分区。

    消息将会轮替地选取 Topic 分区,当往某个 Topic 分区里写入指定数量的消息后,将会轮换至下一个 Topic 分区。使用 PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES 指定向一个 Topic 分区中写入的消息数量。

还可以通过实现 TopicRouter 接口来自定义消息路由策略,请注意 TopicRouter 的实现需要能被序列化。

TopicRouter 内可以指定任意的 Topic 分区(即使这个 Topic 分区不在 setTopics() 指定的列表中)。因此,当使用自定义的 TopicRouter 时,PulsarSinkBuilder.setTopics 选项是可选的。

@PublicEvolving
public interface TopicRouter<IN> extends Serializable {

    TopicPartition route(IN in, List<TopicPartition> partitions, PulsarSinkContext context);

    default void open(SinkConfiguration sinkConfiguration) {
        // 默认无操作
    }
}

如前文所述,Pulsar 分区的内部被实现为一个无分区的 Topic,一般情况下 Pulsar 客户端会隐藏这个实现,并且提供内置的消息路由策略。Pulsar Sink 并没有使用 Pulsar 客户端提供的路由策略和封装,而是使用了 Pulsar 客户端更底层的 API 自行实现了消息路由逻辑。这样做的主要目的是能够在属于不同 Topic 的分区之间定义更灵活的消息路由策略。

详情请参考 Pulsar 的 partitioned topics 文档。

发送一致性 #

PulsarSink 支持三种发送一致性。

  • NONE:Flink 应用运行时可能出现数据丢失的情况。在这种模式下,Pulsar Sink 发送消息后并不会检查消息是否发送成功。此模式具有最高的吞吐量,可用于一致性没有要求的场景。
  • AT_LEAST_ONCE:每条消息至少有一条对应消息发送至 Pulsar,发送至 Pulsar 的消息可能会因为 Flink 应用重启而出现重复。
  • EXACTLY_ONCE:每条消息有且仅有一条对应消息发送至 Pulsar。发送至 Pulsar 的消息不会有重复也不会丢失。Pulsar Sink 内部依赖 Pulsar 事务和两阶段提交协议来保证每条记录都能正确发往 Pulsar。

如果想要使用 EXACTLY_ONCE,需要用户确保在 Flink 程序上启用 checkpoint,同时在 Pulsar 上启用事务。在此模式下,Pulsar sink 会将消息写入到某个未提交的事务下,并在成功执行完 checkpoint 后提交对应的事务。

基于 Pulsar 的设计,任何在开启的事务之后写入的消息是无法被消费到的。只有这个事务提交了,对应的消息才能被消费到。

消息延时发送 #

消息延时发送特性可以让指定发送的每一条消息需要延时一段时间后才能被下游的消费者所消费。当延时消息发送特性启用时,Pulsar Sink 会立刻将消息发送至 Pulsar Broker。但该消息在指定的延迟时间到达前将会保持对下游消费者不可见。

消息延时发送仅在 Shared 订阅模式下有效,在 ExclusiveFailover 模式下该特性无效。

可以使用 MessageDelayer.fixed(Duration) 创建一个 MessageDelayer 来为所有消息指定恒定的接收时延,或者实现 MessageDelayer 接口来为不同的消息指定不同的接收时延。

消息对下游消费者的可见时间应当基于 PulsarSinkContext.processTime() 计算得到。

Sink 配置项 #

可以在 builder 类里通过 setConfig(ConfigOption<T>, T)setConfig(Configuration) 方法给定下述的全部配置。

PulsarClient 和 PulsarAdmin 配置项 #

Pulsar Sink 和 Pulsar Source 公用的配置选项可参考

Pulsar 生产者 API 配置项 #

Pulsar Sink 使用生产者 API 来发送消息。Pulsar 的 ProducerConfigurationData 中大部分的配置项被映射为 PulsarSinkOptions 里的选项。

Key Default Type Description
pulsar.producer.batchingEnabled
true Boolean Enable batch send ability, it was enabled by default.
pulsar.producer.batchingMaxBytes
131072 Integer The maximum size of messages permitted in a batch. Keep the maximum consistent as previous versions.
pulsar.producer.batchingMaxMessages
1000 Integer The maximum number of messages permitted in a batch.
pulsar.producer.batchingMaxPublishDelayMicros
1000 Long Batching time period of sending messages.
pulsar.producer.batchingPartitionSwitchFrequencyByPublishDelay
10 Integer The maximum wait time for switching topic partitions.
pulsar.producer.chunkMaxMessageSize
-1 Integer Max chunk message size in bytes. Producer chunks the message if chunking is enabled and message size is larger than max chunk-message size. By default, chunkMaxMessageSize value is -1 and producer chunks based on max-message size configured at the broker.
pulsar.producer.chunkingEnabled
false Boolean If message size is higher than allowed max publish-payload size by broker, then enableChunking helps producer to split message into multiple chunks and publish them to broker separately and in order. So, it allows client to successfully publish large size of messages in pulsar.
pulsar.producer.compressionType
NONE

Enum

Message data compression type used by a producer.Available options:

Possible values:
  • "NONE"
  • "LZ4"
  • "ZLIB"
  • "ZSTD"
  • "SNAPPY"
pulsar.producer.initialSequenceId
(none) Long The sequence id for avoiding the duplication, it's used when Pulsar doesn't have transaction.
pulsar.producer.producerCryptoFailureAction
FAIL

Enum

The action the producer will take in case of encryption failures.

Possible values:
  • "FAIL"
  • "SEND"
pulsar.producer.producerName
(none) String A producer name which would be displayed in the Pulsar's dashboard. If no producer name was provided, we would use a Pulsar generated name instead.
pulsar.producer.properties
Map A name or value property of this consumer. properties is application defined metadata attached to a consumer. When getting a topic stats, associate this metadata with the consumer stats for easier identification.
pulsar.producer.sendTimeoutMs
30000 Long Message send timeout in ms.If a message is not acknowledged by a server before the sendTimeout expires, an error occurs.

Pulsar Sink 配置项 #

下述配置主要用于性能调优或者是控制消息确认的行为。如非必要,可以不用考虑配置。

Key Default Type Description
pulsar.sink.deliveryGuarantee
none

Enum

Optional delivery guarantee when committing.

Possible values:
  • "exactly-once": Records are only delivered exactly-once also under failover scenarios. To build a complete exactly-once pipeline is required that the source and sink support exactly-once and are properly configured.
  • "at-least-once": Records are ensured to be delivered but it may happen that the same record is delivered multiple times. Usually, this guarantee is faster than the exactly-once delivery.
  • "none": Records are delivered on a best effort basis. It is often the fastest way to process records but it may happen that records are lost or duplicated.
pulsar.sink.enableMetrics
true Boolean The metrics from Pulsar Producer are only exposed if you enable this option. You should set the pulsar.client.statsIntervalSeconds to a positive value if you enable this option.
pulsar.sink.enableSchemaEvolution
false Boolean If you enable this option and use PulsarSinkBuilder.setSerializationSchema(Schema), we would produce and serialize the message by using Pulsar's Schema.
pulsar.sink.maxRecommitTimes
5 Integer The allowed transaction recommit times if we meet some retryable exception. This is used in Pulsar Transaction.
pulsar.sink.messageKeyHash
murmur-3-32-hash

Enum

The hash policy for routing message by calculating the hash code of message key.

Possible values:
  • "java-hash": This hash would use String.hashCode() to calculate the message key string's hash code.
  • "murmur-3-32-hash": This hash would calculate message key's hash code by using Murmur3 algorithm.
pulsar.sink.topicMetadataRefreshInterval
1800000 Long Auto update the topic metadata in a fixed interval (in ms). The default value is 30 minutes.
pulsar.sink.transactionTimeoutMillis
10800000 Long This option is used when the user require the DeliveryGuarantee.EXACTLY_ONCE semantic. We would use transaction for making sure the message could be write only once.
pulsar.sink.validateSinkMessageBytes
false Boolean Pulsar client can validate the raw message bytes with the latest topic schema. This can make sure your serialized messages bytes is valid for consumer.

设计思想简述 #

Pulsar Sink 遵循 FLIP-191 中定义的 Sink API 设计。

无状态的 SinkWriter #

EXACTLY_ONCE 一致性下,Pulsar Sink 不会将事务相关的信息存放于检查点快照中。这意味着当 Flink 应用重启时,Pulsar Sink 会创建新的事务实例。上一次运行过程中任何未提交事务中的消息会因为超时中止而无法被下游的消费者所消费。这样的设计保证了 SinkWriter 是无状态的。

Pulsar Schema Evolution #

Pulsar Schema Evolution 允许用户在一个 Flink 应用程序中使用的数据模型发生特定改变后(比如向基于 ARVO 的 POJO 类中增加或删除一个字段),仍能使用同一个 Flink 应用程序的代码。

可以在 Pulsar 集群内指定哪些类型的数据模型的改变是被允许的,详情请参阅 Pulsar Schema Evolution

监控指标 #

默认情况下,Pulsar client 每隔 60 秒才会刷新一次监控数据。如果想要提高刷新频率,可以通过如下方式来将 Pulsar client 的监控数据刷新频率调整至相应值(最低为1s):

builder.setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS, 1L);
builder.set_config("pulsar.client.statsIntervalSeconds", "1")

Source 监控指标 #

FLIP-33: Standardize Connector Metrics 定义的基础 Source 指标之上,我们额外提供了一些来自 Client 的监控指标。你需要启用 pulsar.source.enableMetrics 选项来获得这些监控指标,所有的指标列举在下面的表格中。

builder.setConfig(PulsarSourceOptions.PULSAR_ENABLE_SOURCE_METRICS, true);
builder.set_config("pulsar.source.enableMetrics", "true")
指标 变量 描述 类型
PulsarConsumer.“Topic”.“ConsumerName”.numMsgsReceived Topic, ConsumerName 在过去的一个统计窗口内消费的消息数 Gauge
PulsarConsumer.“Topic”.“ConsumerName”.numBytesReceived Topic, ConsumerName 在过去的一个统计窗口内消费的字节数 Gauge
PulsarConsumer.“Topic”.“ConsumerName”.rateMsgsReceived Topic, ConsumerName 在过去的一个统计窗口内消费的消息速率 Gauge
PulsarConsumer.“Topic”.“ConsumerName”.rateBytesReceived Topic, ConsumerName 在过去的一个统计窗口内消费的字节速率 Gauge
PulsarConsumer.“Topic”.“ConsumerName”.numAcksSent Topic, ConsumerName 在过去的一个统计窗口内确认消费成功的消息数 Gauge
PulsarConsumer.“Topic”.“ConsumerName”.numAcksFailed Topic, ConsumerName 在过去的一个统计窗口内确认消费失败的消息数 Gauge
PulsarConsumer.“Topic”.“ConsumerName”.numReceiveFailed Topic, ConsumerName 在过去的一个统计窗口内消费失败的消息数 Gauge
PulsarConsumer.“Topic”.“ConsumerName”.numBatchReceiveFailed Topic, ConsumerName 在过去的一个统计窗口内批量消费失败的消息数 Gauge
PulsarConsumer.“Topic”.“ConsumerName”.totalMsgsReceived Topic, ConsumerName Consumer 消费的全部消息数 Gauge
PulsarConsumer.“Topic”.“ConsumerName”.totalBytesReceived Topic, ConsumerName Consumer 消费的全部字节数 Gauge
PulsarConsumer.“Topic”.“ConsumerName”.totalReceivedFailed Topic, ConsumerName Consumer 消费失败的消息数 Gauge
PulsarConsumer.“Topic”.“ConsumerName”.totalBatchReceivedFailed Topic, ConsumerName Consumer 批量消费失败的消息数 Gauge
PulsarConsumer.“Topic”.“ConsumerName”.totalAcksSent Topic, ConsumerName Consumer 确认消费成功的消息数 Gauge
PulsarConsumer.“Topic”.“ConsumerName”.totalAcksFailed Topic, ConsumerName Consumer 确认消费失败的消息数 Gauge
PulsarConsumer.“Topic”.“ConsumerName”.msgNumInReceiverQueue Topic, ConsumerName Consumer 当前待消费的消息队列大小 Gauge

Sink 监控指标 #

下列表格列出了当前 Sink 支持的监控指标,前 6 个指标是 FLIP-33: Standardize Connector Metrics 中规定的 Sink 连接器应当支持的标准指标。前 5 个指标会默认暴露给用户,其他指标需要通过启用 pulsar.sink.enableMetrics 选项来获得。

builder.setConfig(PulsarSinkOptions.PULSAR_ENABLE_SINK_METRICS, true);
builder.set_config("pulsar.sink.enableMetrics", "true")
指标 变量 描述 类型
numBytesOut n/a Pulsar Sink 启动后总共发出的字节数 Counter
numBytesOutPerSecond n/a 每秒发送的字节数 Meter
numRecordsOut n/a Pulsar Sink 启动后总共发出的消息数 Counter
numRecordsOutPerSecond n/a 每秒发送的消息数 Meter
numRecordsOutErrors n/a 总共发送消息失败的次数 Counter
currentSendTime n/a 最近一条消息从被放入客户端缓冲队列到收到消息确认的时间 Gauge
PulsarProducer.“Topic”.“ProducerName”.numMsgsSent Topic, ProducerName 在过去的一个统计窗口内发送的消息数 Gauge
PulsarProducer.“Topic”.“ProducerName”.numBytesSent Topic, ProducerName 在过去的一个统计窗口内发送的字节数 Gauge
PulsarProducer.“Topic”.“ProducerName”.numSendFailed Topic, ProducerName 在过去的一个统计窗口内发送失败的消息数 Gauge
PulsarProducer.“Topic”.“ProducerName”.numAcksReceived Topic, ProducerName 在过去的一个统计窗口内总共收到的确认数 Gauge
PulsarProducer.“Topic”.“ProducerName”.sendMsgsRate Topic, ProducerName 在过去的一个统计窗口内发送的消息速率 Gauge
PulsarProducer.“Topic”.“ProducerName”.sendBytesRate Topic, ProducerName 在过去的一个统计窗口内发送的字节速率 Gauge
PulsarProducer.“Topic”.“ProducerName”.sendLatencyMillis50pct Topic, ProducerName 在过去的一个统计窗口内的发送延迟的中位数 Gauge
PulsarProducer.“Topic”.“ProducerName”.sendLatencyMillis75pct Topic, ProducerName 在过去的一个统计窗口内的发送延迟的 75 百分位数 Gauge
PulsarProducer.“Topic”.“ProducerName”.sendLatencyMillis95pct Topic, ProducerName 在过去的一个统计窗口内的发送延迟的 95 百分位数 Gauge
PulsarProducer.“Topic”.“ProducerName”.sendLatencyMillis99pct Topic, ProducerName 在过去的一个统计窗口内的发送延迟的 99 百分位数 Gauge
PulsarProducer.“Topic”.“ProducerName”.sendLatencyMillis999pct Topic, ProducerName 在过去的一个统计窗口内的发送延迟的 99.9 百分位数 Gauge
PulsarProducer.“Topic”.“ProducerName”.sendLatencyMillisMax Topic, ProducerName 在过去的一个统计窗口内的最大发送延迟 Gauge
PulsarProducer.“Topic”.“ProducerName”.totalMsgsSent Topic, ProducerName Producer 发送的全部消息数 Gauge
PulsarProducer.“Topic”.“ProducerName”.totalBytesSent Topic, ProducerName Producer 发送的全部字节数 Gauge
PulsarProducer.“Topic”.“ProducerName”.totalSendFailed Topic, ProducerName Producer 发送失败的消息数 Gauge
PulsarProducer.“Topic”.“ProducerName”.totalAcksReceived Topic, ProducerName Producer 确认发送成功的消息数 Gauge
PulsarProducer.“Topic”.“ProducerName”.pendingQueueSize Topic, ProducerName Producer 当前待发送的消息队列大小 Gauge
  • 指标 numBytesOutnumRecordsOutnumRecordsOutErrors 从 Pulsar client 实例的监控指标中获得。

  • numBytesOutRatenumRecordsOutRate 指标是 Flink 内部通过 numBytesOutnumRecordsOut 计数器,在一个 60 秒的窗口内计算得到的。

  • currentSendTime 记录了最近一条消息从放入生产者的缓冲队列到消息被消费确认所耗费的时间。这项指标在 NONE 发送一致性下不可用。

端到端加密 #

Flink 可以使用 Pulsar 提供的加解密功能在 Source 和 Sink 端加解密消息。用户需要提供一个合法的密钥对(即一个公钥和一个私钥,也就是非对称加密方式)来实现端到端的加密。

如何启用端到端加密 #

  1. 创建密钥对

    Pulsar 同时支持 ECDSA 或者 RSA 密钥对,你可以同时创建多组不同类型的密钥对,加密消息时会选择其中任意一组密钥来确保消息更加安全。

    # ECDSA(仅用于 Java 端)
    openssl ecparam -name secp521r1 -genkey -param_enc explicit -out test_ecdsa_privkey.pem
    openssl ec -in test_ecdsa_privkey.pem -pubout -outform pem -out test_ecdsa_pubkey.pem
    
    # RSA
    openssl genrsa -out test_rsa_privkey.pem 2048
    openssl rsa -in test_rsa_privkey.pem -pubout -outform pkcs8 -out test_rsa_pubkey.pem
    
  2. 实现 CryptoKeyReader 接口

    每个密钥对都需要有一个唯一的密钥名称,用户需要自行实现 CryptoKeyReader 接口并确保 CryptoKeyReader.getPublicKey()CryptoKeyReader.getPrivateKey() 方法能基于给定的密钥名称反正正确的密钥。

    Pulsar 提供了一个默认的 CryptoKeyReader 实现 DefaultCryptoKeyReader。用户需要使用对于的 builder 方法 DefaultCryptoKeyReader.builder() 来创建实例。需要注意的是,对应的密钥对文件需要放在 Flink 程序的运行环境上。

    // defaultPublicKey 和 defaultPrivateKey 也需要提供。
    // 文件 file:///path/to/default-public.key 需要在 Flink 的运行环境上存在。
    CryptoKeyReader keyReader = DefaultCryptoKeyReader.builder()
        .defaultPublicKey("file:///path/to/default-public.key")
        .defaultPrivateKey("file:///path/to/default-private.key")
        .publicKey("key1", "file:///path/to/public1.key").privateKey("key1", "file:///path/to/private1.key")
        .publicKey("key2", "file:///path/to/public2.key").privateKey("key2", "file:///path/to/private2.key")
        .build();
    
  3. (可选)实现 MessageCrypto<MessageMetadata, MessageMetadata> 接口

    Pulsar 原生支持 ECDSARSA 等常见非对称加解密方法。通常情况下,你不需要实现此接口,除非你想使用一个私有的加解密方法。你可以参考 Pulsar 的默认实现 MessageCryptoBc 来实现 MessageCrypto<MessageMetadata, MessageMetadata> 接口。

  4. 创建 PulsarCrypto 实例

    PulsarCrypto 用于提供所有必要的加解密信息,你可以使用对应的 builder 方法来创建实例。

    CryptoKeyReader keyReader = DefaultCryptoKeyReader.builder()
        .defaultPublicKey("file:///path/to/public1.key")
        .defaultPrivateKey("file:///path/to/private2.key")
        .publicKey("key1", "file:///path/to/public1.key").privateKey("key1", "file:///path/to/private1.key")
        .publicKey("key2", "file:///path/to/public2.key").privateKey("key2", "file:///path/to/private2.key")
        .build();
    
    // 此处只用于演示如何使用,实际上你不需要这么做。
    SerializableSupplier<MessageCrypto<MessageMetadata, MessageMetadata>> cryptoSupplier = () -> new MessageCryptoBc();
    
    PulsarCrypto pulsarCrypto = PulsarCrypto.builder()
        .cryptoKeyReader(keyReader)
        // 所有的密钥名称需要在此处给出。
        .addEncryptKeys("key1", "key2")
        // 一般情况下你不需要提供 MessageCrypto 实例。
        .messageCrypto(cryptoSupplier)
        .build()
    

在 Pulsar source 上解密消息 #

基于前面的指导创建对应的 PulsarCrypto 实例,然后在 PulsarSource.builder() 的构造方法里面予以给定。你需要同时定义解密失败的行为,Pulsar 在 ConsumerCryptoFailureAction 给定了 3 种实现。

  • ConsumerCryptoFailureAction.FAIL: Flink 程序将抛出异常并退出。

  • ConsumerCryptoFailureAction.DISCARD: 解密失败的消息将被丢弃。

  • ConsumerCryptoFailureAction.CONSUME

    解密失败的消息将以未解密的状态传递给后续的算子,你也可以在 PulsarDeserializationSchema 里手动对解密失败的消息进行再次解密。所有关于解密的上下文都定义在 Message.getEncryptionCtx() 内。

PulsarCrypto pulsarCrypto = ...

PulsarSource<String> sink = PulsarSource.builder()
    ...
    .setPulsarCrypto(pulsarCrypto, ConsumerCryptoFailureAction.FAIL)
    .build();

在 Pulsar sink 上加密消息 #

基于前面的指导创建对应的 PulsarCrypto 实例,然后在 PulsarSink.builder() 的构造方法里面予以给定。你需要同时定义加密失败的行为,Pulsar 在 ProducerCryptoFailureAction 给定了 2 种实现。

  • ProducerCryptoFailureAction.FAIL: Flink 程序将抛出异常并退出。
  • ProducerCryptoFailureAction.SEND: 消息将以未加密的形态发送。
PulsarCrypto pulsarCrypto = ...

PulsarSink<String> sink = PulsarSink.builder()
    ...
    .setPulsarCrypto(pulsarCrypto, ProducerCryptoFailureAction.FAIL)
    .build();

升级至最新的连接器 #

常见的升级步骤,请参阅升级应用程序和 Flink 版本。Pulsar 连接器没有在 Flink 端存储消费的状态,所有的消费信息都推送到了 Pulsar。所以需要注意下面的事项:

  • 不要同时升级 Pulsar 连接器和 Pulsar 服务端的版本。
  • 使用最新版本的 Pulsar 客户端来消费消息。

问题诊断 #

使用 Flink 和 Pulsar 交互时如果遇到问题,由于 Flink 内部实现只是基于 Pulsar 的 Java 客户端管理 API 而开发的。

用户遇到的问题可能与 Flink 无关,请先升级 Pulsar 的版本、Pulsar 客户端的版本,或者修改 Pulsar 的配置、Pulsar 连接器的配置来尝试解决问题。

已知问题 #

本节介绍有关 Pulsar 连接器的一些已知问题。

在 Java 11 上使用不稳定 #

Pulsar connector 在 Java 11 中有一些尚未修复的问题。我们当前推荐在 Java 8 环境中运行Pulsar connector.

不自动重连,而是抛出TransactionCoordinatorNotFound异常 #

Pulsar 事务机制仍在积极发展中,当前版本并不稳定。 Pulsar 2.9.2 引入了这个问题 a break change。 如果您使用 Pulsar 2.9.2或更高版本与较旧的 Pulsar 客户端一起使用,您可能会收到一个“TransactionCoordinatorNotFound”异常。

您可以使用最新的pulsar-client-all分支来解决这个问题。

Back to top