Class Buckets<IN,BucketID>
- java.lang.Object
  - org.apache.flink.streaming.api.functions.sink.filesystem.Buckets<IN,BucketID>
- Type Parameters:
IN - The type of input elements.
BucketID - The type of ids for the buckets, as returned by the BucketAssigner.
@Internal public class Buckets<IN,BucketID> extends Object
The manager of the different active buckets in the StreamingFileSink.
This class is responsible for all bucket-related operations; the actual StreamingFileSink only plugs the functionality offered by this class into the lifecycle of the operator.
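The split of responsibilities described above can be pictured with the following self-contained sketch, in which a sink operator owns no bucket logic and simply forwards its lifecycle callbacks to a manager. The names `BucketManager`, `SinkOperatorSketch` and `RecordingBucketManager` are hypothetical, the methods are simplified (no checked exceptions, no Flink state or context types), and the pairing of operator callbacks with `Buckets` methods is an assumption, not a description of the actual `StreamingFileSink` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the operations a bucket manager exposes.
interface BucketManager<IN> {
    void onElement(IN value);
    void snapshotState(long checkpointId);
    void commitUpToCheckpoint(long checkpointId);
    void close();
}

// Hypothetical sink operator: it contains no bucket logic of its own and
// only forwards each lifecycle callback to the manager.
class SinkOperatorSketch<IN> {
    private final BucketManager<IN> buckets;

    SinkOperatorSketch(BucketManager<IN> buckets) {
        this.buckets = buckets;
    }

    void invoke(IN value) { buckets.onElement(value); }

    void snapshot(long checkpointId) { buckets.snapshotState(checkpointId); }

    void notifyCheckpointComplete(long checkpointId) { buckets.commitUpToCheckpoint(checkpointId); }

    void close() { buckets.close(); }
}

// Records the calls it receives so that the delegation can be observed.
class RecordingBucketManager implements BucketManager<String> {
    final List<String> calls = new ArrayList<>();

    public void onElement(String value) { calls.add("onElement:" + value); }

    public void snapshotState(long checkpointId) { calls.add("snapshot:" + checkpointId); }

    public void commitUpToCheckpoint(long checkpointId) { calls.add("commit:" + checkpointId); }

    public void close() { calls.add("close"); }
}
```

One consequence of keeping the operator this thin, under the assumptions above, is that the bucket logic can be exercised on its own, without a running operator.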
-
Constructor Summary
Buckets(Path basePath, BucketAssigner<IN,BucketID> bucketAssigner, BucketFactory<IN,BucketID> bucketFactory, BucketWriter<IN,BucketID> bucketWriter, RollingPolicy<IN,BucketID> rollingPolicy, int subtaskIndex, OutputFileConfig outputFileConfig) - A constructor creating a new empty bucket manager.
-
Method Summary
- void close()
- void closePartFileForBucket(BucketID bucketID)
- void commitUpToCheckpoint(long checkpointId)
- long getMaxPartCounter()
- void initializeState(ListState<byte[]> bucketStates, ListState<Long> partCounterState) - Initializes the state after recovery from a failure.
- Bucket<IN,BucketID> onElement(IN value, long currentProcessingTime, Long elementTimestamp, long currentWatermark)
- Bucket<IN,BucketID> onElement(IN value, SinkFunction.Context context)
- void onProcessingTime(long timestamp)
- void setBucketLifeCycleListener(BucketLifeCycleListener<IN,BucketID> bucketLifeCycleListener)
- void setFileLifeCycleListener(FileLifeCycleListener<BucketID> fileLifeCycleListener)
- void snapshotState(long checkpointId, ListState<byte[]> bucketStatesContainer, ListState<Long> partCounterStateContainer)
-
Constructor Detail
-
Buckets
public Buckets(Path basePath, BucketAssigner<IN,BucketID> bucketAssigner, BucketFactory<IN,BucketID> bucketFactory, BucketWriter<IN,BucketID> bucketWriter, RollingPolicy<IN,BucketID> rollingPolicy, int subtaskIndex, OutputFileConfig outputFileConfig)
A constructor creating a new empty bucket manager.
- Parameters:
basePath - The base path for our buckets.
bucketAssigner - The BucketAssigner provided by the user.
bucketFactory - The BucketFactory to be used to create buckets.
bucketWriter - The BucketWriter to be used when writing data.
rollingPolicy - The RollingPolicy as specified by the user.
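As an illustration of how `basePath` and the ids returned by a `BucketAssigner` might fit together, the sketch below derives a bucket id from the processing time with an hourly `yyyy-MM-dd--HH` pattern and places the bucket in a subdirectory of the base path. `BucketPathSketch` is hypothetical and uses plain strings instead of Flink's `Path`; treating a bucket's directory as the base path joined with the id's string form is an assumption made for this example.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, loosely modelled on a date-time bucket assigner:
// the bucket id comes from the processing time, and the bucket's
// directory is assumed to be the base path plus that id.
class BucketPathSketch {
    private final String basePath;
    private final DateTimeFormatter formatter;

    BucketPathSketch(String basePath, String pattern, ZoneId zone) {
        this.basePath = basePath;
        // withZone lets the formatter render an Instant in the given zone.
        this.formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone);
    }

    // Bucket id for an element processed at the given time (epoch millis).
    String bucketId(long processingTimeMillis) {
        return formatter.format(Instant.ofEpochMilli(processingTimeMillis));
    }

    // Directory of the bucket: the base path joined with the id.
    String bucketPath(String bucketId) {
        return basePath.endsWith("/") ? basePath + bucketId : basePath + "/" + bucketId;
    }
}
```

With UTC as the zone, all elements processed between 00:00 and 00:59 on 2020-01-01 would land in the bucket `2020-01-01--00`.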
-
Method Detail
-
setBucketLifeCycleListener
public void setBucketLifeCycleListener(BucketLifeCycleListener<IN,BucketID> bucketLifeCycleListener)
-
setFileLifeCycleListener
public void setFileLifeCycleListener(FileLifeCycleListener<BucketID> fileLifeCycleListener)
-
initializeState
public void initializeState(ListState<byte[]> bucketStates, ListState<Long> partCounterState) throws Exception
Initializes the state after recovery from a failure. During this process:
- we set the initial value of the part counter to the maximum value used before across all tasks and buckets, which guarantees that we do not overwrite valid data,
- we commit any pending files for previous checkpoints (prior to the last successful one from which we restore),
- we resume writing to the previous in-progress file of each bucket, and
- if we receive multiple states for the same bucket, we merge them.
- Parameters:
bucketStates - the state holding recovered state about active buckets.
partCounterState - the state holding the max previously used part counters.
- Throws:
Exception - if anything goes wrong while retrieving the state or while restoring/committing any in-progress/pending part files.
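Two of the restore steps listed above are easy to model in isolation: choosing the starting part counter and merging several recovered states that belong to the same bucket. In the sketch below, `RecoveredBucketState` and `RestoreSketch` are hypothetical; a bucket's state is reduced to a list of in-progress file names and merging simply concatenates those lists, whereas the real class also has to commit pending files and resume in-progress ones.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical recovered state of one bucket: its id and the files it had
// in progress when the snapshot was taken.
class RecoveredBucketState {
    final String bucketId;
    final List<String> inProgressFiles;

    RecoveredBucketState(String bucketId, List<String> inProgressFiles) {
        this.bucketId = bucketId;
        this.inProgressFiles = inProgressFiles;
    }
}

class RestoreSketch {
    // Start from the largest part counter used by any task before, so new
    // part files cannot reuse (and overwrite) an existing name.
    static long initialPartCounter(Iterable<Long> recoveredCounters) {
        long max = 0L;
        for (long counter : recoveredCounters) {
            max = Math.max(max, counter);
        }
        return max;
    }

    // A subtask may receive several states for the same bucket (for example
    // after a rescale); fold them into a single entry per bucket id.
    static Map<String, List<String>> mergeByBucket(List<RecoveredBucketState> states) {
        Map<String, List<String>> merged = new HashMap<>();
        for (RecoveredBucketState state : states) {
            merged.computeIfAbsent(state.bucketId, k -> new ArrayList<>())
                  .addAll(state.inProgressFiles);
        }
        return merged;
    }
}
```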
-
commitUpToCheckpoint
public void commitUpToCheckpoint(long checkpointId) throws IOException
- Throws:
IOException
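`commitUpToCheckpoint` is not documented here. Judging from its name and from the `initializeState` notes on committing pending files, a plausible reading is that it publishes the files that became pending at checkpoints up to and including `checkpointId`. The sketch below models that reading with a `TreeMap` keyed by checkpoint id; `PendingFilesSketch` is hypothetical, and "committing" just moves file names into a list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-bucket bookkeeping: pending files grouped by the
// checkpoint at which they became pending.
class PendingFilesSketch {
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    final List<String> committed = new ArrayList<>();

    void addPending(long checkpointId, String file) {
        pendingPerCheckpoint.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(file);
    }

    // Commit every file registered for a checkpoint <= checkpointId and drop
    // those entries; files from later checkpoints stay pending.
    void commitUpToCheckpoint(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
            pendingPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            committed.addAll(it.next().getValue());
            it.remove(); // also removes the entry from the backing TreeMap
        }
    }

    int pendingCheckpoints() {
        return pendingPerCheckpoint.size();
    }
}
```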
-
snapshotState
public void snapshotState(long checkpointId, ListState<byte[]> bucketStatesContainer, ListState<Long> partCounterStateContainer) throws Exception
- Throws:
Exception
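Likewise, `snapshotState` carries no description. Its signature suggests that it writes serialized per-bucket state into `bucketStatesContainer` and the part counter into `partCounterStateContainer`. The sketch below assumes exactly that: plain `List`s stand in for `ListState`, both containers are cleared before being refilled so that only the latest snapshot remains, and each bucket's state is encoded as a hypothetical `bucketId=inProgressFile` UTF-8 string instead of with Flink's real serializer. `SnapshotSketch` is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical snapshot step over a simplified view of the active buckets.
class SnapshotSketch {
    final Map<String, String> activeBuckets = new LinkedHashMap<>(); // bucketId -> in-progress file
    long maxPartCounter;

    void snapshotState(long checkpointId, List<byte[]> bucketStatesContainer,
                       List<Long> partCounterStateContainer) {
        // checkpointId is unused in this sketch.
        // Clear first so that the containers only hold the latest snapshot.
        bucketStatesContainer.clear();
        partCounterStateContainer.clear();
        for (Map.Entry<String, String> e : activeBuckets.entrySet()) {
            String encoded = e.getKey() + "=" + e.getValue();
            bucketStatesContainer.add(encoded.getBytes(StandardCharsets.UTF_8));
        }
        // Persisting the counter is what allows a restore to start above it.
        partCounterStateContainer.add(maxPartCounter);
    }
}
```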
-
onElement
@VisibleForTesting public Bucket<IN,BucketID> onElement(IN value, SinkFunction.Context context) throws Exception
- Throws:
Exception
-
onElement
public Bucket<IN,BucketID> onElement(IN value, long currentProcessingTime, @Nullable Long elementTimestamp, long currentWatermark) throws Exception
- Throws:
Exception
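Neither `onElement` overload is described. Given the constructor's `BucketAssigner` and `RollingPolicy` parameters and the `getMaxPartCounter` accessor, one plausible flow is: compute the bucket id, fetch or lazily create the active bucket, roll its part file if needed, write the element, and record the largest part counter handed out so far. The sketch below follows that flow under heavy simplification: `SketchBucket` and `OnElementSketch` are hypothetical, a `Function` replaces the assigner, a record-count limit replaces the rolling policy, and the `part-<subtask>-<counter>` naming is assumed for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, heavily simplified bucket: it tracks only the name of its
// current part file and how many records that part holds.
class SketchBucket {
    final String id;
    long nextPartCounter; // counter to use for the next part file
    String currentPart;   // null until the first record arrives
    int recordsInPart;

    SketchBucket(String id, long initialPartCounter) {
        this.id = id;
        this.nextPartCounter = initialPartCounter;
    }

    // Opens a new part if there is none yet or the current one is full
    // (a record-count limit stands in for a RollingPolicy), then "writes".
    void write(int subtaskIndex, int maxRecordsPerPart) {
        if (currentPart == null || recordsInPart >= maxRecordsPerPart) {
            currentPart = "part-" + subtaskIndex + "-" + nextPartCounter++;
            recordsInPart = 0;
        }
        recordsInPart++;
    }
}

// Hypothetical bucket manager following the flow described above.
class OnElementSketch<IN> {
    private final Function<IN, String> assigner; // stands in for a BucketAssigner
    private final int maxRecordsPerPart;
    private final int subtaskIndex;
    private final Map<String, SketchBucket> activeBuckets = new HashMap<>();
    private long maxPartCounter;

    OnElementSketch(Function<IN, String> assigner, int maxRecordsPerPart, int subtaskIndex) {
        this.assigner = assigner;
        this.maxRecordsPerPart = maxRecordsPerPart;
        this.subtaskIndex = subtaskIndex;
    }

    SketchBucket onElement(IN value) {
        String bucketId = assigner.apply(value);
        // Reuse the active bucket for this id, or create it on first use,
        // starting from the largest part counter handed out so far.
        SketchBucket bucket = activeBuckets.computeIfAbsent(
            bucketId, id -> new SketchBucket(id, maxPartCounter));
        bucket.write(subtaskIndex, maxRecordsPerPart);
        // Remember the highest counter in use so a restore can start above it.
        maxPartCounter = Math.max(maxPartCounter, bucket.nextPartCounter);
        return bucket;
    }

    long getMaxPartCounter() {
        return maxPartCounter;
    }
}
```

In this sketch, starting every new bucket from the current maximum keeps part counters increasing across buckets, which is what makes taking the maximum on restore meaningful.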
-
closePartFileForBucket
public void closePartFileForBucket(BucketID bucketID) throws Exception
- Throws:
Exception
-
close
public void close()
-
getMaxPartCounter
@VisibleForTesting public long getMaxPartCounter()