diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 3fecd7524b..48035a0e4e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -26,6 +26,7 @@ import scala.annotation.tailrec import com.google.protobuf.ByteString import akka.util.internal.HashedWheelTimer import akka.dispatch.MonitorableThreadFactory +import MemberStatus._ /** * Interface for membership change listener. @@ -96,7 +97,6 @@ class Member(val address: Address, val status: MemberStatus) extends ClusterMess * Factory and Utility module for Member instances. */ object Member { - import MemberStatus._ /** * Sort Address by host and port @@ -151,7 +151,7 @@ sealed trait MemberStatus extends ClusterMessage { /** * Using the same notion for 'unavailable' as 'non-convergence': DOWN and REMOVED. */ - def isUnavailable: Boolean = this == MemberStatus.Down || this == MemberStatus.Removed + def isUnavailable: Boolean = this == Down || this == Removed } object MemberStatus { @@ -223,7 +223,7 @@ case class Gossip( throw new IllegalArgumentException("Same nodes in both members and unreachable is not allowed, got [%s]" format unreachableAndLive.mkString(", ")) - val allowedLiveMemberStatuses: Set[MemberStatus] = Set(MemberStatus.Joining, MemberStatus.Up, MemberStatus.Leaving, MemberStatus.Exiting) + val allowedLiveMemberStatuses: Set[MemberStatus] = Set(Joining, Up, Leaving, Exiting) def hasNotAllowedLiveMemberStatus(m: Member) = !allowedLiveMemberStatuses.contains(m.status) if (members exists hasNotAllowedLiveMemberStatus) throw new IllegalArgumentException("Live members must have status [%s], got [%s]" @@ -473,7 +473,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } private val state = { - val member = Member(selfAddress, MemberStatus.Joining) + val member = Member(selfAddress, Joining) val versionedGossip = Gossip(members = Gossip.emptyMembers + member) + vclockNode // add me as member and update my vector clock val seenVersionedGossip = versionedGossip seen selfAddress new AtomicReference[State](State(seenVersionedGossip)) @@ -702,7 +702,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val alreadyMember = localMembers.exists(_.address == node) val isUnreachable = localUnreachable.exists { m ⇒ - m.address == node && m.status != MemberStatus.Down && m.status != MemberStatus.Removed + m.address == node && m.status != Down && m.status != Removed } if (!alreadyMember && !isUnreachable) { @@ -711,7 +711,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newUnreachableMembers = localUnreachable filterNot { _.address == node } val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) - val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining + val newMembers = localMembers + Member(node, Joining) // add joining node as Joining val newGossip = localGossip copy (overview = newOverview, members = newMembers) val versionedGossip = newGossip + vclockNode @@ -739,7 +739,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val localGossip = localState.latestGossip val localMembers = localGossip.members - val newMembers = localMembers + Member(address, MemberStatus.Leaving) // mark node as LEAVING + val newMembers = localMembers + Member(address, Leaving) // mark node as LEAVING val newGossip = localGossip copy (members = newMembers) val versionedGossip = newGossip + vclockNode @@ -792,7 +792,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // 1. check if the node to DOWN is in the 'members' set val downedMember: Option[Member] = localMembers.collectFirst { - case m if m.address == address ⇒ m.copy(status = MemberStatus.Down) + case m if m.address == address ⇒ m.copy(status = Down) } val newMembers = downedMember match { case Some(m) ⇒ @@ -805,9 +805,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newUnreachableMembers = localUnreachableMembers.map { member ⇒ // no need to DOWN members already DOWN - if (member.address == address && member.status != MemberStatus.Down) { + if (member.address == address && member.status != Down) { log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address) - member copy (status = MemberStatus.Down) + member copy (status = Down) } else member } @@ -816,7 +816,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // 4. remove nodes marked as DOWN from the 'seen' table val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { - case m if m.status == MemberStatus.Down ⇒ m.address + case m if m.status == Down ⇒ m.address } // update gossip overview @@ -1073,30 +1073,30 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // ---------------------- // 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence) // ---------------------- - if (member.status == MemberStatus.Joining) { + if (member.status == Joining) { log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) hasChangedState = true - member copy (status = MemberStatus.Up) + member copy (status = Up) } else member } map { member ⇒ // ---------------------- // 2. Move EXITING => REMOVED (once all nodes have seen that this node is EXITING e.g. we have a convergence) // ---------------------- - if (member.status == MemberStatus.Exiting) { + if (member.status == Exiting) { log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", selfAddress, member.address) hasChangedState = true - member copy (status = MemberStatus.Removed) + member copy (status = Removed) } else member } map { member ⇒ // ---------------------- // 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff) // ---------------------- - if (member.status == MemberStatus.Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) { + if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) { log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, member.address) hasChangedState = true - member copy (status = MemberStatus.Exiting) + member copy (status = Exiting) } else member } @@ -1112,17 +1112,17 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newUnreachableMembers = localUnreachableMembers.map { member ⇒ // no need to DOWN members already DOWN - if (member.status == MemberStatus.Down) member + if (member.status == Down) member else { log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address) hasChangedState = true - member copy (status = MemberStatus.Down) + member copy (status = Down) } } // removing nodes marked as DOWN from the 'seen' table val newSeen = localSeen -- newUnreachableMembers.collect { - case m if m.status == MemberStatus.Down ⇒ m.address + case m if m.status == Down ⇒ m.address } val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview @@ -1169,8 +1169,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // Else we can't continue to check for convergence // When that is done we check that all the entries in the 'seen' table have the same vector clock version if (unreachable.isEmpty || !unreachable.exists { m ⇒ - m.status != MemberStatus.Down && - m.status != MemberStatus.Removed + m.status != Down && m.status != Removed }) { val seen = gossip.overview.seen val views = Set.empty[VectorClock] ++ seen.values