Restructuring the pattern internals so it is modular
This commit is contained in:
parent
0c02adaa75
commit
aca5693ce6
6 changed files with 150 additions and 139 deletions
|
|
@ -22,7 +22,69 @@ class AskTimeoutException(message: String, cause: Throwable) extends TimeoutExce
|
|||
/**
|
||||
* This object contains implementation details of the “ask” pattern.
|
||||
*/
|
||||
object AskSupport {
|
||||
trait AskSupport {
|
||||
|
||||
/**
|
||||
* Import this implicit conversion to gain `?` and `ask` methods on
|
||||
* [[akka.actor.ActorRef]], which will defer to the
|
||||
* `ask(actorRef, message)(timeout)` method defined here.
|
||||
*
|
||||
* {{{
|
||||
* import akka.pattern.ask
|
||||
*
|
||||
* val future = actor ? message // => ask(actor, message)
|
||||
* val future = actor ask message // => ask(actor, message)
|
||||
* val future = actor.ask(message)(timeout) // => ask(actor, message)(timeout)
|
||||
* }}}
|
||||
*
|
||||
* All of the above use an implicit [[akka.actor.Timeout]].
|
||||
*/
|
||||
implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef)
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
|
||||
* holding the eventual reply message; this means that the target actor
|
||||
* needs to send the result to the `sender` reference provided. The Future
|
||||
* will be completed with an [[akka.actor.AskTimeoutException]] after the
|
||||
* given timeout has expired; this is independent from any timeout applied
|
||||
* while awaiting a result for this future (i.e. in
|
||||
* `Await.result(..., timeout)`).
|
||||
*
|
||||
* <b>Warning:</b>
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s object, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*
|
||||
* <b>Recommended usage:</b>
|
||||
*
|
||||
* {{{
|
||||
* val f = ask(worker, request)(timeout)
|
||||
* flow {
|
||||
* EnrichedRequest(request, f())
|
||||
* } pipeTo nextActor
|
||||
* }}}
|
||||
*
|
||||
* [see [[akka.dispatch.Future]] for a description of `flow`]
|
||||
*/
|
||||
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
|
||||
case ref: InternalActorRef if ref.isTerminated ⇒
|
||||
actorRef.tell(message)
|
||||
Promise.failed(new AskTimeoutException("sending to terminated ref breaks promises"))(ref.provider.dispatcher)
|
||||
case ref: InternalActorRef ⇒
|
||||
val provider = ref.provider
|
||||
if (timeout.duration.length <= 0) {
|
||||
actorRef.tell(message)
|
||||
Promise.failed(new AskTimeoutException("not asking with negative timeout"))(provider.dispatcher)
|
||||
} else {
|
||||
val a = createAsker(provider, timeout)
|
||||
actorRef.tell(message, a)
|
||||
a.result
|
||||
}
|
||||
case _ ⇒ throw new IllegalArgumentException("incompatible ActorRef " + actorRef)
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation detail of the “ask” pattern enrichment of ActorRef
|
||||
|
|
|
|||
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.pattern
|
||||
|
||||
import akka.actor.{ ActorRef, Actor, ActorSystem, Props, PoisonPill, Terminated, ReceiveTimeout, ActorTimeoutException }
|
||||
import akka.dispatch.{ Promise, Future }
|
||||
import akka.util.Duration
|
||||
|
||||
trait GracefulStopSupport {
|
||||
/**
|
||||
* Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when
|
||||
* existing messages of the target actor has been processed and the actor has been
|
||||
* terminated.
|
||||
*
|
||||
* Useful when you need to wait for termination or compose ordered termination of several actors.
|
||||
*
|
||||
* If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]]
|
||||
* is completed with failure [[akka.actor.ActorTimeoutException]].
|
||||
*/
|
||||
def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = {
|
||||
if (target.isTerminated) {
|
||||
Promise.successful(true)
|
||||
} else {
|
||||
val result = Promise[Boolean]()
|
||||
system.actorOf(Props(new Actor {
|
||||
// Terminated will be received when target has been stopped
|
||||
context watch target
|
||||
target ! PoisonPill
|
||||
// ReceiveTimeout will be received if nothing else is received within the timeout
|
||||
context setReceiveTimeout timeout
|
||||
|
||||
def receive = {
|
||||
case Terminated(a) if a == target ⇒
|
||||
result success true
|
||||
context stop self
|
||||
case ReceiveTimeout ⇒
|
||||
result failure new ActorTimeoutException(
|
||||
"Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout))
|
||||
context stop self
|
||||
}
|
||||
}))
|
||||
result
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -3,13 +3,46 @@
|
|||
*/
|
||||
package akka.pattern
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.dispatch.Future
|
||||
import akka.actor.{ Status, ActorRef }
|
||||
|
||||
object PipeToSupport {
|
||||
trait PipeToSupport {
|
||||
|
||||
class PipeableFuture[T](val future: Future[T]) {
|
||||
final class PipeableFuture[T](val future: Future[T]) {
|
||||
def pipeTo(actorRef: ActorRef): Future[T] = akka.pattern.pipe(future, actorRef)
|
||||
}
|
||||
|
||||
/**
|
||||
* Import this implicit conversion to gain the `pipeTo` method on [[akka.dispatch.Future]]:
|
||||
*
|
||||
* {{{
|
||||
* import akka.pattern.pipeTo
|
||||
*
|
||||
* Future { doExpensiveCalc() } pipeTo nextActor
|
||||
* }}}
|
||||
*/
|
||||
implicit def pipeTo[T](future: Future[T]): PipeableFuture[T] = new PipeableFuture(future)
|
||||
|
||||
/**
|
||||
* Register an onComplete callback on this [[akka.dispatch.Future]] to send
|
||||
* the result to the given actor reference. Returns the original Future to
|
||||
* allow method chaining.
|
||||
*
|
||||
* <b>Recommended usage example:</b>
|
||||
*
|
||||
* {{{
|
||||
* val f = ask(worker, request)(timeout)
|
||||
* flow {
|
||||
* EnrichedRequest(request, f())
|
||||
* } pipeTo nextActor
|
||||
* }}}
|
||||
*
|
||||
* [see [[akka.dispatch.Future]] for a description of `flow`]
|
||||
*/
|
||||
def pipe[T](future: Future[T], recipient: ActorRef): Future[T] =
|
||||
future onComplete {
|
||||
case Right(r) ⇒ recipient ! r
|
||||
case Left(f) ⇒ recipient ! Status.Failure(f)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -40,137 +40,6 @@ import akka.util.{ Timeout, Duration }
|
|||
* ask(actor, message);
|
||||
* }}}
|
||||
*/
|
||||
package object pattern {
|
||||
|
||||
/**
|
||||
* Import this implicit conversion to gain `?` and `ask` methods on
|
||||
* [[akka.actor.ActorRef]], which will defer to the
|
||||
* `ask(actorRef, message)(timeout)` method defined here.
|
||||
*
|
||||
* {{{
|
||||
* import akka.pattern.ask
|
||||
*
|
||||
* val future = actor ? message // => ask(actor, message)
|
||||
* val future = actor ask message // => ask(actor, message)
|
||||
* val future = actor.ask(message)(timeout) // => ask(actor, message)(timeout)
|
||||
* }}}
|
||||
*
|
||||
* All of the above use an implicit [[akka.actor.Timeout]].
|
||||
*/
|
||||
implicit def ask(actorRef: ActorRef): AskSupport.AskableActorRef = new AskSupport.AskableActorRef(actorRef)
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
|
||||
* holding the eventual reply message; this means that the target actor
|
||||
* needs to send the result to the `sender` reference provided. The Future
|
||||
* will be completed with an [[akka.actor.AskTimeoutException]] after the
|
||||
* given timeout has expired; this is independent from any timeout applied
|
||||
* while awaiting a result for this future (i.e. in
|
||||
* `Await.result(..., timeout)`).
|
||||
*
|
||||
* <b>Warning:</b>
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s object, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*
|
||||
* <b>Recommended usage:</b>
|
||||
*
|
||||
* {{{
|
||||
* val f = ask(worker, request)(timeout)
|
||||
* flow {
|
||||
* EnrichedRequest(request, f())
|
||||
* } pipeTo nextActor
|
||||
* }}}
|
||||
*
|
||||
* [see [[akka.dispatch.Future]] for a description of `flow`]
|
||||
*/
|
||||
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
|
||||
case ref: InternalActorRef if ref.isTerminated ⇒
|
||||
actorRef.tell(message)
|
||||
Promise.failed(new AskTimeoutException("sending to terminated ref breaks promises"))(ref.provider.dispatcher)
|
||||
case ref: InternalActorRef ⇒
|
||||
val provider = ref.provider
|
||||
if (timeout.duration.length <= 0) {
|
||||
actorRef.tell(message)
|
||||
Promise.failed(new AskTimeoutException("not asking with negative timeout"))(provider.dispatcher)
|
||||
} else {
|
||||
val a = AskSupport.createAsker(provider, timeout)
|
||||
actorRef.tell(message, a)
|
||||
a.result
|
||||
}
|
||||
case _ ⇒ throw new IllegalArgumentException("incompatible ActorRef " + actorRef)
|
||||
}
|
||||
|
||||
/**
|
||||
* Import this implicit conversion to gain the `pipeTo` method on [[akka.dispatch.Future]]:
|
||||
*
|
||||
* {{{
|
||||
* import akka.pattern.pipeTo
|
||||
*
|
||||
* Future { doExpensiveCalc() } pipeTo nextActor
|
||||
* }}}
|
||||
*/
|
||||
implicit def pipeTo[T](future: Future[T]): PipeToSupport.PipeableFuture[T] = new PipeToSupport.PipeableFuture(future)
|
||||
|
||||
/**
|
||||
* Register an onComplete callback on this [[akka.dispatch.Future]] to send
|
||||
* the result to the given actor reference. Returns the original Future to
|
||||
* allow method chaining.
|
||||
*
|
||||
* <b>Recommended usage example:</b>
|
||||
*
|
||||
* {{{
|
||||
* val f = ask(worker, request)(timeout)
|
||||
* flow {
|
||||
* EnrichedRequest(request, f())
|
||||
* } pipeTo nextActor
|
||||
* }}}
|
||||
*
|
||||
* [see [[akka.dispatch.Future]] for a description of `flow`]
|
||||
*/
|
||||
def pipe[T](future: Future[T], recipient: ActorRef): Future[T] =
|
||||
future onComplete {
|
||||
case Right(r) ⇒ recipient ! r
|
||||
case Left(f) ⇒ recipient ! Status.Failure(f)
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when
|
||||
* existing messages of the target actor has been processed and the actor has been
|
||||
* terminated.
|
||||
*
|
||||
* Useful when you need to wait for termination or compose ordered termination of several actors.
|
||||
*
|
||||
* If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]]
|
||||
* is completed with failure [[akka.actor.ActorTimeoutException]].
|
||||
*/
|
||||
def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = {
|
||||
if (target.isTerminated) {
|
||||
Promise.successful(true)
|
||||
} else {
|
||||
val result = Promise[Boolean]()
|
||||
system.actorOf(Props(new Actor {
|
||||
// Terminated will be received when target has been stopped
|
||||
context watch target
|
||||
target ! PoisonPill
|
||||
// ReceiveTimeout will be received if nothing else is received within the timeout
|
||||
context setReceiveTimeout timeout
|
||||
|
||||
def receive = {
|
||||
case Terminated(a) if a == target ⇒
|
||||
result success true
|
||||
context stop self
|
||||
case ReceiveTimeout ⇒
|
||||
result failure new ActorTimeoutException(
|
||||
"Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout))
|
||||
context stop self
|
||||
}
|
||||
}))
|
||||
result
|
||||
}
|
||||
}
|
||||
package object pattern extends PipeToSupport with AskSupport with GracefulStopSupport {
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -766,7 +766,7 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒
|
|||
{
|
||||
case (sender, message) ⇒
|
||||
val provider: ActorRefProvider = routeeProvider.context.asInstanceOf[ActorCell].systemImpl.provider
|
||||
val asker = AskSupport.createAsker(provider, within)
|
||||
val asker = akka.pattern.createAsker(provider, within)
|
||||
asker.result.pipeTo(sender)
|
||||
toAll(asker, routeeProvider.routees)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -82,8 +82,8 @@ akka {
|
|||
|
||||
"support ask" in {
|
||||
Await.result(here ? "ping", timeout.duration) match {
|
||||
case ("pong", s: akka.pattern.AskSupport.PromiseActorRef) ⇒ // good
|
||||
case m ⇒ fail(m + " was not (pong, AskActorRef)")
|
||||
case ("pong", s: akka.pattern.PromiseActorRef) ⇒ // good
|
||||
case m ⇒ fail(m + " was not (pong, AskActorRef)")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue