diff --git a/akka-cluster-tools/src/main/mima-filters/2.5.x.backwards.excludes b/akka-cluster-tools/src/main/mima-filters/2.5.x.backwards.excludes
index c22d70fa3d..2b6554c6c3 100644
--- a/akka-cluster-tools/src/main/mima-filters/2.5.x.backwards.excludes
+++ b/akka-cluster-tools/src/main/mima-filters/2.5.x.backwards.excludes
@@ -1,3 +1,8 @@
 # Protobuf 3
 ProblemFilters.exclude[Problem]("akka.cluster.client.protobuf.msg.*")
 ProblemFilters.exclude[Problem]("akka.cluster.pubsub.protobuf.msg.*")
+
+# #27487 Singleton issue when several nodes leaving
+ProblemFilters.exclude[Problem]("akka.cluster.singleton.ClusterSingletonManager#Internal#OldestChangedBuffer*")
+ProblemFilters.exclude[Problem]("akka.cluster.singleton.ClusterSingletonManager#Internal#YoungerData.*")
+ProblemFilters.exclude[Problem]("akka.cluster.singleton.ClusterSingletonManager#Internal#BecomingOldestData.*")
diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala
index 5756af5c72..57f25679ef 100644
--- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala
+++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala
@@ -233,8 +233,8 @@ object ClusterSingletonManager {
     case object End extends State
 
     case object Uninitialized extends Data
-    final case class YoungerData(oldestOption: Option[UniqueAddress]) extends Data
-    final case class BecomingOldestData(previousOldestOption: Option[UniqueAddress]) extends Data
+    final case class YoungerData(oldest: List[UniqueAddress]) extends Data
+    final case class BecomingOldestData(previousOldest: List[UniqueAddress]) extends Data
     final case class OldestData(singleton: Option[ActorRef]) extends Data
     final case class WasOldestData(singleton: Option[ActorRef], newOldestOption: Option[UniqueAddress]) extends Data
     final case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef]) extends Data
@@ -259,7 +259,7 @@ object ClusterSingletonManager {
       /**
       * The first event, corresponding to CurrentClusterState.
       */
-      final case class InitialOldestState(oldest: Option[UniqueAddress], safeToBeOldest: Boolean)
+      final case class InitialOldestState(oldest: List[UniqueAddress], safeToBeOldest: Boolean)
 
       final case class OldestChanged(oldest: Option[UniqueAddress])
     }
@@ -324,19 +324,23 @@ object ClusterSingletonManager {
       }
 
       def handleInitial(state: CurrentClusterState): Unit = {
+        // all members except Joining and WeaklyUp
         membersByAge = immutable.SortedSet
           .empty(ageOrdering)
-          .union(state.members.filter(m => m.status == MemberStatus.Up && matchingRole(m)))
+          .union(state.members.filter(m => m.upNumber != Int.MaxValue && matchingRole(m)))
+
         // If there is some removal in progress of an older node it's not safe to immediately become oldest,
         // removal of younger nodes doesn't matter. Note that it can also be started via restart after
         // ClusterSingletonManagerIsStuck.
         val selfUpNumber = state.members
           .collectFirst { case m if m.uniqueAddress == cluster.selfUniqueAddress => m.upNumber }
           .getOrElse(Int.MaxValue)
-        val safeToBeOldest = !state.members.exists { m =>
-          m.upNumber <= selfUpNumber && matchingRole(m) && (m.status == MemberStatus.Down || m.status == MemberStatus.Exiting || m.status == MemberStatus.Leaving)
+        val oldest = membersByAge.takeWhile(_.upNumber <= selfUpNumber)
+        val safeToBeOldest = !oldest.exists { m =>
+          m.status == MemberStatus.Down || m.status == MemberStatus.Exiting || m.status == MemberStatus.Leaving
         }
-        val initial = InitialOldestState(membersByAge.headOption.map(_.uniqueAddress), safeToBeOldest)
+
+        val initial = InitialOldestState(oldest.toList.map(_.uniqueAddress), safeToBeOldest)
         changes :+= initial
       }
 
@@ -600,36 +604,40 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
       getNextOldestChanged()
       stay
 
-    case Event(InitialOldestState(oldestOption, safeToBeOldest), _) =>
+    case Event(InitialOldestState(oldest, safeToBeOldest), _) =>
       oldestChangedReceived = true
-      if (oldestOption == selfUniqueAddressOption && safeToBeOldest)
+
+      if (oldest.headOption == selfUniqueAddressOption && safeToBeOldest)
         // oldest immediately
         tryGotoOldest()
-      else if (oldestOption == selfUniqueAddressOption)
-        goto(BecomingOldest).using(BecomingOldestData(None))
+      else if (oldest.headOption == selfUniqueAddressOption)
+        goto(BecomingOldest).using(BecomingOldestData(oldest.filterNot(_ == cluster.selfUniqueAddress)))
       else
-        goto(Younger).using(YoungerData(oldestOption))
+        goto(Younger).using(YoungerData(oldest.filterNot(_ == cluster.selfUniqueAddress)))
   }
 
   when(Younger) {
-    case Event(OldestChanged(oldestOption), YoungerData(previousOldestOption)) =>
+    case Event(OldestChanged(oldestOption), YoungerData(previousOldest)) =>
       oldestChangedReceived = true
       if (oldestOption == selfUniqueAddressOption) {
-        logInfo("Younger observed OldestChanged: [{} -> myself]", previousOldestOption.map(_.address))
-        previousOldestOption match {
-          case None                                 => tryGotoOldest()
-          case Some(prev) if removed.contains(prev) => tryGotoOldest()
-          case Some(prev) =>
-            peer(prev.address) ! HandOverToMe
-            goto(BecomingOldest).using(BecomingOldestData(previousOldestOption))
+        logInfo("Younger observed OldestChanged: [{} -> myself]", previousOldest.headOption.map(_.address))
+        if (previousOldest.forall(removed.contains))
+          tryGotoOldest()
+        else {
+          peer(previousOldest.head.address) ! HandOverToMe
+          goto(BecomingOldest).using(BecomingOldestData(previousOldest))
         }
       } else {
         logInfo(
           "Younger observed OldestChanged: [{} -> {}]",
-          previousOldestOption.map(_.address),
+          previousOldest.headOption.map(_.address),
           oldestOption.map(_.address))
         getNextOldestChanged()
-        stay.using(YoungerData(oldestOption))
+        val newPreviousOldest = oldestOption match {
+          case Some(oldest) if !previousOldest.contains(oldest) => oldest :: previousOldest
+          case _                                                => previousOldest
+        }
+        stay.using(YoungerData(newPreviousOldest))
       }
 
     case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
@@ -644,16 +652,23 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
       scheduleDelayedMemberRemoved(m)
       stay
 
-    case Event(DelayedMemberRemoved(m), YoungerData(Some(previousOldest))) if m.uniqueAddress == previousOldest =>
-      logInfo("Previous oldest removed [{}]", m.address)
+    case Event(DelayedMemberRemoved(m), YoungerData(previousOldest)) =>
+      if (!selfExited)
+        logInfo("Member removed [{}]", m.address)
       addRemoved(m.uniqueAddress)
       // transition when OldestChanged
-      stay.using(YoungerData(None))
+      stay.using(YoungerData(previousOldest.filterNot(_ == m.uniqueAddress)))
 
     case Event(HandOverToMe, _) =>
-      // this node was probably quickly restarted with same hostname:port,
-      // confirm that the old singleton instance has been stopped
-      sender() ! HandOverDone
+      val selfStatus = cluster.selfMember.status
+      if (selfStatus == MemberStatus.Leaving || selfStatus == MemberStatus.Exiting)
+        logInfo("Ignoring HandOverToMe in Younger from [{}] because self is [{}].", sender().path.address, selfStatus)
+      else {
+        // this node was probably quickly restarted with same hostname:port,
+        // confirm that the old singleton instance has been stopped
+        sender() ! HandOverDone
+      }
+
       stay
   }
 
@@ -665,15 +680,21 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
       cancelTimer(HandOverRetryTimer)
       stay
 
-    case Event(HandOverDone, BecomingOldestData(Some(previousOldest))) =>
-      if (sender().path.address == previousOldest.address)
-        tryGotoOldest()
-      else {
-        logInfo(
-          "Ignoring HandOverDone in BecomingOldest from [{}]. Expected previous oldest [{}]",
-          sender().path.address,
-          previousOldest.address)
-        stay
+    case Event(HandOverDone, BecomingOldestData(previousOldest)) =>
+      previousOldest.headOption match {
+        case Some(oldest) =>
+          if (sender().path.address == oldest.address)
+            tryGotoOldest()
+          else {
+            logInfo(
+              "Ignoring HandOverDone in BecomingOldest from [{}]. Expected previous oldest [{}]",
+              sender().path.address,
+              oldest.address)
+            stay
+          }
+        case None =>
+          logInfo("Ignoring HandOverDone in BecomingOldest from [{}].", sender().path.address)
+          stay
       }
 
     case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
@@ -688,13 +709,20 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
       scheduleDelayedMemberRemoved(m)
       stay
 
-    case Event(DelayedMemberRemoved(m), BecomingOldestData(Some(previousOldest)))
-        if m.uniqueAddress == previousOldest =>
-      logInfo("Previous oldest [{}] removed", previousOldest.address)
+    case Event(DelayedMemberRemoved(m), BecomingOldestData(previousOldest)) =>
+      if (!selfExited)
+        logInfo("Member removed [{}], previous oldest [{}]", m.address, previousOldest.map(_.address).mkString(", "))
       addRemoved(m.uniqueAddress)
-      tryGotoOldest()
+      if (cluster.isTerminated) {
+        // don't act on DelayedMemberRemoved (starting singleton) if this node is shutting itself down,
+        // just wait for self MemberRemoved
+        stay
+      } else if (previousOldest.contains(m.uniqueAddress) && previousOldest.forall(removed.contains))
+        tryGotoOldest()
+      else
+        stay.using(BecomingOldestData(previousOldest.filterNot(_ == m.uniqueAddress)))
 
-    case Event(TakeOverFromMe, BecomingOldestData(previousOldestOption)) =>
+    case Event(TakeOverFromMe, BecomingOldestData(previousOldest)) =>
       val senderAddress = sender().path.address
       // it would have been better to include the UniqueAddress in the TakeOverFromMe message,
       // but can't change due to backwards compatibility
@@ -704,28 +732,29 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
           logInfo("Ignoring TakeOver request from unknown node in BecomingOldest from [{}].", senderAddress)
           stay
         case Some(senderUniqueAddress) =>
-          previousOldestOption match {
-            case Some(previousOldest) =>
-              if (previousOldest == senderUniqueAddress) sender() ! HandOverToMe
+          previousOldest.headOption match {
+            case Some(oldest) =>
+              if (oldest == senderUniqueAddress)
+                sender() ! HandOverToMe
               else
                 logInfo(
                   "Ignoring TakeOver request in BecomingOldest from [{}]. Expected previous oldest [{}]",
                   sender().path.address,
-                  previousOldest.address)
+                  oldest.address)
               stay
            case None =>
              sender() ! HandOverToMe
-              stay.using(BecomingOldestData(Some(senderUniqueAddress)))
+              stay.using(BecomingOldestData(senderUniqueAddress :: previousOldest))
          }
      }
 
-    case Event(HandOverRetry(count), BecomingOldestData(previousOldestOption)) =>
+    case Event(HandOverRetry(count), BecomingOldestData(previousOldest)) =>
       if (count <= maxHandOverRetries) {
-        logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldestOption.map(_.address))
-        previousOldestOption.foreach(node => peer(node.address) ! HandOverToMe)
+        logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldest.headOption.map(_.address))
+        previousOldest.headOption.foreach(node => peer(node.address) ! HandOverToMe)
         startSingleTimer(HandOverRetryTimer, HandOverRetry(count + 1), handOverRetryInterval)
         stay()
