diff --git a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala index d837cd6116..4b155f0458 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala @@ -9,7 +9,7 @@ import scala.concurrent.duration._ import scala.concurrent.Await import scala.concurrent.Future import akka.Done -import akka.testkit.{ AkkaSpec, TestKit } +import akka.testkit.{ AkkaSpec, EventFilter, TestKit } import com.typesafe.config.{ Config, ConfigFactory } import akka.actor.CoordinatedShutdown.Phase import akka.actor.CoordinatedShutdown.UnknownReason @@ -18,20 +18,24 @@ import scala.collection.JavaConverters._ import scala.concurrent.Promise import java.util.concurrent.TimeoutException -class CoordinatedShutdownSpec extends AkkaSpec { +class CoordinatedShutdownSpec extends AkkaSpec(ConfigFactory.parseString( + """ + akka.loglevel=INFO + akka.loggers = ["akka.testkit.TestEventListener"] + """)) { def extSys = system.asInstanceOf[ExtendedActorSystem] // some convenience to make the test readable - def phase(dependsOn: String*): Phase = Phase(dependsOn.toSet, timeout = 10.seconds, recover = true) - val emptyPhase: Phase = Phase(Set.empty, timeout = 10.seconds, recover = true) + def phase(dependsOn: String*): Phase = Phase(dependsOn.toSet, timeout = 10.seconds, recover = true, enabled = true) + val emptyPhase: Phase = Phase(Set.empty, timeout = 10.seconds, recover = true, enabled = true) private def checkTopologicalSort(phases: Map[String, Phase]): List[String] = { val result = CoordinatedShutdown.topologicalSort(phases) result.zipWithIndex.foreach { case (phase, i) ⇒ phases.get(phase) match { - case Some(Phase(dependsOn, _, _)) ⇒ + case Some(Phase(dependsOn, _, _, _)) ⇒ dependsOn.foreach { depPhase ⇒ withClue(s"phase [$phase] depends on [$depPhase] but was ordered before it in topological sort result $result") { i should be > result.indexOf(depPhase) @@ -47,7 +51,7 @@ class CoordinatedShutdownSpec extends AkkaSpec { "CoordinatedShutdown" must { - "sort phases in topolgical order" in { + "sort phases in topological order" in { checkTopologicalSort(Map.empty) should ===(Nil) checkTopologicalSort(Map( @@ -204,7 +208,7 @@ class CoordinatedShutdownSpec extends AkkaSpec { import system.dispatcher val phases = Map( "a" → emptyPhase, - "b" → Phase(dependsOn = Set("a"), timeout = 100.millis, recover = true), + "b" → Phase(dependsOn = Set("a"), timeout = 100.millis, recover = true, enabled = true), "c" → phase("b", "a")) val co = new CoordinatedShutdown(extSys, phases) co.addTask("a", "a1") { () ⇒ @@ -227,7 +231,11 @@ class CoordinatedShutdownSpec extends AkkaSpec { testActor ! "C" Future.successful(Done) } - Await.result(co.run(UnknownReason), remainingOrDefault) + EventFilter.warning(message = "Task [a1] failed in phase [a]: boom", occurrences = 1).intercept { + EventFilter.warning(message = "Coordinated shutdown phase [b] timed out after 100 milliseconds", occurrences = 1).intercept { + Await.result(co.run(UnknownReason), remainingOrDefault) + } + } expectMsg("A") expectMsg("A") expectMsg("B") @@ -237,7 +245,8 @@ class CoordinatedShutdownSpec extends AkkaSpec { "abort if recover=off" in { import system.dispatcher val phases = Map( - "b" → Phase(dependsOn = Set("a"), timeout = 100.millis, recover = false), + "a" → emptyPhase, + "b" → Phase(dependsOn = Set("a"), timeout = 100.millis, recover = false, enabled = true), "c" → phase("b", "a")) val co = new CoordinatedShutdown(extSys, phases) co.addTask("b", "b1") { () ⇒ @@ -256,6 +265,27 @@ class CoordinatedShutdownSpec extends AkkaSpec { expectNoMsg(200.millis) // C not run } + "skip tasks in disabled phase" in { + val phases = Map( + "a" → emptyPhase, + "b" → Phase(dependsOn = Set("a"), timeout = 100.millis, recover = false, enabled = false), + "c" → phase("b", "a")) + val co = new CoordinatedShutdown(extSys, phases) + co.addTask("b", "b1") { () ⇒ + testActor ! "B" + Future.failed(new RuntimeException("Was expected to not be executed")) + } + co.addTask("c", "c1") { () ⇒ + testActor ! "C" + Future.successful(Done) + } + EventFilter.info(start = "Phase [b] disabled through configuration", occurrences = 1).intercept { + val result = co.run(UnknownReason) + expectMsg("C") + result.futureValue should ===(Done) + } + } + "be possible to add tasks in later phase from task in earlier phase" in { import system.dispatcher val phases = Map( @@ -291,9 +321,9 @@ class CoordinatedShutdownSpec extends AkkaSpec { } } """)) should ===(Map( - "a" → Phase(dependsOn = Set.empty, timeout = 10.seconds, recover = true), - "b" → Phase(dependsOn = Set("a"), timeout = 15.seconds, recover = true), - "c" → Phase(dependsOn = Set("a", "b"), timeout = 10.seconds, recover = false))) + "a" → Phase(dependsOn = Set.empty, timeout = 10.seconds, recover = true, enabled = true), + "b" → Phase(dependsOn = Set("a"), timeout = 15.seconds, recover = true, enabled = true), + "c" → Phase(dependsOn = Set("a", "b"), timeout = 10.seconds, recover = false, enabled = true))) } // this must be the last test, since it terminates the ActorSystem diff --git a/akka-actor/src/main/mima-filters/2.5.10.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.10.backwards.excludes new file mode 100644 index 0000000000..5a925561ef --- /dev/null +++ b/akka-actor/src/main/mima-filters/2.5.10.backwards.excludes @@ -0,0 +1,5 @@ +# Disable phases in Coordinated Shutdown +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.copy") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.this") +ProblemFilters.exclude[MissingTypesProblem]("akka.actor.CoordinatedShutdown$Phase$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.apply") \ No newline at end of file diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 17a28108b3..898b1b198b 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -1077,6 +1077,10 @@ akka { # - timeout=15s: Override the default-phase-timeout for this phase. # - recover=off: If the phase fails the shutdown is aborted # and depending phases will not be executed. + # - enabled=off: Skip all tasks registered in this phase. DO NOT use + # this to disable phases unless you are absolutely sure what the + # consequences are. Many of the built in tasks depend on other tasks + # having been executed in earlier phases and may break if those are disabled. # depends-on=[]: Run the phase after the given phases phases { diff --git a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala index 28cc646fd6..a9512134bb 100644 --- a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala +++ b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala @@ -230,7 +230,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi /** * INTERNAL API */ - private[akka] final case class Phase(dependsOn: Set[String], timeout: FiniteDuration, recover: Boolean) + private[akka] final case class Phase(dependsOn: Set[String], timeout: FiniteDuration, recover: Boolean, enabled: Boolean) /** * INTERNAL API @@ -242,6 +242,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi val defaultPhaseConfig = ConfigFactory.parseString(s""" timeout = $defaultPhaseTimeout recover = true + enabled = true depends-on = [] """) phasesConf.root.unwrapped.asScala.toMap.map { @@ -250,7 +251,8 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi val dependsOn = c.getStringList("depends-on").asScala.toSet val timeout = c.getDuration("timeout", MILLISECONDS).millis val recover = c.getBoolean("recover") - k → Phase(dependsOn, timeout, recover) + val enabled = c.getBoolean("enabled") + k → Phase(dependsOn, timeout, recover, enabled) case (k, v) ⇒ throw new IllegalArgumentException(s"Expected object value for [$k], got [$v]") } @@ -275,8 +277,8 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi if (unmarked(u)) { tempMark += u phases.get(u) match { - case Some(Phase(dependsOn, _, _)) ⇒ dependsOn.foreach(depthFirstSearch) - case None ⇒ + case Some(p) ⇒ p.dependsOn.foreach(depthFirstSearch) + case None ⇒ } unmarked -= u // permanent mark tempMark -= u @@ -292,10 +294,8 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi final class CoordinatedShutdown private[akka] ( system: ExtendedActorSystem, phases: Map[String, CoordinatedShutdown.Phase]) extends Extension { - import CoordinatedShutdown.Phase import CoordinatedShutdown.Reason import CoordinatedShutdown.UnknownReason - import CoordinatedShutdown.JvmExitReason /** INTERNAL API */ private[akka] val log = Logging(system, getClass) @@ -406,6 +406,12 @@ final class CoordinatedShutdown private[akka] ( def loop(remainingPhases: List[String]): Future[Done] = { remainingPhases match { case Nil ⇒ Future.successful(Done) + case phase :: remaining if !phases(phase).enabled ⇒ + tasks.get(phase) match { + case null ⇒ // This pretty much is ok as there are no tasks + case tasks ⇒ log.info("Phase [{}] disabled through configuration, skipping [{}] tasks", phase, tasks.size) + } + loop(remaining) case phase :: remaining ⇒ val phaseResult = tasks.get(phase) match { case null ⇒ @@ -502,7 +508,7 @@ final class CoordinatedShutdown private[akka] ( */ def timeout(phase: String): FiniteDuration = phases.get(phase) match { - case Some(Phase(_, timeout, _)) ⇒ timeout + case Some(p) ⇒ p.timeout case None ⇒ throw new IllegalArgumentException(s"Unknown phase [$phase]. All phases must be defined in configuration") }