Purpose of this production readiness checklist is to provide a condensed overview of configuration options that are important and need careful considerations if you plan to bring your Flink job into production. For most of these options Flink provides out-of-the-box defaults to make usage and adoption of Flink easier. For many users and scenarios, those defaults are good starting points for development and completely sufficient for “one-shot” jobs.
However, once you are planning to bring a Flink appplication to production the requirements typically increase. For example, you want your job to be (re-)scalable and to have a good upgrade story for your job and new Flink versions.
In the following, we present a collection of configuration options that you should check before your job goes into production.
Maximum parallelism is a configuration parameter that is newly introduced in Flink 1.2 and has important implications for the (re-)scalability of your Flink job. This parameter, which can be set on a per-job and/or per-operator granularity, determines the maximum parallelism to which you can scale operators. It is important to understand that (as of now) there is no way to change this parameter after your job has been started, except for restarting your job completely from scratch (i.e. with a new state, and not from a previous checkpoint/savepoint). Even if Flink would provide some way to change maximum parallelism for existing savepoints in the future, you can already assume that for large states this is likely a long running operation that you want to avoid. At this point, you might wonder why not just to use a very high value as default for this parameter. The reason behind this is that high maximum parallelism can have some impact on your application’s performance and even state sizes, because Flink has to maintain certain metadata for its ability to rescale which can increase with the maximum parallelism. In general, you should choose a max parallelism that is high enough to fit your future needs in scalability, but keeping it as low as possible can give slightly better performance. In particular, a maximum parallelism higher that 128 will typically result in slightly bigger state snapshots from the keyed backends.
Notice that maximum parallelism must fulfill the following conditions:
0 < parallelism <= max parallelism <= 2^15
You can set the maximum parallelism by setMaxParallelism(int maxparallelism)
. By default, Flink will choose the maximum
parallelism as a function of the parallelism when the job is first started:
128
: for all parallelism <= 128.MIN(nextPowerOfTwo(parallelism + (parallelism / 2)), 2^15)
: for all parallelism > 128.As mentioned in the documentation for savepoints, users should set uids for
operators. Those operator uids are important for Flink’s mapping of operator states to operators which, in turn, is
essential for savepoints. By default operator uids are generated by traversing the JobGraph and hashing certain operator
properties. While this is comfortable from a user perspective, it is also very fragile, as changes to the JobGraph (e.g.
exchanging an operator) will result in new UUIDs. To establish a stable mapping, we need stable operator uids provided
by the user through setUid(String uid)
.
Currently, Flink has the limitation that it can only restore the state from a savepoint for the same state backend that took the savepoint. For example, this means that we can not take a savepoint with a memory state backend, then change the job to use a RocksDB state backend and restore. While we are planning to make backends interoperable in the near future, they are not yet. This means you should carefully consider which backend you use for your job before going to production.
In general, we recommend using RocksDB because this is currently the only state backend that supports large states (i.e. state that exceeds the available main memory) and asynchronous snapshots. From our experience, asynchronous snapshots are very important for large states because they do not block the operators and Flink can write the snapshots without stopping stream processing. However, RocksDB can have worse performance than, for example, the memory-based state backends. If you are sure that your state will never exceed main memory and blocking the stream processing to write it is not an issue, you could consider to not use the RocksDB backends. However, at this point, we strongly recommend using RocksDB for production.