From 772db9ce7cd5fd2bfd0b3e9dae2b71632c203df5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 16 Jul 2020 19:00:45 +0200 Subject: [PATCH] addActorTerminationTask fix (#29379) --- .../akka/actor/CoordinatedShutdownSpec.scala | 72 +++++++++++-------- .../actor-termination-task.excludes | 8 +++ .../akka/actor/CoordinatedShutdown.scala | 60 ++++++++++++---- 3 files changed, 97 insertions(+), 43 deletions(-) create mode 100644 akka-actor/src/main/mima-filters/2.6.7.backwards.excludes/actor-termination-task.excludes diff --git a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala index 074af76513..23c23c46c8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala @@ -31,7 +31,6 @@ class CoordinatedShutdownSpec // some convenience to make the test readable def phase(dependsOn: String*): Phase = Phase(dependsOn.toSet, timeout = 10.seconds, recover = true, enabled = true) val emptyPhase: Phase = Phase(Set.empty, timeout = 10.seconds, recover = true, enabled = true) - val abortingPhase: Phase = Phase(Set.empty, timeout = 10.seconds, recover = false, enabled = true) private def checkTopologicalSort(phases: Map[String, Phase]): List[String] = { val result = CoordinatedShutdown.topologicalSort(phases) @@ -643,21 +642,41 @@ class CoordinatedShutdownSpec } } - def withCoordinatedShutdown(phases: Map[String, Phase])(block: CoordinatedShutdown => Unit): Unit = { - val co = new CoordinatedShutdown(extSys, phases) + def withCoordinatedShutdown(block: (ActorSystem, CoordinatedShutdown) => Unit): Unit = { + val system = ActorSystem( + s"CoordinatedShutdownSpec-terminated-${System.currentTimeMillis()}", + ConfigFactory.parseString(""" + akka.coordinated-shutdown.phases { + before-actor-system-terminate { + + } + a { + # as late as possible + dependsOn = [before-actor-system-terminate] + timeout=10s + recover=off + enabled=on + } + + b { + dependsOn = [a] + timeout=10s + recover=off + enabled=on + } + } + """)) try { - block(co) + block(system, CoordinatedShutdown(system)) } finally { - watch(co.terminationWatcher) ! PoisonPill - expectTerminated(co.terminationWatcher) + TestKit.shutdownActorSystem(system) } } "support actor termination tasks with a stop message" in { - val phases = Map("a" -> abortingPhase) - withCoordinatedShutdown(phases) { co => - val actorToWatch = TestProbe() - co.addActorTerminationTask("a", "a1", actorToWatch.ref, Some("stop")) + withCoordinatedShutdown { (system, co) => + val actorToWatch = TestProbe()(system) + co.addActorTerminationTask("before-actor-system-terminate", "a1", actorToWatch.ref, Some("stop")) val result = co.run(UnknownReason) actorToWatch.expectMsg("stop") result.isReadyWithin(100.millis) should be(false) @@ -667,10 +686,9 @@ class CoordinatedShutdownSpec } "support actor termination tasks without a stop message" in { - val phases = Map("a" -> abortingPhase) - withCoordinatedShutdown(phases) { co => - val actorToWatch = TestProbe() - co.addActorTerminationTask("a", "a1", actorToWatch.ref, None) + withCoordinatedShutdown { (system, co) => + val actorToWatch = TestProbe()(system) + co.addActorTerminationTask("before-actor-system-terminate", "a1", actorToWatch.ref, None) val result = co.run(UnknownReason) actorToWatch.expectNoMessage(100.millis) result.isReadyWithin(100.millis) should be(false) @@ -680,24 +698,22 @@ class CoordinatedShutdownSpec } "support actor termination tasks for actors that are already shutdown" in { - val phases = Map("a" -> abortingPhase) - withCoordinatedShutdown(phases) { co => - val actorToWatch = TestProbe() + withCoordinatedShutdown { (system, co) => + val actorToWatch = TestProbe()(system) watch(actorToWatch.ref) actorToWatch.ref ! PoisonPill expectTerminated(actorToWatch.ref) - co.addActorTerminationTask("a", "a1", actorToWatch.ref, None) + co.addActorTerminationTask("before-actor-system-terminate", "a1", actorToWatch.ref, None) val result = co.run(UnknownReason) result.futureValue should ===(Done) } } "allow watching the same actor twice in the same phase" in { - val phases = Map("a" -> abortingPhase) - withCoordinatedShutdown(phases) { co => - val actorToWatch = TestProbe() - co.addActorTerminationTask("a", "a1", actorToWatch.ref, Some("stop1")) - co.addActorTerminationTask("a", "a2", actorToWatch.ref, Some("stop2")) + withCoordinatedShutdown { (system, co) => + val actorToWatch = TestProbe()(system) + co.addActorTerminationTask("before-actor-system-terminate", "a1", actorToWatch.ref, Some("stop1")) + co.addActorTerminationTask("before-actor-system-terminate", "a2", actorToWatch.ref, Some("stop2")) val result = co.run(UnknownReason) actorToWatch.expectMsgAllOf("stop1", "stop2") actorToWatch.ref ! PoisonPill @@ -706,12 +722,12 @@ class CoordinatedShutdownSpec } "allow watching the same actor twice in different phases" in { - val phases = Map("a" -> abortingPhase, "b" -> abortingPhase.copy(dependsOn = Set("a"))) - withCoordinatedShutdown(phases) { co => - val actorToWatch = TestProbe() - co.addActorTerminationTask("a", "a1", actorToWatch.ref, Some("stopa")) + withCoordinatedShutdown { (system, co) => + val actorToWatch = TestProbe()(system) + // arbitrary phase that runs before the phase of b1 + co.addActorTerminationTask("service-stop", "a1", actorToWatch.ref, Some("stopa")) // no stop message because it's just going to end up being dead lettered - co.addActorTerminationTask("b", "b1", actorToWatch.ref, None) + co.addActorTerminationTask("before-actor-system-terminate", "b1", actorToWatch.ref, None) val result = co.run(UnknownReason) actorToWatch.expectMsg("stopa") actorToWatch.expectNoMessage(100.millis) diff --git a/akka-actor/src/main/mima-filters/2.6.7.backwards.excludes/actor-termination-task.excludes b/akka-actor/src/main/mima-filters/2.6.7.backwards.excludes/actor-termination-task.excludes new file mode 100644 index 0000000000..f73c2c5e2f --- /dev/null +++ b/akka-actor/src/main/mima-filters/2.6.7.backwards.excludes/actor-termination-task.excludes @@ -0,0 +1,8 @@ +# Actor termination task not working #29355 +# internals only + +ProblemFilters.exclude[MissingTypesProblem]("akka.actor.CoordinatedShutdownTerminationWatcher$Watch$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdownTerminationWatcher#Watch.apply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.CoordinatedShutdownTerminationWatcher#Watch.unapply") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdownTerminationWatcher#Watch.copy") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdownTerminationWatcher#Watch.this") \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala index 8985cef7da..5cd31e0611 100644 --- a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala +++ b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala @@ -18,16 +18,14 @@ import scala.concurrent.duration._ import scala.concurrent.duration.FiniteDuration import scala.util.Try import scala.util.control.NonFatal - import com.typesafe.config.Config import com.typesafe.config.ConfigFactory - import akka.Done import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts import akka.event.Logging import akka.pattern.after -import akka.util.{ OptionVal, Timeout } +import akka.util.OptionVal object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with ExtensionIdProvider { @@ -613,10 +611,12 @@ final class CoordinatedShutdown private[akka] ( def addActorTerminationTask(phase: String, taskName: String, actor: ActorRef, stopMsg: Option[Any]): Unit = addTask(phase, taskName) { () => stopMsg.foreach(msg => actor ! msg) - import akka.pattern.ask // addTask will verify that phases(phase) exists, so this should be safe - implicit val timeout = Timeout(phases(phase).timeout) - (terminationWatcher ? CoordinatedShutdownTerminationWatcher.Watch(actor)).mapTo[Done] + val deadline = phases(phase).timeout.fromNow + // Note: we cannot use ask because we cannot spawn actors during shutdown + val completionPromise = Promise[Done]() + terminationWatcher ! CoordinatedShutdownTerminationWatcher.Watch(actor, deadline, completionPromise) + completionPromise.future } /** @@ -878,28 +878,58 @@ final class CoordinatedShutdown private[akka] ( /** INTERNAL API */ @InternalApi private[akka] object CoordinatedShutdownTerminationWatcher { - final case class Watch(actor: ActorRef) + final case class Watch(actor: ActorRef, deadline: Deadline, completionPromise: Promise[Done]) + private final case class WatchedTimedOut(actor: ActorRef, completionPromise: Promise[Done], timeout: FiniteDuration) def props: Props = Props(new CoordinatedShutdownTerminationWatcher) } /** INTERNAL API */ @InternalApi -private[akka] class CoordinatedShutdownTerminationWatcher extends Actor { +private[akka] class CoordinatedShutdownTerminationWatcher extends Actor with Timers { + + private var waitingForActor: Map[ActorRef, Set[Promise[Done]]] = Map.empty + private var alreadySeenTerminated = Set.empty[ActorRef] import CoordinatedShutdownTerminationWatcher._ - private var watching = Map.empty[ActorRef, List[ActorRef]].withDefaultValue(Nil) - override def receive: Receive = { - case Watch(actor: ActorRef) => - watching += (actor -> (sender() :: watching(actor))) - context.watch(actor) + case Watch(actor, deadline, completionPromise) => + if (alreadySeenTerminated(actor)) completionPromise.trySuccess(Done) + else { + val newWaiting = + waitingForActor.get(actor) match { + case Some(alreadyWaiting) => + alreadyWaiting + completionPromise + case None => + context.watch(actor) + Set(completionPromise) + } + waitingForActor = waitingForActor.updated(actor, newWaiting) + timers.startSingleTimer( + completionPromise, + WatchedTimedOut(actor, completionPromise, deadline.time), + deadline.timeLeft) + } case Terminated(actor) => - watching(actor).foreach(_ ! Done) - watching -= actor + for { + waitingPromises <- waitingForActor.get(actor) + waitingPromise <- waitingPromises + } { + if (waitingPromise.trySuccess(Done)) { + timers.cancel(waitingPromise) + } + } + waitingForActor = waitingForActor - actor + alreadySeenTerminated += actor + + case WatchedTimedOut(actor, completionPromise, timeout) => + if (!completionPromise.isCompleted) + completionPromise.tryFailure( + new TimeoutException(s"Actor [$actor] termination timed out after [$timeout] during coordinated shutdown")) + } }