diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 510b6ecbef..2bb7575d2b 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -194,15 +194,9 @@ class RoutingSpec extends AkkaSpec { } }) - val props = RoutedProps().withRoundRobinRouter.withLocalConnections(List(connection1)) - val actor = app.actorOf(props, "foo") + val actor = app.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(List(connection1)), "foo") - try { - actor ? Broadcast(1) - fail() - } catch { - case e: RoutingException ⇒ - } + intercept[RoutingException] { actor ? Broadcast(1) } actor ! "end" doneLatch.await(5, TimeUnit.SECONDS) must be(true) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 3c9d2144ec..6d6e738602 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -368,23 +368,32 @@ class DeadLetterActorRef(app: AkkaApplication) extends MinimalActorRef { def ?(message: Any)(implicit timeout: Timeout): Future[Any] = brokenPromise } -abstract class AskActorRef(promise: Promise[Any], app: AkkaApplication) extends MinimalActorRef { +abstract class AskActorRef(app: AkkaApplication)(timeout: Timeout = app.AkkaConfig.ActorTimeout, dispatcher: MessageDispatcher = app.dispatcher) extends MinimalActorRef { + final val result = new DefaultPromise[Any](timeout)(dispatcher) - promise onComplete { _ ⇒ app.deathWatch.publish(Terminated(AskActorRef.this)); whenDone() } - promise onTimeout { _ ⇒ app.deathWatch.publish(Terminated(AskActorRef.this)); whenDone() } + { + val callback: Future[Any] ⇒ Unit = { _ ⇒ app.deathWatch.publish(Terminated(AskActorRef.this)); whenDone() } + result onComplete callback + result onTimeout callback + } protected def whenDone(): Unit - override protected[akka] def postMessageToMailbox(message: Any, sender: ActorRef): Unit = message match { - case akka.actor.Status.Success(r) ⇒ promise.completeWithResult(r) - case akka.actor.Status.Failure(f) ⇒ promise.completeWithException(f) - case other ⇒ promise.completeWithResult(other) + protected[akka] override def postMessageToMailbox(message: Any, sender: ActorRef): Unit = message match { + case akka.actor.Status.Success(r) ⇒ result.completeWithResult(r) + case akka.actor.Status.Failure(f) ⇒ result.completeWithException(f) + case other ⇒ result.completeWithResult(other) } - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = - new KeptPromise[Any](Left(new ActorKilledException("Not possible to ask/? a reference to an ask/?.")))(app.dispatcher) + protected[akka] override def sendSystemMessage(message: SystemMessage): Unit = message match { + case _: Terminate ⇒ stop() + case _ ⇒ + } - override def isShutdown = promise.isCompleted || promise.isExpired + override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = + new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName))))(dispatcher) - override def stop(): Unit = if (!isShutdown) promise.completeWithException(new ActorKilledException("Stopped")) + override def isShutdown = result.isCompleted || result.isExpired + + override def stop(): Unit = if (!isShutdown) result.completeWithException(new ActorKilledException("Stopped")) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index e823b7d1f6..81c4b66afc 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -223,12 +223,11 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { import akka.dispatch.{ Future, Promise, DefaultPromise } (if (within == null) app.AkkaConfig.ActorTimeout else within) match { case t if t.duration.length <= 0 ⇒ new DefaultPromise[Any](0)(app.dispatcher) //Abort early if nonsensical timeout - case other ⇒ - val result = new DefaultPromise[Any](other)(app.dispatcher) - val a = new AskActorRef(result, app) { def whenDone() = actors.remove(this) } + case t ⇒ + val a = new AskActorRef(app)(timeout = t) { def whenDone() = actors.remove(this) } assert(actors.putIfAbsent(a.address, a) eq null) //If this fails, we're in deep trouble recipient.tell(message, a) - result + a.result } } } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index ca7c60a619..899dcb7f5a 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -50,7 +50,7 @@ trait Router { * * @throws RoutingExceptionif something goes wrong while routing the message. */ - def route[T](message: Any, timeout: Timeout)(implicit sender: ActorRef): Future[T] + def route[T](message: Any, timeout: Timeout): Future[T] } /** @@ -98,7 +98,7 @@ abstract private[akka] class AbstractRoutedActorRef(val app: AkkaApplication, va override def postMessageToMailbox(message: Any, sender: ActorRef) = router.route(message)(sender) - override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = app.provider.ask(message, this, timeout) + override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = router.route(message, timeout) } /** @@ -139,6 +139,7 @@ trait BasicRouter extends Router { def route(message: Any)(implicit sender: ActorRef) = message match { case Routing.Broadcast(message) ⇒ + //it is a broadcast message, we are going to send to message to all connections. connectionManager.connections.iterable foreach { connection ⇒ try { @@ -165,7 +166,7 @@ trait BasicRouter extends Router { } } - def route[T](message: Any, timeout: Timeout)(implicit sender: ActorRef): Future[T] = message match { + def route[T](message: Any, timeout: Timeout): Future[T] = message match { case Routing.Broadcast(message) ⇒ throw new RoutingException("Broadcasting using '?'/'ask' is for the time being is not supported. Use ScatterGatherRouter.") case _ ⇒ @@ -340,7 +341,7 @@ trait ScatterGatherRouter extends BasicRouter with Serializable { */ protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] - private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: ActorRef): Future[G] = { + private def scatterGather[S, G >: S](message: Any, timeout: Timeout): Future[G] = { val responses = connectionManager.connections.iterable.flatMap { actor ⇒ try { if (actor.isShutdown) throw ActorInitializationException(actor, "For compatability - check death first", new Exception) // for stack trace @@ -357,9 +358,9 @@ trait ScatterGatherRouter extends BasicRouter with Serializable { else gather(responses) } - override def route[T](message: Any, timeout: Timeout)(implicit sender: ActorRef): Future[T] = message match { + override def route[T](message: Any, timeout: Timeout): Future[T] = message match { case Routing.Broadcast(message) ⇒ scatterGather(message, timeout) - case message ⇒ super.route(message, timeout)(sender) + case message ⇒ super.route(message, timeout) } }