import MemberStatus._

This commit is contained in:
Patrik Nordwall 2012-06-13 15:33:38 +02:00
parent 5b89d25c37
commit afbeb3e5f9

View file

@ -26,6 +26,7 @@ import scala.annotation.tailrec
import com.google.protobuf.ByteString
import akka.util.internal.HashedWheelTimer
import akka.dispatch.MonitorableThreadFactory
import MemberStatus._
/**
* Interface for membership change listener.
@ -96,7 +97,6 @@ class Member(val address: Address, val status: MemberStatus) extends ClusterMess
* Factory and Utility module for Member instances.
*/
object Member {
import MemberStatus._
/**
* Sort Address by host and port
@ -151,7 +151,7 @@ sealed trait MemberStatus extends ClusterMessage {
/**
* Using the same notion for 'unavailable' as 'non-convergence': DOWN and REMOVED.
*/
def isUnavailable: Boolean = this == MemberStatus.Down || this == MemberStatus.Removed
def isUnavailable: Boolean = this == Down || this == Removed
}
object MemberStatus {
@ -223,7 +223,7 @@ case class Gossip(
throw new IllegalArgumentException("Same nodes in both members and unreachable is not allowed, got [%s]"
format unreachableAndLive.mkString(", "))
val allowedLiveMemberStatuses: Set[MemberStatus] = Set(MemberStatus.Joining, MemberStatus.Up, MemberStatus.Leaving, MemberStatus.Exiting)
val allowedLiveMemberStatuses: Set[MemberStatus] = Set(Joining, Up, Leaving, Exiting)
def hasNotAllowedLiveMemberStatus(m: Member) = !allowedLiveMemberStatuses.contains(m.status)
if (members exists hasNotAllowedLiveMemberStatus)
throw new IllegalArgumentException("Live members must have status [%s], got [%s]"
@ -473,7 +473,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
private val state = {
val member = Member(selfAddress, MemberStatus.Joining)
val member = Member(selfAddress, Joining)
val versionedGossip = Gossip(members = Gossip.emptyMembers + member) + vclockNode // add me as member and update my vector clock
val seenVersionedGossip = versionedGossip seen selfAddress
new AtomicReference[State](State(seenVersionedGossip))
@ -702,7 +702,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val alreadyMember = localMembers.exists(_.address == node)
val isUnreachable = localUnreachable.exists { m
m.address == node && m.status != MemberStatus.Down && m.status != MemberStatus.Removed
m.address == node && m.status != Down && m.status != Removed
}
if (!alreadyMember && !isUnreachable) {
@ -711,7 +711,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val newUnreachableMembers = localUnreachable filterNot { _.address == node }
val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers)
val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining
val newMembers = localMembers + Member(node, Joining) // add joining node as Joining
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
val versionedGossip = newGossip + vclockNode
@ -739,7 +739,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val localGossip = localState.latestGossip
val localMembers = localGossip.members
val newMembers = localMembers + Member(address, MemberStatus.Leaving) // mark node as LEAVING
val newMembers = localMembers + Member(address, Leaving) // mark node as LEAVING
val newGossip = localGossip copy (members = newMembers)
val versionedGossip = newGossip + vclockNode
@ -792,7 +792,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// 1. check if the node to DOWN is in the 'members' set
val downedMember: Option[Member] = localMembers.collectFirst {
case m if m.address == address m.copy(status = MemberStatus.Down)
case m if m.address == address m.copy(status = Down)
}
val newMembers = downedMember match {
case Some(m)
@ -805,9 +805,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val newUnreachableMembers =
localUnreachableMembers.map { member
// no need to DOWN members already DOWN
if (member.address == address && member.status != MemberStatus.Down) {
if (member.address == address && member.status != Down) {
log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address)
member copy (status = MemberStatus.Down)
member copy (status = Down)
} else member
}
@ -816,7 +816,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// 4. remove nodes marked as DOWN from the 'seen' table
val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect {
case m if m.status == MemberStatus.Down m.address
case m if m.status == Down m.address
}
// update gossip overview
@ -1073,30 +1073,30 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// ----------------------
// 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
// ----------------------
if (member.status == MemberStatus.Joining) {
if (member.status == Joining) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address)
hasChangedState = true
member copy (status = MemberStatus.Up)
member copy (status = Up)
} else member
} map { member
// ----------------------
// 2. Move EXITING => REMOVED (once all nodes have seen that this node is EXITING e.g. we have a convergence)
// ----------------------
if (member.status == MemberStatus.Exiting) {
if (member.status == Exiting) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", selfAddress, member.address)
hasChangedState = true
member copy (status = MemberStatus.Removed)
member copy (status = Removed)
} else member
} map { member
// ----------------------
// 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
// ----------------------
if (member.status == MemberStatus.Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) {
if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, member.address)
hasChangedState = true
member copy (status = MemberStatus.Exiting)
member copy (status = Exiting)
} else member
}
@ -1112,17 +1112,17 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val newUnreachableMembers =
localUnreachableMembers.map { member
// no need to DOWN members already DOWN
if (member.status == MemberStatus.Down) member
if (member.status == Down) member
else {
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
hasChangedState = true
member copy (status = MemberStatus.Down)
member copy (status = Down)
}
}
// removing nodes marked as DOWN from the 'seen' table
val newSeen = localSeen -- newUnreachableMembers.collect {
case m if m.status == MemberStatus.Down m.address
case m if m.status == Down m.address
}
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
@ -1169,8 +1169,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// Else we can't continue to check for convergence
// When that is done we check that all the entries in the 'seen' table have the same vector clock version
if (unreachable.isEmpty || !unreachable.exists { m
m.status != MemberStatus.Down &&
m.status != MemberStatus.Removed
m.status != Down && m.status != Removed
}) {
val seen = gossip.overview.seen
val views = Set.empty[VectorClock] ++ seen.values