From 90bc4cfa3ea2c71ae23191fe073aff1d49ebb977 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Fri, 9 Nov 2018 09:42:48 +0100
Subject: [PATCH] Improvements of singleton leaving scenario, #25639 (#25710)

* Testing of singleton leaving
* gossip optimization, exiting change to two oldest per role
* hardening ClusterSingletonManagerIsStuck restart, increase ClusterSingletonManagerIsStuck timeout
---
 .../src/main/resources/reference.conf         |  12 +-
 .../singleton/ClusterSingletonManager.scala   |  26 +++-
 .../ClusterSingletonLeavingSpeedSpec.scala    | 143 ++++++++++++++++++
 .../ClusterSingletonRestart2Spec.scala        |   1 +
 .../scala/akka/cluster/ClusterDaemon.scala    |  23 ++-
 .../scala/akka/cluster/MembershipState.scala  |  26 ++++
 .../akka/cluster/MembershipStateSpec.scala    |  46 ++++++
 7 files changed, 268 insertions(+), 9 deletions(-)
 create mode 100644 akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonLeavingSpeedSpec.scala

diff --git a/akka-cluster-tools/src/main/resources/reference.conf b/akka-cluster-tools/src/main/resources/reference.conf
index 2b0ba1d578..1de04b2a75 100644
--- a/akka-cluster-tools/src/main/resources/reference.conf
+++ b/akka-cluster-tools/src/main/resources/reference.conf
@@ -175,7 +175,17 @@ akka.cluster.singleton {
   # The number of retries are derived from hand-over-retry-interval and
   # akka.cluster.down-removal-margin (or ClusterSingletonManagerSettings.removalMargin),
   # but it will never be less than this property.
-  min-number-of-hand-over-retries = 10
+  # After the hand over retries and it's still not able to exchange the hand over messages
+  # with the previous oldest it will restart itself by throwing ClusterSingletonManagerIsStuck,
+  # to start from a clean state. After that it will still not start the singleton instance
+  # until the previous oldest node has been removed from the cluster.
+  # On the other side, on the previous oldest node, the same number of retries - 3 are used
+  # and after that the singleton instance is stopped.
+  # For large clusters it might be necessary to increase this to avoid too early timeouts while
+  # gossip dissemination of the Leaving to Exiting phase occurs. For normal leaving scenarios
+  # it will not be a quicker hand over by reducing this value, but in extreme failure scenarios
+  # the recovery might be faster.
+  min-number-of-hand-over-retries = 15
 }

 # //#singleton-config
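As a rough sketch of how the tuning described in the reference.conf text above is applied in practice (not part of this patch; the system name, the actor name "echo", and the value 20 are illustrative assumptions), a node could raise the retry budget and later leave the cluster gracefully like this:

```scala
import scala.concurrent.Future

import akka.Done
import akka.actor.{ Actor, ActorSystem, CoordinatedShutdown, PoisonPill, Props }
import akka.cluster.singleton.{ ClusterSingletonManager, ClusterSingletonManagerSettings }
import com.typesafe.config.ConfigFactory

object SingletonLeavingExample {

  // Placeholder singleton actor, only here to keep the sketch self-contained.
  class Echo extends Actor {
    override def receive: Receive = { case msg ⇒ sender() ! msg }
  }

  // Assumed override for a larger cluster; the default of 15 retries is usually enough.
  private val tuning = ConfigFactory.parseString(
    """
    akka.cluster.singleton.min-number-of-hand-over-retries = 20
    akka.cluster.singleton.hand-over-retry-interval = 1s
    """)

  def startNode(): ActorSystem = {
    val system = ActorSystem("ClusterSystem", tuning.withFallback(ConfigFactory.load()))
    // Every node starts the manager; only the oldest node runs the singleton instance.
    system.actorOf(
      ClusterSingletonManager.props(
        singletonProps = Props(new Echo),
        terminationMessage = PoisonPill,
        settings = ClusterSingletonManagerSettings(system)),
      name = "echo")
    system
  }

  // Graceful leave, the same trigger the new ClusterSingletonLeavingSpeedSpec uses,
  // so the manager can hand over before the node is removed from the cluster.
  def leaveGracefully(system: ActorSystem): Future[Done] =
    CoordinatedShutdown(system).run(CoordinatedShutdown.ClusterLeavingReason)
}
```

As the reference.conf text notes, lowering the value does not make a normal hand-over faster; a higher value mainly gives the Leaving-to-Exiting gossip more time to spread in a large cluster.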
diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala
index 39f1e8f465..0ef688d4cc 100644
--- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala
+++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala
@@ -282,8 +282,14 @@ object ClusterSingletonManager {
     def handleInitial(state: CurrentClusterState): Unit = {
       membersByAge = immutable.SortedSet.empty(ageOrdering) union state.members.filter(m ⇒
-        (m.status == MemberStatus.Up || m.status == MemberStatus.Leaving) && matchingRole(m))
-      val safeToBeOldest = !state.members.exists { m ⇒ (m.status == MemberStatus.Down || m.status == MemberStatus.Exiting) }
+        m.status == MemberStatus.Up && matchingRole(m))
+      // If there is some removal in progress of an older node it's not safe to immediately become oldest,
+      // removal of younger nodes doesn't matter. Note that it can also be started via restart after
+      // ClusterSingletonManagerIsStuck.
+      val selfUpNumber = state.members.collectFirst { case m if m.uniqueAddress == cluster.selfUniqueAddress ⇒ m.upNumber }.getOrElse(Int.MaxValue)
+      val safeToBeOldest = !state.members.exists { m ⇒
+        m.upNumber <= selfUpNumber && matchingRole(m) && (m.status == MemberStatus.Down || m.status == MemberStatus.Exiting || m.status == MemberStatus.Leaving)
+      }
       val initial = InitialOldestState(membersByAge.headOption.map(_.uniqueAddress), safeToBeOldest)
       changes :+= initial
     }
@@ -658,7 +664,7 @@ class ClusterSingletonManager(
         stop()
       else
         throw new ClusterSingletonManagerIsStuck(
-          s"Becoming singleton oldest was stuck because previous oldest [${previousOldestOption}] is unresponsive")
+          s"Becoming singleton oldest was stuck because previous oldest [$previousOldestOption] is unresponsive")
     }

   def scheduleDelayedMemberRemoved(m: Member): Unit = {
@@ -708,6 +714,7 @@ class ClusterSingletonManager(
       stay

     case Event(Terminated(ref), d @ OldestData(singleton, _)) if ref == singleton ⇒
+      logInfo("Singleton actor [{}] was terminated", singleton.path)
       stay using d.copy(singletonTerminated = true)

     case Event(SelfExiting, _) ⇒
@@ -723,12 +730,15 @@ class ClusterSingletonManager(
         if (singletonTerminated) stop() else gotoStopping(singleton)
       } else if (count <= maxTakeOverRetries) {
-        logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address))
+        if (maxTakeOverRetries - count <= 3)
+          logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address))
+        else
+          log.debug("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address))
         newOldestOption.foreach(node ⇒ peer(node.address) ! TakeOverFromMe)
         setTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), handOverRetryInterval, repeat = false)
         stay
       } else
-        throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newOldestOption}] never occurred")
+        throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [$newOldestOption] never occurred")

     case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, _)) ⇒
       gotoHandingOver(singleton, singletonTerminated, Some(sender()))
@@ -742,6 +752,7 @@ class ClusterSingletonManager(
       gotoHandingOver(singleton, singletonTerminated, None)

     case Event(Terminated(ref), d @ WasOldestData(singleton, _, _)) if ref == singleton ⇒
+      logInfo("Singleton actor [{}] was terminated", singleton.path)
       stay using d.copy(singletonTerminated = true)

     case Event(SelfExiting, _) ⇒
@@ -757,6 +768,7 @@ class ClusterSingletonManager(
       handOverDone(handOverTo)
     } else {
       handOverTo foreach { _ ! HandOverInProgress }
+      logInfo("Singleton manager stopping singleton actor [{}]", singleton.path)
       singleton ! terminationMessage
       goto(HandingOver) using HandingOverData(singleton, handOverTo)
     }
@@ -793,12 +805,14 @@ class ClusterSingletonManager(
   }

   def gotoStopping(singleton: ActorRef): State = {
+    logInfo("Singleton manager stopping singleton actor [{}]", singleton.path)
     singleton ! 
terminationMessage goto(Stopping) using StoppingData(singleton) } when(Stopping) { case (Event(Terminated(ref), StoppingData(singleton))) if ref == singleton ⇒ + logInfo("Singleton actor [{}] was terminated", singleton.path) stop() } @@ -834,7 +848,7 @@ class ClusterSingletonManager( addRemoved(m.uniqueAddress) stay case Event(TakeOverFromMe, _) ⇒ - logInfo("Ignoring TakeOver request in [{}] from [{}].", stateName, sender().path.address) + log.debug("Ignoring TakeOver request in [{}] from [{}].", stateName, sender().path.address) stay case Event(Cleanup, _) ⇒ cleanupOverdueNotMemberAnyMore() diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonLeavingSpeedSpec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonLeavingSpeedSpec.scala new file mode 100644 index 0000000000..c89d5c8f80 --- /dev/null +++ b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonLeavingSpeedSpec.scala @@ -0,0 +1,143 @@ +/* + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.cluster.singleton + +import scala.concurrent.duration._ + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.CoordinatedShutdown +import akka.actor.PoisonPill +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.MemberStatus +import akka.cluster.singleton.ClusterSingletonLeavingSpeedSpec.TheSingleton +import akka.testkit.AkkaSpec +import akka.testkit.TestActors +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory + +object ClusterSingletonLeavingSpeedSpec { + + object TheSingleton { + def props(probe: ActorRef): Props = + Props(new TheSingleton(probe)) + } + + class TheSingleton(probe: ActorRef) extends Actor { + probe ! "started" + + override def postStop(): Unit = { + probe ! "stopped" + } + + override def receive: Receive = { + case msg ⇒ sender() ! 
msg
+    }
+  }
+}
+
+class ClusterSingletonLeavingSpeedSpec extends AkkaSpec("""
+  akka.loglevel = DEBUG
+  akka.actor.provider = akka.cluster.ClusterActorRefProvider
+  akka.cluster.auto-down-unreachable-after = 2s
+
+  # With 10 systems and setting min-number-of-hand-over-retries to 5 and gossip-interval to 2s it's possible to
+  # reproduce the ClusterSingletonManagerIsStuck and slow hand over in issue #25639
+  # akka.cluster.singleton.min-number-of-hand-over-retries = 5
+  # akka.cluster.gossip-interval = 2s
+
+  akka.remote {
+    netty.tcp {
+      hostname = "127.0.0.1"
+      port = 0
+    }
+    artery.canonical {
+      hostname = "127.0.0.1"
+      port = 0
+    }
+  }
+  """) {
+
+  private val systems = (1 to 3).map { n ⇒
+    val roleConfig = ConfigFactory.parseString(s"""akka.cluster.roles=[role-${n % 3}]""")
+    ActorSystem(system.name, roleConfig.withFallback(system.settings.config))
+  }
+  private val probes = systems.map(TestProbe()(_))
+
+  override def expectedTestDuration: FiniteDuration = 10.minutes
+
+  def join(from: ActorSystem, to: ActorSystem, probe: ActorRef): Unit = {
+    from.actorOf(
+      ClusterSingletonManager.props(
+        singletonProps = TheSingleton.props(probe),
+        terminationMessage = PoisonPill,
+        settings = ClusterSingletonManagerSettings(from)),
+      name = "echo")
+
+    Cluster(from).join(Cluster(to).selfAddress)
+    within(15.seconds) {
+      awaitAssert {
+        Cluster(from).state.members.map(_.uniqueAddress) should contain(Cluster(from).selfUniqueAddress)
+        Cluster(from).state.members.map(_.status) should ===(Set(MemberStatus.Up))
+      }
+    }
+  }
+
+  "ClusterSingleton that is leaving" must {
+    "join cluster" in {
+      systems.indices.foreach { i ⇒
+        join(systems(i), systems.head, probes(i).ref)
+      }
+      // leader is most likely on system, lowest port
+      join(system, systems.head, testActor)
+
+      probes(0).expectMsg("started")
+    }
+
+    "quickly hand-over to next oldest" in {
+
+      val durations = systems.indices.map { i ⇒
+        val t0 = System.nanoTime()
+        val leaveAddress = Cluster(systems(i)).selfAddress
+        CoordinatedShutdown(systems(i)).run(CoordinatedShutdown.ClusterLeavingReason)
+        probes(i).expectMsg(10.seconds, "stopped")
+        val stoppedDuration = (System.nanoTime() - t0).nanos
+        val startedProbe = if (i == systems.size - 1) this else probes(i + 1)
+        startedProbe.expectMsg(30.seconds, "started")
+        val startedDuration = (System.nanoTime() - t0).nanos
+
+        within(15.seconds) {
+          awaitAssert {
+            Cluster(systems(i)).isTerminated should ===(true)
+            Cluster(system).state.members.map(_.address) should not contain leaveAddress
+            systems.foreach { sys ⇒
+              if (!Cluster(sys).isTerminated)
+                Cluster(sys).state.members.map(_.address) should not contain leaveAddress
+            }
+          }
+        }
+
+        (stoppedDuration, startedDuration)
+      }
+
+      durations.zipWithIndex.foreach {
+        case ((stoppedDuration, startedDuration), i) ⇒
+          println(s"Singleton $i stopped in ${stoppedDuration.toMillis} ms, started in ${startedDuration.toMillis} ms, " +
+            s"diff ${(startedDuration - stoppedDuration).toMillis} ms")
+      }
+
+    }
+  }
+
+  override def afterTermination(): Unit = {
+    systems.foreach(shutdown(_))
+  }
+}
+
diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestart2Spec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestart2Spec.scala
index 93e9828725..86477c7b6d 100644
--- 
a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestart2Spec.scala +++ b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestart2Spec.scala @@ -32,6 +32,7 @@ class ClusterSingletonRestart2Spec extends AkkaSpec(""" akka.cluster.roles = [singleton] akka.actor.provider = akka.cluster.ClusterActorRefProvider akka.cluster.auto-down-unreachable-after = 2s + akka.cluster.singleton.min-number-of-hand-over-retries = 5 akka.remote { netty.tcp { hostname = "127.0.0.1" diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 8e253a5894..274b17efb1 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -992,7 +992,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh // ExitingCompleted will be received via CoordinatedShutdown to continue // the leaving process. Meanwhile the gossip state is not marked as seen. exitingTasksInProgress = true - logInfo("Exiting, starting coordinated shutdown") + if (coordShutdown.shutdownReason().isEmpty) + logInfo("Exiting, starting coordinated shutdown") selfExiting.trySuccess(Done) coordShutdown.run(CoordinatedShutdown.ClusterLeavingReason) } @@ -1192,7 +1193,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh // ExitingCompleted will be received via CoordinatedShutdown to continue // the leaving process. Meanwhile the gossip state is not marked as seen. exitingTasksInProgress = true - logInfo("Exiting (leader), starting coordinated shutdown") + if (coordShutdown.shutdownReason().isEmpty) + logInfo("Exiting (leader), starting coordinated shutdown") selfExiting.trySuccess(Done) coordShutdown.run(CoordinatedShutdown.ClusterLeavingReason) } @@ -1221,6 +1223,23 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh if (pruned ne latestGossip) { updateLatestGossip(pruned) publishMembershipState() + gossipExitingMembersToOldest(changedMembers.filter(_.status == Exiting)) + } + } + + /** + * Gossip the Exiting change to the two oldest nodes for quick dissemination to potential Singleton nodes + */ + private def gossipExitingMembersToOldest(exitingMembers: Set[Member]): Unit = { + val targets = membershipState.gossipTargetsForExitingMembers(exitingMembers) + if (targets.nonEmpty) { + + if (log.isDebugEnabled) + log.debug( + "Cluster Node [{}] - Gossip exiting members [{}] to the two oldest (per role) [{}] (singleton optimization).", + selfAddress, exitingMembers.mkString(", "), targets.mkString(", ")) + + targets.foreach(m ⇒ gossipTo(m.uniqueAddress)) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala index c04a86855b..b5e82380d6 100644 --- a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala +++ b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala @@ -180,6 +180,32 @@ import scala.util.Random mbrs.maxBy(m ⇒ if (m.upNumber == Int.MaxValue) 0 else m.upNumber) } + /** + * The Exiting change is gossiped to the two oldest nodes for quick dissemination to potential Singleton nodes + */ + def gossipTargetsForExitingMembers(exitingMembers: Set[Member]): Set[Member] = { + if (exitingMembers.nonEmpty) { + val roles = exitingMembers.flatten(_.roles).filterNot(_.startsWith(ClusterSettings.DcRolePrefix)) + val membersSortedByAge = 
latestGossip.members.toList.filter(_.dataCenter == selfDc).sorted(Member.ageOrdering) + var targets = Set.empty[Member] + if (membersSortedByAge.nonEmpty) { + targets += membersSortedByAge.head // oldest of all nodes (in DC) + if (membersSortedByAge.tail.nonEmpty) + targets += membersSortedByAge.tail.head // second oldest of all nodes (in DC) + roles.foreach { role ⇒ + membersSortedByAge.find(_.hasRole(role)).foreach { first ⇒ + targets += first // oldest with the role (in DC) + membersSortedByAge.find(m ⇒ m != first && m.hasRole(role)).foreach { next ⇒ + targets += next // second oldest with the role (in DC) + } + } + } + } + targets + } else + Set.empty + } + } /** diff --git a/akka-cluster/src/test/scala/akka/cluster/MembershipStateSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MembershipStateSpec.scala index 52e762e877..8ca47e71fc 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MembershipStateSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MembershipStateSpec.scala @@ -40,5 +40,51 @@ class MembershipStateSpec extends WordSpec with Matchers { "dc-b" -> SortedSet(bOldest, b1) )) } + + "find two oldest as targets for Exiting change" in { + val a1Exiting = a1.copy(MemberStatus.Leaving).copy(MemberStatus.Exiting) + val gossip = Gossip(SortedSet(a1Exiting, a2, a3, a4)) + val membershipState = MembershipState( + gossip, + a1.uniqueAddress, + "dc-a", + 2 + ) + + membershipState.gossipTargetsForExitingMembers(Set(a1Exiting)) should ===(Set(a1Exiting, a2)) + } + + "find two oldest in DC as targets for Exiting change" in { + val a4Exiting = a4.copy(MemberStatus.Leaving).copy(MemberStatus.Exiting) + val gossip = Gossip(SortedSet(a2, a3, a4Exiting, b1, b2)) + val membershipState = MembershipState( + gossip, + a1.uniqueAddress, + "dc-a", + 2 + ) + + membershipState.gossipTargetsForExitingMembers(Set(a4Exiting)) should ===(Set(a2, a3)) + } + + "find two oldest per role as targets for Exiting change" in { + val a5 = TestMember(Address("akka.tcp", "sys", "a5", 2552), MemberStatus.Exiting, roles = Set("role1", "role2"), upNumber = 5, dataCenter = "dc-a") + val a6 = TestMember(Address("akka.tcp", "sys", "a6", 2552), MemberStatus.Exiting, roles = Set("role1", "role3"), upNumber = 6, dataCenter = "dc-a") + val a7 = TestMember(Address("akka.tcp", "sys", "a7", 2552), MemberStatus.Exiting, roles = Set("role1"), upNumber = 7, dataCenter = "dc-a") + val a8 = TestMember(Address("akka.tcp", "sys", "a8", 2552), MemberStatus.Exiting, roles = Set("role1"), upNumber = 8, dataCenter = "dc-a") + val a9 = TestMember(Address("akka.tcp", "sys", "a9", 2552), MemberStatus.Exiting, roles = Set("role2"), upNumber = 9, dataCenter = "dc-a") + val b5 = TestMember(Address("akka.tcp", "sys", "b5", 2552), MemberStatus.Exiting, roles = Set("role1"), upNumber = 5, dataCenter = "dc-b") + val b6 = TestMember(Address("akka.tcp", "sys", "b6", 2552), MemberStatus.Exiting, roles = Set("role2"), upNumber = 6, dataCenter = "dc-b") + val theExiting = Set(a5, a6) + val gossip = Gossip(SortedSet(a1, a2, a3, a4, a5, a6, a7, a8, a9, b1, b2, b3, b5, b6)) + val membershipState = MembershipState( + gossip, + a1.uniqueAddress, + "dc-a", + 2 + ) + + membershipState.gossipTargetsForExitingMembers(theExiting) should ===(Set(a1, a2, a5, a6, a9)) + } } }
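To make the selection rule in gossipTargetsForExitingMembers and the expectations in MembershipStateSpec above easier to follow without the internal Member, Gossip and MembershipState types, here is a small standalone sketch of the same "two oldest overall plus two oldest per role" rule over a simplified model. The Node class and the sample values are illustrative only, and the data centre scoping and dc-role filtering of the real implementation are omitted.

```scala
object ExitingGossipTargetsSketch {

  // Simplified stand-in for akka.cluster.Member: a lower upNumber means an older node.
  final case class Node(name: String, upNumber: Int, roles: Set[String] = Set.empty)

  // Same shape of rule as MembershipState.gossipTargetsForExitingMembers, over the simplified
  // model: the two oldest nodes overall plus the two oldest nodes per role of the exiting members.
  def gossipTargets(exiting: Set[Node], membersInDc: Set[Node]): Set[Node] =
    if (exiting.isEmpty) Set.empty
    else {
      val roles = exiting.flatMap(_.roles)
      val byAge = membersInDc.toList.sortBy(_.upNumber) // mirrors Member.ageOrdering
      val oldestOverall = byAge.take(2)
      val oldestPerRole = roles.toList.flatMap(role ⇒ byAge.filter(_.roles.contains(role)).take(2))
      (oldestOverall ++ oldestPerRole).toSet
    }

  def main(args: Array[String]): Unit = {
    val a1 = Node("a1", upNumber = 1)
    val a2 = Node("a2", upNumber = 2)
    val a5 = Node("a5", upNumber = 5, roles = Set("role1"))
    val a6 = Node("a6", upNumber = 6, roles = Set("role1"))
    val a7 = Node("a7", upNumber = 7, roles = Set("role1"))
    // a6 is Exiting: expect the two oldest overall (a1, a2) plus the two oldest with role1 (a5, a6)
    println(gossipTargets(exiting = Set(a6), membersInDc = Set(a1, a2, a5, a6, a7)))
  }
}
```

Run as a main method it prints a target set of the same shape that the tests above assert: the two oldest members overall plus the two oldest members per role carried by an exiting member.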