diff --git a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala index 191892c90a..3ab2c2a3bd 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala @@ -44,14 +44,14 @@ class AskSpec extends AkkaSpec { f.isCompleted should be(true) intercept[IllegalArgumentException] { Await.result(f, timeout.duration) - }.getMessage should be("Unsupported recipient ActorRef type, question not sent to [null]") + }.getMessage should be("Unsupported recipient ActorRef type, question not sent to [null]. Sender[null] sent the message of type \"java.lang.Double\".") } "return broken promises on 0 timeout" in { implicit val timeout = Timeout(0 seconds) val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender() ! x } })) val f = echo ? "foo" - val expectedMsg = "Timeout length must not be negative, question not sent to [%s]" format echo + val expectedMsg = "Timeout length must not be negative, question not sent to [%s]. Sender[null] sent the message of type \"java.lang.String\"." format echo intercept[IllegalArgumentException] { Await.result(f, timeout.duration) }.getMessage should be(expectedMsg) @@ -61,7 +61,7 @@ class AskSpec extends AkkaSpec { implicit val timeout = Timeout(-1000 seconds) val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender() ! x } })) val f = echo ? "foo" - val expectedMsg = "Timeout length must not be negative, question not sent to [%s]" format echo + val expectedMsg = "Timeout length must not be negative, question not sent to [%s]. Sender[null] sent the message of type \"java.lang.String\"." format echo intercept[IllegalArgumentException] { Await.result(f, timeout.duration) }.getMessage should be(expectedMsg) @@ -84,6 +84,23 @@ class AskSpec extends AkkaSpec { }.getMessage should include(timeout.duration.toMillis.toString) } + "include sender information in AskTimeout" in { + implicit val timeout = Timeout(0.5 seconds) + implicit val sender = system.actorOf(Props.empty) + val f = system.actorOf(Props.empty) ? "noreply" + intercept[AskTimeoutException] { + Await.result(f, 1 second) + }.getMessage.contains(sender.toString) should be(true) + } + + "include message class information in AskTimeout" in { + implicit val timeout = Timeout(0.5 seconds) + val f = system.actorOf(Props.empty) ? "noreply" + intercept[AskTimeoutException] { + Await.result(f, 1 second) + }.getMessage.contains("\"java.lang.String\"") should be(true) + } + "work for ActorSelection" in { implicit val timeout = Timeout(5 seconds) import system.dispatcher diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 3291d9766c..0340c46eb7 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -73,6 +73,8 @@ trait AskSupport { * */ def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef ? message + def ask(actorRef: ActorRef, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any] = + actorRef.?(message)(timeout, sender) /** * Import this implicit conversion to gain `?` and `ask` methods on @@ -119,6 +121,8 @@ trait AskSupport { * */ def ask(actorSelection: ActorSelection, message: Any)(implicit timeout: Timeout): Future[Any] = actorSelection ? message + def ask(actorSelection: ActorSelection, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any] = + actorSelection.?(message)(timeout, sender) } /* @@ -126,22 +130,22 @@ trait AskSupport { */ final class AskableActorRef(val actorRef: ActorRef) extends AnyVal { - def ask(message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match { + def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = actorRef match { case ref: InternalActorRef if ref.isTerminated ⇒ actorRef ! message - Future.failed[Any](new AskTimeoutException(s"Recipient[$actorRef] had already been terminated.")) + Future.failed[Any](new AskTimeoutException(s"""Recipient[$actorRef] had already been terminated. Sender[$sender] sent the message of type "${message.getClass.getName}".""")) case ref: InternalActorRef ⇒ if (timeout.duration.length <= 0) - Future.failed[Any](new IllegalArgumentException(s"Timeout length must not be negative, question not sent to [$actorRef]")) + Future.failed[Any](new IllegalArgumentException(s"""Timeout length must not be negative, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}".""")) else { - val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef.toString) + val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef.toString, message, sender) actorRef.tell(message, a) a.result.future } - case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorRef]")) + case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}".""")) } - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = ask(message)(timeout) + def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = ask(message)(timeout, sender) } /* @@ -149,20 +153,20 @@ final class AskableActorRef(val actorRef: ActorRef) extends AnyVal { */ final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal { - def ask(message: Any)(implicit timeout: Timeout): Future[Any] = actorSel.anchor match { + def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = actorSel.anchor match { case ref: InternalActorRef ⇒ if (timeout.duration.length <= 0) Future.failed[Any]( - new IllegalArgumentException(s"Timeout length must not be negative, question not sent to [$actorSel]")) + new IllegalArgumentException(s"""Timeout length must not be negative, question not sent to [$actorSel]. Sender[$sender] sent the message of type "${message.getClass.getName}".""")) else { - val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel.toString) + val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel.toString, message, sender) actorSel.tell(message, a) a.result.future } - case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorSel]")) + case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorSel]. Sender[$sender] sent the message of type "${message.getClass.getName}".""")) } - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = ask(message)(timeout) + def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = ask(message)(timeout, sender) } /** @@ -324,13 +328,15 @@ private[akka] object PromiseActorRef { private case object Stopped private final case class StoppedWithPath(path: ActorPath) - def apply(provider: ActorRefProvider, timeout: Timeout, targetName: String): PromiseActorRef = { + def apply(provider: ActorRefProvider, timeout: Timeout, targetName: String, message: Any, sender: ActorRef = Actor.noSender): PromiseActorRef = { val result = Promise[Any]() val scheduler = provider.guardian.underlying.system.scheduler val a = new PromiseActorRef(provider, result) implicit val ec = a.internalCallingThreadExecutionContext + val messageClassName = message.getClass.getName val f = scheduler.scheduleOnce(timeout.duration) { - result tryComplete Failure(new AskTimeoutException(s"Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]")) + result tryComplete Failure( + new AskTimeoutException(s"""Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. Sender[$sender] sent message of type "$messageClassName".""")) } result.future onComplete { _ ⇒ try a.stop() finally f.cancel() } a diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index ed60682298..af64d0fef6 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -49,7 +49,7 @@ trait GracefulStopSupport { if (target.isTerminated) Future successful true else { val internalTarget = target.asInstanceOf[InternalActorRef] - val ref = PromiseActorRef(internalTarget.provider, Timeout(timeout), targetName = target.toString) + val ref = PromiseActorRef(internalTarget.provider, Timeout(timeout), targetName = target.toString, message = stopMessage) internalTarget.sendSystemMessage(Watch(internalTarget, ref)) target.tell(stopMessage, Actor.noSender) ref.result.future.transform( diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 02ad18ff28..4d2c1a77a6 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -298,7 +298,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A if (target.isTerminated) Future successful SetThrottleAck else { val internalTarget = target.asInstanceOf[InternalActorRef] - val ref = PromiseActorRef(internalTarget.provider, timeout, target.toString) + val ref = PromiseActorRef(internalTarget.provider, timeout, target.toString, mode) internalTarget.sendSystemMessage(Watch(internalTarget, ref)) target.tell(mode, ref) ref.result.future.transform({