In addition to CPU and memory, many workloads need other resources, e.g. GPUs for deep learning. To support such resources, Flink provides an external resource framework. The framework requests various types of resources from the underlying resource management system (e.g., Kubernetes) and supplies operators with the information needed to use them. You can either use the built-in plugins provided by Flink (currently only for GPU support) or implement your own plugins for custom resource types.
In general, the external resource framework does two things:
Set the corresponding fields of the resource requests (for requesting resources from the underlying system) with respect to your configuration.
Provide operators with the information needed for using the resources.
When deployed on a resource management system (Kubernetes / Yarn), the external resource framework ensures that the allocated pod/container contains the desired external resources. Many resource management systems currently support external resources. For example, Kubernetes has supported GPU, FPGA, etc. through its Device Plugin mechanism since v1.10, and Yarn has supported GPU and FPGA resources since 2.10 and 3.1. External resources are not supported by Flink’s Mesos integration at the moment. In Standalone mode, the user has to ensure that the external resources are available.
The external resource framework will provide the corresponding information to operators. The external resource information, which contains the basic properties needed for using the resources, is generated by the configured external resource drivers.
To enable an external resource with the external resource framework, you need to:
Prepare the external resource plugin.
Set configurations for the external resource.
Get the external resource information from RuntimeContext and use it in your operators.
You need to prepare the external resource plugin and put it into the plugins/ folder of your Flink distribution; see Flink Plugins. Apache Flink provides a first-party plugin for GPU resources. You can also implement a plugin for your custom resource type.
First, add the resource names of all external resource types to the external resource list (configuration key ‘external-resources’), separated by “;”, e.g. “external-resources: gpu;fpga” for the two external resources “gpu” and “fpga”. Only a <resource_name> defined here takes effect in the external resource framework.
For each external resource, you can configure the following options. The <resource_name> in all of them corresponds to a name listed in the external resource list:
Amount (external-resource.<resource_name>.amount): This is the quantity of the external resource that should be requested from the external system.
Config key in Yarn (external-resource.<resource_name>.yarn.config-key): optional. If configured, the external resource framework will add this key to the resource profile of container requests for Yarn. The value will be set to the value of external-resource.<resource_name>.amount.
Config key in Kubernetes (external-resource.<resource_name>.kubernetes.config-key): optional. If configured, the external resource framework will add resources.limits.<config-key> and resources.requests.<config-key> to the main container spec of the TaskManager and set their values to the value of external-resource.<resource_name>.amount.
Driver Factory (external-resource.<resource_name>.driver-factory.class): optional. Defines the factory class name for the external resource identified by <resource_name>. If configured, the factory will be used to instantiate drivers in the external resource framework. If not configured, the requested resource will still exist in the TaskManager as long as the relevant options are configured. However, the operator will not get any information about the resource from RuntimeContext in that case.
Driver Parameters (external-resource.<resource_name>.param.<param>): optional. The naming pattern of custom config options for the external resource specified by <resource_name>. Only the configurations that follow this pattern will be passed into the driver factory of that external resource.
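The naming pattern of the driver parameters can be illustrated with a small, Flink-independent sketch. The class `ExternalResourceParams` and its methods are hypothetical helpers, not Flink API, and stripping the prefix before handing the options to the driver is an assumption made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ExternalResourceParams {
    /** Prefix shared by all options of one external resource. */
    public static String prefix(String resourceName) {
        return "external-resource." + resourceName + ".";
    }

    /**
     * Keeps only the entries matching external-resource.<resource_name>.param.<param>.
     * Assumption: the prefix is stripped, so the driver factory sees just <param>.
     */
    public static Map<String, String> driverParams(Map<String, String> conf, String resourceName) {
        String paramPrefix = prefix(resourceName) + "param.";
        Map<String, String> params = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            if (entry.getKey().startsWith(paramPrefix)) {
                params.put(entry.getKey().substring(paramPrefix.length()), entry.getValue());
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("external-resources", "gpu;fpga");
        conf.put("external-resource.gpu.amount", "2");
        conf.put("external-resource.gpu.param.discovery-script.args", "--enable-coordination-mode");
        conf.put("external-resource.fpga.amount", "1");

        // The resource list uses ";" as its delimiter.
        for (String name : conf.get("external-resources").split(";")) {
            System.out.println(name + " -> " + driverParams(conf, name));
        }
    }
}
```

Options such as the amount or the config keys do not match the param pattern, so in this sketch they never reach the driver factory.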
An example configuration that specifies two external resources:
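A minimal sketch of such a configuration, using only the option keys described above. The concrete values are illustrative: the FPGA factory class name is hypothetical, and the GPU factory class name and the Yarn key for FPGA are assumptions that should be checked against your Flink and Hadoop versions.

```yaml
# Two external resources, "gpu" and "fpga".
external-resources: gpu;fpga

# Quantity of each resource to request.
external-resource.gpu.amount: 2
external-resource.fpga.amount: 1

# Driver factory classes (GPU class name assumed, FPGA class name hypothetical).
external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory
external-resource.fpga.driver-factory.class: your.domain.FPGADriverFactory

# Custom parameter that is passed only to the gpu driver factory.
external-resource.gpu.param.discovery-script.args: --enable-coordination-mode

# Config key added to Yarn container requests for fpga (assumed value).
external-resource.fpga.yarn.config-key: yarn.io/fpga
```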
To use the resources, operators need to get the ExternalResourceInfo set from the RuntimeContext. ExternalResourceInfo wraps the information needed for using the resource, which can be retrieved with getProperty. Which properties are available and how to access the resource with them depends on the specific plugin.
Operators can get the ExternalResourceInfo set of a specific external resource from RuntimeContext or FunctionContext by calling getExternalResourceInfos(String resourceName). The resourceName here should have the same value as the name configured in the external resource list. It can be used as follows:
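A minimal sketch of this lookup, written so that it compiles without Flink on the classpath. The nested `ExternalResourceInfo` interface is a local stand-in that mirrors the two methods named in this section (`getProperty` and `getKeys`); `infoOf` and `collectProperty` are hypothetical helpers. Inside a rich function, the set would instead come from `getRuntimeContext().getExternalResourceInfos("gpu")`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class ExternalResourceUsageSketch {
    /** Local stand-in for Flink's ExternalResourceInfo, so the sketch is self-contained. */
    public interface ExternalResourceInfo {
        Optional<String> getProperty(String key);
        Collection<String> getKeys();
    }

    /** Hypothetical map-backed info object; real ones are produced by the plugin's driver. */
    public static ExternalResourceInfo infoOf(Map<String, String> props) {
        return new ExternalResourceInfo() {
            @Override
            public Optional<String> getProperty(String key) {
                return Optional.ofNullable(props.get(key));
            }

            @Override
            public Collection<String> getKeys() {
                return props.keySet();
            }
        };
    }

    /** Collects the given property from every info object that defines it. */
    public static List<String> collectProperty(Set<ExternalResourceInfo> infos, String key) {
        List<String> values = new ArrayList<>();
        for (ExternalResourceInfo info : infos) {
            info.getProperty(key).ifPresent(values::add);
        }
        Collections.sort(values); // sets are unordered, so sort for a stable result
        return values;
    }

    public static void main(String[] args) {
        // In an operator: getRuntimeContext().getExternalResourceInfos("gpu")
        Set<ExternalResourceInfo> infos = new HashSet<>(Arrays.asList(
                infoOf(Collections.singletonMap("index", "0")),
                infoOf(Collections.singletonMap("index", "1"))));
        System.out.println(collectProperty(infos, "index"));
    }
}
```

Because `getProperty` returns an `Optional`, a missing key is skipped here rather than causing a failure; whether that is the right behavior depends on the plugin.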
Each ExternalResourceInfo contains one or more properties with keys representing the different dimensions of the resource. You can get all valid keys via ExternalResourceInfo#getKeys.
To implement a plugin for your custom resource type, you need to:
Add your own external resource driver by implementing the org.apache.flink.api.common.externalresource.ExternalResourceDriver interface.
Add a driver factory, which instantiates the driver, by implementing the org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory interface.
Add a service entry. Create a file META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory which contains the class name of your driver factory class (see the Java Service Loader docs for more details).
For example, to implement a plugin for an external resource named “FPGA”, you need to implement FPGADriver and FPGADriverFactory first:
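A rough sketch of the shape of these two classes. To keep it compilable without Flink, the three nested interfaces are local stand-ins for the Flink interfaces of the same name, and the factory takes a plain `Map` where Flink would pass its own configuration object; both are assumptions. `FPGAInfo` and the fake device enumeration are purely illustrative.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class FpgaPluginSketch {
    // Local stand-ins for the Flink interfaces, so the sketch compiles on its own.
    public interface ExternalResourceInfo {
        Optional<String> getProperty(String key);
        Collection<String> getKeys();
    }

    public interface ExternalResourceDriver {
        Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;
    }

    public interface ExternalResourceDriverFactory {
        // Assumption: a Map stands in for Flink's configuration type.
        ExternalResourceDriver createExternalResourceDriver(Map<String, String> config) throws Exception;
    }

    /** Hypothetical info type exposing a single property, "index". */
    public static class FPGAInfo implements ExternalResourceInfo {
        private final String index;

        public FPGAInfo(String index) {
            this.index = index;
        }

        @Override
        public Optional<String> getProperty(String key) {
            return "index".equals(key) ? Optional.of(index) : Optional.empty();
        }

        @Override
        public Collection<String> getKeys() {
            return Collections.singleton("index");
        }
    }

    /** Hypothetical driver: pretends devices 0..amount-1 exist instead of probing hardware. */
    public static class FPGADriver implements ExternalResourceDriver {
        @Override
        public Set<FPGAInfo> retrieveResourceInfo(long amount) {
            Set<FPGAInfo> infos = new HashSet<>();
            for (long i = 0; i < amount; i++) {
                infos.add(new FPGAInfo(Long.toString(i)));
            }
            return infos;
        }
    }

    /** Factory that instantiates the driver; this sketch ignores the passed configuration. */
    public static class FPGADriverFactory implements ExternalResourceDriverFactory {
        @Override
        public ExternalResourceDriver createExternalResourceDriver(Map<String, String> config) {
            return new FPGADriver();
        }
    }
}
```

A real driver would discover actual devices in `retrieveResourceInfo` and would typically read its settings from the driver parameters passed to the factory.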
Create a file named org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory in META-INF/services/ and write the factory class name (e.g. your.domain.FPGADriverFactory) to it.
Then, create a jar which includes FPGADriver, FPGADriverFactory, META-INF/services/ and all the external dependencies.
Make a directory in plugins/ of your Flink distribution with an arbitrary name, e.g. “fpga”, and put the jar into this directory.
See Flink Plugin for more details.
Currently, Flink supports GPUs as external resources.
We provide a first-party plugin for GPU resources. The plugin leverages a discovery script to discover indexes of GPU devices, which can be accessed from the resource information via the property “index”. We provide a default discovery script that can be used to discover NVIDIA GPUs. You can also provide your custom script.
We provide an example which shows how to use the GPUs to do matrix-vector multiplication in Flink.
To make GPU resources accessible, certain prerequisites are needed depending on your environment:
For standalone mode, administrators should ensure the NVIDIA driver is installed and GPU resources are accessible on all the nodes in the cluster.
For Yarn deployment, administrators should configure the Yarn cluster to enable GPU scheduling. Note that the required Hadoop version is 2.10+ or 3.1+.
For Kubernetes deployment, administrators should make sure the NVIDIA GPU device plugin is installed. Note that the required Kubernetes version is 1.10+. At the moment, Kubernetes only supports NVIDIA and AMD GPUs. Flink only provides a discovery script for NVIDIA GPUs, but you can provide a custom discovery script for AMD GPUs yourself, see Discovery script.
As mentioned in Enable external resources for your workload, you also need to do two things to enable GPU resources:
Configure the GPU resource.
Get the information of GPU resources, which contains the GPU index as property with key “index”, in operators.
For the GPU plugin, you need to specify the common external resource configurations:
external-resources: You need to append your resource name (e.g. gpu) for GPU resources to it.
external-resource.<resource_name>.amount: The number of GPU devices per TaskManager.
external-resource.<resource_name>.yarn.config-key: For Yarn, the config key of GPU is yarn.io/gpu. Note that Yarn only supports NVIDIA GPUs at the moment.
external-resource.<resource_name>.kubernetes.config-key: For Kubernetes, the config key of GPU is <vendor>.com/gpu. Currently, “nvidia” and “amd” are the two supported vendors. Note that if you use AMD GPUs, you need to provide a discovery script yourself, see Discovery script.
external-resource.
In addition, there are some specific configurations for the GPU plugin:
external-resource.<resource_name>.param.discovery-script.path: The path of the discovery script. It can either be an absolute path, or a path relative to FLINK_HOME when that is defined, or to the current directory otherwise. If not explicitly configured, the default script will be used.
external-resource.<resource_name>.param.discovery-script.args: The arguments passed to the discovery script. For the default discovery script, see Default Script for the available parameters.
An example configuration for GPU resource:
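A minimal sketch of such a configuration, assuming the resource is named “gpu” and NVIDIA GPUs are used. The driver factory class name is an assumption based on the first-party GPU plugin; the remaining keys and values are taken from the options described above.

```yaml
external-resources: gpu
external-resource.gpu.amount: 2

# Assumed class name of the GPU plugin's driver factory.
external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory

# Default discovery script, with coordination mode enabled.
external-resource.gpu.param.discovery-script.path: plugins/external-resource-gpu/nvidia-gpu-discovery.sh
external-resource.gpu.param.discovery-script.args: --enable-coordination-mode

# Only relevant on Yarn.
external-resource.gpu.yarn.config-key: yarn.io/gpu

# Only relevant on Kubernetes.
external-resource.gpu.kubernetes.config-key: nvidia.com/gpu
```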
The GPUDriver leverages a discovery script to discover GPU resources and generate the GPU resource information.
We provide a default discovery script for NVIDIA GPUs, located at plugins/external-resource-gpu/nvidia-gpu-discovery.sh of your Flink distribution. The script gets the indexes of visible GPU resources through the nvidia-smi command. It tries to return the required amount (specified by external-resource.<resource_name>.amount) of GPU indexes in a list, and exits with a non-zero code if the amount cannot be satisfied.
For standalone mode, multiple TaskManagers might be co-located on the same machine, and each GPU device is visible to all the TaskManagers. The default discovery script supports a coordination mode, in which it leverages a coordination file to synchronize the allocation state of GPU devices and ensure each GPU device can only be used by one TaskManager process. The relevant arguments are:
--enable-coordination-mode: Enable the coordination mode. By default the coordination mode is disabled.
--coordination-file filePath: The path of the coordination file used to synchronize the allocation state of GPU resources. The default path is /var/tmp/flink-gpu-coordination.
You can also provide a discovery script to address your custom requirements, e.g. discovering AMD GPUs. Please make sure the path of your custom script is accessible to Flink and configured correctly (external-resource.<resource_name>.param.discovery-script.path).
The contract of the discovery script:
GPUDriver passes the amount (specified by external-resource.<resource_name>.amount) as the first argument into the script. The user-defined arguments in external-resource.<resource_name>.param.discovery-script.args are appended after it.
The script should return a list of the available GPU indexes, separated by commas. Whitespace-only indexes are ignored.
The script can also signal that discovery was not performed properly by exiting with a non-zero code. In that case, no GPU information will be provided to operators.
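The contract can be summarized from the caller's side with a small sketch. `DiscoveryScriptContract` and its methods are hypothetical and do not reproduce the actual GPUDriver code; trimming whitespace around each index and splitting the user arguments on whitespace are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DiscoveryScriptContract {
    /** Builds the command line: script path, then the amount, then the user-defined arguments. */
    public static List<String> buildCommand(String scriptPath, long amount, String userArgs) {
        List<String> command = new ArrayList<>();
        command.add(scriptPath);
        command.add(Long.toString(amount));
        if (userArgs != null && !userArgs.trim().isEmpty()) {
            // Assumption: arguments are separated by whitespace.
            command.addAll(Arrays.asList(userArgs.trim().split("\\s+")));
        }
        return command;
    }

    /** Parses the script output: comma-separated indexes, whitespace-only entries ignored. */
    public static List<String> parseIndexes(String output) {
        List<String> indexes = new ArrayList<>();
        for (String token : output.split(",")) {
            if (!token.trim().isEmpty()) {
                indexes.add(token.trim()); // assumption: surrounding whitespace is trimmed
            }
        }
        return indexes;
    }

    /** A non-zero exit code means discovery failed, so no GPU information is produced. */
    public static List<String> interpret(int exitCode, String output) {
        return exitCode == 0 ? parseIndexes(output) : new ArrayList<String>();
    }

    public static void main(String[] args) {
        System.out.println(buildCommand("/path/to/discovery.sh", 2, "--enable-coordination-mode"));
        System.out.println(interpret(0, "0, 1, ,2\n"));
        System.out.println(interpret(1, "0,1"));
    }
}
```

A custom script for another vendor only needs to honor the same argument order and output format to fit this contract.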