Release Notes - Flink 1.13
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Release Notes - Flink 1.13 #

These release notes discuss important aspects, such as configuration, behavior, or dependencies, that changed between Flink 1.12 and Flink 1.13. Please read these notes carefully if you are planning to upgrade your Flink version to 1.13.

Failover #

Remove state.backend.async option. #

The state.backend.async option is deprecated. Snapshots are always asynchronous now (as they were by default before) and there is no option to configure a synchronous snapshot any more.

The constructors of FsStateBackend and MemoryStateBackend that take a flag for sync/async snapshots are kept for API compatibility, but the flags are ignored now.

Disentangle StateBackends from Checkpointing #

Flink has always separated local state storage from fault tolerance. Keyed state is maintained locally in state backends, either on the JVM heap or in embedded RocksDB instances. Fault tolerance comes from checkpoints and savepoints - periodic snapshots of a job’s internal state to some durable file system - such as Amazon S3 or HDFS.

Historically, Flink’s StateBackend interface intermixed these concepts in a way that confused many users. In 1.13, checkpointing configurations have been extracted into their own interface, CheckpointStorage.

This change does not affect the runtime behavior and simply provides a better mental model to users. Pipelines can be updated to use the new the new abstractions without losing state, consistency, or change in semantics.

Please follow the migration guide or the JavaDoc on the deprecated state backend classes - MemoryStateBackend, FsStateBackend and RocksDBStateBackend for migration details.

Unify binary format for Keyed State savepoints #

Flink’s savepoint binary format is unified across all state backends. That means you can take a savepoint with one state backend and then restore it using another.

If you want to switch the state backend you should first upgrade your Flink version to 1.13, then take a savepoint with the new version, and only after that, you can restore it with a different state backend.

FailureRateRestartBackoffTimeStrategy allows one less restart than configured #

The Failure Rate Restart Strategy was allowing 1 less restart per interval than configured. Users wishing to keep the current behavior should reduce the maximum number of allowed failures per interval by 1.

Support rescaling for Unaligned Checkpoints #

While recovering from unaligned checkpoints, users can now change the parallelism of the job. This change allows users to quickly upscale the job under backpressure.

SQL #

Officially deprecate the legacy planner #

The old planner of the Table & SQL API is deprecated and will be dropped in Flink 1.14. This means that both the BatchTableEnvironment and DataSet API interop are reaching end of life. Use the unified TableEnvironment for batch and stream processing with the new planner, or the DataStream API in batch execution mode.

Use TIMESTAMP_LTZ as return type for function PROCTIME() #

Before Flink 1.13, the function return type of PROCTIME() is TIMESTAMP, and the return value is the TIMESTAMP in UTC time zone, e.g. the wall-clock shows 2021-03-01 12:00:00 at Shanghai, however the PROCTIME() displays 2021-03-01 04:00:00 which is wrong. Flink 1.13 fixes this issue and uses TIMESTAMP_LTZ type as return type of PROCTIME(), users don’t need to deal time zone problems anymore.

Support defining event time attribute on TIMESTAMP_LTZ column #

Support defining event time attribute on TIMESTAMP_LTZ column, base on this, Flink SQL gracefully support the Daylight Saving Time.

Correct function CURRENT_TIMESTAMP/CURRENT_TIME/CURRENT_DATE/LOCALTIME/LOCALTIMESTAMP/NOW() #

The value of time function CURRENT_TIMESTAMP and NOW() are corrected from UTC time with TIMESTAMP type to epoch time with TIMESTAMP_LTZ type. Time function LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP and NOW() are corrected from evaluates for per record in batch mode to evaluate once at query-start for batch job.

Disable problematic cast conversion between NUMERIC type and TIMESTAMP type #

The CAST operation between NUMERIC type and TIMESTAMP type is problematic and is disabled now, e.g. CAST(numeric AS TIMESTAMP(3)) is disabled and should use TO_TIMESTAMP(FROM_UNIXTIME(numeric)) instead.

Support USE MODULES syntax #

The term MODULES is a reserved keyword now. Use backticks to escape column names and other identifiers with this name.

Update TableResult.collect()/TableResult.print() to the new type system #

Table.execute().collect() might return slightly different results for column types and row kind. The most important differences include:

  • Structured types are represented as POJOs of the original class and not Row anymore.
  • Raw types are serialized according to the configuration in TableConfig.

Add new StreamTableEnvironment.fromDataStream #

StreamTableEnvironment.fromDataStream has slightly different semantics now because it has been integrated into the new type system. Esp. row fields derived from composite type information might be in a different order compared to 1.12. The old behavior is still available via the overloaded method that takes expressions like fromDataStream(ds, $("field1"), $("field2")).

Update the Row.toString method #

