properly shutdown ArteryTransport using CoordinatedShutdown, #22671 (#22698)

* properly shutdown ArteryTransport using CoordinatedShutdown, #22671

* The shutdownHook changed hasBeenShutdown flag to true, and then when
  the transport.shutdown was invoked the shutdown sequence was ignored
  until it was too late, ActorSystem already terminated.
* Also improved the cluster shutdown tasks when the cluster node had not
  joined

* CoordinatedShutdownLeave explicit events
This commit is contained in:
Patrik Nordwall 2017-04-11 21:48:51 +02:00 committed by GitHub
parent 193e8beb0a
commit 41c756f169
5 changed files with 34 additions and 17 deletions

View file

@ -104,9 +104,11 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
runningJvmHook = true // avoid System.exit from PhaseActorSystemTerminate task
if (!system.whenTerminated.isCompleted) {
coord.log.info("Starting coordinated shutdown from JVM shutdown hook")
try
Await.ready(coord.run(), coord.totalTimeout())
catch {
try {
// totalTimeout will be 0 when no tasks registered, so at least 3.seconds
val totalTimeout = coord.totalTimeout().max(3.seconds)
Await.ready(coord.run(), totalTimeout)
} catch {
case NonFatal(e)
coord.log.warning(
"CoordinatedShutdown from JVM shutdown failed: {}",

View file

@ -299,7 +299,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val selfExiting = Promise[Done]()
val coordShutdown = CoordinatedShutdown(context.system)
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "wait-exiting") { ()
selfExiting.future
if (latestGossip.members.isEmpty)
Future.successful(Done) // not joined yet
else
selfExiting.future
}
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExitingDone, "exiting-completed") {
val sys = context.system

View file

@ -49,7 +49,7 @@ object ClusterEvent {
/**
* Marker interface for cluster domain events.
*/
sealed trait ClusterDomainEvent
sealed trait ClusterDomainEvent extends DeadLetterSuppression
/**
* Current snapshot state of the cluster. Sent to new subscriber.
@ -198,7 +198,7 @@ object ClusterEvent {
* This event is published when the cluster node is shutting down,
* before the final [[MemberRemoved]] events are published.
*/
final case object ClusterShuttingDown extends ClusterDomainEvent with DeadLetterSuppression
final case object ClusterShuttingDown extends ClusterDomainEvent
/**
* Java API: get the singleton instance of `ClusterShuttingDown` event

View file

@ -41,16 +41,25 @@ private[akka] class CoordinatedShutdownLeave extends Actor {
def waitingLeaveCompleted(replyTo: ActorRef): Receive = {
case s: CurrentClusterState
if (s.members.exists(m m.uniqueAddress == cluster.selfUniqueAddress &&
if (s.members.isEmpty) {
// not joined yet
done(replyTo)
} else if (s.members.exists(m m.uniqueAddress == cluster.selfUniqueAddress &&
(m.status == Leaving || m.status == Exiting || m.status == Down))) {
replyTo ! Done
context.stop(self)
}
case evt: MemberEvent
if (evt.member.uniqueAddress == cluster.selfUniqueAddress) {
replyTo ! Done
context.stop(self)
done(replyTo)
}
case MemberLeft(m)
if (m.uniqueAddress == cluster.selfUniqueAddress)
done(replyTo)
case MemberRemoved(m, _)
// in case it was downed instead
if (m.uniqueAddress == cluster.selfUniqueAddress)
done(replyTo)
}
private def done(replyTo: ActorRef): Unit = {
replyTo ! Done
context.stop(self)
}
}

View file

@ -458,16 +458,19 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private lazy val shutdownHook = new Thread {
override def run(): Unit = {
if (hasBeenShutdown.compareAndSet(false, true)) {
if (!hasBeenShutdown.get) {
val coord = CoordinatedShutdown(system)
val totalTimeout = coord.totalTimeout()
// totalTimeout will be 0 when no tasks registered, so at least 3.seconds
val totalTimeout = coord.totalTimeout().max(3.seconds)
if (!coord.jvmHooksLatch.await(totalTimeout.toMillis, TimeUnit.MILLISECONDS))
log.warning(
"CoordinatedShutdown took longer than [{}]. Shutting down [{}] via shutdownHook",
totalTimeout, localAddress)
else
log.debug("Shutting down [{}] via shutdownHook", localAddress)
Await.result(internalShutdown(), settings.Advanced.DriverTimeout + 3.seconds)
if (hasBeenShutdown.compareAndSet(false, true)) {
Await.result(internalShutdown(), settings.Advanced.DriverTimeout + 3.seconds)
}
}
}
}