Google Cloud Storage (GCS) provides cloud storage for a variety of use cases. You can use it for reading and writing data, and for checkpoint storage when using FileSystemCheckpointStorage with the streaming state backends.
You can use GCS objects like regular files by specifying paths in the following format:
gs://<your-bucket>/<endpoint>
The endpoint can either be a single file or a directory, for example:
// Read from GCS bucket
env.readTextFile("gs://<bucket>/<endpoint>");
// Write to GCS bucket
stream.writeAsText("gs://<bucket>/<endpoint>");
// Use GCS as checkpoint storage
env.getCheckpointConfig().setCheckpointStorage("gs://<bucket>/<endpoint>");
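Putting these pieces together, here is a minimal, self-contained Java sketch of a streaming job that reads from and checkpoints to GCS; the bucket name, paths, and checkpoint interval are hypothetical placeholders, not values prescribed by this guide:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GcsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 10 seconds into a GCS directory (placeholder bucket and path).
        env.enableCheckpointing(10_000L);
        env.getCheckpointConfig().setCheckpointStorage("gs://my-bucket/checkpoints");

        // Copy text lines from one GCS location to another (placeholder paths).
        env.readTextFile("gs://my-bucket/input")
           .writeAsText("gs://my-bucket/output");

        env.execute("GCS example");
    }
}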
You must include the following jars in Flink’s lib directory to connect Flink with GCS:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-shaded-hadoop2-uber</artifactId>
  <version>${flink.shaded_hadoop_latest_version}</version>
</dependency>

<dependency>
  <groupId>com.google.cloud.bigdataoss</groupId>
  <artifactId>gcs-connector</artifactId>
  <version>hadoop2-2.2.0</version>
</dependency>
We have tested with flink-shaded-hadoop2-uber version >= 2.8.3-1.8.3.
You can track the latest version of the gcs-connector for Hadoop 2 on Maven Central.
Most operations on GCS require authentication. Please see the documentation on Google Cloud Storage authentication for more information.
You can use either of the following methods for authentication:
Configure via core-site.xml
You need to add the following properties to core-site.xml:
<configuration>
  <property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>true</value>
  </property>
  <property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value><PATH TO GOOGLE AUTHENTICATION JSON></value>
  </property>
</configuration>
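Note that with this method the JSON keyfile must be readable at the configured path on every machine that runs a Flink process (JobManager and TaskManagers).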
You also need to point Flink at this configuration by adding the following entry to flink-conf.yaml:
fs.hdfs.hadoopconf: <DIRECTORY PATH WHERE core-site.xml IS SAVED>
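For example, if core-site.xml were saved under /etc/hadoop/conf (a hypothetical location), the entry would read fs.hdfs.hadoopconf: /etc/hadoop/conf.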
Alternatively, you can provide the necessary key via the GOOGLE_APPLICATION_CREDENTIALS environment variable.
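For example, you could run export GOOGLE_APPLICATION_CREDENTIALS=/path/to/keyfile.json (the path is a hypothetical placeholder) in the environment of each Flink process before starting the cluster.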