diff --git a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala index 22c5a170fb..ef247d5ca1 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala @@ -50,16 +50,17 @@ class AskSpec extends AkkaSpec { implicit val timeout = Timeout(5 seconds) val f = ask(null: ActorRef, 3.14) f.isCompleted should ===(true) + intercept[IllegalArgumentException] { Await.result(f, timeout.duration) - }.getMessage should ===("Unsupported recipient ActorRef type, question not sent to [null]. Sender[null] sent the message of type \"java.lang.Double\".") + }.getMessage should ===("Unsupported recipient type, question not sent to [null]. Message of type [java.lang.Double].") } "return broken promises on 0 timeout" in { implicit val timeout = Timeout(0 seconds) val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender() ! x } })) val f = echo ? "foo" - val expectedMsg = "Timeout length must be positive, question not sent to [%s]. Sender[null] sent the message of type \"java.lang.String\"." format echo + val expectedMsg = s"Timeout length must be positive, question not sent to [$echo]. Message of type [java.lang.String]." intercept[IllegalArgumentException] { Await.result(f, timeout.duration) }.getMessage should ===(expectedMsg) @@ -69,7 +70,7 @@ class AskSpec extends AkkaSpec { implicit val timeout = Timeout(-1000 seconds) val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender() ! x } })) val f = echo ? "foo" - val expectedMsg = "Timeout length must be positive, question not sent to [%s]. Sender[null] sent the message of type \"java.lang.String\"." format echo + val expectedMsg = s"Timeout length must be positive, question not sent to [$echo]. Message of type [java.lang.String]." intercept[IllegalArgumentException] { Await.result(f, timeout.duration) }.getMessage should ===(expectedMsg) @@ -103,10 +104,10 @@ class AskSpec extends AkkaSpec { "include message class information in AskTimeout" in { implicit val timeout = Timeout(0.5 seconds) - val f = system.actorOf(Props.empty) ? "noreply" + val f = system.actorOf(Props.empty) ? Integer.valueOf(17) intercept[AskTimeoutException] { Await.result(f, 1 second) - }.getMessage.contains("\"java.lang.String\"") should ===(true) + }.getMessage should include("[java.lang.Integer") } "work for ActorSelection" in { diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala index 483bbecc57..3474d0251e 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala @@ -54,8 +54,8 @@ class ActorContextAskSpec extends ScalaTestWithActorTestKit(ActorContextAskSpec. // Timeout comes from TypedAkkaSpec ctx.ask(pingPong)(Ping) { - case Success(pong) ⇒ Pong(ctx.self.path.name + "1", Thread.currentThread().getName) - case Failure(ex) ⇒ throw ex + case Success(_) ⇒ Pong(ctx.self.path.name + "1", Thread.currentThread().getName) + case Failure(ex) ⇒ throw ex } Behaviors.receiveMessage { pong ⇒ @@ -87,7 +87,7 @@ class ActorContextAskSpec extends ScalaTestWithActorTestKit(ActorContextAskSpec. } )) - val snitch = Behaviors.setup[AnyRef] { (ctx) ⇒ + val snitch = Behaviors.setup[AnyRef] { ctx ⇒ ctx.ask(pingPong)(Ping) { case Success(msg) ⇒ throw new NotImplementedError(msg.toString) case Failure(x) ⇒ x @@ -115,17 +115,16 @@ class ActorContextAskSpec extends ScalaTestWithActorTestKit(ActorContextAskSpec. "deal with timeouts in ask" in { val probe = TestProbe[AnyRef]() - val snitch = Behaviors.setup[AnyRef] { (ctx) ⇒ + val snitch = Behaviors.setup[AnyRef] { ctx ⇒ ctx.ask[String, String](system.deadLetters)(ref ⇒ "boo") { case Success(m) ⇒ m case Failure(x) ⇒ x - }(20.millis, implicitly[ClassTag[String]]) + }(10.millis, implicitly[ClassTag[String]]) - Behaviors.receive { - case (_, msg) ⇒ - probe.ref ! msg - Behaviors.same + Behaviors.receiveMessage { msg ⇒ + probe.ref ! msg + Behaviors.same } } @@ -135,8 +134,33 @@ class ActorContextAskSpec extends ScalaTestWithActorTestKit(ActorContextAskSpec. } } - probe.expectMessageType[TimeoutException] + val exc = probe.expectMessageType[TimeoutException] + exc.getMessage should include("had already been terminated") + } + "must timeout if recipient doesn't reply in time" in { + val target = spawn(Behaviors.ignore[String]) + val probe = TestProbe[AnyRef]() + val snitch = Behaviors.setup[AnyRef] { ctx ⇒ + + ctx.ask[String, String](target)(_ ⇒ "bar") { + case Success(m) ⇒ m + case Failure(x) ⇒ x + }(10.millis, implicitly[ClassTag[String]]) + + Behaviors.receiveMessage { msg ⇒ + probe.ref ! msg + Behaviors.same + } + } + + spawn(snitch) + + val exc = probe.expectMessageType[TimeoutException] + exc.getMessage should startWith("Ask timed out on") + exc.getMessage should include(target.path.toString) + exc.getMessage should include("[java.lang.String]") // message class + exc.getMessage should include("[10 ms]") // timeout } } diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 11ad2c34be..2e070126a2 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -18,7 +18,8 @@ import scala.util.{ Failure, Success } /** * This is what is used to complete a Future that is returned from an ask/? call, - * when it times out. + * when it times out. A typical reason for `AskTimeoutException` is that the recipient + * actor didn't send a reply. */ class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException(message) { def this(message: String) = this(message, null: Throwable) @@ -50,11 +51,13 @@ trait AskSupport { /** * Sends a message asynchronously and returns a [[scala.concurrent.Future]] * holding the eventual reply message; this means that the target actor - * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * needs to send the result to the `sender` reference provided. + * + * The Future will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. * * Warning: * When using future callbacks, inside actors you need to carefully avoid closing over @@ -99,11 +102,13 @@ trait AskSupport { /** * Sends a message asynchronously and returns a [[scala.concurrent.Future]] * holding the eventual reply message; this means that the target actor - * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * needs to send the result to the `sender` reference provided. + * + * The Future will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. * * Warning: * When using future callbacks, inside actors you need to carefully avoid closing over @@ -161,11 +166,13 @@ trait ExplicitAskSupport { /** * Sends a message asynchronously and returns a [[scala.concurrent.Future]] * holding the eventual reply message; this means that the target actor - * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * needs to send the result to the `sender` reference provided. + * + * The Future will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. * * Warning: * When using future callbacks, inside actors you need to carefully avoid closing over @@ -215,11 +222,13 @@ trait ExplicitAskSupport { /** * Sends a message asynchronously and returns a [[scala.concurrent.Future]] * holding the eventual reply message; this means that the target actor - * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * needs to send the result to the `sender` reference provided. + * + * The Future will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. * * Warning: * When using future callbacks, inside actors you need to carefully avoid closing over @@ -257,6 +266,36 @@ object AskableActorRef { */ private[pattern] def $qmark$extension(actorRef: ActorRef, message: Any, timeout: Timeout): Future[Any] = actorRef.internalAsk(message, timeout, ActorRef.noSender) + + private def messagePartOfException(message: Any, sender: ActorRef): String = { + val msg = if (message == null) "unknown" else message + val wasSentBy = if (sender == ActorRef.noSender) "" else s" was sent by [$sender]" + s"Message of type [${msg.getClass.getName}]$wasSentBy." + } + + /** + * INTERNAL API + */ + @InternalApi private[akka] def negativeTimeoutException(recipient: Any, message: Any, sender: ActorRef): IllegalArgumentException = { + new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$recipient]. " + + messagePartOfException(message, sender)) + } + + /** + * INTERNAL API + */ + @InternalApi private[akka] def recipientTerminatedExcpetion(recipient: Any, message: Any, sender: ActorRef): AskTimeoutException = { + new AskTimeoutException(s"Recipient [$recipient] had already been terminated. " + + messagePartOfException(message, sender)) + } + + /** + * INTERNAL API + */ + @InternalApi private[akka] def unsupportedRecipientType(recipient: Any, message: Any, sender: ActorRef): IllegalArgumentException = { + new IllegalArgumentException(s"Unsupported recipient type, question not sent to [$recipient]. " + + messagePartOfException(message, sender)) + } } /* @@ -288,16 +327,16 @@ final class AskableActorRef(val actorRef: ActorRef) extends AnyVal { private[pattern] def internalAsk(message: Any, timeout: Timeout, sender: ActorRef) = actorRef match { case ref: InternalActorRef if ref.isTerminated ⇒ actorRef ! message - Future.failed[Any](new AskTimeoutException(s"""Recipient[$actorRef] had already been terminated. Sender[$sender] sent the message of type "${message.getClass.getName}".""")) + Future.failed[Any](AskableActorRef.recipientTerminatedExcpetion(actorRef, message, sender)) case ref: InternalActorRef ⇒ if (timeout.duration.length <= 0) - Future.failed[Any](new IllegalArgumentException(s"""Timeout length must be positive, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}".""")) + Future.failed[Any](AskableActorRef.negativeTimeoutException(actorRef, message, sender)) else { val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender) actorRef.tell(message, a) a.result.future } - case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}".""")) + case _ ⇒ Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorRef, message, sender)) } } @@ -320,11 +359,11 @@ final class ExplicitlyAskableActorRef(val actorRef: ActorRef) extends AnyVal { case ref: InternalActorRef if ref.isTerminated ⇒ val message = messageFactory(ref.provider.deadLetters) actorRef ! message - Future.failed[Any](new AskTimeoutException(s"""Recipient[$actorRef] had already been terminated. Sender[$sender] sent the message of type "${message.getClass.getName}".""")) + Future.failed[Any](AskableActorRef.recipientTerminatedExcpetion(actorRef, message, sender)) case ref: InternalActorRef ⇒ if (timeout.duration.length <= 0) { val message = messageFactory(ref.provider.deadLetters) - Future.failed[Any](new IllegalArgumentException(s"""Timeout length must be positive, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}".""")) + Future.failed[Any](AskableActorRef.negativeTimeoutException(actorRef, message, sender)) } else { val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, "unknown", sender) val message = messageFactory(a) @@ -333,10 +372,11 @@ final class ExplicitlyAskableActorRef(val actorRef: ActorRef) extends AnyVal { a.result.future } case _ if sender eq null ⇒ - Future.failed[Any](new IllegalArgumentException(s"""No recipient provided, question not sent to [$actorRef].""")) + Future.failed[Any](new IllegalArgumentException("No recipient for the reply was provided, " + + s"question not sent to [$actorRef].")) case _ ⇒ - val message = messageFactory(sender.asInstanceOf[InternalActorRef].provider.deadLetters) - Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}".""")) + val message = if (sender == null) null else messageFactory(sender.asInstanceOf[InternalActorRef].provider.deadLetters) + Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorRef, message, sender)) } } @@ -383,14 +423,13 @@ final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal { private[pattern] def internalAsk(message: Any, timeout: Timeout, sender: ActorRef): Future[Any] = actorSel.anchor match { case ref: InternalActorRef ⇒ if (timeout.duration.length <= 0) - Future.failed[Any]( - new IllegalArgumentException(s"""Timeout length must be positive, question not sent to [$actorSel]. Sender[$sender] sent the message of type "${message.getClass.getName}".""")) + Future.failed[Any](AskableActorRef.negativeTimeoutException(actorSel, message, sender)) else { val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel, message.getClass.getName, sender) actorSel.tell(message, a) a.result.future } - case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorSel]. Sender[$sender] sent the message of type "${message.getClass.getName}".""")) + case _ ⇒ Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorSel, message, sender)) } } @@ -412,8 +451,7 @@ final class ExplicitlyAskableActorSelection(val actorSel: ActorSelection) extend case ref: InternalActorRef ⇒ if (timeout.duration.length <= 0) { val message = messageFactory(ref.provider.deadLetters) - Future.failed[Any]( - new IllegalArgumentException(s"""Timeout length must be positive, question not sent to [$actorSel]. Sender[$sender] sent the message of type "${message.getClass.getName}".""")) + Future.failed[Any](AskableActorRef.negativeTimeoutException(actorSel, message, sender)) } else { val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel, "unknown", sender) val message = messageFactory(a) @@ -422,10 +460,11 @@ final class ExplicitlyAskableActorSelection(val actorSel: ActorSelection) extend a.result.future } case _ if sender eq null ⇒ - Future.failed[Any](new IllegalArgumentException(s"""No recipient provided, question not sent to [$actorSel].""")) + Future.failed[Any](new IllegalArgumentException("No recipient for the reply was provided, " + + s"question not sent to [$actorSel].")) case _ ⇒ - val message = messageFactory(sender.asInstanceOf[InternalActorRef].provider.deadLetters) - Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorSel]. Sender[$sender] sent the message of type "${message.getClass.getName}".""")) + val message = if (sender == null) null else messageFactory(sender.asInstanceOf[InternalActorRef].provider.deadLetters) + Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorSel, message, sender)) } } @@ -602,8 +641,14 @@ private[akka] object PromiseActorRef { val a = new PromiseActorRef(provider, result, messageClassName) implicit val ec = a.internalCallingThreadExecutionContext val f = scheduler.scheduleOnce(timeout.duration) { - result tryComplete Failure( - onTimeout(s"""Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. Sender[$sender] sent message of type "${a.messageClassName}".""")) + result tryComplete { + val wasSentBy = if (sender == ActorRef.noSender) "" else s" was sent by [$sender]" + val messagePart = s"Message of type [${a.messageClassName}]$wasSentBy." + Failure( + onTimeout(s"Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. " + + messagePart + + " A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.")) + } } result.future onComplete { _ ⇒ try a.stop() finally f.cancel() } a diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index d7fc651153..da4f08981c 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -40,11 +40,13 @@ object Patterns { * Java API for `akka.pattern.ask`: * Sends a message asynchronously and returns a [[scala.concurrent.Future]] * holding the eventual reply message; this means that the target actor - * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * needs to send the result to the `sender` reference provided. + * + * The Future will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. * * Warning: * When using future callbacks, inside actors you need to carefully avoid closing over @@ -85,11 +87,13 @@ object Patterns { * Java API for `akka.pattern.ask`: * Sends a message asynchronously and returns a [[scala.concurrent.Future]] * holding the eventual reply message; this means that the target actor - * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * needs to send the result to the `sender` reference provided. + * + * The Future will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. * * Warning: * When using future callbacks, inside actors you need to carefully avoid closing over @@ -131,11 +135,13 @@ object Patterns { * Java API for `akka.pattern.ask`: * Sends a message asynchronously and returns a [[scala.concurrent.Future]] * holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]] - * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * needs to send the result to the `sender` reference provided. + * + * The Future will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. * * Warning: * When using future callbacks, inside actors you need to carefully avoid closing over @@ -163,11 +169,13 @@ object Patterns { * Java API for `akka.pattern.ask`: * Sends a message asynchronously and returns a [[scala.concurrent.Future]] * holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]] - * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * needs to send the result to the `sender` reference provided. + * + * The Future will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. * * Warning: * When using future callbacks, inside actors you need to carefully avoid closing over @@ -297,11 +305,13 @@ object PatternsCS { * Java API for `akka.pattern.ask`: * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] * holding the eventual reply message; this means that the target actor - * needs to send the result to the `sender` reference provided. The CompletionStage - * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * needs to send the result to the `sender` reference provided. + * + * The CompletionStage will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. * * Warning: * When using future callbacks, inside actors you need to carefully avoid closing over @@ -326,11 +336,13 @@ object PatternsCS { * Java API for `akka.pattern.ask`: * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] * holding the eventual reply message; this means that the target actor - * needs to send the result to the `sender` reference provided. The CompletionStage - * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * needs to send the result to the `sender` reference provided. + * + * The CompletionStage will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. * * Warning: * When using future callbacks, inside actors you need to carefully avoid closing over @@ -391,11 +403,13 @@ object PatternsCS { * Java API for `akka.pattern.ask`: * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] * holding the eventual reply message; this means that the target actor - * needs to send the result to the `sender` reference provided. The CompletionStage - * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * needs to send the result to the `sender` reference provided. + * + * The CompletionStage will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. * * Warning: * When using future callbacks, inside actors you need to carefully avoid closing over @@ -437,11 +451,13 @@ object PatternsCS { * Java API for `akka.pattern.ask`: * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] * holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]] - * needs to send the result to the `sender` reference provided. The CompletionStage - * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * needs to send the result to the `sender` reference provided. + * + * The CompletionStage will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. * * Warning: * When using future callbacks, inside actors you need to carefully avoid closing over @@ -466,11 +482,13 @@ object PatternsCS { * Java API for `akka.pattern.ask`: * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] * holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]] - * needs to send the result to the `sender` reference provided. The CompletionStage - * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * needs to send the result to the `sender` reference provided. + * + * The CompletionStage will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. * * Warning: * When using future callbacks, inside actors you need to carefully avoid closing over @@ -494,11 +512,13 @@ object PatternsCS { * Java API for `akka.pattern.ask`: * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] * holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]] - * needs to send the result to the `sender` reference provided. The CompletionStage - * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * needs to send the result to the `sender` reference provided. + * + * The CompletionStage will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in - * `Await.result(..., timeout)`). + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. * * Warning: * When using future callbacks, inside actors you need to carefully avoid closing over diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index 7a4478bd56..9f1c715c90 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -196,7 +196,7 @@ import akka.util.Timeout typeNames.putIfAbsent(typeKey.name, messageClassName) match { case spawnedMessageClassName: String if messageClassName != spawnedMessageClassName ⇒ - throw new IllegalArgumentException(s"[${typeKey.name}] already spawned for [$spawnedMessageClassName]") + throw new IllegalArgumentException(s"[${typeKey.name}] already started for [$spawnedMessageClassName]") case _ ⇒ () } @@ -204,11 +204,13 @@ import akka.util.Timeout } override def entityRefFor[M](typeKey: scaladsl.EntityTypeKey[M], entityId: String): scaladsl.EntityRef[M] = { - new EntityRefImpl[M](untypedSharding.shardRegion(typeKey.name), entityId, system.scheduler) + new EntityRefImpl[M](untypedSharding.shardRegion(typeKey.name), entityId, + typeKey.asInstanceOf[EntityTypeKeyImpl[M]], system.scheduler) } override def entityRefFor[M](typeKey: javadsl.EntityTypeKey[M], entityId: String): javadsl.EntityRef[M] = { - new EntityRefImpl[M](untypedSharding.shardRegion(typeKey.name), entityId, system.scheduler) + new EntityRefImpl[M](untypedSharding.shardRegion(typeKey.name), entityId, + typeKey.asInstanceOf[EntityTypeKeyImpl[M]], system.scheduler) } override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = { @@ -223,7 +225,7 @@ import akka.util.Timeout * INTERNAL API */ @InternalApi private[akka] final class EntityRefImpl[M](shardRegion: akka.actor.ActorRef, entityId: String, - scheduler: Scheduler) + typeKey: EntityTypeKeyImpl[M], scheduler: Scheduler) extends javadsl.EntityRef[M] with scaladsl.EntityRef[M] with InternalRecipientRef[M] { override def tell(msg: M): Unit = @@ -250,16 +252,19 @@ import akka.util.Timeout if (untyped.isTerminated) ( adapt.ActorRefAdapter[U](untyped.provider.deadLetters), - Future.failed[U](new AskTimeoutException(s"Recipient[$untyped] had already been terminated.")), + Future.failed[U](new AskTimeoutException( + s"Recipient shard region of [${EntityRefImpl.this}] had already been terminated.")), null) else if (timeout.duration.length <= 0) ( adapt.ActorRefAdapter[U](untyped.provider.deadLetters), - Future.failed[U](new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$untyped]")), + Future.failed[U](new IllegalArgumentException( + s"Timeout length must be positive, question not sent to [${EntityRefImpl.this}]")), null ) else { - val a = PromiseActorRef(untyped.provider, timeout, untyped, "unknown") + // note that the real messageClassName will be set afterwards, replyTo pattern + val a = PromiseActorRef(untyped.provider, timeout, targetName = EntityRefImpl.this, messageClassName = "unknown") val b = adapt.ActorRefAdapter[U](a) (b, a.result.future.asInstanceOf[Future[U]], a) } @@ -281,6 +286,8 @@ import akka.util.Timeout shardRegion.toTyped.asInstanceOf[InternalRecipientRef[_]].isTerminated } + override def toString: String = s"EntityRef($typeKey, $entityId)" + } /** diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala index ded7df579a..bf409f2b98 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala @@ -26,6 +26,7 @@ import akka.cluster.sharding.typed.ShardingMessageExtractor import akka.cluster.typed.Cluster import akka.cluster.typed.Join import akka.cluster.typed.Leave +import akka.pattern.AskTimeoutException import akka.serialization.SerializerWithStringManifest import akka.util.Timeout import com.typesafe.config.ConfigFactory @@ -286,7 +287,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. IdStopPlz())) } - ex.getMessage should include("already spawned") + ex.getMessage should include("already started") } "EntityRef - tell" in { @@ -343,6 +344,27 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. aliceRef ! StopPlz() } + "EntityRef - AskTimeoutException" in { + val ignorantKey = EntityTypeKey[TestProtocol]("ignorant") + + sharding.start(ShardedEntity( + _ ⇒ Behaviors.ignore[TestProtocol], + ignorantKey, + StopPlz())) + + val ref = sharding.entityRefFor(ignorantKey, "sloppy") + + val reply = ref.ask(WhoAreYou)(Timeout(10.millis)) + val exc = reply.failed.futureValue + exc.getClass should ===(classOf[AskTimeoutException]) + exc.getMessage should startWith("Ask timed out on") + exc.getMessage should include(ignorantKey.toString) + exc.getMessage should include("sloppy") // the entity id + exc.getMessage should include(ref.toString) + exc.getMessage should include(s"[${classOf[WhoAreYou].getName}]") // message class + exc.getMessage should include("[10 ms]") // timeout + } + "handle untyped StartEntity message" in { // it is normally using envolopes, but the untyped StartEntity message can be sent internally, // e.g. for remember entities