diff --git a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala index 8a5f194aee..4242696973 100644 --- a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala +++ b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala @@ -104,9 +104,11 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi runningJvmHook = true // avoid System.exit from PhaseActorSystemTerminate task if (!system.whenTerminated.isCompleted) { coord.log.info("Starting coordinated shutdown from JVM shutdown hook") - try - Await.ready(coord.run(), coord.totalTimeout()) - catch { + try { + // totalTimeout will be 0 when no tasks registered, so at least 3.seconds + val totalTimeout = coord.totalTimeout().max(3.seconds) + Await.ready(coord.run(), totalTimeout) + } catch { case NonFatal(e) ⇒ coord.log.warning( "CoordinatedShutdown from JVM shutdown failed: {}", diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 161209f8fd..a778cec61d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -299,7 +299,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val selfExiting = Promise[Done]() val coordShutdown = CoordinatedShutdown(context.system) coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "wait-exiting") { () ⇒ - selfExiting.future + if (latestGossip.members.isEmpty) + Future.successful(Done) // not joined yet + else + selfExiting.future } coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExitingDone, "exiting-completed") { val sys = context.system diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 522042cb97..319581b9cf 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -49,7 +49,7 @@ object ClusterEvent { /** * Marker interface for cluster domain events. */ - sealed trait ClusterDomainEvent + sealed trait ClusterDomainEvent extends DeadLetterSuppression /** * Current snapshot state of the cluster. Sent to new subscriber. @@ -198,7 +198,7 @@ object ClusterEvent { * This event is published when the cluster node is shutting down, * before the final [[MemberRemoved]] events are published. */ - final case object ClusterShuttingDown extends ClusterDomainEvent with DeadLetterSuppression + final case object ClusterShuttingDown extends ClusterDomainEvent /** * Java API: get the singleton instance of `ClusterShuttingDown` event diff --git a/akka-cluster/src/main/scala/akka/cluster/CoordinatedShutdownLeave.scala b/akka-cluster/src/main/scala/akka/cluster/CoordinatedShutdownLeave.scala index f26166316a..801214033b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/CoordinatedShutdownLeave.scala +++ b/akka-cluster/src/main/scala/akka/cluster/CoordinatedShutdownLeave.scala @@ -41,16 +41,25 @@ private[akka] class CoordinatedShutdownLeave extends Actor { def waitingLeaveCompleted(replyTo: ActorRef): Receive = { case s: CurrentClusterState ⇒ - if (s.members.exists(m ⇒ m.uniqueAddress == cluster.selfUniqueAddress && + if (s.members.isEmpty) { + // not joined yet + done(replyTo) + } else if (s.members.exists(m ⇒ m.uniqueAddress == cluster.selfUniqueAddress && (m.status == Leaving || m.status == Exiting || m.status == Down))) { - replyTo ! Done - context.stop(self) - } - case evt: MemberEvent ⇒ - if (evt.member.uniqueAddress == cluster.selfUniqueAddress) { - replyTo ! Done - context.stop(self) + done(replyTo) } + case MemberLeft(m) ⇒ + if (m.uniqueAddress == cluster.selfUniqueAddress) + done(replyTo) + case MemberRemoved(m, _) ⇒ + // in case it was downed instead + if (m.uniqueAddress == cluster.selfUniqueAddress) + done(replyTo) + } + + private def done(replyTo: ActorRef): Unit = { + replyTo ! Done + context.stop(self) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index b06c2ecbc4..affba7da27 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -458,16 +458,19 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private lazy val shutdownHook = new Thread { override def run(): Unit = { - if (hasBeenShutdown.compareAndSet(false, true)) { + if (!hasBeenShutdown.get) { val coord = CoordinatedShutdown(system) - val totalTimeout = coord.totalTimeout() + // totalTimeout will be 0 when no tasks registered, so at least 3.seconds + val totalTimeout = coord.totalTimeout().max(3.seconds) if (!coord.jvmHooksLatch.await(totalTimeout.toMillis, TimeUnit.MILLISECONDS)) log.warning( "CoordinatedShutdown took longer than [{}]. Shutting down [{}] via shutdownHook", totalTimeout, localAddress) else log.debug("Shutting down [{}] via shutdownHook", localAddress) - Await.result(internalShutdown(), settings.Advanced.DriverTimeout + 3.seconds) + if (hasBeenShutdown.compareAndSet(false, true)) { + Await.result(internalShutdown(), settings.Advanced.DriverTimeout + 3.seconds) + } } } }