From 39a96b2ac35618facc60f9dbc257a65e42d50af5 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sun, 1 Jan 2012 20:48:03 +0100 Subject: [PATCH 1/4] Added akka.pattern.gracefulStop. See #1583 --- .../test/scala/akka/pattern/PatternSpec.scala | 52 ++++++++++++++++ .../main/scala/akka/pattern/Patterns.scala | 30 ++++++++++ .../src/main/scala/akka/pattern/package.scala | 59 +++++++++++++++++++ 3 files changed, 141 insertions(+) create mode 100644 akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala create mode 100644 akka-actor/src/main/scala/akka/pattern/Patterns.scala create mode 100644 akka-actor/src/main/scala/akka/pattern/package.scala diff --git a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala new file mode 100644 index 0000000000..69297b743a --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.pattern + +import akka.testkit.AkkaSpec +import akka.actor.Props +import akka.actor.Actor +import akka.actor.ActorTimeoutException +import akka.dispatch.Await +import akka.util.Duration +import akka.util.duration._ + +object PatternSpec { + case class Work(duration: Duration) + class TargetActor extends Actor { + def receive = { + case Work(duration) ⇒ duration.sleep() + } + } +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class PatternSpec extends AkkaSpec { + + import PatternSpec._ + + "pattern.stop" must { + + "provide Future for stopping an actor" in { + val target = system.actorOf(Props[TargetActor]) + val result = gracefulStop(target, 5 seconds) + Await.result(result, 6 seconds) must be(true) + } + + "complete Future when actor already terminated" in { + val target = system.actorOf(Props[TargetActor]) + Await.ready(gracefulStop(target, 5 seconds), 6 seconds) + Await.ready(gracefulStop(target, 1 millis), 1 second) + } + + "complete Future with ActorTimeoutException when actor not terminated within timeout" in { + val target = system.actorOf(Props[TargetActor]) + target ! Work(200 millis) + val result = gracefulStop(target, 50 millis) + intercept[ActorTimeoutException] { + Await.result(result, 100 millis) + } + } + } +} diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala new file mode 100644 index 0000000000..6840716bad --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.pattern + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.dispatch.Future +import akka.util.Duration + +/** + * Patterns is the Java API for the Akka patterns that provide solutions + * to commonly occurring problems. + */ +object Patterns { + + /** + * Returns a [[akka.dispatch.Future]] that will be completed with `Right` `true` when + * existing messages of the target actor has been processed and the actor has been + * terminated. + * + * Useful when you need to wait for termination or compose ordered termination of several actors. + * + * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] + * is completed with `Left` [[akka.actor.ActorTimeoutException]]. + */ + def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[Boolean] = { + akka.pattern.gracefulStop(target, timeout)(system) + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/package.scala b/akka-actor/src/main/scala/akka/pattern/package.scala new file mode 100644 index 0000000000..eb24d9ae8e --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/package.scala @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.ActorTimeoutException +import akka.actor.PoisonPill +import akka.actor.Props +import akka.actor.ReceiveTimeout +import akka.actor.Terminated +import akka.dispatch.Future +import akka.dispatch.Promise +import akka.util.Duration + +/** + * Akka patterns that provide solutions to commonly occurring problems. + */ +package object pattern { + + /** + * Returns a [[akka.dispatch.Future]] that will be completed with `Right` `true` when + * existing messages of the target actor has been processed and the actor has been + * terminated. + * + * Useful when you need to wait for termination or compose ordered termination of several actors. + * + * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] + * is completed with `Left` [[akka.actor.ActorTimeoutException]]. + */ + def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = { + if (target.isTerminated) { + Promise.successful(true)(system.dispatcher) + } else { + val result = Promise[Boolean]()(system.dispatcher) + system.actorOf(Props(new Actor { + // Terminated will be received when target has been stopped + context watch target + target ! PoisonPill + // ReceiveTimeout will be received if nothing else is received within the timeout + context setReceiveTimeout timeout + + def receive = { + case Terminated(a) ⇒ + result.complete(Right(true)) + system.stop(self) + case ReceiveTimeout ⇒ + result.complete(Left( + new ActorTimeoutException("Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout)))) + system.stop(self) + } + })) + result + } + } + +} From 3d3b745a26af3c56ba231fccd7b7983826db5f06 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sun, 1 Jan 2012 21:27:52 +0100 Subject: [PATCH 2/4] Improvements based on feedback. See #1583 --- .../test/scala/akka/pattern/PatternSpec.scala | 8 ++++---- .../src/main/scala/akka/pattern/package.scala | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala index 69297b743a..ce1b9db201 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala @@ -26,7 +26,7 @@ class PatternSpec extends AkkaSpec { import PatternSpec._ - "pattern.stop" must { + "pattern.gracefulStop" must { "provide Future for stopping an actor" in { val target = system.actorOf(Props[TargetActor]) @@ -42,10 +42,10 @@ class PatternSpec extends AkkaSpec { "complete Future with ActorTimeoutException when actor not terminated within timeout" in { val target = system.actorOf(Props[TargetActor]) - target ! Work(200 millis) - val result = gracefulStop(target, 50 millis) + target ! Work(250 millis) + val result = gracefulStop(target, 10 millis) intercept[ActorTimeoutException] { - Await.result(result, 100 millis) + Await.result(result, 200 millis) } } } diff --git a/akka-actor/src/main/scala/akka/pattern/package.scala b/akka-actor/src/main/scala/akka/pattern/package.scala index eb24d9ae8e..728db960ed 100644 --- a/akka-actor/src/main/scala/akka/pattern/package.scala +++ b/akka-actor/src/main/scala/akka/pattern/package.scala @@ -32,9 +32,9 @@ package object pattern { */ def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = { if (target.isTerminated) { - Promise.successful(true)(system.dispatcher) + Promise.successful(true) } else { - val result = Promise[Boolean]()(system.dispatcher) + val result = Promise[Boolean]() system.actorOf(Props(new Actor { // Terminated will be received when target has been stopped context watch target @@ -43,13 +43,13 @@ package object pattern { context setReceiveTimeout timeout def receive = { - case Terminated(a) ⇒ - result.complete(Right(true)) - system.stop(self) + case Terminated(a) if a == target ⇒ + result success true + context.stop(self) case ReceiveTimeout ⇒ - result.complete(Left( - new ActorTimeoutException("Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout)))) - system.stop(self) + result failure new ActorTimeoutException( + "Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout)) + context.stop(self) } })) result From f3cc1485387613200360758be1f942130633f6a0 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sun, 1 Jan 2012 21:30:04 +0100 Subject: [PATCH 3/4] format --- akka-actor/src/main/scala/akka/pattern/package.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/pattern/package.scala b/akka-actor/src/main/scala/akka/pattern/package.scala index 728db960ed..91159d08a9 100644 --- a/akka-actor/src/main/scala/akka/pattern/package.scala +++ b/akka-actor/src/main/scala/akka/pattern/package.scala @@ -45,11 +45,11 @@ package object pattern { def receive = { case Terminated(a) if a == target ⇒ result success true - context.stop(self) + context stop self case ReceiveTimeout ⇒ result failure new ActorTimeoutException( "Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout)) - context.stop(self) + context stop self } })) result From 5ba0963d71716d91641b2ab59f2a9ddfad0a4339 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 3 Jan 2012 11:41:49 +0100 Subject: [PATCH 4/4] Added documentation. See #1583 --- .../main/scala/akka/pattern/Patterns.scala | 8 ++--- .../src/main/scala/akka/pattern/package.scala | 4 +-- .../docs/actor/UntypedActorDocTestBase.java | 29 +++++++++++++++++-- akka-docs/java/untyped-actors.rst | 10 +++++++ akka-docs/scala/actors.rst | 9 ++++++ .../code/akka/docs/actor/ActorDocSpec.scala | 25 +++++++++++----- 6 files changed, 70 insertions(+), 15 deletions(-) diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index 6840716bad..abf435edc5 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -15,16 +15,16 @@ import akka.util.Duration object Patterns { /** - * Returns a [[akka.dispatch.Future]] that will be completed with `Right` `true` when + * Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when * existing messages of the target actor has been processed and the actor has been * terminated. * * Useful when you need to wait for termination or compose ordered termination of several actors. * * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] - * is completed with `Left` [[akka.actor.ActorTimeoutException]]. + * is completed with failure [[akka.actor.ActorTimeoutException]]. */ - def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[Boolean] = { - akka.pattern.gracefulStop(target, timeout)(system) + def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] = { + akka.pattern.gracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]] } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/package.scala b/akka-actor/src/main/scala/akka/pattern/package.scala index 91159d08a9..b09ee56897 100644 --- a/akka-actor/src/main/scala/akka/pattern/package.scala +++ b/akka-actor/src/main/scala/akka/pattern/package.scala @@ -21,14 +21,14 @@ import akka.util.Duration package object pattern { /** - * Returns a [[akka.dispatch.Future]] that will be completed with `Right` `true` when + * Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when * existing messages of the target actor has been processed and the actor has been * terminated. * * Useful when you need to wait for termination or compose ordered termination of several actors. * * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] - * is completed with `Left` [[akka.actor.ActorTimeoutException]]. + * is completed with failure [[akka.actor.ActorTimeoutException]]. */ def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = { if (target.isTerminated) { diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java index d442ae6461..b1d84a5841 100644 --- a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java @@ -28,6 +28,14 @@ import akka.japi.Procedure; import akka.actor.Terminated; //#import-watch +//#import-gracefulStop +import static akka.pattern.Patterns.gracefulStop; +import akka.dispatch.Future; +import akka.dispatch.Await; +import akka.util.Duration; +import akka.actor.ActorTimeoutException; +//#import-gracefulStop + import akka.actor.Props; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; @@ -100,8 +108,7 @@ public class UntypedActorDocTestBase { public void propsActorOf() { ActorSystem system = ActorSystem.create("MySystem"); //#creating-props - ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), - "myactor"); + ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor"); //#creating-props myActor.tell("test"); system.shutdown(); @@ -174,6 +181,23 @@ public class UntypedActorDocTestBase { system.shutdown(); } + @Test + public void usePatternsGracefulStop() { + ActorSystem system = ActorSystem.create("MySystem"); + ActorRef actorRef = system.actorOf(new Props(MyUntypedActor.class)); + //#gracefulStop + + try { + Future stopped = gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), system); + Await.result(stopped, Duration.create(6, TimeUnit.SECONDS)); + // the actor has been stopped + } catch (ActorTimeoutException e) { + // the actor wasn't stopped within 5 seconds + } + //#gracefulStop + system.shutdown(); + } + public static class MyActor extends UntypedActor { public MyActor(String s) { @@ -264,6 +288,7 @@ public class UntypedActorDocTestBase { } } } + //#hot-swap-actor //#watch diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index b24b1d6e6c..7e0d788590 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -485,6 +485,16 @@ Use it like this: .. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java :include: import-actors,poison-pill +Graceful Stop +------------- + +:meth:`gracefulStop` is useful if you need to wait for termination or compose ordered +termination of several actors: + +.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java + :include: import-gracefulStop,gracefulStop + + .. _UntypedActor.HotSwap: HotSwap diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 204aa3ce56..558b50fac8 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -538,6 +538,15 @@ stop the actor when the message is processed. ``PoisonPill`` is enqueued as ordinary messages and will be handled after messages that were already queued in the mailbox. +Graceful Stop +------------- + +:meth:`gracefulStop` is useful if you need to wait for termination or compose ordered +termination of several actors: + +.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#gracefulStop + + .. _Actor.HotSwap: Become/Unbecome diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index cdba3d07f3..20ac33480b 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -8,13 +8,7 @@ import akka.actor.Actor import akka.actor.Props import akka.event.Logging import akka.dispatch.Future - -//#imports1 - -//#imports2 import akka.actor.ActorSystem -//#imports2 - import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.matchers.MustMatchers import akka.testkit._ @@ -114,7 +108,6 @@ object SwapperApp extends App { //#swapper //#receive-orElse -import akka.actor.Actor.Receive abstract class GenericActor extends Actor { // to be defined in subclassing actor @@ -317,4 +310,22 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { a ! "kill" expectMsg("finished") } + + "using pattern gracefulStop" in { + val actorRef = system.actorOf(Props[MyActor]) + //#gracefulStop + import akka.pattern.gracefulStop + import akka.dispatch.Await + import akka.actor.ActorTimeoutException + + try { + val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds)(system) + Await.result(stopped, 6 seconds) + // the actor has been stopped + } catch { + case e: ActorTimeoutException ⇒ // the actor wasn't stopped within 5 seconds + } + //#gracefulStop + + } }