这个连接器提供了一个 Sink 来将分区文件写入到支持 Flink FileSystem
接口的文件系统中。
由于在流处理中输入可能是无限的,所以流处理的文件 sink 会将数据写入到桶中。如何分桶是可以配置的,一种有效的默认 策略是基于时间的分桶,这种策略每个小时写入一个新的桶,这些桶各包含了无限输出流的一部分数据。
在一个桶内部,会进一步将输出基于滚动策略切分成更小的文件。这有助于防止桶文件变得过大。滚动策略也是可以配置的,默认 策略会根据文件大小和超时时间来滚动文件,超时时间是指没有新数据写入部分文件(part file)的时间。
StreamingFileSink
支持行编码格式和批量编码格式,比如 Apache Parquet。
只需要配置一个输出路径和一个 Encoder。
Encoder负责为每个文件的 OutputStream
序列化数据。
基本用法如下:
上面的代码创建了一个按小时分桶、按默认策略滚动的 sink。默认分桶器是 DateTimeBucketAssigner ,默认滚动策略是 DefaultRollingPolicy。 可以为 sink builder 自定义 BucketAssigner 和 RollingPolicy。 更多配置操作以及分桶器和滚动策略的工作机制和相互影响请参考: StreamingFileSink。
上面的示例使用 Encoder
分别序列化每一个记录。除此之外,流式文件 sink 还支持批量编码的输出格式,比如 Apache Parquet。
使用这种编码格式需要用 StreamingFileSink.forBulkFormat()
来代替 StreamingFileSink.forRowFormat()
,然后指定一个 BulkWriter.Factory
。
ParquetAvroWriters
中包含了为各种类型创建 BulkWriter.Factory
的静态方法。
重要提示 1: 对于 S3,StreamingFileSink
只支持基于 Hadoop
的文件系统实现,不支持基于 Presto 的实现。如果想使用 StreamingFileSink
向 S3 写入数据并且将
checkpoint 放在基于 Presto 的文件系统,建议明确指定 “s3a://” (for Hadoop)作为sink的目标路径方案,并且为 checkpoint 路径明确指定 “s3p://” (for Presto)。
如果 Sink 和 checkpoint 都使用 “s3://” 路径的话,可能会导致不可预知的行为,因为双方的实现都在“监听”这个路径。
重要提示 2: StreamingFileSink
使用 S3 的 Multi-part Upload
(后续使用MPU代替)特性可以保证精确一次的语义。这个特性支持以独立的块(因此被称为”multi-part”)模式上传文件,当 MPU 的所有部分文件
成功上传之后,可以合并成原始文件。对于失效的 MPUs,S3 提供了一个基于桶生命周期的规则,用户可以用这个规则来丢弃在指定时间内未完成的MPU。
如果在一些部分文件还未上传时触发 savepoint,并且这个规则设置的比较严格,这意味着相关的 MPU在作业重启之前可能会超时。后续的部分文件没
有写入到 savepoint, 那么在 Flink 作业从 savepoint 恢复时,会因为拿不到缺失的部分文件,导致任务失败并抛出异常。