Release Notes - Flink 1.8

These release notes discuss important aspects, such as configuration, behavior, or dependencies, that changed between Flink 1.7 and Flink 1.8. Please read these notes carefully if you are planning to upgrade your Flink version to 1.8.

State

Continuous incremental cleanup of old Keyed State with TTL

We introduced TTL (time-to-live) for keyed state in Flink 1.6 (FLINK-9510). This feature made expired keyed state entries inaccessible and cleaned them up when they were accessed. In addition, state could also be cleaned up when writing a savepoint/checkpoint.

Flink 1.8 introduces continuous cleanup of old entries for both the RocksDB state backend (FLINK-10471) and the heap state backend (FLINK-10473). This means that expired entries (according to the TTL setting) are now cleaned up continuously, rather than only when they are accessed or when a snapshot is written.
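
To make the idea of continuous cleanup concrete, here is a toy model in plain Java with no Flink dependency. It is not Flink's implementation: it only illustrates the incremental strategy in general terms, where every state access also inspects a few more entries and drops the expired ones, so stale state disappears even if its key is never read again. The class name and the cleanupSize parameter are hypothetical. In Flink itself the cleanup strategies are configured through StateTtlConfig, and for RocksDB the cleanup happens during compaction instead.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of incremental TTL cleanup (hypothetical class, NOT Flink's implementation).
// Every access also advances a cursor over a few entries and removes expired ones.
final class IncrementalTtlState<K, V> {
    private static final class Timestamped<T> {
        final T value;
        final long writeTime;

        Timestamped(T value, long writeTime) {
            this.value = value;
            this.writeTime = writeTime;
        }
    }

    private final Map<K, Timestamped<V>> entries = new LinkedHashMap<>();
    private final long ttl;
    private final int cleanupSize; // entries inspected per access (hypothetical knob)
    private Iterator<Map.Entry<K, Timestamped<V>>> cursor;

    IncrementalTtlState(long ttl, int cleanupSize) {
        this.ttl = ttl;
        this.cleanupSize = cleanupSize;
    }

    void put(K key, V value, long now) {
        entries.put(key, new Timestamped<>(value, now));
        cursor = null; // a structural change invalidates the cursor
        cleanupStep(now);
    }

    /** Returns the value, or null if the key is absent or its entry has expired. */
    V get(K key, long now) {
        cleanupStep(now);
        Timestamped<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (isExpired(e, now)) { // expired entries are never returned
            entries.remove(key);
            cursor = null;
            return null;
        }
        return e.value;
    }

    int size() {
        return entries.size();
    }

    private boolean isExpired(Timestamped<V> e, long now) {
        return now - e.writeTime >= ttl;
    }

    /** Inspects up to cleanupSize entries, wrapping around, and drops the expired ones. */
    private void cleanupStep(long now) {
        for (int i = 0; i < cleanupSize && !entries.isEmpty(); i++) {
            if (cursor == null || !cursor.hasNext()) {
                cursor = entries.entrySet().iterator();
            }
            if (isExpired(cursor.next().getValue(), now)) {
                cursor.remove();
            }
        }
    }
}
```

With a TTL of 100 and a cleanupSize of 2, three entries written at time 0 are all removed after a couple of accesses at time 150, even for keys that are never read again.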

New Support for Schema Migration when restoring Savepoints

With Flink 1.7.0 we added support for changing the schema of state when using the AvroSerializer (FLINK-10605). With Flink 1.8.0 we made great progress migrating all built-in TypeSerializers to a new serializer snapshot abstraction that, in principle, allows schema migration. Of the serializers that come with Flink, we now support schema migration for the PojoSerializer (FLINK-11485) and the Java EnumSerializer (FLINK-11334), as well as for Kryo in limited cases (FLINK-11323).
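
As a rough mental model of POJO schema migration, the sketch below (plain Java, hypothetical names, not Flink's PojoSerializer) maps the field values restored from a savepoint onto the fields of the new class version. It assumes the rules as we understand them from Flink's state schema evolution documentation: fields may be added or removed, newly added fields start with their Java default value, and changing a field's declared type is not supported. The sketch does not model field types at all.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of POJO-style schema migration on restore (hypothetical, NOT Flink's PojoSerializer).
final class PojoMigrationSketch {
    /**
     * Maps field values written under the old class version onto the new version's fields.
     * Fields kept in the new class keep their value; added fields get null (the Java default
     * for reference types); fields removed from the class are dropped.
     */
    static Map<String, Object> migrate(Map<String, Object> restoredFields, List<String> newFields) {
        Map<String, Object> migrated = new LinkedHashMap<>();
        for (String field : newFields) {
            migrated.put(field, restoredFields.get(field)); // null if the field is new
        }
        return migrated;
    }
}
```

For example, restoring {id=7, legacyFlag=true} into a class with fields id and score yields {id=7, score=null}.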

Savepoint compatibility

Savepoints from Flink 1.2 that contain a Scala TraversableSerializer are no longer compatible with Flink 1.8 because of an update to this serializer (FLINK-11539). You can get around this restriction by first upgrading to a version between Flink 1.3 and Flink 1.7, taking a new savepoint there, and then upgrading to Flink 1.8.

RocksDB version bump and switch to FRocksDB

We needed to switch to a custom build of RocksDB called FRocksDB because we need certain changes in RocksDB to support continuous state cleanup with TTL. The FRocksDB build we use is based on RocksDB version 5.17.2. On Mac OS X, RocksDB 5.17.2 is supported only on OS X 10.13 or later. See also: https://github.com/facebook/rocksdb/issues/4862.
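
If you develop or test on a Mac, a quick preflight check like the following can flag an unsupported OS X version before you try to use the RocksDB state backend. It is a hypothetical helper, not part of Flink, and it assumes the version string has the form reported by the os.version system property on macOS (for example "10.14.6").

```java
// Hypothetical preflight helper (not part of Flink): is this macOS version >= 10.13?
final class FRocksDbMacCheck {
    /** Parses "major.minor[.patch]" and compares it against 10.13. */
    static boolean isSupportedMacVersion(String osVersion) {
        String[] parts = osVersion.trim().split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return major > 10 || (major == 10 && minor >= 13);
    }

    /** Convenience check for the current JVM; returns true on non-Mac systems. */
    static boolean currentSystemOk() {
        String osName = System.getProperty("os.name", "");
        if (!osName.startsWith("Mac")) {
            return true; // the 10.13 restriction only concerns OS X / macOS
        }
        return isSupportedMacVersion(System.getProperty("os.version", "0"));
    }
}
```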

Maven Dependencies

Convenience binaries that include Hadoop are no longer released.

If a deployment relies on flink-shaded-hadoop2 being included in flink-dist, you must manually download a pre-packaged Hadoop jar from the optional components section of the download page and copy it into the /lib directory. Alternatively, a Flink distribution that includes Hadoop can be built by packaging flink-dist with the include-hadoop Maven profile activated.

As Hadoop is no longer included in flink-dist by default, specifying -DwithoutHadoop when packaging flink-dist no longer affects the build.
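
For deployments that relied on the bundled Hadoop classes, a small preflight check along these lines can confirm that a shaded Hadoop jar was actually copied into lib/ after upgrading. This is a hypothetical helper, not Flink tooling, and it assumes the pre-packaged jar's file name starts with flink-shaded-hadoop (as the flink-shaded-hadoop2 artifact name suggests) and ends in .jar.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical preflight helper (not Flink tooling): is a shaded Hadoop jar present in lib/?
final class HadoopJarCheck {
    /** Assumption: the pre-packaged jar's name starts with "flink-shaded-hadoop" and ends in ".jar". */
    static boolean containsShadedHadoopJar(Collection<String> fileNames) {
        return fileNames.stream()
                .anyMatch(n -> n.startsWith("flink-shaded-hadoop") && n.endsWith(".jar"));
    }

    /** Checks the files directly inside the given lib directory. */
    static boolean containsShadedHadoopJar(Path libDir) {
        try (Stream<Path> files = Files.list(libDir)) {
            List<String> names = files.map(p -> p.getFileName().toString()).collect(Collectors.toList());
            return containsShadedHadoopJar(names);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```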

Configuration

TaskManagers now bind to the host IP address instead of the hostname by default. This behaviour can be controlled by the configuration option taskmanager.network.bind-policy. If your Flink cluster experiences inexplicable connection problems after upgrading, try setting taskmanager.network.bind-policy: name in your flink-conf.yaml to return to the pre-1.8 behaviour.
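
The two policies differ in which form of the local address the TaskManager uses. The toy model below (plain Java, hypothetical class, not Flink's code) maps the policy values ip (the new default) and name (the pre-1.8 behaviour) to the corresponding java.net.InetAddress accessor. It assumes these are the two accepted values of taskmanager.network.bind-policy.

```java
import java.net.InetAddress;

// Toy model of the two bind policies (hypothetical, NOT Flink's implementation).
final class BindPolicySketch {
    /** Returns the address string used under the given policy for the given local address. */
    static String advertisedAddress(String bindPolicy, InetAddress local) {
        switch (bindPolicy) {
            case "ip":
                return local.getHostAddress(); // 1.8 default: numeric IP, e.g. "127.0.0.1"
            case "name":
                return local.getHostName();    // pre-1.8 behaviour: host name
            default:
                throw new IllegalArgumentException("Unknown bind policy: " + bindPolicy);
        }
    }
}
```

A host name must be resolved by every peer, whereas a numeric IP does not; that is one reason a cluster whose DNS setup relied on the old behaviour may need bind-policy: name.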

Table API

Flink 1.8 deprecates direct usage of the constructor of the Table class in the Table API. This constructor was previously used to perform a join with a lateral table. You should now use table.joinLateral() or table.leftOuterJoinLateral() instead.

This change is necessary for converting the Table class into an interface, which will make the API more maintainable and cleaner in the future.
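
For readers less familiar with lateral joins, the following plain-Java sketch models the semantics of the two replacement methods on in-memory lists. It is not the Flink Table API; class and method names are hypothetical. A table function, modelled here as a java.util.function.Function returning a list, can emit zero or more rows per input row: joinLateral behaves like an inner join and drops input rows for which the function emits nothing, while leftOuterJoinLateral keeps them and pads the function side with null.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of lateral-join semantics (hypothetical, NOT the Flink Table API).
final class LateralJoinSketch {
    /** Inner variant: left rows for which the table function emits nothing are dropped. */
    static <L, R> List<Map.Entry<L, R>> joinLateral(List<L> left, Function<L, List<R>> tableFunction) {
        List<Map.Entry<L, R>> out = new ArrayList<>();
        for (L row : left) {
            for (R r : tableFunction.apply(row)) {
                out.add(new SimpleEntry<L, R>(row, r));
            }
        }
        return out;
    }

    /** Left outer variant: such left rows are kept and paired with null. */
    static <L, R> List<Map.Entry<L, R>> leftOuterJoinLateral(List<L> left, Function<L, List<R>> tableFunction) {
        List<Map.Entry<L, R>> out = new ArrayList<>();
        for (L row : left) {
            List<R> produced = tableFunction.apply(row);
            if (produced.isEmpty()) {
                out.add(new SimpleEntry<L, R>(row, null));
            } else {
                for (R r : produced) {
                    out.add(new SimpleEntry<L, R>(row, r));
                }
            }
        }
        return out;
    }
}
```

For example, with a function that splits a string on spaces, the input rows "a b" and "" yield two joined rows with joinLateral but three with leftOuterJoinLateral, the extra one being ("", null).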

This release introduces a new format descriptor for CSV files that is compliant with RFC 4180. The new descriptor is available as org.apache.flink.table.descriptors.Csv. For now, this can only be used together with the Kafka connector. The old descriptor is available as org.apache.flink.table.descriptors.OldCsv for use with file system connectors.
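
RFC 4180 compliance mostly concerns quoting: a field may be enclosed in double quotes, in which case it can contain commas (and line breaks), and a double quote inside such a field is escaped by doubling it. The minimal single-record parser below illustrates those rules in plain Java. It is a hypothetical sketch, not the parser behind org.apache.flink.table.descriptors.Csv, and it does not handle records that span multiple lines.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal RFC 4180-style parser for one record (hypothetical, NOT Flink's Csv format code).
final class Rfc4180Sketch {
    /** Splits one CSV record into fields, honouring quoted fields and doubled quotes. */
    static List<String> parseRecord(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        current.append('"'); // "" inside quotes is an escaped quote
                        i++;
                    } else {
                        inQuotes = false; // closing quote
                    }
                } else {
                    current.append(c); // commas are literal inside quotes
                }
            } else if (c == '"' && current.length() == 0) {
                inQuotes = true; // a quote at the start of a field opens a quoted field
            } else if (c == ',') {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c); // lenient: stray quotes in unquoted fields are kept
            }
        }
        if (inQuotes) {
            throw new IllegalArgumentException("Unterminated quoted field");
        }
        fields.add(current.toString());
        return fields;
    }
}
```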

In order to separate the API from the actual implementation, the static methods TableEnvironment.getTableEnvironment() are deprecated. You should now use Batch/StreamTableEnvironment.create() instead.

Users that previously had a flink-table dependency need to update their dependencies to flink-table-planner plus the correct flink-table-api-* dependency for their language: flink-table-api-java-bridge for Java or flink-table-api-scala-bridge for Scala.

ExternalCatalogTable.builder() is deprecated in favour of ExternalCatalogTableBuilder().

The naming scheme for the Kafka and Elasticsearch 6 SQL JARs has changed.

In Maven terms, they no longer have the sql-jar qualifier, and the artifactId is now prefixed with flink-sql instead of flink, e.g., flink-sql-connector-kafka....

Null literals in the Table API need to be defined with nullOf(type) instead of Null(type) from now on. The old approach is deprecated.

Connectors

For the Flink KafkaConsumers, we introduced a new KafkaDeserializationSchema that gives direct access to the Kafka ConsumerRecord. This subsumes the functionality of KeyedDeserializationSchema, which is deprecated but still available for now.

Starting from Flink 1.8.0, the FlinkKafkaConsumer always filters out restored partitions whose topic is no longer among the topics it is configured to subscribe to in the restored execution. Previous versions of the FlinkKafkaConsumer did not do this. If you wish to retain the previous behaviour, use the disableFilterRestoredPartitionsWithSubscribedTopics() configuration method on the FlinkKafkaConsumer.

Consider this example: you had a Kafka consumer that was consuming from topic A, you took a savepoint, then changed your Kafka consumer to consume from topic B instead, and restarted your job from the savepoint. Before this change, your consumer would consume from both topic A and topic B, because the state in the savepoint recorded that the consumer was consuming from topic A. With the change, your consumer only consumes from topic B after the restore, because the topics stored in state are filtered against the configured topics.
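
The filtering rule can be modelled in a few lines of plain Java. The sketch below is hypothetical (not FlinkKafkaConsumer's implementation) and covers only partitions restored from state; partitions of newly subscribed topics such as topic B are discovered separately and are not modelled. Passing filterEnabled = false corresponds to calling disableFilterRestoredPartitionsWithSubscribedTopics().

```java
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

// Toy model of restore-time partition filtering (hypothetical, NOT FlinkKafkaConsumer's code).
final class RestoredPartitionFilterSketch {
    /** A restored (topic, partition) pair; a stand-in for Kafka's own partition type. */
    static final class Partition {
        final String topic;
        final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Partition)) {
                return false;
            }
            Partition p = (Partition) o;
            return partition == p.partition && topic.equals(p.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** filterEnabled = true models the 1.8 default; false models the pre-1.8 behaviour. */
    static Set<Partition> partitionsToResume(Set<Partition> restored, Set<String> subscribedTopics,
                                             boolean filterEnabled) {
        Set<Partition> result = new LinkedHashSet<>();
        for (Partition p : restored) {
            if (!filterEnabled || subscribedTopics.contains(p.topic)) {
                result.add(p);
            }
        }
        return result;
    }
}
```

With restored = {(A, 0)} and subscribed topics = {B}, the 1.8 default resumes nothing from state, while the opt-out still resumes (A, 0).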

Miscellaneous Interface changes

The canEqual() methods are usually used to make proper equality checks across hierarchies of types. TypeSerializer does not actually require this property, so the method has been removed from it.

The CompositeSerializerSnapshot utility class has been removed. For snapshots of composite serializers that delegate serialization to multiple nested serializers, you should now use CompositeTypeSerializerSnapshot instead. Please see the Flink documentation on custom serialization for managed state for instructions on using CompositeTypeSerializerSnapshot.

Memory management

In Flink 1.8.0 and prior versions, the managed memory fraction of the TaskManager is controlled by taskmanager.memory.fraction, with a default value of 0.7. However, this can sometimes cause OOMs: the default value of the JVM parameter NewRatio is 2, which means the old generation occupies only 2/3 (about 0.67) of the heap memory, while managed memory segments tend to be long-lived and are promoted to the old generation. If you run into this case, please manually lower taskmanager.memory.fraction.
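
The arithmetic behind this can be checked quickly. With -XX:NewRatio=N, the old generation gets N/(N+1) of the heap, so the default N = 2 leaves it 2/3. The helper below (hypothetical, plain Java) compares a managed memory fraction against that share. It simplifies by treating the fraction as a fraction of the whole heap and by ignoring every other long-lived object on the heap, so it only flags clearly problematic settings.

```java
// Back-of-the-envelope check (hypothetical helper). Simplifying assumptions: managed memory
// lives entirely in the old generation, and no other long-lived heap objects are counted.
final class ManagedMemoryCheck {
    /** Share of the heap given to the old generation for -XX:NewRatio (old:young = newRatio:1). */
    static double oldGenShare(int newRatio) {
        return (double) newRatio / (newRatio + 1);
    }

    /** True if the managed memory fraction is smaller than the old generation's share. */
    static boolean fitsInOldGen(double managedFraction, int newRatio) {
        return managedFraction < oldGenShare(newRatio);
    }
}
```

For a 4096 MB heap, the old generation holds about 2731 MB, while a fraction of 0.7 asks for about 2867 MB, so fitsInOldGen(0.7, 2) returns false; a fraction of 0.6 (about 2458 MB) fits.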
