Class SourceTestSuiteBase<T>

    • Constructor Detail

      • SourceTestSuiteBase

        public SourceTestSuiteBase()
    • Method Detail

      • testSourceSingleSplit

        @TestTemplate
        @DisplayName("Test source with single split")
        public void testSourceSingleSplit(TestEnvironment testEnv,
                                          DataStreamSourceExternalContext<T> externalContext,
                                          CheckpointingMode semantic)
                                   throws Exception
        Test connector source with only one split in the external system.

        This test will create one split in the external system, write test data into it, and consume it back via a Flink job with parallelism 1.

        The number and order of records consumed by Flink need to be identical to the test data written to the external system in order to pass this test.

        A bounded source is required for this test.

        Throws:
        Exception
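
        The pass criterion for testSourceSingleSplit amounts to draining a finite result and comparing it with the written records as an ordered list. The following is a minimal sketch in plain Java, independent of the Flink classes; SingleSplitCheck and matchesExactly are hypothetical names used only for illustration, not part of the test suite.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper illustrating the single-split pass criterion. */
final class SingleSplitCheck {

    /** Drains a finite iterator and compares the records, in order, with what was written. */
    static <T> boolean matchesExactly(Iterator<T> consumed, List<T> written) {
        List<T> actual = new ArrayList<>();
        // This loop terminates only because the source is assumed to be bounded.
        while (consumed.hasNext()) {
            actual.add(consumed.next());
        }
        // List.equals checks both the number of records and their order.
        return actual.equals(written);
    }

    public static void main(String[] args) {
        List<String> written = List.of("a", "b", "c");
        // Same records in the same order.
        System.out.println(matchesExactly(List.of("a", "b", "c").iterator(), written));
        // Same records, but reordered.
        System.out.println(matchesExactly(List.of("a", "c", "b").iterator(), written));
    }
}
```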
      • testMultipleSplits

        @TestTemplate
        @DisplayName("Test source with multiple splits")
        public void testMultipleSplits(TestEnvironment testEnv,
                                       DataStreamSourceExternalContext<T> externalContext,
                                       CheckpointingMode semantic)
                                throws Exception
        Test connector source with multiple splits in the external system.

        This test will create 4 splits in the external system, write test data to all splits, and consume it back via a Flink job with parallelism 4.

        The number and order of records in each split consumed by Flink need to be identical to the test data written into the external system to pass this test. There's no requirement for record order across splits.

        A bounded source is required for this test.

        Throws:
        Exception
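
        The rule "ordered within each split, unordered across splits" can be sketched as an interleaving check. This is an illustrative model, not the suite's actual assertion code: PerSplitOrderCheck is a hypothetical name, and the sketch assumes that every record value occurs only once across all splits so that a record can be mapped back to its split.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper illustrating the per-split ordering rule. */
final class PerSplitOrderCheck {

    /**
     * Returns true if the consumed list contains every record of every split exactly once
     * and keeps the original order within each split. Order across splits is not checked.
     * Assumption: each record value is unique across all splits.
     */
    static <T> boolean preservesPerSplitOrder(List<T> consumed, List<List<T>> splits) {
        Map<T, Integer> splitOf = new HashMap<>();
        for (int s = 0; s < splits.size(); s++) {
            for (T record : splits.get(s)) {
                splitOf.put(record, s);
            }
        }
        int[] next = new int[splits.size()]; // next expected position in each split
        for (T record : consumed) {
            Integer owner = splitOf.get(record);
            if (owner == null) {
                return false; // record was never written
            }
            int s = owner;
            List<T> split = splits.get(s);
            if (next[s] >= split.size() || !split.get(next[s]).equals(record)) {
                return false; // duplicate or out-of-order record within this split
            }
            next[s]++;
        }
        for (int s = 0; s < splits.size(); s++) {
            if (next[s] != splits.get(s).size()) {
                return false; // some records of this split were not consumed
            }
        }
        return true;
    }
}
```

        For example, with splits [1, 2] and [3, 4], the consumed sequence 3, 1, 4, 2 is accepted, while 2, 1, 3, 4 is rejected because split 0 is read out of order.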
      • testSavepoint

        @TestTemplate
        @DisplayName("Test source restarting from a savepoint")
        public void testSavepoint(TestEnvironment testEnv,
                                  DataStreamSourceExternalContext<T> externalContext,
                                  CheckpointingMode semantic)
                           throws Exception
        Test connector source restart from a savepoint.

        This test will create 4 splits in the external system, write test data to all splits, and consume it back via a Flink job. It then stops the job with a savepoint and restarts the job from that savepoint. Once the restarted job is running, it adds extra data to the source and compares the result.

        The number and order of records in each split consumed by Flink need to be identical to the test data written into the external system to pass this test. There's no requirement for record order across splits.

        Throws:
        Exception
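
        The idea behind testSavepoint is that a restored job resumes from the read positions captured at stop time, so records written afterwards are read without loss and without re-reading old data. A toy model of that behaviour follows. It is a sketch under simplifying assumptions (in-memory splits, positions stored as a plain int array); SavepointResumeSketch and its methods are hypothetical and do not correspond to Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy source whose per-split read positions stand in for savepointed state. */
final class SavepointResumeSketch {

    private final int[] offsets; // next unread position in each split

    /** Starts from the given positions; an all-zero array models a fresh start. */
    SavepointResumeSketch(int[] restoredOffsets) {
        this.offsets = restoredOffsets.clone();
    }

    /** Reads everything not yet consumed from each split and advances the positions. */
    List<String> poll(List<List<String>> splits) {
        List<String> out = new ArrayList<>();
        for (int s = 0; s < splits.size(); s++) {
            List<String> split = splits.get(s);
            while (offsets[s] < split.size()) {
                out.add(split.get(offsets[s]));
                offsets[s]++;
            }
        }
        return out;
    }

    /** Captures the current positions, playing the role of "stop with savepoint". */
    int[] savepoint() {
        return offsets.clone();
    }

    public static void main(String[] args) {
        List<List<String>> splits = new ArrayList<>();
        splits.add(new ArrayList<>(List.of("a0", "a1")));
        splits.add(new ArrayList<>(List.of("b0")));

        SavepointResumeSketch first = new SavepointResumeSketch(new int[splits.size()]);
        System.out.println(first.poll(splits)); // initial test data
        int[] saved = first.savepoint();

        splits.get(0).add("a2"); // extra data written after the restart
        splits.get(1).add("b1");
        SavepointResumeSketch restored = new SavepointResumeSketch(saved);
        System.out.println(restored.poll(splits)); // only the extra data
    }
}
```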
      • testScaleUp

        @TestTemplate
        @DisplayName("Test source restarting with a higher parallelism")
        public void testScaleUp(TestEnvironment testEnv,
                                DataStreamSourceExternalContext<T> externalContext,
                                CheckpointingMode semantic)
                         throws Exception
        Test connector source restart from a savepoint with a higher parallelism.

        This test will create 4 splits in the external system, write test data to all splits, and consume it back via a Flink job with parallelism 2. It then stops the job with a savepoint and restarts the job from that savepoint with a higher parallelism of 4. Once the restarted job is running, it adds extra data to the source and compares the result.

        The number and order of records in each split consumed by Flink need to be identical to the test data written into the external system to pass this test. There's no requirement for record order across splits.

        Throws:
        Exception
      • testScaleDown

        @TestTemplate
        @DisplayName("Test source restarting with a lower parallelism")
        public void testScaleDown(TestEnvironment testEnv,
                                  DataStreamSourceExternalContext<T> externalContext,
                                  CheckpointingMode semantic)
                           throws Exception
        Test connector source restart from a savepoint with a lower parallelism.

        This test will create 4 splits in the external system, write test data to all splits, and consume it back via a Flink job with parallelism 4. It then stops the job with a savepoint and restarts the job from that savepoint with a lower parallelism of 2. Once the restarted job is running, it adds extra data to the source and compares the result.

        The number and order of records in each split consumed by Flink need to be identical to the test data written into the external system to pass this test. There's no requirement for record order across splits.

        Throws:
        Exception
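
        Both rescaling tests (testScaleUp and testScaleDown) rely on the same 4 splits being spread over a different number of readers after the restart. The sketch below illustrates that redistribution with a round-robin policy. The policy is an assumption made for illustration; the real assignment is decided by the connector under test, and SplitRebalanceSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of redistributing splits when the parallelism changes. */
final class SplitRebalanceSketch {

    /** Assigns split indices 0..numSplits-1 to readers round-robin (an assumed policy). */
    static List<List<Integer>> assign(int numSplits, int parallelism) {
        List<List<Integer>> readers = new ArrayList<>();
        for (int r = 0; r < parallelism; r++) {
            readers.add(new ArrayList<>());
        }
        for (int split = 0; split < numSplits; split++) {
            readers.get(split % parallelism).add(split);
        }
        return readers;
    }

    public static void main(String[] args) {
        System.out.println(assign(4, 2)); // two readers, two splits each
        System.out.println(assign(4, 4)); // four readers, one split each
    }
}
```

        Whatever the policy, every split must end up with exactly one reader after the restart, which is why the per-split ordering requirement still holds.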
      • testSourceMetrics

        @TestTemplate
        @DisplayName("Test source metrics")
        public void testSourceMetrics(TestEnvironment testEnv,
                                      DataStreamSourceExternalContext<T> externalContext,
                                      CheckpointingMode semantic)
                               throws Exception
        Test connector source metrics.

        This test will create 4 splits in the external system first, write test data to all splits and consume back via a Flink job with parallelism 4. Then read and compare the metrics.

        Metrics currently tested: numRecordsIn

        Throws:
        Exception
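
        One plausible way to "read and compare the metrics" is to sum numRecordsIn over all source subtasks and compare the sum with the number of records written. The sketch below assumes exactly that comparison; how the suite actually queries and aggregates the metric is not stated here, and MetricsSumSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;

/** Toy comparison of an aggregated record counter with the written test data. */
final class MetricsSumSketch {

    /** Returns true if the per-subtask counters add up to the number of written records. */
    static <T> boolean numRecordsInMatches(long[] perSubtaskNumRecordsIn, List<List<T>> testData) {
        long reported = Arrays.stream(perSubtaskNumRecordsIn).sum();
        long written = 0;
        for (List<T> split : testData) {
            written += split.size();
        }
        return reported == written;
    }
}
```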
      • testIdleReader

        @TestTemplate
        @DisplayName("Test source with at least one idle parallelism")
        public void testIdleReader(TestEnvironment testEnv,
                                   DataStreamSourceExternalContext<T> externalContext,
                                   CheckpointingMode semantic)
                            throws Exception
        Test connector source with an idle reader.

        This test will create 4 splits in the external system, write test data to all splits, and consume it back via a Flink job with parallelism 5, so at least one subtask (source reader) will be idle, that is, assigned no splits. If the split enumerator of the source doesn't signal NoMoreSplitsEvent to the idle source reader, the Flink job will never transition to the FINISHED state.

        The number and order of records in each split consumed by Flink need to be identical to the test data written into the external system to pass this test. There's no requirement for record order across splits.

        A bounded source is required for this test.

        Throws:
        Exception
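
        The failure mode that testIdleReader guards against can be modelled without Flink: a reader finishes only when it has no pending splits and has been told that no more will come. The sketch below is a toy state model; IdleReaderSketch, Reader, and the round-robin assignment are all hypothetical and only mirror the behaviour described above.

```java
/** Toy model showing why idle readers also need the "no more splits" signal. */
final class IdleReaderSketch {

    /** A reader is done once it has no pending splits and expects no further ones. */
    static final class Reader {
        int pendingSplits;
        boolean noMoreSplits;

        boolean isFinished() {
            return pendingSplits == 0 && noMoreSplits;
        }
    }

    /**
     * Simulates a bounded job. If signalIdleReaders is false, only readers that received
     * at least one split are told that no more splits will follow.
     */
    static boolean jobFinishes(int numSplits, int parallelism, boolean signalIdleReaders) {
        Reader[] readers = new Reader[parallelism];
        for (int i = 0; i < parallelism; i++) {
            readers[i] = new Reader();
        }
        for (int split = 0; split < numSplits; split++) {
            readers[split % parallelism].pendingSplits++; // assumed round-robin assignment
        }
        for (Reader reader : readers) {
            boolean hadSplits = reader.pendingSplits > 0;
            reader.pendingSplits = 0; // the reader consumes its bounded splits
            if (hadSplits || signalIdleReaders) {
                reader.noMoreSplits = true;
            }
        }
        for (Reader reader : readers) {
            if (!reader.isFinished()) {
                return false; // at least one reader keeps waiting, so the job cannot finish
            }
        }
        return true;
    }
}
```

        With 4 splits and parallelism 5, jobFinishes(4, 5, true) completes, while jobFinishes(4, 5, false) leaves the fifth reader waiting indefinitely.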
      • testTaskManagerFailure

        @TestTemplate
        @DisplayName("Test TaskManager failure")
        public void testTaskManagerFailure(TestEnvironment testEnv,
                                           DataStreamSourceExternalContext<T> externalContext,
                                           ClusterControllable controller,
                                           CheckpointingMode semantic)
                                    throws Exception
        Test connector source with task manager failover.

        This test will create 1 split in the external system, write test record set A into the split, restart the TaskManager to trigger a job failover, write test record set B into the split, and finally terminate the Flink job.

        The number and order of records consumed by Flink should be identical to A before the failover and B after the failover in order to pass the test.

        An unbounded source is required for this test, since TaskManager failover will be triggered in the middle of the test.

        Throws:
        Exception
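
        Because the source is unbounded, the result cannot be drained to the end; each phase has to read a known number of records and stop. The sketch below shows that pattern for record sets A and B. It is an illustration only: FailoverPhaseCheck is a hypothetical name, and a plain in-memory iterator stands in for the job's result stream.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper that checks one phase of an unbounded result stream. */
final class FailoverPhaseCheck {

    /**
     * Reads exactly expected.size() records and compares them in order.
     * It never reads past the expected records, so it also works on an endless stream.
     */
    static <T> boolean nextRecordsMatch(Iterator<T> results, List<T> expected) {
        for (T want : expected) {
            if (!results.hasNext() || !want.equals(results.next())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // One continuous stream: set A, then (after the simulated failover) set B.
        Iterator<String> results = List.of("a1", "a2", "b1", "b2").iterator();
        System.out.println(nextRecordsMatch(results, List.of("a1", "a2"))); // before failover
        System.out.println(nextRecordsMatch(results, List.of("b1", "b2"))); // after failover
    }
}
```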
      • generateAndWriteTestData

        protected List<T> generateAndWriteTestData(int splitIndex,
                                                   DataStreamSourceExternalContext<T> externalContext,
                                                   TestingSourceSettings testingSourceSettings)
        Generate a set of test records and write them to the given split writer.
        Parameters:
        externalContext - External context
        Returns:
        List of generated test records
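
        The generate-then-write pattern can be sketched as follows. This is not the method's actual implementation: SplitWriter is a hypothetical stand-in for whatever writer the external context provides, and the record format, the seed parameter, and the record count are arbitrary choices made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Toy version of "generate test records, write them, return them for later comparison". */
final class TestDataWriterSketch {

    /** Hypothetical writer bound to one split of the external system. */
    interface SplitWriter<T> {
        void writeRecords(List<T> records);
    }

    /** Generates 5 to 9 records for the given split, writes them, and returns them. */
    static List<String> generateAndWrite(int splitIndex, long seed, SplitWriter<String> writer) {
        Random random = new Random(seed);
        int count = 5 + random.nextInt(5); // arbitrary size in the range 5..9
        List<String> records = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            records.add("split-" + splitIndex + "-record-" + i);
        }
        writer.writeRecords(records);
        return records; // kept by the caller as the expected result
    }
}
```

        Returning the generated list is what lets each test compare the job's output with exactly what was written.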
      • getTestDataSize

        protected int getTestDataSize(List<List<T>> collections)
        Get the size of test data.
        Parameters:
        collections - test data
        Returns:
        the size of test data
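
        Given the signature, the size of the test data is most naturally the total number of records over all per-split lists. A minimal sketch under that assumption, using the hypothetical names TestDataSizeSketch and totalSize:

```java
import java.util.List;

/** Sums the sizes of the per-split record lists. */
final class TestDataSizeSketch {

    static <T> int totalSize(List<List<T>> collections) {
        int total = 0;
        for (List<T> collection : collections) {
            total += collection.size();
        }
        return total;
    }
}
```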
      • checkResultWithSemantic

        protected void checkResultWithSemantic(CloseableIterator<T> resultIterator,
                                               List<List<T>> testData,
                                               CheckpointingMode semantic,
                                               Integer limit)
        Compare the test data with the result.

        If the source is bounded, limit should be null.

        Parameters:
        resultIterator - the data read from the job
        testData - the test data
        semantic - the supported semantic, see CheckpointingMode
        limit - expected number of records to read from the job
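
        How the semantic and the limit might interact can be sketched as below. This is a simplified model, not the suite's assertion logic: it compares record counts only and ignores per-split order, the Semantic enum is a hypothetical stand-in for CheckpointingMode, and tolerating surplus duplicates under at-least-once is an assumption about what that mode permits.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy result check that depends on the delivery semantic and an optional limit. */
final class SemanticCheckSketch {

    /** Hypothetical stand-in for CheckpointingMode. */
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE }

    /**
     * With limit == null (bounded source) the iterator is drained completely.
     * Otherwise reading stops as soon as limit written records have been matched.
     */
    static <T> boolean check(
            Iterator<T> results, List<List<T>> testData, Semantic semantic, Integer limit) {
        Map<T, Integer> remaining = new HashMap<>(); // how often each record is still expected
        int total = 0;
        for (List<T> split : testData) {
            for (T record : split) {
                remaining.merge(record, 1, Integer::sum);
                total++;
            }
        }
        int target = (limit != null) ? limit : total;
        int matched = 0;
        while ((limit == null || matched < limit) && results.hasNext()) {
            T record = results.next();
            Integer left = remaining.get(record);
            if (left == null) {
                return false; // record was never written
            }
            if (left > 0) {
                remaining.put(record, left - 1);
                matched++;
            } else if (semantic == Semantic.EXACTLY_ONCE) {
                return false; // duplicates are not allowed under exactly-once
            }
            // Under AT_LEAST_ONCE a surplus duplicate is tolerated and skipped.
        }
        return matched == target;
    }
}
```

        Passing null as the limit fits a bounded source because the iterator ends on its own; for an unbounded source the limit is the only thing that stops the read loop.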