addActorTerminationTask fix (#29379)

This commit is contained in:
Johan Andrén 2020-07-16 19:00:45 +02:00 committed by GitHub
parent 1fe573d7ef
commit 772db9ce7c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 97 additions and 43 deletions

View file

@ -31,7 +31,6 @@ class CoordinatedShutdownSpec
// some convenience to make the test readable
def phase(dependsOn: String*): Phase = Phase(dependsOn.toSet, timeout = 10.seconds, recover = true, enabled = true)
val emptyPhase: Phase = Phase(Set.empty, timeout = 10.seconds, recover = true, enabled = true)
val abortingPhase: Phase = Phase(Set.empty, timeout = 10.seconds, recover = false, enabled = true)
private def checkTopologicalSort(phases: Map[String, Phase]): List[String] = {
val result = CoordinatedShutdown.topologicalSort(phases)
@ -643,21 +642,41 @@ class CoordinatedShutdownSpec
}
}
def withCoordinatedShutdown(phases: Map[String, Phase])(block: CoordinatedShutdown => Unit): Unit = {
val co = new CoordinatedShutdown(extSys, phases)
def withCoordinatedShutdown(block: (ActorSystem, CoordinatedShutdown) => Unit): Unit = {
val system = ActorSystem(
s"CoordinatedShutdownSpec-terminated-${System.currentTimeMillis()}",
ConfigFactory.parseString("""
akka.coordinated-shutdown.phases {
before-actor-system-terminate {
}
a {
# as late as possible
dependsOn = [before-actor-system-terminate]
timeout=10s
recover=off
enabled=on
}
b {
dependsOn = [a]
timeout=10s
recover=off
enabled=on
}
}
"""))
try {
block(co)
block(system, CoordinatedShutdown(system))
} finally {
watch(co.terminationWatcher) ! PoisonPill
expectTerminated(co.terminationWatcher)
TestKit.shutdownActorSystem(system)
}
}
"support actor termination tasks with a stop message" in {
val phases = Map("a" -> abortingPhase)
withCoordinatedShutdown(phases) { co =>
val actorToWatch = TestProbe()
co.addActorTerminationTask("a", "a1", actorToWatch.ref, Some("stop"))
withCoordinatedShutdown { (system, co) =>
val actorToWatch = TestProbe()(system)
co.addActorTerminationTask("before-actor-system-terminate", "a1", actorToWatch.ref, Some("stop"))
val result = co.run(UnknownReason)
actorToWatch.expectMsg("stop")
result.isReadyWithin(100.millis) should be(false)
@ -667,10 +686,9 @@ class CoordinatedShutdownSpec
}
"support actor termination tasks without a stop message" in {
val phases = Map("a" -> abortingPhase)
withCoordinatedShutdown(phases) { co =>
val actorToWatch = TestProbe()
co.addActorTerminationTask("a", "a1", actorToWatch.ref, None)
withCoordinatedShutdown { (system, co) =>
val actorToWatch = TestProbe()(system)
co.addActorTerminationTask("before-actor-system-terminate", "a1", actorToWatch.ref, None)
val result = co.run(UnknownReason)
actorToWatch.expectNoMessage(100.millis)
result.isReadyWithin(100.millis) should be(false)
@ -680,24 +698,22 @@ class CoordinatedShutdownSpec
}
"support actor termination tasks for actors that are already shutdown" in {
val phases = Map("a" -> abortingPhase)
withCoordinatedShutdown(phases) { co =>
val actorToWatch = TestProbe()
withCoordinatedShutdown { (system, co) =>
val actorToWatch = TestProbe()(system)
watch(actorToWatch.ref)
actorToWatch.ref ! PoisonPill
expectTerminated(actorToWatch.ref)
co.addActorTerminationTask("a", "a1", actorToWatch.ref, None)
co.addActorTerminationTask("before-actor-system-terminate", "a1", actorToWatch.ref, None)
val result = co.run(UnknownReason)
result.futureValue should ===(Done)
}
}
"allow watching the same actor twice in the same phase" in {
val phases = Map("a" -> abortingPhase)
withCoordinatedShutdown(phases) { co =>
val actorToWatch = TestProbe()
co.addActorTerminationTask("a", "a1", actorToWatch.ref, Some("stop1"))
co.addActorTerminationTask("a", "a2", actorToWatch.ref, Some("stop2"))
withCoordinatedShutdown { (system, co) =>
val actorToWatch = TestProbe()(system)
co.addActorTerminationTask("before-actor-system-terminate", "a1", actorToWatch.ref, Some("stop1"))
co.addActorTerminationTask("before-actor-system-terminate", "a2", actorToWatch.ref, Some("stop2"))
val result = co.run(UnknownReason)
actorToWatch.expectMsgAllOf("stop1", "stop2")
actorToWatch.ref ! PoisonPill
@ -706,12 +722,12 @@ class CoordinatedShutdownSpec
}
"allow watching the same actor twice in different phases" in {
val phases = Map("a" -> abortingPhase, "b" -> abortingPhase.copy(dependsOn = Set("a")))
withCoordinatedShutdown(phases) { co =>
val actorToWatch = TestProbe()
co.addActorTerminationTask("a", "a1", actorToWatch.ref, Some("stopa"))
withCoordinatedShutdown { (system, co) =>
val actorToWatch = TestProbe()(system)
// arbitrary phase that runs before the phase of b1
co.addActorTerminationTask("service-stop", "a1", actorToWatch.ref, Some("stopa"))
// no stop message because it's just going to end up being dead lettered
co.addActorTerminationTask("b", "b1", actorToWatch.ref, None)
co.addActorTerminationTask("before-actor-system-terminate", "b1", actorToWatch.ref, None)
val result = co.run(UnknownReason)
actorToWatch.expectMsg("stopa")
actorToWatch.expectNoMessage(100.millis)

View file

@ -0,0 +1,8 @@
# Actor termination task not working #29355
# internals only
ProblemFilters.exclude[MissingTypesProblem]("akka.actor.CoordinatedShutdownTerminationWatcher$Watch$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdownTerminationWatcher#Watch.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.CoordinatedShutdownTerminationWatcher#Watch.unapply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdownTerminationWatcher#Watch.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdownTerminationWatcher#Watch.this")

View file

@ -18,16 +18,14 @@ import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
import scala.util.Try
import scala.util.control.NonFatal
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.Done
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.event.Logging
import akka.pattern.after
import akka.util.{ OptionVal, Timeout }
import akka.util.OptionVal
object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with ExtensionIdProvider {
@ -613,10 +611,12 @@ final class CoordinatedShutdown private[akka] (
def addActorTerminationTask(phase: String, taskName: String, actor: ActorRef, stopMsg: Option[Any]): Unit =
addTask(phase, taskName) { () =>
stopMsg.foreach(msg => actor ! msg)
import akka.pattern.ask
// addTask will verify that phases(phase) exists, so this should be safe
implicit val timeout = Timeout(phases(phase).timeout)
(terminationWatcher ? CoordinatedShutdownTerminationWatcher.Watch(actor)).mapTo[Done]
val deadline = phases(phase).timeout.fromNow
// Note: we cannot use ask because we cannot spawn actors during shutdown
val completionPromise = Promise[Done]()
terminationWatcher ! CoordinatedShutdownTerminationWatcher.Watch(actor, deadline, completionPromise)
completionPromise.future
}
/**
@ -878,28 +878,58 @@ final class CoordinatedShutdown private[akka] (
/** INTERNAL API */
@InternalApi
private[akka] object CoordinatedShutdownTerminationWatcher {
final case class Watch(actor: ActorRef)
final case class Watch(actor: ActorRef, deadline: Deadline, completionPromise: Promise[Done])
private final case class WatchedTimedOut(actor: ActorRef, completionPromise: Promise[Done], timeout: FiniteDuration)
def props: Props = Props(new CoordinatedShutdownTerminationWatcher)
}
/** INTERNAL API */
@InternalApi
private[akka] class CoordinatedShutdownTerminationWatcher extends Actor {
private[akka] class CoordinatedShutdownTerminationWatcher extends Actor with Timers {
private var waitingForActor: Map[ActorRef, Set[Promise[Done]]] = Map.empty
private var alreadySeenTerminated = Set.empty[ActorRef]
import CoordinatedShutdownTerminationWatcher._
private var watching = Map.empty[ActorRef, List[ActorRef]].withDefaultValue(Nil)
override def receive: Receive = {
case Watch(actor: ActorRef) =>
watching += (actor -> (sender() :: watching(actor)))
context.watch(actor)
case Watch(actor, deadline, completionPromise) =>
if (alreadySeenTerminated(actor)) completionPromise.trySuccess(Done)
else {
val newWaiting =
waitingForActor.get(actor) match {
case Some(alreadyWaiting) =>
alreadyWaiting + completionPromise
case None =>
context.watch(actor)
Set(completionPromise)
}
waitingForActor = waitingForActor.updated(actor, newWaiting)
timers.startSingleTimer(
completionPromise,
WatchedTimedOut(actor, completionPromise, deadline.time),
deadline.timeLeft)
}
case Terminated(actor) =>
watching(actor).foreach(_ ! Done)
watching -= actor
for {
waitingPromises <- waitingForActor.get(actor)
waitingPromise <- waitingPromises
} {
if (waitingPromise.trySuccess(Done)) {
timers.cancel(waitingPromise)
}
}
waitingForActor = waitingForActor - actor
alreadySeenTerminated += actor
case WatchedTimedOut(actor, completionPromise, timeout) =>
if (!completionPromise.isCompleted)
completionPromise.tryFailure(
new TimeoutException(s"Actor [$actor] termination timed out after [$timeout] during coordinated shutdown"))
}
}