Class SourceTestSuiteBase<T>
- java.lang.Object
-
- org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase<T>
-
@ExtendWith({ConnectorTestingExtension.class, TestLoggerExtension.class, TestCaseInvocationContextProvider.class})
@TestInstance(PER_CLASS)
@Experimental
public abstract class SourceTestSuiteBase<T> extends Object
Base class for all test suites. Every test case should have descriptive JavaDoc, including:
- What's the purpose of this case
- Simple description of how this case works
- Condition to fulfill in order to pass this case
- Requirement of running this case
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected static class
SourceTestSuiteBase.CollectIteratorBuilder<T>
Builder class for constructing the CollectResultIterator of a collect sink.
-
Constructor Summary
Constructors Constructor Description SourceTestSuiteBase()
-
Method Summary
Modifier and Type, Method, Description

protected SourceTestSuiteBase.CollectIteratorBuilder<T>
addCollectSink(DataStream<T> stream)
Add a collect sink in the job.

protected void
checkResultWithSemantic(CloseableIterator<T> resultIterator, List<List<T>> testData, CheckpointingMode semantic, Integer limit)
Compare the test data with the result.

protected List<T>
generateAndWriteTestData(int splitIndex, DataStreamSourceExternalContext<T> externalContext, TestingSourceSettings testingSourceSettings)
Generate a set of test records and write it to the given split writer.

protected List<T>
generateTestDataForWriter(DataStreamSourceExternalContext<T> externalContext, TestingSourceSettings sourceSettings, int splitIndex, ExternalSystemSplitDataWriter<T> writer)
Generate a set of split writers.

protected int
getTestDataSize(List<List<T>> collections)
Get the size of test data.

protected JobClient
submitJob(StreamExecutionEnvironment env, String jobName)

void
testIdleReader(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic)
Test connector source with an idle reader.

void
testMultipleSplits(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic)
Test connector source with multiple splits in the external system.

void
testSavepoint(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic)
Test connector source restart from a savepoint.

void
testScaleDown(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic)
Test connector source restart from a savepoint with a lower parallelism.

void
testScaleUp(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic)
Test connector source restart from a savepoint with a higher parallelism.

void
testSourceMetrics(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic)
Test connector source metrics.

void
testSourceSingleSplit(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic)
Test connector source with only one split in the external system.

void
testTaskManagerFailure(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, ClusterControllable controller, CheckpointingMode semantic)
Test connector source with task manager failover.

protected Source<T,?,?>
tryCreateSource(DataStreamSourceExternalContext<T> externalContext, TestingSourceSettings sourceOptions)
-
-
-
Method Detail
-
testSourceSingleSplit
@TestTemplate @DisplayName("Test source with single split") public void testSourceSingleSplit(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
Test connector source with only one split in the external system.
This test will create one split in the external system, write test data into it, and consume it back via a Flink job with parallelism 1.
The number and order of records consumed by Flink need to be identical to the test data written to the external system in order to pass this test.
A bounded source is required for this test.
- Throws:
Exception
-
testMultipleSplits
@TestTemplate @DisplayName("Test source with multiple splits") public void testMultipleSplits(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
Test connector source with multiple splits in the external system.
This test will create 4 splits in the external system, write test data to all splits, and consume it back via a Flink job with parallelism 4.
The number and order of records in each split consumed by Flink need to be identical to the test data written into the external system to pass this test. There's no requirement for record order across splits.
A bounded source is required for this test.
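The pass condition above (order preserved within each split, no ordering guarantee across splits) can be illustrated with a small, framework-independent sketch. The class and method names below are hypothetical and are not part of the Flink test framework. The sketch also assumes that records are distinct across splits, so a simple greedy match is sufficient.

```java
import java.util.List;

/** Illustrative helper only; not part of the Flink connector test framework. */
final class PerSplitOrderCheck {

    /**
     * Returns true if {@code result} is an interleaving of the per-split lists
     * in {@code testData} that keeps every split's internal order.
     * Assumption: records are distinct across splits, so greedy matching works.
     */
    static <T> boolean matchesPerSplitOrder(List<T> result, List<List<T>> testData) {
        int[] next = new int[testData.size()]; // index of the next expected record per split
        for (T record : result) {
            boolean matched = false;
            for (int split = 0; split < testData.size() && !matched; split++) {
                List<T> expected = testData.get(split);
                if (next[split] < expected.size() && expected.get(next[split]).equals(record)) {
                    next[split]++;
                    matched = true;
                }
            }
            if (!matched) {
                return false; // unexpected, duplicated, or out-of-order record
            }
        }
        // Every split must have been consumed completely.
        for (int split = 0; split < testData.size(); split++) {
            if (next[split] != testData.get(split).size()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<String>> testData = List.of(List.of("a1", "a2"), List.of("b1", "b2"));
        // Splits interleaved, order inside each split kept.
        System.out.println(matchesPerSplitOrder(List.of("b1", "a1", "a2", "b2"), testData)); // true
        // Order inside the first split broken.
        System.out.println(matchesPerSplitOrder(List.of("a2", "a1", "b1", "b2"), testData)); // false
    }
}
```

A check of this shape accepts any interleaving of the splits but rejects missing, extra, or reordered records within a split.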
- Throws:
Exception
-
testSavepoint
@TestTemplate @DisplayName("Test source restarting from a savepoint") public void testSavepoint(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
Test connector source restart from a savepoint.
This test will first create 4 splits in the external system, write test data to all splits, and consume it back via a Flink job. It then stops the job with a savepoint and restarts the job from that savepoint. Once the job is running again, it adds some extra data to the source and compares the result.
The number and order of records in each split consumed by Flink need to be identical to the test data written into the external system to pass this test. There's no requirement for record order across splits.
- Throws:
Exception
-
testScaleUp
@TestTemplate @DisplayName("Test source restarting with a higher parallelism") public void testScaleUp(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
Test connector source restart from a savepoint with a higher parallelism.
This test will first create 4 splits in the external system, write test data to all splits, and consume it back via a Flink job with parallelism 2. It then stops the job with a savepoint and restarts the job from that savepoint with a higher parallelism of 4. Once the job is running again, it adds some extra data to the source and compares the result.
The number and order of records in each split consumed by Flink need to be identical to the test data written into the external system to pass this test. There's no requirement for record order across splits.
- Throws:
Exception
-
testScaleDown
@TestTemplate @DisplayName("Test source restarting with a lower parallelism") public void testScaleDown(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
Test connector source restart from a savepoint with a lower parallelism.
This test will first create 4 splits in the external system, write test data to all splits, and consume it back via a Flink job with parallelism 4. It then stops the job with a savepoint and restarts the job from that savepoint with a lower parallelism of 2. Once the job is running again, it adds some extra data to the source and compares the result.
The number and order of records in each split consumed by Flink need to be identical to the test data written into the external system to pass this test. There's no requirement for record order across splits.
- Throws:
Exception
-
testSourceMetrics
@TestTemplate @DisplayName("Test source metrics") public void testSourceMetrics(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
Test connector source metrics.
This test will first create 4 splits in the external system, write test data to all splits, and consume it back via a Flink job with parallelism 4. It then reads the metrics and compares them with the expected values.
Metrics currently tested: numRecordsIn
- Throws:
Exception
-
testIdleReader
@TestTemplate @DisplayName("Test source with at least one idle parallelism") public void testIdleReader(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
Test connector source with an idle reader.
This test will create 4 splits in the external system, write test data to all splits, and consume it back via a Flink job with parallelism 5, so at least one parallel instance / source reader will be idle (assigned no splits). If the split enumerator of the source doesn't signal NoMoreSplitsEvent to the idle source reader, the Flink job will never transition to the FINISHED state.
The number and order of records in each split consumed by Flink need to be identical to the test data written into the external system to pass this test. There's no requirement for record order across splits.
A bounded source is required for this test.
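Why 4 splits and parallelism 5 always leave an idle reader can be seen from a short sketch. The round-robin strategy and all names below are assumptions for illustration only. Real split enumerators may assign splits differently, but with fewer splits than readers at least one reader receives nothing under any assignment.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; the assignment strategy is a hypothetical round-robin. */
final class IdleReaderSketch {

    /** Assigns split indices 0..numSplits-1 to readers in round-robin order. */
    static List<List<Integer>> assignRoundRobin(int numSplits, int parallelism) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int reader = 0; reader < parallelism; reader++) {
            assignment.add(new ArrayList<>());
        }
        for (int split = 0; split < numSplits; split++) {
            assignment.get(split % parallelism).add(split);
        }
        return assignment;
    }

    /** Counts readers that received no split at all. */
    static long countIdleReaders(List<List<Integer>> assignment) {
        return assignment.stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        List<List<Integer>> assignment = assignRoundRobin(4, 5);
        System.out.println(assignment);                   // [[0], [1], [2], [3], []]
        System.out.println(countIdleReaders(assignment)); // 1
    }
}
```

The idle reader is the one that depends on the enumerator's no-more-splits signal in order to finish.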
- Throws:
Exception
-
testTaskManagerFailure
@TestTemplate @DisplayName("Test TaskManager failure") public void testTaskManagerFailure(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, ClusterControllable controller, CheckpointingMode semantic) throws Exception
Test connector source with task manager failover.
This test will create 1 split in the external system, write test record set A into the split, restart the task manager to trigger job failover, write test record set B into the split, and finally terminate the Flink job.
The number and order of records consumed by Flink should be identical to A before the failover and B after the failover in order to pass the test.
An unbounded source is required for this test, since TaskManager failover will be triggered in the middle of the test.
- Throws:
Exception
-
generateAndWriteTestData
protected List<T> generateAndWriteTestData(int splitIndex, DataStreamSourceExternalContext<T> externalContext, TestingSourceSettings testingSourceSettings)
Generate a set of test records and write it to the given split writer.
- Parameters:
externalContext - External context
- Returns:
List of generated test records
-
tryCreateSource
protected Source<T,?,?> tryCreateSource(DataStreamSourceExternalContext<T> externalContext, TestingSourceSettings sourceOptions)
-
submitJob
protected JobClient submitJob(StreamExecutionEnvironment env, String jobName) throws Exception
- Throws:
Exception
-
addCollectSink
protected SourceTestSuiteBase.CollectIteratorBuilder<T> addCollectSink(DataStream<T> stream)
Add a collect sink in the job.
-
generateTestDataForWriter
protected List<T> generateTestDataForWriter(DataStreamSourceExternalContext<T> externalContext, TestingSourceSettings sourceSettings, int splitIndex, ExternalSystemSplitDataWriter<T> writer)
Generate a set of split writers.
- Parameters:
externalContext - External context
splitIndex - the split index
writer - the writer to send data
- Returns:
List of generated test records
-
getTestDataSize
protected int getTestDataSize(List<List<T>> collections)
Get the size of test data.
- Parameters:
collections - test data
- Returns:
the size of test data
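The description does not say how the size is computed. One plausible reading, given the List<List<T>> parameter, is the total number of records across all per-split lists. The sketch below assumes that reading. The class and method names are hypothetical, and the code is not taken from the framework.

```java
import java.util.List;

/** Illustrative sketch only; assumes "size" means the total record count over all splits. */
final class TestDataSizeSketch {

    /** Sums the sizes of the per-split record lists. */
    static <T> int totalSize(List<List<T>> collections) {
        int sum = 0;
        for (List<T> collection : collections) {
            sum += collection.size();
        }
        return sum;
    }

    public static void main(String[] args) {
        List<List<Integer>> testData = List.of(List.of(1, 2, 3), List.of(4), List.of());
        System.out.println(totalSize(testData)); // 4
    }
}
```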
-
checkResultWithSemantic
protected void checkResultWithSemantic(CloseableIterator<T> resultIterator, List<List<T>> testData, CheckpointingMode semantic, Integer limit)
Compare the test data with the result. If the source is bounded, limit should be null.
- Parameters:
resultIterator - the data read from the job
testData - the test data
semantic - the supported semantic, see CheckpointingMode
limit - expected number of the data to read from the job
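The role of limit can be sketched independently of the framework. An unbounded source never exhausts its iterator, so a reader has to stop after the expected number of records. For a bounded source a null limit means "read until the end". The helper below is a hypothetical illustration of that convention. It uses a plain java.util.Iterator rather than Flink's CloseableIterator and does not reproduce the framework's actual comparison logic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

/** Illustrative sketch only; not the framework's implementation. */
final class LimitedCollectSketch {

    /**
     * Collects records from {@code results}. A null limit reads until the iterator
     * is exhausted (bounded source); otherwise at most {@code limit} records are read.
     */
    static <T> List<T> collect(Iterator<T> results, Integer limit) {
        List<T> collected = new ArrayList<>();
        // Check the limit first so an unbounded iterator is never polled once the limit is reached.
        while ((limit == null || collected.size() < limit) && results.hasNext()) {
            collected.add(results.next());
        }
        return collected;
    }

    public static void main(String[] args) {
        // Unbounded stand-in: an infinite stream of integers, read with a limit.
        Iterator<Integer> unbounded = Stream.iterate(0, i -> i + 1).iterator();
        System.out.println(collect(unbounded, 3)); // [0, 1, 2]

        // Bounded stand-in: a finite list, read with a null limit.
        System.out.println(collect(List.of("x", "y").iterator(), null)); // [x, y]
    }
}
```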
-
-