This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.
Google Cloud PubSub #
这个连接器可向 Google Cloud PubSub 读取与写入数据。添加下面的依赖来使用此连接器:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-pubsub</artifactId>
<version>1.15.4</version>
</dependency>
注意:此连接器最近才加到 Flink 里,还未接受广泛测试。
注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考这里
Consuming or Producing PubSubMessages #
连接器可以接收和发送 Google PubSub 的信息。和 Google PubSub 一样,这个连接器能够保证至少一次
的语义。
PubSub SourceFunction #
PubSubSource
类的对象由构建类来构建: PubSubSource.newBuilder(...)
有多种可选的方法来创建 PubSubSource,但最低要求是要提供 Google Project、Pubsub 订阅和反序列化 PubSubMessages 的方法。
Example:
StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DeserializationSchema<SomeObject> deserializer = (...);
SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
.withDeserializationSchema(deserializer)
.withProjectName("project")
.withSubscriptionName("subscription")
.build();
streamExecEnv.addSource(pubsubSource);
当前还不支持 PubSub 的 source functions pulls messages 和 push endpoints。
PubSub Sink #
PubSubSink
类的对象由构建类来构建: PubSubSink.newBuilder(...)
构建类的使用方式与 PubSubSource 类似。
Example:
DataStream<SomeObject> dataStream = (...);
SerializationSchema<SomeObject> serializationSchema = (...);
SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
.withSerializationSchema(serializationSchema)
.withProjectName("project")
.withSubscriptionName("subscription")
.build()
dataStream.addSink(pubsubSink);
Google Credentials #
应用程序需要使用 Credentials 来通过认证和授权才能使用 Google Cloud Platform 的资源,例如 PubSub。
上述的两个构建类都允许你提供 Credentials, 但是连接器默认会通过环境变量: GOOGLE_APPLICATION_CREDENTIALS 来获取 Credentials 的路径。
如果你想手动提供 Credentials,例如你想从外部系统读取 Credentials,你可以使用 PubSubSource.newBuilder(...).withCredentials(...)
。
集成测试 #
在集成测试的时候,如果你不想直接连 PubSub 而是想读取和写入一个 docker container,可以参照 PubSub testing locally。
下面的例子展示了如何使用 source 来从仿真器读取信息并发送回去:
String hostAndPort = "localhost:1234";
DeserializationSchema<SomeObject> deserializationSchema = (...);
SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
.withDeserializationSchema(deserializationSchema)
.withProjectName("my-fake-project")
.withSubscriptionName("subscription")
.withPubSubSubscriberFactory(new PubSubSubscriberFactoryForEmulator(hostAndPort, "my-fake-project", "subscription", 10, Duration.ofSeconds(15), 100))
.build();
SerializationSchema<SomeObject> serializationSchema = (...);
SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
.withSerializationSchema(serializationSchema)
.withProjectName("my-fake-project")
.withSubscriptionName("subscription")
.withHostAndPortForEmulator(hostAndPort)
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(pubsubSource)
.addSink(pubsubSink);
至少一次语义保证 #
SourceFunction #
有很多原因导致会一个信息会被多次发出,例如 Google PubSub 的故障。
另一个可能的原因是超过了确认的截止时间,即收到与确认信息之间的时间间隔。PubSubSource 只有在信息被成功快照之后才会确认以保证至少一次的语义。这意味着,如果你的快照间隔大于信息确认的截止时间,那么你订阅的信息很有可能会被多次处理。
因此,我们建议把快照的间隔设置得比信息确认截止时间更短。
参照 PubSub 来增加信息确认截止时间。
注意: PubSubMessagesProcessedNotAcked
显示了有多少信息正在等待下一个 checkpoint 还没被确认。
SinkFunction #
Sink function 会把准备发到 PubSub 的信息短暂地缓存以提高性能。每次 checkpoint 前,它会刷新缓冲区,并且只有当所有信息成功发送到 PubSub 之后,checkpoint 才会成功完成。
常见问题 #
由于Pulsar Connector大量依赖 PulsarClient 和 PulsarAdmin 来实现相关功能,有时Flink Job出现 可能是由于Pulsar broker版本过低或者配置不合适导致的。调试配置或升级pulsar可能会解决部分问题。
当数据量很小时,source读数据出现大约10s延迟 #
当Pulsar Source从一个数据量很小的topic读取数据时,用户可能会观测到消息间出现10秒钟的间隔。这是由于Pulsar Source
会将从broker读到的消息暂存至等待队列中,只有当队列中消息数量达到了 PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS
才会向下游算子发送数据。如果数据量一直很小,那么就会在等待直到等待时长超过PULSAR_MAX_FETCH_TIME
规定的值 (默认10秒钟)时
也向下游算子发送数据。
为了避免这种情况,用户需要改变两个配置项的值。