REST API #
Flink has a monitoring API that can be used to query status and statistics of running jobs, as well as recent completed jobs. This monitoring API is used by Flink’s own dashboard, but is designed to be used also by custom monitoring tools.
The monitoring API is a REST-ful API that accepts HTTP requests and responds with JSON data.
Overview #
The monitoring API is backed by a web server that runs as part of the JobManager. By default, this server listens at port 8081, which can be configured in flink-conf.yaml via rest.port. Note that the monitoring API web server and the web dashboard web server are currently the same and thus run together at the same port. They respond to different HTTP URLs, though.
In the case of multiple JobManagers (for high availability), each JobManager runs its own instance of the monitoring API, which offers information about the jobs that completed or were running while that JobManager was the elected cluster leader.
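As a rough illustration of the request/response cycle described above, the following Python sketch builds a URL against the default port and decodes the JSON reply. The helper names `rest_url` and `get_json` are hypothetical and not part of Flink; the host name is whatever your JobManager is reachable under.

```python
import json
from typing import Any
from urllib.request import urlopen


def rest_url(host: str, path: str, port: int = 8081) -> str:
    """Build a monitoring API URL; 8081 is the default value of rest.port."""
    return f"http://{host}:{port}/{path.lstrip('/')}"


def get_json(url: str, timeout: float = 10.0) -> Any:
    """Send an HTTP GET request and decode the JSON response body."""
    with urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

With a JobManager running locally, `get_json(rest_url("localhost", "/overview"))` would return the parsed cluster overview.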
Developing #
The REST API backend is in the flink-runtime project. The core class is org.apache.flink.runtime.webmonitor.WebMonitorEndpoint, which sets up the server and the request routing.
We use Netty and the Netty Router library to handle REST requests and translate URLs. This choice was made because this combination has lightweight dependencies, and the performance of Netty HTTP is very good.
To add new requests, one needs to
- add a new MessageHeaders class which serves as an interface for the new request,
- add a new AbstractRestHandler class which handles the request according to the added MessageHeaders class,
- add the handler to org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#initializeHandlers().
A good example is the org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler that uses the org.apache.flink.runtime.rest.messages.JobExceptionsHeaders.
API #
The REST API is versioned, with specific versions being queryable by prefixing the URL with the version prefix. Prefixes are always of the form v[version_number].
For example, to access version 1 of /foo/bar one would query /v1/foo/bar.
If no version is specified, Flink will default to the oldest version supporting the request.
Querying unsupported/non-existing versions will return a 404 error.
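The version prefix rule can be sketched as a small Python helper. The function name `versioned_path` is hypothetical; it only applies the v[version_number] prefix described above and leaves the path unversioned when no version is given, in which case Flink picks the oldest supporting version.

```python
from typing import Optional


def versioned_path(path: str, version: Optional[int] = None) -> str:
    """Prefix a REST path with v[version_number], or leave it unversioned."""
    normalized = "/" + path.lstrip("/")
    if version is None:
        # No prefix: the server chooses the oldest version supporting the request.
        return normalized
    return f"/v{version}{normalized}"
```

For instance, `versioned_path("/foo/bar", 1)` yields the path for version 1 of /foo/bar.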
Several of these APIs are asynchronous operations, e.g. triggering a savepoint or rescaling a job. They return a triggerid that identifies the operation you just started, and you then use that triggerid to query the status of the operation.
For (stop-with-)savepoint operations you can control this triggerId by setting it in the body of the request that triggers the operation.
This allows you to safely* retry such operations without triggering multiple savepoints.
*The retry is only safe until the async operation store duration has elapsed.
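The trigger-then-poll pattern might look like the following Python sketch. It is deliberately transport-agnostic: `fetch_status` is a caller-supplied function (hypothetical, not a Flink API) that queries the status URL for the triggerid and returns the parsed JSON. The response shape `{"status": {"id": ...}}` with the value "IN_PROGRESS" while the operation runs is an assumption and should be checked against the Flink version in use.

```python
import time
from typing import Any, Callable, Dict


def wait_for_operation(
    fetch_status: Callable[[], Dict[str, Any]],
    poll_interval: float = 1.0,
    max_polls: int = 60,
) -> Dict[str, Any]:
    """Poll an async operation until it is no longer in progress.

    Assumption: the status response looks like {"status": {"id": ...}} and
    reports "IN_PROGRESS" until the operation has finished.
    """
    for _ in range(max_polls):
        result = fetch_status()
        if result.get("status", {}).get("id") != "IN_PROGRESS":
            return result
        time.sleep(poll_interval)
    raise TimeoutError("operation did not finish within the polling budget")
```

Bounding the number of polls keeps a client from waiting forever if the operation's status is no longer retained by the cluster.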
JobManager #
The OpenAPI specification is still experimental.
API reference #
/cluster |
|
Verb: DELETE |
Response code: 200 OK |
Shuts down the cluster | |
/config |
|
Verb: GET |
Response code: 200 OK |
Returns the configuration of the WebUI. | |
/datasets |
|
Verb: GET |
Response code: 200 OK |
Returns all cluster data sets. | |
/datasets/delete/:triggerid |
|
Verb: GET |
Response code: 200 OK |
Returns the status for the delete operation of a cluster data set. | |
Path parameters | |
|
|
/datasets/:datasetid |
|
Verb: DELETE |
Response code: 202 Accepted |
Triggers the deletion of a cluster data set. This async operation returns a 'triggerid' that can be used to query its status. | |
Path parameters | |
|
|
/jars |
|
Verb: GET |
Response code: 200 OK |
Returns a list of all jars previously uploaded via '/jars/upload'. | |
/jars/upload |
|
Verb: POST |
Response code: 200 OK |
Uploads a jar to the cluster. The jar must be sent as multi-part data. Make sure that the "Content-Type" header is set to "application/x-java-archive", as some HTTP libraries do not add the header by default. Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@path/to/flink-job.jar" http://hostname:port/jars/upload'. | |
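For clients that do not build multi-part requests automatically, the upload body can be assembled by hand. The Python sketch below is one possible way to do this, not Flink's own client code: the function name `build_jar_upload` is hypothetical, the form field name "jarfile" is taken from the curl example above, and the jar part is labelled with the "application/x-java-archive" content type.

```python
import uuid
from typing import Tuple


def build_jar_upload(jar_name: str, jar_bytes: bytes) -> Tuple[bytes, str]:
    """Return (body, content_type_header) for a multi-part jar upload."""
    boundary = uuid.uuid4().hex
    head = (
        f"--{boundary}\r\n"
        # Field name "jarfile" mirrors the curl example.
        f'Content-Disposition: form-data; name="jarfile"; filename="{jar_name}"\r\n'
        "Content-Type: application/x-java-archive\r\n"
        "\r\n"
    ).encode("utf-8")
    tail = f"\r\n--{boundary}--\r\n".encode("utf-8")
    return head + jar_bytes + tail, f"multipart/form-data; boundary={boundary}"
```

The returned body and header value can then be sent as a POST request to /jars/upload, for example with `urllib.request.Request(url, data=body, headers={"Content-Type": content_type}, method="POST")`.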
/jars/:jarid |
|
Verb: DELETE |
Response code: 200 OK |
Deletes a jar previously uploaded via '/jars/upload'. | |
Path parameters | |
|
|
/jars/:jarid/plan |
|
Verb: POST |
Response code: 200 OK |
Returns the dataflow plan of a job contained in a jar previously uploaded via '/jars/upload'. Program arguments can be passed either via the JSON request (recommended) or via query parameters. | |
Path parameters | |
|
|
Query parameters | |
|
|
/jars/:jarid/run |
|
Verb: POST |
Response code: 200 OK |
Submits a job by running a jar previously uploaded via '/jars/upload'. Program arguments can be passed either via the JSON request (recommended) or via query parameters. | |
Path parameters | |
|
|
Query parameters | |
|
|
/jobmanager/config |
|
Verb: GET |
Response code: 200 OK |
Returns the cluster configuration. | |
/jobmanager/environment |
|
Verb: GET |
Response code: 200 OK |
Returns the jobmanager environment. | |
/jobmanager/logs |
|
Verb: GET |
Response code: 200 OK |
Returns the list of log files on the JobManager. | |
/jobmanager/metrics |
|
Verb: GET |
Response code: 200 OK |
Provides access to job manager metrics. | |
Query parameters | |
|
|
/jobmanager/thread-dump |
|
Verb: GET |
Response code: 200 OK |
Returns the thread dump of the JobManager. | |
/jobs |
|
Verb: GET |
Response code: 200 OK |
Returns an overview of all jobs and their current state. | |
/jobs |
|
Verb: POST |
Response code: 202 Accepted |
Submits a job. This call is primarily intended to be used by the Flink client. This call expects a multipart/form-data request that consists of file uploads for the serialized JobGraph, jars and distributed cache artifacts and an attribute named "request" for the JSON payload. | |
/jobs/metrics |
|
Verb: GET |
Response code: 200 OK |
Provides access to aggregated job metrics. | |
Query parameters | |
|
|
/jobs/overview |
|
Verb: GET |
Response code: 200 OK |
Returns an overview of all jobs. | |
/jobs/:jobid |
|
Verb: GET |
Response code: 200 OK |
Returns details of a job. | |
Path parameters | |
|
|
/jobs/:jobid |
|
Verb: PATCH |
Response code: 202 Accepted |
Terminates a job. | |
Path parameters | |
|
|
Query parameters | |
|
|
/jobs/:jobid/accumulators |
|
Verb: GET |
Response code: 200 OK |
Returns the accumulators for all tasks of a job, aggregated across the respective subtasks. | |
Path parameters | |
|
|
Query parameters | |
|
|
/jobs/:jobid/checkpoints |
|
Verb: GET |
Response code: 200 OK |
Returns checkpointing statistics for a job. | |
Path parameters | |
|
|
/jobs/:jobid/checkpoints |
|
Verb: POST |
Response code: 202 Accepted |
Triggers a checkpoint. This async operation returns a 'triggerid' that can be used to query its status. | |
Path parameters | |
|
|
/jobs/:jobid/checkpoints/config |
|
Verb: GET |
Response code: 200 OK |
Returns the checkpointing configuration. | |
Path parameters | |
|
|
/jobs/:jobid/checkpoints/details/:checkpointid |
|
Verb: GET |
Response code: 200 OK |
Returns details for a checkpoint. | |
Path parameters | |
|
|
/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid |
|
Verb: GET |
Response code: 200 OK |
Returns checkpoint statistics for a task and its subtasks. | |
Path parameters | |
|
|
/jobs/:jobid/checkpoints/:triggerid |
|
Verb: GET |
Response code: 200 OK |
Returns the status of a checkpoint trigger operation. | |
Path parameters | |
|
|
/jobs/:jobid/clientHeartbeat |
|
Verb: PATCH |
Response code: 202 Accepted |
Reports the jobClient's aliveness. | |
Path parameters | |
|
|
/jobs/:jobid/config |
|
Verb: GET |
Response code: 200 OK |
Returns the configuration of a job. | |
Path parameters | |
|
|
/jobs/:jobid/exceptions |
|
Verb: GET |
Response code: 200 OK |
Returns the most recent exceptions that have been handled by Flink for this job. The 'exceptionHistory.truncated' flag defines whether exceptions were filtered out through the GET parameter. The backend collects only a limited number of the most recent exceptions per job. This limit can be configured through web.exception-history-size in the Flink configuration. The following first-level members are deprecated: 'root-exception', 'timestamp', 'all-exceptions', and 'truncated'. Use the data provided through 'exceptionHistory' instead. | |
Path parameters | |
|
|
Query parameters | |
|
|
/jobs/:jobid/execution-result |
|
Verb: GET |
Response code: 200 OK |
Returns the result of a job execution. Gives access to the execution time of the job and to all accumulators created by this job. | |
Path parameters | |
|
|
/jobs/:jobid/jobmanager/config |
|
Verb: GET |
Response code: 200 OK |
Returns the jobmanager's configuration of a specific job. | |
Path parameters | |
|
|
/jobs/:jobid/jobmanager/environment |
|
Verb: GET |
Response code: 200 OK |
Returns the jobmanager's environment of a specific job. | |
Path parameters | |
|
|
/jobs/:jobid/jobmanager/log-url |
|
Verb: GET |
Response code: 200 OK |
Returns the log URL of the JobManager for a specific job. | |
Path parameters | |
|
|
/jobs/:jobid/metrics |
|
Verb: GET |
Response code: 200 OK |
Provides access to job metrics. | |
Path parameters | |
|
|
Query parameters | |
|
|
/jobs/:jobid/plan |
|
Verb: GET |
Response code: 200 OK |
Returns the dataflow plan of a job. | |
Path parameters | |
|
|
/jobs/:jobid/rescaling |
|
Verb: PATCH |
Response code: 200 OK |
Triggers the rescaling of a job. This async operation returns a 'triggerid' that can be used to query its status. | |
Path parameters | |
|
|
Query parameters | |
|
|
/jobs/:jobid/rescaling/:triggerid |
|
Verb: GET |
Response code: 200 OK |
Returns the status of a rescaling operation. | |
Path parameters | |
|
|
/jobs/:jobid/savepoints |
|
Verb: POST |
Response code: 202 Accepted |
Triggers a savepoint, and optionally cancels the job afterwards. This async operation returns a 'triggerid' that can be used to query its status. | |
Path parameters | |
|
|
/jobs/:jobid/savepoints/:triggerid |
|
Verb: GET |
Response code: 200 OK |
Returns the status of a savepoint operation. | |
Path parameters | |
|
|
/jobs/:jobid/status |
|
Verb: GET |
Response code: 200 OK |
Returns the current status of a job execution. | |
Path parameters | |
|
|
/jobs/:jobid/stop |
|
Verb: POST |
Response code: 202 Accepted |
Stops a job with a savepoint. Optionally, it can also emit a MAX_WATERMARK before taking the savepoint to flush out any state waiting for timers to fire. This async operation returns a 'triggerid' that can be used to query its status. | |
Path parameters | |
|
|
/jobs/:jobid/taskmanagers/:taskmanagerid/log-url |
|
Verb: GET |
Response code: 200 OK |
Returns the log URL of a TaskManager for a specific job. | |
Path parameters | |
|
|
/jobs/:jobid/vertices/:vertexid |
|
Verb: GET |
Response code: 200 OK |
Returns details for a task, with a summary for each of its subtasks. | |
Path parameters | |
|
|
/jobs/:jobid/vertices/:vertexid/accumulators |
|
Verb: GET |
Response code: 200 OK |
Returns user-defined accumulators of a task, aggregated across all subtasks. | |
Path parameters | |
|
|
/jobs/:jobid/vertices/:vertexid/backpressure |
|
Verb: GET |
Response code: 200 OK |
Returns back-pressure information for a job, and may initiate back-pressure sampling if necessary. | |
Path parameters | |
|
|
/jobs/:jobid/vertices/:vertexid/flamegraph |
|
Verb: GET |
Response code: 200 OK |
Returns flame graph information for a vertex, and may initiate flame graph sampling if necessary. | |
Path parameters | |
|
|
Query parameters | |
|
|
/jobs/:jobid/vertices/:vertexid/metrics |
|
Verb: GET |
Response code: 200 OK |
Provides access to task metrics. | |
Path parameters | |
|
|
Query parameters | |
|
|
/jobs/:jobid/vertices/:vertexid/subtasks/accumulators |
|
Verb: GET |
Response code: 200 OK |
Returns all user-defined accumulators for all subtasks of a task. | |
Path parameters | |
|
|
/jobs/:jobid/vertices/:vertexid/subtasks/metrics |
|
Verb: GET |
Response code: 200 OK |
Provides access to aggregated subtask metrics. | |
Path parameters | |
|
|
Query parameters | |
|
|
/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex |
|
Verb: GET |
Response code: 200 OK |
Returns details of the current or latest execution attempt of a subtask. | |
Path parameters | |
|
|
/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts/:attempt |
|
Verb: GET |
Response code: 200 OK |
Returns details of an execution attempt of a subtask. Multiple execution attempts happen in case of failure/recovery. | |
Path parameters | |
|
|
/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts/:attempt/accumulators |
|
Verb: GET |
Response code: 200 OK |
Returns the accumulators of an execution attempt of a subtask. Multiple execution attempts happen in case of failure/recovery. | |
Path parameters | |
|
|
/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/metrics |
|
Verb: GET |
Response code: 200 OK |
Provides access to subtask metrics. | |
Path parameters | |
|
|
Query parameters | |
|
|
/jobs/:jobid/vertices/:vertexid/subtasktimes |
|
Verb: GET |
Response code: 200 OK |
Returns time-related information for all subtasks of a task. | |
Path parameters | |
|
|
/jobs/:jobid/vertices/:vertexid/taskmanagers |
|
Verb: GET |
Response code: 200 OK |
Returns task information aggregated by task manager. | |
Path parameters | |
|
|
/jobs/:jobid/vertices/:vertexid/watermarks |
|
Verb: GET |
Response code: 200 OK |
Returns the watermarks for all subtasks of a task. | |
Path parameters | |
|
|
/overview |
|
Verb: GET |
Response code: 200 OK |
Returns an overview of the Flink cluster. | |
/savepoint-disposal |
|
Verb: POST |
Response code: 200 OK |
Triggers the disposal of a savepoint. This async operation returns a 'triggerid' that can be used to query its status. | |
/savepoint-disposal/:triggerid |
|
Verb: GET |
Response code: 200 OK |
Returns the status of a savepoint disposal operation. | |
Path parameters | |
|
|
/taskmanagers |
|
Verb: GET |
Response code: 200 OK |
Returns an overview of all task managers. | |
/taskmanagers/metrics |
|
Verb: GET |
Response code: 200 OK |
Provides access to aggregated task manager metrics. | |
Query parameters | |
|
|
/taskmanagers/:taskmanagerid |
|
Verb: GET |
Response code: 200 OK |
Returns details for a task manager. "metrics.memorySegmentsAvailable" and "metrics.memorySegmentsTotal" are deprecated. Please use "metrics.nettyShuffleMemorySegmentsAvailable" and "metrics.nettyShuffleMemorySegmentsTotal" instead. | |
Path parameters | |
|
|
/taskmanagers/:taskmanagerid/logs |
|
Verb: GET |
Response code: 200 OK |
Returns the list of log files on a TaskManager. | |
Path parameters | |
|
|
/taskmanagers/:taskmanagerid/metrics |
|
Verb: GET |
Response code: 200 OK |
Provides access to task manager metrics. | |
Path parameters | |
|
|
Query parameters | |
|
|
/taskmanagers/:taskmanagerid/thread-dump |
|
Verb: GET |
Response code: 200 OK |
Returns the thread dump of the requested TaskManager. | |
Path parameters | |
|
|