Release notes - Flink 1.15 #
These release notes discuss important aspects, such as configuration, behavior, or dependencies, that changed between Flink 1.14 and Flink 1.15. Please read these notes carefully if you are planning to upgrade your Flink version to 1.15.
Summary of changed dependency names #
There are several changes in Flink 1.15 that require updating dependency names when upgrading from earlier versions, mainly resulting from the effort to opt Scala dependencies out of non-Scala modules and from the reorganization of the table modules. A quick checklist of the dependency changes is as follows:
Any dependency to one of the following modules needs to be updated to no longer include a suffix:
flink-cep flink-clients flink-connector-elasticsearch-base flink-connector-elasticsearch6 flink-connector-elasticsearch7 flink-connector-gcp-pubsub flink-connector-hbase-1.4 flink-connector-hbase-2.2 flink-connector-hbase-base flink-connector-jdbc flink-connector-kafka flink-connector-kinesis flink-connector-nifi flink-connector-pulsar flink-connector-rabbitmq flink-container flink-dstl-dfs flink-gelly flink-hadoop-bulk flink-kubernetes flink-runtime-web flink-sql-connector-elasticsearch6 flink-sql-connector-elasticsearch7 flink-sql-connector-hbase-1.4 flink-sql-connector-hbase-2.2 flink-sql-connector-kafka flink-sql-connector-kinesis flink-sql-connector-rabbitmq flink-state-processor-api flink-statebackend-rocksdb flink-streaming-java flink-test-utils flink-yarn flink-table-api-java-bridge flink-table-runtime flink-sql-client flink-orc flink-orc-nohive flink-parquet
For Table / SQL users, the new module flink-table-planner-loader replaces flink-table-planner_2.12 and avoids the need for a Scala suffix. For backwards compatibility, users can still swap it with flink-table-planner_2.12 located in opt/. flink-table-uber has been split into flink-table-api-java-uber, flink-table-planner(-loader), and flink-table-runtime. Scala users need to explicitly add a dependency to flink-table-api-scala or flink-table-api-scala-bridge.
The details of the involved issues are listed as follows.
Add support for opting-out of Scala #
The Java DataSet/-Stream APIs are now independent of Scala and no longer transitively depend on it.
The implications are the following:
If you only intend to use the Java APIs, with Java types, then you can opt in to a Scala-free Flink by removing the flink-scala jar from the lib/ directory of the distribution. You are then free to use any Scala version and Scala libraries. You can either bundle Scala itself in your user-jar or put it into the lib/ directory of the distribution.
If you relied on the Scala APIs without an explicit dependency on them, then you may experience issues when building your projects. You can solve this by adding explicit dependencies to the APIs that you are using. This should primarily affect users of the Scala DataStream/CEP APIs.
A lot of modules have lost their Scala suffix. Further caution is advised when mixing dependencies from different Flink versions (e.g., an older connector), as you may now end up pulling in multiple versions of a single module (that would previously be prevented by the name being equal).
Reorganize table modules and introduce flink-table-planner-loader #
The new module flink-table-planner-loader replaces flink-table-planner_2.12 and avoids the need for a Scala suffix. It is included in the Flink distribution under lib/. For backwards compatibility, users can still swap it with flink-table-planner_2.12 located in opt/. As a consequence, flink-table-uber has been split into flink-table-api-java-uber, flink-table-planner(-loader), and flink-table-runtime. flink-sql-client has no Scala suffix anymore.
It is recommended to let new projects depend on flink-table-planner-loader (without Scala suffix) in provided scope.
Note that the distribution does not include the Scala API by default. Scala users need to explicitly add a dependency to flink-table-api-scala or flink-table-api-scala-bridge.
Remove flink-scala dependency from flink-table-runtime #
flink-table-runtime has no Scala suffix anymore.
Make sure to include
flink-scala if the legacy type system (based on TypeInformation) with case classes is still used within Table API.
flink-table uber jar should not include flink-connector-files dependency #
The table file system connector is not part of the
flink-table-uber JAR anymore but is a dedicated (but removable)
flink-connector-files JAR in the
/lib directory of a Flink distribution.
JDK Upgrade #
The support of Java 8 is now deprecated and will be removed in a future release (FLINK-25247). We recommend that all users migrate to Java 11.
The default Java version in the Flink docker images is now Java 11 (FLINK-25251). There are images built with Java 8, tagged with “java8”.
Drop support for Scala 2.11 #
Support for Scala 2.11 has been removed. All Flink dependencies that (transitively) depend on Scala are suffixed with the Scala version that they are built for, for example flink-streaming-scala_2.12. Users should update all Flink dependencies, changing “2.11” to “2.12”.
Scala versions (2.11, 2.12, etc.) are not binary compatible with one another. That also means that there’s no guarantee that you can restore from a savepoint, made with a Flink Scala 2.11 application, if you’re upgrading to a Flink Scala 2.12 application. This depends on the data types that you have been using in your application.
The Scala Shell/REPL has been removed in FLINK-24360.
Table API & SQL #
Disable the legacy casting behavior by default #
The legacy casting behavior has been disabled by default. This might have
implications on corner cases (string parsing, numeric overflows, to string
representation, varchar/binary precisions). Set
table.exec.legacy-cast-behaviour=ENABLED to restore the old behavior.
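For example, a job that still relies on the old semantics can switch the flag back programmatically; a minimal sketch (the option can equally be set in flink-conf.yaml):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LegacyCastExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Restore the pre-1.15 casting behavior for this job only.
        tableEnv.getConfig()
                .getConfiguration()
                .setString("table.exec.legacy-cast-behaviour", "ENABLED");
    }
}
```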
Enforce CHAR/VARCHAR precision when outputting to a Sink #
CHAR/VARCHAR lengths are now enforced (trimmed/padded) by default before entering the table sink.
Support the new type inference in Scala Table API table functions #
Table functions that are called using Scala implicit conversions have been updated
to use the new type system and new type inference. Users are requested to update
their UDFs or use the deprecated
TableEnvironment.registerFunction to restore
the old behavior temporarily by calling the function via name.
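The affected calls are on the Scala side; for illustration only, the sketch below shows the register-and-call-by-name workaround in Java with a made-up UpperCaseFunction UDF:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class LegacyFunctionExample {

    // Hypothetical UDF, used only for illustration.
    public static class UpperCaseFunction extends ScalarFunction {
        public String eval(String input) {
            return input == null ? null : input.toUpperCase();
        }
    }

    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Deprecated registration that keeps the old type system.
        tableEnv.registerFunction("toUpper", new UpperCaseFunction());

        Table input = tableEnv.fromValues("flink", "scala").as("word");
        // Calling the function by name side-steps the updated implicit conversions.
        input.select(call("toUpper", $("word"))).execute().print();
    }
}
```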
Propagate executor config to TableConfig #
Configuration in flink-conf.yaml and other configuration from outer layers (e.g. the CLI) is now propagated into TableConfig. Even though configuration set directly in TableConfig still has precedence, this change can have side effects if table configuration was accidentally set in other layers.
Remove pre FLIP-84 methods #
The previously deprecated methods TableEnvironment.execute, Table.insertInto, TableEnvironment.fromTableSource, TableEnvironment.sqlUpdate, and TableEnvironment.explain have been removed. Please use TableEnvironment.executeSql, TableEnvironment.explainSql, TableEnvironment.createStatementSet, as well as Table.executeInsert, Table.explain and Table.execute, and the newly introduced classes TableResult, ResultKind, StatementSet and ExplainDetail.
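A minimal sketch of the replacement APIs (table names and connectors are placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class Flip84Example {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tableEnv.executeSql(
                "CREATE TABLE sink_table (x INT) WITH ('connector' = 'blackhole')");

        // Instead of the removed TableEnvironment.sqlUpdate(...) / execute(...):
        tableEnv.executeSql("INSERT INTO sink_table SELECT 1");

        // Instead of the removed multi-sink TableEnvironment.explain(...):
        StatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsertSql("INSERT INTO sink_table SELECT 2");
        System.out.println(statementSet.explain());
    }
}
```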
Fix parser generator warnings #
STATEMENT is a reserved keyword now. Use backticks to escape tables, fields and other references.
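For example, a table or column named STATEMENT now has to be quoted with backticks (the datagen table below is made up):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReservedKeywordExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // STATEMENT is a reserved keyword in 1.15 and must be escaped with backticks.
        tableEnv.executeSql(
                "CREATE TABLE `STATEMENT` (`statement` STRING) "
                        + "WITH ('connector' = 'datagen', 'number-of-rows' = '3')");
        tableEnv.executeSql("SELECT `statement` FROM `STATEMENT`").print();
    }
}
```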
Expose uid generator for DataStream/Transformation providers #
The DataStreamScanProvider and DataStreamSinkProvider interfaces for table connectors received an additional method that might break implementations that used lambdas before. We recommend static classes as a replacement and for future robustness.
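A sketch of such a static implementation, assuming the two-argument consumeDataStream variant and a ProviderContext that offers uid generation (both names are assumptions based on this change and should be verified against the 1.15 API):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.data.RowData;

/** Sketch: implement the provider as a class instead of a lambda. */
public class MySinkProvider implements DataStreamSinkProvider {

    @Override
    public DataStreamSink<?> consumeDataStream(
            ProviderContext providerContext, DataStream<RowData> dataStream) {
        DataStreamSink<RowData> sink = dataStream.print(); // placeholder sink
        // Assumption: the context hands out deterministic uids for the added operators.
        providerContext.generateUid("my-sink").ifPresent(sink::uid);
        return sink;
    }
}
```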
Add new STATEMENT SET syntax #
It is recommended to update statement sets to the new SQL syntax:
EXECUTE STATEMENT SET BEGIN ... END; EXPLAIN STATEMENT SET BEGIN ... END;
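A minimal sketch issuing the new syntax through TableEnvironment.executeSql (tables and connectors are placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StatementSetSyntaxExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tableEnv.executeSql(
                "CREATE TABLE src (x INT) WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
        tableEnv.executeSql("CREATE TABLE sink_a (x INT) WITH ('connector' = 'blackhole')");
        tableEnv.executeSql("CREATE TABLE sink_b (x INT) WITH ('connector' = 'blackhole')");

        // New SQL syntax for statement sets.
        tableEnv.executeSql(
                "EXECUTE STATEMENT SET BEGIN "
                        + "INSERT INTO sink_a SELECT x FROM src; "
                        + "INSERT INTO sink_b SELECT x FROM src; "
                        + "END");
    }
}
```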
Check & possible fix decimal precision and scale for all Aggregate functions #
This changes the result of a decimal SUM() with retraction and AVG(). Part of the behavior is restored to match 1.13 so that the behavior as a whole is consistent with Hive / Spark.
Clarify semantics of DecodingFormat and its data type #
The DecodingFormat interface was used for both projectable and non-projectable formats, which led to inconsistent implementations. The FileSystemTableSource has been updated to distinguish between those two interfaces now. Users that implement custom formats for FileSystemTableSource might need to verify the implementation and make sure to implement ProjectableDecodingFormat if necessary.
Push down partitions before filters #
This might have an impact on existing table source implementations as push down filters might not contain partition predicates anymore. However, the connector implementation for table sources that implement both partition and filter push down became easier with this change.
SUM() causes a precision error #
This changes the result of a decimal
SUM() between 1.14.0 and 1.14.1. It restores
the behavior of 1.13 to be consistent with Hive/Spark.
Use the new casting rules in TableResult#print #
The string representation of BOOLEAN columns from DDL results (true/false -> TRUE/FALSE) and of row columns in DQL results (+I[...] -> (...)) has changed for printing.
Casting from a string to a DATE and TIME allows incomplete strings #
The defaults for casting incomplete strings like "12" to TIME have changed.
Casting from STRING to TIMESTAMP_LTZ loses fractional seconds #
Casting from STRING to TIMESTAMP(_LTZ) now considers fractional seconds. Previously, fractional seconds of any precision were ignored.
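A minimal sketch showing the new behavior; the fractional part of the string is now preserved:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FractionalSecondsCastExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // In 1.15 the .999 fraction is kept; previously it was silently dropped.
        tableEnv.executeSql(
                "SELECT CAST('1970-01-01 00:00:01.999' AS TIMESTAMP(3)) AS ts").print();
    }
}
```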
Sinks built with the unified sink framework do not receive timestamps when used in Table API #
This adds an additional operator to the topology if the new sink interfaces are used (e.g. for Kafka). It could cause issues in 1.14.1 when restoring from a 1.14 savepoint. A workaround is to cast the time attribute to a regular timestamp in the SQL statement closely before the sink.
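A sketch of that workaround; the table definitions (datagen/blackhole) stand in for an actual pipeline with, e.g., a Kafka sink:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TimeAttributeWorkaroundExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tableEnv.executeSql(
                "CREATE TABLE events ("
                        + " id INT,"
                        + " ts TIMESTAMP(3),"
                        + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                        + ") WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
        tableEnv.executeSql(
                "CREATE TABLE sink_table (id INT, ts TIMESTAMP(3)) "
                        + "WITH ('connector' = 'blackhole')");

        // Casting the time attribute to a plain TIMESTAMP right before the sink.
        tableEnv.executeSql(
                "INSERT INTO sink_table SELECT id, CAST(ts AS TIMESTAMP(3)) FROM events");
    }
}
```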
SQL functions should return STRING instead of VARCHAR(2000) #
Functions that returned VARCHAR(2000) in 1.14 return VARCHAR with maximum length now. In particular, this includes:
JSON_VALUE CHR REVERSE SPLIT_INDEX REGEXP_EXTRACT PARSE_URL FROM_UNIXTIME DECODE DATE_FORMAT CONVERT_TZ
Support IS JSON for Table API #
This issue added IS JSON for the Table API. Note that IS JSON does not return NULL anymore but always returns FALSE (even if the argument is NULL).
Disable upsert into syntax in Flink SQL #
This release disables the UPSERT INTO statement. The UPSERT INTO syntax was exposed by mistake in previous releases without detailed discussion. From this release on, every UPSERT INTO throws an exception. Users of UPSERT INTO should use the documented INSERT INTO statement instead.
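For example, a pipeline with upsert semantics declares a primary key on the sink and uses a plain INSERT INTO (the tables below are made up):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class InsertInsteadOfUpsertExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tableEnv.executeSql(
                "CREATE TABLE orders (order_id INT, amount INT) "
                        + "WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
        tableEnv.executeSql(
                "CREATE TABLE order_totals ("
                        + " order_id INT,"
                        + " total INT,"
                        + " PRIMARY KEY (order_id) NOT ENFORCED"
                        + ") WITH ('connector' = 'blackhole')");

        // UPSERT INTO order_totals ... now throws; INSERT INTO carries the upsert semantics.
        tableEnv.executeSql(
                "INSERT INTO order_totals "
                        + "SELECT order_id, SUM(amount) FROM orders GROUP BY order_id");
    }
}
```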
RuntimeException: while resolving method ‘booleanValue’ in class class java.math.BigDecimal #
Casting to BOOLEAN is not allowed from decimal numeric types anymore.
Upsert materializer is not inserted for all sink providers #
This issue aims to fix various primary key issues that effectively made it impossible to use this feature. The change might affect savepoint backwards compatibility for those incorrect pipelines. Also the resulting changelog stream might be different after these changes. Pipelines that were correct before should be restorable from a savepoint.
Propagate unique keys for fromChangelogStream #
StreamTableEnvironment.fromChangelogStream might produce a different stream because
primary keys were not properly considered before.
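A minimal sketch of an upsert changelog whose declared primary key is now taken into account (column names and values are made up):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class FromChangelogStreamExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<Row> changelog =
                env.fromElements(
                                Row.ofKind(RowKind.INSERT, "Alice", 12),
                                Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100))
                        .returns(
                                Types.ROW_NAMED(
                                        new String[] {"name", "score"},
                                        Types.STRING, Types.INT));

        // The declared primary key now influences the derived changelog semantics.
        Table table =
                tableEnv.fromChangelogStream(
                        changelog,
                        Schema.newBuilder().primaryKey("name").build(),
                        ChangelogMode.upsert());
        table.execute().print();
    }
}
```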
TableResult#print() should use internal data types #
The results of Table#print have changed to be closer to actual SQL data types. For example, decimals are now printed correctly with leading/trailing zeros.
Remove MapR filesystem #
Support for the MapR FileSystem has been dropped.
Merge flink-connector-testing into flink-connector-test-utils #
The flink-connector-testing module has been removed and users should use the flink-connector-test-utils module instead.
Support partition keys through metadata (for FileSystem connector) #
Formats implementing BulkWriterFormatFactory don’t need to implement partition keys reading anymore, as it is managed internally by the FileSystem connector.
Port ElasticSearch Sink to new Unified Sink API (FLIP-143) #
ElasticsearchXSinkBuilder supersedes ElasticsearchSink.Builder and provides at-least-once writing with the new unified sink interface, supporting both batch and streaming mode of the DataStream API.
For Elasticsearch 7 users that use the old ElasticsearchSink interface and depend on their own elasticsearch-rest-high-level-client version, updating the client dependency to a version >= 7.14.0 is required due to internal changes.
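A minimal sketch of the new builder for Elasticsearch 7 (host, index name and payload are placeholders):

```java
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

import java.util.Collections;

public class Elasticsearch7SinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ElasticsearchSink<String> sink =
                new Elasticsearch7SinkBuilder<String>()
                        .setHosts(new HttpHost("localhost", 9200, "http"))
                        .setEmitter(
                                (element, context, indexer) ->
                                        indexer.add(
                                                Requests.indexRequest()
                                                        .index("my-index")
                                                        .source(Collections.singletonMap("data", element))))
                        .build();

        env.fromElements("a", "b", "c").sinkTo(sink);
        env.execute("es7-sink-example");
    }
}
```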
Reduce legacy in Table API connectors #
The old JDBC connector (indicated by
connector.type=jdbc in DDL) has been removed.
If not done already, users need to upgrade to the newer stack (indicated by
connector=jdbc in DDL).
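A sketch of a DDL using the new option style (connection settings are placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcConnectorDdlExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Old stack (removed):  'connector.type' = 'jdbc', 'connector.url' = ...
        // New stack (required): 'connector' = 'jdbc', 'url' = ...
        tableEnv.executeSql(
                "CREATE TABLE jdbc_sink ("
                        + " id INT,"
                        + " name STRING"
                        + ") WITH ("
                        + " 'connector' = 'jdbc',"
                        + " 'url' = 'jdbc:mysql://localhost:3306/mydb',"
                        + " 'table-name' = 'my_table'"
                        + ")");
    }
}
```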
Extensible unified Sink uses new metric to capture outgoing records #
The new metrics numRecordsSend and numRecordsSendErrors have been introduced for users to monitor the number of records sent to the external system. The metric numRecordsOut should be used to monitor the number of records transferred between sink tasks.
Connector developers should pay attention to the usage of these metrics numRecordsOut, numRecordsSend and numRecordsSendErrors while building sink connectors. Please refer to the new Kafka Sink for details. Additionally, since numRecordsOut now only counts the records sent between sink tasks and numRecordsOutErrors was designed for counting the records sent to the external system, we deprecated numRecordsOutErrors and recommend using numRecordsSendErrors instead.
Runtime & Coordination #
Integrate retry strategy for cleanup stage #
Adds retry logic to the cleanup steps of a finished job. This feature changes the way Flink jobs are cleaned up. Instead of trying once to clean up the job, this step will be repeated until it succeeds. Users are meant to fix the issue that prevents Flink from finalizing the job cleanup. The retry functionality can be configured and disabled. More details can be found in the documentation.
Introduce explicit shutdown signaling between TaskManager and JobManager #
TaskManagers now explicitly send a signal to the
JobManager when shutting down. This reduces the down-scaling delay in reactive mode (which was previously bound to the heartbeat timeout).
Remove TaskManagerJobMetricGroup with the last slot rather than task #
Job metrics on the TaskManager are now removed when the last slot is released, rather than the last task. This means they may be reported for a longer time than before and when no tasks are running on the TaskManager.
Make errors that happened during JobMaster initialization accessible through the exception history #
Fixes an issue where the failover is not listed in the exception history but as a root cause. That could have happened if the failure occurred during JobMaster initialization.
DispatcherResourceManagerComponent fails to deregister application if no leading ResourceManager #
A new multiple component leader election service was implemented that only runs a single leader election per Flink process.
If this should cause any problems, then you can set high-availability.use-old-ha-services: true in the flink-conf.yaml to use the old high availability services.
Allow idempotent job cancellation #
Attempting to cancel a
FINISHED/FAILED job now returns 409 Conflict instead of 404 Not Found.
Move async savepoint operation cache into Dispatcher #
JobManagers can now be queried for the status of a savepoint operation, irrespective of which
JobManager received the initial request.
Standby per job mode Dispatchers don’t know job’s JobSchedulingStatus #
The issue of re-submitting a job in Application Mode when the job finished but failed during cleanup is fixed through the introduction of the new component JobResultStore which enables Flink to persist the cleanup state of a job to the file system. (see FLINK-25431)
Change some default config values of blocking shuffle for better usability #
Since 1.15, sort-shuffle has become the default blocking shuffle implementation and shuffle data compression is enabled by default. These changes affect batch jobs only. For more information, please refer to the official documentation.
FLIP-193: Snapshots ownership #
When restoring from a savepoint or retained externalized checkpoint you can choose the mode in which you want to perform the operation: CLAIM, NO_CLAIM, or LEGACY (the old behavior).
In CLAIM mode Flink takes ownership of the snapshot and will potentially try to remove the snapshot at a certain point in time. On the other hand, the NO_CLAIM mode makes sure Flink does not depend on the existence of any files belonging to the initial snapshot.
For a more thorough description see the documentation.
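A minimal sketch of picking a restore mode programmatically; the option keys below (execution.savepoint.path, execution.savepoint-restore-mode) and the savepoint path are assumptions to be checked against the documentation, which also describes the corresponding CLI flag:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestoreModeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder savepoint path; option keys are assumed as noted above.
        conf.setString("execution.savepoint.path", "s3://bucket/savepoints/savepoint-123");
        conf.setString("execution.savepoint-restore-mode", "NO_CLAIM");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.fromElements(1, 2, 3).print();
        env.execute("restore-mode-example");
    }
}
```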
Support native savepoints (w/o modifying the statebackend specific snapshot strategies) #
When taking a savepoint you can specify the binary format. You can choose from native (specific to a particular state backend) or canonical (unified across all state backends).
Prevent JM from discarding state on checkpoint abortion #
Shared state tracking changed to use checkpoint ID instead of reference counts. Shared state is not cleaned up on abortion anymore (but rather on subsumption or job termination).
This might result in delays in discarding the state of aborted checkpoints.
Introduce incremental/full checkpoint size stats #
Introduces metrics for the persisted bytes within each checkpoint (via the REST API and UI), which help users understand how much data was persisted during an incremental or change-log based checkpoint.
Enables final checkpoint by default #
In 1.15 we enabled, by default, support for checkpoints after part of the tasks have finished, and made tasks wait for the final checkpoint before exiting to ensure all data gets committed.
However, it is worth noting that this change forces tasks to wait for one more checkpoint before exiting. In other words, this change blocks the tasks until the next checkpoint is triggered and completed. If the checkpoint interval is long, the tasks' execution time is extended accordingly. In the worst case, if the checkpoint interval is Long.MAX_VALUE, the tasks would in fact be blocked forever.
More information about this feature and how to disable it could be found in the documentation.
Migrate state processor API to DataStream API #
The State Processor API has been migrated from Flink’s legacy DataSet API and now runs on the DataStream API under BATCH execution.
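A minimal sketch of reading a savepoint with the DataStream-based State Processor API, assuming the SavepointReader entry point introduced by this migration (paths, uid and state name are placeholders):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateProcessorExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder savepoint path, operator uid and state name.
        SavepointReader savepoint =
                SavepointReader.read(env, "file:///tmp/savepoint-123", new HashMapStateBackend());

        DataStream<Integer> listState =
                savepoint.readListState("my-operator-uid", "list-state", Types.INT);

        listState.print();
        env.execute("read-savepoint");
    }
}
```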
Relocate RocksDB’s log under flink log directory by default #
The internal log of RocksDB is now placed under Flink’s log directory by default.
Dependency upgrades #
Upgrade the minimal supported hadoop version to 2.8.5 #
Minimal supported Hadoop client version is now 2.8.5 (version of the Flink runtime dependency). The client can still talk to older server versions as the binary protocol should be backward compatible.
Update Elasticsearch Sinks to latest minor versions #
Elasticsearch libraries used by the connector are bumped to 7.15.2 and 6.8.20 respectively.
For Elasticsearch 7 users that use the old ElasticsearchSink interface and depend on their own elasticsearch-rest-high-level-client version, updating the client dependency to a version >= 7.14.0 is required due to internal changes.
Drop support for Zookeeper 3.4 #
Support for using Zookeeper 3.4 for HA has been dropped. Users relying on Zookeeper need to upgrade to 3.5/3.6. By default Flink now uses a Zookeeper 3.5 client.
Upgrade Kafka dependency #
The Kafka connector now uses Kafka client 2.8.1 by default.