diff --git a/akka-actor/src/main/scala/akka/AkkaException.scala b/akka-actor/src/main/scala/akka/AkkaException.scala
index 04e820419f..8e49c7cb11 100644
--- a/akka-actor/src/main/scala/akka/AkkaException.scala
+++ b/akka-actor/src/main/scala/akka/AkkaException.scala
@@ -9,7 +9,6 @@ package akka
*
* - a uuid for tracking purposes
* - toString that includes exception name, message and uuid
- * - toLongString which also includes the stack trace
*
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
index 6632111f00..62d5fa4eb9 100644
--- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
@@ -165,8 +165,7 @@ class AccrualFailureDetector(
else PhiFactor * timestampDiff / mean
}
- // FIXME change to debug log level, when failure detector is stable
- log.info("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection)
+ log.debug("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection)
phi
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index c495e470ce..411c9d4b18 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -6,27 +6,27 @@ package akka.cluster
import akka.actor._
import akka.actor.Status._
+import akka.ConfigurationException
+import akka.dispatch.Await
+import akka.dispatch.MonitorableThreadFactory
+import akka.event.Logging
+import akka.jsr166y.ThreadLocalRandom
+import akka.pattern.ask
import akka.remote._
import akka.routing._
-import akka.event.Logging
-import akka.dispatch.Await
-import akka.pattern.ask
import akka.util._
import akka.util.duration._
-import akka.ConfigurationException
-import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
-import java.util.concurrent.TimeUnit._
-import java.util.concurrent.TimeoutException
-import akka.jsr166y.ThreadLocalRandom
-import java.lang.management.ManagementFactory
-import java.io.Closeable
-import javax.management._
-import scala.collection.immutable.{ Map, SortedSet }
-import scala.annotation.tailrec
-import com.google.protobuf.ByteString
import akka.util.internal.HashedWheelTimer
-import akka.dispatch.MonitorableThreadFactory
+import com.google.protobuf.ByteString
+import java.io.Closeable
+import java.lang.management.ManagementFactory
+import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
+import java.util.concurrent.TimeoutException
+import java.util.concurrent.TimeUnit._
+import javax.management._
import MemberStatus._
+import scala.annotation.tailrec
+import scala.collection.immutable.{ Map, SortedSet }
/**
* Interface for membership change listener.
@@ -52,7 +52,7 @@ sealed trait ClusterMessage extends Serializable
/**
* Cluster commands sent by the USER.
*/
-object ClusterAction {
+object ClusterUserAction {
/**
* Command to join the cluster. Sent when a node (reprsesented by 'address')
@@ -69,22 +69,33 @@ object ClusterAction {
* Command to mark node as temporary down.
*/
case class Down(address: Address) extends ClusterMessage
+}
+
+/**
+ * Cluster commands sent by the LEADER.
+ */
+object ClusterLeaderAction {
/**
- * Command to remove a node from the cluster immediately.
- */
- case class Remove(address: Address) extends ClusterMessage
-
- /**
+ * INTERNAL API.
+ *
* Command to mark a node to be removed from the cluster immediately.
* Can only be sent by the leader.
*/
- private[akka] case class Exit(address: Address) extends ClusterMessage
+ private[cluster] case class Exit(address: Address) extends ClusterMessage
+
+ /**
+ * INTERNAL API.
+ *
+ * Command to remove a node from the cluster immediately.
+ */
+ private[cluster] case class Remove(address: Address) extends ClusterMessage
}
/**
* Represents the address and the current status of a cluster member node.
*
+ * Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus`.
*/
class Member(val address: Address, val status: MemberStatus) extends ClusterMessage {
override def hashCode = address.##
@@ -94,12 +105,12 @@ class Member(val address: Address, val status: MemberStatus) extends ClusterMess
}
/**
- * Factory and Utility module for Member instances.
+ * Module with factory and ordering methods for Member instances.
*/
object Member {
/**
- * Sort Address by host and port
+ * `Address` ordering type class, sorts addresses by host and port.
*/
implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒
if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0
@@ -107,8 +118,14 @@ object Member {
else false
}
- implicit val ordering: Ordering[Member] = new Ordering[Member] {
- def compare(x: Member, y: Member) = addressOrdering.compare(x.address, y.address)
+ /**
+ * `Member` ordering type class, sorts members by host and port with the exception that
+ * it puts all members that are in MemberStatus.EXITING last.
+ */
+ implicit val ordering: Ordering[Member] = Ordering.fromLessThan[Member] { (a, b) ⇒
+ if (a.status == Exiting && b.status != Exiting) false
+ else if (a.status != Exiting && b.status == Exiting) true
+ else addressOrdering.compare(a.address, b.address) < 0
}
def apply(address: Address, status: MemberStatus): Member = new Member(address, status)
@@ -157,10 +174,11 @@ case class GossipEnvelope(from: Address, gossip: Gossip) extends ClusterMessage
* Can be one of: Joining, Up, Leaving, Exiting and Down.
*/
sealed trait MemberStatus extends ClusterMessage {
+
/**
- * Using the same notion for 'unavailable' as 'non-convergence': DOWN and REMOVED.
+ * Using the same notion for 'unavailable' as 'non-convergence': DOWN
*/
- def isUnavailable: Boolean = this == Down || this == Removed
+ def isUnavailable: Boolean = this == Down
}
object MemberStatus {
@@ -226,6 +244,7 @@ case class Gossip(
// FIXME can be disabled as optimization
assertInvariants
+
private def assertInvariants: Unit = {
val unreachableAndLive = members.intersect(overview.unreachable)
if (unreachableAndLive.nonEmpty)
@@ -251,14 +270,17 @@ case class Gossip(
*/
def :+(node: VectorClock.Node): Gossip = copy(version = version :+ node)
+ /**
+ * Adds a member to the member node ring.
+ */
def :+(member: Member): Gossip = {
if (members contains member) this
else this copy (members = members + member)
}
/**
- * Marks the gossip as seen by this node (selfAddress) by updating the address entry in the 'gossip.overview.seen'
- * Map with the VectorClock for the new gossip.
+ * Marks the gossip as seen by this node (address) by updating the address entry in the 'gossip.overview.seen'
+ * Map with the VectorClock (version) for the new gossip.
*/
def seen(address: Address): Gossip = {
if (overview.seen.contains(address) && overview.seen(address) == version) this
@@ -282,8 +304,7 @@ case class Gossip(
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
// and exclude unreachable
- val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).
- filterNot(mergedUnreachable.contains)
+ val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
// 5. fresh seen table
val mergedSeen = Map.empty[Address, VectorClock]
@@ -306,11 +327,14 @@ case class Gossip(
case class Heartbeat(from: Address) extends ClusterMessage
/**
+ * INTERNAL API.
+ *
* Manages routing of the different cluster commands.
* Instantiated as a single instance for each Cluster - e.g. commands are serialized to Cluster message after message.
*/
-private[akka] final class ClusterCommandDaemon(cluster: Cluster) extends Actor {
- import ClusterAction._
+private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Actor {
+ import ClusterUserAction._
+ import ClusterLeaderAction._
val log = Logging(context.system, this)
@@ -326,10 +350,12 @@ private[akka] final class ClusterCommandDaemon(cluster: Cluster) extends Actor {
}
/**
+ * INTERNAL API.
+ *
* Pooled and routed with N number of configurable instances.
* Concurrent access to Cluster.
*/
-private[akka] final class ClusterGossipDaemon(cluster: Cluster) extends Actor {
+private[cluster] final class ClusterGossipDaemon(cluster: Cluster) extends Actor {
val log = Logging(context.system, this)
def receive = {
@@ -341,9 +367,11 @@ private[akka] final class ClusterGossipDaemon(cluster: Cluster) extends Actor {
}
/**
+ * INTERNAL API.
+ *
* Supervisor managing the different Cluster daemons.
*/
-private[akka] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor {
+private[cluster] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor {
val log = Logging(context.system, this)
private val commands = context.actorOf(Props(new ClusterCommandDaemon(cluster)), "commands")
@@ -396,13 +424,11 @@ trait ClusterNodeMBean {
def isSingleton: Boolean
def isConvergence: Boolean
def isAvailable: Boolean
+ def isRunning: Boolean
def join(address: String)
def leave(address: String)
def down(address: String)
- def remove(address: String)
-
- def shutdown()
}
/**
@@ -455,7 +481,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
private val serialization = remote.serialization
- private val isRunning = new AtomicBoolean(true)
+ private val _isRunning = new AtomicBoolean(true)
private val log = Logging(system, "Node")
private val mBeanServer = ManagementFactory.getPlatformMBeanServer
@@ -566,15 +592,27 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
}
+ /**
+ * Returns true if the cluster node is up and running, false if it is shut down.
+ */
+ def isRunning: Boolean = _isRunning.get
+
/**
* Latest gossip.
*/
def latestGossip: Gossip = state.get.latestGossip
/**
- * Member status for this node.
+ * Member status for this node (`MemberStatus`).
+ *
+ * NOTE: If the node has been removed from the cluster (and shut down) then it's status is set to the 'REMOVED' tombstone state
+ * and is no longer present in the node ring or any other part of the gossiping state. However in order to maintain the
+ * model and the semantics the user would expect, this method will in this situation return `MemberStatus.Removed`.
*/
- def status: MemberStatus = self.status
+ def status: MemberStatus = {
+ if (isRunning) self.status
+ else MemberStatus.Removed
+ }
/**
* Is this node the leader?
@@ -606,33 +644,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
*/
def isAvailable: Boolean = !isUnavailable(state.get)
- /**
- * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
- */
- def shutdown(): Unit = {
- if (isRunning.compareAndSet(true, false)) {
- log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
-
- // cancel the periodic tasks, note that otherwise they will be run when scheduler is shutdown
- gossipTask.cancel()
- heartbeatTask.cancel()
- failureDetectorReaperTask.cancel()
- leaderActionsTask.cancel()
- clusterScheduler.close()
-
- // FIXME isTerminated check can be removed when ticket #2221 is fixed
- // now it prevents logging if system is shutdown (or in progress of shutdown)
- if (!clusterDaemons.isTerminated)
- system.stop(clusterDaemons)
-
- try {
- mBeanServer.unregisterMBean(clusterMBeanName)
- } catch {
- case e: InstanceNotFoundException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
- }
- }
- }
-
/**
* Registers a listener to subscribe to cluster membership changes.
*/
@@ -661,7 +672,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
*/
def join(address: Address): Unit = {
val connection = clusterCommandConnectionFor(address)
- val command = ClusterAction.Join(selfAddress)
+ val command = ClusterUserAction.Join(selfAddress)
log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection)
connection ! command
}
@@ -670,21 +681,14 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
* Send command to issue state transition to LEAVING for the node specified by 'address'.
*/
def leave(address: Address): Unit = {
- clusterCommandDaemon ! ClusterAction.Leave(address)
+ clusterCommandDaemon ! ClusterUserAction.Leave(address)
}
/**
- * Send command to issue state transition to from DOWN to EXITING for the node specified by 'address'.
+ * Send command to DOWN the node specified by 'address'.
*/
def down(address: Address): Unit = {
- clusterCommandDaemon ! ClusterAction.Down(address)
- }
-
- /**
- * Send command to issue state transition to REMOVED for the node specified by 'address'.
- */
- def remove(address: Address): Unit = {
- clusterCommandDaemon ! ClusterAction.Remove(address)
+ clusterCommandDaemon ! ClusterUserAction.Down(address)
}
// ========================================================
@@ -692,22 +696,52 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// ========================================================
/**
- * State transition to JOINING.
- * New node joining.
+ * INTERNAL API.
+ *
+ * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
+ *
+ * Should not called by the user. The user can issue a LEAVE command which will tell the node
+ * to go through graceful handoff process `LEAVE -> EXITING -> REMOVED -> SHUTDOWN`.
+ */
+ private[cluster] def shutdown(): Unit = {
+ if (_isRunning.compareAndSet(true, false)) {
+ log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
+
+ // cancel the periodic tasks, note that otherwise they will be run when scheduler is shutdown
+ gossipTask.cancel()
+ heartbeatTask.cancel()
+ failureDetectorReaperTask.cancel()
+ leaderActionsTask.cancel()
+ clusterScheduler.close()
+
+ // FIXME isTerminated check can be removed when ticket #2221 is fixed
+ // now it prevents logging if system is shutdown (or in progress of shutdown)
+ if (!clusterDaemons.isTerminated)
+ system.stop(clusterDaemons)
+
+ try {
+ mBeanServer.unregisterMBean(clusterMBeanName)
+ } catch {
+ case e: InstanceNotFoundException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
+ }
+ log.info("Cluster Node [{}] - Cluster node successfully shut down", selfAddress)
+ }
+ }
+
+ /**
+ * INTERNAL API.
+ *
+ * State transition to JOINING - new node joining.
*/
@tailrec
private[cluster] final def joining(node: Address): Unit = {
- log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
-
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
val localUnreachable = localGossip.overview.unreachable
val alreadyMember = localMembers.exists(_.address == node)
- val isUnreachable = localUnreachable.exists { m ⇒
- m.address == node && m.status != Down && m.status != Removed
- }
+ val isUnreachable = localUnreachable.exists { m ⇒ m.address == node && m.status != Down }
if (!alreadyMember && !isUnreachable) {
@@ -725,6 +759,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
else {
+ log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
// treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
if (node != selfAddress) failureDetector heartbeat node
notifyMembershipChangeListeners(localState, newState)
@@ -733,52 +768,60 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
+ * INTERNAL API.
+ *
* State transition to LEAVING.
*/
@tailrec
private[cluster] final def leaving(address: Address) {
- log.info("Cluster Node [{}] - Marking address [{}] as LEAVING", selfAddress, address)
-
val localState = state.get
val localGossip = localState.latestGossip
- val localMembers = localGossip.members
+ if (localGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring)
+ val newMembers = localGossip.members map { member ⇒ if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING
+ val newGossip = localGossip copy (members = newMembers)
- val newMembers = localMembers + Member(address, Leaving) // mark node as LEAVING
- val newGossip = localGossip copy (members = newMembers)
+ val versionedGossip = newGossip :+ vclockNode
+ val seenVersionedGossip = versionedGossip seen selfAddress
- val versionedGossip = newGossip :+ vclockNode
- val seenVersionedGossip = versionedGossip seen selfAddress
+ val newState = localState copy (latestGossip = seenVersionedGossip)
- val newState = localState copy (latestGossip = seenVersionedGossip)
-
- if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update
- else {
- notifyMembershipChangeListeners(localState, newState)
+ if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update
+ else {
+ log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address)
+ notifyMembershipChangeListeners(localState, newState)
+ }
}
}
- private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit = {
- val oldMembersStatus = oldState.latestGossip.members.toSeq.map(m ⇒ (m.address, m.status))
- val newMembersStatus = newState.latestGossip.members.toSeq.map(m ⇒ (m.address, m.status))
- if (newMembersStatus != oldMembersStatus)
- newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
- }
-
/**
+ * INTERNAL API.
+ *
* State transition to EXITING.
*/
private[cluster] final def exiting(address: Address): Unit = {
- log.info("Cluster Node [{}] - Marking node [{}] as EXITING", selfAddress, address)
+ log.info("Cluster Node [{}] - Marked node [{}] as EXITING", selfAddress, address)
+ // FIXME implement when we implement hand-off
}
/**
+ * INTERNAL API.
+ *
* State transition to REMOVED.
+ *
+ * This method is for now only called after the LEADER have sent a Removed message - telling the node
+ * to shut down himself.
+ *
+ * In the future we might change this to allow the USER to send a Removed(address) message telling an
+ * arbitrary node to be moved direcly from UP -> REMOVED.
*/
private[cluster] final def removing(address: Address): Unit = {
- log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", selfAddress, address)
+ log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
+ shutdown()
}
/**
+ * INTERNAL API.
+ *
* The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there)
* and its status is set to DOWN. The node is also removed from the 'seen' table.
*
@@ -836,6 +879,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
+ * INTERNAL API.
+ *
* Receive new gossip.
*/
@tailrec
@@ -849,9 +894,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val mergedGossip = remoteGossip merge localGossip
val versionedMergedGossip = mergedGossip :+ vclockNode
- // FIXME change to debug log level, when failure detector is stable
- log.info(
- """Can't establish a causal relationship between "remote" gossip [{}] and "local" gossip [{}] - merging them into [{}]""",
+ log.debug(
+ """Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""",
remoteGossip, localGossip, versionedMergedGossip)
versionedMergedGossip
@@ -876,7 +920,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
- * INTERNAL API
+ * INTERNAL API.
*/
private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from
@@ -886,11 +930,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
private def autoJoin(): Unit = nodeToJoin foreach join
/**
- * INTERNAL API
+ * INTERNAL API.
*
* Gossips latest gossip to an address.
*/
- private[akka] def gossipTo(address: Address): Unit = {
+ private[cluster] def gossipTo(address: Address): Unit = {
val connection = clusterGossipConnectionFor(address)
log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, connection)
connection ! GossipEnvelope(selfAddress, latestGossip)
@@ -910,18 +954,18 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
- * INTERNAL API
+ * INTERNAL API.
*/
- private[akka] def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double =
+ private[cluster] def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double =
(membersSize + unreachableSize) match {
case 0 ⇒ 0.0
case sum ⇒ unreachableSize.toDouble / sum
}
/**
- * INTERNAL API
+ * INTERNAL API.
*/
- private[akka] def gossipToDeputyProbablity(membersSize: Int, unreachableSize: Int, nrOfDeputyNodes: Int): Double = {
+ private[cluster] def gossipToDeputyProbablity(membersSize: Int, unreachableSize: Int, nrOfDeputyNodes: Int): Double = {
if (nrOfDeputyNodes > membersSize) 1.0
else if (nrOfDeputyNodes == 0) 0.0
else (membersSize + unreachableSize) match {
@@ -931,11 +975,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
- * INTERNAL API
+ * INTERNAL API.
*
* Initates a new round of gossip.
*/
- private[akka] def gossip(): Unit = {
+ private[cluster] def gossip(): Unit = {
val localState = state.get
log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
@@ -972,9 +1016,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
- * INTERNAL API
+ * INTERNAL API.
*/
- private[akka] def heartbeat(): Unit = {
+ private[cluster] def heartbeat(): Unit = {
val localState = state.get
if (!isSingletonCluster(localState)) {
@@ -989,12 +1033,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
- * INTERNAL API
+ * INTERNAL API.
*
* Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
*/
@tailrec
- final private[akka] def reapUnreachableMembers(): Unit = {
+ final private[cluster] def reapUnreachableMembers(): Unit = {
val localState = state.get
if (!isSingletonCluster(localState) && isAvailable(localState)) {
@@ -1033,124 +1077,187 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
- * INTERNAL API
+ * INTERNAL API.
*
* Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc.
*/
@tailrec
- final private[akka] def leaderActions(): Unit = {
+ final private[cluster] def leaderActions(): Unit = {
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address)
- // FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
- def hasPartionHandoffCompletedSuccessfully(gossip: Gossip): Boolean = {
- true
- }
-
if (isLeader && isAvailable(localState)) {
// only run the leader actions if we are the LEADER and available
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
+ val hasPartionHandoffCompletedSuccessfully: Boolean = {
+ // FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
+ true
+ }
// Leader actions are as follows:
- // 1. Move JOINING => UP -- When a node joins the cluster
- // 2. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence)
+ // 1. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table
+ // 2. Move JOINING => UP -- When a node joins the cluster
// 3. Move LEAVING => EXITING -- When all partition handoff has completed
// 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader
- // 5. Updating the vclock version for the changes
- // 6. Updating the 'seen' table
+ // 5. Store away all stuff needed for the side-effecting processing in 10.
+ // 6. Updating the vclock version for the changes
+ // 7. Updating the 'seen' table
+ // 8. Try to update the state with the new gossip
+ // 9. If failure - retry
+ // 10. If success - run all the side-effecting processing
- var hasChangedState = false
- val newGossip =
+ val (
+ newGossip: Gossip,
+ hasChangedState: Boolean,
+ upMembers,
+ exitingMembers,
+ removedMembers,
+ unreachableButNotDownedMembers) =
if (convergence(localGossip).isDefined) {
// we have convergence - so we can't have unreachable nodes
+ // transform the node member ring - filterNot/map/map
val newMembers =
-
- localMembers map { member ⇒
+ localMembers filterNot { member ⇒
// ----------------------
- // 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
+ // 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table
// ----------------------
- if (member.status == Joining) {
- log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address)
- hasChangedState = true
- member copy (status = Up)
- } else member
+ member.status == MemberStatus.Exiting
} map { member ⇒
// ----------------------
- // 2. Move EXITING => REMOVED (once all nodes have seen that this node is EXITING e.g. we have a convergence)
+ // 2. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
// ----------------------
- if (member.status == Exiting) {
- log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", selfAddress, member.address)
- hasChangedState = true
- member copy (status = Removed)
- } else member
+ if (member.status == Joining) member copy (status = Up)
+ else member
} map { member ⇒
// ----------------------
// 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
// ----------------------
- if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) {
- log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, member.address)
- hasChangedState = true
- member copy (status = Exiting)
- } else member
-
+ if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully) member copy (status = Exiting)
+ else member
}
- localGossip copy (members = newMembers) // update gossip
+
+ // ----------------------
+ // 5. Store away all stuff needed for the side-effecting processing in 10.
+ // ----------------------
+
+ // Check for the need to do side-effecting on successful state change
+ // Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED
+ // to check for state-changes and to store away removed and exiting members for later notification
+ // 1. check for state-changes to update
+ // 2. store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending
+ val (removedMembers, newMembers1) = localMembers partition (_.status == Exiting)
+
+ val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining)
+
+ val (exitingMembers, newMembers3) = newMembers2 partition (_.status == Leaving && hasPartionHandoffCompletedSuccessfully)
+
+ val hasChangedState = removedMembers.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty
+
+ // removing REMOVED nodes from the 'seen' table
+ val newSeen = localSeen -- removedMembers.map(_.address)
+
+ // removing REMOVED nodes from the 'unreachable' set
+ val newUnreachableMembers = localUnreachableMembers -- removedMembers
+
+ val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
+ val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
+
+ (newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Set.empty[Member])
} else if (AutoDown) {
// we don't have convergence - so we might have unreachable nodes
+
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes
-
- // ----------------------
- // 4. Move UNREACHABLE => DOWN (auto-downing by leader)
- // ----------------------
- val newUnreachableMembers =
- localUnreachableMembers.map { member ⇒
- // no need to DOWN members already DOWN
- if (member.status == Down) member
- else {
- log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
- hasChangedState = true
- member copy (status = Down)
- }
- }
-
- // removing nodes marked as DOWN from the 'seen' table
- val newSeen = localSeen -- newUnreachableMembers.collect {
- case m if m.status == Down ⇒ m.address
+ val newUnreachableMembers = localUnreachableMembers.map { member ⇒
+ // ----------------------
+ // 5. Move UNREACHABLE => DOWN (auto-downing by leader)
+ // ----------------------
+ if (member.status == Down) member // no need to DOWN members already DOWN
+ else member copy (status = Down)
}
- val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
- localGossip copy (overview = newOverview) // update gossip
+ // Check for the need to do side-effecting on successful state change
+ val (unreachableButNotDownedMembers, _) = localUnreachableMembers partition (_.status != Down)
- } else localGossip
+ // removing nodes marked as DOWN from the 'seen' table
+ val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down ⇒ m.address }
+
+ val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
+ val newGossip = localGossip copy (overview = newOverview) // update gossip
+
+ (newGossip, unreachableButNotDownedMembers.nonEmpty, Set.empty[Member], Set.empty[Member], Set.empty[Member], unreachableButNotDownedMembers)
+
+ } else (localGossip, false, Set.empty[Member], Set.empty[Member], Set.empty[Member], Set.empty[Member])
if (hasChangedState) { // we have a change of state - version it and try to update
-
// ----------------------
- // 5. Updating the vclock version for the changes
+ // 6. Updating the vclock version for the changes
// ----------------------
val versionedGossip = newGossip :+ vclockNode
// ----------------------
- // 6. Updating the 'seen' table
+ // 7. Updating the 'seen' table
+ // Unless the leader (this node) is part of the removed members, i.e. the leader have moved himself from EXITING -> REMOVED
// ----------------------
- val seenVersionedGossip = versionedGossip seen selfAddress
+ val seenVersionedGossip =
+ if (removedMembers.exists(_.address == selfAddress)) versionedGossip
+ else versionedGossip seen selfAddress
val newState = localState copy (latestGossip = seenVersionedGossip)
- // if we won the race then update else try again
- if (!state.compareAndSet(localState, newState)) leaderActions() // recur
- else {
+ // ----------------------
+ // 8. Try to update the state with the new gossip
+ // ----------------------
+ if (!state.compareAndSet(localState, newState)) {
+
+ // ----------------------
+ // 9. Failure - retry
+ // ----------------------
+ leaderActions() // recur
+
+ } else {
+ // ----------------------
+ // 10. Success - run all the side-effecting processing
+ // ----------------------
+
+ // if (removedMembers.exists(_.address == selfAddress)) {
+ // // we now know that this node (the leader) is just about to shut down since it will be moved from EXITING -> REMOVED
+ // // so now let's gossip out this information directly since there will not be any other chance
+ // gossip()
+ // }
+
+ // log the move of members from joining to up
+ upMembers foreach { member ⇒ log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) }
+
+ // tell all removed members to remove and shut down themselves
+ removedMembers foreach { member ⇒
+ val address = member.address
+ log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - and removing node from node ring", selfAddress, address)
+ clusterCommandConnectionFor(address) ! ClusterLeaderAction.Remove(address)
+ }
+
+ // tell all exiting members to exit
+ exitingMembers foreach { member ⇒
+ val address = member.address
+ log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, address)
+ clusterCommandConnectionFor(address) ! ClusterLeaderAction.Exit(address) // FIXME should use ? to await completion of handoff?
+ }
+
+ // log the auto-downing of the unreachable nodes
+ unreachableButNotDownedMembers foreach { member ⇒
+ log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
+ }
+
notifyMembershipChangeListeners(localState, newState)
}
}
@@ -1174,9 +1281,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// Else we can't continue to check for convergence
// When that is done we check that all the entries in the 'seen' table have the same vector clock version
// and that all members exists in seen table
- val hasUnreachable = unreachable.nonEmpty && unreachable.exists { m ⇒
- m.status != Down && m.status != Removed
- }
+ val hasUnreachable = unreachable.nonEmpty && unreachable.exists { _.status != Down }
val allMembersInSeen = gossip.members.forall(m ⇒ seen.contains(m.address))
if (hasUnreachable) {
@@ -1205,14 +1310,18 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
private def isUnavailable(state: State): Boolean = {
val localGossip = state.latestGossip
- val localOverview = localGossip.overview
- val localMembers = localGossip.members
- val localUnreachableMembers = localOverview.unreachable
- val isUnreachable = localUnreachableMembers exists { _.address == selfAddress }
- val hasUnavailableMemberStatus = localMembers exists { m ⇒ (m == self) && m.status.isUnavailable }
+ val isUnreachable = localGossip.overview.unreachable exists { _.address == selfAddress }
+ val hasUnavailableMemberStatus = localGossip.members exists { m ⇒ (m == self) && m.status.isUnavailable }
isUnreachable || hasUnavailableMemberStatus
}
+ private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit = {
+ val oldMembersStatus = oldState.latestGossip.members.map(m ⇒ (m.address, m.status))
+ val newMembersStatus = newState.latestGossip.members.map(m ⇒ (m.address, m.status))
+ if (newMembersStatus != oldMembersStatus)
+ newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
+ }
+
/**
* Looks up and returns the local cluster command connection.
*/
@@ -1235,9 +1344,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
addresses drop 1 take NrOfDeputyNodes filterNot (_ == selfAddress)
/**
- * INTERNAL API
+ * INTERNAL API.
*/
- private[akka] def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
+ private[cluster] def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
if (addresses.isEmpty) None
else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
@@ -1280,6 +1389,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
def isAvailable: Boolean = clusterNode.isAvailable
+ def isRunning: Boolean = clusterNode.isRunning
+
// JMX commands
def join(address: String) = clusterNode.join(AddressFromURIString(address))
@@ -1287,10 +1398,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
def leave(address: String) = clusterNode.leave(AddressFromURIString(address))
def down(address: String) = clusterNode.down(AddressFromURIString(address))
-
- def remove(address: String) = clusterNode.remove(AddressFromURIString(address))
-
- def shutdown() = clusterNode.shutdown()
}
log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", selfAddress, clusterMBeanName)
try {
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala
new file mode 100644
index 0000000000..37312a7351
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala
@@ -0,0 +1,87 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+package akka.cluster
+
+import scala.collection.immutable.SortedSet
+import com.typesafe.config.ConfigFactory
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+import akka.util.duration._
+
+object LeaderLeavingMultiJvmSpec extends MultiNodeConfig {
+ val first = role("first")
+ val second = role("second")
+ val third = role("third")
+
+ commonConfig(
+ debugConfig(on = false)
+ .withFallback(ConfigFactory.parseString("""
+ akka.cluster {
+ leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state
+ unreachable-nodes-reaper-interval = 30 s
+ }""")
+ .withFallback(MultiNodeClusterSpec.clusterConfig)))
+}
+
+class LeaderLeavingMultiJvmNode1 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy
+class LeaderLeavingMultiJvmNode2 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy
+class LeaderLeavingMultiJvmNode3 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy
+
+abstract class LeaderLeavingSpec
+ extends MultiNodeSpec(LeaderLeavingMultiJvmSpec)
+ with MultiNodeClusterSpec {
+
+ import LeaderLeavingMultiJvmSpec._
+
+ lazy val firstAddress = node(first).address
+ lazy val secondAddress = node(second).address
+ lazy val thirdAddress = node(third).address
+
+ "A LEADER that is LEAVING" must {
+
+ "be moved to LEAVING, then to EXITING, then to REMOVED, then be shut down and then a new LEADER should be elected" taggedAs LongRunningTest in {
+
+ awaitClusterUp(first, second, third)
+
+ val oldLeaderAddress = cluster.leader
+
+ if (cluster.isLeader) {
+
+ cluster.leave(oldLeaderAddress)
+ testConductor.enter("leader-left")
+
+ // verify that a NEW LEADER have taken over
+ awaitCond(!cluster.isLeader)
+
+ // verify that the LEADER is shut down
+ awaitCond(!cluster.isRunning, 30.seconds.dilated)
+
+ // verify that the LEADER is REMOVED
+ awaitCond(cluster.status == MemberStatus.Removed)
+
+ } else {
+
+ testConductor.enter("leader-left")
+
+ // verify that the LEADER is LEAVING
+ awaitCond(cluster.latestGossip.members.exists(m => m.status == MemberStatus.Leaving && m.address == oldLeaderAddress)) // wait on LEAVING
+
+ // verify that the LEADER is EXITING
+ awaitCond(cluster.latestGossip.members.exists(m => m.status == MemberStatus.Exiting && m.address == oldLeaderAddress)) // wait on EXITING
+
+ // verify that the LEADER is no longer part of the 'members' set
+ awaitCond(cluster.latestGossip.members.forall(_.address != oldLeaderAddress))
+
+ // verify that the LEADER is not part of the 'unreachable' set
+ awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != oldLeaderAddress))
+
+ // verify that we have a new LEADER
+ awaitCond(cluster.leader != oldLeaderAddress)
+ }
+
+ testConductor.enter("finished")
+ }
+ }
+}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala
index 01e5f8aa74..7a233f9395 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala
@@ -18,9 +18,9 @@ object NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec extends MultiNodeConfig
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
}
-class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy
-class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy
-class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy
+class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy
+class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy
+class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy
abstract class NodeLeavingAndExitingAndBeingRemovedSpec
extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec)
@@ -36,8 +36,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
"A node that is LEAVING a non-singleton cluster" must {
- // FIXME make it work and remove ignore
- "be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest ignore {
+ "eventually set to REMOVED by the reaper, and removed from membership ring and seen table" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)
@@ -50,13 +49,14 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
// verify that the 'second' node is no longer part of the 'members' set
awaitCond(cluster.latestGossip.members.forall(_.address != secondAddress), reaperWaitingTime)
- // verify that the 'second' node is part of the 'unreachable' set
- awaitCond(cluster.latestGossip.overview.unreachable.exists(_.status == MemberStatus.Removed), reaperWaitingTime)
+ // verify that the 'second' node is not part of the 'unreachable' set
+ awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != secondAddress), reaperWaitingTime)
+ }
- // verify node that got removed is 'second' node
- val isRemoved = cluster.latestGossip.overview.unreachable.find(_.status == MemberStatus.Removed)
- isRemoved must be('defined)
- isRemoved.get.address must be(secondAddress)
+ runOn(second) {
+ // verify that the second node is shut down and has status REMOVED
+ awaitCond(!cluster.isRunning, reaperWaitingTime)
+ awaitCond(cluster.status == MemberStatus.Removed, reaperWaitingTime)
}
testConductor.enter("finished")
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala
index fc62c17c1d..2507dd8825 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala
@@ -42,8 +42,7 @@ abstract class NodeLeavingAndExitingSpec
"A node that is LEAVING a non-singleton cluster" must {
- // FIXME make it work and remove ignore
- "be moved to EXITING by the leader" taggedAs LongRunningTest ignore {
+ "be moved to EXITING by the leader" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala
index 8ea21e9380..8f637d87e5 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala
@@ -36,8 +36,7 @@ abstract class NodeLeavingSpec
"A node that is LEAVING a non-singleton cluster" must {
- // FIXME make it work and remove ignore
- "be marked as LEAVING in the converged membership table" taggedAs LongRunningTest ignore {
+ "be marked as LEAVING in the converged membership table" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)
diff --git a/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala
new file mode 100644
index 0000000000..d8687312da
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala
@@ -0,0 +1,138 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+
+package akka.cluster
+
+import akka.actor.{ Address, AddressFromURIString }
+import java.net.InetSocketAddress
+import org.scalatest.matchers.MustMatchers
+import org.scalatest.WordSpec
+import scala.collection.immutable.SortedSet
+import scala.util.Random
+
+@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
+class MemberOrderingSpec extends WordSpec with MustMatchers {
+ import Member.ordering
+ import Member.addressOrdering
+ import MemberStatus._
+
+ "An Ordering[Member]" must {
+
+ "order non-exiting members by host:port" in {
+ val members = SortedSet.empty[Member] +
+ Member(AddressFromURIString("akka://sys@darkstar:1112"), Up) +
+ Member(AddressFromURIString("akka://sys@darkstar:1113"), Joining) +
+ Member(AddressFromURIString("akka://sys@darkstar:1111"), Up)
+
+ val seq = members.toSeq
+ seq.size must equal(3)
+ seq(0) must equal(Member(AddressFromURIString("akka://sys@darkstar:1111"), Up))
+ seq(1) must equal(Member(AddressFromURIString("akka://sys@darkstar:1112"), Up))
+ seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Joining))
+ }
+
+ "order exiting members by last" in {
+ val members = SortedSet.empty[Member] +
+ Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting) +
+ Member(AddressFromURIString("akka://sys@darkstar:1113"), Up) +
+ Member(AddressFromURIString("akka://sys@darkstar:1111"), Joining)
+
+ val seq = members.toSeq
+ seq.size must equal(3)
+ seq(0) must equal(Member(AddressFromURIString("akka://sys@darkstar:1111"), Joining))
+ seq(1) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Up))
+ seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting))
+ }
+
+ "order multiple exiting members by last but internally by host:port" in {
+ val members = SortedSet.empty[Member] +
+ Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting) +
+ Member(AddressFromURIString("akka://sys@darkstar:1113"), Leaving) +
+ Member(AddressFromURIString("akka://sys@darkstar:1111"), Up) +
+ Member(AddressFromURIString("akka://sys@darkstar:1110"), Exiting)
+
+ val seq = members.toSeq
+ seq.size must equal(4)
+ seq(0) must equal(Member(AddressFromURIString("akka://sys@darkstar:1111"), Up))
+ seq(1) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Leaving))
+ seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1110"), Exiting))
+ seq(3) must equal(Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting))
+ }
+
+ "be sorted by address correctly" in {
+ import Member.ordering
+ // sorting should be done on host and port, only
+ val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up)
+ val m2 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up)
+ val m3 = Member(Address("cluster", "sys2", "host2", 8000), MemberStatus.Up)
+ val m4 = Member(Address("cluster", "sys2", "host2", 9000), MemberStatus.Up)
+ val m5 = Member(Address("cluster", "sys1", "host2", 10000), MemberStatus.Up)
+
+ val expected = IndexedSeq(m1, m2, m3, m4, m5)
+ val shuffled = Random.shuffle(expected)
+ shuffled.sorted must be(expected)
+ (SortedSet.empty[Member] ++ shuffled).toIndexedSeq must be(expected)
+ }
+
+ "have stable equals and hashCode" in {
+ val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Joining)
+ val m2 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up)
+ val m3 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up)
+
+ m1 must be(m2)
+ m1.hashCode must be(m2.hashCode)
+
+ m3 must not be (m2)
+ m3 must not be (m1)
+ }
+ }
+
+ "An Ordering[Address]" must {
+
+ "order addresses by port" in {
+ val addresses = SortedSet.empty[Address] +
+ AddressFromURIString("akka://sys@darkstar:1112") +
+ AddressFromURIString("akka://sys@darkstar:1113") +
+ AddressFromURIString("akka://sys@darkstar:1110") +
+ AddressFromURIString("akka://sys@darkstar:1111")
+
+ val seq = addresses.toSeq
+ seq.size must equal(4)
+ seq(0) must equal(AddressFromURIString("akka://sys@darkstar:1110"))
+ seq(1) must equal(AddressFromURIString("akka://sys@darkstar:1111"))
+ seq(2) must equal(AddressFromURIString("akka://sys@darkstar:1112"))
+ seq(3) must equal(AddressFromURIString("akka://sys@darkstar:1113"))
+ }
+
+ "order addresses by hostname" in {
+ val addresses = SortedSet.empty[Address] +
+ AddressFromURIString("akka://sys@darkstar2:1110") +
+ AddressFromURIString("akka://sys@darkstar1:1110") +
+ AddressFromURIString("akka://sys@darkstar3:1110") +
+ AddressFromURIString("akka://sys@darkstar0:1110")
+
+ val seq = addresses.toSeq
+ seq.size must equal(4)
+ seq(0) must equal(AddressFromURIString("akka://sys@darkstar0:1110"))
+ seq(1) must equal(AddressFromURIString("akka://sys@darkstar1:1110"))
+ seq(2) must equal(AddressFromURIString("akka://sys@darkstar2:1110"))
+ seq(3) must equal(AddressFromURIString("akka://sys@darkstar3:1110"))
+ }
+
+ "order addresses by hostname and port" in {
+ val addresses = SortedSet.empty[Address] +
+ AddressFromURIString("akka://sys@darkstar2:1110") +
+ AddressFromURIString("akka://sys@darkstar0:1111") +
+ AddressFromURIString("akka://sys@darkstar2:1111") +
+ AddressFromURIString("akka://sys@darkstar0:1110")
+
+ val seq = addresses.toSeq
+ seq.size must equal(4)
+ seq(0) must equal(AddressFromURIString("akka://sys@darkstar0:1110"))
+ seq(1) must equal(AddressFromURIString("akka://sys@darkstar0:1111"))
+ seq(2) must equal(AddressFromURIString("akka://sys@darkstar2:1110"))
+ seq(3) must equal(AddressFromURIString("akka://sys@darkstar2:1111"))
+ }
+ }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala
deleted file mode 100644
index bc1f70ae86..0000000000
--- a/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Copyright (C) 2009-2012 Typesafe Inc.
- */
-
-package akka.cluster
-
-import org.scalatest.WordSpec
-import org.scalatest.matchers.MustMatchers
-import akka.actor.Address
-import scala.util.Random
-import scala.collection.immutable.SortedSet
-
-@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class MemberSpec extends WordSpec with MustMatchers {
-
- "Member" must {
-
- "be sorted by address correctly" in {
- import Member.ordering
- // sorting should be done on host and port, only
- val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up)
- val m2 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up)
- val m3 = Member(Address("cluster", "sys2", "host2", 8000), MemberStatus.Up)
- val m4 = Member(Address("cluster", "sys2", "host2", 9000), MemberStatus.Up)
- val m5 = Member(Address("cluster", "sys1", "host2", 10000), MemberStatus.Up)
-
- val expected = IndexedSeq(m1, m2, m3, m4, m5)
- val shuffled = Random.shuffle(expected)
- shuffled.sorted must be(expected)
- (SortedSet.empty[Member] ++ shuffled).toIndexedSeq must be(expected)
- }
-
- "have stable equals and hashCode" in {
- val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Joining)
- val m2 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up)
- val m3 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up)
-
- m1 must be(m2)
- m1.hashCode must be(m2.hashCode)
-
- m3 must not be (m2)
- m3 must not be (m1)
- }
- }
-}
diff --git a/akka-docs/cluster/cluster.rst b/akka-docs/cluster/cluster.rst
index fb53f13131..0126897dab 100644
--- a/akka-docs/cluster/cluster.rst
+++ b/akka-docs/cluster/cluster.rst
@@ -5,8 +5,7 @@
Cluster Specification
######################
-.. note:: *This document describes the new clustering coming in Akka Coltrane and
-is not available in the latest stable release)*
+.. note:: *This document describes the new clustering coming in Akka Coltrane and is not available in the latest stable release)*
Intro
=====
@@ -164,8 +163,8 @@ After gossip convergence a ``leader`` for the cluster can be determined. There i
``leader`` election process, the ``leader`` can always be recognised deterministically
by any node whenever there is gossip convergence. The ``leader`` is simply the first
node in sorted order that is able to take the leadership role, where the only
-allowed member states for a ``leader`` are ``up`` or ``leaving`` (see below for more
-information about member states).
+allowed member states for a ``leader`` are ``up``, ``leaving`` or ``exiting`` (see
+below for more information about member states).
The role of the ``leader`` is to shift members in and out of the cluster, changing
``joining`` members to the ``up`` state or ``exiting`` members to the
@@ -302,10 +301,6 @@ handoff has completed then the node will change to the ``exiting`` state. Once
all nodes have seen the exiting state (convergence) the ``leader`` will remove the
node from the cluster, marking it as ``removed``.
-A node can also be removed forcefully by moving it directly to the ``removed``
-state using the ``remove`` action. The cluster will rebalance based on the new
-cluster membership.
-
If a node is unreachable then gossip convergence is not possible and therefore
any ``leader`` actions are also not possible (for instance, allowing a node to
become a part of the cluster, or changing actor distribution). To be able to
@@ -314,11 +309,12 @@ unreachable node is experiencing only transient difficulties then it can be
explicitly marked as ``down`` using the ``down`` user action. When this node
comes back up and begins gossiping it will automatically go through the joining
process again. If the unreachable node will be permanently down then it can be
-removed from the cluster directly with the ``remove`` user action. The cluster
-can also *auto-down* a node using the accrual failure detector.
+removed from the cluster directly by shutting the actor system down or killing it
+through an external ``SIGKILL`` signal, invocation of ``System.exit(status)`` or
+similar. The cluster can, through the leader, also *auto-down* a node.
-This means that nodes can join and leave the cluster at any point in time,
-e.g. provide cluster elasticity.
+This means that nodes can join and leave the cluster at any point in time, i.e.
+provide cluster elasticity.
State Diagram for the Member States
@@ -339,12 +335,12 @@ Member States
- **leaving** / **exiting**
states during graceful removal
-- **removed**
- tombstone state (no longer a member)
-
- **down**
marked as down/offline/unreachable
+- **removed**
+ tombstone state (no longer a member)
+
User Actions
^^^^^^^^^^^^
@@ -359,9 +355,6 @@ User Actions
- **down**
mark a node as temporarily down
-- **remove**
- remove a node from the cluster immediately
-
Leader Actions
^^^^^^^^^^^^^^
diff --git a/akka-docs/general/message-send-semantics.rst b/akka-docs/general/message-send-semantics.rst
index d9488d1f2b..41eb727358 100644
--- a/akka-docs/general/message-send-semantics.rst
+++ b/akka-docs/general/message-send-semantics.rst
@@ -48,14 +48,14 @@ At-most-once
Actual transports may provide stronger semantics,
but at-most-once is the semantics you should expect.
-The alternatives would be once-and-only-once, which is extremely costly,
+The alternatives would be once-and-only-once, which is extremely costly,
or at-least-once which essentially requires idempotency of message processing,
which is a user-level concern.
Ordering is preserved on a per-sender basis
-------------------------------------------
-Actor ``A1` sends messages ``M1``, ``M2``, ``M3`` to ``A2``
+Actor ``A1`` sends messages ``M1``, ``M2``, ``M3`` to ``A2``
Actor ``A3`` sends messages ``M4``, ``M5``, ``M6`` to ``A2``
This means that:
@@ -66,4 +66,4 @@ This means that:
5) ``A2`` can see messages from ``A1`` interleaved with messages from ``A3``
6) Since there is no guaranteed delivery, none, some or all of the messages may arrive to ``A2``
-.. _Erlang documentation: http://www.erlang.org/faq/academic.html
\ No newline at end of file
+.. _Erlang documentation: http://www.erlang.org/faq/academic.html
diff --git a/akka-kernel/src/main/dist/bin/akka-cluster b/akka-kernel/src/main/dist/bin/akka-cluster
index 3e76cdbb11..fe3af38449 100755
--- a/akka-kernel/src/main/dist/bin/akka-cluster
+++ b/akka-kernel/src/main/dist/bin/akka-cluster
@@ -63,20 +63,6 @@ case "$2" in
$JMX_CLIENT $HOST akka:type=Cluster leave=$ACTOR_SYSTEM_URL
;;
- remove)
- if [ $# -ne 3 ]; then
- echo "Usage: $SELF remove "
- exit 1
- fi
-
- ensureNodeIsRunningAndAvailable
- shift
-
- ACTOR_SYSTEM_URL=$2
- echo "Scheduling $ACTOR_SYSTEM_URL to REMOVE"
- $JMX_CLIENT $HOST akka:type=Cluster remove=$ACTOR_SYSTEM_URL
- ;;
-
down)
if [ $# -ne 3 ]; then
echo "Usage: $SELF down "
@@ -169,19 +155,32 @@ case "$2" in
$JMX_CLIENT $HOST akka:type=Cluster Available
;;
+ is-running)
+ if [ $# -ne 2 ]; then
+ echo "Usage: $SELF is-running"
+ exit 1
+ fi
+
+ ensureNodeIsRunningAndAvailable
+ shift
+
+ echo "Checking if member node on $HOST is AVAILABLE"
+ $JMX_CLIENT $HOST akka:type=Cluster Running
+ ;;
+
*)
printf "Usage: bin/$SELF ...\n"
printf "\n"
printf "Supported commands are:\n"
printf "%26s - %s\n" "join " "Sends request a JOIN node with the specified URL"
printf "%26s - %s\n" "leave " "Sends a request for node with URL to LEAVE the cluster"
- printf "%26s - %s\n" "remove " "Sends a request for node with URL to be instantly REMOVED from the cluster"
printf "%26s - %s\n" "down " "Sends a request for marking node with URL as DOWN"
printf "%26s - %s\n" member-status "Asks the member node for its current status"
printf "%26s - %s\n" cluster-status "Asks the cluster for its current status (member ring, unavailable nodes, meta data etc.)"
printf "%26s - %s\n" leader "Asks the cluster who the current leader is"
printf "%26s - %s\n" is-singleton "Checks if the cluster is a singleton cluster (single node cluster)"
printf "%26s - %s\n" is-available "Checks if the member node is available"
+ printf "%26s - %s\n" is-running "Checks if the member node is running"
printf "%26s - %s\n" has-convergence "Checks if there is a cluster convergence"
printf "Where the should be on the format of 'akka://actor-system-name@hostname:port'\n"
printf "\n"