Stop singleton and shards when self MemberDowned, #26336 (#26339)

* Stop singleton when self MemberDowned, #26336
  * It's safer to stop the singleton instance early when the node is downed,
    instead of waiting for MemberRemoved and trying to hand over.
* Stop ShardRegion when self MemberDowned, #26336
* Upper bound when waiting for seen in shutdownSelfWhenDown, #26336
Patrik Nordwall 2019-02-12 15:05:33 +01:00 committed by GitHub
parent 8e2d378228
commit ddada9a8e1
4 changed files with 165 additions and 2 deletions
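All four files apply the same pattern: subscribe to ClusterEvent.MemberDowned and shut the component down as soon as the downed member is the node itself. As a hedged illustration of that pattern in isolation (StopOnSelfDowned is a hypothetical actor, not part of this commit):

import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.ClusterEvent
import akka.cluster.ClusterEvent.MemberDowned

class StopOnSelfDowned extends Actor {
  private val cluster = Cluster(context.system)

  // subscribe to cluster membership changes, including MemberDowned
  override def preStart(): Unit =
    cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[MemberDowned])

  override def postStop(): Unit =
    cluster.unsubscribe(self)

  def receive = {
    // stop right away when this node itself is downed, instead of waiting
    // for MemberRemoved and attempting a hand-over
    case MemberDowned(m) if m.uniqueAddress == cluster.selfUniqueAddress ⇒
      context.stop(self)
    case _: ClusterEvent.MemberEvent ⇒ // ignore events about other members
  }
}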

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala

@@ -516,6 +516,12 @@ private[akka] class ShardRegion(
       else if (matchingRole(m))
         changeMembers(membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress))

+    case MemberDowned(m) ⇒
+      if (m.uniqueAddress == cluster.selfUniqueAddress) {
+        log.info("Self downed, stopping ShardRegion [{}]", self.path)
+        context.stop(self)
+      }
+
     case _: MemberEvent ⇒ // these are expected, no need to warn about them

     case _ ⇒ unhandled(evt)
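With this change the ShardRegion simply stops itself on self-down. User code that wants to react to that, for example by terminating the ActorSystem, can death-watch the region; a minimal sketch, where RegionWatcher and its terminate-on-stop policy are illustrative assumptions rather than anything the commit prescribes:

import akka.actor.{ Actor, ActorRef, Props, Terminated }

// Watches a ShardRegion ActorRef and shuts the system down when the region
// stops, e.g. after this member was downed.
class RegionWatcher(region: ActorRef) extends Actor {
  context.watch(region)

  def receive = {
    case Terminated(`region`) ⇒
      context.system.terminate()
  }
}

// usage (sketch): system.actorOf(Props(classOf[RegionWatcher], region))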

akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala

@@ -511,7 +511,7 @@ class ClusterSingletonManager(
     require(!cluster.isTerminated, "Cluster node must not be terminated")

     // subscribe to cluster changes, re-subscribe when restart
-    cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved])
+    cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved], classOf[MemberDowned])

     setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true)
@@ -573,6 +573,10 @@ class ClusterSingletonManager(
         stay using YoungerData(oldestOption)
       }

+    case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress ⇒
+      logInfo("Self downed, stopping ClusterSingletonManager")
+      stop()
+
     case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress ⇒
       logInfo("Self removed, stopping ClusterSingletonManager")
       stop()
@@ -612,6 +616,10 @@ class ClusterSingletonManager(
         stay
       }

+    case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress ⇒
+      logInfo("Self downed, stopping ClusterSingletonManager")
+      stop()
+
     case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress ⇒
       logInfo("Self removed, stopping ClusterSingletonManager")
       stop()
@@ -722,6 +730,15 @@ class ClusterSingletonManager(
       // complete memberExitingProgress when handOverDone
       sender() ! Done // reply to ask
       stay

+    case Event(MemberDowned(m), OldestData(singleton, singletonTerminated)) if m.uniqueAddress == cluster.selfUniqueAddress ⇒
+      if (singletonTerminated) {
+        logInfo("Self downed, stopping ClusterSingletonManager")
+        stop()
+      } else {
+        logInfo("Self downed, stopping")
+        gotoStopping(singleton)
+      }
   }

   when(WasOldest) {
@@ -761,6 +778,15 @@ class ClusterSingletonManager(
       sender() ! Done // reply to ask
       stay

+    case Event(MemberDowned(m), OldestData(singleton, singletonTerminated)) if m.uniqueAddress == cluster.selfUniqueAddress ⇒
+      if (singletonTerminated) {
+        logInfo("Self downed, stopping ClusterSingletonManager")
+        stop()
+      } else {
+        logInfo("Self downed, stopping")
+        gotoStopping(singleton)
+      }
   }

   def gotoHandingOver(singleton: ActorRef, singletonTerminated: Boolean, handOverTo: Option[ActorRef]): State = {
@@ -853,6 +879,10 @@ class ClusterSingletonManager(
     case Event(Cleanup, _) ⇒
       cleanupOverdueNotMemberAnyMore()
       stay

+    case Event(MemberDowned(m), _) ⇒
+      if (m.uniqueAddress == cluster.selfUniqueAddress)
+        logInfo("Self downed, waiting for removal")
+      stay
   }

   onTransition {
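For orientation, the FSM these hunks modify is created from user code roughly as below; a minimal setup sketch assuming default settings (SingletonSetup and the actor name are illustrative). The terminationMessage is what gotoStopping delivers to the singleton instance, which is why a downed manager can still stop its instance cleanly before stopping itself:

import akka.actor.{ ActorSystem, PoisonPill, Props }
import akka.cluster.singleton.{ ClusterSingletonManager, ClusterSingletonManagerSettings }

object SingletonSetup {
  // Starts a ClusterSingletonManager that supervises one instance of
  // `singletonProps` on the oldest node in the cluster.
  def start(system: ActorSystem, singletonProps: Props): Unit =
    system.actorOf(
      ClusterSingletonManager.props(
        singletonProps = singletonProps,
        terminationMessage = PoisonPill, // delivered to the instance when stopping
        settings = ClusterSingletonManagerSettings(system)),
      name = "singletonManager")
}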

akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerDownedSpec.scala

@@ -0,0 +1,123 @@
/*
 * Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.cluster.singleton

import scala.concurrent.duration._

import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.PoisonPill
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter
import akka.testkit._
import com.typesafe.config.ConfigFactory

object ClusterSingletonManagerDownedSpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  val third = role("third")

  commonConfig(ConfigFactory.parseString("""
    akka.loglevel = INFO
    akka.actor.provider = "cluster"
    akka.remote.log-remote-lifecycle-events = off
    """))

  testTransport(on = true)

  case object EchoStarted
  case object EchoStopped

  /**
   * The singleton actor
   */
  class Echo(testActor: ActorRef) extends Actor {
    testActor ! EchoStarted

    override def postStop(): Unit = {
      testActor ! EchoStopped
    }

    def receive = {
      case _ ⇒ sender() ! self
    }
  }
}

class ClusterSingletonManagerDownedMultiJvmNode1 extends ClusterSingletonManagerDownedSpec
class ClusterSingletonManagerDownedMultiJvmNode2 extends ClusterSingletonManagerDownedSpec
class ClusterSingletonManagerDownedMultiJvmNode3 extends ClusterSingletonManagerDownedSpec

class ClusterSingletonManagerDownedSpec
    extends MultiNodeSpec(ClusterSingletonManagerDownedSpec)
    with STMultiNodeSpec
    with ImplicitSender {
  import ClusterSingletonManagerDownedSpec._

  override def initialParticipants = roles.size

  private val cluster = Cluster(system)

  def join(from: RoleName, to: RoleName): Unit = {
    runOn(from) {
      cluster.join(node(to).address)
      createSingleton()
    }
  }

  def createSingleton(): ActorRef = {
    system.actorOf(
      ClusterSingletonManager.props(
        singletonProps = Props(classOf[Echo], testActor),
        terminationMessage = PoisonPill,
        settings = ClusterSingletonManagerSettings(system)),
      name = "echo")
  }

  "A ClusterSingletonManager downing" must {

    "startup 3 node" in {
      join(first, first)
      join(second, first)
      join(third, first)

      within(15.seconds) {
        awaitAssert {
          cluster.state.members.size should ===(3)
          cluster.state.members.map(_.status) should ===(Set(MemberStatus.Up))
        }
      }

      runOn(first) {
        expectMsg(EchoStarted)
      }

      enterBarrier("started")
    }

    "stop instance when member is downed" in {
      runOn(first) {
        testConductor.blackhole(first, third, ThrottlerTransportAdapter.Direction.Both).await
        testConductor.blackhole(second, third, ThrottlerTransportAdapter.Direction.Both).await

        within(15.seconds) {
          awaitAssert {
            cluster.state.unreachable.size should ===(1)
          }
        }
      }
      enterBarrier("blackhole-1")

      runOn(first) {
        // another blackhole so that second can't mark gossip as seen and thereby deferring shutdown of first
        testConductor.blackhole(first, second, ThrottlerTransportAdapter.Direction.Both).await

        cluster.down(node(second).address)
        cluster.down(cluster.selfAddress)

        // singleton instance stopped, before failure detection of first-second
        expectMsg(3.seconds, EchoStopped)
      }
      enterBarrier("stopped")
    }
  }
}
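The assertions above hinge on the Echo actor signalling its own lifecycle to the testActor. That signalling pattern can be exercised outside the multi-jvm harness too; a standalone sketch with plain TestKit, where LifecycleProbeExample and the Lifecycle actor are hypothetical names:

import akka.actor.{ Actor, ActorRef, ActorSystem, PoisonPill, Props }
import akka.testkit.TestProbe

object LifecycleProbeExample extends App {
  val system = ActorSystem("probe-demo")
  val probe = TestProbe()(system)

  // Emits a message from the constructor and from postStop so a probe can
  // assert exactly when the instance starts and stops.
  class Lifecycle(listener: ActorRef) extends Actor {
    listener ! "started"
    override def postStop(): Unit = listener ! "stopped"
    def receive = { case _ ⇒ }
  }

  val ref = system.actorOf(Props(new Lifecycle(probe.ref)))
  probe.expectMsg("started")
  ref ! PoisonPill
  probe.expectMsg("stopped")
  system.terminate()
}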

akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala

@@ -286,6 +286,7 @@ private[cluster] final class ClusterCoreSupervisor(joinConfigCompatChecker: Join

 private[cluster] object ClusterCoreDaemon {
   val NumberOfGossipsBeforeShutdownWhenLeaderExits = 5
   val MaxGossipsBeforeShuttingDownMyself = 5
+  val MaxTicksBeforeShuttingDownMyself = 4

 }
@@ -333,6 +334,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
   var seedNodeProcessCounter = 0 // for unique names
   var joinSeedNodesDeadline: Option[Deadline] = None
   var leaderActionCounter = 0
+  var selfDownCounter = 0

   var exitingTasksInProgress = false
   val selfExiting = Promise[Done]()
@@ -1112,7 +1114,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
       // status Down. The down commands should spread before we shutdown.
       val unreachable = membershipState.dcReachability.allUnreachableOrTerminated
       val downed = membershipState.dcMembers.collect { case m if m.status == Down ⇒ m.uniqueAddress }
-      if (downed.forall(node ⇒ unreachable(node) || latestGossip.seenByNode(node))) {
+      if (selfDownCounter >= MaxTicksBeforeShuttingDownMyself || downed.forall(node ⇒ unreachable(node) || latestGossip.seenByNode(node))) {
         // the reason for not shutting down immediately is to give the gossip a chance to spread
         // the downing information to other downed nodes, so that they can shutdown themselves
         logInfo("Shutting down myself")
@@ -1120,6 +1122,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
         // if other downed know that this node has seen the version
         gossipRandomN(MaxGossipsBeforeShuttingDownMyself)
         shutdown()
+      } else {
+        selfDownCounter += 1
       }
     }
   }
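This last change gives shutdownSelfWhenDown an upper bound: instead of waiting indefinitely for every other downed node to be unreachable or to have seen the downing gossip, the node force-shuts itself after MaxTicksBeforeShuttingDownMyself periodic ticks. A minimal sketch of that bounded-wait pattern in isolation (BoundedSelfShutdown is illustrative, not the actual ClusterCoreDaemon internals):

// Bounded-wait sketch: retry a shutdown condition on periodic ticks, but give
// up waiting and act once an upper bound of ticks has passed.
final class BoundedSelfShutdown(maxTicks: Int)(doShutdown: () ⇒ Unit) {
  private var ticks = 0

  // Invoked on each tick while this node's status is Down. `conditionMet`
  // stands in for "all other downed nodes are unreachable or have seen the
  // downing gossip".
  def onTick(conditionMet: Boolean): Unit =
    if (ticks >= maxTicks || conditionMet) doShutdown()
    else ticks += 1
}

With maxTicks = 4, mirroring MaxTicksBeforeShuttingDownMyself above, a downed node whose gossip never converges still shuts down on the fifth tick at the latest.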