@Experimental public class DataGeneratorSource<OUT> extends Object implements Source<OUT,NumberSequenceSource.NumberSequenceSplit,Collection<NumberSequenceSource.NumberSequenceSplit>>, ResultTypeQueryable<OUT>
The source splits the sequence into as many parallel sub-sequences as there are parallel source readers.
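A minimal plain-Java sketch of this splitting step follows. It assumes the index range [0, count) is cut into contiguous ranges of near-equal size, one per reader. The SplitSketch class, the splitRange helper and the LongRange record are hypothetical illustrations and are not Flink's API. Flink's own NumberSequenceSource may divide the range differently.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Hypothetical half-open range [from, to) of sequence indices (not a Flink type).
    record LongRange(long from, long to) {}

    // Hypothetical helper: cuts [0, count) into `parallelism` contiguous ranges
    // whose sizes differ by at most one.
    static List<LongRange> splitRange(long count, int parallelism) {
        List<LongRange> ranges = new ArrayList<>();
        long base = count / parallelism;
        long remainder = count % parallelism;
        long start = 0;
        for (int i = 0; i < parallelism; i++) {
            // The first `remainder` ranges receive one extra element.
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new LongRange(start, start + size));
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 1000 indices over 3 readers: [0,334), [334,667), [667,1000)
        for (LongRange r : splitRange(1000, 3)) {
            System.out.println(r);
        }
    }
}
```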
Users can supply a GeneratorFunction for mapping the (sub-)sequences of Long values into the generated events. For instance, the following code produces the elements ["Number: 0", "Number: 1", ..., "Number: 999"].
GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
DataGeneratorSource<String> source =
    new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream =
    env.fromSource(source,
        WatermarkStrategy.noWatermarks(),
        "Generator Source");
The order of elements depends on the parallelism. Each sub-sequence is produced in order, but the sub-sequences of different readers may be interleaved. Consequently, with a parallelism of one, the source produces a single ordered sequence from "Number: 0" to "Number: 999".
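The following plain-Java sketch makes the ordering guarantee concrete. It models each reader as emitting its own sub-sequence in order and applies a mapping function to every index. java.util.function.LongFunction stands in for Flink's GeneratorFunction. The contiguous split and the round-robin interleaving are arbitrary assumptions; in a real job, the interleaving across readers is not deterministic. The OrderSketch class and its generate method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

public class OrderSketch {
    // Hypothetical model: `parallelism` readers, each owning a contiguous index range,
    // emit mapped elements; readers are interleaved round-robin (an assumption).
    static List<String> generate(long count, int parallelism, LongFunction<String> fn) {
        long[] next = new long[parallelism];
        long[] end = new long[parallelism];
        long start = 0;
        for (int i = 0; i < parallelism; i++) {
            long size = count / parallelism + (i < count % parallelism ? 1 : 0);
            next[i] = start;
            end[i] = start + size;
            start += size;
        }
        List<String> out = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (int i = 0; i < parallelism; i++) {
                if (next[i] < end[i]) {
                    // Each reader advances through its own range strictly in order.
                    out.add(fn.apply(next[i]++));
                    progressed = true;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        LongFunction<String> fn = index -> "Number: " + index;
        // [Number: 0, Number: 3, Number: 1, Number: 4, Number: 2, Number: 5]
        System.out.println(generate(6, 2, fn));
        // [Number: 0, Number: 1, Number: 2, Number: 3, Number: 4, Number: 5]
        System.out.println(generate(6, 1, fn));
    }
}
```

With two readers, "Number: 0", "Number: 1", "Number: 2" stay in order relative to each other, as do "Number: 3" through "Number: 5". Only with a single reader is the whole output globally ordered.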
Note that this approach also makes it possible to produce deterministic watermarks at the source based on the generated events and a custom WatermarkStrategy.
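Every event is a pure function of its index. As a result, timestamps derived from the index are reproducible across runs, and so are watermarks derived from those timestamps. The plain-Java sketch below illustrates this under two assumptions. First, it uses a hypothetical mapping from index i to timestamp BASE_MILLIS + i * 1000 ms. Second, it applies a simple bounded-delay rule: the watermark equals the maximum timestamp seen minus a fixed delay. In Flink, a TimestampAssigner and a WatermarkStrategy would play these roles; neither is modeled here.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSketch {
    static final long BASE_MILLIS = 1_700_000_000_000L; // hypothetical start of event time
    static final long DELAY_MILLIS = 2_000L;            // hypothetical out-of-orderness bound

    // Hypothetical deterministic timestamp: one event per second of event time.
    static long timestampOf(long index) {
        return BASE_MILLIS + index * 1_000L;
    }

    // Watermarks a single reader would emit after each event of its sub-sequence [from, to).
    static List<Long> watermarks(long from, long to) {
        List<Long> result = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (long i = from; i < to; i++) {
            maxSeen = Math.max(maxSeen, timestampOf(i));
            result.add(maxSeen - DELAY_MILLIS);
        }
        return result;
    }

    public static void main(String[] args) {
        // Two runs over the same sub-sequence yield identical watermarks.
        System.out.println(watermarks(0, 3).equals(watermarks(0, 3))); // true
        System.out.println(watermarks(0, 3));
    }
}
```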
This source has built-in support for rate limiting. The following code produces an effectively unbounded stream of Long values (in practice, Long.MAX_VALUE is never reached). The stream is emitted at an overall rate of 100 events per second, measured across all source subtasks combined.
GeneratorFunction<Long, Long> generatorFunction = index -> index;
DataGeneratorSource<Long> source =
    new DataGeneratorSource<>(
        generatorFunction,
        Long.MAX_VALUE,
        RateLimiterStrategy.perSecond(100),
        Types.LONG);
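An overall rate "across all source subtasks" means that the 100 events/s budget is shared among the parallel readers. The minimal plain-Java sketch below assumes the budget is divided evenly, so each of p subtasks targets 100 / p events per second and spaces its emissions at a fixed interval. The RateSketch class and its methods are hypothetical. This is not Flink's RateLimiterStrategy implementation.

```java
public class RateSketch {
    // Per-subtask rate, assuming the overall rate is split evenly across subtasks.
    static double perSubtaskRate(double overallPerSecond, int parallelism) {
        return overallPerSecond / parallelism;
    }

    // Nanoseconds a subtask waits between two emissions at the given per-subtask rate.
    static long intervalNanos(double perSubtaskPerSecond) {
        return Math.round(1_000_000_000L / perSubtaskPerSecond);
    }

    // Hypothetical throttled loop: emits `n` records, sleeping so they are evenly spaced.
    static void emitThrottled(long n, double perSubtaskPerSecond) throws InterruptedException {
        long interval = intervalNanos(perSubtaskPerSecond);
        long next = System.nanoTime();
        for (long i = 0; i < n; i++) {
            long wait = next - System.nanoTime();
            if (wait > 0) {
                Thread.sleep(wait / 1_000_000L, (int) (wait % 1_000_000L));
            }
            System.out.println("emit " + i);
            next += interval; // schedule relative to the previous slot to avoid drift
        }
    }

    public static void main(String[] args) throws InterruptedException {
        double rate = perSubtaskRate(100, 4);    // 25.0 events/s per subtask
        System.out.println(intervalNanos(rate)); // 40000000 (40 ms)
        emitThrottled(3, rate);
    }
}
```

Scheduling each emission relative to the previous slot, rather than sleeping a fixed amount after each record, keeps the long-run rate close to the target even when emitting a record takes time.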
This source is always bounded. For very long sequences (for example, when the count is set to Long.MAX_VALUE), consider running the application in streaming execution mode. Although the produced stream is bounded, its end is so far away that it is never reached in practice.
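Here is the arithmetic behind "far away" for a Long.MAX_VALUE sequence. At the 100 events per second used in the rate-limiting example, emitting Long.MAX_VALUE (9,223,372,036,854,775,807) events takes about 9.22 × 10^16 seconds, or roughly 2.9 billion years. The helper below computes this. Its name is hypothetical, and it assumes a 365.25-day year.

```java
public class HorizonSketch {
    // Years needed to emit `count` events at `eventsPerSecond`, using a 365.25-day year.
    static double yearsToEmit(long count, double eventsPerSecond) {
        double seconds = count / eventsPerSecond;
        return seconds / (365.25 * 24 * 60 * 60);
    }

    public static void main(String[] args) {
        // Long.MAX_VALUE events at 100 events per second; prints 2.92e+09 years
        System.out.printf("%.2e years%n", yearsToEmit(Long.MAX_VALUE, 100));
    }
}
```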
Constructor | Description
---|---
DataGeneratorSource(GeneratorFunction<Long,OUT> generatorFunction, long count, RateLimiterStrategy rateLimiterStrategy, TypeInformation<OUT> typeInfo) | Instantiates a new DataGeneratorSource.
DataGeneratorSource(GeneratorFunction<Long,OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) | Instantiates a new DataGeneratorSource.
public DataGeneratorSource(GeneratorFunction<Long,OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo)
Instantiates a new DataGeneratorSource.
Parameters:
generatorFunction - The GeneratorFunction that maps each Long index to an output element.
count - The number of generated data points.
typeInfo - The type of the produced data points.
public DataGeneratorSource(GeneratorFunction<Long,OUT> generatorFunction, long count, RateLimiterStrategy rateLimiterStrategy, TypeInformation<OUT> typeInfo)
Instantiates a new DataGeneratorSource.
Parameters:
generatorFunction - The GeneratorFunction that maps each Long index to an output element.
count - The number of generated data points.
rateLimiterStrategy - The strategy for rate limiting.
typeInfo - The type of the produced data points.
public TypeInformation<OUT> getProducedType()
Description copied from interface: ResultTypeQueryable
Gets the data type (as a TypeInformation) produced by this function or input format.
Specified by: getProducedType in interface ResultTypeQueryable<OUT>
public Boundedness getBoundedness()
Returns Boundedness.BOUNDED, since this source is always bounded.
Specified by: getBoundedness in interface Source<OUT,NumberSequenceSource.NumberSequenceSplit,Collection<NumberSequenceSource.NumberSequenceSplit>>
public SourceReader<OUT,NumberSequenceSource.NumberSequenceSplit> createReader(SourceReaderContext readerContext) throws Exception
Specified by: createReader in interface SourceReaderFactory<OUT,NumberSequenceSource.NumberSequenceSplit>
Parameters:
readerContext - The context for the source reader.
Throws:
Exception - The implementor is free to forward all exceptions directly. Exceptions thrown from this method cause task failure/recovery.
public SplitEnumerator<NumberSequenceSource.NumberSequenceSplit,Collection<NumberSequenceSource.NumberSequenceSplit>> restoreEnumerator(SplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> enumContext, Collection<NumberSequenceSource.NumberSequenceSplit> checkpoint)
Specified by: restoreEnumerator in interface Source<OUT,NumberSequenceSource.NumberSequenceSplit,Collection<NumberSequenceSource.NumberSequenceSplit>>
Parameters:
enumContext - The context for the restored split enumerator.
checkpoint - The checkpoint to restore the SplitEnumerator from.
public SplitEnumerator<NumberSequenceSource.NumberSequenceSplit,Collection<NumberSequenceSource.NumberSequenceSplit>> createEnumerator(SplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> enumContext)
Specified by: createEnumerator in interface Source<OUT,NumberSequenceSource.NumberSequenceSplit,Collection<NumberSequenceSource.NumberSequenceSplit>>
Parameters:
enumContext - The context for the split enumerator.
public SimpleVersionedSerializer<NumberSequenceSource.NumberSequenceSplit> getSplitSerializer()
Specified by: getSplitSerializer in interface Source<OUT,NumberSequenceSource.NumberSequenceSplit,Collection<NumberSequenceSource.NumberSequenceSplit>>
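Conceptually, createEnumerator and restoreEnumerator differ mainly in their starting point. The first starts a fresh enumerator; the second rebuilds one from a checkpoint of type Collection<NumberSequenceSource.NumberSequenceSplit>. The plain-Java sketch below is a rough model of that idea, using the hypothetical EnumeratorSketch and Split types rather than Flink's SplitEnumerator interface. The enumerator holds the splits that are still unassigned, hands one out per reader request, and uses a copy of the unassigned splits as its checkpoint. Reader registration and failure handling are deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

public class EnumeratorSketch {
    // Hypothetical stand-in for a number-sequence split covering indices [from, to).
    record Split(String id, long from, long to) {}

    private final Deque<Split> remaining;

    // Used both for a fresh start (all splits) and for a restore (splits still
    // unassigned at checkpoint time).
    EnumeratorSketch(Collection<Split> unassigned) {
        this.remaining = new ArrayDeque<>(unassigned);
    }

    // Hands the next unassigned split to a reader that asks for work, if any is left.
    Optional<Split> handleSplitRequest() {
        return Optional.ofNullable(remaining.pollFirst());
    }

    // The checkpoint is a copy of the splits that have not been handed out yet.
    List<Split> snapshotState() {
        return new ArrayList<>(remaining);
    }

    public static void main(String[] args) {
        List<Split> all = List.of(new Split("0", 0, 500), new Split("1", 500, 1000));
        EnumeratorSketch enumerator = new EnumeratorSketch(all);
        System.out.println(enumerator.handleSplitRequest()); // Optional[Split[id=0, from=0, to=500]]
        // Restore from the checkpoint: only the unassigned split is left.
        EnumeratorSketch restored = new EnumeratorSketch(enumerator.snapshotState());
        System.out.println(restored.handleSplitRequest()); // Optional[Split[id=1, from=500, to=1000]]
    }
}
```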
public SimpleVersionedSerializer<Collection<NumberSequenceSource.NumberSequenceSplit>> getEnumeratorCheckpointSerializer()
Description copied from interface: Source
Creates the serializer for the SplitEnumerator checkpoint. The serializer is used for the result of the SplitEnumerator.snapshotState(long) method.
Specified by: getEnumeratorCheckpointSerializer in interface Source<OUT,NumberSequenceSource.NumberSequenceSplit,Collection<NumberSequenceSource.NumberSequenceSplit>>
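getSplitSerializer and getEnumeratorCheckpointSerializer both return versioned serializers. The plain-Java sketch below illustrates the general idea for a collection of splits. It writes a count followed by each split's fields. The version is kept outside the byte array and passed back to deserialize, which rejects unknown versions. The CheckpointSerializerSketch class, the Split record, the VERSION constant and the byte layout are all hypothetical. They are not Flink's SimpleVersionedSerializer wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class CheckpointSerializerSketch {
    // Hypothetical stand-in for a number-sequence split covering indices [from, to).
    record Split(String id, long from, long to) {}

    static final int VERSION = 1; // hypothetical format version

    // Layout (hypothetical): int count, then (UTF id, long from, long to) per split.
    static byte[] serialize(Collection<Split> splits) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(splits.size());
            for (Split s : splits) {
                out.writeUTF(s.id());
                out.writeLong(s.from());
                out.writeLong(s.to());
            }
        }
        return bytes.toByteArray();
    }

    // Rejects versions it does not understand, then reads fields in the order written.
    static List<Split> deserialize(int version, byte[] data) throws IOException {
        if (version != VERSION) {
            throw new IOException("Unknown serializer version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int n = in.readInt();
            List<Split> splits = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                splits.add(new Split(in.readUTF(), in.readLong(), in.readLong()));
            }
            return splits;
        }
    }

    public static void main(String[] args) throws IOException {
        List<Split> checkpoint = List.of(new Split("0", 0, 500), new Split("1", 500, 1000));
        byte[] data = serialize(checkpoint);
        System.out.println(deserialize(VERSION, data).equals(checkpoint)); // true
    }
}
```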
Copyright © 2014–2023 The Apache Software Foundation. All rights reserved.