Class SinkTestSuiteBase<T extends Comparable<T>>
- java.lang.Object
-
- org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase<T>
-
@ExtendWith({ConnectorTestingExtension.class,TestLoggerExtension.class,TestCaseInvocationContextProvider.class}) @TestInstance(PER_CLASS) @Experimental public abstract class SinkTestSuiteBase<T extends Comparable<T>> extends Object
Base class for sink test suite.All cases should have well-descriptive JavaDoc, including:
- What's the purpose of this case
- Simple description of how this case works
- Condition to fulfill in order to pass this case
- Requirement of running this case
-
-
Constructor Summary
Constructors Constructor Description SinkTestSuiteBase()
-
Method Summary
All Methods Instance Methods Concrete Methods Deprecated Methods Modifier and Type Method Description protected CollectResultIterator<T>
addCollectSink(DataStream<T> stream)
protected void
checkResultWithSemantic(ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic)
Compare the test data with actual data in given semantic.protected void
checkResultWithSemantic(ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic)
Deprecated.protected List<T>
generateTestData(TestingSinkSettings testingSinkSettings, DataStreamSinkExternalContext<T> externalContext)
Generate a set of test records.void
testBasicSink(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic)
Test DataStream connector sink.void
testMetrics(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic)
Test connector sink metrics.void
testScaleDown(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic)
Test connector sink restart from a completed savepoint with a lower parallelism.void
testScaleUp(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic)
Test connector sink restart from a completed savepoint with a higher parallelism.void
testStartFromSavepoint(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic)
Test connector sink restart from a completed savepoint with the same parallelism.
-
-
-
Method Detail
-
testBasicSink
@TestTemplate @DisplayName("Test data stream sink") public void testBasicSink(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
Test DataStream connector sink.The following tests will create a sink in the external system, generate a collection of test data and write them to this sink by the Flink Job.
In order to pass these tests, the number of records produced by Flink need to be equals to the generated test data. And the records in the sink will be compared to the test data by the different semantics. There's no requirement for records order.
- Throws:
Exception
-
testStartFromSavepoint
@TestTemplate @DisplayName("Test sink restarting from a savepoint") public void testStartFromSavepoint(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
Test connector sink restart from a completed savepoint with the same parallelism.This test will create a sink in the external system, generate a collection of test data and write a half part of them to this sink by the Flink Job with parallelism 2 at first. Then stop the job, restart the same job from the completed savepoint. After the job has been running, write the other part to the sink and compare the result.
In order to pass this test, the number of records produced by Flink need to be equals to the generated test data. And the records in the sink will be compared to the test data by the different semantic. There's no requirement for record order.
- Throws:
Exception
-
testScaleUp
@TestTemplate @DisplayName("Test sink restarting with a higher parallelism") public void testScaleUp(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
Test connector sink restart from a completed savepoint with a higher parallelism.This test will create a sink in the external system, generate a collection of test data and write a half part of them to this sink by the Flink Job with parallelism 2 at first. Then stop the job, restart the same job from the completed savepoint with a higher parallelism 4. After the job has been running, write the other part to the sink and compare the result.
In order to pass this test, the number of records produced by Flink need to be equals to the generated test data. And the records in the sink will be compared to the test data by the different semantic. There's no requirement for record order.
- Throws:
Exception
-
testScaleDown
@TestTemplate @DisplayName("Test sink restarting with a lower parallelism") public void testScaleDown(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
Test connector sink restart from a completed savepoint with a lower parallelism.This test will create a sink in the external system, generate a collection of test data and write a half part of them to this sink by the Flink Job with parallelism 4 at first. Then stop the job, restart the same job from the completed savepoint with a lower parallelism 2. After the job has been running, write the other part to the sink and compare the result.
In order to pass this test, the number of records produced by Flink need to be equals to the generated test data. And the records in the sink will be compared to the test data by the different semantic. There's no requirement for record order.
- Throws:
Exception
-
testMetrics
@TestTemplate @DisplayName("Test sink metrics") public void testMetrics(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
Test connector sink metrics.This test will create a sink in the external system, generate test data and write them to the sink via a Flink job. Then read and compare the metrics.
Now test: numRecordsOut
- Throws:
Exception
-
generateTestData
protected List<T> generateTestData(TestingSinkSettings testingSinkSettings, DataStreamSinkExternalContext<T> externalContext)
Generate a set of test records.- Parameters:
testingSinkSettings
- sink settingsexternalContext
- External context- Returns:
- Collection of generated test records
-
checkResultWithSemantic
protected void checkResultWithSemantic(ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic) throws Exception
Compare the test data with actual data in given semantic.- Parameters:
reader
- the data reader for the sinktestData
- the test datasemantic
- the supported semantic, seeCheckpointingMode
- Throws:
Exception
-
checkResultWithSemantic
@Deprecated protected void checkResultWithSemantic(ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic) throws Exception
Deprecated.This method is required for downstream projects e.g. Flink connectors extending this test for the case when there should be supported Flink versions below 1.20. Could be removed together with dropping support for Flink 1.19.- Throws:
Exception
-
addCollectSink
protected CollectResultIterator<T> addCollectSink(DataStream<T> stream)
-
-