Docs: revise Coordinated Shutdown with new actors APIs (#29057)
This commit is contained in:
parent
a232b42abe
commit
8244f41d2e
5 changed files with 264 additions and 109 deletions
|
|
@ -1,37 +1,39 @@
|
|||
# Coordinated Shutdown
|
||||
|
||||
Under normal conditions when `ActorSystem` is terminated or the JVM process is shut down certain
|
||||
Under normal conditions, when an `ActorSystem` is terminated or the JVM process is shut down, certain
|
||||
actors and services will be stopped in a specific order.
|
||||
|
||||
This is handled by an extension named `CoordinatedShutdown`. It will run the registered tasks
|
||||
during the shutdown process. The order of the shutdown phases is defined in configuration `akka.coordinated-shutdown.phases`.
|
||||
The default phases are defined as:
|
||||
The @apidoc[CoordinatedShutdown$] extension registers internal and user-defined tasks to be executed during the shutdown process. The tasks are grouped in configuration-defined "phases" which define the shutdown order.
|
||||
|
||||
@@snip [reference.conf](/akka-actor/src/main/resources/reference.conf) { #coordinated-shutdown-phases }
|
||||
|
||||
More phases can be added in the application's configuration if needed by overriding a phase with an
|
||||
additional `depends-on`. Especially the phases `before-service-unbind`, `before-cluster-shutdown` and
|
||||
Especially the phases `before-service-unbind`, `before-cluster-shutdown` and
|
||||
`before-actor-system-terminate` are intended for application specific phases or tasks.
|
||||
|
||||
The order of the shutdown phases is defined in configuration `akka.coordinated-shutdown.phases`. See the default phases in the `reference.conf` tab:
|
||||
|
||||
Most relevant default phases
|
||||
: | Phase | Description |
|
||||
|-------------|----------------------------------------------|
|
||||
| before-service-unbind | The first pre-defined phase during shutdown. |
|
||||
| before-cluster-shutdown | Phase for custom application tasks that are to be run after service shutdown and before cluster shutdown. |
|
||||
| before-actor-system-terminate | Phase for custom application tasks that are to be run after cluster shutdown and before `ActorSystem` termination. |
|
||||
|
||||
reference.conf (HOCON)
|
||||
: @@snip [reference.conf](/akka-actor/src/main/resources/reference.conf) { #coordinated-shutdown-phases }
|
||||
|
||||
More phases can be added in the application's `application.conf` if needed by overriding a phase with an
|
||||
additional `depends-on`.
|
||||
|
||||
The default phases are defined in a single linear order, but the phases can be ordered as a
|
||||
directed acyclic graph (DAG) by defining the dependencies between the phases.
|
||||
The phases are ordered with [topological](https://en.wikipedia.org/wiki/Topological_sorting) sort of the DAG.
|
||||
|
||||
Tasks can be added to a phase with:
|
||||
Tasks can be added to a phase like in this example which allows a certain actor to react before termination starts:
|
||||
|
||||
Scala
|
||||
: @@snip [ActorDocSpec.scala](/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala) { #coordinated-shutdown-addTask }
|
||||
: @@snip [snip](/akka-docs/src/test/scala/docs/actor/typed/CoordinatedActorShutdownSpec.scala) { #coordinated-shutdown-addTask }
|
||||
|
||||
Java
|
||||
: @@snip [ActorDocTest.java](/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java) { #coordinated-shutdown-addTask }
|
||||
|
||||
If cancellation of previously added tasks is required:
|
||||
|
||||
Scala
|
||||
: @@snip [ActorDocSpec.scala](/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala) { #coordinated-shutdown-cancellable }
|
||||
|
||||
Java
|
||||
: @@snip [ActorDocTest.java](/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java) { #coordinated-shutdown-cancellable }
|
||||
: @@snip [snip](/akka-docs/src/test/java/jdocs/actor/typed/CoordinatedActorShutdownTest.java) { #coordinated-shutdown-addTask }
|
||||
|
||||
The returned @scala[`Future[Done]`] @java[`CompletionStage<Done>`] should be completed when the task is completed. The task name parameter
|
||||
is only used for debugging/logging.
|
||||
|
|
@ -43,9 +45,17 @@ If tasks are not completed within a configured timeout (see @ref:[reference.conf
|
|||
the next phase will be started anyway. It is possible to configure `recover=off` for a phase
|
||||
to abort the rest of the shutdown process if a task fails or is not completed within the timeout.
|
||||
|
||||
If cancellation of previously added tasks is required:
|
||||
|
||||
Scala
|
||||
: @@snip [snip](/akka-docs/src/test/scala/docs/actor/typed/CoordinatedActorShutdownSpec.scala) { #coordinated-shutdown-cancellable }
|
||||
|
||||
Java
|
||||
: @@snip [snip](/akka-docs/src/test/java/jdocs/actor/typed/CoordinatedActorShutdownTest.java) { #coordinated-shutdown-cancellable }
|
||||
|
||||
In the above example, it may be more convenient to simply stop the actor when it's done shutting down, rather than send back a done message,
|
||||
and for the shutdown task to not complete until the actor is terminated. A convenience method is provided that adds a task that sends
|
||||
a message to the actor and then watches its termination:
|
||||
a message to the actor and then watches its termination (there is currently no corresponding functionality for the new actors API @github[see #29056](#29056)):
|
||||
|
||||
Scala
|
||||
: @@snip [ActorDocSpec.scala](/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala) { #coordinated-shutdown-addActorTerminationTask }
|
||||
|
|
@ -57,14 +67,14 @@ Tasks should typically be registered as early as possible after system startup.
|
|||
the coordinated shutdown tasks that have been registered will be performed but tasks that are
|
||||
added too late will not be run.
|
||||
|
||||
To start the coordinated shutdown process you can invoke @scala[`run`] @java[`runAll`] on the `CoordinatedShutdown`
|
||||
extension:
|
||||
To start the coordinated shutdown process you can either invoke `terminate()` on the `ActorSystem`, or @scala[`run`] @java[`runAll`] on the `CoordinatedShutdown`
|
||||
extension and pass it a class implementing @apidoc[CoordinatedShutdown.Reason] for informational purposes:
|
||||
|
||||
Scala
|
||||
: @@snip [ActorDocSpec.scala](/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala) { #coordinated-shutdown-run }
|
||||
: @@snip [snip](/akka-docs/src/test/scala/docs/actor/typed/CoordinatedActorShutdownSpec.scala) { #coordinated-shutdown-run }
|
||||
|
||||
Java
|
||||
: @@snip [ActorDocTest.java](/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java) { #coordinated-shutdown-run }
|
||||
: @@snip [snip](/akka-docs/src/test/java/jdocs/actor/typed/CoordinatedActorShutdownTest.java) { #coordinated-shutdown-run }
|
||||
|
||||
It's safe to call the @scala[`run`] @java[`runAll`] method multiple times. It will only run once.
|
||||
|
||||
|
|
@ -76,7 +86,7 @@ To enable a hard `System.exit` as a final action you can configure:
|
|||
akka.coordinated-shutdown.exit-jvm = on
|
||||
```
|
||||
|
||||
The coordinated shutdown process can also be started by calling `ActorSystem.terminate()`.
|
||||
The coordinated shutdown process is also started once the actor system's root actor is stopped.
|
||||
|
||||
When using @ref:[Akka Cluster](cluster-usage.md) the `CoordinatedShutdown` will automatically run
|
||||
when the cluster node sees itself as `Exiting`, i.e. leaving from another node will trigger
|
||||
|
|
@ -96,10 +106,10 @@ If you have application specific JVM shutdown hooks it's recommended that you re
|
|||
those shutting down Akka Remoting (Artery).
|
||||
|
||||
Scala
|
||||
: @@snip [ActorDocSpec.scala](/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala) { #coordinated-shutdown-jvm-hook }
|
||||
: @@snip [snip](/akka-docs/src/test/scala/docs/actor/typed/CoordinatedActorShutdownSpec.scala) { #coordinated-shutdown-jvm-hook }
|
||||
|
||||
Java
|
||||
: @@snip [ActorDocTest.java](/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java) { #coordinated-shutdown-jvm-hook }
|
||||
: @@snip [snip](/akka-docs/src/test/java/jdocs/actor/typed/CoordinatedActorShutdownTest.java) { #coordinated-shutdown-jvm-hook }
|
||||
|
||||
For some tests it might be undesired to terminate the `ActorSystem` via `CoordinatedShutdown`.
|
||||
You can disable that by adding the following to the configuration of the `ActorSystem` that is
|
||||
|
|
|
|||
|
|
@ -847,51 +847,11 @@ public class ActorDocTest extends AbstractJavaTest {
|
|||
};
|
||||
}
|
||||
|
||||
private CompletionStage<Done> cleanup() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void coordinatedShutdown() {
|
||||
final ActorRef someActor = system.actorOf(Props.create(FirstActor.class));
|
||||
// #coordinated-shutdown-addTask
|
||||
CoordinatedShutdown.get(system)
|
||||
.addTask(
|
||||
CoordinatedShutdown.PhaseBeforeServiceUnbind(),
|
||||
"someTaskName",
|
||||
() -> {
|
||||
return akka.pattern.Patterns.ask(someActor, "stop", Duration.ofSeconds(5))
|
||||
.thenApply(reply -> Done.getInstance());
|
||||
});
|
||||
// #coordinated-shutdown-addTask
|
||||
|
||||
// #coordinated-shutdown-cancellable
|
||||
Cancellable cancellable =
|
||||
CoordinatedShutdown.get(system)
|
||||
.addCancellableTask(
|
||||
CoordinatedShutdown.PhaseBeforeServiceUnbind(), "someTaskCleanup", () -> cleanup());
|
||||
// much later...
|
||||
cancellable.cancel();
|
||||
// #coordinated-shutdown-cancellable
|
||||
|
||||
// #coordinated-shutdown-jvm-hook
|
||||
CoordinatedShutdown.get(system)
|
||||
.addJvmShutdownHook(() -> System.out.println("custom JVM shutdown hook..."));
|
||||
// #coordinated-shutdown-jvm-hook
|
||||
|
||||
// don't run this
|
||||
if (false) {
|
||||
// #coordinated-shutdown-run
|
||||
CompletionStage<Done> done =
|
||||
CoordinatedShutdown.get(system).runAll(CoordinatedShutdown.unknownReason());
|
||||
// #coordinated-shutdown-run
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void coordinatedShutdownActorTermination() {
|
||||
ActorRef someActor = system.actorOf(Props.create(FirstActor.class));
|
||||
someActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
|
||||
// https://github.com/akka/akka/issues/29056
|
||||
// #coordinated-shutdown-addActorTerminationTask
|
||||
CoordinatedShutdown.get(system)
|
||||
.addActorTerminationTask(
|
||||
|
|
|
|||
|
|
@ -0,0 +1,133 @@
|
|||
/*
|
||||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.actor.typed;
|
||||
|
||||
import akka.Done;
|
||||
import akka.actor.Cancellable;
|
||||
import akka.actor.CoordinatedShutdown;
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.actor.typed.ActorSystem;
|
||||
import akka.actor.typed.Behavior;
|
||||
import akka.actor.typed.javadsl.*;
|
||||
// #coordinated-shutdown-addTask
|
||||
import static akka.actor.typed.javadsl.AskPattern.ask;
|
||||
|
||||
// #coordinated-shutdown-addTask
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
|
||||
public class CoordinatedActorShutdownTest {
|
||||
|
||||
// #coordinated-shutdown-addTask
|
||||
public static class MyActor extends AbstractBehavior<MyActor.Messages> {
|
||||
interface Messages {}
|
||||
|
||||
// ...
|
||||
|
||||
static final class Stop implements Messages {
|
||||
final ActorRef<Done> replyTo;
|
||||
|
||||
Stop(ActorRef<Done> replyTo) {
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
}
|
||||
// #coordinated-shutdown-addTask
|
||||
|
||||
public static Behavior<Messages> create() {
|
||||
return Behaviors.setup(MyActor::new);
|
||||
}
|
||||
|
||||
private MyActor(ActorContext<Messages> context) {
|
||||
super(context);
|
||||
}
|
||||
|
||||
// #coordinated-shutdown-addTask
|
||||
@Override
|
||||
public Receive<Messages> createReceive() {
|
||||
return newReceiveBuilder().onMessage(Stop.class, this::stop).build();
|
||||
}
|
||||
|
||||
private Behavior<Messages> stop(Stop stop) {
|
||||
// shut down the actor internal
|
||||
// ...
|
||||
stop.replyTo.tell(Done.done());
|
||||
return Behaviors.stopped();
|
||||
}
|
||||
}
|
||||
|
||||
// #coordinated-shutdown-addTask
|
||||
|
||||
public static class Root extends AbstractBehavior<Void> {
|
||||
|
||||
public static Behavior<Void> create() {
|
||||
return Behaviors.setup(
|
||||
context -> {
|
||||
ActorRef<MyActor.Messages> myActor = context.spawn(MyActor.create(), "my-actor");
|
||||
ActorSystem<Void> system = context.getSystem();
|
||||
// #coordinated-shutdown-addTask
|
||||
CoordinatedShutdown.get(system)
|
||||
.addTask(
|
||||
CoordinatedShutdown.PhaseBeforeServiceUnbind(),
|
||||
"someTaskName",
|
||||
() ->
|
||||
ask(myActor, MyActor.Stop::new, Duration.ofSeconds(5), system.scheduler()));
|
||||
// #coordinated-shutdown-addTask
|
||||
return Behaviors.empty();
|
||||
});
|
||||
}
|
||||
|
||||
private Root(ActorContext<Void> context) {
|
||||
super(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive<Void> createReceive() {
|
||||
return newReceiveBuilder().build();
|
||||
}
|
||||
}
|
||||
|
||||
private CompletionStage<Done> cleanup() {
|
||||
return CompletableFuture.completedFuture(Done.done());
|
||||
}
|
||||
|
||||
public void mount() {
|
||||
ActorSystem<Void> system = ActorSystem.create(Root.create(), "main");
|
||||
|
||||
// #coordinated-shutdown-cancellable
|
||||
Cancellable cancellable =
|
||||
CoordinatedShutdown.get(system)
|
||||
.addCancellableTask(
|
||||
CoordinatedShutdown.PhaseBeforeServiceUnbind(), "someTaskCleanup", () -> cleanup());
|
||||
// much later...
|
||||
cancellable.cancel();
|
||||
// #coordinated-shutdown-cancellable
|
||||
|
||||
// #coordinated-shutdown-jvm-hook
|
||||
CoordinatedShutdown.get(system)
|
||||
.addJvmShutdownHook(() -> System.out.println("custom JVM shutdown hook..."));
|
||||
// #coordinated-shutdown-jvm-hook
|
||||
|
||||
// don't run this
|
||||
if (false) {
|
||||
// #coordinated-shutdown-run
|
||||
// shut down with `ActorSystemTerminateReason`
|
||||
system.terminate();
|
||||
|
||||
// or define a specific reason
|
||||
class UserInitiatedShutdown implements CoordinatedShutdown.Reason {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "UserInitiatedShutdown";
|
||||
}
|
||||
}
|
||||
|
||||
CompletionStage<Done> done =
|
||||
CoordinatedShutdown.get(system).runAll(new UserInitiatedShutdown());
|
||||
// #coordinated-shutdown-run
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -724,34 +724,8 @@ class ActorDocSpec extends AkkaSpec("""
|
|||
}
|
||||
|
||||
"using CoordinatedShutdown" in {
|
||||
val someActor = system.actorOf(Props(classOf[Replier], this))
|
||||
//#coordinated-shutdown-addTask
|
||||
CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName") { () =>
|
||||
import akka.pattern.ask
|
||||
import system.dispatcher
|
||||
implicit val timeout = Timeout(5.seconds)
|
||||
(someActor ? "stop").map(_ => Done)
|
||||
}
|
||||
//#coordinated-shutdown-addTask
|
||||
|
||||
{
|
||||
def cleanup(): Unit = {}
|
||||
import system.dispatcher
|
||||
//#coordinated-shutdown-cancellable
|
||||
val c = CoordinatedShutdown(system).addCancellableTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "cleanup") {
|
||||
() =>
|
||||
Future {
|
||||
cleanup()
|
||||
Done
|
||||
}
|
||||
}
|
||||
|
||||
// much later...
|
||||
c.cancel()
|
||||
//#coordinated-shutdown-cancellable
|
||||
}
|
||||
|
||||
{
|
||||
// other snippets moved to docs.actor.typed.CoordinatedActorShutdownSpec
|
||||
{ // https://github.com/akka/akka/issues/29056
|
||||
val someActor = system.actorOf(Props(classOf[Replier], this))
|
||||
someActor ! PoisonPill
|
||||
//#coordinated-shutdown-addActorTerminationTask
|
||||
|
|
@ -762,19 +736,6 @@ class ActorDocSpec extends AkkaSpec("""
|
|||
Some("stop"))
|
||||
//#coordinated-shutdown-addActorTerminationTask
|
||||
}
|
||||
|
||||
//#coordinated-shutdown-jvm-hook
|
||||
CoordinatedShutdown(system).addJvmShutdownHook {
|
||||
println("custom JVM shutdown hook...")
|
||||
}
|
||||
//#coordinated-shutdown-jvm-hook
|
||||
|
||||
// don't run this
|
||||
def dummy(): Unit = {
|
||||
//#coordinated-shutdown-run
|
||||
val done: Future[Done] = CoordinatedShutdown(system).run(CoordinatedShutdown.UnknownReason)
|
||||
//#coordinated-shutdown-run
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,91 @@
|
|||
/*
|
||||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.actor.typed
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.{ Cancellable, CoordinatedShutdown }
|
||||
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.AskPattern._
|
||||
import akka.util.Timeout
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class CoordinatedActorShutdownSpec {
|
||||
|
||||
//#coordinated-shutdown-addTask
|
||||
object MyActor {
|
||||
|
||||
trait Messages
|
||||
case class Stop(replyTo: ActorRef[Done]) extends Messages
|
||||
|
||||
def behavior: Behavior[Messages] =
|
||||
Behaviors.receiveMessage {
|
||||
// ...
|
||||
case Stop(replyTo) =>
|
||||
// shut down the actor internals
|
||||
// ..
|
||||
replyTo.tell(Done)
|
||||
Behaviors.stopped
|
||||
}
|
||||
}
|
||||
|
||||
//#coordinated-shutdown-addTask
|
||||
|
||||
trait Message
|
||||
|
||||
def root: Behavior[Message] = Behaviors.setup[Message] { context =>
|
||||
implicit val system = context.system
|
||||
val myActor = context.spawn(MyActor.behavior, "my-actor")
|
||||
//#coordinated-shutdown-addTask
|
||||
CoordinatedShutdown(context.system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName") { () =>
|
||||
implicit val timeout = Timeout(5.seconds)
|
||||
myActor.ask(MyActor.Stop)
|
||||
}
|
||||
//#coordinated-shutdown-addTask
|
||||
|
||||
Behaviors.empty
|
||||
|
||||
}
|
||||
|
||||
def showCancel: Unit = {
|
||||
val system = ActorSystem(root, "main")
|
||||
|
||||
def cleanup(): Unit = {}
|
||||
import system.executionContext
|
||||
//#coordinated-shutdown-cancellable
|
||||
val c: Cancellable =
|
||||
CoordinatedShutdown(system).addCancellableTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "cleanup") { () =>
|
||||
Future {
|
||||
cleanup()
|
||||
Done
|
||||
}
|
||||
}
|
||||
|
||||
// much later...
|
||||
c.cancel()
|
||||
//#coordinated-shutdown-cancellable
|
||||
|
||||
//#coordinated-shutdown-jvm-hook
|
||||
CoordinatedShutdown(system).addJvmShutdownHook {
|
||||
println("custom JVM shutdown hook...")
|
||||
}
|
||||
//#coordinated-shutdown-jvm-hook
|
||||
|
||||
// don't run this
|
||||
def dummy(): Unit = {
|
||||
//#coordinated-shutdown-run
|
||||
// shut down with `ActorSystemTerminateReason`
|
||||
system.terminate()
|
||||
|
||||
// or define a specific reason
|
||||
case object UserInitiatedShutdown extends CoordinatedShutdown.Reason
|
||||
|
||||
val done: Future[Done] = CoordinatedShutdown(system).run(UserInitiatedShutdown)
|
||||
//#coordinated-shutdown-run
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue