In a nutshell, this feature exposes Flink’s managed keyed (partitioned) state (see Working with State) to the outside world, allowing the user to query a job’s state from outside Flink. For some scenarios, queryable state eliminates the need for distributed operations/transactions with external systems such as key-value stores, which are often the bottleneck in practice. In addition, this feature may be particularly useful for debugging purposes.
A state backend such as FsStateBackend does not work with copies when retrieving values but instead directly references the stored values, so read-modify-write patterns are unsafe and may cause the queryable state server to fail due to concurrent modifications. The RocksDBStateBackend is safe from these issues.
Before showing how to use Queryable State, it is useful to briefly describe the entities that compose it. The Queryable State feature consists of three main entities:
the QueryableStateClient, which (potentially) runs outside the Flink cluster and submits the user queries,
the QueryableStateClientProxy, which runs on each TaskManager (i.e. inside the Flink cluster) and is responsible for receiving the client’s queries, fetching the requested state from the responsible TaskManager on the client’s behalf, and returning it to the client, and
the QueryableStateServer, which runs on each TaskManager and is responsible for serving the locally stored state.
The client connects to one of the proxies and sends a request for the state associated with a specific key, k. As stated in Working with State, keyed state is organized in Key Groups, and each TaskManager is assigned a number of these key groups. To discover which TaskManager is responsible for the key group holding k, the proxy asks the JobManager. Based on the answer, the proxy then queries the QueryableStateServer running on that TaskManager for the state associated with k, and forwards the response back to the client.
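The lookup described above can be pictured with a small, self-contained model. This is only a sketch and not Flink's implementation: the class `KeyGroupRoutingSketch`, the constant `NUM_KEY_GROUPS`, the use of `hashCode()` to pick a key group, and the contiguous assignment of key groups to TaskManagers are all assumptions made for illustration. In a real cluster the proxy obtains the owner from the JobManager instead of computing it.

```java
import java.util.List;

// Simplified model of key -> key group -> TaskManager routing.
// NOT Flink's implementation; names and hashing are illustrative assumptions.
class KeyGroupRoutingSketch {
    // Hypothetical total number of key groups for the job.
    static final int NUM_KEY_GROUPS = 128;

    // Map a key to a key group (assumption: plain hashCode, always non-negative result).
    static int keyGroupFor(Object key) {
        return Math.floorMod(key.hashCode(), NUM_KEY_GROUPS);
    }

    // Assumption: each TaskManager owns a contiguous, roughly equal range of key groups.
    // In the text above, this answer comes from the JobManager.
    static String taskManagerFor(int keyGroup, List<String> taskManagers) {
        int index = keyGroup * taskManagers.size() / NUM_KEY_GROUPS;
        return taskManagers.get(index);
    }

    // What the proxy does conceptually: find the owner of the key's key group.
    static String route(Object key, List<String> taskManagers) {
        return taskManagerFor(keyGroupFor(key), taskManagers);
    }

    public static void main(String[] args) {
        List<String> taskManagers = List.of("tm-0", "tm-1", "tm-2", "tm-3");
        System.out.println("key 42 is served by " + route(42, taskManagers));
    }
}
```

The point of the model is that the client never needs to know where a key lives; it can talk to any proxy, and the proxy resolves the owner per request.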
To enable queryable state on your Flink cluster, you need to do the following:
opt/ folder of your Flink distribution, to the
true. See the Configuration documentation for details and additional parameters.
To verify that your cluster is running with queryable state enabled, check the logs of any TaskManager for the line: "Started the Queryable State Proxy Server @ ...".
Now that you have activated queryable state on your cluster, it is time to see how to use it. In order for a state to be visible to the outside world, it needs to be explicitly made queryable by using:
either a QueryableStateStream, a convenience object which acts as a sink and offers its incoming values as queryable state, or
the stateDescriptor.setQueryable(String queryableStateName) method, which makes the keyed state represented by the state descriptor queryable.
The following sections explain the use of these two approaches.
Calling .asQueryableState(stateName, stateDescriptor) on a KeyedStream returns a QueryableStateStream which offers its values as queryable state. Depending on the type of state, there are several variants of the asQueryableState() method.
There is no queryable ListState sink, as it would result in an ever-growing list which may not be cleaned up and thus will eventually consume too much memory.
The returned QueryableStateStream can be seen as a sink and cannot be further transformed. Internally, a QueryableStateStream gets translated to an operator which uses all incoming records to update the queryable state instance. The updating logic is implied by the type of the StateDescriptor provided in the asQueryableState call.
In a program like the following, all records of the keyed stream will be used to update the state instance via the
This acts like the Scala API’s
Managed keyed state of an operator (see Using Managed Keyed State) can be made queryable by making the appropriate state descriptor queryable via StateDescriptor.setQueryable(String queryableStateName), as in the example below:
The queryableStateName parameter may be chosen arbitrarily and is only used for queries. It does not have to be identical to the state's own name.
This variant has no limitations as to which type of state can be made queryable. This means that this can be used for
So far, you have set up your cluster to run with queryable state and you have declared (some of) your state as queryable. Now it is time to see how to query this state.
For this you can use the
QueryableStateClient helper class. This is available in the
jar which must be explicitly included as a dependency in the
pom.xml of your project along with
flink-core, as shown below:
For more on this, you can check how to set up a Flink program.
The QueryableStateClient will submit your query to the internal proxy, which will then process your query and return the final result. The only requirement to initialize the client is to provide a valid TaskManager hostname (remember that there is a queryable state proxy running on each TaskManager) and the port where the proxy listens. See the Configuration section for more on how to configure the proxy and state server port(s).
With the client ready, to query a state of type
V, associated with a key of type
K, you can use the method:
The above returns a CompletableFuture eventually holding the state value for the queryable state instance identified by queryableStateName of the job with the given ID. The key is the key whose state you are interested in, and the keyTypeInfo tells Flink how to serialize/deserialize it. Finally, the stateDescriptor contains the necessary information about the requested state, namely its type (Reduce, etc.) and the necessary information on how to serialize/deserialize it.
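Because the query returns a CompletableFuture, the caller decides whether to block or to react asynchronously. The following is a minimal sketch of that pattern in plain Java. It does not use the Flink client: `AsyncQuerySketch`, its `put` and `query` methods, and the in-memory map standing in for remote state are all hypothetical and exist only to make the example runnable.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a remote state query; not the Flink client API.
class AsyncQuerySketch {
    private final Map<String, Long> store = new ConcurrentHashMap<>();

    void put(String key, long value) {
        store.put(key, value);
    }

    // Completes with the value for the key, or exceptionally if the key has no state.
    CompletableFuture<Long> query(String key) {
        return CompletableFuture.supplyAsync(() -> {
            Long value = store.get(key);
            if (value == null) {
                throw new IllegalStateException("no state for key " + key);
            }
            return value;
        });
    }

    public static void main(String[] args) {
        AsyncQuerySketch client = new AsyncQuerySketch();
        client.put("user-1", 7L);

        // React when the result arrives and handle a failed query separately.
        client.query("user-1")
              .thenAccept(value -> System.out.println("state = " + value))
              .exceptionally(error -> {
                  System.err.println("query failed: " + error);
                  return null;
              })
              .join(); // block here only so the demo does not exit early
    }
}
```

Handling the exceptional path matters in practice, since a query can fail for reasons outside the caller's control, for example when no state exists for the requested key.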
The careful reader will notice that the returned future contains a value of type
S, i.e. a
State object containing
the actual value. This can be any of the state types supported by Flink:
You can read the contained value, e.g. using valueState.get(), or iterate over the contained <K, V> entries, e.g. using mapState.entries(), but you cannot modify them. As an example, calling the add() method on a returned list state will throw an exception.
Call QueryableStateClient.shutdown() when the client is no longer used in order to free resources.
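The read-only behaviour of returned state objects described above resembles an unmodifiable view in the Java collections library. The sketch below is only an analogy in plain Java, not Flink code: `ReadOnlyStateSketch` and `readOnlyView` are hypothetical names, and the exception type shown is the one thrown by Java's unmodifiable collections.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java analogy for a read-only state object; not Flink's state classes.
class ReadOnlyStateSketch {
    // Wrap the stored values so callers can read them but not modify them.
    static List<Long> readOnlyView(List<Long> stored) {
        return Collections.unmodifiableList(stored);
    }

    public static void main(String[] args) {
        List<Long> stored = new ArrayList<>(List.of(1L, 2L, 3L));
        List<Long> view = readOnlyView(stored);

        // Reading and iterating work as usual.
        for (Long value : view) {
            System.out.println(value);
        }

        // Any attempt to modify the view is rejected.
        try {
            view.add(4L);
        } catch (UnsupportedOperationException e) {
            System.out.println("modification rejected");
        }
    }
}
```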
The following example extends the
(see Using Managed Keyed State)
by making it queryable and shows how to query this value:
Once used in a job, you can retrieve the job ID and then query any key’s current state from this operator:
The following configuration parameters influence the behaviour of the queryable state server and client.
They are defined in
queryable-state.server.ports: the server port range of the queryable state server. This is useful to avoid port clashes if more than one TaskManager runs on the same machine. The specified value can be a single port (“9123”), a range of ports (“50100-50200”), or a list of ranges and/or single ports (“50100-50200,50300-50400,51234”). The default port is 9067.
queryable-state.server.network-threads: the number of network (event loop) threads receiving incoming requests for the state server (0 => #slots).
queryable-state.server.query-threads: the number of threads handling/serving incoming requests for the state server (0 => #slots).
queryable-state.proxy.ports: the server port range of the queryable state proxy. This is useful to avoid port clashes if more than one TaskManager runs on the same machine. The specified value can be a single port (“9123”), a range of ports (“50100-50200”), or a list of ranges and/or single ports (“50100-50200,50300-50400,51234”). The default port is 9069.
queryable-state.proxy.network-threads: the number of network (event loop) threads receiving incoming requests for the client proxy (0 => #slots).
queryable-state.proxy.query-threads: the number of threads handling/serving incoming requests for the client proxy (0 => #slots).
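To make the port syntax accepted by the two ports options above concrete, here is a minimal sketch that expands such a value into the individual candidate ports. It is an illustration of the format only, not Flink's parser: the class `PortRangeSketch` and its methods are hypothetical, and the validation rules are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative parser for values like "9123", "50100-50200",
// or "50100-50200,50300-50400,51234". Not Flink's own parsing code.
class PortRangeSketch {
    // Expand a port specification into the list of individual ports, in order.
    static List<Integer> expand(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String entry = part.trim();
            int dash = entry.indexOf('-');
            if (dash < 0) {
                // A single port.
                ports.add(parsePort(entry));
            } else {
                // An inclusive range "start-end".
                int start = parsePort(entry.substring(0, dash).trim());
                int end = parsePort(entry.substring(dash + 1).trim());
                if (start > end) {
                    throw new IllegalArgumentException("empty range: " + entry);
                }
                for (int port = start; port <= end; port++) {
                    ports.add(port);
                }
            }
        }
        return ports;
    }

    // Assumption: valid ports lie between 0 and 65535.
    private static int parsePort(String text) {
        int port = Integer.parseInt(text);
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("invalid port: " + port);
        }
        return port;
    }

    public static void main(String[] args) {
        List<Integer> ports = expand("50100-50200,50300-50400,51234");
        System.out.println(ports.size() + " candidate ports, first " + ports.get(0));
    }
}
```

Giving a range rather than a single port lets several TaskManagers on one machine each bind a different free port from the same configuration.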