diff --git a/akka-docs/src/main/paradox/coordinated-shutdown.md b/akka-docs/src/main/paradox/coordinated-shutdown.md index 6fb5e5808d..28b366275b 100644 --- a/akka-docs/src/main/paradox/coordinated-shutdown.md +++ b/akka-docs/src/main/paradox/coordinated-shutdown.md @@ -1,37 +1,39 @@ # Coordinated Shutdown -Under normal conditions when `ActorSystem` is terminated or the JVM process is shut down certain +Under normal conditions, when an `ActorSystem` is terminated or the JVM process is shut down, certain actors and services will be stopped in a specific order. -This is handled by an extension named `CoordinatedShutdown`. It will run the registered tasks -during the shutdown process. The order of the shutdown phases is defined in configuration `akka.coordinated-shutdown.phases`. -The default phases are defined as: +The @apidoc[CoordinatedShutdown$] extension registers internal and user-defined tasks to be executed during the shutdown process. The tasks are grouped in configuration-defined "phases" which define the shutdown order. -@@snip [reference.conf](/akka-actor/src/main/resources/reference.conf) { #coordinated-shutdown-phases } - -More phases can be added in the application's configuration if needed by overriding a phase with an -additional `depends-on`. Especially the phases `before-service-unbind`, `before-cluster-shutdown` and +Especially the phases `before-service-unbind`, `before-cluster-shutdown` and `before-actor-system-terminate` are intended for application specific phases or tasks. +The order of the shutdown phases is defined in configuration `akka.coordinated-shutdown.phases`. See the default phases in the `reference.conf` tab: + +Most relevant default phases +: | Phase | Description | +|-------------|----------------------------------------------| +| before-service-unbind | The first pre-defined phase during shutdown. | +| before-cluster-shutdown | Phase for custom application tasks that are to be run after service shutdown and before cluster shutdown. | +| before-actor-system-terminate | Phase for custom application tasks that are to be run after cluster shutdown and before `ActorSystem` termination. | + +reference.conf (HOCON) +: @@snip [reference.conf](/akka-actor/src/main/resources/reference.conf) { #coordinated-shutdown-phases } + +More phases can be added in the application's `application.conf` if needed by overriding a phase with an +additional `depends-on`. + The default phases are defined in a single linear order, but the phases can be ordered as a directed acyclic graph (DAG) by defining the dependencies between the phases. The phases are ordered with [topological](https://en.wikipedia.org/wiki/Topological_sorting) sort of the DAG. -Tasks can be added to a phase with: +Tasks can be added to a phase like in this example which allows a certain actor to react before termination starts: Scala -: @@snip [ActorDocSpec.scala](/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala) { #coordinated-shutdown-addTask } +: @@snip [snip](/akka-docs/src/test/scala/docs/actor/typed/CoordinatedActorShutdownSpec.scala) { #coordinated-shutdown-addTask } Java -: @@snip [ActorDocTest.java](/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java) { #coordinated-shutdown-addTask } - -If cancellation of previously added tasks is required: - -Scala -: @@snip [ActorDocSpec.scala](/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala) { #coordinated-shutdown-cancellable } - -Java -: @@snip [ActorDocTest.java](/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java) { #coordinated-shutdown-cancellable } +: @@snip [snip](/akka-docs/src/test/java/jdocs/actor/typed/CoordinatedActorShutdownTest.java) { #coordinated-shutdown-addTask } The returned @scala[`Future[Done]`] @java[`CompletionStage`] should be completed when the task is completed. The task name parameter is only used for debugging/logging. @@ -43,9 +45,17 @@ If tasks are not completed within a configured timeout (see @ref:[reference.conf the next phase will be started anyway. It is possible to configure `recover=off` for a phase to abort the rest of the shutdown process if a task fails or is not completed within the timeout. +If cancellation of previously added tasks is required: + +Scala +: @@snip [snip](/akka-docs/src/test/scala/docs/actor/typed/CoordinatedActorShutdownSpec.scala) { #coordinated-shutdown-cancellable } + +Java +: @@snip [snip](/akka-docs/src/test/java/jdocs/actor/typed/CoordinatedActorShutdownTest.java) { #coordinated-shutdown-cancellable } + In the above example, it may be more convenient to simply stop the actor when it's done shutting down, rather than send back a done message, and for the shutdown task to not complete until the actor is terminated. A convenience method is provided that adds a task that sends -a message to the actor and then watches its termination: +a message to the actor and then watches its termination (there is currently no corresponding functionality for the new actors API @github[see #29056](#29056)): Scala : @@snip [ActorDocSpec.scala](/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala) { #coordinated-shutdown-addActorTerminationTask } @@ -57,14 +67,14 @@ Tasks should typically be registered as early as possible after system startup. the coordinated shutdown tasks that have been registered will be performed but tasks that are added too late will not be run. -To start the coordinated shutdown process you can invoke @scala[`run`] @java[`runAll`] on the `CoordinatedShutdown` -extension: +To start the coordinated shutdown process you can either invoke `terminate()` on the `ActorSystem`, or @scala[`run`] @java[`runAll`] on the `CoordinatedShutdown` +extension and pass it a class implementing @apidoc[CoordinatedShutdown.Reason] for informational purposes: Scala -: @@snip [ActorDocSpec.scala](/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala) { #coordinated-shutdown-run } +: @@snip [snip](/akka-docs/src/test/scala/docs/actor/typed/CoordinatedActorShutdownSpec.scala) { #coordinated-shutdown-run } Java -: @@snip [ActorDocTest.java](/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java) { #coordinated-shutdown-run } +: @@snip [snip](/akka-docs/src/test/java/jdocs/actor/typed/CoordinatedActorShutdownTest.java) { #coordinated-shutdown-run } It's safe to call the @scala[`run`] @java[`runAll`] method multiple times. It will only run once. @@ -76,7 +86,7 @@ To enable a hard `System.exit` as a final action you can configure: akka.coordinated-shutdown.exit-jvm = on ``` -The coordinated shutdown process can also be started by calling `ActorSystem.terminate()`. +The coordinated shutdown process is also started once the actor system's root actor is stopped. When using @ref:[Akka Cluster](cluster-usage.md) the `CoordinatedShutdown` will automatically run when the cluster node sees itself as `Exiting`, i.e. leaving from another node will trigger @@ -96,10 +106,10 @@ If you have application specific JVM shutdown hooks it's recommended that you re those shutting down Akka Remoting (Artery). Scala -: @@snip [ActorDocSpec.scala](/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala) { #coordinated-shutdown-jvm-hook } +: @@snip [snip](/akka-docs/src/test/scala/docs/actor/typed/CoordinatedActorShutdownSpec.scala) { #coordinated-shutdown-jvm-hook } Java -: @@snip [ActorDocTest.java](/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java) { #coordinated-shutdown-jvm-hook } +: @@snip [snip](/akka-docs/src/test/java/jdocs/actor/typed/CoordinatedActorShutdownTest.java) { #coordinated-shutdown-jvm-hook } For some tests it might be undesired to terminate the `ActorSystem` via `CoordinatedShutdown`. You can disable that by adding the following to the configuration of the `ActorSystem` that is diff --git a/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java b/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java index c31b1d191a..ebe238eefc 100644 --- a/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java +++ b/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java @@ -847,51 +847,11 @@ public class ActorDocTest extends AbstractJavaTest { }; } - private CompletionStage cleanup() { - return null; - } - - @Test - public void coordinatedShutdown() { - final ActorRef someActor = system.actorOf(Props.create(FirstActor.class)); - // #coordinated-shutdown-addTask - CoordinatedShutdown.get(system) - .addTask( - CoordinatedShutdown.PhaseBeforeServiceUnbind(), - "someTaskName", - () -> { - return akka.pattern.Patterns.ask(someActor, "stop", Duration.ofSeconds(5)) - .thenApply(reply -> Done.getInstance()); - }); - // #coordinated-shutdown-addTask - - // #coordinated-shutdown-cancellable - Cancellable cancellable = - CoordinatedShutdown.get(system) - .addCancellableTask( - CoordinatedShutdown.PhaseBeforeServiceUnbind(), "someTaskCleanup", () -> cleanup()); - // much later... - cancellable.cancel(); - // #coordinated-shutdown-cancellable - - // #coordinated-shutdown-jvm-hook - CoordinatedShutdown.get(system) - .addJvmShutdownHook(() -> System.out.println("custom JVM shutdown hook...")); - // #coordinated-shutdown-jvm-hook - - // don't run this - if (false) { - // #coordinated-shutdown-run - CompletionStage done = - CoordinatedShutdown.get(system).runAll(CoordinatedShutdown.unknownReason()); - // #coordinated-shutdown-run - } - } - @Test public void coordinatedShutdownActorTermination() { ActorRef someActor = system.actorOf(Props.create(FirstActor.class)); someActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + // https://github.com/akka/akka/issues/29056 // #coordinated-shutdown-addActorTerminationTask CoordinatedShutdown.get(system) .addActorTerminationTask( diff --git a/akka-docs/src/test/java/jdocs/actor/typed/CoordinatedActorShutdownTest.java b/akka-docs/src/test/java/jdocs/actor/typed/CoordinatedActorShutdownTest.java new file mode 100644 index 0000000000..c4690305eb --- /dev/null +++ b/akka-docs/src/test/java/jdocs/actor/typed/CoordinatedActorShutdownTest.java @@ -0,0 +1,133 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.actor.typed; + +import akka.Done; +import akka.actor.Cancellable; +import akka.actor.CoordinatedShutdown; +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.*; +// #coordinated-shutdown-addTask +import static akka.actor.typed.javadsl.AskPattern.ask; + +// #coordinated-shutdown-addTask + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +public class CoordinatedActorShutdownTest { + + // #coordinated-shutdown-addTask + public static class MyActor extends AbstractBehavior { + interface Messages {} + + // ... + + static final class Stop implements Messages { + final ActorRef replyTo; + + Stop(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + // #coordinated-shutdown-addTask + + public static Behavior create() { + return Behaviors.setup(MyActor::new); + } + + private MyActor(ActorContext context) { + super(context); + } + + // #coordinated-shutdown-addTask + @Override + public Receive createReceive() { + return newReceiveBuilder().onMessage(Stop.class, this::stop).build(); + } + + private Behavior stop(Stop stop) { + // shut down the actor internal + // ... + stop.replyTo.tell(Done.done()); + return Behaviors.stopped(); + } + } + + // #coordinated-shutdown-addTask + + public static class Root extends AbstractBehavior { + + public static Behavior create() { + return Behaviors.setup( + context -> { + ActorRef myActor = context.spawn(MyActor.create(), "my-actor"); + ActorSystem system = context.getSystem(); + // #coordinated-shutdown-addTask + CoordinatedShutdown.get(system) + .addTask( + CoordinatedShutdown.PhaseBeforeServiceUnbind(), + "someTaskName", + () -> + ask(myActor, MyActor.Stop::new, Duration.ofSeconds(5), system.scheduler())); + // #coordinated-shutdown-addTask + return Behaviors.empty(); + }); + } + + private Root(ActorContext context) { + super(context); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder().build(); + } + } + + private CompletionStage cleanup() { + return CompletableFuture.completedFuture(Done.done()); + } + + public void mount() { + ActorSystem system = ActorSystem.create(Root.create(), "main"); + + // #coordinated-shutdown-cancellable + Cancellable cancellable = + CoordinatedShutdown.get(system) + .addCancellableTask( + CoordinatedShutdown.PhaseBeforeServiceUnbind(), "someTaskCleanup", () -> cleanup()); + // much later... + cancellable.cancel(); + // #coordinated-shutdown-cancellable + + // #coordinated-shutdown-jvm-hook + CoordinatedShutdown.get(system) + .addJvmShutdownHook(() -> System.out.println("custom JVM shutdown hook...")); + // #coordinated-shutdown-jvm-hook + + // don't run this + if (false) { + // #coordinated-shutdown-run + // shut down with `ActorSystemTerminateReason` + system.terminate(); + + // or define a specific reason + class UserInitiatedShutdown implements CoordinatedShutdown.Reason { + @Override + public String toString() { + return "UserInitiatedShutdown"; + } + } + + CompletionStage done = + CoordinatedShutdown.get(system).runAll(new UserInitiatedShutdown()); + // #coordinated-shutdown-run + } + } +} diff --git a/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala b/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala index 49f8176e9b..8362b3f01f 100644 --- a/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala +++ b/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala @@ -724,34 +724,8 @@ class ActorDocSpec extends AkkaSpec(""" } "using CoordinatedShutdown" in { - val someActor = system.actorOf(Props(classOf[Replier], this)) - //#coordinated-shutdown-addTask - CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName") { () => - import akka.pattern.ask - import system.dispatcher - implicit val timeout = Timeout(5.seconds) - (someActor ? "stop").map(_ => Done) - } - //#coordinated-shutdown-addTask - - { - def cleanup(): Unit = {} - import system.dispatcher - //#coordinated-shutdown-cancellable - val c = CoordinatedShutdown(system).addCancellableTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "cleanup") { - () => - Future { - cleanup() - Done - } - } - - // much later... - c.cancel() - //#coordinated-shutdown-cancellable - } - - { + // other snippets moved to docs.actor.typed.CoordinatedActorShutdownSpec + { // https://github.com/akka/akka/issues/29056 val someActor = system.actorOf(Props(classOf[Replier], this)) someActor ! PoisonPill //#coordinated-shutdown-addActorTerminationTask @@ -762,19 +736,6 @@ class ActorDocSpec extends AkkaSpec(""" Some("stop")) //#coordinated-shutdown-addActorTerminationTask } - - //#coordinated-shutdown-jvm-hook - CoordinatedShutdown(system).addJvmShutdownHook { - println("custom JVM shutdown hook...") - } - //#coordinated-shutdown-jvm-hook - - // don't run this - def dummy(): Unit = { - //#coordinated-shutdown-run - val done: Future[Done] = CoordinatedShutdown(system).run(CoordinatedShutdown.UnknownReason) - //#coordinated-shutdown-run - } } } diff --git a/akka-docs/src/test/scala/docs/actor/typed/CoordinatedActorShutdownSpec.scala b/akka-docs/src/test/scala/docs/actor/typed/CoordinatedActorShutdownSpec.scala new file mode 100644 index 0000000000..c8728c27ed --- /dev/null +++ b/akka-docs/src/test/scala/docs/actor/typed/CoordinatedActorShutdownSpec.scala @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.actor.typed + +import akka.Done +import akka.actor.{ Cancellable, CoordinatedShutdown } +import akka.actor.typed.{ ActorRef, ActorSystem, Behavior } +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.AskPattern._ +import akka.util.Timeout + +import scala.concurrent.Future +import scala.concurrent.duration._ + +class CoordinatedActorShutdownSpec { + + //#coordinated-shutdown-addTask + object MyActor { + + trait Messages + case class Stop(replyTo: ActorRef[Done]) extends Messages + + def behavior: Behavior[Messages] = + Behaviors.receiveMessage { + // ... + case Stop(replyTo) => + // shut down the actor internals + // .. + replyTo.tell(Done) + Behaviors.stopped + } + } + + //#coordinated-shutdown-addTask + + trait Message + + def root: Behavior[Message] = Behaviors.setup[Message] { context => + implicit val system = context.system + val myActor = context.spawn(MyActor.behavior, "my-actor") + //#coordinated-shutdown-addTask + CoordinatedShutdown(context.system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName") { () => + implicit val timeout = Timeout(5.seconds) + myActor.ask(MyActor.Stop) + } + //#coordinated-shutdown-addTask + + Behaviors.empty + + } + + def showCancel: Unit = { + val system = ActorSystem(root, "main") + + def cleanup(): Unit = {} + import system.executionContext + //#coordinated-shutdown-cancellable + val c: Cancellable = + CoordinatedShutdown(system).addCancellableTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "cleanup") { () => + Future { + cleanup() + Done + } + } + + // much later... + c.cancel() + //#coordinated-shutdown-cancellable + + //#coordinated-shutdown-jvm-hook + CoordinatedShutdown(system).addJvmShutdownHook { + println("custom JVM shutdown hook...") + } + //#coordinated-shutdown-jvm-hook + + // don't run this + def dummy(): Unit = { + //#coordinated-shutdown-run + // shut down with `ActorSystemTerminateReason` + system.terminate() + + // or define a specific reason + case object UserInitiatedShutdown extends CoordinatedShutdown.Reason + + val done: Future[Done] = CoordinatedShutdown(system).run(UserInitiatedShutdown) + //#coordinated-shutdown-run + } + } +}