SSL 设置 #
This page provides instructions on how to enable TLS/SSL authentication and encryption for network communication with and between Flink processes. NOTE: TLS/SSL authentication is not enabled by default.
Internal and External Connectivity #
When securing network connections between machines processes through authentication and encryption, Apache Flink differentiates between internal and external connectivity. Internal Connectivity refers to all connections made between Flink processes. These connections run Flink custom protocols. Users never connect directly to internal connectivity endpoints. External / REST Connectivity endpoints refers to all connections made from the outside to Flink processes. This includes the web UI and REST commands to start and control running Flink jobs/applications, including the communication of the Flink CLI with the JobManager / Dispatcher.
For more flexibility, security for internal and external connectivity can be enabled and configured separately.
Internal Connectivity #
Internal connectivity includes:
- Control messages: RPC between JobManager / TaskManager / Dispatcher / ResourceManager
- The data plane: The connections between TaskManagers to exchange data during shuffles, broadcasts, redistribution, etc.
- The Blob Service (distribution of libraries and other artifacts).
All internal connections are SSL authenticated and encrypted. The connections use mutual authentication (mTLS), meaning both server and client side of each connection need to present the certificate to each other. The certificate acts effectively as a shared secret when a dedicated CA is used to exclusively sign an internal cert. The certificate for internal communication is not needed by any other party to interact with Flink, and can be simply added to the container images, or attached to the YARN deployment.
-
The easiest way to realize this setup is by generating a dedicated public/private key pair and self-signed certificate for the Flink deployment. The key- and truststore are identical and contains only that key pair / certificate. An example is shown below.
-
In an environment where operators are constrained to use firm-wide Internal CA (cannot generated self-signed certificates), the recommendation is to still have a dedicated key pair / certificate for the Flink deployment, signed by that CA. However, the TrustStore must then also contain the CA’s public certificate tho accept the deployment’s certificate during the SSL handshake (requirement in JDK TrustStore implementation).
NOTE: Because of that, it is critical that you specify the fingerprint of the deployment certificate (
security.ssl.internal.cert.fingerprint
), when it is not self-signed, to pin that certificate as the only trusted certificate and prevent the TrustStore from trusting all certificates signed by that CA.
Note: Because internal connections are mutually authenticated with shared certificates, Flink can skip hostname verification. This makes container-based setups easier.
External / REST Connectivity #
All external connectivity is exposed via an HTTP/REST endpoint, used for example by the web UI and the CLI:
- Communication with the Dispatcher to submit jobs (session clusters)
- Communication with the JobMaster to inspect and modify a running job/application
The REST endpoints can be configured to require SSL connections. The server will, however, accept connections from any client by default, meaning the REST endpoint does not authenticate the client.
Simple mutual authentication may be enabled by configuration if authentication of connections to the REST endpoint is required, but we recommend to deploy a “side car proxy”: Bind the REST endpoint to the loopback interface (or the pod-local interface in Kubernetes) and start a REST proxy that authenticates and forwards the requests to Flink. Examples for proxies that Flink users have deployed are Envoy Proxy or NGINX with MOD_AUTH.
The rationale behind delegating authentication to a proxy is that such proxies offer a wide variety of authentication options and thus better integration into existing infrastructures.
Configuring SSL #
SSL can be enabled separately for internal and external connectivity:
- security.ssl.internal.enabled: Enable SSL for all internal connections.
- security.ssl.rest.enabled: Enable SSL for REST / external connections.
Note: For backwards compatibility, the security.ssl.enabled option still exists and enables SSL for both internal and REST endpoints.
For internal connectivity, you can optionally disable security for different connection types separately.
When security.ssl.internal.enabled
is set to true
, you can set the following parameters to false
to disable SSL for that particular connection type:
taskmanager.data.ssl.enabled
: Data communication between TaskManagersblob.service.ssl.enabled
: Transport of BLOBs from JobManager to TaskManagerpekko.ssl.enabled
: Pekko-based RPC connections between JobManager / TaskManager / ResourceManager
Keystores and Truststores #
The SSL configuration requires to configure a keystore and a truststore. The keystore contains the public certificate (public key) and the private key, while the truststore contains the trusted certificates or the trusted authorities. Both stores need to be set up such that the truststore trusts the keystore’s certificate.
Internal Connectivity #
Because internal communication is mutually authenticated between server and client side, keystore and truststore typically refer to a dedicated certificate that acts as a shared secret. In such a setup, the certificate can use wild card hostnames or addresses. When using self-signed certificates, it is even possible to use the same file as keystore and truststore.
security.ssl.internal.keystore: /path/to/file.keystore
security.ssl.internal.keystore-password: keystore_password
security.ssl.internal.key-password: key_password
security.ssl.internal.truststore: /path/to/file.truststore
security.ssl.internal.truststore-password: truststore_password
When using a certificate that is not self-signed, but signed by a CA, you need to use certificate pinning to allow only a a specific certificate to be trusted when establishing the connectivity.
security.ssl.internal.cert.fingerprint: 00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00
REST Endpoints (external connectivity) #
For REST endpoints, by default the keystore is used by the server endpoint, and the truststore is used by the REST clients (including the CLI client) to accept the server’s certificate. In the case where the REST keystore has a self-signed certificate, the truststore must trust that certificate directly. If the REST endpoint uses a certificate that is signed through a proper certification hierarchy, the roots of that hierarchy should be in the trust store.
If mutual authentication is enabled, the keystore and the truststore are used by both, the server endpoint and the REST clients as with internal connectivity.
security.ssl.rest.keystore: /path/to/file.keystore
security.ssl.rest.keystore-password: keystore_password
security.ssl.rest.key-password: key_password
security.ssl.rest.truststore: /path/to/file.truststore
security.ssl.rest.truststore-password: truststore_password
security.ssl.rest.authentication-enabled: false
Cipher suites #
The IETF RFC 7525 recommends to use a specific set of cipher suites for strong security. Because these cipher suites were not available on many setups out of the box, Flink’s default value is set to a slightly weaker but more compatible cipher suite. We recommend that SSL setups update to the stronger cipher suites, if possible, by adding the below entry to the Flink configuration:
security.ssl.algorithms: TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
If these cipher suites are not supported on your setup, you will see that Flink processes will not be able to connect to each other.
Complete List of SSL Options #
Key | Default | Type | Description |
---|---|---|---|
security.context.factory.classes |
"org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory"; |
List<String> | List of factories that should be used to instantiate a security context. If multiple are configured, Flink will use the first compatible factory. You should have a NoOpSecurityContextFactory in this list as a fallback. |
security.delegation.token.provider.<serviceName>.enabled |
true | Boolean | Controls whether to obtain credentials for services when security is enabled. By default, credentials for all supported services are retrieved when those services are configured, but it's possible to disable that behavior if it somehow conflicts with the application being run. |
security.delegation.tokens.enabled |
true | Boolean | Indicates whether to start delegation tokens system for external services. |
security.delegation.tokens.renewal.retry.backoff |
1 h | Duration | The time period how long to wait before retrying to obtain new delegation tokens after a failure. |
security.delegation.tokens.renewal.time-ratio |
0.75 | Double | Ratio of the tokens's expiration time when new credentials should be re-obtained. |
security.kerberos.access.hadoopFileSystems |
(none) | List<String> | A semicolon-separated list of Kerberos-secured Hadoop filesystems Flink is going to access. For example, security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002;hdfs://namenode3:9003. The JobManager needs to have access to these filesystems to retrieve the security tokens. |
security.kerberos.krb5-conf.path |
(none) | String | Specify the local location of the krb5.conf file. If defined, this conf would be mounted on the JobManager and TaskManager containers/pods for Kubernetes and Yarn. Note: The KDC defined needs to be visible from inside the containers. |
security.kerberos.login.contexts |
(none) | String | A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication) |
security.kerberos.login.keytab |
(none) | String | Absolute path to a Kerberos keytab file that contains the user credentials. |
security.kerberos.login.principal |
(none) | String | Kerberos principal name associated with the keytab. |
security.kerberos.login.use-ticket-cache |
true | Boolean | Indicates whether to read from your Kerberos ticket cache. |
security.kerberos.relogin.period |
1 min | Duration | The time period when keytab login happens automatically in order to always have a valid TGT. |
security.module.factory.classes |
"org.apache.flink.runtime.security.modules.HadoopModuleFactory"; |
List<String> | List of factories that should be used to instantiate security modules. All listed modules will be installed. Keep in mind that the configured security context might rely on some modules being present. |
security.ssl.algorithms |
"TLS_RSA_WITH_AES_128_CBC_SHA" | String | The comma separated list of standard SSL algorithms to be supported. Read more here |
security.ssl.internal.cert.fingerprint |
(none) | String | The sha1 fingerprint of the internal certificate. This further protects the internal communication to present the exact certificate used by Flink.This is necessary where one cannot use private CA(self signed) or there is internal firm wide CA is required |
security.ssl.internal.close-notify-flush-timeout |
-1 | Integer | The timeout (in ms) for flushing the `close_notify` that was triggered by closing a channel. If the `close_notify` was not flushed in the given timeout the channel will be closed forcibly. (-1 = use system default) |
security.ssl.internal.enabled |
false | Boolean | Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc). |
security.ssl.internal.handshake-timeout |
-1 | Integer | The timeout (in ms) during SSL handshake. (-1 = use system default) |
security.ssl.internal.key-password |
(none) | String | The secret to decrypt the key in the keystore for Flink's internal endpoints (rpc, data transport, blob server). |
security.ssl.internal.keystore |
(none) | String | The Java keystore file with SSL Key and Certificate, to be used Flink's internal endpoints (rpc, data transport, blob server). |
security.ssl.internal.keystore-password |
(none) | String | The secret to decrypt the keystore file for Flink's for Flink's internal endpoints (rpc, data transport, blob server). |
security.ssl.internal.keystore-type |
JVM default keystore type | String | The type of keystore for Flink's internal endpoints (rpc, data transport, blob server). |
security.ssl.internal.session-cache-size |
-1 | Integer | The size of the cache used for storing SSL session objects. According to here, you should always set this to an appropriate number to not run into a bug with stalling IO threads during garbage collection. (-1 = use system default). |
security.ssl.internal.session-timeout |
-1 | Integer | The timeout (in ms) for the cached SSL session objects. (-1 = use system default) |
security.ssl.internal.truststore |
(none) | String | The truststore file containing the public CA certificates to verify the peer for Flink's internal endpoints (rpc, data transport, blob server). |
security.ssl.internal.truststore-password |
(none) | String | The password to decrypt the truststore for Flink's internal endpoints (rpc, data transport, blob server). |
security.ssl.internal.truststore-type |
JVM default keystore type | String | The type of truststore for Flink's internal endpoints (rpc, data transport, blob server). |
security.ssl.protocol |
"TLSv1.2" | String | The SSL protocol version to be supported for the ssl transport. Note that it doesn’t support comma separated list. |
security.ssl.provider |
"JDK" | String | The SSL engine provider to use for the ssl transport:
OPENSSL is based on netty-tcnative and comes in two flavours:
|
security.ssl.rest.authentication-enabled |
false | Boolean | Turns on mutual SSL authentication for external communication via the REST endpoints. |
security.ssl.rest.cert.fingerprint |
(none) | String | The sha1 fingerprint of the rest certificate. This further protects the rest REST endpoints to present certificate which is only used by proxy serverThis is necessary where once uses public CA or internal firm wide CA |
security.ssl.rest.enabled |
false | Boolean | Turns on SSL for external communication via the REST endpoints. |
security.ssl.rest.key-password |
(none) | String | The secret to decrypt the key in the keystore for Flink's external REST endpoints. |
security.ssl.rest.keystore |
(none) | String | The Java keystore file with SSL Key and Certificate, to be used Flink's external REST endpoints. |
security.ssl.rest.keystore-password |
(none) | String | The secret to decrypt the keystore file for Flink's for Flink's external REST endpoints. |
security.ssl.rest.keystore-type |
JVM default keystore type | String | The type of the keystore for Flink's external REST endpoints. |
security.ssl.rest.truststore |
(none) | String | The truststore file containing the public CA certificates to verify the peer for Flink's external REST endpoints. |
security.ssl.rest.truststore-password |
(none) | String | The password to decrypt the truststore for Flink's external REST endpoints. |
security.ssl.rest.truststore-type |
JVM default keystore type | String | The type of the truststore for Flink's external REST endpoints. |
security.ssl.verify-hostname |
true | Boolean | Flag to enable peer’s hostname verification during ssl handshake. |
zookeeper.sasl.disable |
false | Boolean | |
zookeeper.sasl.login-context-name |
"Client" | String | |
zookeeper.sasl.service-name |
"zookeeper" | String |
Creating and Deploying Keystores and Truststores #
Keys, Certificates, and the Keystores and Truststores can be generated using the keytool utility. You need to have an appropriate Java Keystore and Truststore accessible from each node in the Flink cluster.
- For standalone setups, this means copying the files to each node, or adding them to a shared mounted directory.
- For container based setups, add the keystore and truststore files to the container images.
- For Yarn setups, the cluster deployment phase can automatically distribute the keystore and truststore files.
For the externally facing REST endpoint, the common name or subject alternative names in the certificate should match the node’s hostname and IP address.
Example SSL Setup Standalone and Kubernetes #
Internal Connectivity
Execute the following keytool commands to create a key pair in a keystore:
$ keytool -genkeypair \
-alias flink.internal \
-keystore internal.keystore \
-dname "CN=flink.internal" \
-storepass internal_store_password \
-keyalg RSA \
-keysize 4096 \
-storetype PKCS12
The single key/certificate in the keystore is used the same way by the server and client endpoints (mutual authentication). The key pair acts as the shared secret for internal security, and we can directly use it as keystore and truststore.
security.ssl.internal.enabled: true
security.ssl.internal.keystore: /path/to/flink/conf/internal.keystore
security.ssl.internal.truststore: /path/to/flink/conf/internal.keystore
security.ssl.internal.keystore-password: internal_store_password
security.ssl.internal.truststore-password: internal_store_password
security.ssl.internal.key-password: internal_store_password
REST Endpoint
The REST endpoint may receive connections from external processes, including tools that are not part of Flink (for example curl request to the REST API). Setting up a proper certificate that is signed though a CA hierarchy may make sense for the REST endpoint.
However, as mentioned above, the REST endpoint does not authenticate clients and thus typically needs to be secured via a proxy anyways.
REST Endpoint (simple self signed certificate)
This example shows how to create a simple keystore / truststore pair. The truststore does not contain the primary key and can be shared with other applications. In this example, myhost.company.org / ip:10.0.2.15 is the node (or service) for the JobManager.
$ keytool -genkeypair -alias flink.rest -keystore rest.keystore -dname "CN=myhost.company.org" -ext "SAN=dns:myhost.company.org,ip:10.0.2.15" -storepass rest_keystore_password -keyalg RSA -keysize 4096 -storetype PKCS12
$ keytool -exportcert -keystore rest.keystore -alias flink.rest -storepass rest_keystore_password -file flink.cer
$ keytool -importcert -keystore rest.truststore -alias flink.rest -storepass rest_truststore_password -file flink.cer -noprompt
security.ssl.rest.enabled: true
security.ssl.rest.keystore: /path/to/flink/conf/rest.keystore
security.ssl.rest.truststore: /path/to/flink/conf/rest.truststore
security.ssl.rest.keystore-password: rest_keystore_password
security.ssl.rest.truststore-password: rest_truststore_password
security.ssl.rest.key-password: rest_keystore_password
REST Endpoint (with a self signed CA)
Execute the following keytool commands to create a truststore with a self signed CA.
$ keytool -genkeypair -alias ca -keystore ca.keystore -dname "CN=Sample CA" -storepass ca_keystore_password -keyalg RSA -keysize 4096 -ext "bc=ca:true" -storetype PKCS12
$ keytool -exportcert -keystore ca.keystore -alias ca -storepass ca_keystore_password -file ca.cer
$ keytool -importcert -keystore ca.truststore -alias ca -storepass ca_truststore_password -file ca.cer -noprompt
Now create a keystore for the REST endpoint with a certificate signed by the above CA. Let flink.company.org / ip:10.0.2.15 be the hostname of the JobManager.
$ keytool -genkeypair -alias flink.rest -keystore rest.signed.keystore -dname "CN=flink.company.org" -ext "SAN=dns:flink.company.org" -storepass rest_keystore_password -keyalg RSA -keysize 4096 -storetype PKCS12
$ keytool -certreq -alias flink.rest -keystore rest.signed.keystore -storepass rest_keystore_password -file rest.csr
$ keytool -gencert -alias ca -keystore ca.keystore -storepass ca_keystore_password -ext "SAN=dns:flink.company.org,ip:10.0.2.15" -infile rest.csr -outfile rest.cer
$ keytool -importcert -keystore rest.signed.keystore -storepass rest_keystore_password -file ca.cer -alias ca -noprompt
$ keytool -importcert -keystore rest.signed.keystore -storepass rest_keystore_password -file rest.cer -alias flink.rest -noprompt
Now add the following configuration to your Flink configuration file:
security.ssl.rest.enabled: true
security.ssl.rest.keystore: /path/to/flink/conf/rest.signed.keystore
security.ssl.rest.truststore: /path/to/flink/conf/ca.truststore
security.ssl.rest.keystore-password: rest_keystore_password
security.ssl.rest.key-password: rest_keystore_password
security.ssl.rest.truststore-password: ca_truststore_password
Tips to query REST Endpoint with curl utility
You can convert the keystore into the PEM
format using openssl
:
$ openssl pkcs12 -passin pass:rest_keystore_password -in rest.keystore -out rest.pem -nodes
Then you can query REST Endpoint with curl
:
$ curl --cacert rest.pem flink_url
If mutual SSL is enabled:
$ curl --cacert rest.pem --cert rest.pem flink_url
Tips for YARN Deployment #
For YARN, you can use the tools of Yarn to help:
-
Configuring security for internal communication is exactly the same as in the example above.
-
To secure the REST endpoint, you need to issue the REST endpoint’s certificate such that it is valid for all hosts that the JobManager may get deployed to. This can be done with a wild card DNS name, or by adding multiple DNS names.
-
The easiest way to deploy keystores and truststore is by YARN client’s ship files option (
-yt
). Copy the keystore and truststore files into a local directory (saydeploy-keys/
) and start the YARN session as follows:flink run -m yarn-cluster -yt deploy-keys/ flinkapp.jar
-
When deployed using YARN, Flink’s web dashboard is accessible through YARN proxy’s Tracking URL. To ensure that the YARN proxy is able to access Flink’s HTTPS URL, you need to configure YARN proxy to accept Flink’s SSL certificates. For that, add the custom CA certificate into Java’s default truststore on the YARN Proxy node.