Merge pull request #200 from jboner/wip-1583-patterns-stop-patriknw
Added akka.pattern.gracefulStop. See #1583
This commit is contained in:
commit
ffb6a03fce
7 changed files with 205 additions and 9 deletions
|
|
@ -0,0 +1,52 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.pattern
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.actor.Props
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorTimeoutException
|
||||
import akka.dispatch.Await
|
||||
import akka.util.Duration
|
||||
import akka.util.duration._
|
||||
|
||||
object PatternSpec {
|
||||
case class Work(duration: Duration)
|
||||
class TargetActor extends Actor {
|
||||
def receive = {
|
||||
case Work(duration) ⇒ duration.sleep()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class PatternSpec extends AkkaSpec {
|
||||
|
||||
import PatternSpec._
|
||||
|
||||
"pattern.gracefulStop" must {
|
||||
|
||||
"provide Future for stopping an actor" in {
|
||||
val target = system.actorOf(Props[TargetActor])
|
||||
val result = gracefulStop(target, 5 seconds)
|
||||
Await.result(result, 6 seconds) must be(true)
|
||||
}
|
||||
|
||||
"complete Future when actor already terminated" in {
|
||||
val target = system.actorOf(Props[TargetActor])
|
||||
Await.ready(gracefulStop(target, 5 seconds), 6 seconds)
|
||||
Await.ready(gracefulStop(target, 1 millis), 1 second)
|
||||
}
|
||||
|
||||
"complete Future with ActorTimeoutException when actor not terminated within timeout" in {
|
||||
val target = system.actorOf(Props[TargetActor])
|
||||
target ! Work(250 millis)
|
||||
val result = gracefulStop(target, 10 millis)
|
||||
intercept[ActorTimeoutException] {
|
||||
Await.result(result, 200 millis)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
30
akka-actor/src/main/scala/akka/pattern/Patterns.scala
Normal file
30
akka-actor/src/main/scala/akka/pattern/Patterns.scala
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.pattern
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ActorSystem
|
||||
import akka.dispatch.Future
|
||||
import akka.util.Duration
|
||||
|
||||
/**
|
||||
* Patterns is the Java API for the Akka patterns that provide solutions
|
||||
* to commonly occurring problems.
|
||||
*/
|
||||
object Patterns {
|
||||
|
||||
/**
|
||||
* Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when
|
||||
* existing messages of the target actor has been processed and the actor has been
|
||||
* terminated.
|
||||
*
|
||||
* Useful when you need to wait for termination or compose ordered termination of several actors.
|
||||
*
|
||||
* If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]]
|
||||
* is completed with failure [[akka.actor.ActorTimeoutException]].
|
||||
*/
|
||||
def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] = {
|
||||
akka.pattern.gracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]]
|
||||
}
|
||||
}
|
||||
59
akka-actor/src/main/scala/akka/pattern/package.scala
Normal file
59
akka-actor/src/main/scala/akka/pattern/package.scala
Normal file
|
|
@ -0,0 +1,59 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.ActorTimeoutException
|
||||
import akka.actor.PoisonPill
|
||||
import akka.actor.Props
|
||||
import akka.actor.ReceiveTimeout
|
||||
import akka.actor.Terminated
|
||||
import akka.dispatch.Future
|
||||
import akka.dispatch.Promise
|
||||
import akka.util.Duration
|
||||
|
||||
/**
|
||||
* Akka patterns that provide solutions to commonly occurring problems.
|
||||
*/
|
||||
package object pattern {
|
||||
|
||||
/**
|
||||
* Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when
|
||||
* existing messages of the target actor has been processed and the actor has been
|
||||
* terminated.
|
||||
*
|
||||
* Useful when you need to wait for termination or compose ordered termination of several actors.
|
||||
*
|
||||
* If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]]
|
||||
* is completed with failure [[akka.actor.ActorTimeoutException]].
|
||||
*/
|
||||
def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = {
|
||||
if (target.isTerminated) {
|
||||
Promise.successful(true)
|
||||
} else {
|
||||
val result = Promise[Boolean]()
|
||||
system.actorOf(Props(new Actor {
|
||||
// Terminated will be received when target has been stopped
|
||||
context watch target
|
||||
target ! PoisonPill
|
||||
// ReceiveTimeout will be received if nothing else is received within the timeout
|
||||
context setReceiveTimeout timeout
|
||||
|
||||
def receive = {
|
||||
case Terminated(a) if a == target ⇒
|
||||
result success true
|
||||
context stop self
|
||||
case ReceiveTimeout ⇒
|
||||
result failure new ActorTimeoutException(
|
||||
"Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout))
|
||||
context stop self
|
||||
}
|
||||
}))
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -28,6 +28,14 @@ import akka.japi.Procedure;
|
|||
import akka.actor.Terminated;
|
||||
//#import-watch
|
||||
|
||||
//#import-gracefulStop
|
||||
import static akka.pattern.Patterns.gracefulStop;
|
||||
import akka.dispatch.Future;
|
||||
import akka.dispatch.Await;
|
||||
import akka.util.Duration;
|
||||
import akka.actor.ActorTimeoutException;
|
||||
//#import-gracefulStop
|
||||
|
||||
import akka.actor.Props;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.actor.UntypedActorFactory;
|
||||
|
|
@ -100,8 +108,7 @@ public class UntypedActorDocTestBase {
|
|||
public void propsActorOf() {
|
||||
ActorSystem system = ActorSystem.create("MySystem");
|
||||
//#creating-props
|
||||
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"),
|
||||
"myactor");
|
||||
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor");
|
||||
//#creating-props
|
||||
myActor.tell("test");
|
||||
system.shutdown();
|
||||
|
|
@ -174,6 +181,23 @@ public class UntypedActorDocTestBase {
|
|||
system.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void usePatternsGracefulStop() {
|
||||
ActorSystem system = ActorSystem.create("MySystem");
|
||||
ActorRef actorRef = system.actorOf(new Props(MyUntypedActor.class));
|
||||
//#gracefulStop
|
||||
|
||||
try {
|
||||
Future<Boolean> stopped = gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), system);
|
||||
Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));
|
||||
// the actor has been stopped
|
||||
} catch (ActorTimeoutException e) {
|
||||
// the actor wasn't stopped within 5 seconds
|
||||
}
|
||||
//#gracefulStop
|
||||
system.shutdown();
|
||||
}
|
||||
|
||||
public static class MyActor extends UntypedActor {
|
||||
|
||||
public MyActor(String s) {
|
||||
|
|
@ -264,6 +288,7 @@ public class UntypedActorDocTestBase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
//#hot-swap-actor
|
||||
|
||||
//#watch
|
||||
|
|
|
|||
|
|
@ -485,6 +485,16 @@ Use it like this:
|
|||
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java
|
||||
:include: import-actors,poison-pill
|
||||
|
||||
Graceful Stop
|
||||
-------------
|
||||
|
||||
:meth:`gracefulStop` is useful if you need to wait for termination or compose ordered
|
||||
termination of several actors:
|
||||
|
||||
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java
|
||||
:include: import-gracefulStop,gracefulStop
|
||||
|
||||
|
||||
.. _UntypedActor.HotSwap:
|
||||
|
||||
HotSwap
|
||||
|
|
|
|||
|
|
@ -538,6 +538,15 @@ stop the actor when the message is processed. ``PoisonPill`` is enqueued as
|
|||
ordinary messages and will be handled after messages that were already queued
|
||||
in the mailbox.
|
||||
|
||||
Graceful Stop
|
||||
-------------
|
||||
|
||||
:meth:`gracefulStop` is useful if you need to wait for termination or compose ordered
|
||||
termination of several actors:
|
||||
|
||||
.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#gracefulStop
|
||||
|
||||
|
||||
.. _Actor.HotSwap:
|
||||
|
||||
Become/Unbecome
|
||||
|
|
|
|||
|
|
@ -8,13 +8,7 @@ import akka.actor.Actor
|
|||
import akka.actor.Props
|
||||
import akka.event.Logging
|
||||
import akka.dispatch.Future
|
||||
|
||||
//#imports1
|
||||
|
||||
//#imports2
|
||||
import akka.actor.ActorSystem
|
||||
//#imports2
|
||||
|
||||
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.testkit._
|
||||
|
|
@ -114,7 +108,6 @@ object SwapperApp extends App {
|
|||
//#swapper
|
||||
|
||||
//#receive-orElse
|
||||
import akka.actor.Actor.Receive
|
||||
|
||||
abstract class GenericActor extends Actor {
|
||||
// to be defined in subclassing actor
|
||||
|
|
@ -317,4 +310,22 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
a ! "kill"
|
||||
expectMsg("finished")
|
||||
}
|
||||
|
||||
"using pattern gracefulStop" in {
|
||||
val actorRef = system.actorOf(Props[MyActor])
|
||||
//#gracefulStop
|
||||
import akka.pattern.gracefulStop
|
||||
import akka.dispatch.Await
|
||||
import akka.actor.ActorTimeoutException
|
||||
|
||||
try {
|
||||
val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds)(system)
|
||||
Await.result(stopped, 6 seconds)
|
||||
// the actor has been stopped
|
||||
} catch {
|
||||
case e: ActorTimeoutException ⇒ // the actor wasn't stopped within 5 seconds
|
||||
}
|
||||
//#gracefulStop
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue