Class Buckets<IN, BucketID>

  • Type Parameters:
    IN - The type of input elements.
    BucketID - The type of ids for the buckets, as returned by the BucketAssigner.

    @Internal
    public class Buckets<IN, BucketID>
    extends Object
    The manager of the different active buckets in the StreamingFileSink.

    This class is responsible for all bucket-related operations; the StreamingFileSink itself simply plugs the functionality offered by this class into the lifecycle of the operator.

    • Constructor Detail

      • Buckets

        public Buckets(org.apache.flink.core.fs.Path basePath,
                       org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner<IN, BucketID> bucketAssigner,
                       BucketFactory<IN, BucketID> bucketFactory,
                       org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter<IN, BucketID> bucketWriter,
                       org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy<IN, BucketID> rollingPolicy,
                       int subtaskIndex,
                       org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig outputFileConfig)
        Creates a new, empty bucket manager.
        Parameters:
        basePath - The base path for our buckets.
        bucketAssigner - The BucketAssigner provided by the user.
        bucketFactory - The BucketFactory to be used to create buckets.
        bucketWriter - The BucketWriter to be used when writing data.
        rollingPolicy - The RollingPolicy as specified by the user.
        subtaskIndex - The index of the subtask this bucket manager belongs to.
        outputFileConfig - The OutputFileConfig describing the naming of the part files.
    • Method Detail

      • initializeState

        public void initializeState(org.apache.flink.api.common.state.ListState<byte[]> bucketStates,
                                    org.apache.flink.api.common.state.ListState<Long> partCounterState)
                             throws Exception
        Initializes the state after recovery from a failure.

        During this process:

        1. we set the initial value of the part counter to the maximum value previously used across all tasks and buckets. This guarantees that we do not overwrite valid data,
        2. we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),
        3. we resume writing to the previous in-progress file of each bucket, and
        4. if we receive multiple states for the same bucket, we merge them.
        Parameters:
        bucketStates - the state holding the recovered state of the active buckets.
        partCounterState - the state holding the max previously used part counters.
        Throws:
        Exception - if anything goes wrong during retrieving the state or restoring/committing of any in-progress/pending part files
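The recovery steps above can be sketched independently of Flink's actual types. The following is a minimal illustration of steps 1 and 4 (restoring the max part counter and merging duplicate bucket states); `RestoredBucketState`, `restoreMaxPartCounter`, and `mergeRestoredStates` are hypothetical names for this sketch and are not part of the Flink API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the initializeState() recovery logic over a simplified
// bucket state; NOT the Flink implementation.
public class RecoverySketch {

    // Hypothetical stand-in for one restored bucket state entry.
    static final class RestoredBucketState {
        final String bucketId;
        final List<String> pendingFiles;
        RestoredBucketState(String bucketId, List<String> pendingFiles) {
            this.bucketId = bucketId;
            this.pendingFiles = pendingFiles;
        }
    }

    // Step 1: start the part counter at the maximum previously used value,
    // so newly created part files never overwrite valid data.
    static long restoreMaxPartCounter(List<Long> partCounterState) {
        long max = 0L;
        for (long c : partCounterState) {
            max = Math.max(max, c);
        }
        return max;
    }

    // Step 4: if several restored states refer to the same bucket
    // (e.g. after rescaling), merge them into one state per bucket id.
    static Map<String, List<String>> mergeRestoredStates(List<RestoredBucketState> states) {
        Map<String, List<String>> merged = new HashMap<>();
        for (RestoredBucketState s : states) {
            merged.computeIfAbsent(s.bucketId, k -> new ArrayList<>())
                  .addAll(s.pendingFiles);
        }
        return merged;
    }
}
```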
      • commitUpToCheckpoint

        public void commitUpToCheckpoint(long checkpointId)
                                  throws IOException
        Commits the pending files for all checkpoints up to (and including) the given checkpoint id.
        Throws:
        IOException
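The commit-up-to semantics can be sketched with a sorted map from checkpoint id to pending files: everything registered for the given checkpoint or an earlier one gets published and forgotten. `CommitSketch` and its methods are hypothetical names for this illustration, not Flink code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch (not Flink code): pending files are tracked per
// checkpoint id; committing "up to" a checkpoint publishes every pending
// file registered for that checkpoint or an earlier one.
public class CommitSketch {

    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    void addPending(long checkpointId, String file) {
        pendingPerCheckpoint
            .computeIfAbsent(checkpointId, k -> new ArrayList<>())
            .add(file);
    }

    // Commit (and forget) everything pending for checkpoints <= checkpointId.
    void commitUpToCheckpoint(long checkpointId) {
        Map<Long, List<String>> toCommit = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<String> files : toCommit.values()) {
            committed.addAll(files);
        }
        toCommit.clear(); // headMap is a view: clearing it removes the committed entries
    }

    List<String> committedFiles() {
        return committed;
    }
}
```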
      • snapshotState

        public void snapshotState(long checkpointId,
                                  org.apache.flink.api.common.state.ListState<byte[]> bucketStatesContainer,
                                  org.apache.flink.api.common.state.ListState<Long> partCounterStateContainer)
                           throws Exception
        Snapshots the state of the active buckets and the maximum used part counter into the given state containers.
        Throws:
        Exception
      • onElement

        @VisibleForTesting
        public Bucket<IN, BucketID> onElement(IN value,
                                              org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction.Context context)
                                       throws Exception
        Throws:
        Exception
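The element path can be sketched generically: the bucket assigner maps each element to a bucket id, the manager creates the bucket on first use, and the element is written into that bucket. `OnElementSketch` is a hypothetical class for this illustration; the real class delegates to `BucketAssigner`, `BucketFactory`, and `BucketWriter`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch (not Flink code) of the onElement() flow: assign a
// bucket id, create the bucket lazily, and write the element to it.
public class OnElementSketch<IN, BUCKET_ID> {

    private final Function<IN, BUCKET_ID> bucketAssigner;
    private final Map<BUCKET_ID, List<IN>> activeBuckets = new HashMap<>();

    OnElementSketch(Function<IN, BUCKET_ID> bucketAssigner) {
        this.bucketAssigner = bucketAssigner;
    }

    // Route the element to its bucket, creating the bucket on first use.
    BUCKET_ID onElement(IN value) {
        BUCKET_ID id = bucketAssigner.apply(value);
        activeBuckets.computeIfAbsent(id, k -> new ArrayList<>()).add(value);
        return id;
    }

    Map<BUCKET_ID, List<IN>> buckets() {
        return activeBuckets;
    }
}
```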
      • onProcessingTime

        public void onProcessingTime(long timestamp)
                              throws Exception
        Throws:
        Exception
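Processing-time callbacks are typically used to roll in-progress files on a time basis, e.g. when a file has been open longer than a rollover interval (one of the conditions a DefaultRollingPolicy can check). A minimal sketch of that idea, using hypothetical names, not Flink code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch (not Flink code): on a processing-time timer, each
// bucket's in-progress file is rolled if it has been open for at least
// the configured rollover interval.
public class ProcessingTimeSketch {

    private final long rolloverIntervalMs;
    private final Map<String, Long> inProgressCreationTime = new HashMap<>();
    private final List<String> rolled = new ArrayList<>();

    ProcessingTimeSketch(long rolloverIntervalMs) {
        this.rolloverIntervalMs = rolloverIntervalMs;
    }

    // Record that a bucket opened an in-progress file at the given time.
    void openInProgressFile(String bucketId, long now) {
        inProgressCreationTime.put(bucketId, now);
    }

    // Roll every in-progress file that has been open for >= the interval.
    void onProcessingTime(long timestamp) {
        inProgressCreationTime.entrySet().removeIf(e -> {
            if (timestamp - e.getValue() >= rolloverIntervalMs) {
                rolled.add(e.getKey());
                return true;
            }
            return false;
        });
    }

    List<String> rolledBuckets() {
        return rolled;
    }
}
```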
      • close

        public void close()
      • getMaxPartCounter

        @VisibleForTesting
        public long getMaxPartCounter()