diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index c4009fdbb4..b689aa5370 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -466,14 +466,13 @@ class AskTimeoutException(message: String, cause: Throwable) extends TimeoutExce def this(message: String) = this(message, null: Throwable) } -class AskActorRef( +private[akka] final class PromiseActorRef( val path: ActorPath, override val getParent: InternalActorRef, - val dispatcher: MessageDispatcher, + private final val result: Promise[Any], val deathWatch: DeathWatch) extends MinimalActorRef { final val running = new AtomicBoolean(true) - final val result = Promise[Any]()(dispatcher) override def !(message: Any)(implicit sender: ActorRef = null): Unit = if (running.get) message match { case Status.Success(r) ⇒ result.success(r) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 861a234db3..0b3215cd60 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -104,7 +104,7 @@ trait ActorRefProvider { * Create AskActorRef and register it properly so it can be serialized/deserialized; * caller needs to send the message. */ - def ask(within: Timeout): Option[AskActorRef] + def ask(result: Promise[Any], within: Timeout): Option[ActorRef] /** * This Future is completed upon termination of this ActorRefProvider, which @@ -494,15 +494,14 @@ class LocalActorRefProvider( } } - def ask(within: Timeout): Option[AskActorRef] = { + def ask(result: Promise[Any], within: Timeout): Option[ActorRef] = { (if (within == null) settings.ActorTimeout else within) match { case t if t.duration.length <= 0 ⇒ None case t ⇒ val path = tempPath() val name = path.name - val a = new AskActorRef(path, tempContainer, dispatcher, deathWatch) + val a = new PromiseActorRef(path, tempContainer, result, deathWatch) tempContainer.addChild(name, a) - val result = a.result val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { result.failure(new AskTimeoutException("Timed out")) } result onComplete { _ ⇒ try { a.stop(); f.cancel() } @@ -510,6 +509,24 @@ class LocalActorRefProvider( } Some(a) + + // Alternative implementation: + // Create a full-blown actor to complete the promise. + // This would also work but not as efficient as PromiseActorRef. + //val b = actorOf(system, Props(new Actor { + // def receive = { + // case Status.Success(r) ⇒ result.success(r) + // case Status.Failure(f) ⇒ result.failure(f) + // case other ⇒ result.success(other) + // } + //}), systemGuardian, systemGuardian.path / "promise" / tempName(), false, None) + //val ff = system.scheduler.scheduleOnce(t.duration) { b.stop() } + //result onComplete { _ ⇒ + // b.stop() + // ff.cancel() + //} + // + //Some(b) } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index c70ec32c43..fc4225600d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -56,14 +56,14 @@ object Futures { def ask(actor: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = { val provider = actor.asInstanceOf[InternalActorRef].provider - provider.ask(timeout) match { + val promise = Promise[Any]()(provider.dispatcher) + provider.ask(promise, timeout) match { case Some(a) ⇒ actor.!(message)(a) - a.result case None ⇒ actor.!(message)(null) - Promise[Any]()(provider.dispatcher) } + promise } def ask(actor: ActorRef, message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 0c02952b3e..36f5fe9670 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConversions._ import akka.util.{ Duration, Timeout } import akka.config.ConfigurationException +import akka.dispatch.Promise /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to @@ -405,8 +406,10 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ { case (sender, message) ⇒ - val asker = context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(within)).get - asker.result.pipeTo(sender) + val provider: ActorRefProvider = context.asInstanceOf[ActorCell].systemImpl.provider + val promise = Promise[Any]()(provider.dispatcher) + val asker = provider.ask(promise, Timeout(within)).get + promise.pipeTo(sender) message match { case _ ⇒ toAll(asker, ref.routees) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 6f11d4321c..5b38996d3b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -138,7 +138,7 @@ class RemoteActorRefProvider( def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path) - def ask(within: Timeout): Option[AskActorRef] = local.ask(within) + def ask(result: Promise[Any], within: Timeout): Option[ActorRef] = local.ask(result, within) /** * Using (checking out) actor on a specific node. diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index dd62ae48e2..0be7c0f361 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -82,8 +82,8 @@ akka { "support ask" in { Await.result(here ? "ping", timeout.duration) match { - case ("pong", s: AskActorRef) ⇒ // good - case m ⇒ fail(m + " was not (pong, AskActorRef)") + case ("pong", s: PromiseActorRef) ⇒ // good + case m ⇒ fail(m + " was not (pong, AskActorRef)") } }