Class HybridSource<T>
java.lang.Object
    org.apache.flink.connector.base.source.hybrid.HybridSource<T>
All Implemented Interfaces:
Serializable, Source<T,HybridSourceSplit,HybridSourceEnumeratorState>, SourceReaderFactory<T,HybridSourceSplit>
@PublicEvolving public class HybridSource<T> extends Object implements Source<T,HybridSourceSplit,HybridSourceEnumeratorState>
Hybrid source that switches between underlying sources based on the configured source chain. A simple example with FileSource and KafkaSource, using a fixed Kafka start position:
```java
FileSource<String> fileSource =
        FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir))
                .build();
KafkaSource<String> kafkaSource =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setGroupId("MyGroup")
                .setTopics(Arrays.asList("quickstart-events"))
                .setDeserializer(
                        KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();
HybridSource<String> hybridSource =
        HybridSource.builder(fileSource)
                .addSource(kafkaSource)
                .build();
```
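The core idea of a source chain is that each source is drained before the next one takes over. The plain-Java sketch below models only that ordering, with in-memory lists standing in for FileSource and KafkaSource. `ToyHybridChain` is a hypothetical name; this is a toy model, not Flink API, and it ignores splits, parallelism, and checkpointing.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical toy model (not Flink API): reads each source in the chain to
// exhaustion, then switches to the next one.
final class ToyHybridChain<T> implements Iterator<T> {
    private final List<Iterator<T>> sources = new ArrayList<>();
    private int current = 0; // index of the source currently being read

    ToyHybridChain(List<? extends Iterable<T>> chain) {
        for (Iterable<T> source : chain) {
            sources.add(source.iterator());
        }
    }

    @Override
    public boolean hasNext() {
        // Skip exhausted sources, which models switching to the next source.
        while (current < sources.size() && !sources.get(current).hasNext()) {
            current++;
        }
        return current < sources.size();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return sources.get(current).next();
    }
}
```

With a chain of `["file-1", "file-2"]` followed by `["kafka-1"]`, iteration yields all file records before the Kafka record.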
A more complex example, with the Kafka start position derived from the previous source:
```java
HybridSource<String> hybridSource =
        HybridSource.<String, StaticFileSplitEnumerator>builder(fileSource)
                .addSource(
                        switchContext -> {
                            StaticFileSplitEnumerator previousEnumerator =
                                    switchContext.getPreviousEnumerator();
                            // How to obtain the timestamp depends on the specific enumerator;
                            // getEndTimestamp() stands in for that here.
                            long timestamp = previousEnumerator.getEndTimestamp();
                            OffsetsInitializer offsets = OffsetsInitializer.timestamp(timestamp);
                            KafkaSource<String> kafkaSource =
                                    KafkaSource.<String>builder()
                                            .setBootstrapServers("localhost:9092")
                                            .setGroupId("MyGroup")
                                            .setTopics(Arrays.asList("quickstart-events"))
                                            .setDeserializer(
                                                    KafkaRecordDeserializationSchema.valueOnly(
                                                            StringDeserializer.class))
                                            .setStartingOffsets(offsets)
                                            .build();
                            return kafkaSource;
                        },
                        Boundedness.CONTINUOUS_UNBOUNDED)
                .build();
```
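What the factory form adds is that the next source is only built at switch time, from information the previous enumerator exposes. The plain-Java sketch below models that deferral; `Event`, `FinishedFileEnumerator`, and `DeferredSwitch` are hypothetical toy types, not Flink API, and the `>=` filter only loosely mirrors starting Kafka at the first record at or after a timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical toy record with a timestamp (not a Flink or Kafka type).
final class Event {
    final long timestamp;
    final String value;

    Event(long timestamp, String value) {
        this.timestamp = timestamp;
        this.value = value;
    }
}

// Hypothetical stand-in for the previous source's enumerator: it only reports
// the timestamp up to which it read.
final class FinishedFileEnumerator {
    final long endTimestamp;

    FinishedFileEnumerator(long endTimestamp) {
        this.endTimestamp = endTimestamp;
    }
}

final class DeferredSwitch {
    // Returns a factory that builds the next "source" (a filtered view of the
    // log) only when it is invoked with the previous enumerator.
    static Function<FinishedFileEnumerator, List<Event>> kafkaFactory(List<Event> log) {
        return previous -> {
            long start = previous.endTimestamp;
            List<Event> fromStart = new ArrayList<>();
            for (Event e : log) {
                // Keep events at or after the handed-over timestamp.
                if (e.timestamp >= start) {
                    fromStart.add(e);
                }
            }
            return fromStart;
        };
    }
}
```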
See Also:
Serialized Form
Nested Class Summary
- static class HybridSource.HybridSourceBuilder<T,EnumT extends SplitEnumerator>
  Builder for HybridSource.
- static interface HybridSource.SourceFactory<T,SourceT extends Source<T,?,?>,FromEnumT extends SplitEnumerator>
  Factory for underlying sources of HybridSource.
- static interface HybridSource.SourceSwitchContext<EnumT>
  Context provided to source factory.
Constructor Summary
- protected HybridSource(List<org.apache.flink.connector.base.source.hybrid.HybridSource.SourceListEntry> sources)
  Protected for subclasses; use builder(Source) to construct the source.
Constructor Detail

HybridSource
protected HybridSource(List<org.apache.flink.connector.base.source.hybrid.HybridSource.SourceListEntry> sources)
Protected for subclasses; use builder(Source) to construct the source.
Method Detail

builder
public static <T,EnumT extends SplitEnumerator> HybridSource.HybridSourceBuilder<T,EnumT> builder(Source<T,?,?> firstSource)
Creates a builder for HybridSource, with firstSource as the first source in the chain.
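A builder of this kind accumulates the chain entry by entry; a fixed source can be treated as a factory that simply ignores what the previous source hands over. The sketch below is a hypothetical toy (`ToyHybridBuilder` is not Flink API) in which each entry maps the previous source's end position (`null` for the first source) to a source name; the resulting list is analogous to what the protected constructor receives.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical toy builder (not Flink API). Each entry is a factory from the
// previous source's end position to a source description.
final class ToyHybridBuilder {
    private final List<Function<Long, String>> entries = new ArrayList<>();

    private ToyHybridBuilder(String firstSource) {
        // A fixed source is stored as a factory that ignores its input.
        entries.add(previousEnd -> firstSource);
    }

    static ToyHybridBuilder builder(String firstSource) {
        return new ToyHybridBuilder(firstSource);
    }

    // Fixed next source, known up front.
    ToyHybridBuilder addSource(String fixedSource) {
        return addSource(previousEnd -> fixedSource);
    }

    // Deferred next source, built from the previous source's end position.
    ToyHybridBuilder addSource(Function<Long, String> factory) {
        entries.add(factory);
        return this;
    }

    // Returns an immutable snapshot of the configured chain.
    List<Function<Long, String>> build() {
        return Collections.unmodifiableList(new ArrayList<>(entries));
    }
}
```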
getBoundedness
public Boundedness getBoundedness()
Description copied from interface: Source
Get the boundedness of this source.
- Specified by:
  getBoundedness in interface Source<T,HybridSourceSplit,HybridSourceEnumeratorState>
- Returns:
  the boundedness of this source.
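Because each source must finish before the next one starts, only the final source in a chain can sensibly be unbounded. The toy check below assumes that rule, and assumes the chain as a whole reports the boundedness of its last source; both are assumptions to verify against HybridSourceBuilder, and `ToyBoundedness`/`HybridBoundedness` are hypothetical stand-ins rather than Flink types.

```java
import java.util.List;

// Hypothetical stand-in for Flink's Boundedness enum.
enum ToyBoundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

final class HybridBoundedness {
    // Assumed rule: every source except the last must be bounded, and the
    // chain reports the boundedness of its last source.
    static ToyBoundedness of(List<ToyBoundedness> chain) {
        if (chain.isEmpty()) {
            throw new IllegalArgumentException("chain must contain at least one source");
        }
        for (int i = 0; i < chain.size() - 1; i++) {
            if (chain.get(i) != ToyBoundedness.BOUNDED) {
                throw new IllegalArgumentException("source " + i + " is not bounded");
            }
        }
        return chain.get(chain.size() - 1);
    }
}
```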
createReader
@Internal public SourceReader<T,HybridSourceSplit> createReader(SourceReaderContext readerContext) throws Exception
Description copied from interface: SourceReaderFactory
Creates a new reader to read data from the splits it gets assigned. The reader starts fresh and does not have any state to resume.
- Specified by:
  createReader in interface SourceReaderFactory<T,HybridSourceSplit>
- Parameters:
  readerContext - The context for the source reader.
- Returns:
  A new SourceReader.
- Throws:
  Exception - The implementor is free to forward all exceptions directly. Exceptions thrown from this method cause task failure/recovery.
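A hybrid reader can be pictured as a wrapper that delegates to one underlying reader at a time and replaces it when told to move to another source in the chain. The toy below models only that delegation; `ToySwitchingReader` is hypothetical, each "reader" is just a list of records, and the forward-only check is a simplifying assumption of the toy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical toy (not Flink API): delegates to one underlying reader at a
// time and replaces it when switched to another source index.
final class ToySwitchingReader {
    private final IntFunction<List<String>> readerFactory; // source index -> records
    private int currentIndex = -1;
    private List<String> currentRecords = List.of();

    ToySwitchingReader(IntFunction<List<String>> readerFactory) {
        this.readerFactory = readerFactory;
    }

    // Called when the enumerator announces a switch; creates a fresh delegate.
    void switchTo(int sourceIndex) {
        if (sourceIndex <= currentIndex) {
            throw new IllegalStateException("toy assumption: sources only advance forward");
        }
        currentIndex = sourceIndex;
        currentRecords = readerFactory.apply(sourceIndex);
    }

    // Drains whatever the current delegate has produced.
    List<String> pollAll() {
        List<String> out = new ArrayList<>(currentRecords);
        currentRecords = List.of();
        return out;
    }

    int currentIndex() {
        return currentIndex;
    }
}
```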
createEnumerator
@Internal public SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState> createEnumerator(SplitEnumeratorContext<HybridSourceSplit> enumContext)
Description copied from interface: Source
Creates a new SplitEnumerator for this source, starting a new input.
- Specified by:
  createEnumerator in interface Source<T,HybridSourceSplit,HybridSourceEnumeratorState>
- Parameters:
  enumContext - The context for the split enumerator.
- Returns:
  A new SplitEnumerator.
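Starting a new input means beginning at the first source in the chain. The toy enumerator below hands out the current source's splits and advances to the next source once every assigned split has been reported finished. `ToyHybridEnumerator` is hypothetical and not Flink API; it does not model unbounded sources, readers on multiple subtasks, or a source with zero splits.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical toy (not Flink API): a fresh enumerator starts at source 0,
// assigns that source's splits, and switches once all of them are finished.
final class ToyHybridEnumerator {
    private final List<List<String>> splitsPerSource;
    private final Deque<String> pending = new ArrayDeque<>();
    private int currentSource = 0;
    private int outstanding = 0; // assigned but not yet finished

    ToyHybridEnumerator(List<List<String>> splitsPerSource) {
        this.splitsPerSource = splitsPerSource;
        pending.addAll(splitsPerSource.get(0));
    }

    // Returns the next split of the current source, or null if none is pending.
    String assignNext() {
        String split = pending.poll();
        if (split != null) {
            outstanding++;
        }
        return split;
    }

    // A reader reports a finished split; switch when the source is drained.
    void splitFinished() {
        outstanding--;
        if (outstanding == 0 && pending.isEmpty() && currentSource + 1 < splitsPerSource.size()) {
            currentSource++;
            pending.addAll(splitsPerSource.get(currentSource));
        }
    }

    int currentSource() {
        return currentSource;
    }
}
```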
restoreEnumerator
@Internal public SplitEnumerator<HybridSourceSplit,HybridSourceEnumeratorState> restoreEnumerator(SplitEnumeratorContext<HybridSourceSplit> enumContext, HybridSourceEnumeratorState checkpoint) throws Exception
Description copied from interface: Source
Restores an enumerator from a checkpoint.
- Specified by:
  restoreEnumerator in interface Source<T,HybridSourceSplit,HybridSourceEnumeratorState>
- Parameters:
  enumContext - The context for the restored split enumerator.
  checkpoint - The checkpoint to restore the SplitEnumerator from.
- Returns:
  A SplitEnumerator restored from the given checkpoint.
- Throws:
  Exception - The implementor is free to forward all exceptions directly. Exceptions thrown from this method cause JobManager failure/recovery.
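Unlike createEnumerator, restoring should resume at whichever source was active when the checkpoint was taken. The toy below assumes, for illustration, that the hybrid checkpoint carries the index of the active source plus that source's own serialized enumerator state; `ToyHybridState` and `ToyRestore` are hypothetical and do not reproduce HybridSourceEnumeratorState.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical toy checkpoint: active source index plus that source's own
// serialized enumerator state.
final class ToyHybridState {
    final int currentSourceIndex;
    final byte[] wrappedState;

    ToyHybridState(int currentSourceIndex, byte[] wrappedState) {
        this.currentSourceIndex = currentSourceIndex;
        this.wrappedState = wrappedState;
    }
}

final class ToyRestore {
    // Fresh start: begin at source 0 with no wrapped state.
    static String create() {
        return describe(0, null);
    }

    // Restore: resume at the checkpointed source and pass its wrapped state on
    // (here the state is simply decoded as text).
    static String restore(ToyHybridState checkpoint) {
        return describe(checkpoint.currentSourceIndex, checkpoint.wrappedState);
    }

    private static String describe(int index, byte[] state) {
        String inner = state == null ? "fresh" : new String(state, StandardCharsets.UTF_8);
        return "source " + index + " (" + inner + ")";
    }
}
```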
getSplitSerializer
@Internal public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer()
Description copied from interface: Source
Creates a serializer for the source splits. Splits are serialized when sending them from enumerator to reader, and when checkpointing the reader's current state.
- Specified by:
  getSplitSerializer in interface Source<T,HybridSourceSplit,HybridSourceEnumeratorState>
- Returns:
  The serializer for the split type.
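Since splits from different underlying sources travel through one split type, a hybrid split has to remember which source it belongs to. The sketch assumes each split wraps the underlying split's bytes together with its source index and the wrapped serializer's version; `ToySplit`, `ToyVersionedSerializer`, and `ToySplitSerializer` are hypothetical. The interface only copies the getVersion/serialize/deserialize shape of SimpleVersionedSerializer so the block compiles without Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical toy split: wrapped split bytes tagged with their source index.
final class ToySplit {
    final int sourceIndex;
    final int wrappedVersion;
    final byte[] wrappedBytes;

    ToySplit(int sourceIndex, int wrappedVersion, byte[] wrappedBytes) {
        this.sourceIndex = sourceIndex;
        this.wrappedVersion = wrappedVersion;
        this.wrappedBytes = wrappedBytes;
    }
}

// Same method shape as SimpleVersionedSerializer, redeclared for the sketch.
interface ToyVersionedSerializer<E> {
    int getVersion();
    byte[] serialize(E obj) throws IOException;
    E deserialize(int version, byte[] serialized) throws IOException;
}

final class ToySplitSerializer implements ToyVersionedSerializer<ToySplit> {
    public int getVersion() {
        return 1;
    }

    public byte[] serialize(ToySplit split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(split.sourceIndex);
            out.writeInt(split.wrappedVersion);
            out.writeInt(split.wrappedBytes.length); // length prefix for the payload
            out.write(split.wrappedBytes);
        }
        return bytes.toByteArray();
    }

    public ToySplit deserialize(int version, byte[] serialized) throws IOException {
        if (version != 1) {
            throw new IOException("unknown version " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            int sourceIndex = in.readInt();
            int wrappedVersion = in.readInt();
            byte[] wrapped = new byte[in.readInt()];
            in.readFully(wrapped);
            return new ToySplit(sourceIndex, wrappedVersion, wrapped);
        }
    }
}
```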
getEnumeratorCheckpointSerializer
@Internal public SimpleVersionedSerializer<HybridSourceEnumeratorState> getEnumeratorCheckpointSerializer()
Description copied from interface: Source
Creates the serializer for the SplitEnumerator checkpoint. The serializer is used for the result of the SplitEnumerator.snapshotState(long) method.
- Specified by:
  getEnumeratorCheckpointSerializer in interface Source<T,HybridSourceSplit,HybridSourceEnumeratorState>
- Returns:
  The serializer for the SplitEnumerator checkpoint.
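A versioned checkpoint serializer receives the version that was stored with the checkpoint, so newer code can still read older layouts. The toy below illustrates that with two invented layouts (v1: source index plus state bytes; v2: additionally the wrapped serializer version). `ToyEnumState` and `ToyEnumStateSerializer` are hypothetical, and the layouts are made up for illustration, not HybridSourceEnumeratorState's actual format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical toy enumerator checkpoint.
final class ToyEnumState {
    final int currentSourceIndex;
    final int wrappedVersion; // 0 when read from the older v1 layout
    final byte[] wrappedState;

    ToyEnumState(int currentSourceIndex, int wrappedVersion, byte[] wrappedState) {
        this.currentSourceIndex = currentSourceIndex;
        this.wrappedVersion = wrappedVersion;
        this.wrappedState = wrappedState;
    }
}

final class ToyEnumStateSerializer {
    static final int CURRENT_VERSION = 2;

    int getVersion() {
        return CURRENT_VERSION;
    }

    // Always writes the current (v2) layout.
    byte[] serialize(ToyEnumState state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(state.currentSourceIndex);
            out.writeInt(state.wrappedVersion);
            out.writeInt(state.wrappedState.length);
            out.write(state.wrappedState);
        }
        return bytes.toByteArray();
    }

    // Branches on the stored version so v1 checkpoints remain readable.
    ToyEnumState deserialize(int version, byte[] serialized) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            int index = in.readInt();
            int wrappedVersion;
            if (version == 2) {
                wrappedVersion = in.readInt();
            } else if (version == 1) {
                wrappedVersion = 0;
            } else {
                throw new IOException("unsupported version " + version);
            }
            byte[] state = new byte[in.readInt()];
            in.readFully(state);
            return new ToyEnumState(index, wrappedVersion, state);
        }
    }
}
```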