Ability to disable coordinated shutdown phases #24477

This commit is contained in:
Johan Andrén 2018-02-01 17:32:10 +01:00 committed by GitHub
parent e4397db44a
commit 3b54f238ea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 64 additions and 19 deletions

View file

@ -9,7 +9,7 @@ import scala.concurrent.duration._
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.Future import scala.concurrent.Future
import akka.Done import akka.Done
import akka.testkit.{ AkkaSpec, TestKit } import akka.testkit.{ AkkaSpec, EventFilter, TestKit }
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import akka.actor.CoordinatedShutdown.Phase import akka.actor.CoordinatedShutdown.Phase
import akka.actor.CoordinatedShutdown.UnknownReason import akka.actor.CoordinatedShutdown.UnknownReason
@ -18,20 +18,24 @@ import scala.collection.JavaConverters._
import scala.concurrent.Promise import scala.concurrent.Promise
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
class CoordinatedShutdownSpec extends AkkaSpec { class CoordinatedShutdownSpec extends AkkaSpec(ConfigFactory.parseString(
"""
akka.loglevel=INFO
akka.loggers = ["akka.testkit.TestEventListener"]
""")) {
def extSys = system.asInstanceOf[ExtendedActorSystem] def extSys = system.asInstanceOf[ExtendedActorSystem]
// some convenience to make the test readable // some convenience to make the test readable
def phase(dependsOn: String*): Phase = Phase(dependsOn.toSet, timeout = 10.seconds, recover = true) def phase(dependsOn: String*): Phase = Phase(dependsOn.toSet, timeout = 10.seconds, recover = true, enabled = true)
val emptyPhase: Phase = Phase(Set.empty, timeout = 10.seconds, recover = true) val emptyPhase: Phase = Phase(Set.empty, timeout = 10.seconds, recover = true, enabled = true)
private def checkTopologicalSort(phases: Map[String, Phase]): List[String] = { private def checkTopologicalSort(phases: Map[String, Phase]): List[String] = {
val result = CoordinatedShutdown.topologicalSort(phases) val result = CoordinatedShutdown.topologicalSort(phases)
result.zipWithIndex.foreach { result.zipWithIndex.foreach {
case (phase, i) case (phase, i)
phases.get(phase) match { phases.get(phase) match {
case Some(Phase(dependsOn, _, _)) case Some(Phase(dependsOn, _, _, _))
dependsOn.foreach { depPhase dependsOn.foreach { depPhase
withClue(s"phase [$phase] depends on [$depPhase] but was ordered before it in topological sort result $result") { withClue(s"phase [$phase] depends on [$depPhase] but was ordered before it in topological sort result $result") {
i should be > result.indexOf(depPhase) i should be > result.indexOf(depPhase)
@ -47,7 +51,7 @@ class CoordinatedShutdownSpec extends AkkaSpec {
"CoordinatedShutdown" must { "CoordinatedShutdown" must {
"sort phases in topolgical order" in { "sort phases in topological order" in {
checkTopologicalSort(Map.empty) should ===(Nil) checkTopologicalSort(Map.empty) should ===(Nil)
checkTopologicalSort(Map( checkTopologicalSort(Map(
@ -204,7 +208,7 @@ class CoordinatedShutdownSpec extends AkkaSpec {
import system.dispatcher import system.dispatcher
val phases = Map( val phases = Map(
"a" emptyPhase, "a" emptyPhase,
"b" Phase(dependsOn = Set("a"), timeout = 100.millis, recover = true), "b" Phase(dependsOn = Set("a"), timeout = 100.millis, recover = true, enabled = true),
"c" phase("b", "a")) "c" phase("b", "a"))
val co = new CoordinatedShutdown(extSys, phases) val co = new CoordinatedShutdown(extSys, phases)
co.addTask("a", "a1") { () co.addTask("a", "a1") { ()
@ -227,7 +231,11 @@ class CoordinatedShutdownSpec extends AkkaSpec {
testActor ! "C" testActor ! "C"
Future.successful(Done) Future.successful(Done)
} }
Await.result(co.run(UnknownReason), remainingOrDefault) EventFilter.warning(message = "Task [a1] failed in phase [a]: boom", occurrences = 1).intercept {
EventFilter.warning(message = "Coordinated shutdown phase [b] timed out after 100 milliseconds", occurrences = 1).intercept {
Await.result(co.run(UnknownReason), remainingOrDefault)
}
}
expectMsg("A") expectMsg("A")
expectMsg("A") expectMsg("A")
expectMsg("B") expectMsg("B")
@ -237,7 +245,8 @@ class CoordinatedShutdownSpec extends AkkaSpec {
"abort if recover=off" in { "abort if recover=off" in {
import system.dispatcher import system.dispatcher
val phases = Map( val phases = Map(
"b" Phase(dependsOn = Set("a"), timeout = 100.millis, recover = false), "a" emptyPhase,
"b" Phase(dependsOn = Set("a"), timeout = 100.millis, recover = false, enabled = true),
"c" phase("b", "a")) "c" phase("b", "a"))
val co = new CoordinatedShutdown(extSys, phases) val co = new CoordinatedShutdown(extSys, phases)
co.addTask("b", "b1") { () co.addTask("b", "b1") { ()
@ -256,6 +265,27 @@ class CoordinatedShutdownSpec extends AkkaSpec {
expectNoMsg(200.millis) // C not run expectNoMsg(200.millis) // C not run
} }
"skip tasks in disabled phase" in {
val phases = Map(
"a" emptyPhase,
"b" Phase(dependsOn = Set("a"), timeout = 100.millis, recover = false, enabled = false),
"c" phase("b", "a"))
val co = new CoordinatedShutdown(extSys, phases)
co.addTask("b", "b1") { ()
testActor ! "B"
Future.failed(new RuntimeException("Was expected to not be executed"))
}
co.addTask("c", "c1") { ()
testActor ! "C"
Future.successful(Done)
}
EventFilter.info(start = "Phase [b] disabled through configuration", occurrences = 1).intercept {
val result = co.run(UnknownReason)
expectMsg("C")
result.futureValue should ===(Done)
}
}
"be possible to add tasks in later phase from task in earlier phase" in { "be possible to add tasks in later phase from task in earlier phase" in {
import system.dispatcher import system.dispatcher
val phases = Map( val phases = Map(
@ -291,9 +321,9 @@ class CoordinatedShutdownSpec extends AkkaSpec {
} }
} }
""")) should ===(Map( """)) should ===(Map(
"a" Phase(dependsOn = Set.empty, timeout = 10.seconds, recover = true), "a" Phase(dependsOn = Set.empty, timeout = 10.seconds, recover = true, enabled = true),
"b" Phase(dependsOn = Set("a"), timeout = 15.seconds, recover = true), "b" Phase(dependsOn = Set("a"), timeout = 15.seconds, recover = true, enabled = true),
"c" Phase(dependsOn = Set("a", "b"), timeout = 10.seconds, recover = false))) "c" Phase(dependsOn = Set("a", "b"), timeout = 10.seconds, recover = false, enabled = true)))
} }
// this must be the last test, since it terminates the ActorSystem // this must be the last test, since it terminates the ActorSystem

View file

@ -0,0 +1,5 @@
# Disable phases in Coordinated Shutdown
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.this")
ProblemFilters.exclude[MissingTypesProblem]("akka.actor.CoordinatedShutdown$Phase$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.apply")

View file

@ -1077,6 +1077,10 @@ akka {
# - timeout=15s: Override the default-phase-timeout for this phase. # - timeout=15s: Override the default-phase-timeout for this phase.
# - recover=off: If the phase fails the shutdown is aborted # - recover=off: If the phase fails the shutdown is aborted
# and depending phases will not be executed. # and depending phases will not be executed.
# - enabled=off: Skip all tasks registered in this phase. DO NOT use
# this to disable phases unless you are absolutely sure what the
# consequences are. Many of the built in tasks depend on other tasks
# having been executed in earlier phases and may break if those are disabled.
# depends-on=[]: Run the phase after the given phases # depends-on=[]: Run the phase after the given phases
phases { phases {

View file

@ -230,7 +230,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] final case class Phase(dependsOn: Set[String], timeout: FiniteDuration, recover: Boolean) private[akka] final case class Phase(dependsOn: Set[String], timeout: FiniteDuration, recover: Boolean, enabled: Boolean)
/** /**
* INTERNAL API * INTERNAL API
@ -242,6 +242,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
val defaultPhaseConfig = ConfigFactory.parseString(s""" val defaultPhaseConfig = ConfigFactory.parseString(s"""
timeout = $defaultPhaseTimeout timeout = $defaultPhaseTimeout
recover = true recover = true
enabled = true
depends-on = [] depends-on = []
""") """)
phasesConf.root.unwrapped.asScala.toMap.map { phasesConf.root.unwrapped.asScala.toMap.map {
@ -250,7 +251,8 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
val dependsOn = c.getStringList("depends-on").asScala.toSet val dependsOn = c.getStringList("depends-on").asScala.toSet
val timeout = c.getDuration("timeout", MILLISECONDS).millis val timeout = c.getDuration("timeout", MILLISECONDS).millis
val recover = c.getBoolean("recover") val recover = c.getBoolean("recover")
k Phase(dependsOn, timeout, recover) val enabled = c.getBoolean("enabled")
k Phase(dependsOn, timeout, recover, enabled)
case (k, v) case (k, v)
throw new IllegalArgumentException(s"Expected object value for [$k], got [$v]") throw new IllegalArgumentException(s"Expected object value for [$k], got [$v]")
} }
@ -275,8 +277,8 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
if (unmarked(u)) { if (unmarked(u)) {
tempMark += u tempMark += u
phases.get(u) match { phases.get(u) match {
case Some(Phase(dependsOn, _, _)) dependsOn.foreach(depthFirstSearch) case Some(p) p.dependsOn.foreach(depthFirstSearch)
case None case None
} }
unmarked -= u // permanent mark unmarked -= u // permanent mark
tempMark -= u tempMark -= u
@ -292,10 +294,8 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
final class CoordinatedShutdown private[akka] ( final class CoordinatedShutdown private[akka] (
system: ExtendedActorSystem, system: ExtendedActorSystem,
phases: Map[String, CoordinatedShutdown.Phase]) extends Extension { phases: Map[String, CoordinatedShutdown.Phase]) extends Extension {
import CoordinatedShutdown.Phase
import CoordinatedShutdown.Reason import CoordinatedShutdown.Reason
import CoordinatedShutdown.UnknownReason import CoordinatedShutdown.UnknownReason
import CoordinatedShutdown.JvmExitReason
/** INTERNAL API */ /** INTERNAL API */
private[akka] val log = Logging(system, getClass) private[akka] val log = Logging(system, getClass)
@ -406,6 +406,12 @@ final class CoordinatedShutdown private[akka] (
def loop(remainingPhases: List[String]): Future[Done] = { def loop(remainingPhases: List[String]): Future[Done] = {
remainingPhases match { remainingPhases match {
case Nil Future.successful(Done) case Nil Future.successful(Done)
case phase :: remaining if !phases(phase).enabled
tasks.get(phase) match {
case null // This pretty much is ok as there are no tasks
case tasks log.info("Phase [{}] disabled through configuration, skipping [{}] tasks", phase, tasks.size)
}
loop(remaining)
case phase :: remaining case phase :: remaining
val phaseResult = tasks.get(phase) match { val phaseResult = tasks.get(phase) match {
case null case null
@ -502,7 +508,7 @@ final class CoordinatedShutdown private[akka] (
*/ */
def timeout(phase: String): FiniteDuration = def timeout(phase: String): FiniteDuration =
phases.get(phase) match { phases.get(phase) match {
case Some(Phase(_, timeout, _)) timeout case Some(p) p.timeout
case None case None
throw new IllegalArgumentException(s"Unknown phase [$phase]. All phases must be defined in configuration") throw new IllegalArgumentException(s"Unknown phase [$phase]. All phases must be defined in configuration")
} }