The Row.toSting() method has been reworked. This is an incompatible change. If the legacy representation is still required for tests, the old behavior can be restored via the flag RowUtils.USE_LEGACY_TO_STRING for the local JVM. However, relying on the row’s string representation for tests is not a good idea in general as field data types are not verified.

Support start SQL Client with an initialization SQL file #

The sql-client-defaults.yaml YAML file is deprecated and not provided in the release package. To be compatible, it’s still supported to initialize the SQL Client with the YAML file if manually provided. But it’s recommend to use the new introduced -i startup option to execute an initialization SQL file to setup the SQL Client session. The so-called initialization SQL file can use Flink DDLs to define available catalogs, table sources and sinks, user-defined functions, and other properties required for execution and deployment. The support of legacy SQL Client YAML file will be totally dropped in Flink 1.14.

Hive dialect supports HiveQL for DML and DQL. Please switch to default dialect in order to write in Flink syntax.

Runtime #

BoundedOneInput.endInput is called when taking synchronous savepoint #

endInput() is not called anymore (on BoundedOneInput and BoundedMultiInput) when the job is stopping with savepoint.

Remove JobManagerOptions.SCHEDULING_STRATEGY #

The configuration parameter jobmanager.scheduler.scheduling-strategy has been removed, because the legacy scheduler has been removed from Flink 1.13.0.

Warn user if System.exit() is called in user code #

A new configuration value cluster.intercept-user-system-exit allows to log a warning, or throw an exception if user code calls System.exit().

This feature is not covering all locations in Flink where user code is executed. It just adds the infrastructure for such an interception. We are tracking this improvement in FLINK-21307.

MiniClusterJobClient#getAccumulators was infinitely blocking in local environment for a streaming job #

The semantics for accumulators have now changed in MiniClusterJobClient to fix this bug and comply with other JobClient implementations: Previously MiniClusterJobClient assumed that getAccumulator() was called on a bounded pipeline and that the user wanted to acquire the final accumulator values after the job is finished. But now it returns the current value of accumulators immediately to be compatible with unbounded pipelines.

If it is run on a bounded pipeline, then to get the final accumulator values after the job is finished, one needs to call

getJobExecutionResult().thenApply(JobExecutionResult::getAllAccumulatorResults)

Docker #

Consider removing automatic configuration fo number of slots from docker #

The docker images no longer set the default number of taskmanager slots to the number of CPU cores. This behavior was inconsistent with all other deployment methods and ignored any limits on the CPU usage set via docker.

Rework jemalloc switch to use an environment variable #

The docker switch for disabling the jemalloc memory allocator has been reworked from a script argument to an environment variable called DISABLE_JEMALLOC. If set to “true” jemalloc will not be enabled.

Connectors #

Remove swift FS filesystem #

The Swift filesystem is no longer being actively developed and has been removed from the project and distribution.

The unified source API for connectors has a minor breaking change. The SplitEnumerator.snapshotState() method was adjusted to accept the Checkpoint ID of the checkpoint for which the snapshot is created.

Monitoring & debugging #

Introduce latency tracking state #

State access latency metrics are introduced to track all kinds of keyed state access to help debug state performance. This feature is not enabled by default and can be turned on by setting state.backend.latency-track.keyed-state-enabled to true.

Support for CPU flame graphs in web UI #

Flink now offers flame graphs for each node in the job graph. Please enable this experimental feature by setting the respective configuration flag rest.flamegraph.enabled.

Display last n exceptions/causes for job restarts in Web UI #

Flink exposes the exception history now through the REST API and the UI. The amount of most-recently handled exceptions that shall be tracked can be defined through web.exception-history-size. Some values of the exception history’s REST API Json response are deprecated as part of this effort.

Create backPressuredTimeMsPerSecond metric #

Previously idleTimeMsPerSecond was defined as the time task spent waiting for either the input or the back pressure. Now idleTimeMsPerSecond excludes back pressured time, so if the task is back pressured it is not idle. The back pressured time is now measured separately as backPressuredTimeMsPerSecond.

Enable log4j2 monitor interval by default #

The Log4j support for updating the Log4j configuration at runtime has been enabled by default. The configuration files are checked for changes every 30 seconds.

ZooKeeper quorum fails to start due to missing log4j library #

The Zookeeper scripts in the Flink distribution have been modified to disable the Log4j JMX integration due to an incompatibility between Zookeeper 3.4 and Log4j 2. To re-enable this feature, remove the line in the zookeeper.sh file that sets zookeeper.jmx.log4j.disable.

Expose stage of task initialization #

Task’s RUNNING state was split into two states: INITIALIZING and RUNNING. Task is INITIALIZING while state is initialising and in case of unaligned checkpoints, until all the in-flight data has been recovered.

Deployment #

Officially deprecate Mesos support #

The community decided to deprecate the Apache Mesos support for Apache Flink. It is subject to removal in the future. Users are encouraged to switch to a different resource manager.