diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/GracefulStopDocTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/GracefulStopDocTest.java new file mode 100644 index 0000000000..447ce0bb49 --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/GracefulStopDocTest.java @@ -0,0 +1,104 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package jdocs.akka.typed; + +//#imports + +import akka.actor.typed.ActorSystem; +import akka.actor.typed.Behavior; +import akka.actor.typed.PostStop; +import akka.actor.typed.javadsl.Behaviors; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; + +import java.util.concurrent.TimeUnit; + +//#imports + +public class GracefulStopDocTest { + + //#master-actor + + public abstract static class JobControl { + // no instances of this class, it's only a name space for messages + // and static methods + private JobControl() { + } + + static interface JobControlLanguage { + } + + public static final class SpawnJob implements JobControlLanguage { + public final String name; + + public SpawnJob(String name) { + this.name = name; + } + } + + public static final class GracefulShutdown implements JobControlLanguage { + + public GracefulShutdown() { + } + } + + public static final Behavior mcpa = Behaviors.immutable(JobControlLanguage.class) + .onMessage(SpawnJob.class, (ctx, msg) -> { + ctx.getSystem().log().info("Spawning job {}!", msg.name); + ctx.spawn(Job.job(msg.name), msg.name); + return Behaviors.same(); + }) + .onSignal(PostStop.class, (ctx, signal) -> { + ctx.getSystem().log().info("Master Control Programme stopped"); + return Behaviors.same(); + }) + .onMessage(GracefulShutdown.class, (ctx, msg) -> { + ctx.getSystem().log().info("Initiating graceful shutdown..."); + + // perform graceful stop, executing cleanup before final system termination + // behavior executing cleanup is passed as a parameter to Actor.stopped + return Behaviors.stopped(Behaviors.onSignal((context, PostStop) -> { + context.getSystem().log().info("Cleanup!"); + return Behaviors.same(); + })); + }) + .onSignal(PostStop.class, (ctx, signal) -> { + ctx.getSystem().log().info("Master Control Programme stopped"); + return Behaviors.same(); + }) + .build(); + } + //#master-actor + + public static void main(String[] args) throws Exception { + //#graceful-shutdown + + final ActorSystem system = + ActorSystem.create(JobControl.mcpa, "B6700"); + + system.tell(new JobControl.SpawnJob("a")); + system.tell(new JobControl.SpawnJob("b")); + + // sleep here to allow time for the new actors to be started + Thread.sleep(100); + + system.tell(new JobControl.GracefulShutdown()); + + Await.result(system.whenTerminated(), Duration.create(3, TimeUnit.SECONDS)); + //#graceful-shutdown + } + + //#worker-actor + + public static class Job { + public static Behavior job(String name) { + return Behaviors.onSignal((ctx, PostStop) -> { + ctx.getSystem().log().info("Worker {} stopped", name); + return Behaviors.same(); + }); + + } + } + //#worker-actor +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/GracefulStopSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/GracefulStopSpec.scala new file mode 100644 index 0000000000..4b3ecee788 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/GracefulStopSpec.scala @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.actor.typed +package scaladsl + +import akka.Done +import akka.NotUsed +import akka.testkit.typed.TestKit +import akka.testkit.typed.scaladsl.TestProbe + +final class GracefulStopSpec extends TestKit with TypedAkkaSpecWithShutdown { + + "Graceful stop" must { + + "properly stop the children and perform the cleanup" in { + val probe = TestProbe[String]("probe") + + val behavior = + Behaviors.deferred[akka.NotUsed] { context ⇒ + val c1 = context.spawn[NotUsed](Behaviors.onSignal { + case (_, PostStop) ⇒ + probe.ref ! "child-done" + Behaviors.stopped + }, "child1") + + val c2 = context.spawn[NotUsed](Behaviors.onSignal { + case (_, PostStop) ⇒ + probe.ref ! "child-done" + Behaviors.stopped + }, "child2") + + Behaviors.stopped { + Behaviors.onSignal { + case (ctx, PostStop) ⇒ + // cleanup function body + probe.ref ! "parent-done" + Behaviors.same + } + } + } + + spawn(behavior) + probe.expectMsg("child-done") + probe.expectMsg("child-done") + probe.expectMsg("parent-done") + } + + "properly perform the cleanup and stop itself for no children case" in { + val probe = TestProbe[Done]("probe") + + val behavior = + Behaviors.deferred[akka.NotUsed] { context ⇒ + // do not spawn any children + Behaviors.stopped { + Behaviors.onSignal { + case (ctx, PostStop) ⇒ + // cleanup function body + probe.ref ! Done + Behaviors.same + } + } + } + + spawn(behavior) + probe.expectMsg(Done) + } + } + +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutablePartialSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutablePartialSpec.scala index 11e6e229ca..f3fbd5e4a4 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutablePartialSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutablePartialSpec.scala @@ -4,11 +4,9 @@ package akka.actor.typed package scaladsl -import akka.testkit.typed.{ BehaviorTestkit, TestKit, TestKitSettings } +import akka.testkit.typed.TestKit import akka.testkit.typed.scaladsl.TestProbe -import scala.concurrent.duration.DurationInt - class ImmutablePartialSpec extends TestKit with TypedAkkaSpecWithShutdown { "An immutable partial" must { @@ -21,14 +19,12 @@ class ImmutablePartialSpec extends TestKit with TypedAkkaSpecWithShutdown { probe.ref ! Command2 Behaviors.same } - val testkit = BehaviorTestkit(behavior) + val actor = spawn(behavior) - testkit.run(Command1) - testkit.currentBehavior shouldBe behavior + actor ! Command1 probe.expectNoMessage() - testkit.run(Command2) - testkit.currentBehavior shouldBe behavior + actor ! Command2 probe.expectMsg(Command2) } } diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/GracefulStopDocSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/GracefulStopDocSpec.scala new file mode 100644 index 0000000000..3a096079b8 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/GracefulStopDocSpec.scala @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2014-2017 Lightbend Inc. + */ +package docs.akka.typed + +//#imports +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.typed.ActorSystem +import akka.actor.typed.PostStop +import akka.actor.typed.scaladsl.Behaviors +import akka.event.LoggingAdapter + +//#imports + +import akka.actor.typed.TypedAkkaSpecWithShutdown +import akka.testkit.typed.TestKit + +object GracefulStopDocSpec { + + //#master-actor + + object MasterControlProgramActor { + sealed trait JobControlLanguage + final case class SpawnJob(name: String) extends JobControlLanguage + final case object GracefulShutdown extends JobControlLanguage + + // Predefined cleanup operation + def cleanup(log: LoggingAdapter): Unit = log.info("Cleaning up!") + + val mcpa = Behaviors.immutable[JobControlLanguage] { (ctx, msg) ⇒ + msg match { + case SpawnJob(jobName) ⇒ + ctx.system.log.info("Spawning job {}!", jobName) + ctx.spawn(Job.job(jobName), name = jobName) + Behaviors.same + case GracefulShutdown ⇒ + ctx.system.log.info("Initiating graceful shutdown...") + // perform graceful stop, executing cleanup before final system termination + // behavior executing cleanup is passed as a parameter to Actor.stopped + Behaviors.stopped { + Behaviors.onSignal { + case (context, PostStop) ⇒ + cleanup(context.system.log) + Behaviors.same + } + } + } + }.onSignal { + case (ctx, PostStop) ⇒ + ctx.system.log.info("MCPA stopped") + Behaviors.same + } + } + //#master-actor + + //#worker-actor + + object Job { + import GracefulStopDocSpec.MasterControlProgramActor.JobControlLanguage + + def job(name: String) = Behaviors.onSignal[JobControlLanguage] { + case (ctx, PostStop) ⇒ + ctx.system.log.info("Worker {} stopped", name) + Behaviors.same + } + } + //#worker-actor + +} + +class GracefulStopDocSpec extends TestKit with TypedAkkaSpecWithShutdown { + + import GracefulStopDocSpec._ + + "Graceful stop example" must { + + "start some workers" in { + //#start-workers + import MasterControlProgramActor._ + + val system: ActorSystem[JobControlLanguage] = ActorSystem(mcpa, "B6700") + + system ! SpawnJob("a") + system ! SpawnJob("b") + + // sleep here to allow time for the new actors to be started + Thread.sleep(100) + + // brutally stop the system + system.terminate() + + Await.result(system.whenTerminated, 3.seconds) + //#start-workers + } + + "gracefully stop workers and master" in { + //#graceful-shutdown + + import MasterControlProgramActor._ + + val system: ActorSystem[JobControlLanguage] = ActorSystem(mcpa, "B7700") + + system ! SpawnJob("a") + system ! SpawnJob("b") + + Thread.sleep(100) + + // gracefully stop the system + system ! GracefulShutdown + + Thread.sleep(100) + + Await.result(system.whenTerminated, 3.seconds) + //#graceful-shutdown + } + } +} diff --git a/akka-docs/src/main/paradox/actor-lifecycle-typed.md b/akka-docs/src/main/paradox/actor-lifecycle-typed.md new file mode 100644 index 0000000000..d5392ff13f --- /dev/null +++ b/akka-docs/src/main/paradox/actor-lifecycle-typed.md @@ -0,0 +1,38 @@ +# Actor lifecycle + +TODO intro + +## Creating Actors + +TODO + +## Stopping Actors + +An actor can stop itself by returning `Behaviors.stopped` as the next behavior. + +Child actors can be forced to be stopped after it finishes processing its current message by using the +`stop` method of the `ActorContext` from the parent actor. Only child actors can be stopped in that way. + +The child actors will be stopped as part of the shutdown procedure of the parent. + +The `PostStop` signal that results from stopping an actor can be used for cleaning up resources. Note that +a behavior that handles such `PostStop` signal can optionally be defined as a parameter to `Behaviors.stopped` +if different actions is needed when the actor gracefully stops itself from when it is stopped abruptly. + +Here is an illustrating example: + +Scala +: @@snip [IntroSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/GracefulStopDocSpec.scala) { + #imports + #master-actor + #worker-actor + #graceful-shutdown + } + +Java +: @@snip [IntroSpec.scala]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/GracefulStopDocTest.java) { + #imports + #master-actor + #worker-actor + #graceful-shutdown + } diff --git a/akka-docs/src/main/paradox/index-typed.md b/akka-docs/src/main/paradox/index-typed.md index 364484c68c..160f84f8f5 100644 --- a/akka-docs/src/main/paradox/index-typed.md +++ b/akka-docs/src/main/paradox/index-typed.md @@ -6,6 +6,7 @@ * [actors](actors-typed.md) * [coexisting](coexisting.md) +* [actor-lifecycle](actor-lifecycle-typed.md) * [fault-tolerance](fault-tolerance-typed.md) * [actor-discovery](actor-discovery-typed.md) * [cluster](cluster-typed.md) @@ -13,4 +14,4 @@ * [persistence](persistence-typed.md) * [testing](testing-typed.md) -@@@ \ No newline at end of file +@@@