When deciding how and where to run Flink, there’s a wide range of options available.
Flink can execute applications in one of three ways:

- in Application Mode,
- in Per-Job Mode,
- in Session Mode.

The above modes differ in:

- the cluster lifecycle and resource isolation guarantees, and
- whether the application’s main() method is executed on the client or on the cluster.

Session Mode assumes an already running cluster and uses the resources of that cluster to execute any submitted application. Applications executed in the same (session) cluster use, and consequently compete for, the same resources. This has the advantage that you do not pay the resource overhead of spinning up a full cluster for every submitted job. But if one of the jobs misbehaves or brings down a TaskManager, then all jobs running on that TaskManager will be affected by the failure. Apart from the negative impact on the job that caused the failure, this implies a potentially massive recovery process, with all the restarting jobs accessing the filesystem concurrently and making it unavailable to other services. Additionally, having a single cluster run multiple jobs means more load for the JobManager, which is responsible for the book-keeping of all the jobs in the cluster.
Aiming at providing better resource isolation guarantees, the Per-Job mode uses the available cluster manager framework (e.g. YARN, Kubernetes) to spin up a cluster for each submitted job. This cluster is available to that job only. When the job finishes, the cluster is torn down and any lingering resources (files, etc.) are cleaned up. This provides better resource isolation, as a misbehaving job can only bring down its own TaskManagers. In addition, it spreads the load of book-keeping across multiple JobManagers, as there is one per job. For these reasons, the Per-Job resource allocation model is the preferred mode in many production settings.
In all the above modes, the application’s main() method is executed on the client side. This process includes downloading the application’s dependencies locally, executing the main() method to extract a representation of the application that Flink’s runtime can understand (i.e. the JobGraph), and shipping the dependencies and the JobGraph(s) to the cluster. This makes the Client a heavy resource consumer, as it may need substantial network bandwidth to download dependencies and ship binaries to the cluster, and CPU cycles to execute the main() method. This problem can be even more pronounced when the Client is shared across users.
Building on this observation, the Application Mode creates a cluster per submitted application, but this time the main() method of the application is executed on the JobManager. Creating a cluster per application can be seen as creating a session cluster shared only among the jobs of a particular application and torn down when the application finishes. With this architecture, the Application Mode provides the same resource isolation and load balancing guarantees as the Per-Job mode, but at the granularity of a whole application. Executing the main() method on the JobManager not only saves the CPU cycles required, but also saves the bandwidth needed for downloading the dependencies locally. Furthermore, it allows for a more even spread of the network load of downloading the dependencies of the applications in the cluster, as there is one JobManager per application.
Note that in Application Mode the main() method is executed on the cluster and not on the client, as in the other modes. This may have implications for your code: for example, any paths you register in your environment using registerCachedFile() must be accessible by the JobManager of your application.
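For instance, registering a cached file via a path on a distributed filesystem keeps it reachable from the JobManager, while a client-local path would not be. A minimal sketch, with an HDFS path and file name made up for the example:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CachedFileExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Safe in Application Mode: the HDFS path is resolvable from the JobManager.
        env.registerCachedFile("hdfs:///data/lookup/cities.csv", "cities");

        // A path like "file:///home/me/cities.csv" that only exists on the client
        // machine would fail here, because main() runs on the JobManager.

        // ... build the pipeline and call env.execute() ...
    }
}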
Compared to the Per-Job mode, the Application Mode allows the submission of applications consisting of multiple jobs. The order of job execution is not affected by the deployment mode but by the call used to launch the job. Using execute(), which is blocking, establishes an order: the execution of the “next” job is postponed until “this” job finishes. Using executeAsync(), which is non-blocking, leads to the “next” job starting before “this” job finishes. The Application Mode allows for multi-execute() applications, but High-Availability is not supported in these cases. High-Availability in Application Mode is only supported for single-execute() applications.
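To make this concrete, here is a minimal sketch of a multi-execute() application; the class and job names are made up for the example:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MultiJobApplication {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3).print();
        // Blocking: the call returns only once this job has finished, so the
        // second job is guaranteed to start after the first one.
        env.execute("first job");

        env.fromElements("a", "b", "c").print();
        // Non-blocking: returns a JobClient immediately, so this job may still
        // be running when main() ends. As a multi-execute() application, this
        // example would not be covered by High-Availability.
        env.executeAsync("second job");
    }
}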
In Session Mode, the cluster lifecycle is independent of that of any job running on the cluster
and the resources are shared across all jobs. The Per-Job mode pays the price of spinning up a cluster
for every submitted job, but this comes with better isolation guarantees as the resources are not shared
across jobs. In this case, the lifecycle of the cluster is bound to that of the job. Finally, the
Application Mode creates a session cluster per application and executes the application’s main()
method on the cluster.
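On the command line, the mode is selected via the submission target. A sketch using YARN as the cluster manager (the JAR name is illustrative):

# Session Mode: submit to an already running session cluster
./bin/flink run -t yarn-session ./MyApplication.jar

# Per-Job Mode: a dedicated cluster is spun up for this job
./bin/flink run -t yarn-per-job ./MyApplication.jar

# Application Mode: main() runs on the JobManager of a dedicated cluster
./bin/flink run-application -t yarn-application ./MyApplication.jar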
Apache Flink ships with first-class support for a number of common deployment targets.
A number of vendors offer managed or fully hosted Flink solutions. None of these vendors are officially supported or endorsed by the Apache Flink PMC. Please refer to vendor maintained documentation on how to use these products.
- Alibaba Cloud Realtime Compute — Supported Environments: AliCloud
- Amazon EMR — Supported Environments: AWS
- Amazon Kinesis Data Analytics for Apache Flink — Supported Environments: AWS
- Cloudera DataFlow — Supported Environments: AWS, Azure, Google Cloud, On-Premise
- Eventador — Supported Environments: AWS
- Huawei Cloud Stream Service — Supported Environments: Huawei Cloud
- Ververica Platform — Supported Environments: AliCloud, AWS, Azure, Google Cloud, On-Premise
Flink provides several approaches for providing dependencies (such as *.jar
files or static data) to Flink or user-provided
applications. These approaches differ based on the deployment mode and target, but also have commonalities, which are described here.
To provide a dependency, there are the following options:

- The lib/ folder: JAR files in the lib/ folder are added to the classpath used to start Flink. This is suitable for libraries such as Hadoop or file systems not available as plugins. Beware that classes added here can potentially interfere with Flink, for example if you add a different version of a library already provided by Flink.
- The plugins/<name>/ folders: JAR files in plugins/<name>/ are loaded at runtime by Flink through separate classloaders to avoid conflicts with classes loaded and used by Flink. Only jar files which are prepared as plugins can be added here.
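For illustration, a distribution with one extra library and one plugin could look like this (my-extra-library.jar is a made-up name; the S3 filesystem plugin ships with Flink):

flink-dist/
├── lib/
│   ├── flink-dist.jar
│   └── my-extra-library.jar        <- on the classpath used to start Flink
└── plugins/
    └── s3-fs-hadoop/
        └── flink-s3-fs-hadoop.jar  <- loaded through its own plugin classloader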
If you need to extend Flink with a Maven dependency (and its transitive dependencies), you can use an Apache Maven pom.xml file to download all required files into a local folder:
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.flink</groupId>
    <artifactId>docker-dependencies</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- Put your dependency here, for example a Hadoop GCS connector -->
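        <!-- Illustrative example (the coordinates below are an assumption;
             substitute the artifact and version you actually need):
        <dependency>
            <groupId>com.google.cloud.bigdataoss</groupId>
            <artifactId>gcs-connector</artifactId>
            <version>hadoop3-2.2.0</version>
        </dependency>
        -->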
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.1.2</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals><goal>copy-dependencies</goal></goals>
                        <configuration><outputDirectory>jars</outputDirectory></configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Running mvn package in the same directory will create a jars/ folder containing all the jar files, which you can add to the desired folder, Docker image, etc.
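For example, a minimal Dockerfile could copy the downloaded jars into the lib/ folder of the official Flink image (the image tag is a placeholder; use the version you run):

FROM flink:1.16
# Put the downloaded dependencies on the Flink classpath inside the image
COPY jars/ /opt/flink/lib/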