Class HybridSource<T>

  • All Implemented Interfaces:
    Serializable, Source<T, HybridSourceSplit, HybridSourceEnumeratorState>, SourceReaderFactory<T, HybridSourceSplit>

    @PublicEvolving
    public class HybridSource<T>
    extends Object
    implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState>
    Hybrid source that switches between underlying sources according to the configured source chain.

    A simple example that chains a FileSource and a KafkaSource with a fixed Kafka start position:

    
     // Bounded source: reads the files under testDir to completion.
     FileSource<String> fileSource =
         FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir))
             .build();
     // Unbounded source: starts at a fixed position (the earliest offset).
     KafkaSource<String> kafkaSource =
         KafkaSource.<String>builder()
             .setBootstrapServers("localhost:9092")
             .setGroupId("MyGroup")
             .setTopics(Arrays.asList("quickstart-events"))
             .setDeserializer(
                 KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
             .setStartingOffsets(OffsetsInitializer.earliest())
             .build();
     // The chain reads fileSource first, then switches to kafkaSource.
     HybridSource<String> hybridSource =
         HybridSource.builder(fileSource)
             .addSource(kafkaSource)
             .build();
     

    A more complex example in which the Kafka start position is derived from the previous source at switch time:

    
     HybridSource<String> hybridSource =
         HybridSource.<String, StaticFileSplitEnumerator>builder(fileSource)
             .addSource(
                 switchContext -> {
                   StaticFileSplitEnumerator previousEnumerator =
                       switchContext.getPreviousEnumerator();
                   // How to obtain the timestamp depends on the specific enumerator implementation.
                   long timestamp = previousEnumerator.getEndTimestamp();
                   OffsetsInitializer offsets =
                       OffsetsInitializer.timestamp(timestamp);
                   KafkaSource<String> kafkaSource =
                       KafkaSource.<String>builder()
                           .setBootstrapServers("localhost:9092")
                           .setGroupId("MyGroup")
                           .setTopics(Arrays.asList("quickstart-events"))
                           .setDeserializer(
                               KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                           .setStartingOffsets(offsets)
                           .build();
                   return kafkaSource;
                 },
                 Boundedness.CONTINUOUS_UNBOUNDED)
             .build();
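
    The switch-time derivation above can be illustrated with a minimal plain-Java sketch. This is
    not Flink code and does not use the HybridSource API: it only models a bounded backlog that is
    read to completion, followed by a second stage whose start position is computed from the
    backlog's end timestamp. All names (HybridChainSketch, Event, readChain, fromTimestamp) are
    hypothetical, and the sketch assumes the end timestamp is the first timestamp that the backlog
    does not cover.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongFunction;

/** Plain-Java model of a two-stage source chain; hypothetical, not Flink code. */
class HybridChainSketch {

    /** A value with an event timestamp, standing in for a file line or a Kafka message. */
    static final class Event {
        final long timestamp;
        final String value;

        Event(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /**
     * Reads the bounded backlog to completion, then builds the next stage from the
     * backlog's end timestamp and reads it. The factory is invoked only at switch
     * time, so the start position does not have to be known up front.
     */
    static List<String> readChain(
            List<Event> backlog,
            long backlogEndTimestamp,
            LongFunction<List<Event>> nextSourceFactory) {
        List<String> out = new ArrayList<>();
        for (Event e : backlog) {
            out.add(e.value);
        }
        // Switch: derive the next source's start position from the previous source.
        for (Event e : nextSourceFactory.apply(backlogEndTimestamp)) {
            out.add(e.value);
        }
        return out;
    }

    /** Returns the events of the log whose timestamp is at or after startTimestamp. */
    static List<Event> fromTimestamp(List<Event> log, long startTimestamp) {
        List<Event> result = new ArrayList<>();
        for (Event e : log) {
            if (e.timestamp >= startTimestamp) {
                result.add(e);
            }
        }
        return result;
    }

    /** Runs the chain on a log that overlaps the backlog (events a and b are in both). */
    static List<String> demo() {
        List<Event> log = Arrays.asList(
                new Event(1, "a"), new Event(2, "b"), new Event(3, "c"), new Event(4, "d"));
        List<Event> backlog = Arrays.asList(new Event(1, "a"), new Event(2, "b"));
        long backlogEndTimestamp = 3; // first timestamp not covered by the backlog
        return readChain(backlog, backlogEndTimestamp, start -> fromTimestamp(log, start));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, b, c, d]
    }
}
```

    Because the second stage starts at the backlog's end timestamp, the overlapping events a and
    b are emitted only once in this model. In a real job, whether the handover is free of gaps
    and duplicates depends on how the end position of the previous source is determined.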
     
    See Also:
    Serialized Form