From e49acb7daae041b3fd59632cf792ce1f953aefa5 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 4 Dec 2017 12:22:59 +0100 Subject: [PATCH] add Reason to CoordinatedShutdown, #24048 --- .../akka/actor/CoordinatedShutdownSpec.scala | 24 +++-- .../akka/actor/CoordinatedShutdown.scala | 102 +++++++++++++++--- .../scala/akka/cluster/ClusterDaemon.scala | 9 +- .../test/scala/akka/cluster/ClusterSpec.scala | 4 +- .../test/java/jdocs/actor/ActorDocTest.java | 3 +- .../test/scala/docs/actor/ActorDocSpec.scala | 2 +- 6 files changed, 116 insertions(+), 28 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala index 926a35b620..398d3e3769 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala @@ -12,6 +12,7 @@ import akka.Done import akka.testkit.{ AkkaSpec, TestKit } import com.typesafe.config.{ Config, ConfigFactory } import akka.actor.CoordinatedShutdown.Phase +import akka.actor.CoordinatedShutdown.UnknownReason import scala.collection.JavaConverters._ import scala.concurrent.Promise @@ -42,6 +43,8 @@ class CoordinatedShutdownSpec extends AkkaSpec { result } + case object CustomReason extends CoordinatedShutdown.Reason + "CoordinatedShutdown" must { "sort phases in topolgical order" in { @@ -151,7 +154,7 @@ class CoordinatedShutdownSpec extends AkkaSpec { testActor ! "C" Future.successful(Done) } - Await.result(co.run(), remainingOrDefault) + Await.result(co.run(UnknownReason), remainingOrDefault) receiveN(4) should ===(List("A", "B", "B", "C")) } @@ -174,8 +177,9 @@ class CoordinatedShutdownSpec extends AkkaSpec { testActor ! "C" Future.successful(Done) } - Await.result(co.run(Some("b")), remainingOrDefault) + Await.result(co.run(CustomReason, Some("b")), remainingOrDefault) receiveN(2) should ===(List("B", "C")) + co.shutdownReason() should ===(Some(CustomReason)) } "only run once" in { @@ -186,11 +190,14 @@ class CoordinatedShutdownSpec extends AkkaSpec { testActor ! "A" Future.successful(Done) } - Await.result(co.run(), remainingOrDefault) + co.shutdownReason() should ===(None) + Await.result(co.run(CustomReason), remainingOrDefault) + co.shutdownReason() should ===(Some(CustomReason)) expectMsg("A") - Await.result(co.run(), remainingOrDefault) + Await.result(co.run(UnknownReason), remainingOrDefault) testActor ! "done" expectMsg("done") // no additional A + co.shutdownReason() should ===(Some(CustomReason)) } "continue after timeout or failure" in { @@ -220,7 +227,7 @@ class CoordinatedShutdownSpec extends AkkaSpec { testActor ! "C" Future.successful(Done) } - Await.result(co.run(), remainingOrDefault) + Await.result(co.run(UnknownReason), remainingOrDefault) expectMsg("A") expectMsg("A") expectMsg("B") @@ -241,7 +248,7 @@ class CoordinatedShutdownSpec extends AkkaSpec { testActor ! "C" Future.successful(Done) } - val result = co.run() + val result = co.run(UnknownReason) expectMsg("B") intercept[TimeoutException] { Await.result(result, remainingOrDefault) @@ -263,7 +270,7 @@ class CoordinatedShutdownSpec extends AkkaSpec { } Future.successful(Done) } - Await.result(co.run(), remainingOrDefault) + Await.result(co.run(UnknownReason), remainingOrDefault) expectMsg("A") expectMsg("B") } @@ -291,8 +298,9 @@ class CoordinatedShutdownSpec extends AkkaSpec { // this must be the last test, since it terminates the ActorSystem "terminate ActorSystem" in { - Await.result(CoordinatedShutdown(system).run(), 10.seconds) should ===(Done) + Await.result(CoordinatedShutdown(system).run(CustomReason), 10.seconds) should ===(Done) system.whenTerminated.isCompleted should ===(true) + CoordinatedShutdown(system).shutdownReason() === (Some(CustomReason)) } "add and remove user JVM hooks with run-by-jvm-shutdown-hook = off, terminate-actor-system = off" in new JvmHookTest { diff --git a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala index e8299101c6..b418d15789 100644 --- a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala +++ b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala @@ -7,7 +7,6 @@ import scala.concurrent.duration._ import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters._ import java.util.concurrent._ -import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.TimeUnit.MILLISECONDS import scala.concurrent.Future @@ -99,6 +98,54 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi */ val PhaseActorSystemTerminate = "actor-system-terminate" + /** + * Reason for the shutdown, which can be used by tasks in case they need to do + * different things depending on what caused the shutdown. There are some + * predefined reasons, but external libraries applications may also define + * other reasons. + */ + trait Reason + + /** + * Scala API: The reason for the shutdown was unknown. Needed for backwards compatibility. + */ + case object UnknownReason extends Reason + + /** + * Java API: The reason for the shutdown was unknown. Needed for backwards compatibility. + */ + def unknownReason: Reason = UnknownReason + + /** + * Scala API: The shutdown was initiated by a JVM shutdown hook, e.g. triggered by SIGTERM. + */ + object JvmExitReason extends Reason + + /** + * Java API: The shutdown was initiated by a JVM shutdown hook, e.g. triggered by SIGTERM. + */ + def jvmExitReason: Reason = JvmExitReason + + /** + * Scala API: The shutdown was initiated by Cluster downing. + */ + object ClusterDowningReason extends Reason + + /** + * Java API: The shutdown was initiated by Cluster downing. + */ + def clusterDowningReason: Reason = ClusterDowningReason + + /** + * Scala API: The shutdown was initiated by Cluster leaving. + */ + object ClusterLeavingReason extends Reason + + /** + * Java API: The shutdown was initiated by Cluster leaving. + */ + def clusterLeavingReason: Reason = ClusterLeavingReason + @volatile private var runningJvmHook = false override def get(system: ActorSystem): CoordinatedShutdown = super.get(system) @@ -168,7 +215,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi try { // totalTimeout will be 0 when no tasks registered, so at least 3.seconds val totalTimeout = coord.totalTimeout().max(3.seconds) - Await.ready(coord.run(), totalTimeout) + Await.ready(coord.run(JvmExitReason), totalTimeout) } catch { case NonFatal(e) ⇒ coord.log.warning( @@ -246,6 +293,9 @@ final class CoordinatedShutdown private[akka] ( system: ExtendedActorSystem, phases: Map[String, CoordinatedShutdown.Phase]) extends Extension { import CoordinatedShutdown.Phase + import CoordinatedShutdown.Reason + import CoordinatedShutdown.UnknownReason + import CoordinatedShutdown.JvmExitReason /** INTERNAL API */ private[akka] val log = Logging(system, getClass) @@ -253,7 +303,7 @@ final class CoordinatedShutdown private[akka] ( /** INTERNAL API */ private[akka] val orderedPhases = CoordinatedShutdown.topologicalSort(phases) private val tasks = new ConcurrentHashMap[String, Vector[(String, () ⇒ Future[Done])]] - private val runStarted = new AtomicBoolean(false) + private val runStarted = new AtomicReference[Option[Reason]](None) private val runPromise = Promise[Done]() private var _jvmHooksLatch = new AtomicReference[CountDownLatch](new CountDownLatch(0)) @@ -306,33 +356,51 @@ final class CoordinatedShutdown private[akka] ( def addTask(phase: String, taskName: String, task: Supplier[CompletionStage[Done]]): Unit = addTask(phase, taskName)(() ⇒ task.get().toScala) + /** + * The `Reason` for the shutdown as passed to the `run` method. `None` if the shutdown + * has not been started. + */ + def shutdownReason(): Option[Reason] = runStarted.get() + + /** + * The `Reason` for the shutdown as passed to the `run` method. `Optional.empty` if the shutdown + * has not been started. + */ + def getShutdownReason(): Optional[Reason] = shutdownReason().asJava + /** * Scala API: Run tasks of all phases. The returned * `Future` is completed when all tasks have been completed, * or there is a failure when recovery is disabled. * - * It's safe to call this method multiple times. It will only run the once. + * It's safe to call this method multiple times. It will only run the shutdown sequence once. */ - def run(): Future[Done] = run(None) + def run(reason: Reason): Future[Done] = run(reason, None) + + @deprecated("Use the method with `reason` parameter instead", since = "2.5.8") + def run(): Future[Done] = run(UnknownReason) /** * Java API: Run tasks of all phases. The returned * `CompletionStage` is completed when all tasks have been completed, * or there is a failure when recovery is disabled. * - * It's safe to call this method multiple times. It will only run the once. + * It's safe to call this method multiple times. It will only run the shutdown sequence once. */ - def runAll(): CompletionStage[Done] = run().toJava + def runAll(reason: Reason): CompletionStage[Done] = run(reason).toJava + + @deprecated("Use the method with `reason` parameter instead", since = "2.5.8") + def runAll(): CompletionStage[Done] = runAll(UnknownReason) /** * Scala API: Run tasks of all phases including and after the given phase. * The returned `Future` is completed when all such tasks have been completed, * or there is a failure when recovery is disabled. * - * It's safe to call this method multiple times. It will only run the once. + * It's safe to call this method multiple times. It will only run shutdown sequence once. */ - def run(fromPhase: Option[String]): Future[Done] = { - if (runStarted.compareAndSet(false, true)) { + def run(reason: Reason, fromPhase: Option[String]): Future[Done] = { + if (runStarted.compareAndSet(None, Some(reason))) { import system.dispatcher val debugEnabled = log.isDebugEnabled def loop(remainingPhases: List[String]): Future[Done] = { @@ -409,15 +477,23 @@ final class CoordinatedShutdown private[akka] ( runPromise.future } + @deprecated("Use the method with `reason` parameter instead", since = "2.5.8") + def run(fromPhase: Option[String]): Future[Done] = + run(UnknownReason, fromPhase) + /** * Java API: Run tasks of all phases including and after the given phase. * The returned `CompletionStage` is completed when all such tasks have been completed, * or there is a failure when recovery is disabled. * - * It's safe to call this method multiple times. It will only run once. + * It's safe to call this method multiple times. It will only run the shutdown sequence once. */ + def run(reason: Reason, fromPhase: Optional[String]): CompletionStage[Done] = + run(reason, fromPhase.asScala).toJava + + @deprecated("Use the method with `reason` parameter instead", since = "2.5.8") def run(fromPhase: Optional[String]): CompletionStage[Done] = - run(fromPhase.asScala).toJava + run(UnknownReason, fromPhase) /** * The configured timeout for a given `phase`. @@ -462,7 +538,7 @@ final class CoordinatedShutdown private[akka] ( * shutdown hooks the standard library JVM shutdown hooks APIs are better suited. */ @tailrec def addCancellableJvmShutdownHook[T](hook: ⇒ T): Cancellable = { - if (!runStarted.get) { + if (runStarted.get == None) { val currentLatch = _jvmHooksLatch.get val newLatch = new CountDownLatch(currentLatch.getCount.toInt + 1) if (_jvmHooksLatch.compareAndSet(currentLatch, newLatch)) { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 5db9792920..3faef8a0e1 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -190,7 +190,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac override def postStop(): Unit = { clusterShutdown.trySuccess(Done) if (Cluster(context.system).settings.RunCoordinatedShutdownWhenDown) { - coordShutdown.run() + // if it was stopped due to leaving CoordinatedShutdown was started earlier + coordShutdown.run(CoordinatedShutdown.ClusterDowningReason) } } @@ -442,7 +443,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with "shutdown-after-unsuccessful-join-seed-nodes [{}]. Running CoordinatedShutdown.", seedNodes.mkString(", "), ShutdownAfterUnsuccessfulJoinSeedNodes) joinSeedNodesDeadline = None - CoordinatedShutdown(context.system).run() + CoordinatedShutdown(context.system).run(CoordinatedShutdown.ClusterDowningReason) } def becomeUninitialized(): Unit = { @@ -921,7 +922,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with exitingTasksInProgress = true logInfo("Exiting, starting coordinated shutdown") selfExiting.trySuccess(Done) - coordShutdown.run() + coordShutdown.run(CoordinatedShutdown.ClusterLeavingReason) } if (talkback) { @@ -1104,7 +1105,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with exitingTasksInProgress = true logInfo("Exiting (leader), starting coordinated shutdown") selfExiting.trySuccess(Done) - coordShutdown.run() + coordShutdown.run(CoordinatedShutdown.ClusterLeavingReason) } exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 0f8a6efe2d..d438345c1b 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -158,7 +158,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { Cluster(sys2).join(Cluster(sys2).selfAddress) probe.expectMsgType[MemberUp] - CoordinatedShutdown(sys2).run() + CoordinatedShutdown(sys2).run(CoordinatedShutdown.UnknownReason) probe.expectMsgType[MemberLeft] probe.expectMsgType[MemberExited] probe.expectMsgType[MemberRemoved] @@ -187,6 +187,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { probe.expectMsgType[MemberRemoved] Await.result(sys2.whenTerminated, 10.seconds) Cluster(sys2).isTerminated should ===(true) + CoordinatedShutdown(sys2).shutdownReason() should ===(Some(CoordinatedShutdown.ClusterLeavingReason)) } finally { shutdown(sys2) } @@ -212,6 +213,7 @@ akka.loglevel=DEBUG probe.expectMsgType[MemberRemoved] Await.result(sys3.whenTerminated, 10.seconds) Cluster(sys3).isTerminated should ===(true) + CoordinatedShutdown(sys3).shutdownReason() should ===(Some(CoordinatedShutdown.ClusterDowningReason)) } finally { shutdown(sys3) } diff --git a/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java b/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java index 16ac702e52..dce77d183a 100644 --- a/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java +++ b/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java @@ -832,7 +832,8 @@ public class ActorDocTest extends AbstractJavaTest { // don't run this if (false) { //#coordinated-shutdown-run - CompletionStage done = CoordinatedShutdown.get(system).runAll(); + CompletionStage done = CoordinatedShutdown.get(system).runAll( + CoordinatedShutdown.unknownReason()); //#coordinated-shutdown-run } } diff --git a/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala b/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala index bd8584ac38..f22d1421a6 100644 --- a/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala +++ b/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala @@ -756,7 +756,7 @@ class ActorDocSpec extends AkkaSpec(""" // don't run this def dummy(): Unit = { //#coordinated-shutdown-run - val done: Future[Done] = CoordinatedShutdown(system).run() + val done: Future[Done] = CoordinatedShutdown(system).run(CoordinatedShutdown.UnknownReason) //#coordinated-shutdown-run } }