阿里云对象存储服务 (OSS) #
OSS:对象存储服务 #
阿里云对象存储服务 (Aliyun OSS) 使用广泛,尤其在中国云用户中十分流行,能提供多种应用场景下的云对象存储。OSS 可与 Flink 一起使用以读取与存储数据,以及与流 State Backend 结合使用。
通过以下格式指定路径,OSS 对象可类似于普通文件使用:
oss://<your-bucket>/<object-name>
以下代码展示了如何在 Flink 作业中使用 OSS:
// 读取 OSS bucket
env.readTextFile("oss://<your-bucket>/<object-name>");
// 写入 OSS bucket
stream.writeAsText("oss://<your-bucket>/<object-name>");
// 将 OSS 用作 checkpoint storage
Configuration config = new Configuration();
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "oss://<your-bucket>/<object-name>");
env.configure(config);
Shaded Hadoop OSS 文件系统 #
为使用 flink-oss-fs-hadoop
,在启动 Flink 之前,将对应的 JAR 文件从 opt
目录复制到 Flink 发行版中的 plugin
目录下的一个文件夹中,例如:
mkdir ./plugins/oss-fs-hadoop
cp ./opt/flink-oss-fs-hadoop-1.19.1.jar ./plugins/oss-fs-hadoop/
flink-oss-fs-hadoop
为使用 oss:// scheme 的 URI 注册了默认的文件系统包装器。
配置设置 #
在设置好 OSS 文件系统包装器之后,需要添加一些配置以保证 Flink 有权限访问 OSS buckets。
为了简单使用,可直接在 Flink 配置文件 中使用与 Hadoop core-site.xml
相同的配置关键字。
可在 Hadoop OSS 文档 中查看配置关键字。
一些配置必须添加至 Flink 配置文件 (在 Hadoop OSS 文档中定义的其它配置为用作性能调优的高级配置):
fs.oss.endpoint: 连接的 Aliyun OSS endpoint
fs.oss.accessKeyId: Aliyun access key ID
fs.oss.accessKeySecret: Aliyun access key secret
备选的 CredentialsProvider
也可在 Flink 配置文件 中配置,例如:
# 从 OSS_ACCESS_KEY_ID 和 OSS_ACCESS_KEY_SECRET 读取凭据 (Credentials)
fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
其余的凭据提供者(credential providers)可在这里中找到。