=act #16361 Added more information to AskTimeoutException
This commit is contained in:
parent
fd498d34a5
commit
dee5ad3deb
4 changed files with 41 additions and 18 deletions
|
|
@ -73,6 +73,8 @@ trait AskSupport {
|
|||
*
|
||||
*/
|
||||
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef ? message
|
||||
def ask(actorRef: ActorRef, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any] =
|
||||
actorRef.?(message)(timeout, sender)
|
||||
|
||||
/**
|
||||
* Import this implicit conversion to gain `?` and `ask` methods on
|
||||
|
|
@ -119,6 +121,8 @@ trait AskSupport {
|
|||
*
|
||||
*/
|
||||
def ask(actorSelection: ActorSelection, message: Any)(implicit timeout: Timeout): Future[Any] = actorSelection ? message
|
||||
def ask(actorSelection: ActorSelection, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any] =
|
||||
actorSelection.?(message)(timeout, sender)
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -126,22 +130,22 @@ trait AskSupport {
|
|||
*/
|
||||
final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {
|
||||
|
||||
def ask(message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
|
||||
def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = actorRef match {
|
||||
case ref: InternalActorRef if ref.isTerminated ⇒
|
||||
actorRef ! message
|
||||
Future.failed[Any](new AskTimeoutException(s"Recipient[$actorRef] had already been terminated."))
|
||||
Future.failed[Any](new AskTimeoutException(s"""Recipient[$actorRef] had already been terminated. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
case ref: InternalActorRef ⇒
|
||||
if (timeout.duration.length <= 0)
|
||||
Future.failed[Any](new IllegalArgumentException(s"Timeout length must not be negative, question not sent to [$actorRef]"))
|
||||
Future.failed[Any](new IllegalArgumentException(s"""Timeout length must not be negative, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
else {
|
||||
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef.toString)
|
||||
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef.toString, message, sender)
|
||||
actorRef.tell(message, a)
|
||||
a.result.future
|
||||
}
|
||||
case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorRef]"))
|
||||
case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
}
|
||||
|
||||
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = ask(message)(timeout)
|
||||
def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = ask(message)(timeout, sender)
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -149,20 +153,20 @@ final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {
|
|||
*/
|
||||
final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal {
|
||||
|
||||
def ask(message: Any)(implicit timeout: Timeout): Future[Any] = actorSel.anchor match {
|
||||
def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = actorSel.anchor match {
|
||||
case ref: InternalActorRef ⇒
|
||||
if (timeout.duration.length <= 0)
|
||||
Future.failed[Any](
|
||||
new IllegalArgumentException(s"Timeout length must not be negative, question not sent to [$actorSel]"))
|
||||
new IllegalArgumentException(s"""Timeout length must not be negative, question not sent to [$actorSel]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
else {
|
||||
val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel.toString)
|
||||
val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel.toString, message, sender)
|
||||
actorSel.tell(message, a)
|
||||
a.result.future
|
||||
}
|
||||
case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorSel]"))
|
||||
case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorSel]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
|
||||
}
|
||||
|
||||
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = ask(message)(timeout)
|
||||
def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = ask(message)(timeout, sender)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -324,13 +328,15 @@ private[akka] object PromiseActorRef {
|
|||
private case object Stopped
|
||||
private final case class StoppedWithPath(path: ActorPath)
|
||||
|
||||
def apply(provider: ActorRefProvider, timeout: Timeout, targetName: String): PromiseActorRef = {
|
||||
def apply(provider: ActorRefProvider, timeout: Timeout, targetName: String, message: Any, sender: ActorRef = Actor.noSender): PromiseActorRef = {
|
||||
val result = Promise[Any]()
|
||||
val scheduler = provider.guardian.underlying.system.scheduler
|
||||
val a = new PromiseActorRef(provider, result)
|
||||
implicit val ec = a.internalCallingThreadExecutionContext
|
||||
val messageClassName = message.getClass.getName
|
||||
val f = scheduler.scheduleOnce(timeout.duration) {
|
||||
result tryComplete Failure(new AskTimeoutException(s"Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]"))
|
||||
result tryComplete Failure(
|
||||
new AskTimeoutException(s"""Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. Sender[$sender] sent message of type "$messageClassName"."""))
|
||||
}
|
||||
result.future onComplete { _ ⇒ try a.stop() finally f.cancel() }
|
||||
a
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue