- 应用开发
- Streaming (DataStream API)
- Connectors
- Hadoop FileSystem
Hadoop FileSystem 连接器
本文档是 Apache Flink 的旧版本。建议访问 最新的稳定版本。
这个连接器可以向所有 Hadoop FileSystem 支持的文件系统写入分区文件。
使用前,需要在工程里添加下面的依赖:
注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考 这里。
分桶文件 Sink
关于分桶的配置我们后面会有讲述,这里先创建一个分桶 sink,默认情况下这个 sink 会将数据写入到按照时间切分的滚动文件中:
初始化时只需要一个参数,这个参数表示分桶文件存储的路径。分桶 sink 可以通过指定自定义的 bucketer、 writer 和 batch 值进一步配置。
默认情况下,当数据到来时,分桶 sink 会按照系统时间对数据进行切分,并以 "yyyy-MM-dd--HH"
的时间格式给每个桶命名。然后
DateTimeFormatter
按照这个时间格式将当前系统时间以 JVM 默认时区转换成分桶的路径。用户可以自定义时区来生成
分桶的路径。每遇到一个新的日期都会产生一个新的桶。例如,如果时间的格式以分钟为粒度,那么每分钟都会产生一个桶。每个桶都是一个目录,
目录下包含了几个部分文件(part files):每个 sink 的并发实例都会创建一个属于自己的部分文件,当这些文件太大的时候,sink 会产生新的部分文件。
当一个桶不再活跃时,打开的部分文件会刷盘并且关闭。如果一个桶最近一段时间都没有写入,那么这个桶被认为是不活跃的。sink 默认会每分钟
检查不活跃的桶、关闭那些超过一分钟没有写入的桶。这些行为可以通过 BucketingSink
的 setInactiveBucketCheckInterval()
和 setInactiveBucketThreshold()
进行设置。
可以调用BucketingSink
的 setBucketer()
方法指定自定义的 bucketer,如果需要的话,也可以使用一个元素或者元组属性来决定桶的路径。
默认的 writer 是 StringWriter
。数据到达时,通过 toString()
方法得到内容,内容以换行符分隔,StringWriter
将数据
内容写入部分文件。可以通过 BucketingSink
的 setWriter()
指定自定义的 writer。SequenceFileWriter
支持写入 Hadoop
SequenceFiles,并且可以配置是否开启压缩。
关闭部分文件和打开新部分文件的时机可以通过两个配置来确定:
- 设置文件大小(默认文件大小是384MB)
- 设置文件滚动周期,单位是毫秒(默认滚动周期是
Long.MAX_VALUE
)
当上述两个条件中的任意一个被满足,都会生成一个新的部分文件。
示例:
上述代码会创建一个 sink,这个 sink 按下面的模式写入桶文件:
date-time
是我们从日期/时间格式获得的字符串,parallel-task
是 sink 并发实例的索引,count
是因文件大小或者滚动周期而产生的
文件的编号。
更多信息,请参考 BucketingSink。
Back to top