diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 98d0a3f11e..c5ad773989 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -58,11 +58,6 @@ object ClusterAction { */ case class Join(address: Address) extends ClusterMessage - /** - * Command to set a node to Up (from Joining). - */ - case class Up(address: Address) extends ClusterMessage - /** * Command to leave the cluster. */ @@ -73,15 +68,16 @@ object ClusterAction { */ case class Down(address: Address) extends ClusterMessage - /** - * Command to mark a node to be removed from the cluster immediately. - */ - case class Exit(address: Address) extends ClusterMessage - /** * Command to remove a node from the cluster immediately. */ case class Remove(address: Address) extends ClusterMessage + + /** + * Command to mark a node to be removed from the cluster immediately. + * Can only be sent by the leader. + */ + private[akka] case class Exit(address: Address) extends ClusterMessage } /** @@ -158,12 +154,10 @@ object MemberStatus { case object Down extends MemberStatus case object Removed extends MemberStatus - def isUnavailable(status: MemberStatus): Boolean = { - status == MemberStatus.Down || - status == MemberStatus.Exiting || - status == MemberStatus.Removed || - status == MemberStatus.Leaving - } + /** + * Using the same notion for 'unavailable' as 'non-convergence': DOWN and REMOVED. + */ + def isUnavailable(status: MemberStatus): Boolean = status == MemberStatus.Down || status == MemberStatus.Removed } /** @@ -266,7 +260,6 @@ final class ClusterCommandDaemon extends Actor { def receive = { case Join(address) ⇒ cluster.joining(address) - case Up(address) ⇒ cluster.up(address) case Down(address) ⇒ cluster.downing(address) case Leave(address) ⇒ cluster.leaving(address) case Exit(address) ⇒ cluster.exiting(address) @@ -453,9 +446,16 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // ===================== PUBLIC API ===================== // ====================================================== - def self: Member = latestGossip.members - .find(_.address == selfAddress) - .getOrElse(throw new IllegalStateException("Can't find 'this' Member (" + selfAddress + ") in the cluster membership ring")) + def self: Member = { + val gossip = latestGossip + gossip.members + .find(_.address == selfAddress) + .getOrElse { + gossip.overview.unreachable + .find(_.address == selfAddress) + .getOrElse(throw new IllegalStateException("Can't find 'this' Member [" + selfAddress + "] in the cluster membership ring or in the unreachable set")) + } + } /** * Latest gossip. @@ -609,18 +609,32 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ } } - /** - * State transition to UP. - */ - private[cluster] final def up(address: Address): Unit = { - log.info("Cluster Node [{}] - Marking node [{}] as UP", selfAddress, address) - } - /** * State transition to LEAVING. */ - private[cluster] final def leaving(address: Address): Unit = { - log.info("Cluster Node [{}] - Marking node [{}] as LEAVING", selfAddress, address) + @tailrec + private[cluster] final def leaving(address: Address) { + log.info("Cluster Node [{}] - Marking address [{}] as LEAVING", selfAddress, address) + + val localState = state.get + val localGossip = localState.latestGossip + val localMembers = localGossip.members + + val newMembers = localMembers + Member(address, MemberStatus.Leaving) // mark node as LEAVING + val newGossip = localGossip copy (members = newMembers) + + val versionedGossip = newGossip + vclockNode + val seenVersionedGossip = versionedGossip seen selfAddress + + val newState = localState copy (latestGossip = seenVersionedGossip) + + if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update + else { + failureDetector heartbeat address // update heartbeat in failure detector + if (convergence(newState.latestGossip).isDefined) { + newState.memberMembershipChangeListeners foreach { _ notify newMembers } + } + } } /** @@ -909,6 +923,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address) + // FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully + def hasPartionHandoffCompletedSuccessfully(gossip: Gossip): Boolean = { + true + } + if (isLeader && isAvailable(localState)) { // only run the leader actions if we are the LEADER and available @@ -917,11 +936,12 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val localUnreachableMembers = localOverview.unreachable // Leader actions are as follows: - // 1. Move JOINING => UP - // 2. Move EXITING => REMOVED - // 3. Move UNREACHABLE => DOWN (auto-downing by leader) - // 4. Updating the vclock version for the changes - // 5. Updating the 'seen' table + // 1. Move JOINING => UP -- When a node joins the cluster + // 2. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) + // 3. Move LEAVING => EXITING -- When all partition handoff has completed + // 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader + // 5. Updating the vclock version for the changes + // 6. Updating the 'seen' table var hasChangedState = false val newGossip = @@ -930,20 +950,37 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // we have convergence - so we can't have unreachable nodes val newMembers = + localMembers map { member ⇒ - // 1. Move JOINING => UP + // ---------------------- + // 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence) + // ---------------------- if (member.status == MemberStatus.Joining) { log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) hasChangedState = true member copy (status = MemberStatus.Up) } else member + } map { member ⇒ - // 2. Move EXITING => REMOVED + // ---------------------- + // 2. Move EXITING => REMOVED (once all nodes have seen that this node is EXITING e.g. we have a convergence) + // ---------------------- if (member.status == MemberStatus.Exiting) { log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", selfAddress, member.address) hasChangedState = true member copy (status = MemberStatus.Removed) } else member + + } map { member ⇒ + // ---------------------- + // 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff) + // ---------------------- + if (member.status == MemberStatus.Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) { + log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, member.address) + hasChangedState = true + member copy (status = MemberStatus.Exiting) + } else member + } localGossip copy (members = newMembers) // update gossip @@ -951,7 +988,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // we don't have convergence - so we might have unreachable nodes // if 'auto-down' is turned on, then try to auto-down any unreachable nodes - // 3. Move UNREACHABLE => DOWN (auto-downing by leader) + // ---------------------- + // 4. Move UNREACHABLE => DOWN (auto-downing by leader) + // ---------------------- val newUnreachableMembers = localUnreachableMembers .filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN @@ -971,10 +1010,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (hasChangedState) { // we have a change of state - version it and try to update - // 4. Updating the vclock version for the changes + // ---------------------- + // 5. Updating the vclock version for the changes + // ---------------------- val versionedGossip = newGossip + vclockNode - // 5. Updating the 'seen' table + // ---------------------- + // 6. Updating the 'seen' table + // ---------------------- val seenVersionedGossip = versionedGossip seen selfAddress val newState = localState copy (latestGossip = seenVersionedGossip) @@ -1005,7 +1048,10 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // 2. all unreachable members in the set have status DOWN // Else we can't continue to check for convergence // When that is done we check that all the entries in the 'seen' table have the same vector clock version - if (unreachable.isEmpty || !unreachable.exists(m ⇒ (m.status != MemberStatus.Down) && (m.status != MemberStatus.Removed))) { + if (unreachable.isEmpty || !unreachable.exists { m ⇒ + m.status != MemberStatus.Down && + m.status != MemberStatus.Removed + }) { val seen = gossip.overview.seen val views = Set.empty[VectorClock] ++ seen.values diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index c736018806..9f1395b5dd 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -18,7 +18,6 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { val c2 = role("c2") commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) - } class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec diff --git a/akka-docs/cluster/cluster.rst b/akka-docs/cluster/cluster.rst index a0aca11114..fb53f13131 100644 --- a/akka-docs/cluster/cluster.rst +++ b/akka-docs/cluster/cluster.rst @@ -5,7 +5,8 @@ Cluster Specification ###################### -.. note:: *This document describes the new clustering coming in Akka 2.1 (not 2.0)* +.. note:: *This document describes the new clustering coming in Akka Coltrane and +is not available in the latest stable release)* Intro =====