Validator

Custom FlinkResourceValidator implementation #

FlinkResourceValidator, an interface for validating the resources of FlinkDeployment and FlinkSessionJob, is a pluggable component based on the Plugins mechanism. During development, we can customize the implementation of FlinkResourceValidator and make sure to retain the service definition in META-INF/services. The following steps demonstrate how to develop and use a custom validator.

  1. Implement FlinkResourceValidator interface:

    package org.apache.flink.kubernetes.operator.validation;
      
    import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
    import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
      
    import java.util.Optional;
      
    /** Custom validator implementation of {@link FlinkResourceValidator}. */
    public class CustomValidator implements FlinkResourceValidator {
      
        @Override
        public Optional<String> validateDeployment(FlinkDeployment deployment) {
            if (deployment.getSpec().getFlinkVersion() == null) {
              return Optional.of("Flink Version must be defined.");
            }
            return Optional.empty();
        }
      
        @Override
        public Optional<String> validateSessionJob(
                 FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
            if (sessionJob.getSpec().getJob() == null) {
              return Optional.of("The job spec should not be empty");
            }
            return Optional.empty();
        }
    }
    
  2. Create service definition file org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator in META-INF/services. With custom FlinkResourceValidator implementation, the service definition describes as follows:

    org.apache.flink.kubernetes.operator.validation.CustomValidator
    
  3. Use the Maven tool to package the project and generate the custom validator JAR.

  4. Create Dockerfile to build a custom image from the apache/flink-kubernetes-operator official image and copy the generated JAR to custom validator plugin directory. /opt/flink/plugins is the value of FLINK_PLUGINS_DIR environment variable in the flink-kubernetes-operator helm chart. The structure of custom validator directory under /opt/flink/plugins is as follows:

    /opt/flink/plugins
        ├── custom-validator
        │   ├── custom-validator.jar
        └── ...
    

    With the custom validator directory location, the Dockerfile is defined as follows:

    FROM apache/flink-kubernetes-operator
    ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
    ENV CUSTOM_VALIDATOR_DIR=custom-validator
    RUN mkdir $FLINK_PLUGINS_DIR/$CUSTOM_VALIDATOR_DIR
    COPY custom-validator.jar $FLINK_PLUGINS_DIR/$CUSTOM_VALIDATOR_DIR/
    
  5. Install the flink-kubernetes-operator helm chart with the custom image and verify the deploy/flink-kubernetes-operator log has:

    2022-05-04 14:01:46,551 o.a.f.k.o.u.FlinkUtils         [INFO ] Discovered resource validator from plugin directory[/opt/flink/plugins]: org.apache.flink.kubernetes.operator.validation.CustomValidator.