diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 3e637fc81d..46ccc25307 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -22,7 +22,69 @@ class AskTimeoutException(message: String, cause: Throwable) extends TimeoutExce /** * This object contains implementation details of the “ask” pattern. */ -object AskSupport { +trait AskSupport { + + /** + * Import this implicit conversion to gain `?` and `ask` methods on + * [[akka.actor.ActorRef]], which will defer to the + * `ask(actorRef, message)(timeout)` method defined here. + * + * {{{ + * import akka.pattern.ask + * + * val future = actor ? message // => ask(actor, message) + * val future = actor ask message // => ask(actor, message) + * val future = actor.ask(message)(timeout) // => ask(actor, message)(timeout) + * }}} + * + * All of the above use an implicit [[akka.actor.Timeout]]. + */ + implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef) + + /** + * Sends a message asynchronously and returns a [[akka.dispatch.Future]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The Future + * will be completed with an [[akka.actor.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * val f = ask(worker, request)(timeout) + * flow { + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * [see [[akka.dispatch.Future]] for a description of `flow`] + */ + def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match { + case ref: InternalActorRef if ref.isTerminated ⇒ + actorRef.tell(message) + Promise.failed(new AskTimeoutException("sending to terminated ref breaks promises"))(ref.provider.dispatcher) + case ref: InternalActorRef ⇒ + val provider = ref.provider + if (timeout.duration.length <= 0) { + actorRef.tell(message) + Promise.failed(new AskTimeoutException("not asking with negative timeout"))(provider.dispatcher) + } else { + val a = createAsker(provider, timeout) + actorRef.tell(message, a) + a.result + } + case _ ⇒ throw new IllegalArgumentException("incompatible ActorRef " + actorRef) + } /** * Implementation detail of the “ask” pattern enrichment of ActorRef diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala new file mode 100644 index 0000000000..d6fbd31c1e --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.pattern + +import akka.actor.{ ActorRef, Actor, ActorSystem, Props, PoisonPill, Terminated, ReceiveTimeout, ActorTimeoutException } +import akka.dispatch.{ Promise, Future } +import akka.util.Duration + +trait GracefulStopSupport { + /** + * Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when + * existing messages of the target actor has been processed and the actor has been + * terminated. + * + * Useful when you need to wait for termination or compose ordered termination of several actors. + * + * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] + * is completed with failure [[akka.actor.ActorTimeoutException]]. + */ + def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = { + if (target.isTerminated) { + Promise.successful(true) + } else { + val result = Promise[Boolean]() + system.actorOf(Props(new Actor { + // Terminated will be received when target has been stopped + context watch target + target ! PoisonPill + // ReceiveTimeout will be received if nothing else is received within the timeout + context setReceiveTimeout timeout + + def receive = { + case Terminated(a) if a == target ⇒ + result success true + context stop self + case ReceiveTimeout ⇒ + result failure new ActorTimeoutException( + "Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout)) + context stop self + } + })) + result + } + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala index f386209458..fef4a91ee8 100644 --- a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala @@ -3,13 +3,46 @@ */ package akka.pattern -import akka.actor.ActorRef import akka.dispatch.Future +import akka.actor.{ Status, ActorRef } -object PipeToSupport { +trait PipeToSupport { - class PipeableFuture[T](val future: Future[T]) { + final class PipeableFuture[T](val future: Future[T]) { def pipeTo(actorRef: ActorRef): Future[T] = akka.pattern.pipe(future, actorRef) } + /** + * Import this implicit conversion to gain the `pipeTo` method on [[akka.dispatch.Future]]: + * + * {{{ + * import akka.pattern.pipeTo + * + * Future { doExpensiveCalc() } pipeTo nextActor + * }}} + */ + implicit def pipeTo[T](future: Future[T]): PipeableFuture[T] = new PipeableFuture(future) + + /** + * Register an onComplete callback on this [[akka.dispatch.Future]] to send + * the result to the given actor reference. Returns the original Future to + * allow method chaining. + * + * Recommended usage example: + * + * {{{ + * val f = ask(worker, request)(timeout) + * flow { + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * [see [[akka.dispatch.Future]] for a description of `flow`] + */ + def pipe[T](future: Future[T], recipient: ActorRef): Future[T] = + future onComplete { + case Right(r) ⇒ recipient ! r + case Left(f) ⇒ recipient ! Status.Failure(f) + } + } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/package.scala b/akka-actor/src/main/scala/akka/pattern/package.scala index 2a8c03229f..d223bf2a32 100644 --- a/akka-actor/src/main/scala/akka/pattern/package.scala +++ b/akka-actor/src/main/scala/akka/pattern/package.scala @@ -40,137 +40,6 @@ import akka.util.{ Timeout, Duration } * ask(actor, message); * }}} */ -package object pattern { - - /** - * Import this implicit conversion to gain `?` and `ask` methods on - * [[akka.actor.ActorRef]], which will defer to the - * `ask(actorRef, message)(timeout)` method defined here. - * - * {{{ - * import akka.pattern.ask - * - * val future = actor ? message // => ask(actor, message) - * val future = actor ask message // => ask(actor, message) - * val future = actor.ask(message)(timeout) // => ask(actor, message)(timeout) - * }}} - * - * All of the above use an implicit [[akka.actor.Timeout]]. - */ - implicit def ask(actorRef: ActorRef): AskSupport.AskableActorRef = new AskSupport.AskableActorRef(actorRef) - - /** - * Sends a message asynchronously and returns a [[akka.dispatch.Future]] - * holding the eventual reply message; this means that the target actor - * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.actor.AskTimeoutException]] after the - * given timeout has expired; this is independent from any timeout applied - * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). - * - * Warning: - * When using future callbacks, inside actors you need to carefully avoid closing over - * the containing actor’s object, i.e. do not call methods or access mutable state - * on the enclosing actor from within the callback. This would break the actor - * encapsulation and may introduce synchronization bugs and race conditions because - * the callback will be scheduled concurrently to the enclosing actor. Unfortunately - * there is not yet a way to detect these illegal accesses at compile time. - * - * Recommended usage: - * - * {{{ - * val f = ask(worker, request)(timeout) - * flow { - * EnrichedRequest(request, f()) - * } pipeTo nextActor - * }}} - * - * [see [[akka.dispatch.Future]] for a description of `flow`] - */ - def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match { - case ref: InternalActorRef if ref.isTerminated ⇒ - actorRef.tell(message) - Promise.failed(new AskTimeoutException("sending to terminated ref breaks promises"))(ref.provider.dispatcher) - case ref: InternalActorRef ⇒ - val provider = ref.provider - if (timeout.duration.length <= 0) { - actorRef.tell(message) - Promise.failed(new AskTimeoutException("not asking with negative timeout"))(provider.dispatcher) - } else { - val a = AskSupport.createAsker(provider, timeout) - actorRef.tell(message, a) - a.result - } - case _ ⇒ throw new IllegalArgumentException("incompatible ActorRef " + actorRef) - } - - /** - * Import this implicit conversion to gain the `pipeTo` method on [[akka.dispatch.Future]]: - * - * {{{ - * import akka.pattern.pipeTo - * - * Future { doExpensiveCalc() } pipeTo nextActor - * }}} - */ - implicit def pipeTo[T](future: Future[T]): PipeToSupport.PipeableFuture[T] = new PipeToSupport.PipeableFuture(future) - - /** - * Register an onComplete callback on this [[akka.dispatch.Future]] to send - * the result to the given actor reference. Returns the original Future to - * allow method chaining. - * - * Recommended usage example: - * - * {{{ - * val f = ask(worker, request)(timeout) - * flow { - * EnrichedRequest(request, f()) - * } pipeTo nextActor - * }}} - * - * [see [[akka.dispatch.Future]] for a description of `flow`] - */ - def pipe[T](future: Future[T], recipient: ActorRef): Future[T] = - future onComplete { - case Right(r) ⇒ recipient ! r - case Left(f) ⇒ recipient ! Status.Failure(f) - } - - /** - * Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when - * existing messages of the target actor has been processed and the actor has been - * terminated. - * - * Useful when you need to wait for termination or compose ordered termination of several actors. - * - * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] - * is completed with failure [[akka.actor.ActorTimeoutException]]. - */ - def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = { - if (target.isTerminated) { - Promise.successful(true) - } else { - val result = Promise[Boolean]() - system.actorOf(Props(new Actor { - // Terminated will be received when target has been stopped - context watch target - target ! PoisonPill - // ReceiveTimeout will be received if nothing else is received within the timeout - context setReceiveTimeout timeout - - def receive = { - case Terminated(a) if a == target ⇒ - result success true - context stop self - case ReceiveTimeout ⇒ - result failure new ActorTimeoutException( - "Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout)) - context stop self - } - })) - result - } - } +package object pattern extends PipeToSupport with AskSupport with GracefulStopSupport { } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index e2e6f14db7..46dcbde8d7 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -766,7 +766,7 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ { case (sender, message) ⇒ val provider: ActorRefProvider = routeeProvider.context.asInstanceOf[ActorCell].systemImpl.provider - val asker = AskSupport.createAsker(provider, within) + val asker = akka.pattern.createAsker(provider, within) asker.result.pipeTo(sender) toAll(asker, routeeProvider.routees) } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index 0f6898a239..88d80d6d81 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -82,8 +82,8 @@ akka { "support ask" in { Await.result(here ? "ping", timeout.duration) match { - case ("pong", s: akka.pattern.AskSupport.PromiseActorRef) ⇒ // good - case m ⇒ fail(m + " was not (pong, AskActorRef)") + case ("pong", s: akka.pattern.PromiseActorRef) ⇒ // good + case m ⇒ fail(m + " was not (pong, AskActorRef)") } }