diff --git a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala new file mode 100644 index 0000000000..ce1b9db201 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.pattern + +import akka.testkit.AkkaSpec +import akka.actor.Props +import akka.actor.Actor +import akka.actor.ActorTimeoutException +import akka.dispatch.Await +import akka.util.Duration +import akka.util.duration._ + +object PatternSpec { + case class Work(duration: Duration) + class TargetActor extends Actor { + def receive = { + case Work(duration) ⇒ duration.sleep() + } + } +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class PatternSpec extends AkkaSpec { + + import PatternSpec._ + + "pattern.gracefulStop" must { + + "provide Future for stopping an actor" in { + val target = system.actorOf(Props[TargetActor]) + val result = gracefulStop(target, 5 seconds) + Await.result(result, 6 seconds) must be(true) + } + + "complete Future when actor already terminated" in { + val target = system.actorOf(Props[TargetActor]) + Await.ready(gracefulStop(target, 5 seconds), 6 seconds) + Await.ready(gracefulStop(target, 1 millis), 1 second) + } + + "complete Future with ActorTimeoutException when actor not terminated within timeout" in { + val target = system.actorOf(Props[TargetActor]) + target ! Work(250 millis) + val result = gracefulStop(target, 10 millis) + intercept[ActorTimeoutException] { + Await.result(result, 200 millis) + } + } + } +} diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala new file mode 100644 index 0000000000..abf435edc5 --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.pattern + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.dispatch.Future +import akka.util.Duration + +/** + * Patterns is the Java API for the Akka patterns that provide solutions + * to commonly occurring problems. + */ +object Patterns { + + /** + * Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when + * existing messages of the target actor has been processed and the actor has been + * terminated. + * + * Useful when you need to wait for termination or compose ordered termination of several actors. + * + * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] + * is completed with failure [[akka.actor.ActorTimeoutException]]. + */ + def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] = { + akka.pattern.gracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]] + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/package.scala b/akka-actor/src/main/scala/akka/pattern/package.scala new file mode 100644 index 0000000000..b09ee56897 --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/package.scala @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.ActorTimeoutException +import akka.actor.PoisonPill +import akka.actor.Props +import akka.actor.ReceiveTimeout +import akka.actor.Terminated +import akka.dispatch.Future +import akka.dispatch.Promise +import akka.util.Duration + +/** + * Akka patterns that provide solutions to commonly occurring problems. + */ +package object pattern { + + /** + * Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when + * existing messages of the target actor has been processed and the actor has been + * terminated. + * + * Useful when you need to wait for termination or compose ordered termination of several actors. + * + * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] + * is completed with failure [[akka.actor.ActorTimeoutException]]. + */ + def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = { + if (target.isTerminated) { + Promise.successful(true) + } else { + val result = Promise[Boolean]() + system.actorOf(Props(new Actor { + // Terminated will be received when target has been stopped + context watch target + target ! PoisonPill + // ReceiveTimeout will be received if nothing else is received within the timeout + context setReceiveTimeout timeout + + def receive = { + case Terminated(a) if a == target ⇒ + result success true + context stop self + case ReceiveTimeout ⇒ + result failure new ActorTimeoutException( + "Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout)) + context stop self + } + })) + result + } + } + +} diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java index d442ae6461..b1d84a5841 100644 --- a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java @@ -28,6 +28,14 @@ import akka.japi.Procedure; import akka.actor.Terminated; //#import-watch +//#import-gracefulStop +import static akka.pattern.Patterns.gracefulStop; +import akka.dispatch.Future; +import akka.dispatch.Await; +import akka.util.Duration; +import akka.actor.ActorTimeoutException; +//#import-gracefulStop + import akka.actor.Props; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; @@ -100,8 +108,7 @@ public class UntypedActorDocTestBase { public void propsActorOf() { ActorSystem system = ActorSystem.create("MySystem"); //#creating-props - ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), - "myactor"); + ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor"); //#creating-props myActor.tell("test"); system.shutdown(); @@ -174,6 +181,23 @@ public class UntypedActorDocTestBase { system.shutdown(); } + @Test + public void usePatternsGracefulStop() { + ActorSystem system = ActorSystem.create("MySystem"); + ActorRef actorRef = system.actorOf(new Props(MyUntypedActor.class)); + //#gracefulStop + + try { + Future stopped = gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), system); + Await.result(stopped, Duration.create(6, TimeUnit.SECONDS)); + // the actor has been stopped + } catch (ActorTimeoutException e) { + // the actor wasn't stopped within 5 seconds + } + //#gracefulStop + system.shutdown(); + } + public static class MyActor extends UntypedActor { public MyActor(String s) { @@ -264,6 +288,7 @@ public class UntypedActorDocTestBase { } } } + //#hot-swap-actor //#watch diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index b24b1d6e6c..7e0d788590 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -485,6 +485,16 @@ Use it like this: .. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java :include: import-actors,poison-pill +Graceful Stop +------------- + +:meth:`gracefulStop` is useful if you need to wait for termination or compose ordered +termination of several actors: + +.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java + :include: import-gracefulStop,gracefulStop + + .. _UntypedActor.HotSwap: HotSwap diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 204aa3ce56..558b50fac8 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -538,6 +538,15 @@ stop the actor when the message is processed. ``PoisonPill`` is enqueued as ordinary messages and will be handled after messages that were already queued in the mailbox. +Graceful Stop +------------- + +:meth:`gracefulStop` is useful if you need to wait for termination or compose ordered +termination of several actors: + +.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#gracefulStop + + .. _Actor.HotSwap: Become/Unbecome diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index cdba3d07f3..20ac33480b 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -8,13 +8,7 @@ import akka.actor.Actor import akka.actor.Props import akka.event.Logging import akka.dispatch.Future - -//#imports1 - -//#imports2 import akka.actor.ActorSystem -//#imports2 - import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.matchers.MustMatchers import akka.testkit._ @@ -114,7 +108,6 @@ object SwapperApp extends App { //#swapper //#receive-orElse -import akka.actor.Actor.Receive abstract class GenericActor extends Actor { // to be defined in subclassing actor @@ -317,4 +310,22 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { a ! "kill" expectMsg("finished") } + + "using pattern gracefulStop" in { + val actorRef = system.actorOf(Props[MyActor]) + //#gracefulStop + import akka.pattern.gracefulStop + import akka.dispatch.Await + import akka.actor.ActorTimeoutException + + try { + val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds)(system) + Await.result(stopped, 6 seconds) + // the actor has been stopped + } catch { + case e: ActorTimeoutException ⇒ // the actor wasn't stopped within 5 seconds + } + //#gracefulStop + + } }