* Testing of singleton leaving
* gossip optimization, exiting change to two oldest per role
* hardening ClusterSingletonManagerIsStuck restart, increase min-number-of-hand-over-retries
parent f6dbb91175
commit 90bc4cfa3e
7 changed files with 268 additions and 9 deletions
@@ -175,7 +175,17 @@ akka.cluster.singleton {
   # The number of retries are derived from hand-over-retry-interval and
   # akka.cluster.down-removal-margin (or ClusterSingletonManagerSettings.removalMargin),
   # but it will never be less than this property.
-  min-number-of-hand-over-retries = 10
+  # After the hand over retries and it's still not able to exchange the hand over messages
+  # with the previous oldest it will restart itself by throwing ClusterSingletonManagerIsStuck,
+  # to start from a clean state. After that it will still not start the singleton instance
+  # until the previous oldest node has been removed from the cluster.
+  # On the other side, on the previous oldest node, the same number of retries - 3 are used
+  # and after that the singleton instance is stopped.
+  # For large clusters it might be necessary to increase this to avoid too early timeouts while
+  # gossip dissemination of the Leaving to Exiting phase occurs. For normal leaving scenarios
+  # it will not be a quicker hand over by reducing this value, but in extreme failure scenarios
+  # the recovery might be faster.
+  min-number-of-hand-over-retries = 15
 }
 # //#singleton-config
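For illustration only, not part of the diff: an application running a very large cluster could raise this floor further in its own configuration. The sketch below is a hypothetical override using the standard Typesafe Config API; the system name is a placeholder.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Hypothetical override: raise the retry floor above the new default of 15 for a very
// large cluster where gossip of the Leaving -> Exiting change takes longer to spread.
val config = ConfigFactory.parseString(
  "akka.cluster.singleton.min-number-of-hand-over-retries = 20")
  .withFallback(ConfigFactory.load())

val system = ActorSystem("Example", config)
```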
@@ -282,8 +282,14 @@ object ClusterSingletonManager {

     def handleInitial(state: CurrentClusterState): Unit = {
       membersByAge = immutable.SortedSet.empty(ageOrdering) union state.members.filter(m ⇒
-        m.status == MemberStatus.Up && matchingRole(m))
-      val safeToBeOldest = !state.members.exists { m ⇒ (m.status == MemberStatus.Down || m.status == MemberStatus.Exiting) }
+        (m.status == MemberStatus.Up || m.status == MemberStatus.Leaving) && matchingRole(m))
+      // If there is some removal in progress of an older node it's not safe to immediately become oldest,
+      // removal of younger nodes doesn't matter. Note that it can also be started via restart after
+      // ClusterSingletonManagerIsStuck.
+      val selfUpNumber = state.members.collectFirst { case m if m.uniqueAddress == cluster.selfUniqueAddress ⇒ m.upNumber }.getOrElse(Int.MaxValue)
+      val safeToBeOldest = !state.members.exists { m ⇒
+        m.upNumber <= selfUpNumber && matchingRole(m) && (m.status == MemberStatus.Down || m.status == MemberStatus.Exiting || m.status == MemberStatus.Leaving)
+      }
       val initial = InitialOldestState(membersByAge.headOption.map(_.uniqueAddress), safeToBeOldest)
       changes :+= initial
     }
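To make the new safeToBeOldest rule easier to follow, here is an illustrative stand-alone sketch of the same check against a simplified member model. The Node case class and the status strings are hypothetical, not Akka types.

```scala
// Simplified model of the check: it is only safe to start the singleton immediately
// if no member that is at least as old (lower or equal upNumber) and has a matching role
// is still on its way out of the cluster (Down, Exiting or Leaving).
final case class Node(upNumber: Int, status: String, roles: Set[String])

def safeToBeOldest(self: Node, members: Set[Node], role: Option[String]): Boolean = {
  def matchingRole(n: Node): Boolean = role.forall(n.roles.contains)
  !members.exists { n ⇒
    n.upNumber <= self.upNumber && matchingRole(n) &&
      (n.status == "Down" || n.status == "Exiting" || n.status == "Leaving")
  }
}
```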
@@ -658,7 +664,7 @@ class ClusterSingletonManager(
         stop()
       else
         throw new ClusterSingletonManagerIsStuck(
-          s"Becoming singleton oldest was stuck because previous oldest [${previousOldestOption}] is unresponsive")
+          s"Becoming singleton oldest was stuck because previous oldest [$previousOldestOption] is unresponsive")
     }

   def scheduleDelayedMemberRemoved(m: Member): Unit = {
@@ -708,6 +714,7 @@ class ClusterSingletonManager(
       stay

     case Event(Terminated(ref), d @ OldestData(singleton, _)) if ref == singleton ⇒
+      logInfo("Singleton actor [{}] was terminated", singleton.path)
       stay using d.copy(singletonTerminated = true)

     case Event(SelfExiting, _) ⇒
@@ -723,12 +730,15 @@ class ClusterSingletonManager(
         if (singletonTerminated) stop()
         else gotoStopping(singleton)
       } else if (count <= maxTakeOverRetries) {
-        logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address))
+        if (maxTakeOverRetries - count <= 3)
+          logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address))
+        else
+          log.debug("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address))
         newOldestOption.foreach(node ⇒ peer(node.address) ! TakeOverFromMe)
         setTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), handOverRetryInterval, repeat = false)
         stay
       } else
-        throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newOldestOption}] never occurred")
+        throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [$newOldestOption] never occurred")

     case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, _)) ⇒
       gotoHandingOver(singleton, singletonTerminated, Some(sender()))
@@ -742,6 +752,7 @@ class ClusterSingletonManager(
       gotoHandingOver(singleton, singletonTerminated, None)

     case Event(Terminated(ref), d @ WasOldestData(singleton, _, _)) if ref == singleton ⇒
+      logInfo("Singleton actor [{}] was terminated", singleton.path)
       stay using d.copy(singletonTerminated = true)

     case Event(SelfExiting, _) ⇒
@@ -757,6 +768,7 @@ class ClusterSingletonManager(
       handOverDone(handOverTo)
     } else {
       handOverTo foreach { _ ! HandOverInProgress }
+      logInfo("Singleton manager stopping singleton actor [{}]", singleton.path)
       singleton ! terminationMessage
       goto(HandingOver) using HandingOverData(singleton, handOverTo)
     }
@@ -793,12 +805,14 @@ class ClusterSingletonManager(
   }

   def gotoStopping(singleton: ActorRef): State = {
+    logInfo("Singleton manager starting singleton actor [{}]", singleton.path)
     singleton ! terminationMessage
     goto(Stopping) using StoppingData(singleton)
   }

   when(Stopping) {
     case (Event(Terminated(ref), StoppingData(singleton))) if ref == singleton ⇒
+      logInfo("Singleton actor [{}] was terminated", singleton.path)
       stop()
   }

@@ -834,7 +848,7 @@ class ClusterSingletonManager(
       addRemoved(m.uniqueAddress)
       stay
     case Event(TakeOverFromMe, _) ⇒
-      logInfo("Ignoring TakeOver request in [{}] from [{}].", stateName, sender().path.address)
+      log.debug("Ignoring TakeOver request in [{}] from [{}].", stateName, sender().path.address)
       stay
     case Event(Cleanup, _) ⇒
       cleanupOverdueNotMemberAnyMore()
@@ -0,0 +1,143 @@
/*
 * Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.cluster.singleton

import scala.concurrent.duration._

import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.CoordinatedShutdown
import akka.actor.PoisonPill
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.cluster.singleton.ClusterSingletonLeavingSpeedSpec.TheSingleton
import akka.testkit.AkkaSpec
import akka.testkit.TestActors
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory

object ClusterSingletonLeavingSpeedSpec {

  object TheSingleton {
    def props(probe: ActorRef): Props =
      Props(new TheSingleton(probe))
  }

  class TheSingleton(probe: ActorRef) extends Actor {
    probe ! "started"

    override def postStop(): Unit = {
      probe ! "stopped"
    }

    override def receive: Receive = {
      case msg ⇒ sender() ! msg
    }
  }
}

class ClusterSingletonLeavingSpeedSpec extends AkkaSpec("""
  akka.loglevel = DEBUG
  akka.actor.provider = akka.cluster.ClusterActorRefProvider
  akka.cluster.auto-down-unreachable-after = 2s

  # With 10 systems and setting min-number-of-hand-over-retries to 5 and gossip-interval to 2s it's possible to
  # reproduce the ClusterSingletonManagerIsStuck and slow hand over in issue #25639
  # akka.cluster.singleton.min-number-of-hand-over-retries = 5
  # akka.cluster.gossip-interval = 2s

  akka.remote {
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
    artery.canonical {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  """) {

  private val systems = (1 to 3).map { n ⇒
    val roleConfig = ConfigFactory.parseString(s"""akka.cluster.roles=[role-${n % 3}]""")
    ActorSystem(system.name, roleConfig.withFallback(system.settings.config))
  }
  private val probes = systems.map(TestProbe()(_))

  override def expectedTestDuration: FiniteDuration = 10.minutes

  def join(from: ActorSystem, to: ActorSystem, probe: ActorRef): Unit = {
    from.actorOf(
      ClusterSingletonManager.props(
        singletonProps = TheSingleton.props(probe),
        terminationMessage = PoisonPill,
        settings = ClusterSingletonManagerSettings(from)),
      name = "echo")

    Cluster(from).join(Cluster(to).selfAddress)
    within(15.seconds) {
      awaitAssert {
        Cluster(from).state.members.map(_.uniqueAddress) should contain(Cluster(from).selfUniqueAddress)
        Cluster(from).state.members.map(_.status) should ===(Set(MemberStatus.Up))
      }
    }
  }

  "ClusterSingleton that is leaving" must {
    "join cluster" in {
      systems.indices.foreach { i ⇒
        join(systems(i), systems.head, probes(i).ref)
      }
      // leader is most likely on system, lowest port
      join(system, systems.head, testActor)

      probes(0).expectMsg("started")
    }

    "quickly hand-over to next oldest" in {

      val durations = systems.indices.take(1).map { i ⇒
        val t0 = System.nanoTime()
        val leaveAddress = Cluster(systems(i)).selfAddress
        CoordinatedShutdown(systems(i)).run(CoordinatedShutdown.ClusterLeavingReason)
        probes(i).expectMsg(10.seconds, "stopped")
        val stoppedDuration = (System.nanoTime() - t0).nanos
        val startedProbe = if (i == systems.size - 1) this else probes(i + 1)
        startedProbe.expectMsg(30.seconds, "started")
        val startedDuration = (System.nanoTime() - t0).nanos

        within(15.seconds) {
          awaitAssert {
            Cluster(systems(i)).isTerminated should ===(true)
            Cluster(system).state.members.map(_.address) should not contain leaveAddress
            systems.foreach { sys ⇒
              if (!Cluster(sys).isTerminated)
                Cluster(sys).state.members.map(_.address) should not contain leaveAddress
            }
          }
        }

        println(s"Singleton $i stopped in ${stoppedDuration.toMillis} ms, started in ${startedDuration.toMillis} ms, " +
          s"diff ${(startedDuration - stoppedDuration).toMillis} ms")

        (stoppedDuration, startedDuration)
      }

      durations.zipWithIndex.foreach {
        case ((stoppedDuration, startedDuration), i) ⇒
          println(s"Singleton $i stopped in ${stoppedDuration.toMillis} ms, started in ${startedDuration.toMillis} ms, " +
            s"diff ${(startedDuration - stoppedDuration).toMillis} ms")
      }

    }
  }

  override def afterTermination(): Unit = {
    systems.foreach(shutdown(_))
  }
}
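The spec above measures how quickly the singleton moves when a node leaves gracefully. For comparison, this is roughly what a graceful leave looks like from application code; an illustrative sketch in which the system name and setup are placeholders, only the CoordinatedShutdown call mirrors what the test does.

```scala
import akka.actor.{ ActorSystem, CoordinatedShutdown }

val system = ActorSystem("MyCluster") // placeholder: join the cluster, start the singleton, etc.

// Graceful leave: the node goes Leaving -> Exiting, the singleton hands over to the next
// oldest node, and the ActorSystem is terminated once all shutdown phases have completed.
CoordinatedShutdown(system).run(CoordinatedShutdown.ClusterLeavingReason)
```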
@@ -32,6 +32,7 @@ class ClusterSingletonRestart2Spec extends AkkaSpec("""
       akka.cluster.roles = [singleton]
       akka.actor.provider = akka.cluster.ClusterActorRefProvider
       akka.cluster.auto-down-unreachable-after = 2s
+      akka.cluster.singleton.min-number-of-hand-over-retries = 5
       akka.remote {
         netty.tcp {
           hostname = "127.0.0.1"
@@ -992,6 +992,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
       // ExitingCompleted will be received via CoordinatedShutdown to continue
       // the leaving process. Meanwhile the gossip state is not marked as seen.
       exitingTasksInProgress = true
+      if (coordShutdown.shutdownReason().isEmpty)
         logInfo("Exiting, starting coordinated shutdown")
       selfExiting.trySuccess(Done)
       coordShutdown.run(CoordinatedShutdown.ClusterLeavingReason)
@@ -1192,6 +1193,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
       // ExitingCompleted will be received via CoordinatedShutdown to continue
       // the leaving process. Meanwhile the gossip state is not marked as seen.
       exitingTasksInProgress = true
+      if (coordShutdown.shutdownReason().isEmpty)
         logInfo("Exiting (leader), starting coordinated shutdown")
       selfExiting.trySuccess(Done)
       coordShutdown.run(CoordinatedShutdown.ClusterLeavingReason)
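As an aside, applications can attach their own work to the same coordinated-shutdown sequence that this leaving path triggers. A hedged sketch using the public CoordinatedShutdown API; the task name, log message, and system setup are hypothetical.

```scala
import akka.Done
import akka.actor.{ ActorSystem, CoordinatedShutdown }
import scala.concurrent.Future

val system = ActorSystem("MyCluster") // placeholder

// Register a task in the cluster-exiting phase; it runs while the node is Exiting,
// before the cluster extension and the actor system are shut down.
CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseClusterExiting, "log-exiting") { () ⇒
  system.log.info("Node is exiting the cluster")
  Future.successful(Done)
}
```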
@@ -1221,6 +1223,23 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
     if (pruned ne latestGossip) {
       updateLatestGossip(pruned)
       publishMembershipState()
+      gossipExitingMembersToOldest(changedMembers.filter(_.status == Exiting))
     }
   }

+  /**
+   * Gossip the Exiting change to the two oldest nodes for quick dissemination to potential Singleton nodes
+   */
+  private def gossipExitingMembersToOldest(exitingMembers: Set[Member]): Unit = {
+    val targets = membershipState.gossipTargetsForExitingMembers(exitingMembers)
+    if (targets.nonEmpty) {
+
+      if (log.isDebugEnabled)
+        log.debug(
+          "Cluster Node [{}] - Gossip exiting members [{}] to the two oldest (per role) [{}] (singleton optimization).",
+          selfAddress, exitingMembers.mkString(", "), targets.mkString(", "))
+
+      targets.foreach(m ⇒ gossipTo(m.uniqueAddress))
+    }
+  }
+
@@ -180,6 +180,32 @@ import scala.util.Random
     mbrs.maxBy(m ⇒ if (m.upNumber == Int.MaxValue) 0 else m.upNumber)
   }

+  /**
+   * The Exiting change is gossiped to the two oldest nodes for quick dissemination to potential Singleton nodes
+   */
+  def gossipTargetsForExitingMembers(exitingMembers: Set[Member]): Set[Member] = {
+    if (exitingMembers.nonEmpty) {
+      val roles = exitingMembers.flatten(_.roles).filterNot(_.startsWith(ClusterSettings.DcRolePrefix))
+      val membersSortedByAge = latestGossip.members.toList.filter(_.dataCenter == selfDc).sorted(Member.ageOrdering)
+      var targets = Set.empty[Member]
+      if (membersSortedByAge.nonEmpty) {
+        targets += membersSortedByAge.head // oldest of all nodes (in DC)
+        if (membersSortedByAge.tail.nonEmpty)
+          targets += membersSortedByAge.tail.head // second oldest of all nodes (in DC)
+        roles.foreach { role ⇒
+          membersSortedByAge.find(_.hasRole(role)).foreach { first ⇒
+            targets += first // oldest with the role (in DC)
+            membersSortedByAge.find(m ⇒ m != first && m.hasRole(role)).foreach { next ⇒
+              targets += next // second oldest with the role (in DC)
+            }
+          }
+        }
+      }
+      targets
+    } else
+      Set.empty
+  }
+
 }

 /**
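For readers who want to see the targeting rule in isolation, here is an illustrative stand-alone sketch over a toy model: the two oldest nodes overall plus the two oldest per role carried by an exiting member. The Node case class and the example data are hypothetical, not the MembershipState API.

```scala
final case class Node(name: String, upNumber: Int, roles: Set[String])

// Simplified version of the selection performed by gossipTargetsForExitingMembers.
def twoOldestPerRole(membersSortedByAge: List[Node], exitingRoles: Set[String]): Set[Node] = {
  var targets = Set.empty[Node]
  targets ++= membersSortedByAge.take(2) // two oldest of all nodes
  exitingRoles.foreach { role ⇒
    targets ++= membersSortedByAge.filter(_.roles.contains(role)).take(2) // two oldest with the role
  }
  targets
}

val sorted = List( // oldest first
  Node("n1", 1, Set("worker")), Node("n2", 2, Set("web")),
  Node("n3", 3, Set("worker")), Node("n4", 4, Set("web")))
twoOldestPerRole(sorted, Set("worker")) // Set(n1, n2, n3)
```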
@@ -40,5 +40,51 @@ class MembershipStateSpec extends WordSpec with Matchers {
       "dc-b" -> SortedSet(bOldest, b1)
     ))
   }
+
+    "find two oldest as targets for Exiting change" in {
+      val a1Exiting = a1.copy(MemberStatus.Leaving).copy(MemberStatus.Exiting)
+      val gossip = Gossip(SortedSet(a1Exiting, a2, a3, a4))
+      val membershipState = MembershipState(
+        gossip,
+        a1.uniqueAddress,
+        "dc-a",
+        2
+      )
+
+      membershipState.gossipTargetsForExitingMembers(Set(a1Exiting)) should ===(Set(a1Exiting, a2))
+    }
+
+    "find two oldest in DC as targets for Exiting change" in {
+      val a4Exiting = a4.copy(MemberStatus.Leaving).copy(MemberStatus.Exiting)
+      val gossip = Gossip(SortedSet(a2, a3, a4Exiting, b1, b2))
+      val membershipState = MembershipState(
+        gossip,
+        a1.uniqueAddress,
+        "dc-a",
+        2
+      )
+
+      membershipState.gossipTargetsForExitingMembers(Set(a4Exiting)) should ===(Set(a2, a3))
+    }
+
+    "find two oldest per role as targets for Exiting change" in {
+      val a5 = TestMember(Address("akka.tcp", "sys", "a5", 2552), MemberStatus.Exiting, roles = Set("role1", "role2"), upNumber = 5, dataCenter = "dc-a")
+      val a6 = TestMember(Address("akka.tcp", "sys", "a6", 2552), MemberStatus.Exiting, roles = Set("role1", "role3"), upNumber = 6, dataCenter = "dc-a")
+      val a7 = TestMember(Address("akka.tcp", "sys", "a7", 2552), MemberStatus.Exiting, roles = Set("role1"), upNumber = 7, dataCenter = "dc-a")
+      val a8 = TestMember(Address("akka.tcp", "sys", "a8", 2552), MemberStatus.Exiting, roles = Set("role1"), upNumber = 8, dataCenter = "dc-a")
+      val a9 = TestMember(Address("akka.tcp", "sys", "a9", 2552), MemberStatus.Exiting, roles = Set("role2"), upNumber = 9, dataCenter = "dc-a")
+      val b5 = TestMember(Address("akka.tcp", "sys", "b5", 2552), MemberStatus.Exiting, roles = Set("role1"), upNumber = 5, dataCenter = "dc-b")
+      val b6 = TestMember(Address("akka.tcp", "sys", "b6", 2552), MemberStatus.Exiting, roles = Set("role2"), upNumber = 6, dataCenter = "dc-b")
+      val theExiting = Set(a5, a6)
+      val gossip = Gossip(SortedSet(a1, a2, a3, a4, a5, a6, a7, a8, a9, b1, b2, b3, b5, b6))
+      val membershipState = MembershipState(
+        gossip,
+        a1.uniqueAddress,
+        "dc-a",
+        2
+      )
+
+      membershipState.gossipTargetsForExitingMembers(theExiting) should ===(Set(a1, a2, a5, a6, a9))
+    }
   }
 }