IN - Type of the elements in the input of the sink that are also the elements to be written to its output
@Experimental public class FileSink<IN> extends Object implements Sink<IN,FileSinkCommittable,FileWriterBucketState,Void>
A unified sink that emits its input elements to FileSystem files within buckets. This
sink achieves exactly-once semantics for both BATCH and STREAMING.
When creating the sink a basePath must be specified. The base directory contains one
directory for every bucket. The bucket directories themselves contain several part files, with at
least one for each parallel subtask of the sink which is writing data to that bucket. These part
files contain the actual output data.
The sink uses a BucketAssigner to determine the bucket directory, inside the base directory, to
which each element is written. The BucketAssigner can, for example, roll on every checkpoint or
use time or a property of the element to determine the bucket directory. The default
BucketAssigner is a DateTimeBucketAssigner, which creates one new bucket every hour. You can
specify a custom BucketAssigner using the withBucketAssigner(bucketAssigner) method, after calling
forRowFormat(Path, Encoder) or forBulkFormat(Path, BulkWriter.Factory).
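
The hourly bucketing described above can be illustrated with a small, self-contained sketch. It is a simplified model and not Flink's DateTimeBucketAssigner: the class name HourlyBucketSketch, the method bucketIdFor and the pattern "yyyy-MM-dd--HH" are assumptions chosen for illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Simplified model of an hourly, time-based bucket assigner; not Flink's class. */
final class HourlyBucketSketch {
    // Assumed pattern: it yields one distinct bucket id per calendar hour.
    private static final String HOURLY_PATTERN = "yyyy-MM-dd--HH";

    private final DateTimeFormatter formatter;

    HourlyBucketSketch(ZoneId zone) {
        this.formatter = DateTimeFormatter.ofPattern(HOURLY_PATTERN).withZone(zone);
    }

    /** Maps a timestamp in epoch milliseconds to the name of its bucket directory. */
    String bucketIdFor(long epochMillis) {
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        HourlyBucketSketch assigner = new HourlyBucketSketch(ZoneOffset.UTC);
        // Two timestamps one hour apart land in two different bucket directories.
        System.out.println(assigner.bucketIdFor(0L));
        System.out.println(assigner.bucketIdFor(3_600_000L));
    }
}
```

All elements whose timestamps fall within the same hour map to the same bucket id, so a new bucket directory appears once per hour.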
The names of the part files can be defined using OutputFileConfig. This
configuration contains a part prefix and a part suffix that are combined with a random uid
assigned to each subtask of the sink and a rolling counter to determine the file names. For
example, with a prefix "prefix" and a suffix ".ext", a file named
"prefix-81fc4980-a6af-41c8-9937-9939408a734b-17.ext" contains the data from the subtask with uid
81fc4980-a6af-41c8-9937-9939408a734b of the sink and is the 17th part file created by that subtask.
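
The naming scheme above can be sketched as plain string assembly. This is a simplified model, not Flink's implementation: the class PartFileNameSketch and its members are hypothetical, and the value at which the counter starts is left to the caller because the text does not state it.

```java
import java.util.UUID;

/** Simplified model of part-file naming; not Flink's implementation. */
final class PartFileNameSketch {
    private final String prefix;
    private final String suffix;
    private final String subtaskUid;
    private long counter;

    PartFileNameSketch(String prefix, String suffix, String subtaskUid, long firstCounter) {
        this.prefix = prefix;
        this.suffix = suffix;
        this.subtaskUid = subtaskUid;
        this.counter = firstCounter;
    }

    /** Builds prefix-uid-counter followed by the suffix, then advances the rolling counter. */
    String nextPartFileName() {
        String name = prefix + "-" + subtaskUid + "-" + counter + suffix;
        counter++;
        return name;
    }

    public static void main(String[] args) {
        // Each subtask gets its own random uid, so names from different subtasks do not collide.
        String uid = UUID.randomUUID().toString();
        PartFileNameSketch names = new PartFileNameSketch("prefix", ".ext", uid, 0L);
        System.out.println(names.nextPartFileName());
        System.out.println(names.nextPartFileName());
    }
}
```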
Part files roll based on the user-specified RollingPolicy. By default, a DefaultRollingPolicy
is used for row-encoded sink output; an OnCheckpointRollingPolicy is used for bulk-encoded sink
output.
In some scenarios, the open buckets are required to change based on time. In these cases, the
user can specify a bucketCheckInterval (1 minute by default), and the sink will check
periodically and roll the part file if the specified rolling policy says so.
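
The interplay between a rolling policy and the periodic check can be sketched as follows. Everything here is hypothetical and heavily simplified: the Policy interface, the ON_CHECKPOINT and maxAge policies and the method rollOnTimer are illustrative names, not Flink's RollingPolicy API, and the age threshold is an invented example criterion.

```java
/** Hypothetical, simplified rolling policy model; names and criteria are illustrative only. */
final class RollingCheckSketch {
    interface Policy {
        /** Asked when a checkpoint is taken. */
        boolean shouldRollOnCheckpoint();

        /** Asked by the periodic check for each open part file. */
        boolean shouldRollOnProcessingTime(long fileCreatedAtMillis, long nowMillis);
    }

    /** Rolls only on checkpoints, in the spirit of the bulk-format default described above. */
    static final Policy ON_CHECKPOINT = new Policy() {
        @Override
        public boolean shouldRollOnCheckpoint() {
            return true;
        }

        @Override
        public boolean shouldRollOnProcessingTime(long fileCreatedAtMillis, long nowMillis) {
            return false;
        }
    };

    /** Rolls once a part file has been open for at least maxOpenMillis (assumed criterion). */
    static Policy maxAge(long maxOpenMillis) {
        return new Policy() {
            @Override
            public boolean shouldRollOnCheckpoint() {
                return false;
            }

            @Override
            public boolean shouldRollOnProcessingTime(long fileCreatedAtMillis, long nowMillis) {
                return nowMillis - fileCreatedAtMillis >= maxOpenMillis;
            }
        };
    }

    /** What one periodic check (fired every bucketCheckInterval) decides for one open file. */
    static boolean rollOnTimer(Policy policy, long fileCreatedAtMillis, long nowMillis) {
        return policy.shouldRollOnProcessingTime(fileCreatedAtMillis, nowMillis);
    }

    public static void main(String[] args) {
        Policy oneMinute = maxAge(60_000L);
        System.out.println(rollOnTimer(oneMinute, 0L, 30_000L));
        System.out.println(rollOnTimer(oneMinute, 0L, 60_000L));
    }
}
```

In this model the timer only asks the question; whether a file rolls is decided entirely by the policy, which matches the wording "if the specified rolling policy says so".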
Part files can be in one of three states: in-progress, pending or finished. The reason for this is
how the sink works to provide exactly-once semantics and fault-tolerance. The part file that is
currently being written to is in-progress. Once a part file is closed for writing it becomes
pending. When a checkpoint is successful (for STREAMING) or at the end of the job (for BATCH) the
currently pending files will be moved to finished.
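
The three states and their transitions can be modelled as a small state machine. This is a minimal sketch under stated assumptions, not the sink's code: PartFileLifecycleSketch, PartFile, closeForWriting and commitPending are hypothetical names, and real part files live on a file system rather than in a list.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the part-file lifecycle; not Flink's implementation. */
final class PartFileLifecycleSketch {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS; // a newly opened file is being written to

        PartFile(String name) {
            this.name = name;
        }
    }

    private final List<PartFile> files = new ArrayList<>();

    /** Opens a new part file, which starts out in-progress. */
    PartFile open(String name) {
        PartFile file = new PartFile(name);
        files.add(file);
        return file;
    }

    /** Closing a file for writing moves it from in-progress to pending. */
    void closeForWriting(PartFile file) {
        if (file.state != State.IN_PROGRESS) {
            throw new IllegalStateException(file.name + " is not in-progress");
        }
        file.state = State.PENDING;
    }

    /**
     * Called on a successful checkpoint (STREAMING) or at the end of the job (BATCH):
     * moves every pending file to finished and returns how many files were moved.
     */
    int commitPending() {
        int moved = 0;
        for (PartFile file : files) {
            if (file.state == State.PENDING) {
                file.state = State.FINISHED;
                moved++;
            }
        }
        return moved;
    }
}
```

Files that are still in-progress when commitPending runs are left untouched, which is why only closed files become visible as finished output.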
For STREAMING, in order to guarantee exactly-once semantics in case of a failure, the sink should
roll back to the state it had when the last successful checkpoint occurred. To this end, when
restoring, the restored files in pending state are transferred into the finished state while any
in-progress files are rolled back, so that they do not contain data that arrived after the
checkpoint from which we restore.
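
The restore rule above can be sketched with in-memory byte arrays standing in for files. This is an illustrative model only: RestoreSketch, FileSnapshot and RestoredFile are hypothetical types, and rolling back by truncating to a length recorded at the checkpoint is an assumption about one possible mechanism, not a description of how Flink does it on every file system.

```java
import java.util.Arrays;

/** Simplified model of restoring part files from a checkpoint; not Flink's implementation. */
final class RestoreSketch {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    /** What the checkpoint recorded about one part file (hypothetical structure). */
    static final class FileSnapshot {
        final String name;
        final State state;
        final int validLength; // bytes written up to the checkpoint (assumed bookkeeping)

        FileSnapshot(String name, State state, int validLength) {
            this.name = name;
            this.state = state;
            this.validLength = validLength;
        }
    }

    static final class RestoredFile {
        final String name;
        final State state;
        final byte[] content;

        RestoredFile(String name, State state, byte[] content) {
            this.name = name;
            this.state = state;
            this.content = content;
        }
    }

    /** Applies the restore rule to one file, given what is currently on disk. */
    static RestoredFile restore(FileSnapshot snapshot, byte[] contentOnDisk) {
        switch (snapshot.state) {
            case PENDING:
                // Pending files belong to the successful checkpoint, so they become finished.
                return new RestoredFile(snapshot.name, State.FINISHED, contentOnDisk);
            case IN_PROGRESS:
                // Drop everything written after the checkpoint by truncating to the recorded length.
                int keep = Math.min(snapshot.validLength, contentOnDisk.length);
                return new RestoredFile(
                        snapshot.name, State.IN_PROGRESS, Arrays.copyOf(contentOnDisk, keep));
            default:
                // Finished files are already final and are left as they are.
                return new RestoredFile(snapshot.name, snapshot.state, contentOnDisk);
        }
    }
}
```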
Modifier and Type | Class and Description |
---|---|
static class | FileSink.BulkFormatBuilder<IN,T extends FileSink.BulkFormatBuilder<IN,T>> A builder for configuring the sink for bulk-encoding formats, e.g. |
static class | FileSink.DefaultBulkFormatBuilder<IN> Builder for the vanilla FileSink using a bulk format. |
static class | FileSink.DefaultRowFormatBuilder<IN> Builder for the vanilla FileSink using a row format. |
static class | FileSink.RowFormatBuilder<IN,T extends FileSink.RowFormatBuilder<IN,T>> A builder for configuring the sink for row-wise encoding formats. |
Nested classes/interfaces inherited from interface Sink: Sink.InitContext, Sink.ProcessingTimeService
public SinkWriter<IN,FileSinkCommittable,FileWriterBucketState> createWriter(Sink.InitContext context, List<FileWriterBucketState> states) throws IOException
Description copied from interface: Sink
Creates a SinkWriter. If the application is resumed from a checkpoint or savepoint and
the sink is stateful, it will receive the corresponding state obtained with
SinkWriter.snapshotState(long) and serialized with Sink.getWriterStateSerializer(). If no state
exists, the first existing, compatible state specified in Sink.getCompatibleStateNames() will be
loaded and passed.
Specified by: createWriter in interface Sink<IN,FileSinkCommittable,FileWriterBucketState,Void>
Parameters:
context - the runtime context.
states - the writer's previous state.
Throws:
IOException - for any failure during creation.
See Also: SinkWriter.snapshotState(long), Sink.getWriterStateSerializer(), Sink.getCompatibleStateNames()
public Optional<SimpleVersionedSerializer<FileWriterBucketState>> getWriterStateSerializer()
Description copied from interface: Sink
Any stateful sink needs to provide this state serializer and implement
SinkWriter.snapshotState(long) properly. The respective state is used in
Sink.createWriter(InitContext, List) on recovery.
Specified by: getWriterStateSerializer in interface Sink<IN,FileSinkCommittable,FileWriterBucketState,Void>
public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException
Description copied from interface: Sink
Creates a Committer which is part of a 2-phase-commit protocol. The SinkWriter creates
committables through SinkWriter.prepareCommit(boolean) in the first phase. The committables are
then passed to this committer and persisted with Committer.commit(List). If a committer is
returned, the sink must also return a Sink.getCommittableSerializer().
Specified by: createCommitter in interface Sink<IN,FileSinkCommittable,FileWriterBucketState,Void>
Throws:
IOException - for any failure during creation.
public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer()
Description copied from interface: Sink
Returns the serializer of the committable type. The serializer is required if the sink has a
Committer or GlobalCommitter.
Specified by: getCommittableSerializer in interface Sink<IN,FileSinkCommittable,FileWriterBucketState,Void>
public Optional<GlobalCommitter<FileSinkCommittable,Void>> createGlobalCommitter()
Description copied from interface: Sink
Creates a GlobalCommitter which is part of a 2-phase-commit protocol. The SinkWriter creates
committables through SinkWriter.prepareCommit(boolean) in the first phase. The committables are
then passed to the Committer and persisted with Committer.commit(List). The committables are also
passed to this GlobalCommitter, of which only a single instance exists. If a global committer is
returned, the sink must also return a Sink.getCommittableSerializer() and
Sink.getGlobalCommittableSerializer().
Specified by: createGlobalCommitter in interface Sink<IN,FileSinkCommittable,FileWriterBucketState,Void>
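public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer()
Description copied from interface: Sink
Returns the serializer of the aggregated committable type. The serializer is required if the sink
has a GlobalCommitter.
Specified by: getGlobalCommittableSerializer in interface Sink<IN,FileSinkCommittable,FileWriterBucketState,Void>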
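public Collection<String> getCompatibleStateNames()
Description copied from interface: Sink
Returns the state names of sinks from which the state can be restored. For example, the new
FileSink can resume from the state of an old StreamingFileSink as a drop-in replacement when
resuming from a checkpoint/savepoint.
Specified by: getCompatibleStateNames in interface Sink<IN,FileSinkCommittable,FileWriterBucketState,Void>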
public static <IN> FileSink.DefaultRowFormatBuilder<IN> forRowFormat(Path basePath, Encoder<IN> encoder)
public static <IN> FileSink.DefaultBulkFormatBuilder<IN> forBulkFormat(Path basePath, BulkWriter.Factory<IN> bulkWriterFactory)
Copyright © 2014–2023 The Apache Software Foundation. All rights reserved.