pekko/akka-actor/src/main/scala/akka/pattern/package.scala

178 lines
6.2 KiB
Scala
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka
import akka.actor._
import akka.dispatch.{ Future, Promise }
import akka.util.{ Timeout, Duration }
/**
* == Commonly Used Patterns With Akka ==
*
* This package is used as a collection point for usage patterns which involve
* actors, futures, etc. but are loosely enough coupled to (multiple of) them
* to present them separately from the core implementation. Currently supported
* are:
*
* <ul>
* <li><b>ask:</b> create a temporary one-off actor for receiving a reply to a
* message and complete a [[akka.dispatch.Future]] with it; returns said
* Future.</li>
* <li><b>pipeTo:</b> feed eventually computed value of a future to an actor as
* a message.</li>
* </ul>
*
* In Scala the recommended usage is to import the pattern from the package
* object:
* {{{
* import akka.pattern.ask
*
* ask(actor, message) // use it directly
* actor ask message // use it by implicit conversion
* }}}
*
* For Java the patterns are available as static methods of the [[akka.pattern.Patterns]]
* class:
* {{{
* import static akka.pattern.Patterns.ask;
*
* ask(actor, message);
* }}}
*/
package object pattern {
/**
* Import this implicit conversion to gain `?` and `ask` methods on
* [[akka.actor.ActorRef]], which will defer to the
* `ask(actorRef, message)(timeout)` method defined here.
*
* {{{
* import akka.pattern.ask
*
* val future = actor ? message // => ask(actor, message)
* val future = actor ask message // => ask(actor, message)
* val future = actor.ask(message)(timeout) // => ask(actor, message)(timeout)
* }}}
*
* All of the above use an implicit [[akka.actor.Timeout]].
*/
implicit def ask(actorRef: ActorRef): AskSupport.AskableActorRef = new AskSupport.AskableActorRef(actorRef)
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* val f = ask(worker, request)(timeout)
* flow {
* EnrichedRequest(request, f())
* } pipeTo nextActor
* }}}
*
* [see [[akka.dispatch.Future]] for a description of `flow`]
*/
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
case ref: InternalActorRef if ref.isTerminated
actorRef.tell(message)
Promise.failed(new AskTimeoutException("sending to terminated ref breaks promises"))(ref.provider.dispatcher)
case ref: InternalActorRef
val provider = ref.provider
if (timeout.duration.length <= 0) {
actorRef.tell(message)
Promise.failed(new AskTimeoutException("not asking with negative timeout"))(provider.dispatcher)
} else {
val a = AskSupport.createAsker(provider, timeout)
actorRef.tell(message, a)
a.result
}
case _ throw new IllegalArgumentException("incompatible ActorRef " + actorRef)
}
/**
* Import this implicit conversion to gain the `pipeTo` method on [[akka.dispatch.Future]]:
*
* {{{
* import akka.pattern.pipeTo
*
* Future { doExpensiveCalc() } pipeTo nextActor
* }}}
*/
implicit def pipeTo[T](future: Future[T]): PipeToSupport.PipeableFuture[T] = new PipeToSupport.PipeableFuture(future)
/**
* Register an onComplete callback on this [[akka.dispatch.Future]] to send
* the result to the given actor reference. Returns the original Future to
* allow method chaining.
*
* <b>Recommended usage example:</b>
*
* {{{
* val f = ask(worker, request)(timeout)
* flow {
* EnrichedRequest(request, f())
* } pipeTo nextActor
* }}}
*
* [see [[akka.dispatch.Future]] for a description of `flow`]
*/
def pipeTo[T](future: Future[T], actorRef: ActorRef): Future[T] = {
future onComplete {
case Right(r) actorRef ! r
case Left(f) actorRef ! Status.Failure(f)
}
future
}
/**
* Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when
* existing messages of the target actor has been processed and the actor has been
* terminated.
*
* Useful when you need to wait for termination or compose ordered termination of several actors.
*
* If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]]
* is completed with failure [[akka.actor.ActorTimeoutException]].
*/
def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = {
if (target.isTerminated) {
Promise.successful(true)
} else {
val result = Promise[Boolean]()
system.actorOf(Props(new Actor {
// Terminated will be received when target has been stopped
context watch target
target ! PoisonPill
// ReceiveTimeout will be received if nothing else is received within the timeout
context setReceiveTimeout timeout
def receive = {
case Terminated(a) if a == target
result success true
context stop self
case ReceiveTimeout
result failure new ActorTimeoutException(
"Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout))
context stop self
}
}))
result
}
}
}