Interface StreamFormat<T>
- Type Parameters:
T - The type of records created by this format reader.
- All Superinterfaces:
ResultTypeQueryable<T>, Serializable
- All Known Implementing Classes:
CsvReaderFormat, SimpleStreamFormat, TextLineInputFormat
@PublicEvolving public interface StreamFormat<T> extends Serializable, ResultTypeQueryable<T>
A reader format that reads individual records from a stream.
The outer class StreamFormat acts mainly as a configuration holder and factory for the reader. The actual reading is done by the StreamFormat.Reader, which is created based on an input stream in the createReader(Configuration, FSDataInputStream, long, long) method and restored (from checkpointed positions) in the restoreReader(Configuration, FSDataInputStream, long, long, long) method.
Compared to the BulkFormat, the stream format handles a few things out-of-the-box, like deciding how to batch records or dealing with compression.
For a simpler version of this interface, for formats that do not support splitting or logical record offsets during checkpointing, see SimpleStreamFormat.
Splitting
File splitting means dividing a file into multiple regions that can be read independently. Whether a format supports splitting is indicated via the isSplittable() method.
Splitting has the potential to increase parallelism and performance, but poses additional constraints on the format readers: readers need to be able to find a consistent starting point within the file near the offset where the split starts (like the next record delimiter, a block start, or a sync marker). This is not necessarily possible for all formats, which is why splitting is optional.
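As a rough illustration of the splitting constraint, the following sketch models a newline-delimited format in plain Java (16+, for records). It is not Flink code: `SplitSketch`, `plan`, and `nextRecordStart` are hypothetical names. `plan` cuts byte ranges from the file length alone, so boundaries may fall in the middle of a record; `nextRecordStart` then finds the consistent starting point (the byte after the next delimiter) from which a reader of that split would begin.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class SplitSketch {

    /** A byte range of the file; end is exclusive. */
    record Split(long start, long end) {}

    /**
     * Cuts the file into ranges using only its length, as an enumerator working from
     * file metadata might. A non-splittable format gets a single split for the whole file.
     */
    static List<Split> plan(long fileLen, long splitSize, boolean splittable) {
        List<Split> splits = new ArrayList<>();
        if (!splittable) {
            splits.add(new Split(0, fileLen));
            return splits;
        }
        for (long start = 0; start < fileLen; start += splitSize) {
            splits.add(new Split(start, Math.min(start + splitSize, fileLen)));
        }
        return splits;
    }

    /**
     * Finds the first record start at or after splitStart for '\n'-terminated records.
     * Returns data.length if no record starts at or after splitStart.
     */
    static int nextRecordStart(byte[] data, int splitStart) {
        if (splitStart == 0) {
            return 0;
        }
        int i = splitStart - 1; // look one byte back: splitStart may already be a record start
        while (i < data.length && data[i] != '\n') {
            i++;
        }
        return Math.min(i + 1, data.length);
    }

    public static void main(String[] args) {
        byte[] data = "aa\nbbbb\nc\ndd".getBytes(StandardCharsets.UTF_8);
        for (Split s : plan(data.length, 4, true)) {
            System.out.println(s + " -> first record starts at " + nextRecordStart(data, (int) s.start()));
        }
    }
}
```

With this sample data the range `[4, 8)` contains no record start at all, so in this sketch its reader would emit nothing; the record `bbbb` that overlaps it belongs to the split in which it starts.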
Checkpointing
Readers can optionally return the current position of the reader via StreamFormat.Reader.getCheckpointedPosition(). This can improve recovery speed from a checkpoint.
By default (if that method is not overridden or returns null), recovery from a checkpoint works by reading the split again and skipping the number of records that were processed before the checkpoint. Implementing this method allows formats to seek directly to that position, rather than read and discard a number of records.
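The two recovery paths can be contrasted with a small model. This is a hypothetical plain-Java sketch, not Flink's `CheckpointedPosition` API: `Position` mirrors the offset-plus-records-to-skip pair described in the next paragraph, and `Block` stands in for a compressed batch that can only be entered at its start offset. `resumeBySkipping` models the default behavior (offset zero, discard every record emitted so far), while `resumeFrom` seeks to the block offset first and discards only the records inside that block.

```java
import java.util.List;

final class RecoverySketch {

    /** Hypothetical stand-in for a checkpointed position: an offset plus records to skip after it. */
    record Position(long offset, long recordsAfterOffset) {}

    /** A block (for example, one compressed batch) that can only be entered at its start offset. */
    record Block(long offset, List<String> records) {}

    /** The record to emit next after recovery, and how many records were read and discarded. */
    record Resume(String nextRecord, long discarded) {}

    /** Position a block-aware reader could report after emitting the given number of records. */
    static Position positionAfter(List<Block> blocks, long emitted) {
        long remaining = emitted;
        for (Block b : blocks) {
            if (remaining < b.records().size()) {
                return new Position(b.offset(), remaining);
            }
            remaining -= b.records().size();
        }
        throw new IllegalArgumentException("no record left after " + emitted + " emitted records");
    }

    /** Seeks to the block at pos.offset() (blocks assumed sorted by offset), then discards records. */
    static Resume resumeFrom(List<Block> blocks, Position pos) {
        long toSkip = pos.recordsAfterOffset();
        for (Block b : blocks) {
            if (b.offset() < pos.offset()) {
                continue; // "seek": blocks before the offset are never decoded
            }
            for (String r : b.records()) {
                if (toSkip > 0) {
                    toSkip--; // read and discard
                } else {
                    return new Resume(r, pos.recordsAfterOffset());
                }
            }
        }
        throw new IllegalArgumentException("position is past the end of the data");
    }

    /** Default behavior: like Position(0, emitted), so the whole prefix is re-read and discarded. */
    static Resume resumeBySkipping(List<Block> blocks, long emitted) {
        return resumeFrom(blocks, new Position(0, emitted));
    }

    public static void main(String[] args) {
        List<Block> blocks = List.of(
                new Block(0, List.of("a", "b", "c")),
                new Block(100, List.of("d", "e", "f")),
                new Block(200, List.of("g", "h")));
        long emitted = 7; // records emitted before the checkpoint
        System.out.println(resumeBySkipping(blocks, emitted));                  // Resume[nextRecord=h, discarded=7]
        System.out.println(resumeFrom(blocks, positionAfter(blocks, emitted))); // Resume[nextRecord=h, discarded=1]
    }
}
```

Both paths resume at the same record; the difference is how much data is re-read and thrown away, which in a real format would be decompression and parsing work.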
The position is a combination of an offset in the file and a number of records to skip after this offset (see CheckpointedPosition). This helps formats that cannot describe all record positions by an offset, for example because records are compressed in batches or stored in a columnar layout (e.g., ORC, Parquet). The default behavior can be viewed as returning a CheckpointedPosition where the offset is always zero and only CheckpointedPosition.getRecordsAfterOffset() is incremented with each emitted record.
Serializable
Like many other API classes in Flink, the outer class is serializable to support sending instances to distributed workers for parallel execution. This is purely short-term serialization for RPC; no instance is ever persisted long-term in serialized form.
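To make the split between a serializable configuration holder and non-serialized reading state concrete, here is a hypothetical plain-Java sketch. `LineFormatSketch`, `shipToWorker`, and `readLines` are illustrative names, and a Java object serialization round trip merely stands in for however the format is actually shipped to workers. Only the configuration (a charset name) is serialized; the open reader is created on the worker side and never leaves it.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

final class LineFormatSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // The only state: plain configuration, cheap to serialize and send to workers.
    private final String charsetName;

    LineFormatSketch(String charsetName) {
        this.charsetName = charsetName;
    }

    String charsetName() {
        return charsetName;
    }

    /** Factory-style method run on the worker; the open reader is never serialized. */
    List<String> readLines(InputStream in) {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, Charset.forName(charsetName)))) {
            return reader.lines().collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Simulates shipping the format to a worker with a short-lived serialize/deserialize round trip. */
    static LineFormatSketch shipToWorker(LineFormatSketch format) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(format);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LineFormatSketch) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LineFormatSketch onWorker = shipToWorker(new LineFormatSketch("UTF-8"));
        byte[] data = "first\nsecond\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(onWorker.readLines(new ByteArrayInputStream(data))); // prints [first, second]
    }
}
```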
Record Batching
Internally in the file source, the readers pass batches of records from the reading threads (which perform the typically blocking I/O operations) to the async mailbox threads that do the streaming and batch data processing. Passing records in batches (rather than one at a time) greatly reduces the thread-to-thread handover overhead.
This batching is by default based on the I/O fetch size for the StreamFormat, meaning the set of records derived from one I/O buffer is handed over as one batch. See FETCH_IO_SIZE to configure that fetch size.
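A hypothetical sketch of fetch-based batching, assuming newline-delimited ASCII records: an I/O thread reads `fetchSize` bytes at a time, collects the complete records of each fetch into one batch, and hands that batch to a consumer thread through a bounded `BlockingQueue`. This is not Flink's internal handover mechanism; `FetchBatchingSketch` and `run` are illustrative names, and `fetchSize` merely plays the role that `FETCH_IO_SIZE` plays for the real format.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class FetchBatchingSketch {

    // Sentinel compared by identity to signal that the I/O thread is done.
    private static final List<String> END_OF_INPUT = new ArrayList<>();

    static List<List<String>> run(byte[] file, int fetchSize) {
        BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(2);
        Thread ioThread = new Thread(() -> {
            StringBuilder partial = new StringBuilder(); // record bytes carried over between fetches
            try {
                for (int pos = 0; pos < file.length; pos += fetchSize) {
                    // One fetch: at most fetchSize bytes.
                    byte[] chunk = Arrays.copyOfRange(file, pos, Math.min(pos + fetchSize, file.length));
                    List<String> batch = new ArrayList<>();
                    for (byte b : chunk) {
                        if (b == '\n') {
                            batch.add(partial.toString());
                            partial.setLength(0);
                        } else {
                            partial.append((char) b); // ASCII-only simplification
                        }
                    }
                    if (!batch.isEmpty()) {
                        handover.put(batch); // one handover per fetch, not per record
                    }
                }
                if (partial.length() > 0) {
                    handover.put(List.of(partial.toString())); // last record without trailing '\n'
                }
                handover.put(END_OF_INPUT);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        ioThread.start();

        List<List<String>> received = new ArrayList<>();
        try {
            for (List<String> batch = handover.take(); batch != END_OF_INPUT; batch = handover.take()) {
                received.add(batch);
            }
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming batches", e);
        }
        return received;
    }

    public static void main(String[] args) {
        byte[] file = "a\nbb\nccc\nd".getBytes(StandardCharsets.US_ASCII);
        System.out.println(run(file, 4));   // [[a], [bb], [ccc], [d]]
        System.out.println(run(file, 100)); // [[a, bb, ccc], [d]]
    }
}
```

In this sketch the number of queue handovers follows the number of fetches, so a larger fetch size means fewer, larger batches for the same records.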
Nested Class Summary
Nested Classes
Modifier and Type | Interface | Description
static interface | StreamFormat.Reader<T> | The actual reader that reads the records.
Field Summary
Fields
Modifier and Type | Field | Description
static ConfigOption<MemorySize> | FETCH_IO_SIZE | The config option that defines how many bytes the I/O thread reads in one fetch operation.
Method Summary
Modifier and Type | Method | Description
StreamFormat.Reader<T> | createReader(Configuration config, FSDataInputStream stream, long fileLen, long splitEnd) | Creates a new reader to read in this format.
TypeInformation<T> | getProducedType() | Gets the type produced by this format.
boolean | isSplittable() | Checks whether this format is splittable.
StreamFormat.Reader<T> | restoreReader(Configuration config, FSDataInputStream stream, long restoredOffset, long fileLen, long splitEnd) | Restores a reader from a checkpointed position.
Field Detail
FETCH_IO_SIZE
static final ConfigOption<MemorySize> FETCH_IO_SIZE
The config option that defines how many bytes the I/O thread reads in one fetch operation.
Method Detail
createReader
StreamFormat.Reader<T> createReader(Configuration config, FSDataInputStream stream, long fileLen, long splitEnd) throws IOException
Creates a new reader to read in this format. This method is called when a fresh reader is created for a split that was assigned from the enumerator. This method may also be called on recovery from a checkpoint, if the reader never stored an offset in the checkpoint (see restoreReader(Configuration, FSDataInputStream, long, long, long) for details).
If the format is splittable, then the stream is positioned to the beginning of the file split, otherwise it will be at position zero.
The fileLen is the length of the entire file, while splitEnd is the offset of the first byte after the split end boundary (exclusive end boundary). For non-splittable formats, both values are identical.
- Throws:
IOException
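The exclusive splitEnd boundary can be illustrated with a hypothetical reader for a newline-delimited format. This is plain Java, not Flink's `StreamFormat.Reader`; `SplitReaderSketch` and `readSplit` are made-up names, and a byte array stands in for the `FSDataInputStream`. In this sketch a split owns every record whose first byte lies in `[splitStart, splitEnd)`: the reader finishes a record that crosses `splitEnd`, and the reader of the next split skips that partial record, so every record is read exactly once. Calling it with `splitStart == 0` and `splitEnd == fileLen` models the non-splittable case.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class SplitReaderSketch {

    static List<String> readSplit(byte[] file, int splitStart, int splitEnd) {
        int fileLen = file.length;
        int pos = 0;
        if (splitStart > 0) {
            // Enumerator offsets are approximate: skip to the byte after the next '\n',
            // looking one byte back in case splitStart is already a record start.
            pos = splitStart - 1;
            while (pos < fileLen && file[pos] != '\n') {
                pos++;
            }
            pos++;
        }
        List<String> records = new ArrayList<>();
        // Only records that START before the exclusive splitEnd belong to this split.
        while (pos < splitEnd && pos < fileLen) {
            int end = pos;
            while (end < fileLen && file[end] != '\n') {
                end++; // a record may run past splitEnd; it is still read to completion
            }
            records.add(new String(file, pos, end - pos, StandardCharsets.UTF_8));
            pos = end + 1; // step over the delimiter
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] file = "aa\nbbbb\nc\ndd".getBytes(StandardCharsets.UTF_8);
        for (int start = 0; start < file.length; start += 4) {
            int end = Math.min(start + 4, file.length);
            System.out.println("[" + start + ", " + end + ") -> " + readSplit(file, start, end));
        }
        // Non-splittable: one reader from position zero, with splitEnd equal to fileLen.
        System.out.println(readSplit(file, 0, file.length)); // [aa, bbbb, c, dd]
    }
}
```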
restoreReader
StreamFormat.Reader<T> restoreReader(Configuration config, FSDataInputStream stream, long restoredOffset, long fileLen, long splitEnd) throws IOException
Restores a reader from a checkpointed position. This method is called when the reader is recovered from a checkpoint and the reader has previously stored an offset into the checkpoint, by returning from StreamFormat.Reader.getCheckpointedPosition() a value with a non-negative offset. That value is supplied as the restoredOffset.
If the format is splittable, then the stream is positioned to the beginning of the file split, otherwise it will be at position zero. The stream is NOT positioned to the checkpointed offset, because the format is free to interpret this offset in a different way than the byte offset in the file (for example as a record index).
If the reader never produced a CheckpointedPosition with a non-negative offset before, then this method is not called, and the reader is created in the same way as a fresh reader via the method createReader(Configuration, FSDataInputStream, long, long), and the appropriate number of records are read and discarded to move the reader to the checkpointed position.
Having a separate method for restoring readers to a checkpointed position allows readers to seek to the start position differently in that case, compared to when the reader is created from a split offset generated at the enumerator. In the latter case, the offsets are commonly "approximate", because the enumerator typically generates splits based only on metadata. Readers then have to skip some bytes while searching for the next position to start from (based on a delimiter, sync marker, block offset, etc.). In contrast, checkpointed offsets are often precise, because they were recorded as the reader went through the data stream. Starting a reader from a checkpointed offset may hence not require any search for the next delimiter/block/marker.
The fileLen is the length of the entire file, while splitEnd is the offset of the first byte after the split end boundary (exclusive end boundary). For non-splittable formats, both values are identical.
- Throws:
IOException
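The difference between the two entry points described for restoreReader can be sketched for a newline-delimited format whose reader checkpoints exact byte offsets. This is a hypothetical plain-Java model, not Flink's API (`RestoreSketch`, `Start`, `offsetAfter`, `fromSplitOffset`, and `fromCheckpointedOffset` are illustrative names): a createReader-style start from an enumerator offset has to scan for the next delimiter, whereas a restoreReader-style start can use the checkpointed offset directly because it was recorded at a record boundary. Since the real stream arrives positioned at the split start, a real reader interpreting the offset as a byte position would still have to seek to it itself.

```java
import java.nio.charset.StandardCharsets;

final class RestoreSketch {

    /** Where reading begins, and how many bytes had to be skipped to find that point. */
    record Start(int position, int bytesSkipped) {}

    /** While reading, record the exact byte offset at which the next unread record begins. */
    static long offsetAfter(byte[] file, int recordsRead) {
        int pos = 0;
        for (int i = 0; i < recordsRead && pos < file.length; i++) {
            while (pos < file.length && file[pos] != '\n') {
                pos++;
            }
            pos++; // step over the delimiter
        }
        return Math.min(pos, file.length);
    }

    /** createReader-style: an enumerator offset may fall mid-record, so search for the next boundary. */
    static Start fromSplitOffset(byte[] file, int splitStart) {
        if (splitStart == 0) {
            return new Start(0, 0);
        }
        int pos = splitStart - 1; // one byte back, in case splitStart is already a record start
        while (pos < file.length && file[pos] != '\n') {
            pos++;
        }
        int start = Math.min(pos + 1, file.length);
        return new Start(start, start - splitStart);
    }

    /** restoreReader-style: the checkpointed offset is already a record boundary; no search needed. */
    static Start fromCheckpointedOffset(long restoredOffset) {
        return new Start(Math.toIntExact(restoredOffset), 0);
    }

    public static void main(String[] args) {
        byte[] file = "aa\nbbbb\nc\ndd".getBytes(StandardCharsets.UTF_8);
        long checkpointed = offsetAfter(file, 2);                 // "aa" and "bbbb" were emitted
        System.out.println(fromCheckpointedOffset(checkpointed)); // Start[position=8, bytesSkipped=0]
        System.out.println(fromSplitOffset(file, 4));             // Start[position=8, bytesSkipped=4]
    }
}
```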
isSplittable
boolean isSplittable()
Checks whether this format is splittable. Splittable formats allow Flink to create multiple splits per file, so that Flink can read multiple regions of the file concurrently.
See top-level JavaDocs (section "Splitting") for details.
getProducedType
TypeInformation<T> getProducedType()
Gets the type produced by this format. This type will be the type produced by the file source as a whole.
- Specified by:
getProducedType in interface ResultTypeQueryable<T>
- Returns:
The data type produced by this function or input format.