Removing ActorTimeoutException since it was only used in GracefulStop, and changed GracefulStop to use PromiseActorRef instead of spawning a toplevel actor
This commit is contained in:
parent
5eba9fceef
commit
5afe6601ff
4 changed files with 18 additions and 38 deletions
|
|
@ -7,11 +7,9 @@ package akka.pattern
|
|||
import akka.testkit.AkkaSpec
|
||||
import akka.actor.Props
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorTimeoutException
|
||||
import akka.util.Duration
|
||||
import akka.util.duration._
|
||||
import akka.dispatch.{ Future, Promise, Await }
|
||||
import java.lang.IllegalStateException
|
||||
|
||||
object PatternSpec {
|
||||
case class Work(duration: Duration)
|
||||
|
|
@ -41,13 +39,10 @@ class PatternSpec extends AkkaSpec {
|
|||
Await.ready(gracefulStop(target, 1 millis), 1 second)
|
||||
}
|
||||
|
||||
"complete Future with ActorTimeoutException when actor not terminated within timeout" in {
|
||||
"complete Future with AskTimeoutException when actor not terminated within timeout" in {
|
||||
val target = system.actorOf(Props[TargetActor])
|
||||
target ! Work(250 millis)
|
||||
val result = gracefulStop(target, 10 millis)
|
||||
intercept[ActorTimeoutException] {
|
||||
Await.result(result, 200 millis)
|
||||
}
|
||||
intercept[AskTimeoutException] { Await.result(gracefulStop(target, 10 millis), 200 millis) }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -129,12 +129,6 @@ case class ActorInitializationException private[akka] (actor: ActorRef, message:
|
|||
def this(msg: String) = this(null, msg, null)
|
||||
}
|
||||
|
||||
//FIXME: Only used by gracefulStop we should remove this if possible
|
||||
class ActorTimeoutException private[akka] (message: String, cause: Throwable = null)
|
||||
extends AkkaException(message, cause) {
|
||||
def this(msg: String) = this(msg, null)
|
||||
}
|
||||
|
||||
/**
|
||||
* InvalidMessageException is thrown when an invalid message is sent to an Actor.
|
||||
* Technically it's only "null" which is an InvalidMessageException but who knows,
|
||||
|
|
|
|||
|
|
@ -272,9 +272,7 @@ private[akka] object PromiseActorRef {
|
|||
val result = Promise[Any]()(provider.dispatcher)
|
||||
val a = new PromiseActorRef(provider, result)
|
||||
val f = provider.scheduler.scheduleOnce(timeout.duration) { result.tryComplete(Left(new AskTimeoutException("Timed out"))) }
|
||||
result onComplete { _ ⇒
|
||||
try a.stop() finally f.cancel()
|
||||
}
|
||||
result onComplete { _ ⇒ try a.stop() finally f.cancel() }
|
||||
a
|
||||
}
|
||||
}
|
||||
|
|
@ -4,9 +4,9 @@
|
|||
|
||||
package akka.pattern
|
||||
|
||||
import akka.actor.{ ActorRef, Actor, ActorSystem, Props, PoisonPill, Terminated, ReceiveTimeout, ActorTimeoutException }
|
||||
import akka.dispatch.{ Promise, Future }
|
||||
import akka.util.Duration
|
||||
import akka.actor._
|
||||
import akka.util.{ Timeout, Duration }
|
||||
|
||||
trait GracefulStopSupport {
|
||||
/**
|
||||
|
|
@ -14,7 +14,8 @@ trait GracefulStopSupport {
|
|||
* existing messages of the target actor has been processed and the actor has been
|
||||
* terminated.
|
||||
*
|
||||
* Useful when you need to wait for termination or compose ordered termination of several actors.
|
||||
* Useful when you need to wait for termination or compose ordered termination of several actors,
|
||||
* which should only be done outside of the ActorSystem as blocking inside Actors is discouraged.
|
||||
*
|
||||
* If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]]
|
||||
* is completed with failure [[akka.actor.ActorTimeoutException]].
|
||||
|
|
@ -22,26 +23,18 @@ trait GracefulStopSupport {
|
|||
def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = {
|
||||
if (target.isTerminated) {
|
||||
Promise.successful(true)
|
||||
} else {
|
||||
val result = Promise[Boolean]()
|
||||
system.actorOf(Props(new Actor {
|
||||
// Terminated will be received when target has been stopped
|
||||
context watch target
|
||||
} else system match {
|
||||
case e: ExtendedActorSystem ⇒
|
||||
val ref = PromiseActorRef(e.provider, Timeout(timeout))
|
||||
e.deathWatch.subscribe(ref, target)
|
||||
ref.result onComplete { case x ⇒ println(x) }
|
||||
ref.result onComplete {
|
||||
case Right(Terminated(`target`)) ⇒ () // Ignore
|
||||
case _ ⇒ e.deathWatch.unsubscribe(ref, target)
|
||||
} // Just making sure we're not leaking here
|
||||
target ! PoisonPill
|
||||
// ReceiveTimeout will be received if nothing else is received within the timeout
|
||||
context setReceiveTimeout timeout
|
||||
|
||||
def receive = {
|
||||
case Terminated(a) if a == target ⇒
|
||||
result success true
|
||||
context stop self
|
||||
case ReceiveTimeout ⇒
|
||||
result failure new ActorTimeoutException(
|
||||
"Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout))
|
||||
context stop self
|
||||
}
|
||||
}))
|
||||
result
|
||||
ref.result map { case Terminated(`target`) ⇒ true }
|
||||
case s ⇒ throw new IllegalArgumentException("Unknown ActorSystem implementation: '" + s + "'")
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue