Fix singleton issue when leaving several nodes, #27487 (#27488)

* Fix singleton issue when leaving several nodes, #27487

* When leaving several nodes at about the same time the new singleton
  could be started before previous had been completely stopped.
* Found two possible ways this could happen.
  * Acting on MemberRemoved that is emitted when the self
    cluster node is shutting down.
  * The HandOverDone confirmation when in Younger state,
    but that node is also Leaving so could be seen as Exiting
    from a third node that is the next singleton.

* keep track of all previous oldest, not only the latest

* Option => List
* Otherwise in BecomingOldest it could transition to Oldest
  when the previous oldest was removed even though the previous-previous wasn't removed yet

* fix failure in ClusterSingletonRestart2Spec

* OldestChanged was not emitted when Exiting member was removed
* The initial membersByAge must also contain Leaving, Exiting members

(cherry picked from commit ee188565b9f3cf2257ebda218cec6af5a4777439)
This commit is contained in:
Patrik Nordwall 2019-08-19 14:59:35 +02:00
parent c97e966f8b
commit ddb085255d
3 changed files with 287 additions and 53 deletions

View file

@ -1,3 +1,8 @@
# Protobuf 3
ProblemFilters.exclude[Problem]("akka.cluster.client.protobuf.msg.*")
ProblemFilters.exclude[Problem]("akka.cluster.pubsub.protobuf.msg.*")
# #27487 Singleton issue when several nodes leaving
ProblemFilters.exclude[Problem]("akka.cluster.singleton.ClusterSingletonManager#Internal#OldestChangedBuffer*")
ProblemFilters.exclude[Problem]("akka.cluster.singleton.ClusterSingletonManager#Internal#YoungerData.*")
ProblemFilters.exclude[Problem]("akka.cluster.singleton.ClusterSingletonManager#Internal#BecomingOldestData.*")

View file

