Google Cloud Storage #
Google Cloud Storage (GCS) provides cloud storage for a variety of use cases. You can use it for reading and writing data, and for checkpoint storage when using FileSystemCheckpointStorage with the streaming state backends.
You can use GCS objects like regular files by specifying paths in the following format:
gs://<bucket>/<endpoint>
The endpoint can either be a single file or a directory, for example:
// Read from GCS bucket
env.readTextFile("gs://<bucket>/<endpoint>");

// Write to GCS bucket
stream.writeAsText("gs://<bucket>/<endpoint>");

// Use GCS as checkpoint storage
env.getCheckpointConfig().setCheckpointStorage("gs://<bucket>/<endpoint>");
Note that these examples are not exhaustive and you can use GCS in other places as well, including your high availability setup or the EmbeddedRocksDBStateBackend; everywhere that Flink expects a FileSystem URI.
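To make the path format concrete, the following minimal sketch splits a gs:// URI into its bucket and endpoint parts using only the JDK. The class GcsUri and its parse method are hypothetical names introduced for illustration; they are not part of Flink or of Google's libraries, and Flink does its own path handling internally.

```java
import java.net.URI;

/** Hypothetical helper (not a Flink API): splits gs://<bucket>/<endpoint> into its parts. */
final class GcsUri {
    final String bucket;
    final String objectPath;

    private GcsUri(String bucket, String objectPath) {
        this.bucket = bucket;
        this.objectPath = objectPath;
    }

    static GcsUri parse(String uri) {
        URI parsed = URI.create(uri);
        if (!"gs".equals(parsed.getScheme())) {
            throw new IllegalArgumentException("Expected a gs:// URI but got: " + uri);
        }
        // getAuthority() is used instead of getHost() so that bucket names
        // containing underscores are still returned.
        String bucket = parsed.getAuthority();
        if (bucket == null || bucket.isEmpty()) {
            throw new IllegalArgumentException("Missing bucket in: " + uri);
        }
        // The path is "/dir/file" for an object or directory, or "" for the bucket root.
        String path = parsed.getPath();
        String objectPath = path.startsWith("/") ? path.substring(1) : path;
        return new GcsUri(bucket, objectPath);
    }

    public static void main(String[] args) {
        GcsUri u = parse("gs://my-bucket/checkpoints/job-1");
        System.out.println("bucket=" + u.bucket + ", endpoint=" + u.objectPath);
    }
}
```

The endpoint part may name a single file or a directory; the sketch does not distinguish between the two.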
GCS File System plugin #
Flink provides the flink-gs-fs-hadoop file system to write to GCS.
This implementation is self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use it.
flink-gs-fs-hadoop registers a FileSystem wrapper for URIs with the gs:// scheme. It uses Google’s gcs-connector Hadoop library to access GCS. It also uses Google’s google-cloud-storage library to provide RecoverableWriter support.
This file system can be used with the FileSystem connector.
To use flink-gs-fs-hadoop, copy the JAR file from the opt directory to the plugins directory of your Flink distribution before starting Flink, i.e.
mkdir ./plugins/gs-fs-hadoop
cp ./opt/flink-gs-fs-hadoop-1.16.0.jar ./plugins/gs-fs-hadoop/
The underlying Hadoop file system can be configured using the Hadoop configuration keys for gcs-connector by adding the configurations to your flink-conf.yaml.
For example, gcs-connector has a fs.gs.http.connect-timeout configuration key. If you want to change it, you need to set gs.http.connect-timeout: xyz in flink-conf.yaml. Flink will internally translate this back to fs.gs.http.connect-timeout.
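The key translation described above can be pictured with a small sketch. This is an illustration of the naming rule only, not Flink's actual implementation: the class GsKeyTranslation and the method toHadoopKeys are hypothetical, the value 5000 is a placeholder, and the sketch simplistically forwards every key that starts with gs., whereas Flink also defines its own gs.writer.* options.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of the gs.* to fs.gs.* naming rule; not Flink code. */
final class GsKeyTranslation {

    /** Returns the entries whose key starts with "gs.", re-keyed with an "fs." prefix. */
    static Map<String, String> toHadoopKeys(Map<String, String> flinkConf) {
        Map<String, String> hadoopConf = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
            if (entry.getKey().startsWith("gs.")) {
                hadoopConf.put("fs." + entry.getKey(), entry.getValue());
            }
        }
        return hadoopConf;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConf = new LinkedHashMap<>();
        flinkConf.put("gs.http.connect-timeout", "5000"); // placeholder value
        flinkConf.put("parallelism.default", "4");        // unrelated key, ignored
        System.out.println(toHadoopKeys(flinkConf));
    }
}
```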
You can also set gcs-connector options directly in the Hadoop core-site.xml configuration file, so long as the Hadoop configuration directory is made known to Flink via the env.hadoop.conf.dir Flink option or via the HADOOP_CONF_DIR environment variable.
flink-gs-fs-hadoop can also be configured by setting the following options in flink-conf.yaml:

| Key | Description |
|---|---|
| gs.writer.temporary.bucket.name | Set this property to choose a bucket to hold temporary blobs for in-progress writes via RecoverableWriter. It is recommended to choose a separate bucket so that you can assign it a TTL, which provides a mechanism to clean up orphaned blobs that can occur when restoring from check/savepoints. If you do use a separate bucket with a TTL for temporary blobs, attempts to restart jobs from check/savepoints after the TTL interval expires may fail. |
| gs.writer.chunk.size | Set this property to set the chunk size for writes via RecoverableWriter. If not set, a Google-determined default chunk size will be used. |
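The TTL caveat for the temporary bucket comes down to a simple age comparison: once a check/savepoint is older than the bucket's TTL, the temporary blobs it refers to may already have been deleted. The following sketch expresses that comparison. The class TempBlobTtlCheck and its method are hypothetical, and the result is only an approximation, since the actual deletion time is decided by the bucket's lifecycle rules rather than by Flink.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical pre-restore check; not part of Flink. */
final class TempBlobTtlCheck {

    /**
     * Returns true if a snapshot taken at snapshotTime is older than bucketTtl at time now,
     * meaning temporary blobs it references may have been removed from the temporary bucket.
     */
    static boolean olderThanTtl(Instant snapshotTime, Duration bucketTtl, Instant now) {
        return Duration.between(snapshotTime, now).compareTo(bucketTtl) > 0;
    }

    public static void main(String[] args) {
        Instant snapshot = Instant.parse("2024-01-01T00:00:00Z"); // example timestamps
        Instant now = Instant.parse("2024-01-10T00:00:00Z");
        Duration ttl = Duration.ofDays(7);                        // example TTL
        if (olderThanTtl(snapshot, ttl, now)) {
            System.out.println("Snapshot is older than the temporary bucket TTL; restore may fail.");
        }
    }
}
```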
Authentication to access GCS #
Most operations on GCS require authentication. To provide authentication credentials, either:
- Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of the JSON credentials file, as described in Google's documentation, where JobManagers and TaskManagers run. This is the recommended method.
- Set the google.cloud.auth.service.account.json.keyfile property in Hadoop's core-site.xml to the path to the JSON credentials file (and make sure that the Hadoop configuration directory is specified to Flink as described above):
<configuration>
  <property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>PATH TO GOOGLE AUTHENTICATION JSON FILE</value>
  </property>
</configuration>
For flink-gs-fs-hadoop to use credentials via either of these two methods, the use of service accounts for authentication must be enabled. This is enabled by default; however, it can be disabled in core-site.xml by setting:
<configuration>
  <property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>false</value>
  </property>
</configuration>
gcs-connector supports additional options to provide authentication credentials besides the google.cloud.auth.service.account.json.keyfile option described above.
However, if you use any of those other options, the provided credentials will not be used by the google-cloud-storage library, which provides RecoverableWriter support, so Flink recoverable-write operations would be expected to fail.
For this reason, use of the gcs-connector authentication-credentials options other than google.cloud.auth.service.account.json.keyfile is not recommended.
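Because the credentials file must be reachable wherever JobManagers and TaskManagers run, a small preflight check on each host can catch a missing or unreadable file before a job fails. The sketch below is one possible check using only the JDK. The class CredentialsPreflight and its Status enum are hypothetical names, and the check only verifies that the path points to a readable regular file; it does not validate the JSON content or contact GCS.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical preflight check for the credentials path; not part of Flink. */
final class CredentialsPreflight {

    enum Status { UNSET, NOT_A_FILE, NOT_READABLE, OK }

    /** Classifies a credentials path, for example the value of GOOGLE_APPLICATION_CREDENTIALS. */
    static Status check(String credentialsPath) {
        if (credentialsPath == null || credentialsPath.trim().isEmpty()) {
            return Status.UNSET;
        }
        Path path = Paths.get(credentialsPath);
        if (!Files.isRegularFile(path)) {
            return Status.NOT_A_FILE;
        }
        if (!Files.isReadable(path)) {
            return Status.NOT_READABLE;
        }
        return Status.OK;
    }

    public static void main(String[] args) {
        // Reads the environment variable named in the text above.
        Status status = check(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
        System.out.println("GOOGLE_APPLICATION_CREDENTIALS: " + status);
    }
}
```

The same check method could be pointed at the path configured under google.cloud.auth.service.account.json.keyfile if you use the core-site.xml method instead.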