Making some changes in the cluster code to avoid doing work that isn't needed

This commit is contained in:
Viktor Klang 2012-07-07 20:55:02 +02:00
parent fcf0d9ad71
commit f1375b6bfb
3 changed files with 65 additions and 110 deletions

View file

@ -508,8 +508,7 @@ private[akka] class ActorCell(
} }
private def isTerminating = childrenRefs match { private def isTerminating = childrenRefs match {
case TerminatingChildrenContainer(_, _, Termination) true case TerminatedChildrenContainer | TerminatingChildrenContainer(_, _, Termination) true
case TerminatedChildrenContainer true
case _ false case _ false
} }
private def isNormal = childrenRefs match { private def isNormal = childrenRefs match {
@ -981,11 +980,10 @@ private[akka] class ActorCell(
childrenRefs match { childrenRefs match {
case tc @ TerminatingChildrenContainer(_, _, reason) case tc @ TerminatingChildrenContainer(_, _, reason)
val n = removeChild(child) val n = removeChild(child)
actor.supervisorStrategy.handleChildTerminated(this, child, children)
if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match { if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match {
case Recreation(cause) doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate" case Recreation(cause) doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate"
case Termination doTerminate() case Termination doTerminate()
case _ case _ actor.supervisorStrategy.handleChildTerminated(this, child, children)
} }
case _ case _
removeChild(child) removeChild(child)

View file

@ -258,14 +258,11 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
def joinSeedNode(): Unit = { def joinSeedNode(): Unit = {
val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress self.path.toStringWithAddress(a) } val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress self.path.toStringWithAddress(a) }
if (seedRoutees.isEmpty) { if (seedRoutees.isEmpty) join(selfAddress)
join(selfAddress) else {
} else {
implicit val within = Timeout(SeedNodeTimeout) implicit val within = Timeout(SeedNodeTimeout)
val seedRouter = context.actorOf( val seedRouter = context.actorOf(Props.empty.withRouter(ScatterGatherFirstCompletedRouter(routees = seedRoutees, within = within.duration)))
Props.empty.withRouter(ScatterGatherFirstCompletedRouter( seedRouter ! InitJoin
routees = seedRoutees, within = within.duration)))
seedRouter ? InitJoin pipeTo self
seedRouter ! PoisonPill seedRouter ! PoisonPill
} }
} }
@ -289,8 +286,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
notifyListeners(localGossip) notifyListeners(localGossip)
val command = ClusterUserAction.Join(selfAddress) coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress))
coreSender ! SendClusterMessage(address, command)
} }
/** /**
@ -395,9 +391,9 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
val localUnreachableMembers = localOverview.unreachable val localUnreachableMembers = localOverview.unreachable
// 1. check if the node to DOWN is in the 'members' set // 1. check if the node to DOWN is in the 'members' set
val downedMember: Option[Member] = localMembers.collectFirst { val downedMember: Option[Member] =
case m if m.address == address m.copy(status = Down) localMembers.collectFirst { case m if m.address == address m.copy(status = Down) }
}
val newMembers = downedMember match { val newMembers = downedMember match {
case Some(m) case Some(m)
log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, m.address) log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, m.address)
@ -419,9 +415,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember
// 4. remove nodes marked as DOWN from the 'seen' table // 4. remove nodes marked as DOWN from the 'seen' table
val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { case m if m.status == Down m.address }
case m if m.status == Down m.address
}
// update gossip overview // update gossip overview
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers)
@ -488,45 +482,29 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
stats = stats.incrementMergeConflictCount stats = stats.incrementMergeConflictCount
val rate = mergeRate(stats.mergeConflictCount) val rate = mergeRate(stats.mergeConflictCount)
if (rate <= MaxGossipMergeRate) {
coreSender ! SendClusterMessage( if (rate <= MaxGossipMergeRate)
to = localGossip.leader.get, coreSender ! SendClusterMessage(to = localGossip.leader.get, msg = GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope))
msg = GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope)) else
} else {
log.debug("Skipping gossip merge conflict due to rate [{}] / s ", rate) log.debug("Skipping gossip merge conflict due to rate [{}] / s ", rate)
}
} else { } else {
val winningGossip = val winningGossip =
if (conflict) (remoteGossip merge localGossip) :+ vclockNode // conflicting versions, merge, and new version
if (conflict) { else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer
// conflicting versions, merge, and new version else remoteGossip // remote gossip is newer
val mergedGossip = remoteGossip merge localGossip
mergedGossip :+ vclockNode
} else if (remoteGossip.version < localGossip.version) {
// local gossip is newer
localGossip
} else {
// remote gossip is newer
remoteGossip
}
val newJoinInProgress = val newJoinInProgress =
if (joinInProgress.isEmpty) joinInProgress if (joinInProgress.isEmpty) joinInProgress
else joinInProgress -- else joinInProgress -- winningGossip.members.map(_.address) -- winningGossip.overview.unreachable.map(_.address)
winningGossip.members.map(_.address) --
winningGossip.overview.unreachable.map(_.address)
latestGossip = winningGossip seen selfAddress latestGossip = winningGossip seen selfAddress
joinInProgress = newJoinInProgress joinInProgress = newJoinInProgress
// for all new joining nodes we remove them from the failure detector // for all new joining nodes we remove them from the failure detector
(latestGossip.members -- localGossip.members).filter(_.status == Joining).foreach { node (latestGossip.members -- localGossip.members).foreach {
failureDetector.remove(node.address) node if (node.status == Joining) failureDetector.remove(node.address)
} }
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
@ -563,30 +541,24 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
if (!isSingletonCluster && isAvailable) { if (!isSingletonCluster && isAvailable) {
val localGossip = latestGossip val localGossip = latestGossip
// important to not accidentally use `map` of the SortedSet, since the original order is not preserved
val localMembers = localGossip.members.toIndexedSeq
val localMembersSize = localMembers.size
val localMemberAddresses = localMembers map { _.address }
val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq val preferredGossipTargets =
val localUnreachableSize = localUnreachableMembers.size if (ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) { // If it's time to try to gossip to some nodes with a different view
// gossip to a random alive member with preference to a member with older or newer gossip version
// gossip to a random alive member with preference to a member
// with older or newer gossip version
val nodesWithdifferentView = {
val localMemberAddressesSet = localGossip.members map { _.address } val localMemberAddressesSet = localGossip.members map { _.address }
for { val nodesWithDifferentView = for {
(address, version) localGossip.overview.seen (address, version) localGossip.overview.seen
if localMemberAddressesSet contains address if localMemberAddressesSet contains address
if version != localGossip.version if version != localGossip.version
} yield address } yield address
}
val gossipedToAlive =
if (nodesWithdifferentView.nonEmpty && ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability)
gossipToRandomNodeOf(nodesWithdifferentView.toIndexedSeq)
else
gossipToRandomNodeOf(localMemberAddresses)
nodesWithDifferentView.toIndexedSeq
} else Vector.empty[Address]
gossipToRandomNodeOf(
if (preferredGossipTargets.nonEmpty) preferredGossipTargets
else localGossip.members.toIndexedSeq[Member].map(_.address) // Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved)
)
} }
} }
@ -611,10 +583,11 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
} }
// Leader actions are as follows: // Leader actions are as follows:
// 1. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table // 1. Move JOINING => UP -- When a node joins the cluster
// 2. Move JOINING => UP -- When a node joins the cluster // 2. Move LEAVING => EXITING -- When all partition handoff has completed
// 3. Move LEAVING => EXITING -- When all partition handoff has completed // 3. Non-exiting remain -- When all partition handoff has completed
// 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader // 4. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table
// 5. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader
// 5. Store away all stuff needed for the side-effecting processing in 10. // 5. Store away all stuff needed for the side-effecting processing in 10.
// 6. Updating the vclock version for the changes // 6. Updating the vclock version for the changes
// 7. Updating the 'seen' table // 7. Updating the 'seen' table
@ -633,27 +606,15 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
if (localGossip.convergence) { if (localGossip.convergence) {
// we have convergence - so we can't have unreachable nodes // we have convergence - so we can't have unreachable nodes
// transform the node member ring - filterNot/map/map // transform the node member ring
val newMembers = val newMembers = localMembers collect {
localMembers filterNot { member // 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
// ---------------------- case member if member.status == Joining member copy (status = Up)
// 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table // 2. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
// ---------------------- case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully member copy (status = Exiting)
member.status == MemberStatus.Exiting // 3. Everyone else that is not Exiting stays as they are
case member if member.status != Exiting member
} map { member // 4. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table
// ----------------------
// 2. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
// ----------------------
if (member.status == Joining) member copy (status = Up)
else member
} map { member
// ----------------------
// 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
// ----------------------
if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully) member copy (status = Exiting)
else member
} }
// ---------------------- // ----------------------
@ -669,7 +630,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining) val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining)
val (exitingMembers, newMembers3) = newMembers2 partition (_.status == Leaving && hasPartionHandoffCompletedSuccessfully) val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully)
val hasChangedState = removedMembers.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty val hasChangedState = removedMembers.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty
@ -682,22 +643,22 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
(newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Set.empty[Member]) (newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Member.none)
} else if (AutoDown) { } else if (AutoDown) {
// we don't have convergence - so we might have unreachable nodes // we don't have convergence - so we might have unreachable nodes
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes // if 'auto-down' is turned on, then try to auto-down any unreachable nodes
val newUnreachableMembers = localUnreachableMembers.map { member val newUnreachableMembers = localUnreachableMembers collect {
// ---------------------- // ----------------------
// 5. Move UNREACHABLE => DOWN (auto-downing by leader) // 6. Move UNREACHABLE => DOWN (auto-downing by leader)
// ---------------------- // ----------------------
if (member.status == Down) member // no need to DOWN members already DOWN case member if member.status != Down member copy (status = Down)
else member copy (status = Down) case downMember downMember // no need to DOWN members already DOWN
} }
// Check for the need to do side-effecting on successful state change // Check for the need to do side-effecting on successful state change
val (unreachableButNotDownedMembers, _) = localUnreachableMembers partition (_.status != Down) val unreachableButNotDownedMembers = localUnreachableMembers filter (_.status != Down)
// removing nodes marked as DOWN from the 'seen' table // removing nodes marked as DOWN from the 'seen' table
val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down m.address } val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down m.address }
@ -705,9 +666,9 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (overview = newOverview) // update gossip val newGossip = localGossip copy (overview = newOverview) // update gossip
(newGossip, unreachableButNotDownedMembers.nonEmpty, Set.empty[Member], Set.empty[Member], Set.empty[Member], unreachableButNotDownedMembers) (newGossip, unreachableButNotDownedMembers.nonEmpty, Member.none, Member.none, Member.none, unreachableButNotDownedMembers)
} else (localGossip, false, Set.empty[Member], Set.empty[Member], Set.empty[Member], Set.empty[Member]) } else (localGossip, false, Member.none, Member.none, Member.none, Member.none)
if (hasChangedState) { // we have a change of state - version it and try to update if (hasChangedState) { // we have a change of state - version it and try to update
// ---------------------- // ----------------------
@ -769,20 +730,14 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys
val deadline = Deadline.now + HeartbeatInterval val deadline = Deadline.now + HeartbeatInterval
for (address beatTo; if address != selfAddress) beatTo.foreach { address if (address != selfAddress) heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline) }
heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline)
} }
/** /**
* Removes overdue joinInProgress from State. * Removes overdue joinInProgress from State.
*/ */
def removeOverdueJoinInProgress(): Unit = { def removeOverdueJoinInProgress(): Unit = {
val overdueJoins = joinInProgress collect { joinInProgress --= joinInProgress collect { case (address, deadline) if deadline.isOverdue address }
case (address, deadline) if deadline.isOverdue address
}
if (overdueJoins.nonEmpty) {
joinInProgress = joinInProgress -- overdueJoins
}
} }
/** /**

View file

@ -26,6 +26,8 @@ class Member(val address: Address, val status: MemberStatus) extends ClusterMess
*/ */
object Member { object Member {
val none = Set.empty[Member]
/** /**
* `Address` ordering type class, sorts addresses by host and port. * `Address` ordering type class, sorts addresses by host and port.
*/ */
@ -56,7 +58,7 @@ object Member {
// group all members by Address => Seq[Member] // group all members by Address => Seq[Member]
val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address) val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address)
// pick highest MemberStatus // pick highest MemberStatus
(Set.empty[Member] /: groupedByAddress) { (Member.none /: groupedByAddress) {
case (acc, (_, members)) acc + members.reduceLeft(highestPriorityOf) case (acc, (_, members)) acc + members.reduceLeft(highestPriorityOf)
} }
} }