Job Management

Job Lifecycle Management

The core responsibility of the Flink operator is to manage the full production lifecycle of Flink jobs.

What is covered:

  • Running, suspending and deleting applications
  • Stateful and stateless application upgrades
  • Triggering savepoints

The behaviour is always controlled by the respective configuration fields of the JobSpec object as introduced in the FlinkDeployment overview.

Let’s take a look at these operations in detail.

Running, suspending and deleting applications

By controlling the state field of the JobSpec, users can define the desired state of the application.

Supported application states:

  • running : The job is expected to be running and processing data.
  • suspended : Data processing should be temporarily suspended, with the intention of continuing later.

Job State transitions

There are 4 possible state change scenarios when updating the current FlinkDeployment.

  • running -> running : Job upgrade operation. In practice, a suspend followed by a restore operation.
  • running -> suspended : Suspend operation. Stops the job while maintaining state information for stateful applications.
  • suspended -> running : Restore operation. Starts the application from the current state using the latest spec.
  • suspended -> suspended : Update spec, job is not started.
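
As a minimal sketch of the running -> suspended transition, the fragment below shows only the job section of a FlinkDeployment spec. The jar, parallelism and upgrade mode are taken from the examples on this page and are placeholders for your own values; the only change that matters here is the state field.

```yaml
# Fragment of a FlinkDeployment; all other spec fields are left unchanged.
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint
    # Changing this value from "running" to "suspended" requests a suspend operation.
    state: suspended
```

Setting state back to running and applying the resource again requests the restore operation, using whatever spec is current at that time.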

The way state is handled for suspend and restore operations is described in detail in the next section.

Cancelling/Deleting applications

Note that neither cancelled nor deleted is among the possible desired states. When users no longer wish to process data with a given FlinkDeployment, they can simply delete the deployment object using the Kubernetes API:

kubectl delete flinkdeployment my-deployment

Deleting a deployment will remove all checkpoint and status information. Future deployments will start from an empty state unless manually overridden by the user.
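
One way such a manual override might look is sketched below. It assumes that the JobSpec exposes an initialSavepointPath field and that a savepoint written to external storage before the deletion is still available. Neither assumption is stated on this page, and the savepoint path is a hypothetical placeholder, so check the JobSpec reference for your operator version before relying on it.

```yaml
# Fragment of a new FlinkDeployment created after the old one was deleted.
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint
    state: running
    # Assumption: field name not confirmed by this page; path is a placeholder.
    initialSavepointPath: "file:///flink-data/savepoints/<savepoint-dir>"
```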

Stateful and stateless application upgrades

When the spec of a FlinkDeployment changes, the running Application or Session cluster must be upgraded. To do this, the operator stops the currently running job (unless it is already suspended) and redeploys it using the latest spec, carrying over state from the previous run for stateful applications.

Users have full control over how state is managed when stopping and restoring stateful applications through the upgradeMode setting of the JobSpec.

Supported values: stateless, savepoint, last-state

The upgradeMode setting controls both the stop and restore mechanisms as detailed in the following table:

|                        | Stateless               | Last State                                | Savepoint                   |
|------------------------|-------------------------|-------------------------------------------|-----------------------------|
| Config Requirement     | None                    | Checkpointing & Kubernetes HA Enabled     | Savepoint directory defined |
| Job Status Requirement | None*                   | None*                                     | Job Running                 |
| Suspend Mechanism      | Cancel / Delete         | Delete Flink deployment (keep HA metadata) | Cancel with savepoint       |
| Restore Mechanism      | Deploy from empty state | Recover last state using HA metadata      | Restore From savepoint      |

*In general, no update can be executed while a savepoint operation is in progress.

The three different upgrade modes are intended to support different use-cases:

  • stateless: Stateless applications, prototyping
  • last-state: Suitable for most stateful production applications. Quick upgrades in any application state (even for failing jobs), does not require a healthy job. Requires Flink Kubernetes HA configuration (see example below).
  • savepoint: Suitable for forking or migrating applications. Requires a healthy running job, because a savepoint must be taken before shutdown.
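
For comparison with the last-state mode, here is a minimal sketch of an upgrade in savepoint mode. It shows only the parts of the spec relevant to the table above. The parallelism change from 2 to 4 is an illustrative assumption standing in for any spec change that triggers an upgrade.

```yaml
# Fragment of a FlinkDeployment; image, resources and pod template are omitted.
spec:
  flinkConfiguration:
    # The savepoint upgrade mode requires a savepoint directory to be defined.
    state.savepoints.dir: file:///flink-data/savepoints
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    # Illustrative spec change (previously 2) that causes the job to be upgraded.
    parallelism: 4
    # The job is cancelled with a savepoint and restored from that savepoint.
    upgradeMode: savepoint
    state: running
```

With upgradeMode: stateless the same change would redeploy the job from an empty state, and no savepoint directory would be needed.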

Full example using the last-state strategy:

apiVersion: flink.apache.org/v1alpha1
kind: FlinkDeployment
metadata:
  namespace: default
  name: basic-checkpoint-ha-example
spec:
  image: flink:1.14.3
  flinkVersion: v1_14
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.savepoints.dir: file:///flink-data/savepoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha
  jobManager:
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  podTemplate:
    spec:
      serviceAccount: flink
      containers:
        - name: flink-main-container
          volumeMounts:
          - mountPath: /flink-data
            name: flink-volume
      volumes:
      - name: flink-volume
        hostPath:
          # directory location on host
          path: /tmp/flink
          # this field is optional
          type: Directory
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: last-state
    state: running

Savepoint management

Savepoints can be triggered manually by setting the savepointTriggerNonce field in the job specification to a new (nonce) value:

  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint
    state: running
    savepointTriggerNonce: 123

Changing the nonce value triggers a new savepoint. Information about the pending savepoint and the last savepoint is stored in the FlinkDeployment status.
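
As a short illustration, triggering another savepoint for the job above only requires a different nonce. The value 124 is arbitrary; any value that differs from the previous one should do.

```yaml
# Fragment of a FlinkDeployment; only the job section is shown.
spec:
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint
    state: running
    # Previously 123; the changed value requests a new savepoint.
    savepointTriggerNonce: 124
```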