Class SinkTestSuiteBase<T extends Comparable<T>>

    • Constructor Detail

      • SinkTestSuiteBase

        public SinkTestSuiteBase()
    • Method Detail

      • testBasicSink

        @TestTemplate
        @DisplayName("Test data stream sink")
        public void testBasicSink(TestEnvironment testEnv,
                                  DataStreamSinkExternalContext<T> externalContext,
                                  CheckpointingMode semantic)
                           throws Exception
        Test DataStream connector sink.

        This test creates a sink in the external system, generates a collection of test data, and writes it to the sink with a Flink job.

        To pass this test, the number of records written by Flink must equal the number of generated test records. The records in the sink are compared with the test data according to the given checkpointing semantic. Record order is not checked.

        Throws:
        Exception
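
        The order-insensitive comparison described above can be sketched as follows. This is a minimal, self-contained illustration and not the suite's actual implementation: the Semantic enum and the matches helper are hypothetical, and the at-least-once rule shown here (duplicates tolerated) is an assumption that this page does not state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

final class SemanticCheckSketch {

    /** Hypothetical stand-in for the checkpointing semantic; not the Flink enum. */
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE }

    /** Returns a sorted copy so that the comparison ignores arrival order. */
    static <T extends Comparable<T>> List<T> sorted(List<T> records) {
        List<T> copy = new ArrayList<>(records);
        Collections.sort(copy);
        return copy;
    }

    /**
     * Assumed rules: EXACTLY_ONCE requires the same records with the same
     * multiplicity; AT_LEAST_ONCE tolerates duplicates, but no record may be
     * missing and no unknown record may appear.
     */
    static <T extends Comparable<T>> boolean matches(
            List<T> expected, List<T> actual, Semantic semantic) {
        if (semantic == Semantic.EXACTLY_ONCE) {
            return sorted(expected).equals(sorted(actual));
        }
        return new HashSet<>(expected).equals(new HashSet<>(actual));
    }

    public static void main(String[] args) {
        List<Integer> expected = List.of(1, 2, 3);
        // Same records in a different order: accepted.
        System.out.println(matches(expected, List.of(3, 1, 2), Semantic.EXACTLY_ONCE));
        // A duplicate is rejected under EXACTLY_ONCE ...
        System.out.println(matches(expected, List.of(3, 1, 2, 2), Semantic.EXACTLY_ONCE));
        // ... but accepted under the assumed AT_LEAST_ONCE rule.
        System.out.println(matches(expected, List.of(3, 1, 2, 2), Semantic.AT_LEAST_ONCE));
    }
}
```

        Sorting is possible because T is bounded by Comparable<T>, as in the class declaration.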
      • testStartFromSavepoint

        @TestTemplate
        @DisplayName("Test sink restarting from a savepoint")
        public void testStartFromSavepoint(TestEnvironment testEnv,
                                           DataStreamSinkExternalContext<T> externalContext,
                                           CheckpointingMode semantic)
                                    throws Exception
        Test restarting the connector sink from a completed savepoint with the same parallelism.

        This test creates a sink in the external system, generates a collection of test data, and first writes half of it to the sink with a Flink job running at parallelism 2. The job is then stopped and restarted from the completed savepoint. Once the restarted job is running, the other half is written to the sink and the result is compared.

        To pass this test, the number of records written by Flink must equal the number of generated test records. The records in the sink are compared with the test data according to the given checkpointing semantic. Record order is not checked.

        Throws:
        Exception
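
        The two-phase write described above can be illustrated with a small, self-contained sketch. It only models the data split: the in-memory list stands in for the external system, the class and helper names are hypothetical, and no savepoint is actually taken.

```java
import java.util.ArrayList;
import java.util.List;

final class SavepointRestartSketch {

    /** First half of the test data, written before the job is stopped. */
    static <T> List<T> firstHalf(List<T> data) {
        return new ArrayList<>(data.subList(0, data.size() / 2));
    }

    /** Remaining records, written after the job has been restarted. */
    static <T> List<T> secondHalf(List<T> data) {
        return new ArrayList<>(data.subList(data.size() / 2, data.size()));
    }

    public static void main(String[] args) {
        List<String> testData = List.of("a", "b", "c", "d", "e");
        // In-memory stand-in for the external system.
        List<String> sink = new ArrayList<>();

        // Phase 1: the job writes the first half, then stops with a savepoint.
        sink.addAll(firstHalf(testData));
        // Phase 2: the job is restarted from the savepoint and writes the rest.
        sink.addAll(secondHalf(testData));

        // The sink must now hold as many records as were generated.
        System.out.println(sink.size() == testData.size());
    }
}
```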
      • testScaleUp

        @TestTemplate
        @DisplayName("Test sink restarting with a higher parallelism")
        public void testScaleUp(TestEnvironment testEnv,
                                DataStreamSinkExternalContext<T> externalContext,
                                CheckpointingMode semantic)
                         throws Exception
        Test restarting the connector sink from a completed savepoint with a higher parallelism.

        This test creates a sink in the external system, generates a collection of test data, and first writes half of it to the sink with a Flink job running at parallelism 2. The job is then stopped and restarted from the completed savepoint with a higher parallelism of 4. Once the restarted job is running, the other half is written to the sink and the result is compared.

        To pass this test, the number of records written by Flink must equal the number of generated test records. The records in the sink are compared with the test data according to the given checkpointing semantic. Record order is not checked.

        Throws:
        Exception
      • testScaleDown

        @TestTemplate
        @DisplayName("Test sink restarting with a lower parallelism")
        public void testScaleDown(TestEnvironment testEnv,
                                  DataStreamSinkExternalContext<T> externalContext,
                                  CheckpointingMode semantic)
                           throws Exception
        Test restarting the connector sink from a completed savepoint with a lower parallelism.

        This test creates a sink in the external system, generates a collection of test data, and first writes half of it to the sink with a Flink job running at parallelism 4. The job is then stopped and restarted from the completed savepoint with a lower parallelism of 2. Once the restarted job is running, the other half is written to the sink and the result is compared.

        To pass this test, the number of records written by Flink must equal the number of generated test records. The records in the sink are compared with the test data according to the given checkpointing semantic. Record order is not checked.

        Throws:
        Exception
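
        The rescaling tests expect the same sink contents regardless of the parallelism used before and after the restart. The sketch below illustrates that expectation only. The round-robin distribution, the class name, and the helpers are assumptions made for this example and do not describe how Flink assigns records to subtasks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class RescaleSketch {

    /** Distributes records round-robin over the given number of subtasks (assumed strategy). */
    static <T> List<List<T>> distribute(List<T> records, int parallelism) {
        List<List<T>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            subtasks.get(i % parallelism).add(records.get(i));
        }
        return subtasks;
    }

    /**
     * Writes the first half with parallelism {@code before} and the second half
     * with parallelism {@code after}, then returns the sorted sink contents.
     */
    static <T extends Comparable<T>> List<T> runTwoPhases(List<T> data, int before, int after) {
        int mid = data.size() / 2;
        List<T> sink = new ArrayList<>();
        for (List<T> subtask : distribute(data.subList(0, mid), before)) {
            sink.addAll(subtask);
        }
        for (List<T> subtask : distribute(data.subList(mid, data.size()), after)) {
            sink.addAll(subtask);
        }
        Collections.sort(sink);
        return sink;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(5, 3, 1, 4, 2, 6);
        List<Integer> scaledUp = runTwoPhases(data, 2, 4);
        List<Integer> scaledDown = runTwoPhases(data, 4, 2);
        // Both runs end with the same records in the sink.
        System.out.println(scaledUp.equals(scaledDown));
    }
}
```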
      • testMetrics

        @TestTemplate
        @DisplayName("Test sink metrics")
        public void testMetrics(TestEnvironment testEnv,
                                DataStreamSinkExternalContext<T> externalContext,
                                CheckpointingMode semantic)
                         throws Exception
        Test connector sink metrics.

        This test creates a sink in the external system, generates test data, and writes it to the sink with a Flink job. It then reads the sink metrics and compares them with the expected values.

        Metric currently tested: numRecordsOut

        Throws:
        Exception
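
        A check on numRecordsOut can be pictured as comparing a record counter with the amount of test data written. The sketch below is a simplified illustration under stated assumptions: it sums hypothetical per-subtask counter values and compares the total with the test data size. How the suite actually obtains the metric is not described on this page.

```java
import java.util.List;

final class NumRecordsOutSketch {

    /** Sums hypothetical per-subtask numRecordsOut counter values. */
    static long totalRecordsOut(List<Long> perSubtaskCounters) {
        long total = 0;
        for (long value : perSubtaskCounters) {
            total += value;
        }
        return total;
    }

    /** True if the summed counter equals the number of generated test records. */
    static boolean metricMatches(List<Long> perSubtaskCounters, int testDataSize) {
        return totalRecordsOut(perSubtaskCounters) == testDataSize;
    }

    public static void main(String[] args) {
        // Four subtasks that together wrote 10 records.
        List<Long> counters = List.of(3L, 2L, 3L, 2L);
        System.out.println(metricMatches(counters, 10));
    }
}
```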
      • generateTestData

        protected List<T> generateTestData(TestingSinkSettings testingSinkSettings,
                                           DataStreamSinkExternalContext<T> externalContext)
        Generate a set of test records.
        Parameters:
        testingSinkSettings - Sink settings
        externalContext - External context
        Returns:
        Collection of generated test records
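
        As an illustration of what generating test records can look like, the following self-contained sketch produces a reproducible list of unique strings from a seed. The class name, the seed parameter, and the record format are hypothetical. The real method builds its records from the sink settings and the external context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class TestDataSketch {

    /**
     * Generates {@code count} records from a seed. The index prefix keeps the
     * records unique; the random suffix varies with the seed.
     */
    static List<String> generate(long seed, int count) {
        Random random = new Random(seed);
        List<String> records = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            records.add(i + "-" + Long.toHexString(random.nextLong()));
        }
        return records;
    }

    public static void main(String[] args) {
        // The same seed yields the same records, which makes failures reproducible.
        System.out.println(generate(42L, 5).equals(generate(42L, 5)));
    }
}
```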
      • checkResultWithSemantic

        @Deprecated
        protected void checkResultWithSemantic(ExternalSystemDataReader<T> reader,
                                               List<T> testData,
                                               CheckpointingMode semantic)
                                        throws Exception
        Deprecated.
        This method is kept for downstream projects, e.g. Flink connectors that extend this test suite and still need to support Flink versions below 1.20. It can be removed once support for Flink 1.19 is dropped.
        Throws:
        Exception