@ -233,8 +233,8 @@ object ClusterSingletonManager {
case object End extends State
case object Uninitialized extends Data
final case class YoungerData(oldestOption: Option[UniqueAddress]) extends Data
final case class BecomingOldestData(previousOldestOption: Option[UniqueAddress]) extends Data
final case class YoungerData(oldest: List[UniqueAddress]) extends Data
final case class BecomingOldestData(previousOldest: List[UniqueAddress]) extends Data
final case class OldestData(singleton: Option[ActorRef]) extends Data
final case class WasOldestData(singleton: Option[ActorRef], newOldestOption: Option[UniqueAddress]) extends Data
final case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef]) extends Data
@ -259,7 +259,7 @@ object ClusterSingletonManager {
/**
* The first event, corresponding to CurrentClusterState.
*/
final case class InitialOldestState(oldest: Option[UniqueAddress], safeToBeOldest: Boolean)
final case class InitialOldestState(oldest: List[UniqueAddress], safeToBeOldest: Boolean)
final case class OldestChanged(oldest: Option[UniqueAddress])
}
@ -324,19 +324,23 @@ object ClusterSingletonManager {
}
def handleInitial(state: CurrentClusterState): Unit = {
// all members except Joining and WeaklyUp
membersByAge = immutable.SortedSet
.empty(ageOrdering)
.union(state.members.filter(m => m.status == MemberStatus.Up && matchingRole(m)))
.union(state.members.filter(m => m.upNumber != Int.MaxValue && matchingRole(m)))
// If there is some removal in progress of an older node it's not safe to immediately become oldest,
// removal of younger nodes doesn't matter. Note that it can also be started via restart after
// ClusterSingletonManagerIsStuck.
val selfUpNumber = state.members
.collectFirst { case m if m.uniqueAddress == cluster.selfUniqueAddress => m.upNumber }
.getOrElse(Int.MaxValue)
val safeToBeOldest = !state.members.exists { m =>
m.upNumber <= selfUpNumber && matchingRole(m) && (m.status == MemberStatus.Down || m.status == MemberStatus.Exiting || m.status == MemberStatus.Leaving)
val oldest = membersByAge.takeWhile(_.upNumber <= selfUpNumber)
val safeToBeOldest = !oldest.exists { m =>
m.status == MemberStatus.Down || m.status == MemberStatus.Exiting || m.status == MemberStatus.Leaving
}
val initial = InitialOldestState(membersByAge.headOption.map(_.uniqueAddress), safeToBeOldest)
val initial = InitialOldestState(oldest.toList.map(_.uniqueAddress), safeToBeOldest)
changes :+= initial
}
@ -600,36 +604,40 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
getNextOldestChanged()
stay
case Event(InitialOldestState(oldestOption, safeToBeOldest), _) =>
case Event(InitialOldestState(oldest, safeToBeOldest), _) =>
oldestChangedReceived = true
if (oldestOption == selfUniqueAddressOption && safeToBeOldest)
if (oldest.headOption == selfUniqueAddressOption && safeToBeOldest)
// oldest immediately
tryGotoOldest()
else if (oldestOption == selfUniqueAddressOption)
goto(BecomingOldest).using(BecomingOldestData(None))
else if (oldest.headOption == selfUniqueAddressOption)
goto(BecomingOldest).using(BecomingOldestData(oldest.filterNot(_ == cluster.selfUniqueAddress)))
else
goto(Younger).using(YoungerData(oldestOption))
goto(Younger).using(YoungerData(oldest.filterNot(_ == cluster.selfUniqueAddress)))
}
when(Younger) {
case Event(OldestChanged(oldestOption), YoungerData(previousOldestOption)) =>
case Event(OldestChanged(oldestOption), YoungerData(previousOldest)) =>
oldestChangedReceived = true
if (oldestOption == selfUniqueAddressOption) {
logInfo("Younger observed OldestChanged: [{} -> myself]", previousOldestOption.map(_.address))
previousOldestOption match {
case None => tryGotoOldest()
case Some(prev) if removed.contains(prev) => tryGotoOldest()
case Some(prev) =>
peer(prev.address) ! HandOverToMe
goto(BecomingOldest).using(BecomingOldestData(previousOldestOption))
logInfo("Younger observed OldestChanged: [{} -> myself]", previousOldest.headOption.map(_.address))
if (previousOldest.forall(removed.contains))
tryGotoOldest()
else {
peer(previousOldest.head.address) ! HandOverToMe
goto(BecomingOldest).using(BecomingOldestData(previousOldest))
}
} else {
logInfo(
"Younger observed OldestChanged: [{} -> {}]",
previousOldestOption.map(_.address),
previousOldest.headOption.map(_.address),
oldestOption.map(_.address))
getNextOldestChanged()
stay.using(YoungerData(oldestOption))
val newPreviousOldest = oldestOption match {
case Some(oldest) if !previousOldest.contains(oldest) => oldest :: previousOldest
case _ => previousOldest
}
stay.using(YoungerData(newPreviousOldest))
}
case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
@ -644,16 +652,23 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
scheduleDelayedMemberRemoved(m)
stay
case Event(DelayedMemberRemoved(m), YoungerData(Some(previousOldest))) if m.uniqueAddress == previousOldest =>
logInfo("Previous oldest removed [{}]", m.address)
case Event(DelayedMemberRemoved(m), YoungerData(previousOldest)) =>
if (!selfExited)
logInfo("Member removed [{}]", m.address)
addRemoved(m.uniqueAddress)
// transition when OldestChanged
stay.using(YoungerData(None))
stay.using(YoungerData(previousOldest.filterNot(_ == m.uniqueAddress)))
case Event(HandOverToMe, _) =>
// this node was probably quickly restarted with same hostname:port,
// confirm that the old singleton instance has been stopped
sender() ! HandOverDone
val selfStatus = cluster.selfMember.status
if (selfStatus == MemberStatus.Leaving || selfStatus == MemberStatus.Exiting)
logInfo("Ignoring HandOverToMe in Younger from [{}] because self is [{}].", sender().path.address, selfStatus)
else {
// this node was probably quickly restarted with same hostname:port,
// confirm that the old singleton instance has been stopped
sender() ! HandOverDone
}
stay
}
@ -665,15 +680,21 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
cancelTimer(HandOverRetryTimer)
stay
case Event(HandOverDone, BecomingOldestData(Some(previousOldest))) =>
if (sender().path.address == previousOldest.address)
tryGotoOldest()
else {
logInfo(
"Ignoring HandOverDone in BecomingOldest from [{}]. Expected previous oldest [{}]",
sender().path.address,
previousOldest.address)
stay
case Event(HandOverDone, BecomingOldestData(previousOldest)) =>
previousOldest.headOption match {
case Some(oldest) =>
if (sender().path.address == oldest.address)
tryGotoOldest()
else {
logInfo(
"Ignoring HandOverDone in BecomingOldest from [{}]. Expected previous oldest [{}]",
sender().path.address,
oldest.address)
stay
}
case None =>
logInfo("Ignoring HandOverDone in BecomingOldest from [{}].", sender().path.address)
stay
}
case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
@ -688,13 +709,20 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
scheduleDelayedMemberRemoved(m)
stay
case Event(DelayedMemberRemoved(m), BecomingOldestData(Some(previousOldest)))
if m.uniqueAddress == previousOldest =>
logInfo("Previous oldest [{}] removed", previousOldest.address)
case Event(DelayedMemberRemoved(m), BecomingOldestData(previousOldest)) =>
if (!selfExited)
logInfo("Member removed [{}], previous oldest [{}]", m.address, previousOldest.map(_.address).mkString(", "))
addRemoved(m.uniqueAddress)
tryGotoOldest()
if (cluster.isTerminated) {
// don't act on DelayedMemberRemoved (starting singleton) if this node is shutting its self down,
// just wait for self MemberRemoved
stay
} else if (previousOldest.contains(m.uniqueAddress) && previousOldest.forall(removed.contains))
tryGotoOldest()
else
stay.using(BecomingOldestData(previousOldest.filterNot(_ == m.uniqueAddress)))
case Event(TakeOverFromMe, BecomingOldestData(previousOldestOption)) =>
case Event(TakeOverFromMe, BecomingOldestData(previousOldest)) =>
val senderAddress = sender().path.address
// it would have been better to include the UniqueAddress in the TakeOverFromMe message,
// but can't change due to backwards compatibility
@ -704,28 +732,29 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
logInfo("Ignoring TakeOver request from unknown node in BecomingOldest from [{}].", senderAddress)
stay
case Some(senderUniqueAddress) =>
previousOldestOption match {
case Some(previousOldest) =>
if (previousOldest == senderUniqueAddress) sender() ! HandOverToMe
previousOldest.headOption match {
case Some(oldest) =>
if (oldest == senderUniqueAddress)
sender() ! HandOverToMe
else
logInfo(
"Ignoring TakeOver request in BecomingOldest from [{}]. Expected previous oldest [{}]",
sender().path.address,
previousOldest.address)
oldest.address)
stay
case None =>
sender() ! HandOverToMe
stay.using(BecomingOldestData(Some(senderUniqueAddress)))
stay.using(BecomingOldestData(senderUniqueAddress :: previousOldest))
}
}
case Event(HandOverRetry(count), BecomingOldestData(previousOldestOption)) =>
case Event(HandOverRetry(count), BecomingOldestData(previousOldest)) =>
if (count <= maxHandOverRetries) {
logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldestOption.map(_.address))
previousOldestOption.foreach(node => peer(node.address) ! HandOverToMe)
logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldest.headOption.map(_.address))
previousOldest.headOption.foreach(node => peer(node.address) ! HandOverToMe)
startSingleTimer(HandOverRetryTimer, HandOverRetry(count + 1), handOverRetryInterval)
stay()
} else if (previousOldestOption.forall(removed.contains)) {
} else if (previousOldest.forall(removed.contains)) {
// can't send HandOverToMe, previousOldest unknown for new node (or restart)
// previous oldest might be down or removed, so no TakeOverFromMe message is received
logInfo("Timeout in BecomingOldest. Previous oldest unknown, removed and no TakeOver request.")
@ -734,7 +763,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
stop()
else
throw new ClusterSingletonManagerIsStuck(
s"Becoming singleton oldest was stuck because previous oldest [$previousOldestOption] is unresponsive")
s"Becoming singleton oldest was stuck because previous oldest [${previousOldest.headOption}] is unresponsive")
}
def scheduleDelayedMemberRemoved(m: Member): Unit = {
@ -964,7 +993,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
} else if (handOverTo.isEmpty)
goto(Younger).using(YoungerData(None))
goto(Younger).using(YoungerData(Nil))
else
goto(End).using(EndData)
}

View file

@ -0,0 +1,200 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.singleton
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Identify
import akka.actor.PoisonPill
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
object ClusterSingletonManagerLeave2Spec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = off
"""))
case object EchoStarted
/**
* The singleton actor
*/
class Echo(testActor: ActorRef) extends Actor with ActorLogging {
override def preStart(): Unit = {
log.debug("Started singleton at [{}]", Cluster(context.system).selfAddress)
testActor ! "preStart"
}
override def postStop(): Unit = {
log.debug("Stopped singleton at [{}]", Cluster(context.system).selfAddress)
testActor ! "postStop"
}
def receive = {
case "stop" =>
testActor ! "stop"
// this is the stop message from singleton manager, but don't stop immediately
// will be stopped via PoisonPill from the test to simulate delay
case _ =>
sender() ! self
}
}
}
class ClusterSingletonManagerLeave2MultiJvmNode1 extends ClusterSingletonManagerLeave2Spec
class ClusterSingletonManagerLeave2MultiJvmNode2 extends ClusterSingletonManagerLeave2Spec
class ClusterSingletonManagerLeave2MultiJvmNode3 extends ClusterSingletonManagerLeave2Spec
class ClusterSingletonManagerLeave2MultiJvmNode4 extends ClusterSingletonManagerLeave2Spec
class ClusterSingletonManagerLeave2MultiJvmNode5 extends ClusterSingletonManagerLeave2Spec
class ClusterSingletonManagerLeave2Spec
extends MultiNodeSpec(ClusterSingletonManagerLeave2Spec)
with STMultiNodeSpec
with ImplicitSender {
import ClusterSingletonManagerLeave2Spec._
override def initialParticipants = roles.size
lazy val cluster = Cluster(system)
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster.join(node(to).address)
createSingleton()
}
}
def createSingleton(): ActorRef = {
system.actorOf(
ClusterSingletonManager.props(
singletonProps = Props(classOf[Echo], testActor),
terminationMessage = "stop",
settings = ClusterSingletonManagerSettings(system)),
name = "echo")
}
val echoProxyTerminatedProbe = TestProbe()
lazy val echoProxy: ActorRef = {
echoProxyTerminatedProbe.watch(
system.actorOf(
ClusterSingletonProxy
.props(singletonManagerPath = "/user/echo", settings = ClusterSingletonProxySettings(system)),
name = "echoProxy"))
}
"Leaving ClusterSingletonManager with two nodes" must {
"hand-over to new instance" in {
join(first, first)
runOn(first) {
within(5.seconds) {
expectMsg("preStart")
echoProxy ! "hello"
expectMsgType[ActorRef]
}
}
enterBarrier("first-active")
join(second, first)
runOn(first, second) {
within(10.seconds) {
awaitAssert(cluster.state.members.count(m => m.status == MemberStatus.Up) should be(2))
}
}
enterBarrier("second-up")
join(third, first)
runOn(first, second, third) {
within(10.seconds) {
awaitAssert(cluster.state.members.count(m => m.status == MemberStatus.Up) should be(3))
}
}
enterBarrier("third-up")
runOn(first, second, third, fourth) {
join(fourth, first)
within(10.seconds) {
awaitAssert(cluster.state.members.count(m => m.status == MemberStatus.Up) should be(4))
}
}
enterBarrier("fourth-up")
join(fifth, first)
within(10.seconds) {
awaitAssert(cluster.state.members.count(m => m.status == MemberStatus.Up) should be(5))
}
enterBarrier("all-up")
runOn(first) {
cluster.registerOnMemberRemoved(testActor ! "MemberRemoved")
cluster.leave(cluster.selfAddress)
expectMsg(10.seconds, "stop") // from singleton manager, but will not stop immediately
}
runOn(second, fourth) {
cluster.registerOnMemberRemoved(testActor ! "MemberRemoved")
cluster.leave(cluster.selfAddress)
expectMsg(10.seconds, "MemberRemoved")
}
runOn(second, third) {
(1 to 3).foreach { n =>
Thread.sleep(1000)
// singleton should not be started before old has been stopped
system.actorSelection("/user/echo/singleton") ! Identify(n)
expectMsg(ActorIdentity(n, None)) // not started
}
}
enterBarrier("still-running-at-first")
runOn(first) {
system.actorSelection("/user/echo/singleton") ! PoisonPill
expectMsg("postStop")
// CoordinatedShutdown makes sure that singleton actors are
// stopped before Cluster shutdown
expectMsg(10.seconds, "MemberRemoved")
echoProxyTerminatedProbe.expectTerminated(echoProxy, 10.seconds)
}
enterBarrier("stopped")
runOn(third) {
expectMsg("preStart")
}
enterBarrier("third-started")
runOn(third, fifth) {
val p = TestProbe()
val firstAddress = node(first).address
p.within(15.seconds) {
p.awaitAssert {
echoProxy.tell("hello2", p.ref)
p.expectMsgType[ActorRef](1.seconds).path.address should not be (firstAddress)
}
}
}
enterBarrier("third-working")
}
}
}