diff --git a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala index 926a35b620..6a23dcf7fe 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala @@ -12,6 +12,7 @@ import akka.Done import akka.testkit.{ AkkaSpec, TestKit } import com.typesafe.config.{ Config, ConfigFactory } import akka.actor.CoordinatedShutdown.Phase +import akka.actor.CoordinatedShutdown.UnknownReason import scala.collection.JavaConverters._ import scala.concurrent.Promise @@ -42,6 +43,8 @@ class CoordinatedShutdownSpec extends AkkaSpec { result } + case object CustomReason extends CoordinatedShutdown.Reason + "CoordinatedShutdown" must { "sort phases in topolgical order" in { @@ -151,7 +154,7 @@ class CoordinatedShutdownSpec extends AkkaSpec { testActor ! "C" Future.successful(Done) } - Await.result(co.run(), remainingOrDefault) + Await.result(co.run(UnknownReason), remainingOrDefault) receiveN(4) should ===(List("A", "B", "B", "C")) } @@ -174,8 +177,9 @@ class CoordinatedShutdownSpec extends AkkaSpec { testActor ! "C" Future.successful(Done) } - Await.result(co.run(Some("b")), remainingOrDefault) + Await.result(co.run(CustomReason, Some("b")), remainingOrDefault) receiveN(2) should ===(List("B", "C")) + co.shutdownReason() should ===(Some(CustomReason)) } "only run once" in { @@ -186,11 +190,14 @@ class CoordinatedShutdownSpec extends AkkaSpec { testActor ! "A" Future.successful(Done) } - Await.result(co.run(), remainingOrDefault) + co.shutdownReason() should ===(None) + Await.result(co.run(CustomReason), remainingOrDefault) + co.shutdownReason() should ===(Some(CustomReason)) expectMsg("A") - Await.result(co.run(), remainingOrDefault) + Await.result(co.run(UnknownReason), remainingOrDefault) testActor ! "done" expectMsg("done") // no additional A + co.shutdownReason() should ===(Some(CustomReason)) } "continue after timeout or failure" in { @@ -220,7 +227,7 @@ class CoordinatedShutdownSpec extends AkkaSpec { testActor ! "C" Future.successful(Done) } - Await.result(co.run(), remainingOrDefault) + Await.result(co.run(UnknownReason), remainingOrDefault) expectMsg("A") expectMsg("A") expectMsg("B") @@ -241,7 +248,7 @@ class CoordinatedShutdownSpec extends AkkaSpec { testActor ! "C" Future.successful(Done) } - val result = co.run() + val result = co.run(UnknownReason) expectMsg("B") intercept[TimeoutException] { Await.result(result, remainingOrDefault) @@ -263,7 +270,7 @@ class CoordinatedShutdownSpec extends AkkaSpec { } Future.successful(Done) } - Await.result(co.run(), remainingOrDefault) + Await.result(co.run(UnknownReason), remainingOrDefault) expectMsg("A") expectMsg("B") } @@ -291,8 +298,9 @@ class CoordinatedShutdownSpec extends AkkaSpec { // this must be the last test, since it terminates the ActorSystem "terminate ActorSystem" in { - Await.result(CoordinatedShutdown(system).run(), 10.seconds) should ===(Done) + Await.result(CoordinatedShutdown(system).run(CustomReason), 10.seconds) should ===(Done) system.whenTerminated.isCompleted should ===(true) + CoordinatedShutdown(system).shutdownReason() === (Some(CustomReason)) } "add and remove user JVM hooks with run-by-jvm-shutdown-hook = off, terminate-actor-system = off" in new JvmHookTest { @@ -307,7 +315,7 @@ class CoordinatedShutdownSpec extends AkkaSpec { val cancellable = CoordinatedShutdown(newSystem).addCancellableJvmShutdownHook( println(s"User JVM hook from ${newSystem.name}") ) - myHooksCount should ===(2) // one user, one from system + myHooksCount should ===(1) // one user, none from system cancellable.cancel() } } @@ -345,7 +353,23 @@ class CoordinatedShutdownSpec extends AkkaSpec { myHooksCount should ===(2) // one user, one from actor system cancellable.cancel() } + } + "add and remove user JVM hooks with run-by-jvm-shutdown-hook = on, akka.jvm-shutdown-hooks = off" in new JvmHookTest { + lazy val systemName = s"CoordinatedShutdownSpec-JvmHooks-4-${System.currentTimeMillis()}" + lazy val systemConfig = ConfigFactory.parseString( + """ + akka.jvm-shutdown-hooks = off + akka.coordinated-shutdown.run-by-jvm-shutdown-hook = on + """) + + def withSystemRunning(newSystem: ActorSystem): Unit = { + val cancellable = CoordinatedShutdown(newSystem).addCancellableJvmShutdownHook( + println(s"User JVM hook from ${newSystem.name}") + ) + myHooksCount should ===(1) // one user, none from actor system + cancellable.cancel() + } } } @@ -358,13 +382,7 @@ class CoordinatedShutdownSpec extends AkkaSpec { def systemConfig: Config def withSystemRunning(system: ActorSystem): Unit - val newSystem = ActorSystem( - systemName, - ConfigFactory.parseString( - """ - akka.coordinated-shutdown.run-by-jvm-shutdown-hook = on - akka.coordinated-shutdown.terminate-actor-system = on - """)) + val newSystem = ActorSystem(systemName, systemConfig) withSystemRunning(newSystem) diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index e513135e39..01e7eca52e 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -45,6 +45,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin getBoolean("akka.jvm-exit-on-fatal-error") should ===(true) settings.JvmExitOnFatalError should ===(true) + settings.JvmShutdownHooks should ===(true) getInt("akka.actor.deployment.default.virtual-nodes-factor") should ===(10) settings.DefaultVirtualNodesFactor should ===(10) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 18bf520e08..387952b24e 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -81,6 +81,12 @@ akka { # such as OutOfMemoryError jvm-exit-on-fatal-error = on + # Akka installs JVM shutdown hooks by default, e.g. in CoordinatedShutdown and Artery. + # This property makes it possible to disable all such hooks if the application itself + # or a higher level framework such as Play prefers to install the JVM shutdown hook and + # terminate the ActorSystem itself, with or without using CoordinatedShutdown. + jvm-shutdown-hooks = on + actor { # Either one of "local", "remote" or "cluster" or the diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index c2c47b321a..a4328b54ed 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -363,6 +363,7 @@ object ActorSystem { final val SchedulerClass: String = getString("akka.scheduler.implementation") final val Daemonicity: Boolean = getBoolean("akka.daemonic") final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error") + final val JvmShutdownHooks: Boolean = getBoolean("akka.jvm-shutdown-hooks") final val DefaultVirtualNodesFactor: Int = getInt("akka.actor.deployment.default.virtual-nodes-factor") diff --git a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala index e8299101c6..0ebe1ab233 100644 --- a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala +++ b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala @@ -7,7 +7,6 @@ import scala.concurrent.duration._ import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters._ import java.util.concurrent._ -import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.TimeUnit.MILLISECONDS import scala.concurrent.Future @@ -99,6 +98,54 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi */ val PhaseActorSystemTerminate = "actor-system-terminate" + /** + * Reason for the shutdown, which can be used by tasks in case they need to do + * different things depending on what caused the shutdown. There are some + * predefined reasons, but external libraries applications may also define + * other reasons. + */ + trait Reason + + /** + * Scala API: The reason for the shutdown was unknown. Needed for backwards compatibility. + */ + case object UnknownReason extends Reason + + /** + * Java API: The reason for the shutdown was unknown. Needed for backwards compatibility. + */ + def unknownReason: Reason = UnknownReason + + /** + * Scala API: The shutdown was initiated by a JVM shutdown hook, e.g. triggered by SIGTERM. + */ + object JvmExitReason extends Reason + + /** + * Java API: The shutdown was initiated by a JVM shutdown hook, e.g. triggered by SIGTERM. + */ + def jvmExitReason: Reason = JvmExitReason + + /** + * Scala API: The shutdown was initiated by Cluster downing. + */ + object ClusterDowningReason extends Reason + + /** + * Java API: The shutdown was initiated by Cluster downing. + */ + def clusterDowningReason: Reason = ClusterDowningReason + + /** + * Scala API: The shutdown was initiated by Cluster leaving. + */ + object ClusterLeavingReason extends Reason + + /** + * Java API: The shutdown was initiated by Cluster leaving. + */ + def clusterLeavingReason: Reason = ClusterLeavingReason + @volatile private var runningJvmHook = false override def get(system: ActorSystem): CoordinatedShutdown = super.get(system) @@ -159,7 +206,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi } private def initJvmHook(system: ActorSystem, conf: Config, coord: CoordinatedShutdown): Unit = { - val runByJvmShutdownHook = conf.getBoolean("run-by-jvm-shutdown-hook") + val runByJvmShutdownHook = system.settings.JvmShutdownHooks && conf.getBoolean("run-by-jvm-shutdown-hook") if (runByJvmShutdownHook) { coord.actorSystemJvmHook = OptionVal.Some(coord.addCancellableJvmShutdownHook { runningJvmHook = true // avoid System.exit from PhaseActorSystemTerminate task @@ -168,7 +215,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi try { // totalTimeout will be 0 when no tasks registered, so at least 3.seconds val totalTimeout = coord.totalTimeout().max(3.seconds) - Await.ready(coord.run(), totalTimeout) + Await.ready(coord.run(JvmExitReason), totalTimeout) } catch { case NonFatal(e) ⇒ coord.log.warning( @@ -246,6 +293,9 @@ final class CoordinatedShutdown private[akka] ( system: ExtendedActorSystem, phases: Map[String, CoordinatedShutdown.Phase]) extends Extension { import CoordinatedShutdown.Phase + import CoordinatedShutdown.Reason + import CoordinatedShutdown.UnknownReason + import CoordinatedShutdown.JvmExitReason /** INTERNAL API */ private[akka] val log = Logging(system, getClass) @@ -253,10 +303,10 @@ final class CoordinatedShutdown private[akka] ( /** INTERNAL API */ private[akka] val orderedPhases = CoordinatedShutdown.topologicalSort(phases) private val tasks = new ConcurrentHashMap[String, Vector[(String, () ⇒ Future[Done])]] - private val runStarted = new AtomicBoolean(false) + private val runStarted = new AtomicReference[Option[Reason]](None) private val runPromise = Promise[Done]() - private var _jvmHooksLatch = new AtomicReference[CountDownLatch](new CountDownLatch(0)) + private val _jvmHooksLatch = new AtomicReference[CountDownLatch](new CountDownLatch(0)) @volatile private var actorSystemJvmHook: OptionVal[Cancellable] = OptionVal.None /** @@ -306,33 +356,51 @@ final class CoordinatedShutdown private[akka] ( def addTask(phase: String, taskName: String, task: Supplier[CompletionStage[Done]]): Unit = addTask(phase, taskName)(() ⇒ task.get().toScala) + /** + * The `Reason` for the shutdown as passed to the `run` method. `None` if the shutdown + * has not been started. + */ + def shutdownReason(): Option[Reason] = runStarted.get() + + /** + * The `Reason` for the shutdown as passed to the `run` method. `Optional.empty` if the shutdown + * has not been started. + */ + def getShutdownReason(): Optional[Reason] = shutdownReason().asJava + /** * Scala API: Run tasks of all phases. The returned * `Future` is completed when all tasks have been completed, * or there is a failure when recovery is disabled. * - * It's safe to call this method multiple times. It will only run the once. + * It's safe to call this method multiple times. It will only run the shutdown sequence once. */ - def run(): Future[Done] = run(None) + def run(reason: Reason): Future[Done] = run(reason, None) + + @deprecated("Use the method with `reason` parameter instead", since = "2.5.8") + def run(): Future[Done] = run(UnknownReason) /** * Java API: Run tasks of all phases. The returned * `CompletionStage` is completed when all tasks have been completed, * or there is a failure when recovery is disabled. * - * It's safe to call this method multiple times. It will only run the once. + * It's safe to call this method multiple times. It will only run the shutdown sequence once. */ - def runAll(): CompletionStage[Done] = run().toJava + def runAll(reason: Reason): CompletionStage[Done] = run(reason).toJava + + @deprecated("Use the method with `reason` parameter instead", since = "2.5.8") + def runAll(): CompletionStage[Done] = runAll(UnknownReason) /** * Scala API: Run tasks of all phases including and after the given phase. * The returned `Future` is completed when all such tasks have been completed, * or there is a failure when recovery is disabled. * - * It's safe to call this method multiple times. It will only run the once. + * It's safe to call this method multiple times. It will only run shutdown sequence once. */ - def run(fromPhase: Option[String]): Future[Done] = { - if (runStarted.compareAndSet(false, true)) { + def run(reason: Reason, fromPhase: Option[String]): Future[Done] = { + if (runStarted.compareAndSet(None, Some(reason))) { import system.dispatcher val debugEnabled = log.isDebugEnabled def loop(remainingPhases: List[String]): Future[Done] = { @@ -409,15 +477,23 @@ final class CoordinatedShutdown private[akka] ( runPromise.future } + @deprecated("Use the method with `reason` parameter instead", since = "2.5.8") + def run(fromPhase: Option[String]): Future[Done] = + run(UnknownReason, fromPhase) + /** * Java API: Run tasks of all phases including and after the given phase. * The returned `CompletionStage` is completed when all such tasks have been completed, * or there is a failure when recovery is disabled. * - * It's safe to call this method multiple times. It will only run once. + * It's safe to call this method multiple times. It will only run the shutdown sequence once. */ + def run(reason: Reason, fromPhase: Optional[String]): CompletionStage[Done] = + run(reason, fromPhase.asScala).toJava + + @deprecated("Use the method with `reason` parameter instead", since = "2.5.8") def run(fromPhase: Optional[String]): CompletionStage[Done] = - run(fromPhase.asScala).toJava + run(UnknownReason, fromPhase) /** * The configured timeout for a given `phase`. @@ -462,7 +538,7 @@ final class CoordinatedShutdown private[akka] ( * shutdown hooks the standard library JVM shutdown hooks APIs are better suited. */ @tailrec def addCancellableJvmShutdownHook[T](hook: ⇒ T): Cancellable = { - if (!runStarted.get) { + if (runStarted.get == None) { val currentLatch = _jvmHooksLatch.get val newLatch = new CountDownLatch(currentLatch.getCount.toInt + 1) if (_jvmHooksLatch.compareAndSet(currentLatch, newLatch)) { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 20051cdba8..4390955882 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -406,8 +406,12 @@ private[akka] class ShardRegion( CoordinatedShutdown(context.system).addTask( CoordinatedShutdown.PhaseClusterShardingShutdownRegion, "region-shutdown") { () ⇒ - self ! GracefulShutdown - gracefulShutdownProgress.future + if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down) { + Future.successful(Done) + } else { + self ! GracefulShutdown + gracefulShutdownProgress.future + } } // subscribe to MemberEvent, re-subscribe when restart diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala new file mode 100644 index 0000000000..e0605044de --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala @@ -0,0 +1,151 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.cluster.sharding + +import scala.concurrent.Future + +import scala.concurrent.duration._ +import akka.Done +import akka.actor.ActorSystem +import akka.actor.CoordinatedShutdown +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.MemberStatus +import akka.testkit.AkkaSpec +import akka.testkit.TestActors.EchoActor +import akka.testkit.TestProbe + +object CoordinatedShutdownShardingSpec { + val config = + """ + akka.loglevel = INFO + akka.actor.provider = "cluster" + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + """ + + val extractEntityId: ShardRegion.ExtractEntityId = { + case msg: Int ⇒ (msg.toString, msg) + } + + val extractShardId: ShardRegion.ExtractShardId = { + case msg: Int ⇒ (msg % 10).toString + } +} + +class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardingSpec.config) { + import CoordinatedShutdownShardingSpec._ + + val sys1 = ActorSystem(system.name, system.settings.config) + val sys2 = ActorSystem(system.name, system.settings.config) + val sys3 = system + + val region1 = ClusterSharding(sys1).start("type1", Props[EchoActor](), ClusterShardingSettings(sys1), + extractEntityId, extractShardId) + val region2 = ClusterSharding(sys2).start("type1", Props[EchoActor](), ClusterShardingSettings(sys2), + extractEntityId, extractShardId) + val region3 = ClusterSharding(sys3).start("type1", Props[EchoActor](), ClusterShardingSettings(sys3), + extractEntityId, extractShardId) + + val probe1 = TestProbe()(sys1) + val probe2 = TestProbe()(sys2) + val probe3 = TestProbe()(sys3) + + CoordinatedShutdown(sys1).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { () ⇒ + probe1.ref ! "CS-unbind-1" + Future.successful(Done) + } + CoordinatedShutdown(sys2).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { () ⇒ + probe2.ref ! "CS-unbind-2" + Future.successful(Done) + } + CoordinatedShutdown(sys3).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { () ⇒ + probe1.ref ! "CS-unbind-3" + Future.successful(Done) + } + + override def beforeTermination(): Unit = { + shutdown(sys1) + shutdown(sys2) + } + + def pingEntities(): Unit = { + region3.tell(1, probe3.ref) + probe3.expectMsg(10.seconds, 1) + region3.tell(2, probe3.ref) + probe3.expectMsg(2) + region3.tell(3, probe3.ref) + probe3.expectMsg(3) + } + + "Sharding and CoordinatedShutdown" must { + "init cluster" in { + Cluster(sys1).join(Cluster(sys1).selfAddress) // coordinator will initially run on sys2 + awaitAssert(Cluster(sys1).selfMember.status should ===(MemberStatus.Up)) + + Cluster(sys2).join(Cluster(sys1).selfAddress) + within(10.seconds) { + awaitAssert { + Cluster(sys1).state.members.size should ===(2) + Cluster(sys1).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + Cluster(sys2).state.members.size should ===(2) + Cluster(sys2).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + } + } + + Cluster(sys3).join(Cluster(sys1).selfAddress) + within(10.seconds) { + awaitAssert { + Cluster(sys1).state.members.size should ===(3) + Cluster(sys1).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + Cluster(sys2).state.members.size should ===(3) + Cluster(sys2).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + Cluster(sys3).state.members.size should ===(3) + Cluster(sys3).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + } + } + + pingEntities() + } + + "run coordinated shutdown when leaving" in { + Cluster(sys3).leave(Cluster(sys1).selfAddress) + probe1.expectMsg("CS-unbind-1") + + within(10.seconds) { + awaitAssert { + Cluster(sys2).state.members.size should ===(2) + Cluster(sys3).state.members.size should ===(2) + } + } + within(10.seconds) { + awaitAssert { + Cluster(sys1).isTerminated should ===(true) + sys1.whenTerminated.isCompleted should ===(true) + } + } + + pingEntities() + } + + "run coordinated shutdown when downing" in { + Cluster(sys3).down(Cluster(sys2).selfAddress) + probe2.expectMsg("CS-unbind-2") + + within(10.seconds) { + awaitAssert { + Cluster(system).state.members.size should ===(1) + } + } + within(10.seconds) { + awaitAssert { + Cluster(sys2).isTerminated should ===(true) + sys2.whenTerminated.isCompleted should ===(true) + } + } + + pingEntities() + } + } +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/GetShardTypeNamesSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/GetShardTypeNamesSpec.scala index 10550b1c84..0290daaa08 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/GetShardTypeNamesSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/GetShardTypeNamesSpec.scala @@ -13,6 +13,8 @@ object GetShardTypeNamesSpec { """ akka.loglevel = INFO akka.actor.provider = "cluster" + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 """ val extractEntityId: ShardRegion.ExtractEntityId = { diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index 292be74935..0271d491dc 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -5,9 +5,10 @@ package akka.cluster.singleton import com.typesafe.config.Config - import scala.concurrent.duration._ import scala.collection.immutable +import scala.concurrent.Future + import akka.actor.Actor import akka.actor.Deploy import akka.actor.ActorSystem @@ -26,8 +27,8 @@ import akka.AkkaException import akka.actor.NoSerializationVerificationNeeded import akka.cluster.UniqueAddress import akka.cluster.ClusterEvent - import scala.concurrent.Promise + import akka.Done import akka.actor.CoordinatedShutdown import akka.annotation.DoNotInherit @@ -254,8 +255,12 @@ object ClusterSingletonManager { // should preferably complete before stopping the singleton sharding coordinator on same node. val coordShutdown = CoordinatedShutdown(context.system) coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "singleton-exiting-1") { () ⇒ - implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting)) - self.ask(SelfExiting).mapTo[Done] + if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down) { + Future.successful(Done) + } else { + implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting)) + self.ask(SelfExiting).mapTo[Done] + } } } override def postStop(): Unit = cluster.unsubscribe(self) @@ -468,11 +473,19 @@ class ClusterSingletonManager( // for CoordinatedShutdown val coordShutdown = CoordinatedShutdown(context.system) val memberExitingProgress = Promise[Done]() - coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "wait-singleton-exiting")(() ⇒ - memberExitingProgress.future) + coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "wait-singleton-exiting") { () ⇒ + if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down) + Future.successful(Done) + else + memberExitingProgress.future + } coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "singleton-exiting-2") { () ⇒ - implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting)) - self.ask(SelfExiting).mapTo[Done] + if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down) { + Future.successful(Done) + } else { + implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting)) + self.ask(SelfExiting).mapTo[Done] + } } def logInfo(message: String): Unit = diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 8688024bbe..3faef8a0e1 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -176,7 +176,7 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac coordShutdown.addTask(CoordinatedShutdown.PhaseClusterLeave, "leave") { val sys = context.system () ⇒ - if (Cluster(sys).isTerminated) + if (Cluster(sys).isTerminated || Cluster(sys).selfMember.status == Down) Future.successful(Done) else { implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterLeave)) @@ -190,8 +190,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac override def postStop(): Unit = { clusterShutdown.trySuccess(Done) if (Cluster(context.system).settings.RunCoordinatedShutdownWhenDown) { - // run the last phases e.g. if node was downed (not leaving) - coordShutdown.run(Some(CoordinatedShutdown.PhaseClusterShutdown)) + // if it was stopped due to leaving CoordinatedShutdown was started earlier + coordShutdown.run(CoordinatedShutdown.ClusterDowningReason) } } @@ -325,7 +325,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExitingDone, "exiting-completed") { val sys = context.system () ⇒ - if (Cluster(sys).isTerminated) + if (Cluster(sys).isTerminated || Cluster(sys).selfMember.status == Down) Future.successful(Done) else { implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExitingDone)) @@ -443,7 +443,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with "shutdown-after-unsuccessful-join-seed-nodes [{}]. Running CoordinatedShutdown.", seedNodes.mkString(", "), ShutdownAfterUnsuccessfulJoinSeedNodes) joinSeedNodesDeadline = None - CoordinatedShutdown(context.system).run() + CoordinatedShutdown(context.system).run(CoordinatedShutdown.ClusterDowningReason) } def becomeUninitialized(): Unit = { @@ -922,7 +922,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with exitingTasksInProgress = true logInfo("Exiting, starting coordinated shutdown") selfExiting.trySuccess(Done) - coordShutdown.run() + coordShutdown.run(CoordinatedShutdown.ClusterLeavingReason) } if (talkback) { @@ -1105,7 +1105,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with exitingTasksInProgress = true logInfo("Exiting (leader), starting coordinated shutdown") selfExiting.trySuccess(Done) - coordShutdown.run() + coordShutdown.run(CoordinatedShutdown.ClusterLeavingReason) } exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 0f8a6efe2d..d438345c1b 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -158,7 +158,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { Cluster(sys2).join(Cluster(sys2).selfAddress) probe.expectMsgType[MemberUp] - CoordinatedShutdown(sys2).run() + CoordinatedShutdown(sys2).run(CoordinatedShutdown.UnknownReason) probe.expectMsgType[MemberLeft] probe.expectMsgType[MemberExited] probe.expectMsgType[MemberRemoved] @@ -187,6 +187,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { probe.expectMsgType[MemberRemoved] Await.result(sys2.whenTerminated, 10.seconds) Cluster(sys2).isTerminated should ===(true) + CoordinatedShutdown(sys2).shutdownReason() should ===(Some(CoordinatedShutdown.ClusterLeavingReason)) } finally { shutdown(sys2) } @@ -212,6 +213,7 @@ akka.loglevel=DEBUG probe.expectMsgType[MemberRemoved] Await.result(sys3.whenTerminated, 10.seconds) Cluster(sys3).isTerminated should ===(true) + CoordinatedShutdown(sys3).shutdownReason() should ===(Some(CoordinatedShutdown.ClusterDowningReason)) } finally { shutdown(sys3) } diff --git a/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java b/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java index 16ac702e52..dce77d183a 100644 --- a/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java +++ b/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java @@ -832,7 +832,8 @@ public class ActorDocTest extends AbstractJavaTest { // don't run this if (false) { //#coordinated-shutdown-run - CompletionStage done = CoordinatedShutdown.get(system).runAll(); + CompletionStage done = CoordinatedShutdown.get(system).runAll( + CoordinatedShutdown.unknownReason()); //#coordinated-shutdown-run } } diff --git a/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala b/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala index bd8584ac38..f22d1421a6 100644 --- a/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala +++ b/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala @@ -756,7 +756,7 @@ class ActorDocSpec extends AkkaSpec(""" // don't run this def dummy(): Unit = { //#coordinated-shutdown-run - val done: Future[Done] = CoordinatedShutdown(system).run() + val done: Future[Done] = CoordinatedShutdown(system).run(CoordinatedShutdown.UnknownReason) //#coordinated-shutdown-run } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 4f687eea8a..ea6559eea2 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -407,7 +407,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R override def settings: ArterySettings = provider.remoteSettings.Artery override def start(): Unit = { - Runtime.getRuntime.addShutdownHook(shutdownHook) + if (system.settings.JvmShutdownHooks) + Runtime.getRuntime.addShutdownHook(shutdownHook) + startMediaDriver() startAeron() topLevelFREvents.loFreq(Transport_AeronStarted, NoMetaData) @@ -877,7 +879,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R override def shutdown(): Future[Done] = { if (hasBeenShutdown.compareAndSet(false, true)) { log.debug("Shutting down [{}]", localAddress) - Runtime.getRuntime.removeShutdownHook(shutdownHook) + if (system.settings.JvmShutdownHooks) + Runtime.getRuntime.removeShutdownHook(shutdownHook) val allAssociations = associationRegistry.allAssociations val flushing: Future[Done] = if (allAssociations.isEmpty) Future.successful(Done)