-      } else if (previousOldestOption.forall(removed.contains)) {
+      } else if (previousOldest.forall(removed.contains)) {
        // can't send HandOverToMe, previousOldest unknown for new node (or restart)
        // previous oldest might be down or removed, so no TakeOverFromMe message is received
        logInfo("Timeout in BecomingOldest. Previous oldest unknown, removed and no TakeOver request.")
@@ -734,7 +763,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
         stop()
       else
         throw new ClusterSingletonManagerIsStuck(
-          s"Becoming singleton oldest was stuck because previous oldest [$previousOldestOption] is unresponsive")
+          s"Becoming singleton oldest was stuck because previous oldest [${previousOldest.headOption}] is unresponsive")
   }
 
   def scheduleDelayedMemberRemoved(m: Member): Unit = {
@@ -964,7 +993,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
       logInfo("Self removed, stopping ClusterSingletonManager")
       stop()
     } else if (handOverTo.isEmpty)
-      goto(Younger).using(YoungerData(None))
+      goto(Younger).using(YoungerData(Nil))
     else
       goto(End).using(EndData)
   }
diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeave2Spec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeave2Spec.scala
new file mode 100644
index 0000000000..fd04d69275
--- /dev/null
+++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeave2Spec.scala
@@ -0,0 +1,200 @@
+/*
+ * Copyright (C) 2009-2019 Lightbend Inc.
+ */
+
+package akka.cluster.singleton
+
+import scala.concurrent.duration._
+
+import akka.actor.Actor
+import akka.actor.ActorIdentity
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+import akka.actor.Identify
+import akka.actor.PoisonPill
+import akka.actor.Props
+import akka.cluster.Cluster
+import akka.cluster.MemberStatus
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.remote.testkit.STMultiNodeSpec
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+
+object ClusterSingletonManagerLeave2Spec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+  val fourth = role("fourth")
+  val fifth = role("fifth")
+
+  commonConfig(ConfigFactory.parseString("""
+    akka.loglevel = INFO
+    akka.actor.provider = "cluster"
+    akka.remote.log-remote-lifecycle-events = off
+    akka.cluster.auto-down-unreachable-after = off
+    """))
+
+  case object EchoStarted
+
+  /**
+   * The singleton actor
+   */
+  class Echo(testActor: ActorRef) extends Actor with ActorLogging {
+    override def preStart(): Unit = {
+      log.debug("Started singleton at [{}]", Cluster(context.system).selfAddress)
+      testActor ! "preStart"
+    }
+    override def postStop(): Unit = {
+      log.debug("Stopped singleton at [{}]", Cluster(context.system).selfAddress)
+      testActor ! "postStop"
+    }
+
+    def receive = {
+      case "stop" =>
+        testActor ! "stop"
+        // this is the stop message from singleton manager, but don't stop immediately
+        // will be stopped via PoisonPill from the test to simulate delay
+      case _ =>
+        sender() ! self
+    }
+  }
+}
+
+class ClusterSingletonManagerLeave2MultiJvmNode1 extends ClusterSingletonManagerLeave2Spec
+class ClusterSingletonManagerLeave2MultiJvmNode2 extends ClusterSingletonManagerLeave2Spec
+class ClusterSingletonManagerLeave2MultiJvmNode3 extends ClusterSingletonManagerLeave2Spec
+class ClusterSingletonManagerLeave2MultiJvmNode4 extends ClusterSingletonManagerLeave2Spec
+class ClusterSingletonManagerLeave2MultiJvmNode5 extends ClusterSingletonManagerLeave2Spec
+
+class ClusterSingletonManagerLeave2Spec
+    extends MultiNodeSpec(ClusterSingletonManagerLeave2Spec)
+    with STMultiNodeSpec
+    with ImplicitSender {
+  import ClusterSingletonManagerLeave2Spec._
+
+  override def initialParticipants = roles.size
+
+  lazy val cluster = Cluster(system)
+
+  def join(from: RoleName, to: RoleName): Unit = {
+    runOn(from) {
+      cluster.join(node(to).address)
+      createSingleton()
+    }
+  }
+
+  def createSingleton(): ActorRef = {
+    system.actorOf(
+      ClusterSingletonManager.props(
+        singletonProps = Props(classOf[Echo], testActor),
+        terminationMessage = "stop",
+        settings = ClusterSingletonManagerSettings(system)),
+      name = "echo")
+  }
+
+  val echoProxyTerminatedProbe = TestProbe()
+
+  lazy val echoProxy: ActorRef = {
+    echoProxyTerminatedProbe.watch(
+      system.actorOf(
+        ClusterSingletonProxy
+          .props(singletonManagerPath = "/user/echo", settings = ClusterSingletonProxySettings(system)),
+        name = "echoProxy"))
+  }
+
+  "Leaving ClusterSingletonManager with five nodes" must {
+    "hand-over to new instance" in {
+      join(first, first)
+
+      runOn(first) {
+        within(5.seconds) {
+          expectMsg("preStart")
+          echoProxy ! "hello"
+          expectMsgType[ActorRef]
+        }
+      }
+      enterBarrier("first-active")
+
+      join(second, first)
+      runOn(first, second) {
+        within(10.seconds) {
+          awaitAssert(cluster.state.members.count(m => m.status == MemberStatus.Up) should be(2))
+        }
+      }
+      enterBarrier("second-up")
+
+      join(third, first)
+      runOn(first, second, third) {
+        within(10.seconds) {
+          awaitAssert(cluster.state.members.count(m => m.status == MemberStatus.Up) should be(3))
+        }
+      }
+      enterBarrier("third-up")
+      runOn(first, second, third, fourth) {
+        join(fourth, first)
+        within(10.seconds) {
+          awaitAssert(cluster.state.members.count(m => m.status == MemberStatus.Up) should be(4))
+        }
+      }
+      enterBarrier("fourth-up")
+
+      join(fifth, first)
+      within(10.seconds) {
+        awaitAssert(cluster.state.members.count(m => m.status == MemberStatus.Up) should be(5))
+      }
+      enterBarrier("all-up")
+
+      runOn(first) {
+        cluster.registerOnMemberRemoved(testActor ! "MemberRemoved")
+        cluster.leave(cluster.selfAddress)
+        expectMsg(10.seconds, "stop") // from singleton manager, but will not stop immediately
+      }
+      runOn(second, fourth) {
+        cluster.registerOnMemberRemoved(testActor ! "MemberRemoved")
+        cluster.leave(cluster.selfAddress)
+        expectMsg(10.seconds, "MemberRemoved")
+      }
+
+      runOn(second, third) {
+        (1 to 3).foreach { n =>
+          Thread.sleep(1000)
+          // singleton should not be started before old has been stopped
+          system.actorSelection("/user/echo/singleton") ! Identify(n)
+          expectMsg(ActorIdentity(n, None)) // not started
+        }
+      }
+      enterBarrier("still-running-at-first")
+
+      runOn(first) {
+        system.actorSelection("/user/echo/singleton") ! PoisonPill
+        expectMsg("postStop")
+        // CoordinatedShutdown makes sure that singleton actors are
+        // stopped before Cluster shutdown
+        expectMsg(10.seconds, "MemberRemoved")
+        echoProxyTerminatedProbe.expectTerminated(echoProxy, 10.seconds)
+      }
+      enterBarrier("stopped")
+
+      runOn(third) {
+        expectMsg("preStart")
+      }
+      enterBarrier("third-started")
+
+      runOn(third, fifth) {
+        val p = TestProbe()
+        val firstAddress = node(first).address
+        p.within(15.seconds) {
+          p.awaitAssert {
+            echoProxy.tell("hello2", p.ref)
+            p.expectMsgType[ActorRef](1.seconds).path.address should not be (firstAddress)
+
+          }
+        }
+      }
+      enterBarrier("third-working")
+    }
+
+  }
+}