Added akka.pattern.gracefulStop. See #1583

This commit is contained in:
Patrik Nordwall 2012-01-01 20:48:03 +01:00
parent 774584642e
commit 39a96b2ac3
3 changed files with 141 additions and 0 deletions

View file

@ -0,0 +1,52 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.pattern
import akka.testkit.AkkaSpec
import akka.actor.Props
import akka.actor.Actor
import akka.actor.ActorTimeoutException
import akka.dispatch.Await
import akka.util.Duration
import akka.util.duration._
object PatternSpec {
case class Work(duration: Duration)
class TargetActor extends Actor {
def receive = {
case Work(duration) duration.sleep()
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class PatternSpec extends AkkaSpec {
import PatternSpec._
"pattern.stop" must {
"provide Future for stopping an actor" in {
val target = system.actorOf(Props[TargetActor])
val result = gracefulStop(target, 5 seconds)
Await.result(result, 6 seconds) must be(true)
}
"complete Future when actor already terminated" in {
val target = system.actorOf(Props[TargetActor])
Await.ready(gracefulStop(target, 5 seconds), 6 seconds)
Await.ready(gracefulStop(target, 1 millis), 1 second)
}
"complete Future with ActorTimeoutException when actor not terminated within timeout" in {
val target = system.actorOf(Props[TargetActor])
target ! Work(200 millis)
val result = gracefulStop(target, 50 millis)
intercept[ActorTimeoutException] {
Await.result(result, 100 millis)
}
}
}
}

View file

@ -0,0 +1,30 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.pattern
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.dispatch.Future
import akka.util.Duration
/**
* Patterns is the Java API for the Akka patterns that provide solutions
* to commonly occurring problems.
*/
object Patterns {
/**
* Returns a [[akka.dispatch.Future]] that will be completed with `Right` `true` when
* existing messages of the target actor has been processed and the actor has been
* terminated.
*
* Useful when you need to wait for termination or compose ordered termination of several actors.
*
* If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]]
* is completed with `Left` [[akka.actor.ActorTimeoutException]].
*/
def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[Boolean] = {
akka.pattern.gracefulStop(target, timeout)(system)
}
}

View file

@ -0,0 +1,59 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.ActorTimeoutException
import akka.actor.PoisonPill
import akka.actor.Props
import akka.actor.ReceiveTimeout
import akka.actor.Terminated
import akka.dispatch.Future
import akka.dispatch.Promise
import akka.util.Duration
/**
* Akka patterns that provide solutions to commonly occurring problems.
*/
package object pattern {
/**
* Returns a [[akka.dispatch.Future]] that will be completed with `Right` `true` when
* existing messages of the target actor has been processed and the actor has been
* terminated.
*
* Useful when you need to wait for termination or compose ordered termination of several actors.
*
* If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]]
* is completed with `Left` [[akka.actor.ActorTimeoutException]].
*/
def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = {
if (target.isTerminated) {
Promise.successful(true)(system.dispatcher)
} else {
val result = Promise[Boolean]()(system.dispatcher)
system.actorOf(Props(new Actor {
// Terminated will be received when target has been stopped
context watch target
target ! PoisonPill
// ReceiveTimeout will be received if nothing else is received within the timeout
context setReceiveTimeout timeout
def receive = {
case Terminated(a)
result.complete(Right(true))
system.stop(self)
case ReceiveTimeout
result.complete(Left(
new ActorTimeoutException("Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout))))
system.stop(self)
}
}))
result
}
}
}