@ExtendWith(value={ConnectorTestingExtension.class,TestLoggerExtension.class,TestCaseInvocationContextProvider.class}) @TestInstance(value=PER_CLASS) @Experimental public abstract class SourceTestSuiteBase<T> extends Object
Base class for source test suites.
All cases should have well-descriptive JavaDoc, including:
- the purpose of the case;
- a simple description of how the case works;
- the condition to fulfill in order to pass the case;
- the requirements for running the case.
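As a rough illustration, a connector test module typically subclasses this suite and lets the JUnit 5 extensions inject the environment and context per test template. The sketch below is hypothetical: MyContainerTestEnvironment and MySourceContextFactory are placeholder names for connector-specific classes, and it assumes the test framework's field annotations (@TestEnv, @TestSemantics, @TestContext, and optionally @TestExternalSystem) are on the classpath.

```java
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
import org.apache.flink.streaming.api.CheckpointingMode;

// Hypothetical concrete suite; all inherited @TestTemplate cases run against
// the environment/context pairs declared in the annotated fields.
public class MySourceE2ETest extends SourceTestSuiteBase<String> {

    // Flink cluster the test jobs run on (placeholder implementation).
    @TestEnv
    MyContainerTestEnvironment flink = new MyContainerTestEnvironment();

    // Delivery guarantees each case is parameterized with.
    @TestSemantics
    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};

    // Factory producing DataStreamSourceExternalContext instances
    // (placeholder implementation).
    @TestContext
    MySourceContextFactory context = new MySourceContextFactory();
}
```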
Modifier and Type | Class and Description |
---|---|
protected static class | SourceTestSuiteBase.CollectIteratorBuilder<T>: Builder class for constructing a CollectResultIterator of the collect sink. |
Constructor and Description |
---|
SourceTestSuiteBase() |
Modifier and Type | Method and Description |
---|---|
protected SourceTestSuiteBase.CollectIteratorBuilder<T> | addCollectSink(DataStream<T> stream): Add a collect sink to the job. |
protected void | checkResultWithSemantic(CloseableIterator<T> resultIterator, List<List<T>> testData, CheckpointingMode semantic, Integer limit): Compare the test data with the result. |
protected List<T> | generateAndWriteTestData(int splitIndex, DataStreamSourceExternalContext<T> externalContext, TestingSourceSettings testingSourceSettings): Generate a set of test records and write them to the given split writer. |
protected List<T> | generateTestDataForWriter(DataStreamSourceExternalContext<T> externalContext, TestingSourceSettings sourceSettings, int splitIndex, ExternalSystemSplitDataWriter<T> writer): Generate a set of test records and write them via the given split writer. |
protected int | getTestDataSize(List<List<T>> collections): Get the size of the test data. |
protected JobClient | submitJob(StreamExecutionEnvironment env, String jobName) |
void | testIdleReader(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic): Test connector source with an idle reader. |
void | testMultipleSplits(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic): Test connector source with multiple splits in the external system. |
void | testSavepoint(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic): Test connector source restart from a savepoint. |
void | testScaleDown(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic): Test connector source restart from a savepoint with a lower parallelism. |
void | testScaleUp(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic): Test connector source restart from a savepoint with a higher parallelism. |
void | testSourceMetrics(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic): Test connector source metrics. |
void | testSourceSingleSplit(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic): Test connector source with only one split in the external system. |
void | testTaskManagerFailure(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, ClusterControllable controller, CheckpointingMode semantic): Test connector source with TaskManager failover. |
protected Source<T,?,?> | tryCreateSource(DataStreamSourceExternalContext<T> externalContext, TestingSourceSettings sourceOptions) |
@TestTemplate @DisplayName(value="Test source with single split") public void testSourceSingleSplit(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
This test will create one split in the external system, write test data into it, and consume back via a Flink job with 1 parallelism.
The number and order of records consumed by Flink need to be identical to the test data written to the external system in order to pass this test.
A bounded source is required for this test.
Exception
@TestTemplate @DisplayName(value="Test source with multiple splits") public void testMultipleSplits(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
This test will create 4 splits in the external system, write test data to all splits, and consume back via a Flink job with 4 parallelism.
The number and order of records in each split consumed by Flink need to be identical to the test data written into the external system to pass this test. There's no requirement for record order across splits.
A bounded source is required for this test.
Exception
@TestTemplate @DisplayName(value="Test source restarting from a savepoint") public void testSavepoint(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
This test will create 4 splits in the external system first, write test data to all splits, and consume back via a Flink job. Then stop the job with savepoint, restart the job from the checkpoint. After the job has been running, add some extra data to the source and compare the result.
The number and order of records in each split consumed by Flink need to be identical to the test data written into the external system to pass this test. There's no requirement for record order across splits.
Exception
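For orientation, the stop-and-restore steps performed by this case (and by the rescaling variants below) look roughly like the following sketch. It is illustrative only, since the suite drives this internally: the savepoint directory and timeout are placeholders, and it assumes JobClient#stopWithSavepoint and the execution.savepoint.path option as available in recent Flink releases.

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final class SavepointRestoreSketch {

    /** Stop the running job with a savepoint and prepare an environment restoring from it. */
    static StreamExecutionEnvironment stopAndPrepareRestore(JobClient jobClient) throws Exception {
        // advanceToEndOfEventTime=false: stop without draining the pipeline.
        String savepointPath =
                jobClient
                        .stopWithSavepoint(false, "/tmp/savepoints", SavepointFormatType.CANONICAL)
                        .get(60, TimeUnit.SECONDS);

        // A freshly built environment picks the savepoint up via configuration.
        Configuration conf = new Configuration();
        conf.setString("execution.savepoint.path", savepointPath);
        return StreamExecutionEnvironment.getExecutionEnvironment(conf);
    }
}
```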
@TestTemplate @DisplayName(value="Test source restarting with a higher parallelism") public void testScaleUp(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
This test will create 4 splits in the external system first, write test data to all splits and consume back via a Flink job with parallelism 2. Then stop the job with savepoint, restart the job from the checkpoint with a higher parallelism 4. After the job has been running, add some extra data to the source and compare the result.
The number and order of records in each split consumed by Flink need to be identical to the test data written into the external system to pass this test. There's no requirement for record order across splits.
Exception
@TestTemplate @DisplayName(value="Test source restarting with a lower parallelism") public void testScaleDown(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
This test will create 4 splits in the external system first, write test data to all splits and consume back via a Flink job with parallelism 4. Then stop the job with savepoint, restart the job from the checkpoint with a lower parallelism 2. After the job has been running, add some extra data to the source and compare the result.
The number and order of records in each split consumed by Flink need to be identical to the test data written into the external system to pass this test. There's no requirement for record order across splits.
Exception
@TestTemplate @DisplayName(value="Test source metrics") public void testSourceMetrics(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
This test will create 4 splits in the external system first, write test data to all splits and consume back via a Flink job with parallelism 4. Then read and compare the metrics.
Now test: numRecordsIn
Exception
@TestTemplate @DisplayName(value="Test source with at least one idle parallelism") public void testIdleReader(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
This test will create 4 split in the external system, write test data to all splits, and consume back via a Flink job with 5 parallelism, so at least one parallelism / source reader will be idle (assigned with no splits). If the split enumerator of the source doesn't signal NoMoreSplitsEvent to the idle source reader, the Flink job will never spin to FINISHED state.
The number and order of records in each split consumed by Flink need to be identical to the test data written into the external system to pass this test. There's no requirement for record order across splits.
A bounded source is required for this test.
Exception
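To make the NoMoreSplitsEvent requirement concrete, here is a minimal sketch of the relevant split-assignment logic in a bounded enumerator. MySplit and the queue-based assignment are placeholder choices for this sketch; SplitEnumeratorContext#assignSplit and #signalNoMoreSplits are the actual Flink source APIs.

```java
import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

// Sketch of the split-request handling inside a bounded SplitEnumerator.
final class BoundedAssignmentSketch {

    // Placeholder split type for the sketch.
    static final class MySplit implements SourceSplit {
        @Override
        public String splitId() {
            return "split-0";
        }
    }

    private final SplitEnumeratorContext<MySplit> context; // provided by Flink
    private final Queue<MySplit> remainingSplits = new ArrayDeque<>();

    BoundedAssignmentSketch(SplitEnumeratorContext<MySplit> context) {
        this.context = context;
    }

    void handleSplitRequest(int subtaskId) {
        MySplit split = remainingSplits.poll();
        if (split != null) {
            context.assignSplit(split, subtaskId);
        } else {
            // Without this signal, an idle reader waits forever and a bounded
            // job never transitions to FINISHED.
            context.signalNoMoreSplits(subtaskId);
        }
    }
}
```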
@TestTemplate @DisplayName(value="Test TaskManager failure") public void testTaskManagerFailure(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, ClusterControllable controller, CheckpointingMode semantic) throws Exception
This test will create 1 split in the external system, write test record set A into the split, restart task manager to trigger job failover, write test record set B into the split, and terminate the Flink job finally.
The number and order of records consumed by Flink should be identical to A before the failover and B after the failover in order to pass the test.
An unbounded source is required for this test, since TaskManager failover will be triggered in the middle of the test.
Exception
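The failover itself is driven through the injected ClusterControllable. As a hedged sketch, assuming the interface exposes triggerTaskManagerFailover(JobClient, Runnable) as the framework's model of cluster faults, a custom case could inject the restart and produce record set B from the callback; the Runnable passed in is a placeholder for the connector-specific writer call.

```java
import org.apache.flink.connector.testframe.environment.ClusterControllable;
import org.apache.flink.core.execution.JobClient;

final class FailoverSketch {

    /** Restart a TaskManager mid-job, then run the follow-up write action. */
    static void failoverThenWriteMore(
            ClusterControllable controller, JobClient jobClient, Runnable writeRecordSetB)
            throws Exception {
        // The Runnable executes after the failure is injected, so record set B
        // is only observed by the recovered reader.
        controller.triggerTaskManagerFailover(jobClient, writeRecordSetB);
    }
}
```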
protected List<T> generateAndWriteTestData(int splitIndex, DataStreamSourceExternalContext<T> externalContext, TestingSourceSettings testingSourceSettings)
Generate a set of test records and write them to the given split writer.
Parameters:
externalContext - the external context

protected Source<T,?,?> tryCreateSource(DataStreamSourceExternalContext<T> externalContext, TestingSourceSettings sourceOptions)
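tryCreateSource delegates to the external context's createSource. As an illustration of that contract, the context of a bounded-only connector might reject unbounded settings as sketched below; MySourceBuilder is a placeholder, and the assumption (flagged in the comments) is that throwing UnsupportedOperationException marks the requested configuration as unsupported rather than failing the test.

```java
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;

// Hypothetical createSource inside a DataStreamSourceExternalContext
// implementation for a bounded-only connector.
public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings)
        throws UnsupportedOperationException {
    if (sourceSettings.getBoundedness() == Boundedness.CONTINUOUS_UNBOUNDED) {
        // Assumption: the suite treats this as "configuration not supported"
        // and skips the cases that need an unbounded source.
        throw new UnsupportedOperationException("Only bounded mode is supported");
    }
    return new MySourceBuilder().build(); // placeholder connector-specific builder
}
```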
protected JobClient submitJob(StreamExecutionEnvironment env, String jobName) throws Exception
Throws: Exception
protected SourceTestSuiteBase.CollectIteratorBuilder<T> addCollectSink(DataStream<T> stream)
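Together with submitJob(StreamExecutionEnvironment, String) above, this helper forms the skeleton of a hand-written case: build a stream from the source under test, attach the collect sink, submit the job, and iterate the results. A rough sketch follows, assuming the returned CollectIteratorBuilder exposes a build(JobClient) method that wires the iterator to the running job; the source, job name, and assertions are placeholders.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;

// Inside a subclass of SourceTestSuiteBase<String>:
void runCustomCase(StreamExecutionEnvironment env, Source<String, ?, ?> source)
        throws Exception {
    DataStream<String> stream =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Source under test");
    CollectIteratorBuilder<String> builder = addCollectSink(stream);
    JobClient jobClient = submitJob(env, "custom-case");
    // build(JobClient) is assumed here, per the builder's documented purpose.
    try (CloseableIterator<String> result = builder.build(jobClient)) {
        result.forEachRemaining(record -> {
            // assert on each consumed record here
        });
    }
}
```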
protected List<T> generateTestDataForWriter(DataStreamSourceExternalContext<T> externalContext, TestingSourceSettings sourceSettings, int splitIndex, ExternalSystemSplitDataWriter<T> writer)
Generate a set of test records and write them via the given split writer.
Parameters:
externalContext - the external context
splitIndex - the split index
writer - the writer to send data with

protected int getTestDataSize(List<List<T>> collections)
Get the size of the test data.
Parameters:
collections - the test data

protected void checkResultWithSemantic(CloseableIterator<T> resultIterator, List<List<T>> testData, CheckpointingMode semantic, Integer limit)
Compare the test data with the result. If the source is bounded, limit should be null.
Parameters:
resultIterator - the data read from the job
testData - the test data
semantic - the supported semantic, see CheckpointingMode
limit - the expected number of records to read from the job
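When a connector delivers records in an order the default comparison does not expect, a subclass can override this hook. Below is a minimal sketch, assuming an order-insensitive comparison is acceptable for the connector under test and that the suite's type parameter is String; the failure handling is deliberately simple.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.util.CloseableIterator;

// Hypothetical override inside a subclass of SourceTestSuiteBase<String>:
// collect what the job produced, then compare sorted snapshots so ordering
// within a split no longer matters.
@Override
protected void checkResultWithSemantic(
        CloseableIterator<String> resultIterator,
        List<List<String>> testData,
        CheckpointingMode semantic,
        Integer limit) {
    List<String> expected = new ArrayList<>();
    testData.forEach(expected::addAll);

    // For a bounded source, limit is null and we read up to the expected size.
    int toRead = limit == null ? expected.size() : limit;
    List<String> actual = new ArrayList<>();
    while (actual.size() < toRead && resultIterator.hasNext()) {
        actual.add(resultIterator.next());
    }

    Collections.sort(expected);
    Collections.sort(actual);
    if (!expected.equals(actual)) {
        throw new AssertionError("Result does not match the test data");
    }
}
```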