Package org.apache.flink.autoscaler
Class ScalingMetricEvaluator
- java.lang.Object
  - org.apache.flink.autoscaler.ScalingMetricEvaluator
public class ScalingMetricEvaluator extends java.lang.Object
Job scaling evaluator for autoscaler.
-
-
Constructor Summary
Constructors
Constructor, Description
ScalingMetricEvaluator()
-
Method Summary
Modifier and Type, Method, Description

protected static double
computeBusyTimeAvg(org.apache.flink.configuration.Configuration conf, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory, org.apache.flink.runtime.jobgraph.JobVertexID vertex, int parallelism)
Compute the average busy time for the given vertex for the current metric window.

protected static double
computeEdgeDataRate(JobTopology topology, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory, org.apache.flink.runtime.jobgraph.JobVertexID from, org.apache.flink.runtime.jobgraph.JobVertexID to)
Compute how many records flow between two job vertices in the pipeline.

protected static double
computeEdgeOutputRatio(org.apache.flink.runtime.jobgraph.JobVertexID from, org.apache.flink.runtime.jobgraph.JobVertexID to, JobTopology topology, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory)
Compute the In/Out ratio between the (from, to) vertices.

protected static void
computeProcessingRateThresholds(java.util.Map<ScalingMetric,EvaluatedScalingMetric> metrics, org.apache.flink.configuration.Configuration conf, boolean processingBacklog, java.time.Duration restartTime)

protected static double
computeTrueProcessingRate(double busyTimeAvg, double inputRateAvg, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory, org.apache.flink.runtime.jobgraph.JobVertexID vertex, org.apache.flink.configuration.Configuration conf)
Compute the true processing rate for the given vertex for the current metric window.

EvaluatedMetrics
evaluate(org.apache.flink.configuration.Configuration conf, CollectedMetricHistory collectedMetrics, java.time.Duration restartTime)

protected static java.util.Map<ScalingMetric,EvaluatedScalingMetric>
evaluateGlobalMetrics(java.util.SortedMap<java.time.Instant,CollectedMetrics> metricHistory)

static double
getAverage(ScalingMetric metric, org.apache.flink.runtime.jobgraph.JobVertexID jobVertexId, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory)

static double
getAverage(ScalingMetric metric, org.apache.flink.runtime.jobgraph.JobVertexID jobVertexId, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory, int minElements)

static double
getRate(ScalingMetric metric, org.apache.flink.runtime.jobgraph.JobVertexID jobVertexId, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory)
Compute per second rate for the given accumulated metric over the metric window.

protected static boolean
isProcessingBacklog(JobTopology topology, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory, org.apache.flink.configuration.Configuration conf)
-
-
-
Method Detail
-
evaluate
public EvaluatedMetrics evaluate(org.apache.flink.configuration.Configuration conf, CollectedMetricHistory collectedMetrics, java.time.Duration restartTime)
-
isProcessingBacklog
@VisibleForTesting protected static boolean isProcessingBacklog(JobTopology topology, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory, org.apache.flink.configuration.Configuration conf)
-
computeBusyTimeAvg
@VisibleForTesting protected static double computeBusyTimeAvg(org.apache.flink.configuration.Configuration conf, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory, org.apache.flink.runtime.jobgraph.JobVertexID vertex, int parallelism)
Compute the average busy time for the given vertex for the current metric window. Depending on the MetricAggregator chosen, we use two different mechanisms:
- For the AVG aggregator, we compute from the accumulated busy time to get the most precise metric.
- For the MAX/MIN aggregators, we have to average over the point-in-time MAX/MIN values collected over the metric window. These are stored in the LOAD metric.
- Parameters:
  conf
  metricsHistory
  vertex
  parallelism
- Returns:
- Average busy time in the current metric window
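The two mechanisms described above can be sketched roughly as follows. This is a simplified illustration, not the autoscaler's code: the class `BusyTimeSketch`, its `Aggregator` enum, and the assumed units (busy time accumulated across all subtasks in milliseconds per second, LOAD samples as fractions between 0 and 1) are hypothetical stand-ins for the real metric types.

```java
import java.util.List;

final class BusyTimeSketch {
    // Hypothetical stand-in for the configured MetricAggregator.
    enum Aggregator { AVG, MAX, MIN }

    /**
     * @param accumulatedBusyMsPerSec assumed growth rate of busy time summed over all subtasks (ms per second)
     * @param loadSamples assumed point-in-time aggregated load values in [0, 1]
     * @return average busy time in ms per second
     */
    static double busyTimeAvg(
            Aggregator aggregator,
            double accumulatedBusyMsPerSec,
            int parallelism,
            List<Double> loadSamples) {
        if (aggregator == Aggregator.AVG) {
            // Spread the accumulated busy time evenly over the subtasks.
            return accumulatedBusyMsPerSec / parallelism;
        }
        // MAX/MIN: average the collected point-in-time values and convert to ms per second.
        double avgLoad = loadSamples.stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(Double.NaN);
        return avgLoad * 1000;
    }
}
```

With these assumptions, an accumulated rate of 1600 ms/s at parallelism 4 gives 400 ms/s for AVG, while load samples of 0.5 and 0.7 give roughly 600 ms/s for MAX or MIN.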
-
computeTrueProcessingRate
protected static double computeTrueProcessingRate(double busyTimeAvg, double inputRateAvg, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory, org.apache.flink.runtime.jobgraph.JobVertexID vertex, org.apache.flink.configuration.Configuration conf)
Compute the true processing rate for the given vertex for the current metric window. The computation takes into account both the observed (during catch-up) and the busy-time-based processing rate and selects the right metric depending on the config.
- Parameters:
  busyTimeAvg
  inputRateAvg
  metricsHistory
  vertex
  conf
- Returns:
- Average true processing rate over the metric window.
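To make the idea concrete, here is a minimal sketch of a busy-time-based rate and a selection step. Everything in it is an assumption for illustration: the class `TrueProcessingRateSketch`, the millisecond-per-second unit for busy time, and the `preferObserved` flag (a hypothetical placeholder for whatever the configuration actually decides) do not come from this page.

```java
final class TrueProcessingRateSketch {
    /**
     * Busy-time-based estimate: if a vertex handles inputRate records per second
     * while busy for busyTimeMsPerSec out of every 1000 ms, it could handle
     * inputRate / (busyTimeMsPerSec / 1000) records per second when fully busy.
     */
    static double busyTimeBasedRate(double busyTimeMsPerSec, double inputRate) {
        // A busy time of zero yields Infinity (or NaN for a zero input rate).
        return inputRate / (busyTimeMsPerSec / 1000.0);
    }

    /** Hypothetical selection: use the observed rate only when requested and available. */
    static double select(double busyTimeBased, double observed, boolean preferObserved) {
        if (preferObserved && !Double.isNaN(observed)) {
            return observed;
        }
        return busyTimeBased;
    }
}
```

For example, 1000 records per second at 500 ms/s busy time suggests a capacity of about 2000 records per second.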
-
computeProcessingRateThresholds
@VisibleForTesting protected static void computeProcessingRateThresholds(java.util.Map<ScalingMetric,EvaluatedScalingMetric> metrics, org.apache.flink.configuration.Configuration conf, boolean processingBacklog, java.time.Duration restartTime)
-
evaluateGlobalMetrics
@VisibleForTesting protected static java.util.Map<ScalingMetric,EvaluatedScalingMetric> evaluateGlobalMetrics(java.util.SortedMap<java.time.Instant,CollectedMetrics> metricHistory)
-
getAverage
public static double getAverage(ScalingMetric metric, @Nullable org.apache.flink.runtime.jobgraph.JobVertexID jobVertexId, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory)
-
getRate
public static double getRate(ScalingMetric metric, @Nullable org.apache.flink.runtime.jobgraph.JobVertexID jobVertexId, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory)
Compute the per-second rate for the given accumulated metric over the metric window.
- Parameters:
  metric
  jobVertexId
  metricsHistory
- Returns:
- Per-second rate, or Double.NaN if we don't have at least 2 observations.
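A per-second rate over an accumulated metric can be sketched as the difference between the last and first observations divided by the elapsed time. The sketch below is a simplified assumption: `RateSketch` and `perSecondRate` are hypothetical names, and the history is reduced to a map from timestamp to a single accumulated value rather than `CollectedMetrics`.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.SortedMap;
import java.util.TreeMap;

final class RateSketch {
    /** Returns the per-second growth of an accumulated value, or NaN with fewer than 2 observations. */
    static double perSecondRate(SortedMap<Instant, Double> history) {
        if (history.size() < 2) {
            return Double.NaN;
        }
        Instant first = history.firstKey();
        Instant last = history.lastKey();
        double delta = history.get(last) - history.get(first);
        double seconds = Duration.between(first, last).toMillis() / 1000.0;
        return delta / seconds;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2024-01-01T00:00:00Z");
        SortedMap<Instant, Double> history = new TreeMap<>();
        history.put(start, 100.0);
        history.put(start.plusSeconds(10), 200.0);
        System.out.println(perSecondRate(history)); // prints 10.0
    }
}
```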
-
getAverage
public static double getAverage(ScalingMetric metric, @Nullable org.apache.flink.runtime.jobgraph.JobVertexID jobVertexId, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory, int minElements)
-
computeEdgeOutputRatio
@VisibleForTesting protected static double computeEdgeOutputRatio(org.apache.flink.runtime.jobgraph.JobVertexID from, org.apache.flink.runtime.jobgraph.JobVertexID to, JobTopology topology, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory)
Compute the In/Out ratio between the (from, to) vertices. The ratio estimates the number of output records produced to the downstream vertex for every input record received by the upstream vertex. For example, an output ratio of 2.0 means that we produce approximately 2 outputs to the "to" vertex for every 1 input received in the "from" vertex.
- Parameters:
  from - Upstream vertex
  to - Downstream vertex
  topology
  metricsHistory
- Returns:
- Output ratio
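The worked example above amounts to dividing the data rate on the edge by the input rate of the upstream vertex. A minimal sketch, assuming both rates are already known in records per second; `OutputRatioSketch` is a hypothetical name and the NaN guard for a non-positive input rate is an assumption, not documented behavior:

```java
final class OutputRatioSketch {
    /**
     * @param edgeDataRate records per second flowing from the upstream to the downstream vertex
     * @param fromInputRate records per second received by the upstream vertex
     */
    static double outputRatio(double edgeDataRate, double fromInputRate) {
        if (fromInputRate <= 0) {
            // Assumption: no meaningful ratio without upstream input.
            return Double.NaN;
        }
        return edgeDataRate / fromInputRate;
    }
}
```

With 200 records per second on the edge and 100 records per second entering the upstream vertex, the ratio is 2.0.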
-
computeEdgeDataRate
@VisibleForTesting protected static double computeEdgeDataRate(JobTopology topology, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory, org.apache.flink.runtime.jobgraph.JobVertexID from, org.apache.flink.runtime.jobgraph.JobVertexID to)
Compute how many records flow between two job vertices in the pipeline. Since Flink does not expose an output or data rate metric at the edge level, we have to compute this from the vertex-level input/output metrics.
- Parameters:
  topology
  metricsHistory
  from - Upstream vertex
  to - Downstream vertex
- Returns:
- Records per second data rate between the two vertices
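One way to derive an edge rate from vertex-level metrics is to look for the unambiguous cases: if the downstream vertex has a single input, its input rate is the edge rate, and if the upstream vertex has a single output, its output rate is the edge rate. The sketch below covers only those two cases and is an assumption about the approach, not the documented algorithm; `EdgeDataRateSketch` and its map-based topology model are hypothetical.

```java
import java.util.List;
import java.util.Map;

final class EdgeDataRateSketch {
    /**
     * @param inputs vertex name to its upstream vertices (hypothetical topology model)
     * @param outputs vertex name to its downstream vertices
     * @param inRate vertex-level input rate in records per second
     * @param outRate vertex-level output rate in records per second
     */
    static double edgeDataRate(
            String from,
            String to,
            Map<String, List<String>> inputs,
            Map<String, List<String>> outputs,
            Map<String, Double> inRate,
            Map<String, Double> outRate) {
        if (inputs.get(to).size() == 1) {
            // Everything the downstream vertex receives came over this edge.
            return inRate.get(to);
        }
        if (outputs.get(from).size() == 1) {
            // Everything the upstream vertex emits goes over this edge.
            return outRate.get(from);
        }
        // Ambiguous case (several inputs and several outputs): not covered by this sketch.
        return Double.NaN;
    }
}
```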
-
-