@ExtendWith(value={ConnectorTestingExtension.class,TestLoggerExtension.class,TestCaseInvocationContextProvider.class}) @TestInstance(value=PER_CLASS) @Experimental public abstract class SinkTestSuiteBase<T extends Comparable<T>> extends Object
All cases should have well-descriptive JavaDoc, including:
Constructor and Description |
---|
SinkTestSuiteBase() |
Modifier and Type | Method and Description |
---|---|
protected CollectResultIterator<T> | addCollectSink(DataStream<T> stream) |
protected void | checkResultWithSemantic(ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic): Compare the test data with actual data in given semantic. |
protected List<T> | generateTestData(TestingSinkSettings testingSinkSettings, DataStreamSinkExternalContext<T> externalContext): Generate a set of test records. |
void | testBasicSink(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic): Test DataStream connector sink. |
void | testMetrics(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic): Test connector sink metrics. |
void | testScaleDown(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic): Test connector sink restart from a completed savepoint with a lower parallelism. |
void | testScaleUp(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic): Test connector sink restart from a completed savepoint with a higher parallelism. |
void | testStartFromSavepoint(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic): Test connector sink restart from a completed savepoint with the same parallelism. |
@TestTemplate @DisplayName(value="Test data stream sink") public void testBasicSink(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
This test creates a sink in the external system, generates a collection of test data, and writes it to the sink with a Flink job.
To pass this test, the number of records produced by Flink must equal the number of generated test records, and the records in the sink are compared with the test data according to the given semantic. Record order is not checked.
Throws: Exception
@TestTemplate @DisplayName(value="Test sink restarting from a savepoint") public void testStartFromSavepoint(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
This test creates a sink in the external system, generates a collection of test data, and first writes half of it to the sink with a Flink job running at parallelism 2. It then stops the job and restarts the same job from the completed savepoint. Once the job is running again, it writes the other half to the sink and compares the result.
To pass this test, the number of records produced by Flink must equal the number of generated test records, and the records in the sink are compared with the test data according to the given semantic. Record order is not checked.
Throws: Exception
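The two-phase write described above can be pictured with plain Java collections. This is only a sketch: `TwoPhaseWriteSketch` and `splitInTwo` are hypothetical names, the in-memory list stands in for the external system, and how the real suite divides an odd number of records is an assumption here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration; not part of the Flink connector testing framework. */
final class TwoPhaseWriteSketch {

    /** Splits the test data into two parts; with an odd size the second part is one record longer. */
    static <T> List<List<T>> splitInTwo(List<T> testData) {
        int middle = testData.size() / 2;
        List<T> beforeSavepoint = new ArrayList<>(testData.subList(0, middle));
        List<T> afterRestart = new ArrayList<>(testData.subList(middle, testData.size()));
        return Arrays.asList(beforeSavepoint, afterRestart);
    }

    public static void main(String[] args) {
        List<Integer> testData = Arrays.asList(1, 2, 3, 4, 5);
        List<List<Integer>> parts = splitInTwo(testData);

        List<Integer> sink = new ArrayList<>(); // stand-in for the external system
        sink.addAll(parts.get(0));              // phase 1: job runs, then stops with a savepoint
        sink.addAll(parts.get(1));              // phase 2: same job restarted from the savepoint

        System.out.println(parts);                 // prints [[1, 2], [3, 4, 5]]
        System.out.println(sink.equals(testData)); // prints true
    }
}
```

The point of the sketch is that the savepoint boundary must be invisible in the final result: after both phases, the sink holds exactly the generated records.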
@TestTemplate @DisplayName(value="Test sink restarting with a higher parallelism") public void testScaleUp(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
This test creates a sink in the external system, generates a collection of test data, and first writes half of it to the sink with a Flink job running at parallelism 2. It then stops the job and restarts the same job from the completed savepoint with a higher parallelism of 4. Once the job is running again, it writes the other half to the sink and compares the result.
To pass this test, the number of records produced by Flink must equal the number of generated test records, and the records in the sink are compared with the test data according to the given semantic. Record order is not checked.
Throws: Exception
@TestTemplate @DisplayName(value="Test sink restarting with a lower parallelism") public void testScaleDown(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
This test creates a sink in the external system, generates a collection of test data, and first writes half of it to the sink with a Flink job running at parallelism 4. It then stops the job and restarts the same job from the completed savepoint with a lower parallelism of 2. Once the job is running again, it writes the other half to the sink and compares the result.
To pass this test, the number of records produced by Flink must equal the number of generated test records, and the records in the sink are compared with the test data according to the given semantic. Record order is not checked.
Throws: Exception
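The rescaling tests expect the sink contents to be independent of the parallelism used before and after the restart. The toy model below illustrates that expectation only. It assumes records are handed to writer subtasks round-robin, which is an illustration and not a statement about how the suite wires its job; `RescaleSketch`, `distribute`, and `writeInTwoPhases` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical toy model of writing with one parallelism, then another. */
final class RescaleSketch {

    /** Hands records round-robin to `parallelism` simulated subtasks and returns what each one writes. */
    static <T> List<List<T>> distribute(List<T> records, int parallelism) {
        List<List<T>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            subtasks.get(i % parallelism).add(records.get(i));
        }
        return subtasks;
    }

    /** Writes the first half with one parallelism and the second half with another. */
    static <T> List<T> writeInTwoPhases(
            List<T> firstHalf, int parallelismBefore, List<T> secondHalf, int parallelismAfter) {
        List<T> sink = new ArrayList<>(); // stand-in for the external system
        for (List<T> subtask : distribute(firstHalf, parallelismBefore)) {
            sink.addAll(subtask);
        }
        for (List<T> subtask : distribute(secondHalf, parallelismAfter)) {
            sink.addAll(subtask);
        }
        return sink;
    }

    public static void main(String[] args) {
        List<Integer> testData = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        // Scale up: parallelism 2 before the savepoint, 4 after the restart.
        List<Integer> sink = writeInTwoPhases(testData.subList(0, 4), 2, testData.subList(4, 8), 4);
        System.out.println(sink); // arrival order differs from the input order

        Collections.sort(sink);
        System.out.println(sink.equals(testData)); // same records once order is ignored
    }
}
```

Swapping the two parallelism arguments models the scale-down case. In both directions the arrival order may change, which is why the tests do not check record order.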
@TestTemplate @DisplayName(value="Test sink metrics") public void testMetrics(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception
This test creates a sink in the external system, generates test data, and writes it to the sink via a Flink job. It then reads the sink metrics and checks them.
Currently tested metric: numRecordsOut
Throws: Exception
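As a rough picture of what a numRecordsOut check involves, the sketch below sums a per-subtask counter and compares the total with the number of records written. The aggregation across subtasks, the exact-equality comparison, and the names `NumRecordsOutSketch`, `totalRecordsOut`, and `metricMatches` are all assumptions made for illustration; the suite's actual metric query is not shown on this page.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of checking an aggregated record counter. */
final class NumRecordsOutSketch {

    /** Sums the counter value reported by each simulated sink subtask. */
    static long totalRecordsOut(List<Long> perSubtaskCounts) {
        long total = 0;
        for (long count : perSubtaskCounts) {
            total += count;
        }
        return total;
    }

    /** Returns true when the aggregated counter equals the number of records written. */
    static boolean metricMatches(List<Long> perSubtaskCounts, int expectedRecords) {
        return totalRecordsOut(perSubtaskCounts) == expectedRecords;
    }

    public static void main(String[] args) {
        List<Long> reported = Arrays.asList(3L, 2L); // two subtasks, five records in total
        System.out.println(metricMatches(reported, 5)); // prints true
        System.out.println(metricMatches(reported, 6)); // prints false
    }
}
```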
protected List<T> generateTestData(TestingSinkSettings testingSinkSettings, DataStreamSinkExternalContext<T> externalContext)
Generate a set of test records.
Parameters:
testingSinkSettings - sink settings
externalContext - External context
protected void checkResultWithSemantic(ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic) throws Exception
Compare the test data with actual data in given semantic.
Parameters:
reader - the data reader for the sink
testData - the test data
semantic - the supported semantic, see CheckpointingMode
Throws: Exception
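To make the idea of comparing "in a given semantic" concrete, here is a minimal order-insensitive check using only the JDK. It is a sketch under stated assumptions, not the suite's implementation: the `DeliverySemantic` enum is a hypothetical stand-in for CheckpointingMode, and the at-least-once rule used here (duplicates allowed, nothing missing, nothing unknown) is one plausible reading.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration; names and rules are assumptions, not Flink API. */
final class SemanticCheckSketch {

    /** Stand-in for Flink's CheckpointingMode. */
    enum DeliverySemantic { EXACTLY_ONCE, AT_LEAST_ONCE }

    /** Counts how often each record occurs, so that order no longer matters. */
    static <T> Map<T, Integer> countOccurrences(List<T> records) {
        Map<T, Integer> counts = new HashMap<>();
        for (T record : records) {
            counts.merge(record, 1, Integer::sum);
        }
        return counts;
    }

    /** Compares sink contents with the test data under the given semantic, ignoring order. */
    static <T> boolean matches(List<T> testData, List<T> sinkData, DeliverySemantic semantic) {
        Map<T, Integer> expected = countOccurrences(testData);
        Map<T, Integer> actual = countOccurrences(sinkData);
        if (semantic == DeliverySemantic.EXACTLY_ONCE) {
            // Every record must appear exactly as often as it was generated.
            return expected.equals(actual);
        }
        // AT_LEAST_ONCE (assumed rule): same set of records, each at least as often as generated.
        if (!expected.keySet().equals(actual.keySet())) {
            return false;
        }
        for (Map.Entry<T, Integer> entry : expected.entrySet()) {
            if (actual.get(entry.getKey()) < entry.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> testData = Arrays.asList("a", "b", "c");
        List<String> withDuplicate = Arrays.asList("c", "a", "b", "c");
        System.out.println(matches(testData, withDuplicate, DeliverySemantic.EXACTLY_ONCE));  // prints false
        System.out.println(matches(testData, withDuplicate, DeliverySemantic.AT_LEAST_ONCE)); // prints true
    }
}
```

Counting occurrences instead of sorting keeps the check independent of record order and makes the difference between the two semantics a matter of comparing counts.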
protected CollectResultIterator<T> addCollectSink(DataStream<T> stream)
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.