org.apache.pekko.pattern

== Commonly Used Patterns With Pekko ==

This package is a collection point for usage patterns which involve actors, futures, etc. but are coupled loosely enough to (several of) them to be presented separately from the core implementation.

In Scala the recommended usage is to import the pattern from the package object:

import org.apache.pekko.pattern.ask

ask(actor, message) // use it directly
actor ask message   // use it by implicit conversion

For Java the patterns are available as static methods of the org.apache.pekko.pattern.Patterns class:

import static org.apache.pekko.pattern.Patterns.ask;

ask(actor, message);

Members list

Packages

== Extended Versions Of Pekko Patterns ==

This subpackage contains extended versions of Apache Pekko patterns.

Currently supported are:

  • ask: create a temporary one-off actor for receiving a reply to a message and complete a scala.concurrent.Future with it; returns said Future.

In Scala the recommended usage is to import the pattern from the package object:

import org.apache.pekko.pattern.extended.ask

ask(actor, askSender => Request(askSender)) // use it directly
actor ask (Request(_))   // use it by implicit conversion

For Java the patterns are available as static methods of the org.apache.pekko.pattern.Patterns class:

import static org.apache.pekko.pattern.Patterns.ask;

ask(actor, new org.apache.pekko.japi.function.Function<ActorRef, Object>() {
 public Object apply(ActorRef askSender) {
   return new Request(askSender);
 }
});

Type members

Classlikes

trait AskSupport

This object contains implementation details of the “ask” pattern.

Attributes

Source
AskSupport.scala
Supertypes
class Object
trait Matchable
class Any
class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException, NoStackTrace

This is what is used to complete a Future that is returned from an ask/? call, when it times out. A typical reason for AskTimeoutException is that the recipient actor didn't send a reply.

Attributes

Source
AskSupport.scala
Supertypes
trait NoStackTrace
class TimeoutException
class Exception
class Throwable
trait Serializable
class Object
trait Matchable
class Any

Attributes

Companion
class
Source
AskSupport.scala
Supertypes
class Object
trait Matchable
class Any
Self type
final class AskableActorRef(val actorRef: ActorRef) extends AnyVal

Attributes

Companion
object
Source
AskSupport.scala
Supertypes
class AnyVal
trait Matchable
class Any
final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal

Attributes

Source
AskSupport.scala
Supertypes
class AnyVal
trait Matchable
class Any
@DoNotInherit

Attributes

Source
BackoffOptions.scala
Supertypes
class Object
trait Matchable
class Any
@DoNotInherit
sealed trait BackoffOnStopOptions

Attributes

Source
BackoffOptions.scala
Supertypes
class Object
trait Matchable
class Any
object BackoffOpts

Backoff options allow you to specify a number of properties for backoff supervisors.

Attributes

Source
BackoffOptions.scala
Supertypes
class Object
trait Matchable
class Any
Self type

Attributes

Source
BackoffSupervisor.scala
Supertypes
class Object
trait Matchable
class Any
Self type

Companion object providing factory methods for Circuit Breaker which runs callbacks in the caller's thread

Attributes

Companion
class
Source
CircuitBreaker.scala
Supertypes
class Object
trait Matchable
class Any
Self type
class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, val resetTimeout: FiniteDuration, maxResetTimeout: FiniteDuration, exponentialBackoffFactor: Double, randomFactor: Double, val allowExceptions: Set[String], val telemetry: CircuitBreakerTelemetry)(implicit executor: ExecutionContext)

Provides circuit breaker functionality for stability when working with "dangerous" operations, e.g. calls to remote systems.

Transitions through three states:

  • In Closed state, calls pass through until the maxFailures count is reached. This causes the circuit breaker to open. Both exceptions and calls exceeding callTimeout are considered failures.
  • In Open state, calls fail fast with an exception. After resetTimeout, the circuit breaker transitions to half-open state.
  • In Half-Open state, the first call is allowed through. If it succeeds, the circuit breaker resets to closed state; if it fails, the circuit breaker returns to open state. All calls beyond the first that execute while the first is running fail fast with an exception.

Value parameters

callTimeout

scala.concurrent.duration.FiniteDuration of time after which to consider a call a failure

executor

scala.concurrent.ExecutionContext used for execution of state transition listeners

maxFailures

Maximum number of failures before opening the circuit

randomFactor

after calculation of the exponential back-off, an additional random delay based on this factor is added, e.g. 0.2 adds up to 20% delay. randomFactor should be in the range 0.0 to 1.0 (both inclusive). To skip this additional delay, pass in 0.

resetTimeout

scala.concurrent.duration.FiniteDuration of time after which to attempt to close the circuit

scheduler

Reference to Pekko scheduler
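
As a rough illustration of the state transitions and parameters described above, the following sketch trips a breaker and then observes a fail-fast rejection. It assumes a classic ActorSystem (the name breaker-demo is illustrative), a hypothetical riskyCall helper that always fails, and the four-argument constructor, which this sketch assumes leaves the remaining parameters at their defaults.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.pattern.CircuitBreaker

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.util.Try

object CircuitBreakerSketch {
  // Hypothetical "dangerous" operation that always fails.
  def riskyCall(): Future[String] =
    Future.failed(new RuntimeException("remote system unavailable"))

  def run(): Try[String] = {
    // Assumption: a classic actor system supplies the scheduler and executor.
    val system = ActorSystem("breaker-demo")
    import system.dispatcher
    try {
      val breaker = new CircuitBreaker(
        system.scheduler,
        maxFailures = 2,
        callTimeout = 1.second,
        resetTimeout = 10.seconds)
        .onOpen(println("circuit breaker opened"))

      // Two failed calls reach maxFailures while the breaker is closed.
      (1 to 2).foreach { _ =>
        Try(Await.result(breaker.withCircuitBreaker(riskyCall()), 3.seconds))
      }

      // The next call is expected to be rejected without invoking riskyCall.
      Try(Await.result(breaker.withCircuitBreaker(riskyCall()), 3.seconds))
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

The rejected call should carry a CircuitBreakerOpenException, whose remainingDuration indicates how long to wait before a reset is attempted.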

Attributes

Companion
object
Source
CircuitBreaker.scala
Supertypes
class Object
trait Matchable
class Any
class CircuitBreakerOpenException(val remainingDuration: FiniteDuration, message: String = ...) extends PekkoException, NoStackTrace

Exception thrown when Circuit Breaker is open.

Value parameters

message

Defaults to "Circuit Breaker is open; calls are failing fast"

remainingDuration

Stores remaining time before attempting a reset. Zero duration means the breaker is currently in half-open state.

Attributes

Source
CircuitBreaker.scala
Supertypes
trait NoStackTrace
class RuntimeException
class Exception
class Throwable
trait Serializable
class Object
trait Matchable
class Any

Companion object providing factory methods for Circuit Breaker which runs callbacks in the caller's thread

Attributes

Companion
class
Source
CircuitBreakersRegistry.scala
Supertypes
class Object
trait Matchable
class Any
Self type

A CircuitBreakersRegistry is a central point collecting all circuit breakers in Pekko.

Attributes

Companion
object
Source
CircuitBreakersRegistry.scala
Supertypes
trait Extension
class Object
trait Matchable
class Any

This object contains implementation details of the “ask” pattern, which can be combined with the "replyTo" pattern.

Attributes

Source
AskSupport.scala
Supertypes
class Object
trait Matchable
class Any
final class ExplicitlyAskableActorRef(val actorRef: ActorRef) extends AnyVal

Attributes

Source
AskSupport.scala
Supertypes
class AnyVal
trait Matchable
class Any
final class ExplicitlyAskableActorSelection(val actorSel: ActorSelection) extends AnyVal

Attributes

Source
AskSupport.scala
Supertypes
class AnyVal
trait Matchable
class Any
trait FutureRef[T]

A combination of a Future and an ActorRef associated with it, which points to an actor performing a task which will eventually resolve the Future.

Attributes

Companion
object
Source
PromiseRef.scala
Supertypes
class Object
trait Matchable
class Any
object FutureRef

Attributes

Companion
trait
Source
PromiseRef.scala
Supertypes
class Object
trait Matchable
class Any
Self type
FutureRef.type

Attributes

Source
FutureTimeoutSupport.scala
Supertypes
class Object
trait Matchable
class Any

Attributes

Source
GracefulStopSupport.scala
Supertypes
class Object
trait Matchable
class Any
object Patterns

Java API: for Pekko patterns such as ask, pipe and others which work with java.util.concurrent.CompletionStage.

Attributes

Source
Patterns.scala
Supertypes
class Object
trait Matchable
class Any
Self type
Patterns.type

Attributes

Source
PipeToSupport.scala
Supertypes
class Object
trait Matchable
class Any
trait PromiseRef[T]

A combination of a Promise and an ActorRef associated with it, which points to an actor performing a task which will eventually resolve the Promise.

Attributes

Companion
object
Source
PromiseRef.scala
Supertypes
class Object
trait Matchable
class Any
Self type
object PromiseRef

Attributes

Companion
trait
Source
PromiseRef.scala
Supertypes
class Object
trait Matchable
class Any
Self type
PromiseRef.type
trait RetrySupport

This trait provides the retry utility function

Attributes

Companion
object
Source
RetrySupport.scala
Supertypes
class Object
trait Matchable
class Any
Known subtypes
object RetrySupport
object RetrySupport extends RetrySupport

Attributes

Companion
trait
Source
RetrySupport.scala
Supertypes
trait RetrySupport
class Object
trait Matchable
class Any
Self type
final class StatusReply[+T]

Generic top-level message type for replies that signal failure or success. Convenient to use together with the askWithStatus ask variants.

Create using the factory methods StatusReply#success and StatusReply#error.

Pekko contains predefined serializers for the wrapper type and the textual error messages.
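
A small sketch of creating and inspecting replies may help here. The validate and describe functions are hypothetical; only the StatusReply factories and extractors are taken from Pekko.

```scala
import org.apache.pekko.pattern.StatusReply

object StatusReplySketch {
  // Hypothetical domain check that wraps its outcome in a StatusReply.
  def validate(quantity: Int): StatusReply[Int] =
    if (quantity > 0) StatusReply.success(quantity)
    else StatusReply.error(s"quantity must be positive, got $quantity")

  // Pattern matching distinguishes the successful and the failed case.
  def describe(reply: StatusReply[Int]): String = reply match {
    case StatusReply.Success(value) => s"accepted: $value"
    case StatusReply.Error(cause)   => s"rejected: ${cause.getMessage}"
  }
}
```

A textual error is carried as an exception, so the same match also covers replies created from a Throwable.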

Type parameters

T

the type of value a successful reply would have

Attributes

Companion
object
Source
StatusReply.scala
Supertypes
class Object
trait Matchable
class Any
object StatusReply

Attributes

Companion
class
Source
StatusReply.scala
Supertypes
class Object
trait Matchable
class Any
Self type

Inherited classlikes

final class PipeableCompletionStage[T](val future: CompletionStage[T])(implicit executionContext: ExecutionContext)

Attributes

Inherited from:
PipeToSupport
Source
PipeToSupport.scala
Supertypes
class Object
trait Matchable
class Any
final class PipeableFuture[T](val future: Future[T])(implicit executionContext: ExecutionContext)

Attributes

Inherited from:
PipeToSupport
Source
PipeToSupport.scala
Supertypes
class Object
trait Matchable
class Any

Value members

Inherited methods

def after[T](duration: FiniteDuration, using: Scheduler)(value: => Future[T])(implicit ec: ExecutionContext): Future[T]

Returns a scala.concurrent.Future that will be completed with the success or failure of the provided value after the specified duration.
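
One common use is to race a slow computation against a delayed failure. The sketch below assumes a classic ActorSystem (the name after-demo is illustrative) and stands in for the slow computation with a future that never completes.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.pattern.after

import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._
import scala.util.Try

object AfterSketch {
  def run(): Try[String] = {
    val system = ActorSystem("after-demo") // illustrative name
    import system.dispatcher
    try {
      // Hypothetical slow operation: a future that never completes.
      val slow: Future[String] = Promise[String]().future

      // Completes with the given failure once 200 ms have passed.
      val deadline: Future[String] =
        after(200.millis, system.scheduler)(
          Future.failed[String](new IllegalStateException("took too long")))

      // Whichever future completes first decides the outcome.
      Try(Await.result(Future.firstCompletedOf(Seq(slow, deadline)), 3.seconds))
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```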

Attributes

Inherited from:
FutureTimeoutSupport
Source
FutureTimeoutSupport.scala
def after[T](duration: FiniteDuration)(value: => Future[T])(implicit system: ClassicActorSystemProvider): Future[T]

Returns a scala.concurrent.Future that will be completed with the success or failure of the provided value after the specified duration.

Attributes

Inherited from:
FutureTimeoutSupport
Source
FutureTimeoutSupport.scala
def afterCompletionStage[T](duration: Duration, using: Scheduler)(value: => CompletionStage[T])(implicit ec: ExecutionContext): CompletionStage[T]

Returns a java.util.concurrent.CompletionStage that will be completed with the success or failure of the provided value after the specified duration.

Attributes

Inherited from:
FutureTimeoutSupport
Source
FutureTimeoutSupport.scala
def afterCompletionStage[T](duration: Duration)(value: => CompletionStage[T])(implicit system: ClassicActorSystemProvider): CompletionStage[T]

Returns a java.util.concurrent.CompletionStage that will be completed with the success or failure of the provided value after the specified duration.

Attributes

Inherited from:
FutureTimeoutSupport
Source
FutureTimeoutSupport.scala
def ask(actorSelection: ActorSelection, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any]

Attributes

Inherited from:
AskSupport
Source
AskSupport.scala
def ask(actorSelection: ActorSelection, message: Any)(implicit timeout: Timeout): Future[Any]

Sends a message asynchronously and returns a scala.concurrent.Future holding the eventual reply message; this means that the target actor needs to send the result to the sender reference provided.

The Future will be completed with a pekko.pattern.AskTimeoutException after the given timeout has expired; this is independent of any timeout applied while awaiting a result for this future (e.g. in Await.result(..., timeout)). A typical reason for AskTimeoutException is that the recipient actor didn't send a reply.

Warning: When using future callbacks inside actors, you need to carefully avoid closing over the containing actor’s object, i.e. do not call methods or access mutable state on the enclosing actor from within the callback. This would break the actor encapsulation and may introduce synchronization bugs and race conditions because the callback will be scheduled concurrently to the enclosing actor. Unfortunately there is not yet a way to detect these illegal accesses at compile time.

Recommended usage:

 val f = ask(worker, request)(timeout)
 f.map { response =>
   EnrichedMessage(response)
 } pipeTo nextActor

Attributes

Inherited from:
AskSupport
Source
AskSupport.scala
def ask(actorRef: ActorRef, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any]

Attributes

Inherited from:
AskSupport
Source
AskSupport.scala
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any]

Sends a message asynchronously and returns a scala.concurrent.Future holding the eventual reply message; this means that the target actor needs to send the result to the sender reference provided.

The Future will be completed with a pekko.pattern.AskTimeoutException after the given timeout has expired; this is independent of any timeout applied while awaiting a result for this future (e.g. in Await.result(..., timeout)). A typical reason for AskTimeoutException is that the recipient actor didn't send a reply.

Warning: When using future callbacks inside actors, you need to carefully avoid closing over the containing actor’s object, i.e. do not call methods or access mutable state on the enclosing actor from within the callback. This would break the actor encapsulation and may introduce synchronization bugs and race conditions because the callback will be scheduled concurrently to the enclosing actor. Unfortunately there is not yet a way to detect these illegal accesses at compile time.

Recommended usage:

 val f = ask(worker, request)(timeout)
 f.map { response =>
   EnrichedMessage(response)
 } pipeTo nextActor
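
To make the timeout behaviour described above concrete, here is a sketch that asks a hypothetical Responder actor twice, once with a message it answers and once with a message it ignores. The actor, the system name and the fallback string are assumptions of this example.

```scala
import org.apache.pekko.actor.{ Actor, ActorSystem, Props }
import org.apache.pekko.pattern.{ ask, AskTimeoutException }
import org.apache.pekko.util.Timeout

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

object AskSketch {
  // Hypothetical actor: replies to "ping" and stays silent otherwise.
  class Responder extends Actor {
    def receive: Receive = {
      case "ping" => sender() ! "pong"
      case _      => // no reply, so the asker's Future times out
    }
  }

  def run(): (String, String) = {
    val system = ActorSystem("ask-demo") // illustrative name
    import system.dispatcher
    implicit val timeout: Timeout = Timeout(500.millis)
    try {
      val responder = system.actorOf(Props(new Responder), "responder")

      // Map the ask timeout to a fallback value instead of a failed Future.
      def query(msg: String): Future[String] =
        (responder ? msg).mapTo[String].recover {
          case _: AskTimeoutException => "no reply within timeout"
        }

      (Await.result(query("ping"), 3.seconds), Await.result(query("silent"), 3.seconds))
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

The 3-second Await timeout is deliberately longer than the ask timeout, so the AskTimeoutException is what completes the second Future.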

Attributes

Inherited from:
AskSupport
Source
AskSupport.scala
def askWithStatus(actorRef: ActorRef, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any]

Use for messages whose response is known to be a pekko.pattern.StatusReply. When a pekko.pattern.StatusReply.Success response arrives the future is completed with the wrapped value, if a pekko.pattern.StatusReply.Error arrives the future is instead failed.

Attributes

Inherited from:
AskSupport
Source
AskSupport.scala
def askWithStatus(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any]

Use for messages whose response is known to be a pekko.pattern.StatusReply. When a pekko.pattern.StatusReply.Success response arrives the future is completed with the wrapped value, if a pekko.pattern.StatusReply.Error arrives the future is instead failed.
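
The unwrapping can be sketched as follows, assuming a hypothetical Validator actor that always answers with a StatusReply; the actor and system names are illustrative.

```scala
import org.apache.pekko.actor.{ Actor, ActorSystem, Props }
import org.apache.pekko.pattern.{ askWithStatus, StatusReply }
import org.apache.pekko.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object AskWithStatusSketch {
  // Hypothetical actor that wraps every answer in a StatusReply.
  class Validator extends Actor {
    def receive: Receive = {
      case n: Int if n > 0 => sender() ! StatusReply.success(n)
      case other           => sender() ! StatusReply.error(s"invalid: $other")
    }
  }

  def run(): (Try[Any], Try[Any]) = {
    val system = ActorSystem("status-demo") // illustrative name
    implicit val timeout: Timeout = Timeout(2.seconds)
    try {
      val validator = system.actorOf(Props(new Validator), "validator")
      // A success reply yields the wrapped value, an error reply a failed Future.
      val ok = Try(Await.result(askWithStatus(validator, 3), 3.seconds))
      val bad = Try(Await.result(askWithStatus(validator, -1), 3.seconds))
      (ok, bad)
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```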

Attributes

Inherited from:
AskSupport
Source
AskSupport.scala
def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any = ...): Future[Boolean]

Returns a scala.concurrent.Future that will be completed with success (value true) when the existing messages of the target actor have been processed and the actor has been terminated.

Useful when you need to wait for termination or compose ordered termination of several actors, which should only be done outside of the ActorSystem as blocking inside Actors is discouraged.

IMPORTANT NOTICE: the actor being terminated and its supervisor being informed of the availability of the deceased actor’s name are two distinct operations, which do not obey any reliable ordering. Especially the following will NOT work:

def receive = {
 case msg =>
   Await.result(gracefulStop(someChild, timeout), timeout)
   context.actorOf(Props(...), "someChild") // assuming that that was someChild’s name, this will NOT work
}

If the target actor isn't terminated within the timeout, the scala.concurrent.Future is completed with a failure of type pekko.pattern.AskTimeoutException.

If you want to invoke specialized stopping logic on your target actor instead of PoisonPill, you can pass your stop command as a parameter:

 gracefulStop(someChild, timeout, MyStopGracefullyMessage).onComplete { _ =>
    // Do something after someChild has been stopped
 }

Attributes

Inherited from:
GracefulStopSupport
Source
GracefulStopSupport.scala
def retry[T](attempt: () => Future[T], shouldRetry: (T, Throwable) => Boolean, attempts: Int, delayFunction: Int => Option[FiniteDuration])(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T]

Given a function from Unit to Future, returns an internally retrying Future.

When the future is completed, the shouldRetry predicate is always invoked with the result (or null if none) and the exception (or null if none). If the shouldRetry predicate returns true, a new attempt is made; each subsequent attempt is made after the delay returned by delayFunction (its input, the next attempt count, starts from 1). delayFunction returns scala.None for no delay.

A scheduler (e.g. context.system.scheduler) must be provided to delay each retry. You can provide a function to generate the next delay duration after the first attempt; this function should never return null, otherwise a java.lang.IllegalArgumentException will be thrown.

If attempts are exhausted the returned future is simply the result of invoking attempt. Note that the attempt function will be invoked on the given execution context for subsequent tries and therefore must be thread safe (i.e. not touch unsafe mutable state).

Example usage:

//retry with back off

protected val sendAndReceive: HttpRequest => Future[HttpResponse]
protected val shouldRetry: (HttpResponse, Throwable) => Boolean = (_, throwable) => throwable ne null
private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: HttpRequest) => retry[HttpResponse](
 attempt = () => sendAndReceive(req),
 shouldRetry,
 attempts = 10,
 delayFunction = attempted => Option(2.seconds * attempted)
)
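
The shape expected of delayFunction can be illustrated on its own, without Pekko. The policy below (retry once immediately, then wait 2 seconds per attempt, capped at 10 seconds) is a hypothetical choice for this sketch, not something the library prescribes.

```scala
import scala.concurrent.duration._

object DelayFunctionSketch {
  // Input: the next attempt count, starting from 1.
  // Output: None for no delay, or Some(delay) before that attempt.
  val delayFunction: Int => Option[FiniteDuration] = {
    case 1         => None
    case attempted => Some((2.seconds * attempted.toLong).min(10.seconds))
  }
}
```

Because the function always returns an Option and never null, it satisfies the requirement stated above.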

Value parameters

attempt

the function to be attempted

attempts

the maximum number of attempts

delayFunction

the function to generate the next delay duration, None for no delay

ec

the execution context

scheduler

the scheduler for scheduling a delay

shouldRetry

the predicate to determine if the attempt should be retried

Attributes

Returns

the resulting future, which may have been retried

Since

1.1.0

Inherited from:
RetrySupport
Source
RetrySupport.scala
def retry[T](attempt: () => Future[T], attempts: Int, delayFunction: Int => Option[FiniteDuration])(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T]

Given a function from Unit to Future, returns an internally retrying Future. The first attempt is made immediately; each subsequent attempt is made after the delay returned by delayFunction (its input, the next attempt count, starts from 1). delayFunction returns scala.None for no delay.

A scheduler (e.g. context.system.scheduler) must be provided to delay each retry. You can provide a function to generate the next delay duration after the first attempt; this function should never return null, otherwise a java.lang.IllegalArgumentException will be thrown.

If attempts are exhausted the returned future is simply the result of invoking attempt. Note that the attempt function will be invoked on the given execution context for subsequent tries and therefore must be thread safe (i.e. not touch unsafe mutable state).

Example usage:

//retry with back off

protected val sendAndReceive: HttpRequest => Future[HttpResponse]
private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: HttpRequest) => retry[HttpResponse](
 attempt = () => sendAndReceive(req),
 attempts = 10,
 delayFunction = attempted => Option(2.seconds * attempted)
)

Attributes

Inherited from:
RetrySupport
Source
RetrySupport.scala
def retry[T](attempt: () => Future[T], shouldRetry: (T, Throwable) => Boolean, attempts: Int, delay: FiniteDuration)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T]

Given a function from Unit to Future, returns an internally retrying Future.

When the future is completed, the shouldRetry predicate is always invoked with the result (or null if none) and the exception (or null if none). If the shouldRetry predicate returns true, a new attempt is made after 'delay'.

If attempts are exhausted the returned future is simply the result of invoking attempt. Note that the attempt function will be invoked on the given execution context for subsequent tries and therefore must be thread safe (i.e. not touch unsafe mutable state).

Example usage:

protected val sendAndReceive: HttpRequest => Future[HttpResponse]
protected val shouldRetry: (HttpResponse, Throwable) => Boolean = (_, throwable) => throwable ne null
private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: HttpRequest) => retry[HttpResponse](
 attempt = () => sendAndReceive(req),
 shouldRetry,
 attempts = 10,
 delay = 2.seconds
)

Value parameters

attempt

the function to be attempted

attempts

the maximum number of attempts

delay

the delay duration

ec

the execution context

scheduler

the scheduler for scheduling a delay

shouldRetry

the predicate to determine if the attempt should be retried

Attributes

Returns

the resulting future, which may have been retried

Since

1.1.0

Inherited from:
RetrySupport
Source
RetrySupport.scala
def retry[T](attempt: () => Future[T], attempts: Int, delay: FiniteDuration)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T]

Given a function from Unit to Future, returns an internally retrying Future. The first attempt is made immediately; each subsequent attempt is made after 'delay'. A scheduler (e.g. context.system.scheduler) must be provided to delay each retry.

If attempts are exhausted the returned future is simply the result of invoking attempt. Note that the attempt function will be invoked on the given execution context for subsequent tries and therefore must be thread safe (i.e. not touch unsafe mutable state).

Example usage:

protected val sendAndReceive: HttpRequest => Future[HttpResponse]
private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: HttpRequest) => retry[HttpResponse](
 attempt = () => sendAndReceive(req),
 attempts = 10,
 delay = 2.seconds
)

Attributes

Inherited from:
RetrySupport
Source
RetrySupport.scala
def retry[T](attempt: () => Future[T], shouldRetry: (T, Throwable) => Boolean, attempts: Int, minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T]

Given a function from Unit to Future, returns an internally retrying Future.

When the future is completed, the shouldRetry predicate is always invoked with the result (or null if none) and the exception (or null if none). If the shouldRetry predicate returns true, a new attempt is made after a backoff time.

If attempts are exhausted the returned future is simply the result of invoking attempt. Note that the attempt function will be invoked on the given execution context for subsequent tries and therefore must be thread safe (i.e. not touch unsafe mutable state).

Example usage:

protected val sendAndReceive: HttpRequest => Future[HttpResponse]
protected val shouldRetry: (HttpResponse, Throwable) => Boolean = (_, throwable) => throwable ne null
private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: HttpRequest) => retry[HttpResponse](
 attempt = () => sendAndReceive(req),
 shouldRetry,
 attempts = 10,
 minBackoff = 1.seconds,
 maxBackoff = 2.seconds,
 randomFactor = 0.5
)

Value parameters

attempt

the function to be attempted

attempts

the maximum number of attempts

ec

the execution context

maxBackoff

the exponential back-off is capped to this duration

minBackoff

minimum (initial) back-off duration before the next attempt is made

randomFactor

after calculation of the exponential back-off an additional random delay based on this factor is added, e.g. 0.2 adds up to 20% delay. In order to skip this additional delay pass in 0.

scheduler

the scheduler for scheduling a delay

shouldRetry

the predicate to determine if the attempt should be retried
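
How minBackoff, maxBackoff and randomFactor interact can be sketched with plain arithmetic. This is not Pekko's implementation: the doubling per attempt and the zero-based attempt index are assumptions of the sketch, and the jitter argument stands in for a random number in [0.0, 1.0) so that the result is reproducible.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  def delayFor(
      attempt: Int,
      minBackoff: FiniteDuration,
      maxBackoff: FiniteDuration,
      randomFactor: Double,
      jitter: Double): FiniteDuration = {
    // Exponential growth from minBackoff (assumed base 2) ...
    val exponential = minBackoff.toMillis * math.pow(2, attempt.toDouble)
    // ... capped at maxBackoff ...
    val capped = math.min(exponential, maxBackoff.toMillis.toDouble)
    // ... plus up to randomFactor * 100 percent of additional delay.
    (capped * (1.0 + jitter * randomFactor)).toLong.millis
  }
}
```

With randomFactor set to 0 the jitter term vanishes and the delay is just the capped exponential value.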

Attributes

Returns

the resulting future, which may have been retried

Since

1.1.0

Inherited from:
RetrySupport
Source
RetrySupport.scala
def retry[T](attempt: () => Future[T], attempts: Int, minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T]

Given a function from Unit to Future, returns an internally retrying Future. The first attempt will be made immediately, each subsequent attempt will be made with a backoff time, if the previous attempt failed.

If attempts are exhausted the returned future is simply the result of invoking attempt. Note that the attempt function will be invoked on the given execution context for subsequent tries and therefore must be thread safe (i.e. not touch unsafe mutable state).

Example usage:

protected val sendAndReceive: HttpRequest => Future[HttpResponse]
private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: HttpRequest) => retry[HttpResponse](
 attempt = () => sendAndReceive(req),
 attempts = 10,
 minBackoff = 1.seconds,
 maxBackoff = 2.seconds,
 randomFactor = 0.5
)

Value parameters

maxBackoff

the exponential back-off is capped to this duration

minBackoff

minimum (initial) back-off duration before the next attempt is made

randomFactor

after calculation of the exponential back-off an additional random delay based on this factor is added, e.g. 0.2 adds up to 20% delay. In order to skip this additional delay pass in 0.

Attributes

Inherited from:
RetrySupport
Source
RetrySupport.scala
def retry[T](attempt: () => Future[T], shouldRetry: (T, Throwable) => Boolean, attempts: Int)(implicit ec: ExecutionContext): Future[T]

Given a function from Unit to Future, returns an internally retrying Future.

When the future is completed, the shouldRetry predicate is always invoked with the result (or null if none) and the exception (or null if none). If the shouldRetry predicate returns true, a new attempt is made immediately.

If attempts are exhausted the returned future is simply the result of invoking attempt. Note that the attempt function will be invoked on the given execution context for subsequent tries and therefore must be thread safe (i.e. not touch unsafe mutable state).

Example usage:

def possiblyFailing(): Future[Something] = ???
val shouldRetry: (Something, Throwable) => Boolean = (_, throwable) => throwable ne null
val withRetry: Future[Something] = retry(attempt = possiblyFailing, shouldRetry, attempts = 10)

Value parameters

attempt

the function to be attempted

attempts

the maximum number of attempts

ec

the execution context

shouldRetry

the predicate to determine if the attempt should be retried

Attributes

Returns

the resulting future, which may have been retried

Since

1.1.0

Inherited from:
RetrySupport
Source
RetrySupport.scala
def retry[T](attempt: () => Future[T], attempts: Int)(implicit ec: ExecutionContext): Future[T]

If attempts are exhausted the returned future is simply the result of invoking attempt. Note that the attempt function will be invoked on the given execution context for subsequent tries and therefore must be thread safe (i.e. not touch unsafe mutable state).

Example usage:

def possiblyFailing(): Future[Something] = ???
val withRetry: Future[Something] = retry(attempt = possiblyFailing, attempts = 10)
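
A self-contained variant of the usage above, as a sketch only: RetryExample and possiblyFailing are hypothetical, and the operation is rigged to fail twice before succeeding so that the retries are observable. The attempt counter is an AtomicInteger because attempt may run on different threads of the execution context.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._
import org.apache.pekko.pattern.retry

object RetryExample {
  def run(): (String, Int) = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val calls = new AtomicInteger(0)

    // Hypothetical flaky operation: fails on the first two calls, then succeeds.
    def possiblyFailing(): Future[String] =
      if (calls.incrementAndGet() < 3) Future.failed(new RuntimeException("transient failure"))
      else Future.successful("ok")

    val withRetry: Future[String] = retry(attempt = () => possiblyFailing(), attempts = 10)
    (Await.result(withRetry, 3.seconds), calls.get)
  }
}
```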

Attributes

Inherited from:
RetrySupport
Source
RetrySupport.scala
def timeout[T](duration: FiniteDuration, using: Scheduler)(value: => Future[T])(implicit ec: ExecutionContext): Future[T]

Returns a scala.concurrent.Future that will be completed with a TimeoutException if the provided value is not completed within the specified duration.
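
A minimal sketch of how this might be used, assuming the signature shown above and a classic ActorSystem as the source of the Scheduler. TimeoutExample and the never-completing future are hypothetical and exist only to trigger the timeout.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._
import scala.util.Failure
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.pattern.timeout

object TimeoutExample {
  def run(): Boolean = {
    val system = ActorSystem("timeout-example")
    import system.dispatcher // implicit ExecutionContext
    try {
      // A future that never completes on its own.
      val never: Future[String] = Promise[String]().future
      val guarded: Future[String] = timeout(100.millis, system.scheduler)(never)
      // Wait for the guarded future and check that it failed with a TimeoutException.
      Await.ready(guarded, 3.seconds).value match {
        case Some(Failure(_: TimeoutException)) => true
        case _                                  => false
      }
    } finally system.terminate()
  }
}
```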

Attributes

Since

1.2.0

Inherited from:
FutureTimeoutSupport
Source
FutureTimeoutSupport.scala

Deprecated and Inherited methods

def timeoutCompletionStage[T](duration: Duration, using: Scheduler)(value: => CompletionStage[T])(implicit ec: ExecutionContext): CompletionStage[T]

Returns a java.util.concurrent.CompletionStage that will be completed with a TimeoutException if the provided value is not completed within the specified duration.

Attributes

Since

1.2.0

Deprecated
[Since version 2.0.0] Use CompletableFuture#orTimeout instead.
Inherited from:
FutureTimeoutSupport
Source
FutureTimeoutSupport.scala
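
For the replacement named in the deprecation note of timeoutCompletionStage, here is a sketch using only the JDK (Java 9 or later). OrTimeoutExample is a hypothetical name, and the never-completing future exists only to trigger the timeout.

```scala
import java.util.concurrent.{ CompletableFuture, ExecutionException, TimeUnit, TimeoutException }

object OrTimeoutExample {
  def run(): Boolean = {
    // A future that never completes on its own.
    val never = new CompletableFuture[String]()
    // orTimeout completes the future exceptionally with a TimeoutException after 100 ms.
    val guarded = never.orTimeout(100, TimeUnit.MILLISECONDS)
    try {
      guarded.get(3, TimeUnit.SECONDS)
      false
    } catch {
      // get wraps the exceptional completion in an ExecutionException.
      case e: ExecutionException => e.getCause.isInstanceOf[TimeoutException]
    }
  }
}
```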

Implicits

Inherited implicits

implicit def ask(actorSelection: ActorSelection): AskableActorSelection

Import this implicit conversion to gain ? and ask methods on pekko.actor.ActorSelection, which will defer to the ask(actorSelection, message)(timeout) method defined here.

import org.apache.pekko.pattern.ask

val future = selection ? message             // => ask(selection, message)
val future = selection ask message           // => ask(selection, message)
val future = selection.ask(message)(timeout) // => ask(selection, message)(timeout)

All of the above use an implicit pekko.util.Timeout.

Attributes

Inherited from:
AskSupport
Source
AskSupport.scala
implicit def ask(actorRef: ActorRef): AskableActorRef

Import this implicit conversion to gain ? and ask methods on pekko.actor.ActorRef, which will defer to the ask(actorRef, message)(timeout) method defined here.

import org.apache.pekko.pattern.ask

val future = actor ? message             // => ask(actor, message)
val future = actor ask message           // => ask(actor, message)
val future = actor.ask(message)(timeout) // => ask(actor, message)(timeout)

All of the above use an implicit pekko.util.Timeout.
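
The snippets above can be combined into a runnable sketch. AskExample and the Echo actor are hypothetical and only serve to produce a reply; the implicit Timeout bounds how long the returned Future waits for it.

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import org.apache.pekko.actor.{ Actor, ActorSystem, Props }
import org.apache.pekko.pattern.ask
import org.apache.pekko.util.Timeout

object AskExample {
  // Hypothetical actor that replies to the sender with whatever it receives.
  class Echo extends Actor {
    def receive: Receive = { case msg => sender() ! msg }
  }

  def run(): String = {
    val system = ActorSystem("ask-example")
    try {
      implicit val timeout: Timeout = Timeout(3.seconds)
      val echo = system.actorOf(Props(new Echo), "echo")
      // `?` returns Future[Any]; mapTo narrows it to the expected reply type.
      val reply: Future[String] = (echo ? "ping").mapTo[String]
      Await.result(reply, 3.seconds)
    } finally system.terminate()
  }
}
```

Blocking with Await is used here only to keep the sketch short; inside an actor the reply would more commonly be piped to a recipient.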

Attributes

Inherited from:
AskSupport
Source
AskSupport.scala
implicit def pipe[T](future: Future[T])(implicit executionContext: ExecutionContext): PipeableFuture[T]

Import this implicit conversion to gain the pipeTo method on scala.concurrent.Future:

import org.apache.pekko.pattern.pipe
// requires implicit ExecutionContext, e.g. by importing `context.dispatcher` inside an Actor

Future { doExpensiveCalc() } pipeTo nextActor

or

pipe(someFuture) to nextActor

The successful result of the future is sent as a message to the recipient, or the failure is sent in a pekko.actor.Status.Failure to the recipient.
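
Both outcomes can be observed with a small sketch. PipeExample, Collector and deliver are hypothetical helpers; the Collector actor simply records the first message it receives so the caller can inspect it.

```scala
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._
import org.apache.pekko.actor.{ Actor, ActorSystem, Props }
import org.apache.pekko.pattern.pipe

object PipeExample {
  // Hypothetical recipient that exposes the first message it receives.
  class Collector(received: Promise[Any]) extends Actor {
    def receive: Receive = { case msg => received.trySuccess(msg) }
  }

  def run(): (Any, Any) = {
    val system = ActorSystem("pipe-example")
    import system.dispatcher // implicit ExecutionContext required by pipe

    // Pipes the future to a fresh Collector and returns what the actor received.
    def deliver(future: Future[Int]): Any = {
      val received = Promise[Any]()
      val recipient = system.actorOf(Props(new Collector(received)))
      future.pipeTo(recipient)
      Await.result(received.future, 3.seconds)
    }

    try (deliver(Future(21 * 2)), deliver(Future.failed(new IllegalStateException("boom"))))
    finally system.terminate()
  }
}
```

The first element of the returned pair is the plain result; the second is the Status.Failure wrapping the exception.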

Attributes

Inherited from:
PipeToSupport
Source
PipeToSupport.scala
implicit def pipeCompletionStage[T](future: CompletionStage[T])(implicit executionContext: ExecutionContext): PipeableCompletionStage[T]

Import this implicit conversion to gain the pipeTo method on java.util.concurrent.CompletionStage:

import org.apache.pekko.pattern.pipeCompletionStage
// requires implicit ExecutionContext, e.g. by importing `context.dispatcher` inside an Actor

someCompletionStage pipeTo nextActor

or

pipeCompletionStage(someCompletionStage) to nextActor

The successful result of the future is sent as a message to the recipient, or the failure is sent in a pekko.actor.Status.Failure to the recipient.
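
For a java.util.concurrent.CompletionStage the usage might look like the following sketch. PipeCompletionStageExample and Collector are hypothetical, and the stage is already completed to keep the example deterministic.

```scala
import java.util.concurrent.{ CompletableFuture, CompletionStage }
import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._
import org.apache.pekko.actor.{ Actor, ActorSystem, Props }
import org.apache.pekko.pattern.pipeCompletionStage

object PipeCompletionStageExample {
  // Hypothetical recipient that exposes the first message it receives.
  class Collector(received: Promise[Any]) extends Actor {
    def receive: Receive = { case msg => received.trySuccess(msg) }
  }

  def run(): Any = {
    val system = ActorSystem("pipe-stage-example")
    import system.dispatcher // implicit ExecutionContext required by the conversion
    try {
      val received = Promise[Any]()
      val recipient = system.actorOf(Props(new Collector(received)))
      val stage: CompletionStage[String] = CompletableFuture.completedFuture("done")
      // The implicit conversion adds pipeTo to the CompletionStage.
      stage.pipeTo(recipient)
      Await.result(received.future, 3.seconds)
    } finally system.terminate()
  }
}
```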

Attributes

Inherited from:
PipeToSupport
Source
PipeToSupport.scala