Class Buckets<IN,BucketID>
- java.lang.Object
- org.apache.flink.streaming.api.functions.sink.filesystem.Buckets<IN,BucketID>
- Type Parameters:
IN - The type of input elements.
BucketID - The type of ids for the buckets, as returned by the BucketAssigner.
@Internal public class Buckets<IN,BucketID> extends Object
The manager of the different active buckets in the StreamingFileSink. This class is responsible for all bucket-related operations; the actual StreamingFileSink merely plugs the functionality offered by this class into the lifecycle of the operator.
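To make the idea of a manager of active buckets more concrete, here is a minimal, self-contained sketch. It does not use Flink and is not the actual implementation: `BucketRoutingSketch`, its `assigner` function, and the in-memory lists standing in for buckets are all hypothetical names chosen for illustration. It only shows the general pattern of routing each element to a bucket selected by its bucket ID and creating that bucket on first use.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical toy model, not the Flink implementation: it keeps one in-memory
// "bucket" (a list of elements) per bucket ID and routes elements by that ID.
final class BucketRoutingSketch<IN, BucketID> {
    // Stand-in for a bucket assigner: maps an element to its bucket ID.
    private final Function<IN, BucketID> assigner;
    private final Map<BucketID, List<IN>> activeBuckets = new HashMap<>();

    BucketRoutingSketch(Function<IN, BucketID> assigner) {
        this.assigner = assigner;
    }

    // Looks up (or lazily creates) the bucket for the element and appends to it.
    List<IN> onElement(IN value) {
        BucketID id = assigner.apply(value);
        List<IN> bucket = activeBuckets.computeIfAbsent(id, k -> new ArrayList<>());
        bucket.add(value);
        return bucket;
    }

    int activeBucketCount() {
        return activeBuckets.size();
    }

    public static void main(String[] args) {
        // Assumption for this example only: bucket strings by their first character.
        BucketRoutingSketch<String, Character> buckets =
                new BucketRoutingSketch<>(s -> s.charAt(0));
        buckets.onElement("apple");
        buckets.onElement("avocado");
        buckets.onElement("banana");
        System.out.println(buckets.activeBucketCount()); // prints 2
    }
}
```

In this sketch the caller only ever talks to the manager, which mirrors the description above: the sink would forward its lifecycle calls, and the manager would decide which bucket is affected.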
-
-
Constructor Summary
Constructors
Constructor: Buckets(org.apache.flink.core.fs.Path basePath, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner<IN,BucketID> bucketAssigner, BucketFactory<IN,BucketID> bucketFactory, org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter<IN,BucketID> bucketWriter, org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy<IN,BucketID> rollingPolicy, int subtaskIndex, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig outputFileConfig)
Description: A constructor creating a new empty bucket manager.
-
Method Summary
Modifier and Type, Method, Description
void close()
void closePartFileForBucket(BucketID bucketID)
void commitUpToCheckpoint(long checkpointId)
long getMaxPartCounter()
void initializeState(org.apache.flink.api.common.state.ListState<byte[]> bucketStates, org.apache.flink.api.common.state.ListState<Long> partCounterState): Initializes the state after recovery from a failure.
Bucket<IN,BucketID> onElement(IN value, long currentProcessingTime, Long elementTimestamp, long currentWatermark)
Bucket<IN,BucketID> onElement(IN value, org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction.Context context)
void onProcessingTime(long timestamp)
void setBucketLifeCycleListener(BucketLifeCycleListener<IN,BucketID> bucketLifeCycleListener)
void setFileLifeCycleListener(FileLifeCycleListener<BucketID> fileLifeCycleListener)
void snapshotState(long checkpointId, org.apache.flink.api.common.state.ListState<byte[]> bucketStatesContainer, org.apache.flink.api.common.state.ListState<Long> partCounterStateContainer)
-
-
-
Constructor Detail
-
Buckets
public Buckets(org.apache.flink.core.fs.Path basePath, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner<IN,BucketID> bucketAssigner, BucketFactory<IN,BucketID> bucketFactory, org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter<IN,BucketID> bucketWriter, org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy<IN,BucketID> rollingPolicy, int subtaskIndex, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig outputFileConfig)
A constructor creating a new empty bucket manager.
- Parameters:
basePath - The base path for our buckets.
bucketAssigner - The BucketAssigner provided by the user.
bucketFactory - The BucketFactory to be used to create buckets.
bucketWriter - The BucketWriter to be used when writing data.
rollingPolicy - The RollingPolicy as specified by the user.
-
-
Method Detail
-
setBucketLifeCycleListener
public void setBucketLifeCycleListener(BucketLifeCycleListener<IN,BucketID> bucketLifeCycleListener)
-
setFileLifeCycleListener
public void setFileLifeCycleListener(FileLifeCycleListener<BucketID> fileLifeCycleListener)
-
initializeState
public void initializeState(org.apache.flink.api.common.state.ListState<byte[]> bucketStates, org.apache.flink.api.common.state.ListState<Long> partCounterState) throws Exception
Initializes the state after recovery from a failure. During this process:
- we set the initial value of the part counter to the maximum value previously used across all tasks and buckets, which guarantees that we do not overwrite valid data,
- we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),
- we resume writing to the previous in-progress file of each bucket, and
- if we receive multiple states for the same bucket, we merge them.
- Parameters:
bucketStates - the state holding recovered state about active buckets.
partCounterState - the state holding the max previously used part counters.
- Throws:
Exception - if anything goes wrong while retrieving the state or restoring/committing any in-progress/pending part files
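Two of the recovery steps listed above, picking the maximum previously used part counter and merging multiple states that belong to the same bucket, can be sketched in isolation. The following is a simplified illustration that does not use Flink; `RecoverySketch`, `RestoredState`, the string bucket IDs, and the part file names are hypothetical, and the starting counter value of 0 is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of two recovery steps; not the Flink implementation.
final class RecoverySketch {

    // Stand-in for one restored bucket state: a bucket ID plus its pending files.
    static final class RestoredState {
        final String bucketId;
        final List<String> pendingFiles;

        RestoredState(String bucketId, List<String> pendingFiles) {
            this.bucketId = bucketId;
            this.pendingFiles = pendingFiles;
        }
    }

    // Resume from the largest counter seen in any restored state, so that
    // newly created part files cannot reuse a name that is already taken.
    static long restorePartCounter(List<Long> restoredCounters) {
        long max = 0L; // assumption: counters start at 0 when nothing is restored
        for (long counter : restoredCounters) {
            max = Math.max(max, counter);
        }
        return max;
    }

    // If several restored states refer to the same bucket, combine them.
    static Map<String, List<String>> mergeByBucket(List<RestoredState> states) {
        Map<String, List<String>> merged = new LinkedHashMap<>();
        for (RestoredState state : states) {
            merged.computeIfAbsent(state.bucketId, k -> new ArrayList<>())
                    .addAll(state.pendingFiles);
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(restorePartCounter(Arrays.asList(3L, 7L, 5L))); // prints 7

        List<RestoredState> states = Arrays.asList(
                new RestoredState("2024-01-01", Arrays.asList("part-0-3")),
                new RestoredState("2024-01-01", Arrays.asList("part-1-7")),
                new RestoredState("2024-01-02", Arrays.asList("part-0-5")));
        // Both states of bucket "2024-01-01" end up in a single merged entry.
        System.out.println(mergeByBucket(states).get("2024-01-01").size()); // prints 2
    }
}
```

Taking the maximum rather than, say, the counter of the local subtask is what makes the guarantee hold after rescaling in this sketch: whichever subtask ends up owning a bucket starts above every counter that was in use before.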
-
commitUpToCheckpoint
public void commitUpToCheckpoint(long checkpointId) throws IOException
- Throws:
IOException
-
snapshotState
public void snapshotState(long checkpointId, org.apache.flink.api.common.state.ListState<byte[]> bucketStatesContainer, org.apache.flink.api.common.state.ListState<Long> partCounterStateContainer) throws Exception
- Throws:
Exception
-
onElement
@VisibleForTesting public Bucket<IN,BucketID> onElement(IN value, org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction.Context context) throws Exception
- Throws:
Exception
-
onElement
public Bucket<IN,BucketID> onElement(IN value, long currentProcessingTime, @Nullable Long elementTimestamp, long currentWatermark) throws Exception
- Throws:
Exception
-
closePartFileForBucket
public void closePartFileForBucket(BucketID bucketID) throws Exception
- Throws:
Exception
-
close
public void close()
-
getMaxPartCounter
@VisibleForTesting public long getMaxPartCounter()
-
-