Improve AskTimeoutException message, #25644
This commit is contained in:
parent
5a4ec8a557
commit
f2ad055c77
6 changed files with 203 additions and 84 deletions
|
|
@ -18,7 +18,8 @@ import scala.util.{ Failure, Success }
|
|||
|
||||
/**
|
||||
* This is what is used to complete a Future that is returned from an ask/? call,
|
||||
* when it times out.
|
||||
* when it times out. A typical reason for `AskTimeoutException` is that the recipient
|
||||
* actor didn't send a reply.
|
||||
*/
|
||||
class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException(message) {
|
||||
def this(message: String) = this(message, null: Throwable)
|
||||
|
|
@ -50,11 +51,13 @@ trait AskSupport {
|
|||
/**
|
||||
* Sends a message asynchronously and returns a [[scala.concurrent.Future]]
|
||||
* holding the eventual reply message; this means that the target actor
|
||||
* needs to send the result to the `sender` reference provided. The Future
|
||||
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
|
||||
* needs to send the result to the `sender` reference provided.
|
||||
*
|
||||
* The Future will be completed with an [[akka.pattern.AskTimeoutException]] after the
|
||||
* given timeout has expired; this is independent from any timeout applied
|
||||
* while awaiting a result for this future (i.e. in
|
||||
* `Await.result(..., timeout)`).
|
||||
* `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the
|
||||
* recipient actor didn't send a reply.
|
||||
*
|
||||
* <b>Warning:</b>
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
|
|
@ -99,11 +102,13 @@ trait AskSupport {
|
|||
/**
|
||||
* Sends a message asynchronously and returns a [[scala.concurrent.Future]]
|
||||
* holding the eventual reply message; this means that the target actor
|
||||
* needs to send the result to the `sender` reference provided. The Future
|
||||
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
|
||||
* needs to send the result to the `sender` reference provided.
|
||||
*
|
||||
* The Future will be completed with an [[akka.pattern.AskTimeoutException]] after the
|
||||
* given timeout has expired; this is independent from any timeout applied
|
||||
* while awaiting a result for this future (i.e. in
|
||||
* `Await.result(..., timeout)`).
|
||||
* `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the
|
||||
* recipient actor didn't send a reply.
|
||||
*
|
||||
* <b>Warning:</b>
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
|
|
@ -161,11 +166,13 @@ trait ExplicitAskSupport {
|
|||
/**
|
||||
* Sends a message asynchronously and returns a [[scala.concurrent.Future]]
|
||||
* holding the eventual reply message; this means that the target actor
|
||||
* needs to send the result to the `sender` reference provided. The Future
|
||||
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
|
||||
* needs to send the result to the `sender` reference provided.
|
||||
*
|
||||
* The Future will be completed with an [[akka.pattern.AskTimeoutException]] after the
|
||||
* given timeout has expired; this is independent from any timeout applied
|
||||
* while awaiting a result for this future (i.e. in
|
||||
* `Await.result(..., timeout)`).
|
||||
* `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the
|
||||
* recipient actor didn't send a reply.
|
||||
*
|
||||
* <b>Warning:</b>
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
|
|
@ -215,11 +222,13 @@ trait ExplicitAskSupport {
|
|||
/**
|
||||
* Sends a message asynchronously and returns a [[scala.concurrent.Future]]
|
||||
* holding the eventual reply message; this means that the target actor
|
||||
* needs to send the result to the `sender` reference provided. The Future
|
||||
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
|
||||
* needs to send the result to the `sender` reference provided.
|
||||
*
|
||||
* The Future will be completed with an [[akka.pattern.AskTimeoutException]] after the
|
||||
* given timeout has expired; this is independent from any timeout applied
|
||||
* while awaiting a result for this future (i.e. in
|
||||
* `Await.result(..., timeout)`).
|
||||
* `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the
|
||||
* recipient actor didn't send a reply.
|
||||
*
|
||||
* <b>Warning:</b>
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
|
|
@ -257,6 +266,36 @@ object AskableActorRef {
|
|||
*/
|
||||
private[pattern] def $qmark$extension(actorRef: ActorRef, message: Any, timeout: Timeout): Future[Any] =
|
||||
actorRef.internalAsk(message, timeout, ActorRef.noSender)
|
||||
|
||||
private def messagePartOfException(message: Any, sender: ActorRef): String = {
|
||||
val msg = if (message == null) "unknown" else message
|
||||
val wasSentBy = if (sender == ActorRef.noSender) "" else s" was sent by [$sender]"
|
||||
s"Message of type [${msg.getClass.getName}]$wasSentBy."
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] def negativeTimeoutException(recipient: Any, message: Any, sender: ActorRef): IllegalArgumentException = {
|
||||
new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$recipient]. " +
|
||||
messagePartOfException(message, sender))
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] def recipientTerminatedExcpetion(recipient: Any, message: Any, sender: ActorRef): AskTimeoutException = {
|
||||
new AskTimeoutException(s"Recipient [$recipient] had already been terminated. " +
|
||||
messagePartOfException(message, sender))
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] def unsupportedRecipientType(recipient: Any, message: Any, sender: ActorRef): IllegalArgumentException = {
|
||||
new IllegalArgumentException(s"Unsupported recipient type, question not sent to [$recipient]. " +
|
||||
messagePartOfException(message, sender))
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -288,16 +327,16 @@ final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {
|
|||
private[pattern] def internalAsk(message: Any, timeout: Timeout, sender: ActorRef) = actorRef match {
|
||||
case ref: InternalActorRef if ref.isTerminated ⇒
|
||||
actorRef ! message
|
||||
Future.failed[Any](new AskTimeoutException(s"""Recipient[$actorRef] had already been terminated. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
Future.failed[Any](AskableActorRef.recipientTerminatedExcpetion(actorRef, message, sender))
|
||||
case ref: InternalActorRef ⇒
|
||||
if (timeout.duration.length <= 0)
|
||||
Future.failed[Any](new IllegalArgumentException(s"""Timeout length must be positive, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
Future.failed[Any](AskableActorRef.negativeTimeoutException(actorRef, message, sender))
|
||||
else {
|
||||
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender)
|
||||
actorRef.tell(message, a)
|
||||
a.result.future
|
||||
}
|
||||
case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
case _ ⇒ Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorRef, message, sender))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -320,11 +359,11 @@ final class ExplicitlyAskableActorRef(val actorRef: ActorRef) extends AnyVal {
|
|||
case ref: InternalActorRef if ref.isTerminated ⇒
|
||||
val message = messageFactory(ref.provider.deadLetters)
|
||||
actorRef ! message
|
||||
Future.failed[Any](new AskTimeoutException(s"""Recipient[$actorRef] had already been terminated. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
Future.failed[Any](AskableActorRef.recipientTerminatedExcpetion(actorRef, message, sender))
|
||||
case ref: InternalActorRef ⇒
|
||||
if (timeout.duration.length <= 0) {
|
||||
val message = messageFactory(ref.provider.deadLetters)
|
||||
Future.failed[Any](new IllegalArgumentException(s"""Timeout length must be positive, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
Future.failed[Any](AskableActorRef.negativeTimeoutException(actorRef, message, sender))
|
||||
} else {
|
||||
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, "unknown", sender)
|
||||
val message = messageFactory(a)
|
||||
|
|
@ -333,10 +372,11 @@ final class ExplicitlyAskableActorRef(val actorRef: ActorRef) extends AnyVal {
|
|||
a.result.future
|
||||
}
|
||||
case _ if sender eq null ⇒
|
||||
Future.failed[Any](new IllegalArgumentException(s"""No recipient provided, question not sent to [$actorRef]."""))
|
||||
Future.failed[Any](new IllegalArgumentException("No recipient for the reply was provided, " +
|
||||
s"question not sent to [$actorRef]."))
|
||||
case _ ⇒
|
||||
val message = messageFactory(sender.asInstanceOf[InternalActorRef].provider.deadLetters)
|
||||
Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
val message = if (sender == null) null else messageFactory(sender.asInstanceOf[InternalActorRef].provider.deadLetters)
|
||||
Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorRef, message, sender))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -383,14 +423,13 @@ final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal {
|
|||
private[pattern] def internalAsk(message: Any, timeout: Timeout, sender: ActorRef): Future[Any] = actorSel.anchor match {
|
||||
case ref: InternalActorRef ⇒
|
||||
if (timeout.duration.length <= 0)
|
||||
Future.failed[Any](
|
||||
new IllegalArgumentException(s"""Timeout length must be positive, question not sent to [$actorSel]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
Future.failed[Any](AskableActorRef.negativeTimeoutException(actorSel, message, sender))
|
||||
else {
|
||||
val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel, message.getClass.getName, sender)
|
||||
actorSel.tell(message, a)
|
||||
a.result.future
|
||||
}
|
||||
case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorSel]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
case _ ⇒ Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorSel, message, sender))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -412,8 +451,7 @@ final class ExplicitlyAskableActorSelection(val actorSel: ActorSelection) extend
|
|||
case ref: InternalActorRef ⇒
|
||||
if (timeout.duration.length <= 0) {
|
||||
val message = messageFactory(ref.provider.deadLetters)
|
||||
Future.failed[Any](
|
||||
new IllegalArgumentException(s"""Timeout length must be positive, question not sent to [$actorSel]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
Future.failed[Any](AskableActorRef.negativeTimeoutException(actorSel, message, sender))
|
||||
} else {
|
||||
val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel, "unknown", sender)
|
||||
val message = messageFactory(a)
|
||||
|
|
@ -422,10 +460,11 @@ final class ExplicitlyAskableActorSelection(val actorSel: ActorSelection) extend
|
|||
a.result.future
|
||||
}
|
||||
case _ if sender eq null ⇒
|
||||
Future.failed[Any](new IllegalArgumentException(s"""No recipient provided, question not sent to [$actorSel]."""))
|
||||
Future.failed[Any](new IllegalArgumentException("No recipient for the reply was provided, " +
|
||||
s"question not sent to [$actorSel]."))
|
||||
case _ ⇒
|
||||
val message = messageFactory(sender.asInstanceOf[InternalActorRef].provider.deadLetters)
|
||||
Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorSel]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
val message = if (sender == null) null else messageFactory(sender.asInstanceOf[InternalActorRef].provider.deadLetters)
|
||||
Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorSel, message, sender))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -602,8 +641,14 @@ private[akka] object PromiseActorRef {
|
|||
val a = new PromiseActorRef(provider, result, messageClassName)
|
||||
implicit val ec = a.internalCallingThreadExecutionContext
|
||||
val f = scheduler.scheduleOnce(timeout.duration) {
|
||||
result tryComplete Failure(
|
||||
onTimeout(s"""Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. Sender[$sender] sent message of type "${a.messageClassName}"."""))
|
||||
result tryComplete {
|
||||
val wasSentBy = if (sender == ActorRef.noSender) "" else s" was sent by [$sender]"
|
||||
val messagePart = s"Message of type [${a.messageClassName}]$wasSentBy."
|
||||
Failure(
|
||||
onTimeout(s"Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. " +
|
||||
messagePart +
|
||||
" A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply."))
|
||||
}
|
||||
}
|
||||
result.future onComplete { _ ⇒ try a.stop() finally f.cancel() }
|
||||
a
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue