cluster singleton improvements, #20942

* track nodes by UniqueAddress in Cluster Singleton, #20942
* reply with HandOverDone from new incarnation, #20942
* confirm as terminated immediately when new incarnation joins, #20942 instead of waiting for failure detector to mark it as unreachable this will speed-up removal when restarting cluster node with same hostname:port
This commit is contained in:
Patrik Nordwall 2016-08-19 11:56:55 +02:00 committed by Johan Andrén
parent 53d34f3b09
commit 0c4d4c37ba
4 changed files with 207 additions and 69 deletions

View file

@ -23,6 +23,7 @@ import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.AkkaException
import akka.actor.NoSerializationVerificationNeeded
import akka.cluster.UniqueAddress
object ClusterSingletonManagerSettings {
@ -184,11 +185,11 @@ object ClusterSingletonManager {
case object End extends State
case object Uninitialized extends Data
final case class YoungerData(oldestOption: Option[Address]) extends Data
final case class BecomingOldestData(previousOldestOption: Option[Address]) extends Data
final case class YoungerData(oldestOption: Option[UniqueAddress]) extends Data
final case class BecomingOldestData(previousOldestOption: Option[UniqueAddress]) extends Data
final case class OldestData(singleton: ActorRef, singletonTerminated: Boolean = false) extends Data
final case class WasOldestData(singleton: ActorRef, singletonTerminated: Boolean,
newOldestOption: Option[Address]) extends Data
newOldestOption: Option[UniqueAddress]) extends Data
final case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef]) extends Data
case object EndData extends Data
final case class DelayedMemberRemoved(member: Member)
@ -205,9 +206,9 @@ object ClusterSingletonManager {
/**
* The first event, corresponding to CurrentClusterState.
*/
final case class InitialOldestState(oldest: Option[Address], safeToBeOldest: Boolean)
final case class InitialOldestState(oldest: Option[UniqueAddress], safeToBeOldest: Boolean)
final case class OldestChanged(oldest: Option[Address])
final case class OldestChanged(oldest: Option[UniqueAddress])
}
/**
@ -245,14 +246,14 @@ object ClusterSingletonManager {
block()
val after = membersByAge.headOption
if (before != after)
changes :+= OldestChanged(after.map(_.address))
changes :+= OldestChanged(after.map(_.uniqueAddress))
}
def handleInitial(state: CurrentClusterState): Unit = {
membersByAge = immutable.SortedSet.empty(ageOrdering) union state.members.filter(m
(m.status == MemberStatus.Up || m.status == MemberStatus.Leaving) && matchingRole(m))
val safeToBeOldest = !state.members.exists { m (m.status == MemberStatus.Down || m.status == MemberStatus.Exiting) }
val initial = InitialOldestState(membersByAge.headOption.map(_.address), safeToBeOldest)
val initial = InitialOldestState(membersByAge.headOption.map(_.uniqueAddress), safeToBeOldest)
changes :+= initial
}
@ -376,7 +377,7 @@ class ClusterSingletonManager(
import FSM.`→`
val cluster = Cluster(context.system)
val selfAddressOption = Some(cluster.selfAddress)
val selfUniqueAddressOption = Some(cluster.selfUniqueAddress)
import cluster.settings.LogInfo
require(
@ -406,13 +407,13 @@ class ClusterSingletonManager(
var selfExited = false
// keep track of previously removed members
var removed = Map.empty[Address, Deadline]
var removed = Map.empty[UniqueAddress, Deadline]
def addRemoved(address: Address): Unit =
removed += address (Deadline.now + 15.minutes)
def addRemoved(node: UniqueAddress): Unit =
removed += node (Deadline.now + 15.minutes)
def cleanupOverdueNotMemberAnyMore(): Unit = {
removed = removed filter { case (address, deadline) deadline.hasTimeLeft }
removed = removed filter { case (_, deadline) deadline.hasTimeLeft }
}
def logInfo(message: String): Unit =
@ -463,10 +464,10 @@ class ClusterSingletonManager(
case Event(InitialOldestState(oldestOption, safeToBeOldest), _)
oldestChangedReceived = true
if (oldestOption == selfAddressOption && safeToBeOldest)
if (oldestOption == selfUniqueAddressOption && safeToBeOldest)
// oldest immediately
gotoOldest()
else if (oldestOption == selfAddressOption)
else if (oldestOption == selfUniqueAddressOption)
goto(BecomingOldest) using BecomingOldestData(None)
else
goto(Younger) using YoungerData(oldestOption)
@ -475,22 +476,22 @@ class ClusterSingletonManager(
when(Younger) {
case Event(OldestChanged(oldestOption), YoungerData(previousOldestOption))
oldestChangedReceived = true
if (oldestOption == selfAddressOption) {
logInfo("Younger observed OldestChanged: [{} -> myself]", previousOldestOption)
if (oldestOption == selfUniqueAddressOption) {
logInfo("Younger observed OldestChanged: [{} -> myself]", previousOldestOption.map(_.address))
previousOldestOption match {
case None gotoOldest()
case Some(prev) if removed.contains(prev) gotoOldest()
case Some(prev)
peer(prev) ! HandOverToMe
peer(prev.address) ! HandOverToMe
goto(BecomingOldest) using BecomingOldestData(previousOldestOption)
}
} else {
logInfo("Younger observed OldestChanged: [{} -> {}]", previousOldestOption, oldestOption)
logInfo("Younger observed OldestChanged: [{} -> {}]", previousOldestOption.map(_.address), oldestOption.map(_.address))
getNextOldestChanged()
stay using YoungerData(oldestOption)
}
case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress
case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
@ -498,11 +499,17 @@ class ClusterSingletonManager(
scheduleDelayedMemberRemoved(m)
stay
case Event(DelayedMemberRemoved(m), YoungerData(Some(previousOldest))) if m.address == previousOldest
case Event(DelayedMemberRemoved(m), YoungerData(Some(previousOldest))) if m.uniqueAddress == previousOldest
logInfo("Previous oldest removed [{}]", m.address)
addRemoved(m.address)
addRemoved(m.uniqueAddress)
// transition when OldestChanged
stay using YoungerData(None)
case Event(HandOverToMe, _)
// this node was probably quickly restarted with same hostname:port,
// confirm that the old singleton instance has been stopped
sender() ! HandOverDone
stay
}
when(BecomingOldest) {
@ -514,16 +521,16 @@ class ClusterSingletonManager(
stay
case Event(HandOverDone, BecomingOldestData(Some(previousOldest)))
if (sender().path.address == previousOldest)
if (sender().path.address == previousOldest.address)
gotoOldest()
else {
logInfo(
"Ignoring HandOverDone in BecomingOldest from [{}]. Expected previous oldest [{}]",
sender().path.address, previousOldest)
sender().path.address, previousOldest.address)
stay
}
case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress
case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
@ -531,26 +538,39 @@ class ClusterSingletonManager(
scheduleDelayedMemberRemoved(m)
stay
case Event(DelayedMemberRemoved(m), BecomingOldestData(Some(previousOldest))) if m.address == previousOldest
logInfo("Previous oldest [{}] removed", previousOldest)
addRemoved(m.address)
case Event(DelayedMemberRemoved(m), BecomingOldestData(Some(previousOldest))) if m.uniqueAddress == previousOldest
logInfo("Previous oldest [{}] removed", previousOldest.address)
addRemoved(m.uniqueAddress)
gotoOldest()
case Event(TakeOverFromMe, BecomingOldestData(None))
sender() ! HandOverToMe
stay using BecomingOldestData(Some(sender().path.address))
case Event(TakeOverFromMe, BecomingOldestData(Some(previousOldest)))
if (previousOldest == sender().path.address) sender() ! HandOverToMe
else logInfo(
"Ignoring TakeOver request in BecomingOldest from [{}]. Expected previous oldest [{}]",
sender().path.address, previousOldest)
stay
case Event(TakeOverFromMe, BecomingOldestData(previousOldestOption))
val senderAddress = sender().path.address
// it would have been better to include the UniqueAddress in the TakeOverFromMe message,
// but can't change due to backwards compatibility
cluster.state.members.collectFirst { case m if m.address == senderAddress m.uniqueAddress } match {
case None
// from unknown node, ignore
logInfo(
"Ignoring TakeOver request from unknown node in BecomingOldest from [{}].", senderAddress)
stay
case Some(senderUniqueAddress)
previousOldestOption match {
case Some(previousOldest)
if (previousOldest == senderUniqueAddress) sender() ! HandOverToMe
else logInfo(
"Ignoring TakeOver request in BecomingOldest from [{}]. Expected previous oldest [{}]",
sender().path.address, previousOldest.address)
stay
case None
sender() ! HandOverToMe
stay using BecomingOldestData(Some(senderUniqueAddress))
}
}
case Event(HandOverRetry(count), BecomingOldestData(previousOldestOption))
if (count <= maxHandOverRetries) {
logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldestOption)
previousOldestOption foreach { peer(_) ! HandOverToMe }
logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldestOption.map(_.address))
previousOldestOption.foreach(node peer(node.address) ! HandOverToMe)
setTimer(HandOverRetryTimer, HandOverRetry(count + 1), handOverRetryInterval, repeat = false)
stay()
} else if (previousOldestOption forall removed.contains) {
@ -582,16 +602,16 @@ class ClusterSingletonManager(
when(Oldest) {
case Event(OldestChanged(oldestOption), OldestData(singleton, singletonTerminated))
oldestChangedReceived = true
logInfo("Oldest observed OldestChanged: [{} -> {}]", cluster.selfAddress, oldestOption)
logInfo("Oldest observed OldestChanged: [{} -> {}]", cluster.selfAddress, oldestOption.map(_.address))
oldestOption match {
case Some(a) if a == cluster.selfAddress
case Some(a) if a == cluster.selfUniqueAddress
// already oldest
stay
case Some(a) if !selfExited && removed.contains(a)
gotoHandingOver(singleton, singletonTerminated, None)
case Some(a)
// send TakeOver request in case the new oldest doesn't know previous oldest
peer(a) ! TakeOverFromMe
peer(a.address) ! TakeOverFromMe
setTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval, repeat = false)
goto(WasOldest) using WasOldestData(singleton, singletonTerminated, newOldestOption = Some(a))
case None
@ -610,8 +630,8 @@ class ClusterSingletonManager(
when(WasOldest) {
case Event(TakeOverRetry(count), WasOldestData(_, _, newOldestOption))
if (count <= maxTakeOverRetries) {
logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption)
newOldestOption foreach { peer(_) ! TakeOverFromMe }
logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address))
newOldestOption.foreach(node peer(node.address) ! TakeOverFromMe)
setTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), handOverRetryInterval, repeat = false)
stay
} else if (cluster.isTerminated)
@ -622,12 +642,12 @@ class ClusterSingletonManager(
case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, _))
gotoHandingOver(singleton, singletonTerminated, Some(sender()))
case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress && !selfExited
case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress && !selfExited
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
case Event(MemberRemoved(m, _), WasOldestData(singleton, singletonTerminated, Some(newOldest))) if !selfExited && m.address == newOldest
addRemoved(m.address)
case Event(MemberRemoved(m, _), WasOldestData(singleton, singletonTerminated, Some(newOldest))) if !selfExited && m.uniqueAddress == newOldest
addRemoved(m.uniqueAddress)
gotoHandingOver(singleton, singletonTerminated, None)
case Event(Terminated(ref), d @ WasOldestData(singleton, _, _)) if ref == singleton
@ -660,17 +680,15 @@ class ClusterSingletonManager(
val newOldest = handOverTo.map(_.path.address)
logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, newOldest)
handOverTo foreach { _ ! HandOverDone }
if (removed.contains(cluster.selfAddress)) {
if (removed.contains(cluster.selfUniqueAddress)) {
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
} else if (selfExited)
} else
goto(End) using EndData
else
goto(Younger) using YoungerData(newOldest)
}
when(End) {
case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress
case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
}
@ -678,21 +696,21 @@ class ClusterSingletonManager(
whenUnhandled {
case Event(_: CurrentClusterState, _) stay
case Event(MemberExited(m), _)
if (m.address == cluster.selfAddress) {
if (m.uniqueAddress == cluster.selfUniqueAddress) {
selfExited = true
logInfo("Exited [{}]", m.address)
}
stay
case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress && !selfExited
case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress && !selfExited
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
case Event(MemberRemoved(m, _), _)
if (!selfExited) logInfo("Member removed [{}]", m.address)
addRemoved(m.address)
addRemoved(m.uniqueAddress)
stay
case Event(DelayedMemberRemoved(m), _)
if (!selfExited) logInfo("Member removed [{}]", m.address)
addRemoved(m.address)
addRemoved(m.uniqueAddress)
stay
case Event(TakeOverFromMe, _)
logInfo("Ignoring TakeOver request in [{}] from [{}].", stateName, sender().path.address)
@ -720,7 +738,7 @@ class ClusterSingletonManager(
}
onTransition {
case _ (Younger | End) if removed.contains(cluster.selfAddress)
case _ (Younger | End) if removed.contains(cluster.selfUniqueAddress)
logInfo("Self removed, stopping ClusterSingletonManager")
// note that FSM.stop() can't be used in onTransition
context.stop(self)