Monitoring REST API
Flink has a monitoring API that can be used to query the status and statistics of running jobs, as well as recently completed jobs.
This monitoring API is used by Flink's own dashboard, but is also designed to be used by custom monitoring tools.
The monitoring API is a REST-ful API that accepts HTTP requests and responds with JSON data.
Overview
The monitoring API is backed by a web server that runs as part of the Dispatcher. By default, this server listens on port 8081, which can be configured in flink-conf.yaml via rest.port. Note that the monitoring API web server and the web dashboard web server are currently the same and thus run together on the same port. They respond to different HTTP URLs, though.
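As a rough illustration of the request/response cycle described above, the following Python sketch sends a GET request and decodes the JSON reply. It assumes the Dispatcher is reachable on localhost at the default port 8081. The names BASE_URL, rest_url and get_json are hypothetical helpers introduced for this example and are not part of Flink.

```python
import json
import urllib.request

# Assumption: the Dispatcher's REST endpoint is reachable on localhost at the
# default port 8081; change this if rest.port is configured differently.
BASE_URL = "http://localhost:8081"


def rest_url(path, base_url=BASE_URL):
    """Join the base URL and a request path such as '/jobs' (hypothetical helper)."""
    return base_url.rstrip("/") + "/" + path.lstrip("/")


def get_json(path, base_url=BASE_URL, timeout=10.0):
    """Send a GET request to the monitoring API and decode the JSON body."""
    with urllib.request.urlopen(rest_url(path, base_url), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

With a cluster running, get_json("/config") would return the dashboard configuration documented under /config below.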
In the case of multiple Dispatchers (for high availability), each Dispatcher runs its own instance of the monitoring API, which offers information about the jobs that completed or were running while that Dispatcher was the elected cluster leader.
Developing
The REST API backend is in the flink-runtime project. The core class is org.apache.flink.runtime.webmonitor.WebMonitorEndpoint, which sets up the server and the request routing.
We use Netty and the Netty Router library to handle REST requests and translate URLs. This choice was made because this combination has lightweight dependencies, and the performance of Netty HTTP is very good.
To add new requests, one needs to:
add a new MessageHeaders class which serves as an interface for the new request,
add a new AbstractRestHandler class which handles the request according to the added MessageHeaders class,
add the handler to org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#initializeHandlers().
A good example is the org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler that uses the org.apache.flink.runtime.rest.messages.JobExceptionsHeaders.
Available Requests
Dispatcher
/cluster
Verb: DELETE
Response code: 200 OK
Shuts down the cluster
Request
Response
/config
Verb: GET
Response code: 200 OK
Returns the configuration of the WebUI.
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:DashboardConfiguration",
"properties" : {
"refresh-interval" : {
"type" : "integer"
},
"timezone-name" : {
"type" : "string"
},
"timezone-offset" : {
"type" : "integer"
},
"flink-version" : {
"type" : "string"
},
"flink-revision" : {
"type" : "string"
}
}
}
/jars
Verb: GET
Response code: 200 OK
Returns a list of all jars previously uploaded via '/jars/upload'.
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarListInfo",
"properties" : {
"address" : {
"type" : "string"
},
"files" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarListInfo:JarFileInfo",
"properties" : {
"id" : {
"type" : "string"
},
"name" : {
"type" : "string"
},
"uploaded" : {
"type" : "integer"
},
"entry" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarListInfo:JarEntryInfo",
"properties" : {
"name" : {
"type" : "string"
},
"description" : {
"type" : "string"
}
}
}
}
}
}
}
}
}
/jars/upload
Verb: POST
Response code: 200 OK
Uploads a jar to the cluster. The jar must be sent as multi-part data. Make sure that the "Content-Type" header is set to "application/x-java-archive", as some HTTP libraries do not add the header by default.
Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@path/to/flink-job.jar" http://hostname:port/jars/upload'.
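For clients that cannot shell out to curl, the multipart request can be assembled by hand. The sketch below uses only the Python standard library. The form field name "jarfile" is taken from the curl command above. Setting "application/x-java-archive" on the file part is one reading of the Content-Type remark and should be treated as an assumption. build_jar_upload and upload_jar are hypothetical helper names.

```python
import json
import urllib.request
import uuid


def build_jar_upload(jar_name, jar_bytes, boundary=None):
    """Return (headers, body) for a multipart POST to /jars/upload."""
    boundary = boundary or uuid.uuid4().hex
    head = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="jarfile"; filename="{jar_name}"\r\n'
        # Assumption: the content type is declared on the file part itself.
        "Content-Type: application/x-java-archive\r\n"
        "\r\n"
    ).encode("utf-8")
    tail = f"\r\n--{boundary}--\r\n".encode("utf-8")
    headers = {"Content-Type": f"multipart/form-data; boundary={boundary}"}
    return headers, head + jar_bytes + tail


def upload_jar(base_url, jar_path, timeout=60.0):
    """Upload the jar at jar_path and return the decoded JSON response."""
    with open(jar_path, "rb") as f:
        headers, body = build_jar_upload(jar_path.split("/")[-1], f.read())
    request = urllib.request.Request(
        base_url.rstrip("/") + "/jars/upload",
        data=body, headers=headers, method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Keeping the body construction separate from the network call makes the encoding easy to inspect without a running cluster.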
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarUploadResponseBody",
"properties" : {
"filename" : {
"type" : "string"
},
"status" : {
"type" : "string",
"enum" : [ "success" ]
}
}
}
/jars/:jarid
Verb: DELETE
Response code: 200 OK
Deletes a jar previously uploaded via '/jars/upload'.
Path parameters
Request
Response
/jars/:jarid/plan
Verb: GET
Response code: 200 OK
Returns the dataflow plan of a job contained in a jar previously uploaded via '/jars/upload'.
Path parameters
Query parameters
entry-class
(optional): description
parallelism
(optional): description
program-args
(optional): description
Request
Response
/jars/:jarid/run
Verb: POST
Response code: 200 OK
Submits a job by running a jar previously uploaded via '/jars/upload'.
Path parameters
Query parameters
program-args
(optional): description
entry-class
(optional): description
parallelism
(optional): description
allowNonRestoredState
(optional): description
savepointPath
(optional): description
Request
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunRequestBody",
"properties" : {
"entryClass" : {
"type" : "string"
},
"programArgs" : {
"type" : "string"
},
"parallelism" : {
"type" : "integer"
},
"allowNonRestoredState" : {
"type" : "boolean"
},
"savepointPath" : {
"type" : "string"
}
}
}
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunResponseBody",
"properties" : {
"jobid" : {
"type" : "any"
}
}
}
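The request schema above maps directly onto a small JSON document. The following sketch builds such a body in Python. It assumes that fields left unset may simply be omitted from the body, which the schema does not state explicitly. jar_run_body is a hypothetical helper name.

```python
import json


def jar_run_body(entry_class=None, program_args=None, parallelism=None,
                 savepoint_path=None, allow_non_restored_state=None):
    """Build the JSON body for POST /jars/:jarid/run.

    Field names follow the JarRunRequestBody schema; arguments left as None
    are omitted (assumption: the server treats missing fields as unset).
    """
    fields = {
        "entryClass": entry_class,
        "programArgs": program_args,
        "parallelism": parallelism,
        "savepointPath": savepoint_path,
        "allowNonRestoredState": allow_non_restored_state,
    }
    return json.dumps({k: v for k, v in fields.items() if v is not None})
```

For example, jar_run_body(entry_class="com.example.Job", parallelism=4) yields a body containing only entryClass and parallelism. The class name here is a placeholder.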
/jobmanager/config
Verb: GET
Response code: 200 OK
Returns the cluster configuration.
Request
Response
{
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ClusterConfigurationInfoEntry",
"properties" : {
"key" : {
"type" : "string"
},
"value" : {
"type" : "string"
}
}
}
}
/jobmanager/metrics
Verb: GET
Response code: 200 OK
Provides access to job manager metrics.
Query parameters
get
(optional): description
Request
Response
/jobs
Verb: GET
Response code: 200 OK
Returns an overview of all jobs and their current state.
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobIdsWithStatusOverview",
"properties" : {
"jobs" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobIdsWithStatusOverview:JobIdWithStatus",
"properties" : {
"id" : {
"type" : "any"
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDING", "SUSPENDED", "RECONCILING" ]
}
}
}
}
}
}
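A response of this shape is easy to post-process in a monitoring script. The sketch below groups job IDs by their status. It operates on an already decoded response, so it makes no assumption about how the request was sent. job_ids_by_status is a hypothetical helper name.

```python
from collections import defaultdict


def job_ids_by_status(overview):
    """Group the job IDs of a decoded GET /jobs response by job status."""
    grouped = defaultdict(list)
    for job in overview.get("jobs", []):
        grouped[job["status"]].append(job["id"])
    return dict(grouped)
```

A monitoring tool could, for instance, alert whenever the "FAILED" or "RESTARTING" group is non-empty.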
/jobs
Verb: POST
Response code: 202 Accepted
Submits a job. This call is primarily intended to be used by the Flink client. This call expects a multipart/form-data request that consists of file uploads for the serialized JobGraph, jars and distributed cache artifacts and an attribute named "request" for the JSON payload.
Request
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitRequestBody",
"properties" : {
"jobGraphFileName" : {
"type" : "string"
},
"jobJarFileNames" : {
"type" : "array",
"items" : {
"type" : "string"
}
}
}
}
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitResponseBody",
"properties" : {
"jobUrl" : {
"type" : "string"
}
}
}
/jobs/metrics
Verb: GET
Response code: 200 OK
Provides access to aggregated job metrics.
Query parameters
get
(optional): description
agg
(optional): description
jobs
(optional): description
Request
Response
/jobs/overview
Verb: GET
Response code: 200 OK
Returns an overview of all jobs.
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:MultipleJobsDetails",
"properties" : {
"jobs" : {
"type" : "array",
"items" : {
"type" : "any"
}
}
}
}
/jobs/:jobid
Verb: GET
Response code: 200 OK
Returns details of a job.
Path parameters
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobDetailsInfo",
"properties" : {
"jid" : {
"type" : "any"
},
"name" : {
"type" : "string"
},
"isStoppable" : {
"type" : "boolean"
},
"state" : {
"type" : "string",
"enum" : [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDING", "SUSPENDED", "RECONCILING" ]
},
"start-time" : {
"type" : "integer"
},
"end-time" : {
"type" : "integer"
},
"duration" : {
"type" : "integer"
},
"now" : {
"type" : "integer"
},
"timestamps" : {
"type" : "object",
"additionalProperties" : {
"type" : "integer"
}
},
"vertices" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobDetailsInfo:JobVertexDetailsInfo",
"properties" : {
"id" : {
"type" : "any"
},
"name" : {
"type" : "string"
},
"parallelism" : {
"type" : "integer"
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
},
"start-time" : {
"type" : "integer"
},
"end-time" : {
"type" : "integer"
},
"duration" : {
"type" : "integer"
},
"tasks" : {
"type" : "object",
"additionalProperties" : {
"type" : "integer"
}
},
"metrics" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
"properties" : {
"read-bytes" : {
"type" : "integer"
},
"read-bytes-complete" : {
"type" : "boolean"
},
"write-bytes" : {
"type" : "integer"
},
"write-bytes-complete" : {
"type" : "boolean"
},
"read-records" : {
"type" : "integer"
},
"read-records-complete" : {
"type" : "boolean"
},
"write-records" : {
"type" : "integer"
},
"write-records-complete" : {
"type" : "boolean"
}
}
}
}
}
},
"status-counts" : {
"type" : "object",
"additionalProperties" : {
"type" : "integer"
}
},
"plan" : {
"type" : "string"
}
}
}
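As one way to use the job details response, the following sketch lists the vertices whose status differs from an expected value. It relies only on the "vertices", "name" and "status" fields from the schema above. vertices_not_in_state is a hypothetical helper name.

```python
def vertices_not_in_state(job_details, expected="RUNNING"):
    """Return (name, status) pairs for vertices whose status is not `expected`.

    job_details is the decoded JSON of GET /jobs/:jobid.
    """
    return [
        (vertex["name"], vertex["status"])
        for vertex in job_details.get("vertices", [])
        if vertex["status"] != expected
    ]
```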
/jobs/:jobid
Verb: PATCH
Response code: 202 Accepted
Terminates a job.
Path parameters
Query parameters
mode
(optional): description
Request
Response
/jobs/:jobid/accumulators
Verb: GET
Response code: 200 OK
Returns the accumulators for all tasks of a job, aggregated across the respective subtasks.
Path parameters
Query parameters
includeSerializedValue
(optional): description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobAccumulatorsInfo",
"properties" : {
"job-accumulators" : {
"type" : "array",
"items" : {
"type" : "any"
}
},
"user-task-accumulators" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobAccumulatorsInfo:UserTaskAccumulator",
"properties" : {
"name" : {
"type" : "string"
},
"type" : {
"type" : "string"
},
"value" : {
"type" : "string"
}
}
}
},
"serialized-user-task-accumulators" : {
"type" : "object",
"additionalProperties" : {
"type" : "any"
}
}
}
}
/jobs/:jobid/checkpoints
Verb: GET
Response code: 200 OK
Returns checkpointing statistics for a job.
Path parameters
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics",
"properties" : {
"counts" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:Counts",
"properties" : {
"restored" : {
"type" : "integer"
},
"total" : {
"type" : "integer"
},
"in_progress" : {
"type" : "integer"
},
"completed" : {
"type" : "integer"
},
"failed" : {
"type" : "integer"
}
}
},
"summary" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:Summary",
"properties" : {
"state_size" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics",
"properties" : {
"min" : {
"type" : "integer"
},
"max" : {
"type" : "integer"
},
"avg" : {
"type" : "integer"
}
}
},
"end_to_end_duration" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
},
"alignment_buffered" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
}
}
},
"latest" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:LatestCheckpoints",
"properties" : {
"completed" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics:CompletedCheckpointStatistics",
"properties" : {
"id" : {
"type" : "integer"
},
"status" : {
"type" : "string",
"enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
},
"is_savepoint" : {
"type" : "boolean"
},
"trigger_timestamp" : {
"type" : "integer"
},
"latest_ack_timestamp" : {
"type" : "integer"
},
"state_size" : {
"type" : "integer"
},
"end_to_end_duration" : {
"type" : "integer"
},
"alignment_buffered" : {
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
},
"num_acknowledged_subtasks" : {
"type" : "integer"
},
"tasks" : {
"type" : "object",
"additionalProperties" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics",
"properties" : {
"id" : {
"type" : "integer"
},
"status" : {
"type" : "string",
"enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
},
"latest_ack_timestamp" : {
"type" : "integer"
},
"state_size" : {
"type" : "integer"
},
"end_to_end_duration" : {
"type" : "integer"
},
"alignment_buffered" : {
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
},
"num_acknowledged_subtasks" : {
"type" : "integer"
}
}
}
},
"external_path" : {
"type" : "string"
},
"discarded" : {
"type" : "boolean"
}
}
},
"savepoint" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics:CompletedCheckpointStatistics"
},
"failed" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics:FailedCheckpointStatistics",
"properties" : {
"id" : {
"type" : "integer"
},
"status" : {
"type" : "string",
"enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
},
"is_savepoint" : {
"type" : "boolean"
},
"trigger_timestamp" : {
"type" : "integer"
},
"latest_ack_timestamp" : {
"type" : "integer"
},
"state_size" : {
"type" : "integer"
},
"end_to_end_duration" : {
"type" : "integer"
},
"alignment_buffered" : {
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
},
"num_acknowledged_subtasks" : {
"type" : "integer"
},
"tasks" : {
"type" : "object",
"additionalProperties" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics"
}
},
"failure_timestamp" : {
"type" : "integer"
},
"failure_message" : {
"type" : "string"
}
}
},
"restored" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointingStatistics:RestoredCheckpointStatistics",
"properties" : {
"id" : {
"type" : "integer"
},
"restore_timestamp" : {
"type" : "integer"
},
"is_savepoint" : {
"type" : "boolean"
},
"external_path" : {
"type" : "string"
}
}
}
}
},
"history" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics",
"properties" : {
"id" : {
"type" : "integer"
},
"status" : {
"type" : "string",
"enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
},
"is_savepoint" : {
"type" : "boolean"
},
"trigger_timestamp" : {
"type" : "integer"
},
"latest_ack_timestamp" : {
"type" : "integer"
},
"state_size" : {
"type" : "integer"
},
"end_to_end_duration" : {
"type" : "integer"
},
"alignment_buffered" : {
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
},
"num_acknowledged_subtasks" : {
"type" : "integer"
},
"tasks" : {
"type" : "object",
"additionalProperties" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics"
}
}
}
}
}
}
}
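The checkpointing statistics can be condensed into a few health indicators. The sketch below reads the "counts" and "latest" sections of a decoded response. It assumes that "latest" or "latest.completed" may be absent or null when no checkpoint has completed yet, and guards against that case. checkpoint_health is a hypothetical helper name.

```python
def checkpoint_health(stats):
    """Summarize a decoded GET /jobs/:jobid/checkpoints response."""
    counts = stats["counts"]
    finished = counts["completed"] + counts["failed"]
    # Assumption: "latest" and its "completed" entry can be missing or null.
    latest = (stats.get("latest") or {}).get("completed") or {}
    return {
        # Share of finished checkpoints that failed; 0.0 if none finished.
        "failure_ratio": counts["failed"] / finished if finished else 0.0,
        "in_progress": counts["in_progress"],
        "latest_completed_id": latest.get("id"),
        "latest_external_path": latest.get("external_path"),
    }
```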
/jobs/:jobid/checkpoints/config
Verb: GET
Response code: 200 OK
Returns the checkpointing configuration.
Path parameters
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointConfigInfo",
"properties" : {
"mode" : {
"type" : "any"
},
"interval" : {
"type" : "integer"
},
"timeout" : {
"type" : "integer"
},
"min_pause" : {
"type" : "integer"
},
"max_concurrent" : {
"type" : "integer"
},
"externalization" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointConfigInfo:ExternalizedCheckpointInfo",
"properties" : {
"enabled" : {
"type" : "boolean"
},
"delete_on_cancellation" : {
"type" : "boolean"
}
}
}
}
}
/jobs/:jobid/checkpoints/details/:checkpointid
Verb: GET
Response code: 200 OK
Returns details for a checkpoint.
Path parameters
jobid
- description
checkpointid
- description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointStatistics",
"properties" : {
"id" : {
"type" : "integer"
},
"status" : {
"type" : "string",
"enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
},
"is_savepoint" : {
"type" : "boolean"
},
"trigger_timestamp" : {
"type" : "integer"
},
"latest_ack_timestamp" : {
"type" : "integer"
},
"state_size" : {
"type" : "integer"
},
"end_to_end_duration" : {
"type" : "integer"
},
"alignment_buffered" : {
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
},
"num_acknowledged_subtasks" : {
"type" : "integer"
},
"tasks" : {
"type" : "object",
"additionalProperties" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatistics",
"properties" : {
"id" : {
"type" : "integer"
},
"status" : {
"type" : "string",
"enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
},
"latest_ack_timestamp" : {
"type" : "integer"
},
"state_size" : {
"type" : "integer"
},
"end_to_end_duration" : {
"type" : "integer"
},
"alignment_buffered" : {
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
},
"num_acknowledged_subtasks" : {
"type" : "integer"
}
}
}
}
}
}
/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid
Verb: GET
Response code: 200 OK
Returns checkpoint statistics for a task and its subtasks.
Path parameters
jobid
- description
checkpointid
- description
vertexid
- description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails",
"properties" : {
"id" : {
"type" : "integer"
},
"status" : {
"type" : "string",
"enum" : [ "IN_PROGRESS", "COMPLETED", "FAILED" ]
},
"latest_ack_timestamp" : {
"type" : "integer"
},
"state_size" : {
"type" : "integer"
},
"end_to_end_duration" : {
"type" : "integer"
},
"alignment_buffered" : {
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
},
"num_acknowledged_subtasks" : {
"type" : "integer"
},
"summary" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails:Summary",
"properties" : {
"state_size" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics",
"properties" : {
"min" : {
"type" : "integer"
},
"max" : {
"type" : "integer"
},
"avg" : {
"type" : "integer"
}
}
},
"end_to_end_duration" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
},
"checkpoint_duration" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails:CheckpointDuration",
"properties" : {
"sync" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
},
"async" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
}
}
},
"alignment" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:TaskCheckpointStatisticsWithSubtaskDetails:CheckpointAlignment",
"properties" : {
"buffered" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
},
"duration" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
}
}
}
}
},
"subtasks" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:SubtaskCheckpointStatistics",
"properties" : {
"index" : {
"type" : "integer"
},
"status" : {
"type" : "string"
}
}
}
}
}
}
/jobs/:jobid/config
Verb: GET
Response code: 200 OK
Returns the configuration of a job.
Path parameters
Request
Response
/jobs/:jobid/exceptions
Verb: GET
Response code: 200 OK
Returns the non-recoverable exceptions that have been observed by the job. The truncated flag defines whether more exceptions occurred, but are not listed, because the response would otherwise get too big.
Path parameters
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfo",
"properties" : {
"root-exception" : {
"type" : "string"
},
"timestamp" : {
"type" : "integer"
},
"all-exceptions" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfo:ExecutionExceptionInfo",
"properties" : {
"exception" : {
"type" : "string"
},
"task" : {
"type" : "string"
},
"location" : {
"type" : "string"
},
"timestamp" : {
"type" : "integer"
}
}
}
},
"truncated" : {
"type" : "boolean"
}
}
}
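To show how the truncated flag might be surfaced to a user, the following sketch renders a decoded response as one-line summaries and appends a note when the list was cut short. format_exceptions is a hypothetical helper name, and the output format is an arbitrary choice.

```python
def format_exceptions(info):
    """Render a decoded GET /jobs/:jobid/exceptions response as short lines."""
    lines = []
    for entry in info.get("all-exceptions", []):
        # Keep only the first line of the stack trace; tolerate empty strings.
        first_line = (entry["exception"].splitlines() or [""])[0]
        lines.append(
            f'{entry["timestamp"]} {entry["task"]} @ {entry["location"]}: {first_line}'
        )
    if info.get("truncated"):
        lines.append("(more exceptions occurred but were not listed)")
    return lines
```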
/jobs/:jobid/execution-result
Verb: GET
Response code: 200 OK
Returns the result of a job execution. Gives access to the execution time of the job and to all accumulators created by this job.
Path parameters
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobExecutionResultResponseBody",
"properties" : {
"status" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
"required" : true,
"properties" : {
"id" : {
"type" : "string",
"required" : true,
"enum" : [ "IN_PROGRESS", "COMPLETED" ]
}
}
},
"job-execution-result" : {
"type" : "any"
}
}
}
/jobs/:jobid/metrics
Verb: GET
Response code: 200 OK
Provides access to job metrics.
Path parameters
Query parameters
get
(optional): description
Request
Response
/jobs/:jobid/plan
Verb: GET
Response code: 200 OK
Returns the dataflow plan of a job.
Path parameters
Request
Response
/jobs/:jobid/rescaling
Verb: PATCH
Response code: 200 OK
Triggers the rescaling of a job.
Path parameters
Query parameters
parallelism
(mandatory): description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse",
"properties" : {
"request-id" : {
"type" : "any"
}
}
}
/jobs/:jobid/rescaling/:triggerid
Verb: GET
Response code: 200 OK
Returns the status of a rescaling operation.
Path parameters
jobid
- description
triggerid
- description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:AsynchronousOperationResult",
"properties" : {
"status" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
"properties" : {
"id" : {
"type" : "string",
"required" : true,
"enum" : [ "IN_PROGRESS", "COMPLETED" ]
}
}
},
"operation" : {
"type" : "any"
}
}
}
/jobs/:jobid/savepoints
Verb: POST
Response code: 202 Accepted
Triggers a savepoint, and optionally cancels the job afterwards.
Path parameters
Request
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:savepoints:SavepointTriggerRequestBody",
"properties" : {
"target-directory" : {
"type" : "string"
},
"cancel-job" : {
"type" : "boolean"
}
}
}
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse",
"properties" : {
"request-id" : {
"type" : "any"
}
}
}
/jobs/:jobid/savepoints/:triggerid
Verb: GET
Response code: 200 OK
Returns the status of a savepoint operation.
Path parameters
jobid
- description
triggerid
- description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:AsynchronousOperationResult",
"properties" : {
"status" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
"properties" : {
"id" : {
"type" : "string",
"required" : true,
"enum" : [ "IN_PROGRESS", "COMPLETED" ]
}
}
},
"operation" : {
"type" : "any"
}
}
}
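Triggering a savepoint and reading its status form a two-step asynchronous pattern: the POST returns a "request-id", and the status request is repeated until "status.id" is "COMPLETED". The sketch below shows one way to build the trigger body and poll for the result. The fetch_status callable stands in for whatever HTTP client is used, and the polling interval and limit are arbitrary assumptions. Both function names are hypothetical.

```python
import json
import time


def savepoint_trigger_body(target_directory, cancel_job=False):
    """Build the JSON body for POST /jobs/:jobid/savepoints."""
    return json.dumps({"target-directory": target_directory, "cancel-job": cancel_job})


def wait_for_operation(fetch_status, poll_interval=1.0, max_polls=60, sleep=time.sleep):
    """Poll until the asynchronous operation reports COMPLETED.

    fetch_status is any zero-argument callable returning the decoded JSON of
    GET /jobs/:jobid/savepoints/:triggerid. Returns the "operation" value.
    """
    for _ in range(max_polls):
        result = fetch_status()
        if result["status"]["id"] == "COMPLETED":
            return result.get("operation")
        sleep(poll_interval)
    raise TimeoutError("operation did not complete within the polling limit")
```

Since /jobs/:jobid/rescaling/:triggerid returns the same AsynchronousOperationResult shape, the same loop should apply there as well.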
/jobs/:jobid/vertices/:vertexid
Verb: GET
Response code: 200 OK
Returns details for a task, with a summary for each of its subtasks.
Path parameters
jobid
- description
vertexid
- description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexDetailsInfo",
"properties" : {
"id" : {
"type" : "any"
},
"name" : {
"type" : "string"
},
"parallelism" : {
"type" : "integer"
},
"now" : {
"type" : "integer"
},
"subtasks" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexDetailsInfo:VertexTaskDetail",
"properties" : {
"subtask" : {
"type" : "integer"
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
},
"attempt" : {
"type" : "integer"
},
"host" : {
"type" : "string"
},
"start_time" : {
"type" : "integer"
},
"end-time" : {
"type" : "integer"
},
"duration" : {
"type" : "integer"
},
"metrics" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
"properties" : {
"read-bytes" : {
"type" : "integer"
},
"read-bytes-complete" : {
"type" : "boolean"
},
"write-bytes" : {
"type" : "integer"
},
"write-bytes-complete" : {
"type" : "boolean"
},
"read-records" : {
"type" : "integer"
},
"read-records-complete" : {
"type" : "boolean"
},
"write-records" : {
"type" : "integer"
},
"write-records-complete" : {
"type" : "boolean"
}
}
}
}
}
}
}
}
/jobs/:jobid/vertices/:vertexid/accumulators
Verb: GET
Response code: 200 OK
Returns user-defined accumulators of a task, aggregated across all subtasks.
Path parameters
jobid
- description
vertexid
- description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexAccumulatorsInfo",
"properties" : {
"id" : {
"type" : "string"
},
"user-accumulators" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:UserAccumulator",
"properties" : {
"name" : {
"type" : "string"
},
"type" : {
"type" : "string"
},
"value" : {
"type" : "string"
}
}
}
}
}
}
/jobs/:jobid/vertices/:vertexid/backpressure
Verb: GET
Response code: 200 OK
Returns back-pressure information for a job, and may initiate back-pressure sampling if necessary.
Path parameters
jobid
- description
vertexid
- description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexBackPressureInfo",
"properties" : {
"status" : {
"type" : "string",
"enum" : [ "deprecated", "ok" ]
},
"backpressure-level" : {
"type" : "string",
"enum" : [ "ok", "low", "high" ]
},
"end-timestamp" : {
"type" : "integer"
},
"subtasks" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexBackPressureInfo:SubtaskBackPressureInfo",
"properties" : {
"subtask" : {
"type" : "integer"
},
"backpressure-level" : {
"type" : "string",
"enum" : [ "ok", "low", "high" ]
},
"ratio" : {
"type" : "number"
}
}
}
}
}
}
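A short sketch of how the per-subtask levels in this response could be consumed: it returns the indices of subtasks at a given back-pressure level, using only the field names and enum values from the schema above. subtasks_at_level is a hypothetical helper name.

```python
def subtasks_at_level(info, level="high"):
    """Return the subtask indices whose back-pressure level equals `level`.

    info is the decoded JSON of GET /jobs/:jobid/vertices/:vertexid/backpressure.
    """
    return [
        subtask["subtask"]
        for subtask in info.get("subtasks", [])
        if subtask["backpressure-level"] == level
    ]
```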
/jobs/:jobid/vertices/:vertexid/metrics
Verb: GET
Response code: 200 OK
Provides access to task metrics.
Path parameters
jobid
- description
vertexid
- description
Query parameters
get
(optional): description
Request
Response
/jobs/:jobid/vertices/:vertexid/subtasks/accumulators
Verb: GET
Response code: 200 OK
Returns all user-defined accumulators for all subtasks of a task.
Path parameters
jobid
- description
vertexid
- description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtasksAllAccumulatorsInfo",
"properties" : {
"id" : {
"type" : "any"
},
"parallelism" : {
"type" : "integer"
},
"subtasks" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtasksAllAccumulatorsInfo:SubtaskAccumulatorsInfo",
"properties" : {
"subtask" : {
"type" : "integer"
},
"attempt" : {
"type" : "integer"
},
"host" : {
"type" : "string"
},
"user-accumulators" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:UserAccumulator",
"properties" : {
"name" : {
"type" : "string"
},
"type" : {
"type" : "string"
},
"value" : {
"type" : "string"
}
}
}
}
}
}
}
}
}
/jobs/:jobid/vertices/:vertexid/subtasks/metrics
Verb: GET
Response code: 200 OK
Provides access to aggregated subtask metrics.
Path parameters
jobid
- description
vertexid
- description
Query parameters
get
(optional): description
agg
(optional): description
subtasks
(optional): description
Request
Response
/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex
Verb: GET
Response code: 200 OK
Returns details of the current or latest execution attempt of a subtask.
Path parameters
jobid
- description
vertexid
- description
subtaskindex
- description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo",
"properties" : {
"subtask" : {
"type" : "integer"
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
},
"attempt" : {
"type" : "integer"
},
"host" : {
"type" : "string"
},
"start-time" : {
"type" : "integer"
},
"end-time" : {
"type" : "integer"
},
"duration" : {
"type" : "integer"
},
"metrics" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
"properties" : {
"read-bytes" : {
"type" : "integer"
},
"read-bytes-complete" : {
"type" : "boolean"
},
"write-bytes" : {
"type" : "integer"
},
"write-bytes-complete" : {
"type" : "boolean"
},
"read-records" : {
"type" : "integer"
},
"read-records-complete" : {
"type" : "boolean"
},
"write-records" : {
"type" : "integer"
},
"write-records-complete" : {
"type" : "boolean"
}
}
}
}
}
/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts/:attempt
Verb: GET
Response code: 200 OK
Returns details of an execution attempt of a subtask. Multiple execution attempts happen in case of failure/recovery.
Path parameters
jobid
- description
vertexid
- description
subtaskindex
- description
attempt
- description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo",
"properties" : {
"subtask" : {
"type" : "integer"
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
},
"attempt" : {
"type" : "integer"
},
"host" : {
"type" : "string"
},
"start-time" : {
"type" : "integer"
},
"end-time" : {
"type" : "integer"
},
"duration" : {
"type" : "integer"
},
"metrics" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
"properties" : {
"read-bytes" : {
"type" : "integer"
},
"read-bytes-complete" : {
"type" : "boolean"
},
"write-bytes" : {
"type" : "integer"
},
"write-bytes-complete" : {
"type" : "boolean"
},
"read-records" : {
"type" : "integer"
},
"read-records-complete" : {
"type" : "boolean"
},
"write-records" : {
"type" : "integer"
},
"write-records-complete" : {
"type" : "boolean"
}
}
}
}
}
/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts/:attempt/accumulators
Verb: GET
Response code: 200 OK
Returns the accumulators of an execution attempt of a subtask. Multiple execution attempts happen in case of failure/recovery.
Path parameters
jobid
- description
vertexid
- description
subtaskindex
- description
attempt
- description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptAccumulatorsInfo",
"properties" : {
"subtask" : {
"type" : "integer"
},
"attempt" : {
"type" : "integer"
},
"id" : {
"type" : "string"
},
"user-accumulators" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:UserAccumulator",
"properties" : {
"name" : {
"type" : "string"
},
"type" : {
"type" : "string"
},
"value" : {
"type" : "string"
}
}
}
}
}
}
/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/metrics
Verb: GET
Response code: 200 OK
Provides access to subtask metrics.
Path parameters
jobid
- description
vertexid
- description
subtaskindex
- description
Query parameters
get
(optional): description
Request
Response
/jobs/:jobid/vertices/:vertexid/subtasktimes
Verb: GET
Response code: 200 OK
Returns time-related information for all subtasks of a task.
Path parameters
jobid
- description
vertexid
- description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:SubtasksTimesInfo",
"properties" : {
"id" : {
"type" : "string"
},
"name" : {
"type" : "string"
},
"now" : {
"type" : "integer"
},
"subtasks" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:SubtasksTimesInfo:SubtaskTimeInfo",
"properties" : {
"subtask" : {
"type" : "integer"
},
"host" : {
"type" : "string"
},
"duration" : {
"type" : "integer"
},
"timestamps" : {
"type" : "object",
"additionalProperties" : {
"type" : "integer"
}
}
}
}
}
}
}
/jobs/:jobid/vertices/:vertexid/taskmanagers
Verb: GET
Response code: 200 OK
Returns task information aggregated by task manager.
Path parameters
jobid
- description
vertexid
- description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexTaskManagersInfo",
"properties" : {
"id" : {
"type" : "any"
},
"name" : {
"type" : "string"
},
"now" : {
"type" : "integer"
},
"taskmanagers" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexTaskManagersInfo:TaskManagersInfo",
"properties" : {
"host" : {
"type" : "string"
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
},
"start-time" : {
"type" : "integer"
},
"end-time" : {
"type" : "integer"
},
"duration" : {
"type" : "integer"
},
"metrics" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
"properties" : {
"read-bytes" : {
"type" : "integer"
},
"read-bytes-complete" : {
"type" : "boolean"
},
"write-bytes" : {
"type" : "integer"
},
"write-bytes-complete" : {
"type" : "boolean"
},
"read-records" : {
"type" : "integer"
},
"read-records-complete" : {
"type" : "boolean"
},
"write-records" : {
"type" : "integer"
},
"write-records-complete" : {
"type" : "boolean"
}
}
},
"status-counts" : {
"type" : "object",
"additionalProperties" : {
"type" : "integer"
}
}
}
}
}
}
}
/overview
Verb: GET
Response code: 200 OK
Returns an overview of the Flink cluster.
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:legacy:messages:ClusterOverviewWithVersion",
"properties" : {
"taskmanagers" : {
"type" : "integer"
},
"slots-total" : {
"type" : "integer"
},
"slots-available" : {
"type" : "integer"
},
"jobs-running" : {
"type" : "integer"
},
"jobs-finished" : {
"type" : "integer"
},
"jobs-cancelled" : {
"type" : "integer"
},
"jobs-failed" : {
"type" : "integer"
},
"flink-version" : {
"type" : "string"
},
"flink-commit" : {
"type" : "string"
}
}
}
/savepoint-disposal
Verb: POST
Response code: 200 OK
Triggers the disposal of a savepoint.
Request
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:savepoints:SavepointDisposalRequest",
"properties" : {
"savepoint-path" : {
"type" : "string"
}
}
}
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse",
"properties" : {
"request-id" : {
"type" : "any"
}
}
}
/savepoint-disposal/:triggerid
Verb: GET
Response code: 200 OK
Returns the status of a savepoint disposal operation.
Path parameters
triggerid
- description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:AsynchronousOperationResult",
"properties" : {
"status" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
"properties" : {
"id" : {
"type" : "string",
"required" : true,
"enum" : [ "IN_PROGRESS", "COMPLETED" ]
}
}
},
"operation" : {
"type" : "any"
}
}
}
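The two savepoint disposal endpoints above work as an asynchronous pair: the POST returns a request-id, and the GET is polled with that ID until status.id reports COMPLETED. The following is a minimal sketch of that loop using only the Python standard library. The base URL, the polling interval, the retry limit, and all helper names are assumptions made for illustration and are not part of Flink.

```python
import json
import time
from urllib.request import Request, urlopen


def disposal_request_body(savepoint_path):
    """Serialize a SavepointDisposalRequest as UTF-8 encoded JSON."""
    return json.dumps({"savepoint-path": savepoint_path}).encode("utf-8")


def is_completed(result):
    """True once the operation result reports status.id == COMPLETED."""
    return result.get("status", {}).get("id") == "COMPLETED"


def dispose_savepoint(base_url, savepoint_path, poll_seconds=1.0, max_polls=60):
    """Trigger a disposal and poll until it completes (hypothetical helper)."""
    request = Request(
        base_url + "/savepoint-disposal",
        data=disposal_request_body(savepoint_path),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urlopen(request) as response:
        trigger_id = json.load(response)["request-id"]

    for _ in range(max_polls):
        with urlopen(f"{base_url}/savepoint-disposal/{trigger_id}") as response:
            result = json.load(response)
        if is_completed(result):
            return result.get("operation")
        time.sleep(poll_seconds)
    raise TimeoutError("savepoint disposal did not complete in time")
```

A call such as dispose_savepoint("http://hostname:8081", "/path/to/savepoint") would block until the cluster reports completion or the retry limit is reached.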
/taskmanagers
Verb: GET
Response code: 200 OK
Returns an overview of all task managers.
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagersInfo",
"properties" : {
"taskmanagers" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerInfo",
"properties" : {
"id" : {
"type" : "any"
},
"path" : {
"type" : "string"
},
"dataPort" : {
"type" : "integer"
},
"timeSinceLastHeartbeat" : {
"type" : "integer"
},
"slotsNumber" : {
"type" : "integer"
},
"freeSlots" : {
"type" : "integer"
},
"hardware" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
"properties" : {
"cpuCores" : {
"type" : "integer"
},
"physicalMemory" : {
"type" : "integer"
},
"freeMemory" : {
"type" : "integer"
},
"managedMemory" : {
"type" : "integer"
}
}
}
}
}
}
}
}
/taskmanagers/metrics
Verb: GET
Response code: 200 OK
Provides access to aggregated task manager metrics.
Query parameters
get
(optional): description
agg
(optional): description
taskmanagers
(optional): description
Request
Response
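The query parameters of /taskmanagers/metrics are only listed as "description" above. The sketch below assumes that each of them takes a comma-separated list (metric names for get, aggregation modes such as min, max, sum, avg for agg, and task manager IDs for taskmanagers). Those value formats, the example metric name, and the helper name are assumptions; check them against your Flink version.

```python
from urllib.parse import urlencode


def aggregated_metrics_url(base_url, get=(), agg=(), taskmanagers=()):
    """Build the /taskmanagers/metrics URL, omitting parameters that are empty."""
    params = {}
    if get:
        params["get"] = ",".join(get)
    if agg:
        params["agg"] = ",".join(agg)
    if taskmanagers:
        params["taskmanagers"] = ",".join(taskmanagers)

    url = base_url.rstrip("/") + "/taskmanagers/metrics"
    if params:
        # Keep commas readable instead of percent-encoding them.
        url += "?" + urlencode(params, safe=",")
    return url


print(aggregated_metrics_url(
    "http://hostname:8081",
    get=["Status.JVM.Memory.Heap.Used"],
    agg=["max", "avg"],
))
```

Calling the endpoint without any parameters is also valid according to the listing above, since all three are optional.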
/taskmanagers/:taskmanagerid
Verb: GET
Response code: 200 OK
Returns details for a task manager.
Path parameters
taskmanagerid
- description
Request
Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
"properties" : {
"id" : {
"type" : "any"
},
"path" : {
"type" : "string"
},
"dataPort" : {
"type" : "integer"
},
"timeSinceLastHeartbeat" : {
"type" : "integer"
},
"slotsNumber" : {
"type" : "integer"
},
"freeSlots" : {
"type" : "integer"
},
"hardware" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
"properties" : {
"cpuCores" : {
"type" : "integer"
},
"physicalMemory" : {
"type" : "integer"
},
"freeMemory" : {
"type" : "integer"
},
"managedMemory" : {
"type" : "integer"
}
}
},
"metrics" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",
"properties" : {
"heapUsed" : {
"type" : "integer"
},
"heapCommitted" : {
"type" : "integer"
},
"heapMax" : {
"type" : "integer"
},
"nonHeapUsed" : {
"type" : "integer"
},
"nonHeapCommitted" : {
"type" : "integer"
},
"nonHeapMax" : {
"type" : "integer"
},
"directCount" : {
"type" : "integer"
},
"directUsed" : {
"type" : "integer"
},
"directMax" : {
"type" : "integer"
},
"mappedCount" : {
"type" : "integer"
},
"mappedUsed" : {
"type" : "integer"
},
"mappedMax" : {
"type" : "integer"
},
"memorySegmentsAvailable" : {
"type" : "integer"
},
"memorySegmentsTotal" : {
"type" : "integer"
},
"garbageCollectors" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",
"properties" : {
"name" : {
"type" : "string"
},
"count" : {
"type" : "integer"
},
"time" : {
"type" : "integer"
}
}
}
}
}
}
}
}
/taskmanagers/:taskmanagerid/metrics
Verb: GET
Response code: 200 OK
Provides access to task manager metrics.
Path parameters
taskmanagerid
- description
Query parameters
get
(optional): description
Request
Response
Legacy
This section is only relevant if the cluster runs in legacy mode.
Below is a list of available requests, with a sample JSON response. All requests have the form http://hostname:8081/jobs
; below we list only the path part of the URLs.
Values in angle brackets are variables. For example, http://hostname:8081/jobs/<jobid>/exceptions
has to be requested as, for example, http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/exceptions
.
/config
/overview
/jobs/overview
/jobs/<jobid>
/jobs/<jobid>/vertices
/jobs/<jobid>/config
/jobs/<jobid>/exceptions
/jobs/<jobid>/accumulators
/jobs/<jobid>/vertices/<vertexid>
/jobs/<jobid>/vertices/<vertexid>/subtasktimes
/jobs/<jobid>/vertices/<vertexid>/taskmanagers
/jobs/<jobid>/vertices/<vertexid>/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators
/jobs/<jobid>/plan
/jars/upload
/jars
/jars/:jarid
/jars/:jarid/plan
/jars/:jarid/run
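The angle-bracket convention used in the paths above can be applied mechanically. The following sketch substitutes the variables and prepends the host and port; the function names are hypothetical, and hostname and 8081 are simply the placeholder values used in this section.

```python
import re


def fill_path(template, **variables):
    """Replace every <name> placeholder with the keyword argument of that name."""
    def substitute(match):
        name = match.group(1)
        if name not in variables:
            raise KeyError(f"no value given for <{name}>")
        return str(variables[name])

    return re.sub(r"<(\w+)>", substitute, template)


def request_url(template, host="hostname", port=8081, **variables):
    """Build the full request URL for one of the listed paths."""
    return f"http://{host}:{port}{fill_path(template, **variables)}"


print(request_url("/jobs/<jobid>/exceptions",
                  jobid="7684be6004e4e955c2a558a9bc463f65"))
```

A missing variable raises a KeyError instead of silently producing a URL that still contains a placeholder.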
General
/config
Some information about the monitoring API and the server setup.
Sample Result:
{
"refresh-interval" : 3000 ,
"timezone-offset" : 3600000 ,
"timezone-name" : "Central European Time" ,
"flink-version" : "1.5.4" ,
"flink-revision" : "8124545 @ 16.09.2015 @ 15:38:42 CEST"
}
/overview
Simple summary of the Flink cluster status.
Sample Result:
{
"taskmanagers" : 17 ,
"slots-total" : 68 ,
"slots-available" : 68 ,
"jobs-running" : 0 ,
"jobs-finished" : 3 ,
"jobs-cancelled" : 1 ,
"jobs-failed" : 0
}
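A custom monitoring tool might derive simple indicators from this response. As an illustration, the sketch below computes the fraction of occupied slots from the sample result; the function name is hypothetical, and the definition of utilization used here is an assumption, not something Flink reports itself.

```python
def slot_utilization(overview):
    """Fraction of slots in use, from 0.0 (all free) to 1.0 (all occupied)."""
    total = overview["slots-total"]
    if total == 0:
        return 0.0
    return (total - overview["slots-available"]) / total


sample = {
    "taskmanagers": 17,
    "slots-total": 68,
    "slots-available": 68,
    "jobs-running": 0,
    "jobs-finished": 3,
    "jobs-cancelled": 1,
    "jobs-failed": 0,
}
print(slot_utilization(sample))  # prints 0.0, since no job is running
```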
Overview of Jobs
/jobs/overview
Jobs, grouped by status, each with a small summary of its status.
Sample Result:
{
"jobs" :[
{
"jid" : "7684be6004e4e955c2a558a9bc463f65" ,
"name" : "Flink Java Job at Wed Sep 16 18:08:21 CEST 2015" ,
"state" : "FINISHED" ,
"start-time" : 1442419702857 ,
"end-time" : 1442419975312 ,
"duration" : 272455 ,
"last-modification" : 1442419975312 ,
"tasks" : {
"total" : 6 ,
"pending" : 0 ,
"running" : 0 ,
"finished" : 6 ,
"canceling" : 0 ,
"canceled" : 0 ,
"failed" : 0
}
},
{
"jid" : "49306f94d0920216b636e8dd503a6409" ,
"name" : "Flink Java Job at Wed Sep 16 18:16:39 CEST 2015" ,
...
}]
}
Details of a Running or Completed Job
/jobs/<jobid>
Summary of one job, listing the dataflow plan, status, timestamps of state transitions, and aggregate information for each vertex (operator).
Sample Result:
{
"jid" : "ab78dcdbb1db025539e30217ec54ee16" ,
"name" : "WordCount Example" ,
"state" : "FINISHED" ,
"start-time" : 1442421277536 ,
"end-time" : 1442421299791 ,
"duration" : 22255 ,
"now" : 1442421991768 ,
"timestamps" : {
"CREATED" : 1442421277536 , "RUNNING" : 1442421277609 , "FAILING" : 0 , "FAILED" : 0 , "CANCELLING" : 0 , "CANCELED" : 0 , "FINISHED" : 1442421299791 , "RESTARTING" : 0
},
"vertices" : [ {
"id" : "19b5b24062c48a06e4eac65422ac3317" ,
"name" : "CHAIN DataSource (at getTextDataSet(WordCount.java:142) ..." ,
"parallelism" : 2 ,
"status" : "FINISHED" ,
"start-time" : 1442421277609 ,
"end-time" : 1442421299469 ,
"duration" : 21860 ,
"tasks" : {
"CREATED" : 0 , "SCHEDULED" : 0 , "DEPLOYING" : 0 , "RUNNING" : 0 , "FINISHED" : 2 , "CANCELING" : 0 , "CANCELED" : 0 , "FAILED" : 0
},
"metrics" : {
"read-bytes" : 0 , "write-bytes" : 37098 , "read-records" : 0 , "write-records" : 3312
}
}, {
"id" : "f00c89b349b5c998cfd9fe2a06e50fd0" ,
"name" : "Reduce (SUM(1), at main(WordCount.java:67)" ,
"parallelism" : 2 ,
....
}, {
"id" : "0a36cbc29102d7bc993d0a9bf23afa12" ,
"name" : "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter: ))" ,
...
} ],
"status-counts" : {
"CREATED" : 0 , "SCHEDULED" : 0 , "DEPLOYING" : 0 , "RUNNING" : 0 , "FINISHED" : 3 , "CANCELING" : 0 , "CANCELED" : 0 , "FAILED" : 0
},
"plan" : {
// see plan details below
}
}
/jobs/<jobid>/vertices
Currently the same as /jobs/<jobid>
/jobs/<jobid>/config
The user-defined execution config used by the job.
Sample Result:
{
"jid" : "ab78dcdbb1db025539e30217ec54ee16" ,
"name" : "WordCount Example" ,
"execution-config" : {
"execution-mode" : "PIPELINED" ,
"restart-strategy" : "Restart deactivated" ,
"job-parallelism" : -1 ,
"object-reuse-mode" : false
}
}
/jobs/<jobid>/exceptions
The non-recoverable exceptions that have been observed by the job.
The truncated
flag indicates whether more exceptions occurred but are not listed because the response would otherwise get too big.
Sample Result:
{
"root-exception" : "java.io.IOException: File already exists:/tmp/abzs/2 \n\t at org.apache.flink.core.fs.local.LocalFileSystem. ..." ,
"all-exceptions" : [ {
"exception" : "java.io.IOException: File already exists:/tmp/abzs/1 \n\t at org.apache.flink..." ,
"task" : "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter: )) (1/2)" ,
"location" : "localhost:49220"
}, {
"exception" : "java.io.IOException: File already exists:/tmp/abzs/2 \n\t at org.apache.flink..." ,
"task" : "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter: )) (2/2)" ,
"location" : "localhost:49220"
} ],
"truncated" : false
}
/jobs/<jobid>/accumulators
The aggregated user accumulators plus job accumulators.
Sample Result:
{
"job-accumulators" :[],
"user-task-accumulators" : [ {
"name" : "avglen" ,
"type" : "DoubleCounter" ,
"value" : "DoubleCounter 61.5162972"
},
{
"name" : "genwords" ,
"type" : "LongCounter" ,
"value" : "LongCounter 37500000"
} ]
}
/jobs/<jobid>/vertices/<vertexid>
Information about one specific vertex, with a summary for each of its subtasks.
Sample Result:
{
"id" : "dceafe2df1f57a1206fcb907cb38ad97" ,
"name" : "CHAIN DataSource -> Map -> FlatMap -> Combine(SUM(1))" ,
"parallelism" : 2 ,
"now" : 1442424002154 ,
"subtasks" : [ {
"subtask" : 0 ,
"status" : "FINISHED" ,
"attempt" : 0 ,
"host" : "localhost" ,
"start-time" : 1442421093762 ,
"end-time" : 1442421386680 ,
"duration" : 292918 ,
"metrics" : {
"read-bytes" : 0 , "write-bytes" : 12684375 , "read-records" : 0 , "write-records" : 1153125
}
}, {
"subtask" : 1 ,
"status" : "FINISHED" ,
"attempt" : 0 ,
"host" : "localhost" ,
"start-time" : 1442421093774 ,
"end-time" : 1442421386267 ,
"duration" : 292493 ,
"metrics" : {
"read-bytes" : 0 , "write-bytes" : 12684375 , "read-records" : 0 , "write-records" : 1153125
}
} ]
}
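Because the response lists metrics per subtask, totals for the whole vertex have to be computed by the client. The following sketch sums the integer metrics across subtasks. The function name is hypothetical, and boolean fields are skipped on the assumption that flags such as read-bytes-complete, which appear in the non-legacy schema, should not be added up.

```python
def total_metrics(vertex):
    """Sum each integer metric over all subtasks of a vertex response."""
    totals = {}
    for subtask in vertex["subtasks"]:
        for name, value in subtask["metrics"].items():
            if isinstance(value, bool):
                continue  # skip "*-complete" style flags
            totals[name] = totals.get(name, 0) + value
    return totals


# Metrics of the two subtasks from the sample result above.
vertex = {
    "subtasks": [
        {"subtask": 0, "metrics": {"read-bytes": 0, "write-bytes": 12684375,
                                   "read-records": 0, "write-records": 1153125}},
        {"subtask": 1, "metrics": {"read-bytes": 0, "write-bytes": 12684375,
                                   "read-records": 0, "write-records": 1153125}},
    ]
}
print(total_metrics(vertex))
```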
/jobs/<jobid>/vertices/<vertexid>/subtasktimes
This request returns the timestamps for the state transitions of all subtasks of a given vertex.
These can be used, for example, to create time-line comparisons between subtasks.
Sample Result:
{
"id" : "dceafe2df1f57a1206fcb907cb38ad97" ,
"name" : "CHAIN DataSource -> Map -> Combine(SUM(1))" ,
"now" : 1442423745088 ,
"subtasks" : [ {
"subtask" : 0 ,
"host" : "localhost" ,
"duration" : 292924 ,
"timestamps" : {
"CREATED" : 1442421093741 , "SCHEDULED" : 1442421093756 , "DEPLOYING" : 1442421093762 , "RUNNING" : 1442421094026 , "FINISHED" : 1442421386680 , "CANCELING" : 0 , "CANCELED" : 0 , "FAILED" : 0
}
}, {
"subtask" : 1 ,
"host" : "localhost" ,
"duration" : 292494 ,
"timestamps" : {
"CREATED" : 1442421093741 , "SCHEDULED" : 1442421093773 , "DEPLOYING" : 1442421093774 , "RUNNING" : 1442421094013 , "FINISHED" : 1442421386267 , "CANCELING" : 0 , "CANCELED" : 0 , "FAILED" : 0
}
} ]
}
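For the time-line comparison mentioned above, the timestamps can be turned into the time spent in each state. This sketch assumes that a timestamp of 0 means the state was never entered, which matches the sample but is not stated explicitly; the function name is hypothetical.

```python
def state_durations(timestamps):
    """Return (state, milliseconds until the next state was entered) pairs.

    States with timestamp 0 are treated as never entered. The last state
    entered has no successor and is therefore not part of the result.
    """
    entered = sorted((ts, state) for state, ts in timestamps.items() if ts > 0)
    return [
        (state, next_ts - ts)
        for (ts, state), (next_ts, _) in zip(entered, entered[1:])
    ]


# Timestamps of subtask 0 from the sample result above.
subtask0 = {
    "CREATED": 1442421093741, "SCHEDULED": 1442421093756,
    "DEPLOYING": 1442421093762, "RUNNING": 1442421094026,
    "FINISHED": 1442421386680, "CANCELING": 0, "CANCELED": 0, "FAILED": 0,
}
for state, millis in state_durations(subtask0):
    print(f"{state}: {millis} ms")
```

For this subtask, the sketch reports 15 ms in CREATED, 6 ms in SCHEDULED, 264 ms in DEPLOYING, and 292654 ms in RUNNING.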
/jobs/<jobid>/vertices/<vertexid>/taskmanagers
TaskManager statistics for one specific vertex. This is an aggregation of subtask statistics returned by /jobs/<jobid>/vertices/<vertexid>
.
Sample Result:
{
"id" : "fe20bcc29b87cdc76589ca42114c2499" ,
"name" : "Reduce (SUM(1), at main(WordCount.java:72)" ,
"now" : 1454348282653 ,
"taskmanagers" : [ {
"host" : "ip-10-0-43-227:35413" ,
"status" : "FINISHED" ,
"start-time" : 1454347870991 ,
"end-time" : 1454347872111 ,
"duration" : 1120 ,
"metrics" : {
"read-bytes" : 32503056 , "write-bytes" : 9637041 , "read-records" : 2906087 , "write-records" : 849467
},
"status-counts" : {
"CREATED" : 0 , "SCHEDULED" : 0 , "DEPLOYING" : 0 , "RUNNING" : 0 , "FINISHED" : 18 , "CANCELING" : 0 , "CANCELED" : 0 , "FAILED" : 0
}
},{
"host" : "ip-10-0-43-227:41486" ,
"status" : "FINISHED" ,
"start-time" : 1454347871001 ,
"end-time" : 1454347872395 ,
"duration" : 1394 ,
"metrics" : {
"read-bytes" : 32389499 , "write-bytes" : 9608829 , "read-records" : 2895999 , "write-records" : 846948
},
"status-counts" : {
"CREATED" : 0 , "SCHEDULED" : 0 , "DEPLOYING" : 0 , "RUNNING" : 0 , "FINISHED" : 18 , "CANCELING" : 0 , "CANCELED" : 0 , "FAILED" : 0
}
} ]
}
/jobs/<jobid>/vertices/<vertexid>/accumulators
The aggregated user-defined accumulators, for a specific vertex.
Sample Result:
{
"id" : "dceafe2df1f57a1206fcb907cb38ad97" ,
"user-accumulators" : [ {
"name" : "avglen" , "type" : "DoubleCounter" , "value" : "DoubleCounter 123.03259440000001"
}, {
"name" : "genwords" , "type" : "LongCounter" , "value" : "LongCounter 75000000"
} ]
}
/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators
Gets all user-defined accumulators for all subtasks of a given vertex. These are the individual accumulators that are returned in aggregated form by the
request /jobs/<jobid>/vertices/<vertexid>/accumulators
.
Sample Result:
{
"id" : "dceafe2df1f57a1206fcb907cb38ad97" ,
"parallelism" : 2 ,
"subtasks" : [ {
"subtask" : 0 ,
"attempt" : 0 ,
"host" : "localhost" ,
"user-accumulators" : [ {
"name" : "genwords" , "type" : "LongCounter" , "value" : "LongCounter 62500000"
}, {
"name" : "genletters" , "type" : "LongCounter" , "value" : "LongCounter 1281589525"
} ]
}, {
"subtask" : 1 ,
"attempt" : 0 ,
"host" : "localhost" ,
"user-accumulators" : [ {
"name" : "genwords" , "type" : "LongCounter" , "value" : "LongCounter 12500000"
}, {
"name" : "genletters" , "type" : "LongCounter" , "value" : "LongCounter 256317905"
} ]
} ]
}
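The relation between the per-subtask values and the aggregated form can be checked on the client side. The sketch below sums LongCounter accumulators by name; it assumes that the value string always has the form "<type> <number>", as in the samples, and the function name is hypothetical.

```python
def sum_long_counters(response):
    """Add up LongCounter accumulators across all subtasks, keyed by name."""
    totals = {}
    for subtask in response["subtasks"]:
        for accumulator in subtask["user-accumulators"]:
            if accumulator["type"] != "LongCounter":
                continue
            # The value is a string such as "LongCounter 62500000".
            number = int(accumulator["value"].split()[-1])
            name = accumulator["name"]
            totals[name] = totals.get(name, 0) + number
    return totals


# Accumulators of the two subtasks from the sample result above.
response = {
    "subtasks": [
        {"subtask": 0, "user-accumulators": [
            {"name": "genwords", "type": "LongCounter", "value": "LongCounter 62500000"},
            {"name": "genletters", "type": "LongCounter", "value": "LongCounter 1281589525"},
        ]},
        {"subtask": 1, "user-accumulators": [
            {"name": "genwords", "type": "LongCounter", "value": "LongCounter 12500000"},
            {"name": "genletters", "type": "LongCounter", "value": "LongCounter 256317905"},
        ]},
    ]
}
print(sum_long_counters(response))
```

The genwords total of 75000000 agrees with the aggregated value shown in the sample for /jobs/<jobid>/vertices/<vertexid>/accumulators.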
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>
Summary of the current or latest execution attempt of a specific subtask. See below for a sample.
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>
Summary of a specific execution attempt of a specific subtask. Multiple execution attempts happen in case of failure/recovery.
Sample Result:
{
"subtask" : 0 ,
"status" : "FINISHED" ,
"attempt" : 0 ,
"host" : "localhost" ,
"start-time" : 1442421093762 ,
"end-time" : 1442421386680 ,
"duration" : 292918 ,
"metrics" : {
"read-bytes" : 0 , "write-bytes" : 12684375 , "read-records" : 0 , "write-records" : 1153125
}
}
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators
The accumulators collected for one specific subtask during one specific execution attempt (multiple attempts happen in case of failure/recovery).
Sample Result:
{
"subtask" : 0 ,
"attempt" : 0 ,
"id" : "b22f94d91bf41ddb" ,
"user-accumulators" : [ {
"name" : "genwords" , "type" : "LongCounter" , "value" : "LongCounter 62500000"
}, {
"name" : "genletters" , "type" : "LongCounter" , "value" : "LongCounter 1281589525"
}, {
"name" : "avglen" , "type" : "DoubleCounter" , "value" : "DoubleCounter 102.527162"
} ]
}
/jobs/<jobid>/plan
The dataflow plan of a job. The plan is also included in the job summary (/jobs/<jobid>
).
Sample Result:
{
"jid" : "ab78dcdbb1db025539e30217ec54ee16" ,
"name" : "WordCount Example" ,
"nodes" : [ {
"id" : "f00c89b349b5c998cfd9fe2a06e50fd0" ,
"parallelism" : 2 ,
"operator" : "GroupReduce" ,
"operator_strategy" : "Sorted Group Reduce" ,
"description" : "Reduce (SUM(1), at main(WordCount.java:67)" ,
"inputs" : [ {
"num" : 0 ,
"id" : "19b5b24062c48a06e4eac65422ac3317" ,
"ship_strategy" : "Hash Partition on [0]" ,
"local_strategy" : "Sort (combining) on [0:ASC]" ,
"exchange" : "pipelined"
} ],
"optimizer_properties" : {
"global_properties" : [
{ "name" : "Partitioning" , "value" : "HASH_PARTITIONED" },
{ "name" : "Partitioned on" , "value" : "[0]" },
{ "name" : "Partitioning Order" , "value" : "(none)" },
{ "name" : "Uniqueness" , "value" : "not unique" }
],
"local_properties" : [
{ "name" : "Order" , "value" : "[0:ASC]" },
{ "name" : "Grouped on" , "value" : "[0]" },
{ "name" : "Uniqueness" , "value" : "not unique" }
],
"estimates" : [
{ "name" : "Est. Output Size" , "value" : "(unknown)" },
{ "name" : "Est. Cardinality" , "value" : "(unknown)" }
],
"costs" : [
{ "name" : "Network" , "value" : "(unknown)" },
{ "name" : "Disk I/O" , "value" : "(unknown)" },
{ "name" : "CPU" , "value" : "(unknown)" },
{ "name" : "Cumulative Network" , "value" : "(unknown)" },
{ "name" : "Cumulative Disk I/O" , "value" : "(unknown)" },
{ "name" : "Cumulative CPU" , "value" : "(unknown)" }
],
"compiler_hints" : [
{ "name" : "Output Size (bytes)" , "value" : "(none)" },
{ "name" : "Output Cardinality" , "value" : "(none)" },
{ "name" : "Avg. Output Record Size (bytes)" , "value" : "(none)" },
{ "name" : "Filter Factor" , "value" : "(none)" }
]
}
},
{
"id" : "19b5b24062c48a06e4eac65422ac3317" ,
"parallelism" : 2 ,
"operator" : "Data Source -> FlatMap -> GroupCombine" ,
"operator_strategy" : " (none) -> FlatMap -> Sorted Combine" ,
"description" : "DataSource (at getTextDataSet(WordCount.java:142) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:67)) -> Combine(SUM(1), at main(WordCount.java:67)" ,
"optimizer_properties" : {
...
}
},
{
"id" : "0a36cbc29102d7bc993d0a9bf23afa12" ,
"parallelism" : 2 ,
"operator" : "Data Sink" ,
"operator_strategy" : "(none)" ,
"description" : "DataSink (CsvOutputFormat (path: /tmp/abzs, delimiter: ))" ,
"inputs" :[ {
"num" : 0 ,
"id" : "f00c89b349b5c998cfd9fe2a06e50fd0" ,
"ship_strategy" : "Forward" ,
"exchange" : "pipelined"
} ],
"optimizer_properties" : {
...
}
} ]
}
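The inputs entries of each node describe the edges of the dataflow graph. The following sketch extracts those edges and derives an order in which every node appears after its inputs. It relies on graphlib from the Python standard library (Python 3.9 or later); the function names are hypothetical.

```python
from graphlib import TopologicalSorter


def plan_edges(plan):
    """Return (source id, target id) pairs taken from each node's inputs."""
    return [
        (node_input["id"], node["id"])
        for node in plan["nodes"]
        for node_input in node.get("inputs", [])
    ]


def dataflow_order(plan):
    """Return node ids ordered so that each node follows all of its inputs."""
    predecessors = {
        node["id"]: {node_input["id"] for node_input in node.get("inputs", [])}
        for node in plan["nodes"]
    }
    return list(TopologicalSorter(predecessors).static_order())


# Node ids and inputs from the sample result above, other fields omitted.
plan = {
    "nodes": [
        {"id": "f00c89b349b5c998cfd9fe2a06e50fd0",
         "inputs": [{"num": 0, "id": "19b5b24062c48a06e4eac65422ac3317"}]},
        {"id": "19b5b24062c48a06e4eac65422ac3317"},
        {"id": "0a36cbc29102d7bc993d0a9bf23afa12",
         "inputs": [{"num": 0, "id": "f00c89b349b5c998cfd9fe2a06e50fd0"}]},
    ]
}
print(dataflow_order(plan))
```

For the sample plan this lists the data source first, then the reduce node, then the data sink.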
Job Cancellation
Cancel Job
DELETE
request to /jobs/:jobid/cancel
.
Triggers job cancellation, result on success is {}
.
Cancel Job with Savepoint
Triggers a savepoint and cancels the job after the savepoint succeeds.
GET
request to /jobs/:jobid/cancel-with-savepoint/
triggers a savepoint to the default savepoint directory and cancels the job.
GET
request to /jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory
triggers a savepoint to the given target directory and cancels the job.
Since savepoints can take some time to complete, this operation happens asynchronously. The result of this request is the location of the in-progress cancellation.
Sample Trigger Result:
{
"status" : "accepted" ,
"request-id" : 1 ,
"location" : "/jobs/:jobid/cancel-with-savepoint/in-progress/1"
}
Monitoring Progress
The progress of the cancellation has to be monitored by the user at
/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId
The request ID is returned by the trigger result.
In-Progress
{
"status" : "in-progress" ,
"request-id" : 1
}
Success
{
"status" : "success" ,
"request-id" : 1 ,
"savepoint-path" : "<savepointPath>"
}
The savepointPath
points to the external path of the savepoint, which can be used to resume the job from the savepoint.
Failed
{
"status" : "failed" ,
"request-id" : 1 ,
"cause" : "<error message>"
}
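A client that monitors the cancellation has to distinguish the three response shapes shown above. The following sketch maps a decoded response to an outcome; the exception class and function name are hypothetical, and treating any other status value as an error is an assumption.

```python
class SavepointFailed(Exception):
    """Raised when the monitored cancel-with-savepoint request has failed."""


def savepoint_path_or_none(response):
    """Interpret one progress response.

    Returns the savepoint path on success, None while the request is still
    in progress, and raises SavepointFailed if the request failed.
    """
    status = response["status"]
    if status == "success":
        return response["savepoint-path"]
    if status == "in-progress":
        return None
    if status == "failed":
        raise SavepointFailed(response.get("cause", "unknown cause"))
    raise ValueError(f"unexpected status: {status!r}")


print(savepoint_path_or_none({"status": "in-progress", "request-id": 1}))
```

A polling loop would call this function on each response and stop as soon as it returns a path or raises.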
Submitting Programs
It is possible to upload, run, and list Flink programs via the REST APIs and web frontend.
Upload a new JAR file
Send a POST
request to /jars/upload
with your jar file sent as multi-part data under the jarfile
field.
Also make sure that the multi-part data includes the Content-Type
of the file itself; some HTTP libraries do not add the header by default.
The multi-part payload should start like
------BoundaryXXXX
Content-Disposition: form-data; name="jarfile"; filename="YourFileName.jar"
Content-Type: application/x-java-archive
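For HTTP libraries that do not build such a payload themselves, the body can be assembled by hand. The sketch below constructs a single-part multipart/form-data body that carries the per-file Content-Type shown above. The function name and the randomly generated default boundary are assumptions made for illustration.

```python
import uuid


def jar_upload_request(jar_bytes, filename, boundary=None):
    """Return (headers, body) for a multipart upload of one JAR file."""
    boundary = boundary or uuid.uuid4().hex
    head = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="jarfile"; filename="{filename}"\r\n'
        "Content-Type: application/x-java-archive\r\n"
        "\r\n"
    ).encode("utf-8")
    # The closing delimiter is the boundary followed by two extra dashes.
    tail = f"\r\n--{boundary}--\r\n".encode("utf-8")
    headers = {"Content-Type": f"multipart/form-data; boundary={boundary}"}
    return headers, head + jar_bytes + tail


headers, body = jar_upload_request(b"PK\x03\x04", "YourFileName.jar",
                                   boundary="BoundaryXXXX")
print(headers["Content-Type"])
```

The returned headers and body can be passed to any HTTP client that sends a POST request to /jars/upload.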
Run a Program (POST)
Send a POST
request to /jars/:jarid/run
. The jarid
parameter is the file name of the program JAR in the configured web frontend upload directory (configuration key jobmanager.web.upload.dir
).
You can specify the following query parameters (all optional):
Program arguments : program-args=arg1 arg2 arg3
Main class to execute : entry-class=EntryClassName.class
Default parallelism : parallelism=4
Savepoint path to restore from : savepointPath=hdfs://path/to/savepoint
Allow non restored state : allowNonRestoredState=true
If the call succeeds, you will get a response with the ID of the submitted job.
Example: Run program with a savepoint
Request:
POST: /jars/MyProgram.jar/run?savepointPath=/my-savepoints/savepoint-1bae02a80464&allowNonRestoredState=true
Response:
{ "jobid" : "869a9868d49c679e7355700e0857af85" }
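Query parameters such as program-args can contain spaces and other characters that need encoding. The following sketch builds the run URL with the standard library; the function name is hypothetical, and leaving "/" and ":" unencoded is a readability choice rather than a requirement stated in this document.

```python
from urllib.parse import quote, urlencode


def run_url(base_url, jar_id, params=None):
    """Build the URL for POST /jars/:jarid/run with optional query parameters."""
    url = f"{base_url.rstrip('/')}/jars/{quote(jar_id, safe='')}/run"
    if params:
        # Percent-encode values; spaces become %20, "/" and ":" stay readable.
        url += "?" + urlencode(params, safe="/:", quote_via=quote)
    return url


print(run_url(
    "http://hostname:8081",
    "MyProgram.jar",
    {
        "savepointPath": "/my-savepoints/savepoint-1bae02a80464",
        "allowNonRestoredState": "true",
    },
))
```

The printed URL has the same path and query string as the example request above.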