Class FutureUtils

    • Constructor Detail

      • FutureUtils

        public FutureUtils()
    • Method Detail

      • completedVoidFuture

        public static CompletableFuture<Void> completedVoidFuture()
        Returns a completed future of type Void.
        Returns:
        a completed future of type Void
      • unsupportedOperationFuture

        public static <T> CompletableFuture<T> unsupportedOperationFuture()
        Returns an exceptionally completed future with an UnsupportedOperationException.
        Type Parameters:
        T - type of the future
        Returns:
        exceptionally completed future
      • completeFromCallable

        public static <T> void completeFromCallable​(CompletableFuture<T> future,
                                                    Callable<T> operation)
        Fakes asynchronous execution by immediately executing the operation and completing the supplied future either normally or exceptionally.
        Type Parameters:
        T - type of the result
        Parameters:
        operation - the operation to execute
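        The behavior described for completeFromCallable can be sketched with the JDK alone. This is an illustration under assumptions, not the library's code; the class name CompleteFromCallableSketch is hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in used only for illustration.
final class CompleteFromCallableSketch {

    // Runs the operation on the calling thread and transfers its outcome to the future.
    static <T> void completeFromCallable(CompletableFuture<T> future, Callable<T> operation) {
        try {
            future.complete(operation.call());
        } catch (Exception e) {
            // A thrown exception becomes an exceptional completion.
            future.completeExceptionally(e);
        }
    }
}
```

        Because the operation runs immediately on the caller's thread, the supplied future is already done when the method returns.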
      • retry

        public static <T> CompletableFuture<T> retry​(Supplier<CompletableFuture<T>> operation,
                                                     int retries,
                                                     Executor executor)
        Retry the given operation the given number of times in case of a failure.
        Type Parameters:
        T - type of the result
        Parameters:
        operation - the operation to execute
        retries - number of retries if the operation failed
        executor - to use to run the futures
        Returns:
        Future containing either the result of the operation or a FutureUtils.RetryException
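        The retry contract above can be approximated with plain CompletableFuture calls. The sketch below is an assumption-laden illustration: RetrySketch is a hypothetical name, and a RuntimeException stands in for FutureUtils.RetryException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical stand-in used only for illustration.
final class RetrySketch {

    // Invokes the operation and, on failure, tries again up to `retries` more times.
    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> operation, int retries, Executor executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(result, operation, retries, executor);
        return result;
    }

    private static <T> void attempt(
            CompletableFuture<T> result,
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            Executor executor) {
        if (result.isDone()) {
            return; // the caller completed or cancelled the result in the meantime
        }
        operation.get().whenCompleteAsync((value, failure) -> {
            if (failure == null) {
                result.complete(value);
            } else if (retriesLeft > 0) {
                attempt(result, operation, retriesLeft - 1, executor);
            } else {
                // Assumption: a RuntimeException stands in for the library's retry exception.
                result.completeExceptionally(
                        new RuntimeException("Number of retries has been exhausted.", failure));
            }
        }, executor);
    }
}
```

        In this sketch, retries = 3 allows up to four invocations of the operation: the initial attempt plus three retries.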
      • retry

        public static <T> CompletableFuture<T> retry​(Supplier<CompletableFuture<T>> operation,
                                                     int retries,
                                                     Predicate<Throwable> retryPredicate,
                                                     Executor executor)
        Retry the given operation the given number of times in case of a failure only when an exception is retryable.
        Type Parameters:
        T - type of the result
        Parameters:
        operation - the operation to execute
        retries - number of retries if the operation failed
        retryPredicate - Predicate to test whether an exception is retryable
        executor - to use to run the futures
        Returns:
        Future containing either the result of the operation or a FutureUtils.RetryException
      • retryWithDelay

        public static <T> CompletableFuture<T> retryWithDelay​(Supplier<CompletableFuture<T>> operation,
                                                              RetryStrategy retryStrategy,
                                                              Predicate<Throwable> retryPredicate,
                                                              ScheduledExecutor scheduledExecutor)
        Retry the given operation with the given delay in between failures.
        Type Parameters:
        T - type of the result
        Parameters:
        operation - to retry
        retryStrategy - the RetryStrategy
        retryPredicate - Predicate to test whether an exception is retryable
        scheduledExecutor - executor to be used for the retry operation
        Returns:
        Future which retries the given operation a given number of times and delays the retry in case of failures
      • retryWithDelay

        public static <T> CompletableFuture<T> retryWithDelay​(Supplier<CompletableFuture<T>> operation,
                                                              RetryStrategy retryStrategy,
                                                              ScheduledExecutor scheduledExecutor)
        Retry the given operation with the given delay in between failures.
        Type Parameters:
        T - type of the result
        Parameters:
        operation - to retry
        retryStrategy - the RetryStrategy
        scheduledExecutor - executor to be used for the retry operation
        Returns:
        Future which retries the given operation a given number of times and delays the retry in case of failures
      • retrySuccessfulWithDelay

        public static <T> CompletableFuture<T> retrySuccessfulWithDelay​(Supplier<CompletableFuture<T>> operation,
                                                                        Duration retryDelay,
                                                                        Deadline deadline,
                                                                        Predicate<T> acceptancePredicate,
                                                                        ScheduledExecutor scheduledExecutor)
        Retry the given operation with the given delay in between successful completions where the result does not match a given predicate.
        Type Parameters:
        T - type of the result
        Parameters:
        operation - to retry
        retryDelay - delay between retries
        deadline - A deadline that specifies at what point we should stop retrying
        acceptancePredicate - Predicate to test whether the result is acceptable
        scheduledExecutor - executor to be used for the retry operation
        Returns:
        Future which retries the given operation until the result matches the predicate or the deadline is reached, delaying each retry by the given delay
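        A rough sketch of polling until a result is acceptable, using only the JDK. The names are hypothetical, a Duration (timeLeft) stands in for the Deadline parameter, a ScheduledExecutorService stands in for ScheduledExecutor, and the choice not to retry failed attempts is an assumption of this sketch.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-in used only for illustration.
final class RetrySuccessfulSketch {

    static <T> CompletableFuture<T> retrySuccessfulWithDelay(
            Supplier<CompletableFuture<T>> operation,
            Duration retryDelay,
            Duration timeLeft, // assumption: replaces the Deadline parameter
            Predicate<T> acceptancePredicate,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        long deadlineNanos = System.nanoTime() + timeLeft.toNanos();
        attempt(result, operation, retryDelay.toMillis(), deadlineNanos,
                acceptancePredicate, scheduler);
        return result;
    }

    private static <T> void attempt(
            CompletableFuture<T> result,
            Supplier<CompletableFuture<T>> operation,
            long delayMillis,
            long deadlineNanos,
            Predicate<T> acceptancePredicate,
            ScheduledExecutorService scheduler) {
        if (result.isDone()) {
            return;
        }
        operation.get().whenComplete((value, failure) -> {
            if (failure != null) {
                // Assumption: failed attempts are not retried, only rejected results are.
                result.completeExceptionally(failure);
            } else if (acceptancePredicate.test(value)) {
                result.complete(value);
            } else if (System.nanoTime() - deadlineNanos < 0) {
                // Result rejected and time remains: poll again after the delay.
                scheduler.schedule(
                        () -> attempt(result, operation, delayMillis, deadlineNanos,
                                acceptancePredicate, scheduler),
                        delayMillis, TimeUnit.MILLISECONDS);
            } else {
                result.completeExceptionally(new TimeoutException(
                        "Could not satisfy the predicate within the allowed time."));
            }
        });
    }
}
```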
      • orTimeout

        public static <T> CompletableFuture<T> orTimeout​(CompletableFuture<T> future,
                                                         long timeout,
                                                         TimeUnit timeUnit,
                                                         @Nullable
                                                         String timeoutMsg)
        Times the given future out after the timeout.
        Type Parameters:
        T - type of the given future
        Parameters:
        future - to time out
        timeout - after which the given future is timed out
        timeUnit - time unit of the timeout
        timeoutMsg - timeout message for exception
        Returns:
        The timeout enriched future
      • orTimeout

        public static <T> CompletableFuture<T> orTimeout​(CompletableFuture<T> future,
                                                         long timeout,
                                                         TimeUnit timeUnit,
                                                         Executor timeoutFailExecutor,
                                                         @Nullable
                                                         String timeoutMsg)
        Times the given future out after the timeout.
        Type Parameters:
        T - type of the given future
        Parameters:
        future - to time out
        timeout - after which the given future is timed out
        timeUnit - time unit of the timeout
        timeoutFailExecutor - executor that will complete the future exceptionally after the timeout is reached
        timeoutMsg - timeout message for exception
        Returns:
        The timeout enriched future
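        One way to picture the timeout behavior is a scheduled task that fails the future unless it completes first. The sketch below assumes a caller-supplied ScheduledExecutorService (the timer parameter) and uses the hypothetical name TimeoutSketch; it does not show how the library schedules its own timeouts.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in used only for illustration.
final class TimeoutSketch {

    static <T> CompletableFuture<T> orTimeout(
            CompletableFuture<T> future,
            long timeout,
            TimeUnit timeUnit,
            ScheduledExecutorService timer, // assumption: the caller provides the timer
            String timeoutMsg) {
        if (!future.isDone()) {
            // Fail the future once the timeout elapses; a no-op if it is already done.
            ScheduledFuture<?> timeoutTask = timer.schedule(
                    () -> { future.completeExceptionally(new TimeoutException(timeoutMsg)); },
                    timeout, timeUnit);
            // Cancel the pending timer as soon as the future completes by any route.
            future.whenComplete((value, failure) -> timeoutTask.cancel(false));
        }
        return future;
    }
}
```

        The sketch returns the same future instance it was given, so a timeout is visible to every holder of that future.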
      • completeDelayed

        public static <T> void completeDelayed​(CompletableFuture<T> future,
                                               T success,
                                               Duration delay)
        Asynchronously completes the future after a certain delay.
        Parameters:
        future - The future to complete.
        success - The element to complete the future with.
        delay - The delay after which the future should be completed.
      • runIfNotDoneAndGet

        public static <T> T runIfNotDoneAndGet​(RunnableFuture<T> future)
                                        throws ExecutionException,
                                               InterruptedException
        Runs the given RunnableFuture if it is not done, and then retrieves its result.
        Type Parameters:
        T - type of the result
        Parameters:
        future - to run if not done and get
        Returns:
        the result after running the future
        Throws:
        ExecutionException - if a problem occurred
        InterruptedException - if the current thread has been interrupted
      • runAfterwards

        public static CompletableFuture<Void> runAfterwards​(CompletableFuture<?> future,
                                                            org.apache.flink.util.function.RunnableWithException runnable)
        Run the given action after the completion of the given future. The given future can be completed normally or exceptionally. In case of an exceptional completion, the action's exception will be added to the initial exception.
        Parameters:
        future - to wait for its completion
        runnable - action which is triggered after the future's completion
        Returns:
        Future which is completed after the action has completed. This future can contain an exception, if an error occurred in the given future or action.
      • runAfterwardsAsync

        public static CompletableFuture<Void> runAfterwardsAsync​(CompletableFuture<?> future,
                                                                 org.apache.flink.util.function.RunnableWithException runnable)
        Run the given action after the completion of the given future. The given future can be completed normally or exceptionally. In case of an exceptional completion, the action's exception will be added to the initial exception.
        Parameters:
        future - to wait for its completion
        runnable - action which is triggered after the future's completion
        Returns:
        Future which is completed after the action has completed. This future can contain an exception, if an error occurred in the given future or action.
      • runAfterwardsAsync

        public static CompletableFuture<Void> runAfterwardsAsync​(CompletableFuture<?> future,
                                                                 org.apache.flink.util.function.RunnableWithException runnable,
                                                                 Executor executor)
        Run the given action after the completion of the given future. The given future can be completed normally or exceptionally. In case of an exceptional completion the action's exception will be added to the initial exception.
        Parameters:
        future - to wait for its completion
        runnable - action which is triggered after the future's completion
        executor - to run the given action
        Returns:
        Future which is completed after the action has completed. This future can contain an exception, if an error occurred in the given future or action.
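        The exception handling described for the runAfterwards variants can be illustrated as follows. This sketch assumes that "added to the initial exception" means attaching the action's exception as a suppressed exception; ThrowingRunnable is a hypothetical stand-in for RunnableWithException.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in used only for illustration.
final class RunAfterwardsSketch {

    // Stand-in for a runnable that may throw a checked exception.
    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    static CompletableFuture<Void> runAfterwards(
            CompletableFuture<?> future, ThrowingRunnable action) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        future.whenComplete((ignored, failure) -> {
            Throwable error = failure;
            try {
                action.run(); // runs whether the future succeeded or failed
            } catch (Throwable actionError) {
                if (error == null) {
                    error = actionError;
                } else {
                    // Assumption: the action's failure is recorded as suppressed.
                    error.addSuppressed(actionError);
                }
            }
            if (error == null) {
                result.complete(null);
            } else {
                result.completeExceptionally(error);
            }
        });
        return result;
    }
}
```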
      • composeAfterwards

        public static CompletableFuture<Void> composeAfterwards​(CompletableFuture<?> future,
                                                                Supplier<CompletableFuture<?>> composedAction)
        Run the given asynchronous action after the completion of the given future. The given future can be completed normally or exceptionally. In case of an exceptional completion, the asynchronous action's exception will be added to the initial exception.
        Parameters:
        future - to wait for its completion
        composedAction - asynchronous action which is triggered after the future's completion
        Returns:
        Future which is completed after the asynchronous action has completed. This future can contain an exception if an error occurred in the given future or asynchronous action.
      • composeAfterwardsAsync

        public static CompletableFuture<Void> composeAfterwardsAsync​(CompletableFuture<?> future,
                                                                     Supplier<CompletableFuture<?>> composedAction,
                                                                     Executor executor)
        Run the given asynchronous action after the completion of the given future. The given future can be completed normally or exceptionally. In case of an exceptional completion, the asynchronous action's exception will be added to the initial exception.
        Parameters:
        future - to wait for its completion
        composedAction - asynchronous action which is triggered after the future's completion
        Returns:
        Future which is completed on the passed Executor after the asynchronous action has completed. This future can contain an exception if an error occurred in the given future or asynchronous action.
      • combineAll

        public static <T> FutureUtils.ConjunctFuture<Collection<T>> combineAll​(Collection<? extends CompletableFuture<? extends T>> futures)
        Creates a future that completes once all of the given futures have completed. The future fails (completes exceptionally) once one of the futures in the conjunction fails. Upon successful completion, the future returns the collection of the futures' results.

        The ConjunctFuture gives access to how many Futures in the conjunction have already completed successfully, via FutureUtils.ConjunctFuture.getNumFuturesCompleted().

        Parameters:
        futures - The futures that make up the conjunction. No null entries are allowed.
        Returns:
        The ConjunctFuture that completes once all given futures are complete (or one fails).
      • waitForAll

        public static FutureUtils.ConjunctFuture<Void> waitForAll​(Collection<? extends CompletableFuture<?>> futures)
        Creates a future that is complete once all of the given futures have completed. The future fails (completes exceptionally) once one of the given futures fails.

        The ConjunctFuture gives access to how many Futures have already completed successfully, via FutureUtils.ConjunctFuture.getNumFuturesCompleted().

        Parameters:
        futures - The futures to wait on. No null entries are allowed.
        Returns:
        The ConjunctFuture that completes once all given futures are complete (or one fails).
      • completeAll

        public static FutureUtils.ConjunctFuture<Void> completeAll​(Collection<? extends CompletableFuture<?>> futuresToComplete)
        Creates a FutureUtils.ConjunctFuture which is only completed after all given futures have completed. Unlike waitForAll(Collection), the resulting future won't be completed directly if one of the given futures is completed exceptionally. Instead, all occurring exceptions will be collected and combined into a single exception. If at least one exception occurs, then the resulting future will be completed exceptionally.
        Parameters:
        futuresToComplete - futures to complete
        Returns:
        Future which is completed after all given futures have been completed.
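        The difference from waitForAll, waiting for every future even after a failure and combining the failures, might look like the sketch below. It returns a plain CompletableFuture rather than a ConjunctFuture, and combining exceptions via addSuppressed is an assumption; CompleteAllSketch is a hypothetical name.

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in used only for illustration.
final class CompleteAllSketch {

    static CompletableFuture<Void> completeAll(
            Collection<? extends CompletableFuture<?>> futures) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (futures.isEmpty()) {
            result.complete(null);
            return result;
        }
        Object lock = new Object();
        int[] remaining = {futures.size()};
        Throwable[] combined = {null};
        for (CompletableFuture<?> future : futures) {
            future.whenComplete((ignored, failure) -> {
                synchronized (lock) {
                    if (failure != null) {
                        if (combined[0] == null) {
                            combined[0] = failure;
                        } else if (combined[0] != failure) {
                            // Assumption: later failures are attached as suppressed.
                            combined[0].addSuppressed(failure);
                        }
                    }
                    // Complete only after the last future has finished.
                    if (--remaining[0] == 0) {
                        if (combined[0] == null) {
                            result.complete(null);
                        } else {
                            result.completeExceptionally(combined[0]);
                        }
                    }
                }
            });
        }
        return result;
    }
}
```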
      • completedExceptionally

        public static <T> CompletableFuture<T> completedExceptionally​(Throwable cause)
        Returns an exceptionally completed CompletableFuture.
        Type Parameters:
        T - type of the future
        Parameters:
        cause - to complete the future with
        Returns:
        An exceptionally completed CompletableFuture
      • supplyAsync

        public static <T> CompletableFuture<T> supplyAsync​(org.apache.flink.util.function.SupplierWithException<T,​?> supplier,
                                                           Executor executor)
        Returns a future which is completed with the result of the SupplierWithException.
        Type Parameters:
        T - type of the result
        Parameters:
        supplier - to provide the future's value
        executor - to execute the supplier
        Returns:
        Future which is completed with the value of the supplier
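        A sketch of how a supplier that may throw checked exceptions can feed a CompletableFuture. ThrowingSupplier is a hypothetical stand-in for SupplierWithException, and wrapping the failure in a CompletionException is a choice of this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

// Hypothetical stand-in used only for illustration.
final class SupplyAsyncSketch {

    // Stand-in for a supplier that may throw a checked exception.
    @FunctionalInterface
    interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    static <T> CompletableFuture<T> supplyAsync(ThrowingSupplier<T> supplier, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return supplier.get();
            } catch (Exception e) {
                // Checked exceptions cannot escape a java.util.function.Supplier, so wrap them.
                throw new CompletionException(e);
            }
        }, executor);
    }
}
```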
      • runAsync

        public static CompletableFuture<Void> runAsync​(org.apache.flink.util.function.RunnableWithException runnable,
                                                       Executor executor)
        Returns a future which is completed when RunnableWithException is finished.
        Parameters:
        runnable - represents the task
        executor - to execute the runnable
        Returns:
        Future which is completed when runnable is finished
      • thenApplyAsyncIfNotDone

        public static <IN,​OUT> CompletableFuture<OUT> thenApplyAsyncIfNotDone​(CompletableFuture<IN> completableFuture,
                                                                                    Executor executor,
                                                                                    Function<? super IN,​? extends OUT> applyFun)
        This function takes a CompletableFuture and a function to apply to this future. If the input future is already done, this function returns CompletableFuture.thenApply(Function). Otherwise, the return value is CompletableFuture.thenApplyAsync(Function, Executor) with the given executor.
        Type Parameters:
        IN - type of the input future.
        OUT - type of the output future.
        Parameters:
        completableFuture - the completable future for which we want to apply.
        executor - the executor to run the apply function if the future is not yet done.
        applyFun - the function to apply.
        Returns:
        a completable future that is applying the given function to the input future.
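        The "IfNotDone" pattern shared by this and the following methods amounts to a single branch on isDone(). A minimal sketch using only JDK calls, under the hypothetical class name IfNotDoneSketch:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical stand-in used only for illustration.
final class IfNotDoneSketch {

    static <IN, OUT> CompletableFuture<OUT> thenApplyAsyncIfNotDone(
            CompletableFuture<IN> completableFuture,
            Executor executor,
            Function<? super IN, ? extends OUT> applyFun) {
        // Already done: apply on the calling thread and skip the executor hand-off.
        // Not yet done: run the function on the executor once the input completes.
        return completableFuture.isDone()
                ? completableFuture.thenApply(applyFun)
                : completableFuture.thenApplyAsync(applyFun, executor);
    }
}
```

        The branch avoids an executor hand-off when the value is already available.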
      • thenComposeAsyncIfNotDone

        public static <IN,​OUT> CompletableFuture<OUT> thenComposeAsyncIfNotDone​(CompletableFuture<IN> completableFuture,
                                                                                      Executor executor,
                                                                                      Function<? super IN,​? extends CompletionStage<OUT>> composeFun)
        This function takes a CompletableFuture and a function to compose with this future. If the input future is already done, this function returns CompletableFuture.thenCompose(Function). Otherwise, the return value is CompletableFuture.thenComposeAsync(Function, Executor) with the given executor.
        Type Parameters:
        IN - type of the input future.
        OUT - type of the output future.
        Parameters:
        completableFuture - the completable future for which we want to compose.
        executor - the executor to run the compose function if the future is not yet done.
        composeFun - the function to compose.
        Returns:
        a completable future that is a composition of the input future and the function.
      • whenCompleteAsyncIfNotDone

        public static <IN> CompletableFuture<IN> whenCompleteAsyncIfNotDone​(CompletableFuture<IN> completableFuture,
                                                                            Executor executor,
                                                                            BiConsumer<? super IN,​? super Throwable> whenCompleteFun)
        This function takes a CompletableFuture and a bi-consumer to call on completion of this future. If the input future is already done, this function returns CompletableFuture.whenComplete(BiConsumer). Otherwise, the return value is CompletableFuture.whenCompleteAsync(BiConsumer, Executor) with the given executor.
        Type Parameters:
        IN - type of the input future.
        Parameters:
        completableFuture - the completable future for which we want to call #whenComplete.
        executor - the executor to run the whenComplete function if the future is not yet done.
        whenCompleteFun - the bi-consumer function to call when the future is completed.
        Returns:
        the new completion stage.
      • thenAcceptAsyncIfNotDone

        public static <IN> CompletableFuture<Void> thenAcceptAsyncIfNotDone​(CompletableFuture<IN> completableFuture,
                                                                            Executor executor,
                                                                            Consumer<? super IN> consumer)
        This function takes a CompletableFuture and a consumer to accept the result of this future. If the input future is already done, this function returns CompletableFuture.thenAccept(Consumer). Otherwise, the return value is CompletableFuture.thenAcceptAsync(Consumer, Executor) with the given executor.
        Type Parameters:
        IN - type of the input future.
        Parameters:
        completableFuture - the completable future for which we want to call #thenAccept.
        executor - the executor to run the thenAccept function if the future is not yet done.
        consumer - the consumer function to call when the future is completed.
        Returns:
        the new completion stage.
      • handleAsyncIfNotDone

        public static <IN,​OUT> CompletableFuture<OUT> handleAsyncIfNotDone​(CompletableFuture<IN> completableFuture,
                                                                                 Executor executor,
                                                                                 BiFunction<? super IN,​Throwable,​? extends OUT> handler)
        This function takes a CompletableFuture and a handler function for the result of this future. If the input future is already done, this function returns CompletableFuture.handle(BiFunction). Otherwise, the return value is CompletableFuture.handleAsync(BiFunction, Executor) with the given executor.
        Type Parameters:
        IN - type of the handler input argument.
        OUT - type of the handler return value.
        Parameters:
        completableFuture - the completable future for which we want to call #handle.
        executor - the executor to run the handle function if the future is not yet done.
        handler - the handler function to call when the future is completed.
        Returns:
        the new completion stage.
      • isCompletedNormally

        public static boolean isCompletedNormally​(CompletableFuture<?> future)
        Returns:
        true if future has completed normally, false otherwise.
      • checkStateAndGet

        public static <T> T checkStateAndGet​(CompletableFuture<T> future)
        Checks that the future has completed normally and returns the result.
        Returns:
        the result of completable future.
        Throws:
        IllegalStateException - Thrown, if future has not completed or it has completed exceptionally.
      • getWithoutException

        @Nullable
        public static <T> T getWithoutException​(CompletableFuture<T> future)
        Gets the result of a completable future without any exception thrown.
        Type Parameters:
        T - the type of result
        Parameters:
        future - the completable future specified.
        Returns:
        the result of completable future, or null if it's unfinished or finished exceptionally
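        The state checks behind isCompletedNormally and getWithoutException can be expressed with standard CompletableFuture queries. The following is a sketch under the hypothetical class name FutureStateSketch, not the library's code.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in used only for illustration.
final class FutureStateSketch {

    // Done, and neither failed nor cancelled.
    static boolean isCompletedNormally(CompletableFuture<?> future) {
        return future.isDone() && !future.isCompletedExceptionally();
    }

    // Returns the value only for a normally completed future, otherwise null.
    static <T> T getWithoutException(CompletableFuture<T> future) {
        return isCompletedNormally(future) ? future.getNow(null) : null;
    }
}
```

        A null return is ambiguous when the future can legitimately complete with null, so callers may want to check isCompletedNormally first.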
      • getOrDefault

        public static <T> T getOrDefault​(CompletableFuture<T> future,
                                         T defaultValue)
        Returns:
        the result of completable future, or the defaultValue if it has not yet completed.
      • assertNoException

        public static void assertNoException​(CompletableFuture<?> completableFuture)
        Asserts that the given CompletableFuture is not completed exceptionally. If the future is completed exceptionally, then it will call the FatalExitExceptionHandler.
        Parameters:
        completableFuture - to assert for no exceptions
      • handleException

        public static <T,E extends Throwable> CompletableFuture<T> handleException(CompletableFuture<? extends T> completableFuture,
                                                                                   Class<E> exceptionClass,
                                                                                   Function<? super E,? extends T> exceptionHandler)
        Checks that the given CompletableFuture is not completed exceptionally with the specified class. If the future is completed exceptionally with the specific class, then try to recover using a given exception handler. If the exception does not match the specified class, just pass it through to later stages.
        Parameters:
        completableFuture - to assert for a given exception
        exceptionClass - exception class to assert for
        exceptionHandler - to call if the future is completed exceptionally with the specific exception
        Returns:
        completable future, that can recover from a specified exception
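        Class-based recovery of this kind can be sketched as below. The class name HandleExceptionSketch is hypothetical, and the sketch matches the failure exactly as delivered, without unwrapping CompletionException layers, which is a simplifying assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in used only for illustration.
final class HandleExceptionSketch {

    static <T, E extends Throwable> CompletableFuture<T> handleException(
            CompletableFuture<? extends T> completableFuture,
            Class<E> exceptionClass,
            Function<? super E, ? extends T> exceptionHandler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        completableFuture.whenComplete((value, failure) -> {
            if (failure == null) {
                result.complete(value);
            } else if (exceptionClass.isInstance(failure)) {
                try {
                    // Matching exception: recover with the handler's value.
                    result.complete(exceptionHandler.apply(exceptionClass.cast(failure)));
                } catch (Throwable handlerError) {
                    result.completeExceptionally(handlerError);
                }
            } else {
                // Non-matching exception: pass it through to later stages.
                result.completeExceptionally(failure);
            }
        });
        return result;
    }
}
```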
      • handleUncaughtException

        public static void handleUncaughtException​(CompletableFuture<?> completableFuture,
                                                   Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
        Checks that the given CompletableFuture is not completed exceptionally. If the future is completed exceptionally, then it will call the given uncaught exception handler.
        Parameters:
        completableFuture - to assert for no exceptions
        uncaughtExceptionHandler - to call if the future is completed exceptionally
      • forward

        public static <T> void forward​(CompletableFuture<T> source,
                                       CompletableFuture<T> target)
        Forwards the value from the source future to the target future.
        Type Parameters:
        T - type of the value
        Parameters:
        source - future to forward the value from
        target - future to forward the value to
      • forwardAsync

        public static <T> void forwardAsync​(CompletableFuture<T> source,
                                            CompletableFuture<T> target,
                                            Executor executor)
        Forwards the value from the source future to the target future using the provided executor.
        Type Parameters:
        T - type of the value
        Parameters:
        source - future to forward the value from
        target - future to forward the value to
        executor - executor to forward the source value to the target future
      • throwIfCompletedExceptionally

        public static void throwIfCompletedExceptionally​(CompletableFuture<?> future)
                                                  throws Exception
        Throws the causing exception if the given future is completed exceptionally; otherwise does nothing.
        Parameters:
        future - the future to check.
        Throws:
        Exception - when the future is completed exceptionally.
      • doForward

        public static <T> void doForward​(@Nullable
                                         T value,
                                         @Nullable
                                         Throwable throwable,
                                         CompletableFuture<T> target)
        Completes the given future with either the given value or throwable, depending on which parameter is not null.
        Type Parameters:
        T - type of the completed future
        Parameters:
        value - value with which the future should be completed
        throwable - throwable with which the future should be completed exceptionally
        target - future to complete
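        Forwarding reduces to completing the target from a whenComplete callback on the source. A minimal sketch of forward and doForward with JDK types only; ForwardSketch is a hypothetical name, and checking the throwable first is a choice of this sketch.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in used only for illustration.
final class ForwardSketch {

    // Completes the target exceptionally if a throwable is present, otherwise with the value.
    static <T> void doForward(T value, Throwable throwable, CompletableFuture<T> target) {
        if (throwable != null) {
            target.completeExceptionally(throwable);
        } else {
            target.complete(value);
        }
    }

    // Mirrors the outcome of the source onto the target once the source completes.
    static <T> void forward(CompletableFuture<T> source, CompletableFuture<T> target) {
        source.whenComplete((value, throwable) -> doForward(value, throwable, target));
    }
}
```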
      • switchExecutor

        public static <T> CompletableFuture<T> switchExecutor​(CompletableFuture<? extends T> source,
                                                              Executor executor)
        Switches the execution context of the given source future. This works for normally and exceptionally completed futures.
        Type Parameters:
        T - type of the source
        Parameters:
        source - source to switch the execution context for
        executor - executor representing the new execution context
        Returns:
        future which is executed by the given executor
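        Switching the execution context can be pictured as re-emitting the source's outcome from a stage that runs on the given executor. A sketch with JDK calls only, under the hypothetical class name SwitchExecutorSketch:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

// Hypothetical stand-in used only for illustration.
final class SwitchExecutorSketch {

    static <T> CompletableFuture<T> switchExecutor(
            CompletableFuture<? extends T> source, Executor executor) {
        // handleAsync runs for both outcomes, so failures also pass through the executor.
        return source.handleAsync((value, failure) -> {
            if (failure != null) {
                throw new CompletionException(failure);
            }
            return value;
        }, executor);
    }
}
```

        Stages chained synchronously onto the returned future then run in the context of the given executor when they are attached before it completes.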