In a nutshell, this feature allows users to query Flink’s managed partitioned state (see Working with State) from outside of Flink. For some scenarios, queryable state thus eliminates the need for distributed operations/transactions with external systems such as key-value stores which are often the bottleneck in practice.
In order to make state queryable, the queryable state server first needs to be enabled globally
by setting the query.server.enable
configuration parameter to true
(current default).
Then appropriate state needs to be made queryable by using either
QueryableStateStream
, a convenience object which behaves like a sink and offers incoming values as
queryable state, orStateDescriptor#setQueryable(String queryableStateName)
, which makes the keyed state of an
operator queryable.The following sections explain the use of these two approaches.
A KeyedStream
may offer its values as queryable state by using the following methods:
// ValueState
QueryableStateStream asQueryableState(
String queryableStateName,
ValueStateDescriptor stateDescriptor)
// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)
// FoldingState
QueryableStateStream asQueryableState(
String queryableStateName,
FoldingStateDescriptor stateDescriptor)
// ReducingState
QueryableStateStream asQueryableState(
String queryableStateName,
ReducingStateDescriptor stateDescriptor)
ListState
sink as it would result in an ever-growing
list which may not be cleaned up and thus will eventually consume too much memory.
A call to these methods returns a QueryableStateStream
, which cannot be further transformed and
currently only holds the name as well as the value and key serializer for the queryable state
stream. It is comparable to a sink, and cannot be followed by further transformations.
Internally a QueryableStateStream
gets translated to an operator which uses all incoming
records to update the queryable state instance.
In a program like the following, all records of the keyed stream will be used to update the state
instance, either via ValueState#update(value)
or AppendingState#add(value)
, depending on
the chosen state variant:
stream.keyBy(0).asQueryableState("query-name")
This acts like the Scala API’s flatMapWithState
.
Managed keyed state of an operator
(see Using Managed Keyed State)
can be made queryable by making the appropriate state descriptor queryable via
StateDescriptor#setQueryable(String queryableStateName)
, as in the example below:
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
descriptor.setQueryable("query-name"); // queryable state name
The QueryableStateClient
helper class may be used for queries against the KvState
instances that
serve the state internally. It needs to be set up with a valid JobManager
address and port and is
created as follows:
final Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);
final HighAvailabilityServices highAvailabilityServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
Executors.newSingleThreadScheduledExecutor(),
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
The query method is this:
Future<byte[]> getKvState(
JobID jobID,
String queryableStateName,
int keyHashCode,
byte[] serializedKeyAndNamespace)
A call to this method returns a Future
eventually holding the serialized state value for the
queryable state instance identified by queryableStateName
of the job with ID jobID
. The
keyHashCode
is the hash code of the key as returned by Object.hashCode()
and the
serializedKeyAndNamespace
is the serialized key and namespace.
QueryableStateClient#shutdown()
when unused in order to free
resources.
The current implementation is still pretty low-level in the sense that it only works with serialized data both for providing the key/namespace and the returned results. It is the responsibility of the user (or some follow-up utilities) to set up the serializers for this. The nice thing about this is that the query services don’t have to get into the business of worrying about any class loading issues etc.
There are some serialization utils for key/namespace and value serialization included in
KvStateRequestSerializer
.
The following example extends the CountWindowAverage
example
(see Using Managed Keyed State)
by making it queryable and showing how to query this value:
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private transient ValueState<Tuple2<Long /* count */, Long /* sum */>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
Tuple2<Long, Long> currentSum = sum.value();
currentSum.f0 += 1;
currentSum.f1 += input.f1;
sum.update(currentSum);
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
descriptor.setQueryable("query-name");
sum = getRuntimeContext().getState(descriptor);
}
}
Once used in a job, you can retrieve the job ID and then query any key’s current state from this operator:
final Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, queryAddress);
config.setInteger(JobManagerOptions.PORT, queryPort);
final HighAvailabilityServices highAvailabilityServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
Executors.newSingleThreadScheduledExecutor(),
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
final TypeSerializer<Long> keySerializer =
TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new ExecutionConfig());
final TypeSerializer<Tuple2<Long, Long>> valueSerializer =
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}).createSerializer(new ExecutionConfig());
final byte[] serializedKey =
KvStateRequestSerializer.serializeKeyAndNamespace(
key, keySerializer,
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
Future<byte[]> serializedResult =
client.getKvState(jobId, "query-name", key.hashCode(), serializedKey);
// now wait for the result and return it
final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
byte[] serializedValue = Await.result(serializedResult, duration);
Tuple2<Long, Long> value =
KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
Please use the available Scala extensions when creating the TypeSerializer
instances. Add the following import:
import org.apache.flink.streaming.api.scala._
Now you can create the type serializers as follows:
val keySerializer = createTypeInformation[Long]
.createSerializer(new ExecutionConfig)
If you don’t do this, you can run into mismatches between the serializers used in the Flink job and in your client code, because types like scala.Long
cannot be captured at runtime.
The following configuration parameters influence the behaviour of the queryable state server and client.
They are defined in QueryableStateOptions
.
query.server.enable
: flag to indicate whether to start the queryable state serverquery.server.port
: port to bind to the internal KvStateServer
(0 => pick random available port)query.server.network-threads
: number of network (event loop) threads for the KvStateServer
(0 => #slots)query.server.query-threads
: number of asynchronous query threads for the KvStateServerHandler
(0 => #slots).QueryableStateClient
)query.client.network-threads
: number of network (event loop) threads for the KvStateClient
(0 => number of available cores)query.client.lookup.num-retries
: number of retries on location lookup failuresquery.client.lookup.retry-delay
: retry delay on location lookup failures (millis)