diff --git a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala index 7267459ec6..d008d8797c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala @@ -11,7 +11,7 @@ import scala.concurrent.Await import scala.concurrent.Future import akka.Done -import akka.testkit.{ AkkaSpec, EventFilter, TestKit } +import akka.testkit.{ AkkaSpec, EventFilter, TestKit, TestProbe } import com.typesafe.config.{ Config, ConfigFactory } import akka.actor.CoordinatedShutdown.Phase import akka.actor.CoordinatedShutdown.UnknownReason @@ -33,6 +33,7 @@ class CoordinatedShutdownSpec // some convenience to make the test readable def phase(dependsOn: String*): Phase = Phase(dependsOn.toSet, timeout = 10.seconds, recover = true, enabled = true) val emptyPhase: Phase = Phase(Set.empty, timeout = 10.seconds, recover = true, enabled = true) + val abortingPhase: Phase = Phase(Set.empty, timeout = 10.seconds, recover = false, enabled = true) private def checkTopologicalSort(phases: Map[String, Phase]): List[String] = { val result = CoordinatedShutdown.topologicalSort(phases) @@ -449,6 +450,83 @@ class CoordinatedShutdownSpec } } + def withCoordinatedShutdown(phases: Map[String, Phase])(block: CoordinatedShutdown => Unit): Unit = { + val co = new CoordinatedShutdown(extSys, phases) + try { + block(co) + } finally { + watch(co.terminationWatcher) ! PoisonPill + expectTerminated(co.terminationWatcher) + } + } + + "support actor termination tasks with a stop message" in { + val phases = Map("a" -> abortingPhase) + withCoordinatedShutdown(phases) { co => + val actorToWatch = TestProbe() + co.addActorTerminationTask("a", "a1", actorToWatch.ref, Some("stop")) + val result = co.run(UnknownReason) + actorToWatch.expectMsg("stop") + result.isReadyWithin(100.millis) should be(false) + actorToWatch.ref ! PoisonPill + result.futureValue should ===(Done) + } + } + + "support actor termination tasks without a stop message" in { + val phases = Map("a" -> abortingPhase) + withCoordinatedShutdown(phases) { co => + val actorToWatch = TestProbe() + co.addActorTerminationTask("a", "a1", actorToWatch.ref, None) + val result = co.run(UnknownReason) + actorToWatch.expectNoMessage(100.millis) + result.isReadyWithin(100.millis) should be(false) + actorToWatch.ref ! PoisonPill + result.futureValue should ===(Done) + } + } + + "support actor termination tasks for actors that are already shutdown" in { + val phases = Map("a" -> abortingPhase) + withCoordinatedShutdown(phases) { co => + val actorToWatch = TestProbe() + watch(actorToWatch.ref) + actorToWatch.ref ! PoisonPill + expectTerminated(actorToWatch.ref) + co.addActorTerminationTask("a", "a1", actorToWatch.ref, None) + val result = co.run(UnknownReason) + result.futureValue should ===(Done) + } + } + + "allow watching the same actor twice in the same phase" in { + val phases = Map("a" -> abortingPhase) + withCoordinatedShutdown(phases) { co => + val actorToWatch = TestProbe() + co.addActorTerminationTask("a", "a1", actorToWatch.ref, Some("stop1")) + co.addActorTerminationTask("a", "a2", actorToWatch.ref, Some("stop2")) + val result = co.run(UnknownReason) + actorToWatch.expectMsgAllOf("stop1", "stop2") + actorToWatch.ref ! PoisonPill + result.futureValue should ===(Done) + } + } + + "allow watching the same actor twice in different phases" in { + val phases = Map("a" -> abortingPhase, "b" -> abortingPhase.copy(dependsOn = Set("a"))) + withCoordinatedShutdown(phases) { co => + val actorToWatch = TestProbe() + co.addActorTerminationTask("a", "a1", actorToWatch.ref, Some("stopa")) + // no stop message because it's just going to end up being dead lettered + co.addActorTerminationTask("b", "b1", actorToWatch.ref, None) + val result = co.run(UnknownReason) + actorToWatch.expectMsg("stopa") + actorToWatch.expectNoMessage(100.millis) + actorToWatch.ref ! PoisonPill + result.futureValue should ===(Done) + } + } + } abstract class JvmHookTest { diff --git a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala index c4755c9962..f497041741 100644 --- a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala +++ b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala @@ -31,7 +31,7 @@ import java.util.function.Supplier import java.util.Optional import akka.annotation.InternalApi -import akka.util.OptionVal +import akka.util.{ OptionVal, Timeout } object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with ExtensionIdProvider { @@ -370,6 +370,12 @@ final class CoordinatedShutdown private[akka] ( private val _jvmHooksLatch = new AtomicReference[CountDownLatch](new CountDownLatch(0)) @volatile private var actorSystemJvmHook: OptionVal[Cancellable] = OptionVal.None + /** + * INTERNAL API + */ + private[akka] lazy val terminationWatcher = + system.systemActorOf(CoordinatedShutdownTerminationWatcher.props, "coordinatedShutdownTerminationWatcher") + /** * INTERNAL API */ @@ -421,6 +427,50 @@ final class CoordinatedShutdown private[akka] ( def addTask(phase: String, taskName: String, task: Supplier[CompletionStage[Done]]): Unit = addTask(phase, taskName)(() => task.get().toScala) + /** + * Scala API: Add an actor termination task to a phase. It doesn't remove + * previously added tasks. Tasks added to the same phase are executed in + * parallel without any ordering assumptions. Next phase will not start until + * all tasks of previous phase have been completed. + * + * When executed, this task will first send the given stop message, if defined, + * to the actor, then it will watch the actor, and complete when the actor + * terminates. + * + * Tasks should typically be registered as early as possible after system + * startup. When running the coordinated shutdown tasks that have been registered + * will be performed but tasks that are added too late will not be run. + * It is possible to add a task to a later phase by a task in an earlier phase + * and it will be performed. + */ + def addActorTerminationTask(phase: String, taskName: String, actor: ActorRef, stopMsg: Option[Any]): Unit = + addTask(phase, taskName) { () => + stopMsg.foreach(msg => actor ! msg) + import akka.pattern.ask + // addTask will verify that phases(phase) exists, so this should be safe + implicit val timeout = Timeout(phases(phase).timeout) + (terminationWatcher ? CoordinatedShutdownTerminationWatcher.Watch(actor)).mapTo[Done] + } + + /** + * Java API: Add an actor termination task to a phase. It doesn't remove + * previously added tasks. Tasks added to the same phase are executed in + * parallel without any ordering assumptions. Next phase will not start until + * all tasks of previous phase have been completed. + * + * When executed, this task will first send the given stop message, if defined, + * to the actor, then it will watch the actor, and complete when the actor + * terminates. + * + * Tasks should typically be registered as early as possible after system + * startup. When running the coordinated shutdown tasks that have been registered + * will be performed but tasks that are added too late will not be run. + * It is possible to add a task to a later phase by a task in an earlier phase + * and it will be performed. + */ + def addActorTerminationTask(phase: String, taskName: String, actor: ActorRef, stopMsg: Optional[Any]): Unit = + addActorTerminationTask(phase, taskName, actor, stopMsg.asScala) + /** * The `Reason` for the shutdown as passed to the `run` method. `None` if the shutdown * has not been started. @@ -686,3 +736,32 @@ final class CoordinatedShutdown private[akka] ( addCancellableJvmShutdownHook(hook.run()) } + +/** INTERNAL API */ +@InternalApi +private[akka] object CoordinatedShutdownTerminationWatcher { + final case class Watch(actor: ActorRef) + + def props: Props = Props(new CoordinatedShutdownTerminationWatcher) +} + +/** INTERNAL API */ +@InternalApi +private[akka] class CoordinatedShutdownTerminationWatcher extends Actor { + + import CoordinatedShutdownTerminationWatcher._ + + private var watching = Map.empty[ActorRef, List[ActorRef]].withDefaultValue(Nil) + + override def receive: Receive = { + + case Watch(actor: ActorRef) => + watching += (actor -> (sender() :: watching(actor))) + context.watch(actor) + + case Terminated(actor) => + watching(actor).foreach(_ ! Done) + watching -= actor + } + +} diff --git a/akka-docs/src/main/paradox/actors.md b/akka-docs/src/main/paradox/actors.md index b80797d1d4..e59f4836f5 100644 --- a/akka-docs/src/main/paradox/actors.md +++ b/akka-docs/src/main/paradox/actors.md @@ -1081,6 +1081,16 @@ If tasks are not completed within a configured timeout (see @ref:[reference.conf the next phase will be started anyway. It is possible to configure `recover=off` for a phase to abort the rest of the shutdown process if a task fails or is not completed within the timeout. +In the above example, it may be more convenient to simply stop the actor when it's done shutting down, rather than send back a done message, +and for the shutdown task to not complete until the actor is terminated. A convenience method is provided that adds a task that sends +a message to the actor and then watches its termination: + +Scala +: @@snip [ActorDocSpec.scala](/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala) { #coordinated-shutdown-addActorTerminationTask } + +Java +: @@snip [ActorDocTest.java](/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java) { #coordinated-shutdown-addActorTerminationTask } + Tasks should typically be registered as early as possible after system startup. When running the coordinated shutdown tasks that have been registered will be performed but tasks that are added too late will not be run. diff --git a/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java b/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java index 2e1b2db412..d4be02bc94 100644 --- a/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java +++ b/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java @@ -875,4 +875,18 @@ public class ActorDocTest extends AbstractJavaTest { // #coordinated-shutdown-run } } + + @Test + public void coordinatedShutdownActorTermination() { + ActorRef someActor = system.actorOf(Props.create(FirstActor.class)); + someActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + // #coordinated-shutdown-addActorTerminationTask + CoordinatedShutdown.get(system) + .addActorTerminationTask( + CoordinatedShutdown.PhaseBeforeServiceUnbind(), + "someTaskName", + someActor, + Optional.of("stop")); + // #coordinated-shutdown-addActorTerminationTask + } } diff --git a/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala b/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala index 58643f90cf..bf4298f6a7 100644 --- a/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala +++ b/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala @@ -735,6 +735,18 @@ class ActorDocSpec extends AkkaSpec(""" } //#coordinated-shutdown-addTask + { + val someActor = system.actorOf(Props(classOf[Replier], this)) + someActor ! PoisonPill + //#coordinated-shutdown-addActorTerminationTask + CoordinatedShutdown(system).addActorTerminationTask( + CoordinatedShutdown.PhaseBeforeServiceUnbind, + "someTaskName", + someActor, + Some("stop")) + //#coordinated-shutdown-addActorTerminationTask + } + //#coordinated-shutdown-jvm-hook CoordinatedShutdown(system).addJvmShutdownHook { println("custom JVM shutdown hook...")