Class ScalingMetricEvaluator


  • public class ScalingMetricEvaluator
    extends java.lang.Object
    Job scaling evaluator for the autoscaler.
    • Method Summary

      Modifier and Type Method Description
      protected static double computeBusyTimeAvg(org.apache.flink.configuration.Configuration conf, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory, org.apache.flink.runtime.jobgraph.JobVertexID vertex, int parallelism)
      Compute the average busy time for the given vertex for the current metric window.
      protected static double computeEdgeDataRate(JobTopology topology, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory, org.apache.flink.runtime.jobgraph.JobVertexID from, org.apache.flink.runtime.jobgraph.JobVertexID to)
      Compute how many records flow between two job vertices in the pipeline.
      protected static double computeEdgeOutputRatio(org.apache.flink.runtime.jobgraph.JobVertexID from, org.apache.flink.runtime.jobgraph.JobVertexID to, JobTopology topology, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory)
      Compute the In/Out ratio between the (from, to) vertices.
      protected static void computeProcessingRateThresholds(java.util.Map<ScalingMetric,EvaluatedScalingMetric> metrics, org.apache.flink.configuration.Configuration conf, boolean processingBacklog, java.time.Duration restartTime)
      protected static double computeTrueProcessingRate(double busyTimeAvg, double inputRateAvg, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory, org.apache.flink.runtime.jobgraph.JobVertexID vertex, org.apache.flink.configuration.Configuration conf)
      Compute the true processing rate for the given vertex for the current metric window.
      EvaluatedMetrics evaluate(org.apache.flink.configuration.Configuration conf, CollectedMetricHistory collectedMetrics, java.time.Duration restartTime)
      protected static java.util.Map<ScalingMetric,EvaluatedScalingMetric> evaluateGlobalMetrics(java.util.SortedMap<java.time.Instant,CollectedMetrics> metricHistory)
      static double getAverage(ScalingMetric metric, org.apache.flink.runtime.jobgraph.JobVertexID jobVertexId, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory)
      static double getAverage(ScalingMetric metric, org.apache.flink.runtime.jobgraph.JobVertexID jobVertexId, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory, int minElements)
      static double getRate(ScalingMetric metric, org.apache.flink.runtime.jobgraph.JobVertexID jobVertexId, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory)
      Compute the per-second rate for the given accumulated metric over the metric window.
      protected static boolean isProcessingBacklog(JobTopology topology, java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory, org.apache.flink.configuration.Configuration conf)
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Constructor Detail

      • ScalingMetricEvaluator

        public ScalingMetricEvaluator()
    • Method Detail

      • isProcessingBacklog

        @VisibleForTesting
        protected static boolean isProcessingBacklog(JobTopology topology,
                                                     java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory,
                                                     org.apache.flink.configuration.Configuration conf)
      • computeBusyTimeAvg

        @VisibleForTesting
        protected static double computeBusyTimeAvg(org.apache.flink.configuration.Configuration conf,
                                                   java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory,
                                                   org.apache.flink.runtime.jobgraph.JobVertexID vertex,
                                                   int parallelism)
        Compute the average busy time for the given vertex for the current metric window. Depending on the chosen MetricAggregator, one of two mechanisms is used:
        1. For the AVG aggregator, the average is computed from the accumulated busy time, which gives the most precise metric.
        2. For the MAX/MIN aggregators, the point-in-time MAX/MIN values collected over the metric window are averaged. These are stored in the LOAD metric.
        Parameters:
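        conf - configuration used to select the MetricAggregator
        metricsHistory - history of collected metrics, keyed by collection time
        vertex - job vertex to compute the average busy time for
        parallelism - current parallelism of the vertex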
        Returns:
        Average busy time in the current metric window
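        The two mechanisms above can be illustrated with a minimal, self-contained sketch. It does not use the autoscaler's own types: BusyTimeSketch, its Aggregator enum, and the plain double and List inputs are hypothetical stand-ins for MetricAggregator and CollectedMetrics. It also assumes that the accumulated busy time is summed over all subtasks (so its rate is divided by the parallelism) and that LOAD is a 0.0 to 1.0 fraction that converts to busy milliseconds per second by multiplying by 1000.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch; not the autoscaler's implementation. */
final class BusyTimeSketch {

    /** Hypothetical stand-in for the configured MetricAggregator. */
    enum Aggregator { AVG, MAX, MIN }

    /**
     * Returns the average busy time of one subtask in ms per second.
     *
     * @param accumulatedBusyTimeRate assumed: per-second rate of the accumulated busy time
     *     (ms per second), summed over all subtasks
     * @param loadSamples assumed: point-in-time load values (0.0 to 1.0) collected over the
     *     window, each already reduced across subtasks with MAX or MIN
     * @param parallelism number of subtasks of the vertex
     */
    static double busyTimeAvg(
            Aggregator aggregator,
            double accumulatedBusyTimeRate,
            List<Double> loadSamples,
            int parallelism) {
        if (aggregator == Aggregator.AVG) {
            // AVG: derive from the accumulated counter and split it evenly across subtasks.
            return accumulatedBusyTimeRate / parallelism;
        }
        // MAX/MIN: average the point-in-time samples, then convert the load fraction to ms/s.
        double avgLoad = loadSamples.stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(Double.NaN);
        return avgLoad * 1000.0;
    }

    public static void main(String[] args) {
        // Two subtasks busy 1000 ms/s in total: 500 ms/s each.
        System.out.println(busyTimeAvg(Aggregator.AVG, 1000.0, Arrays.asList(0.4, 0.6), 2));
        // Sampled MAX load of 0.4 and 0.6 averages to 0.5, i.e. 500 ms/s.
        System.out.println(busyTimeAvg(Aggregator.MAX, 1000.0, Arrays.asList(0.4, 0.6), 2));
    }
}
```

        Deriving the AVG case from an accumulated counter avoids depending on when individual samples happen to be taken, which is consistent with the description of it as the most precise option.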
      • computeTrueProcessingRate

        protected static double computeTrueProcessingRate(double busyTimeAvg,
                                                          double inputRateAvg,
                                                          java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory,
                                                          org.apache.flink.runtime.jobgraph.JobVertexID vertex,
                                                          org.apache.flink.configuration.Configuration conf)
        Compute the true processing rate for the given vertex for the current metric window. The computation takes into account both the observed processing rate (measured during catch-up) and the busy-time-based processing rate, and selects between them depending on the configuration.
        Parameters:
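        busyTimeAvg - average busy time of the vertex in the current metric window
        inputRateAvg - average input rate of the vertex in the current metric window
        metricsHistory - history of collected metrics, keyed by collection time
        vertex - job vertex to compute the true processing rate for
        conf - configuration that determines which processing rate metric is selected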
        Returns:
        Average true processing rate over metric window.
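        As an illustration only, the busy-time-based estimate can be derived from the two averages: a vertex that handles inputRateAvg records per second while busy for busyTimeAvg milliseconds of every second could handle roughly inputRateAvg / (busyTimeAvg / 1000) records per second if it were fully busy. The sketch below assumes busy time is measured in ms per second and replaces the configuration lookup with a hypothetical preferObserved flag; TrueProcessingRateSketch and its methods are not part of the autoscaler API.

```java
/** Hypothetical sketch; not the autoscaler's implementation. */
final class TrueProcessingRateSketch {

    /**
     * Busy-time-based estimate in records per second.
     * Assumes busyTimeMsPerSecond is in [0, 1000]. Returns NaN when it is unknown or zero,
     * a hypothetical choice: an idle vertex says nothing about its capacity.
     */
    static double busyTimeBasedRate(double busyTimeMsPerSecond, double inputRate) {
        if (Double.isNaN(busyTimeMsPerSecond) || busyTimeMsPerSecond <= 0.0) {
            return Double.NaN;
        }
        return inputRate / (busyTimeMsPerSecond / 1000.0);
    }

    /**
     * Chooses between the observed rate (e.g. measured while catching up) and the
     * busy-time-based estimate. preferObserved is a hypothetical stand-in for the configuration.
     */
    static double trueProcessingRate(
            double busyTimeMsPerSecond, double inputRate, double observedRate, boolean preferObserved) {
        if (preferObserved && !Double.isNaN(observedRate)) {
            return observedRate;
        }
        // Fall back to the busy-time-based estimate when no observed rate is available.
        return busyTimeBasedRate(busyTimeMsPerSecond, inputRate);
    }
}
```

        For example, 100 records/s at 250 ms of busy time per second gives an estimated capacity of 400 records/s.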
      • computeProcessingRateThresholds

        @VisibleForTesting
        protected static void computeProcessingRateThresholds(java.util.Map<ScalingMetric,EvaluatedScalingMetric> metrics,
                                                              org.apache.flink.configuration.Configuration conf,
                                                              boolean processingBacklog,
                                                              java.time.Duration restartTime)
      • getAverage

        public static double getAverage(ScalingMetric metric,
                                        @Nullable
                                        org.apache.flink.runtime.jobgraph.JobVertexID jobVertexId,
                                        java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory)
      • getRate

        public static double getRate(ScalingMetric metric,
                                     @Nullable
                                     org.apache.flink.runtime.jobgraph.JobVertexID jobVertexId,
                                     java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory)
        Compute the per-second rate for the given accumulated metric over the metric window.
        Parameters:
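        metric - accumulated metric to compute the rate for
        jobVertexId - job vertex whose metric is read; may be null
        metricsHistory - history of collected metrics, keyed by collection time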
        Returns:
        Per-second rate, or Double.NaN if fewer than two observations are available.
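        The documented contract (a per-second rate over the window, or Double.NaN with fewer than two observations) can be sketched as follows, assuming the accumulated metric is a monotonically increasing counter sampled at each collection time. RateSketch and the plain SortedMap<Instant, Double> are hypothetical simplifications of ScalingMetric lookups on CollectedMetrics.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical sketch; not the autoscaler's implementation. */
final class RateSketch {

    /** Per-second rate between the first and last observation of an accumulated counter. */
    static double perSecondRate(SortedMap<Instant, Double> accumulated) {
        if (accumulated.size() < 2) {
            // Matches the documented contract: at least two observations are required.
            return Double.NaN;
        }
        Instant first = accumulated.firstKey();
        Instant last = accumulated.lastKey();
        double seconds = Duration.between(first, last).toMillis() / 1000.0;
        return (accumulated.get(last) - accumulated.get(first)) / seconds;
    }

    public static void main(String[] args) {
        SortedMap<Instant, Double> recordsIn = new TreeMap<>();
        recordsIn.put(Instant.ofEpochSecond(0), 100.0);
        recordsIn.put(Instant.ofEpochSecond(5), 150.0);
        recordsIn.put(Instant.ofEpochSecond(10), 300.0);
        // (300 - 100) / 10 s = 20.0 records per second; intermediate samples are not needed.
        System.out.println(perSecondRate(recordsIn));
    }
}
```

        Only the first and last samples matter for a counter, which is why two observations are the minimum.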
      • getAverage

        public static double getAverage(ScalingMetric metric,
                                        @Nullable
                                        org.apache.flink.runtime.jobgraph.JobVertexID jobVertexId,
                                        java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory,
                                        int minElements)
      • computeEdgeOutputRatio

        @VisibleForTesting
        protected static double computeEdgeOutputRatio(org.apache.flink.runtime.jobgraph.JobVertexID from,
                                                       org.apache.flink.runtime.jobgraph.JobVertexID to,
                                                       JobTopology topology,
                                                       java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory)
        Compute the In/Out ratio between the (from, to) vertices. The ratio estimates the number of output records produced to the downstream vertex for every input record received by the upstream vertex. For example, an output ratio of 2.0 means that approximately 2 outputs are produced to the "to" vertex for every 1 input received by the "from" vertex.
        Parameters:
        from - Upstream vertex
        to - Downstream vertex
        topology - job topology containing both vertices
        metricsHistory - history of collected metrics, keyed by collection time
        Returns:
        Output ratio
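        The definition above can be expressed as a ratio of two rates: the records per second sent over the (from, to) edge divided by the records per second received by the "from" vertex. This is a minimal sketch under that assumption only; EdgeOutputRatioSketch is hypothetical, and it does not cover how the autoscaler treats source vertices or missing metrics.

```java
/** Hypothetical sketch; not the autoscaler's implementation. */
final class EdgeOutputRatioSketch {

    /**
     * @param edgeRate records per second flowing from the upstream to the downstream vertex
     * @param fromInputRate records per second received by the upstream vertex
     * @return estimated outputs to the downstream vertex per input of the upstream vertex,
     *     or NaN if the upstream input rate is unknown or zero (hypothetical handling)
     */
    static double outputRatio(double edgeRate, double fromInputRate) {
        if (Double.isNaN(fromInputRate) || fromInputRate <= 0.0) {
            return Double.NaN;
        }
        return edgeRate / fromInputRate;
    }

    public static void main(String[] args) {
        // A vertex that reads 100 records/s and emits 200 records/s to the downstream vertex.
        System.out.println(outputRatio(200.0, 100.0)); // 2.0
    }
}
```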
      • computeEdgeDataRate

        @VisibleForTesting
        protected static double computeEdgeDataRate(JobTopology topology,
                                                    java.util.SortedMap<java.time.Instant,CollectedMetrics> metricsHistory,
                                                    org.apache.flink.runtime.jobgraph.JobVertexID from,
                                                    org.apache.flink.runtime.jobgraph.JobVertexID to)
        Compute how many records flow between two job vertices in the pipeline. Since Flink does not expose output or data rate metrics at the edge level, this has to be computed from the vertex-level input/output metrics.
        Parameters:
        topology - job topology containing both vertices
        metricsHistory - history of collected metrics, keyed by collection time
        from - Upstream vertex
        to - Downstream vertex
        Returns:
        Records per second data rate between the two vertices
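        One way to derive an edge rate from vertex-level metrics is to use a vertex whose metric can only belong to that edge. The sketch below is a hypothetical illustration of that idea, not the autoscaler's actual logic: if the downstream vertex has a single input, its records-in rate is attributed to the edge; otherwise, if the upstream vertex has a single output, its records-out rate is used; otherwise the rate is reported as unknown. EdgeDataRateSketch and its parameters are assumptions.

```java
/** Hypothetical sketch; not the autoscaler's implementation. */
final class EdgeDataRateSketch {

    /**
     * @param toInputCount number of upstream inputs of the downstream vertex
     * @param toRecordsInRate records/s received by the downstream vertex over all its inputs
     * @param fromOutputCount number of downstream outputs of the upstream vertex
     * @param fromRecordsOutRate records/s emitted by the upstream vertex over all its outputs
     * @return records/s attributed to the (from, to) edge, or NaN if it cannot be attributed
     */
    static double edgeDataRate(
            int toInputCount, double toRecordsInRate, int fromOutputCount, double fromRecordsOutRate) {
        if (toInputCount == 1) {
            // Every record the downstream vertex receives must arrive over this edge.
            return toRecordsInRate;
        }
        if (fromOutputCount == 1) {
            // Every record the upstream vertex emits leaves over this edge.
            return fromRecordsOutRate;
        }
        // Neither vertex-level metric belongs to this edge alone.
        return Double.NaN;
    }
}
```

        Checking the downstream input first is a choice made in this sketch. When both vertices have several connections, an additional estimate is needed, which this sketch does not attempt.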