diff --git a/.gitignore b/.gitignore
index 25d3fa2323..7d8723dcb9 100755
--- a/.gitignore
+++ b/.gitignore
@@ -24,8 +24,7 @@ logs
.#*
.codefellow
storage
-.codefellow
-.ensime
+.ensime*
_dump
.manager
manifest.mf
diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala
index fb394fac2e..3b068d5c2b 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala
@@ -426,7 +426,7 @@ private[akka] class ActorCell(
final def start(): Unit = {
/*
- * Create the mailbox and enqueue the Create() message to ensure that
+ * Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else.
*/
mailbox = dispatcher.createMailbox(this)
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
index 9a065e4c90..b22218d6a3 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
@@ -509,30 +509,30 @@ class LocalActorRefProvider(
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
case RelativeActorPath(elems) ⇒
if (elems.isEmpty) {
- log.debug("look-up of empty path string '{}' fails (per definition)", path)
+ log.debug("look-up of empty path string [{}] fails (per definition)", path)
deadLetters
} else if (elems.head.isEmpty) actorFor(rootGuardian, elems.tail)
else actorFor(ref, elems)
case ActorPathExtractor(address, elems) if address == rootPath.address ⇒ actorFor(rootGuardian, elems)
case _ ⇒
- log.debug("look-up of unknown path '{}' failed", path)
+ log.warning("look-up of unknown path [{}] failed", path)
deadLetters
}
def actorFor(path: ActorPath): InternalActorRef =
if (path.root == rootPath) actorFor(rootGuardian, path.elements)
else {
- log.debug("look-up of foreign ActorPath '{}' failed", path)
+ log.warning("look-up of foreign ActorPath [{}] failed", path)
deadLetters
}
def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef =
if (path.isEmpty) {
- log.debug("look-up of empty path sequence fails (per definition)")
+ log.warning("look-up of empty path sequence fails (per definition)")
deadLetters
} else ref.getChild(path.iterator) match {
case Nobody ⇒
- log.debug("look-up of path sequence '{}' failed", path)
+ log.warning("look-up of path sequence [/{}] failed", path.mkString("/"))
new EmptyLocalActorRef(system.provider, ref.path / path, eventStream)
case x ⇒ x
}
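A minimal sketch (illustrative; assuming an ActorSystem value named `system` using this provider, and "missing" being a hypothetical, non-existent actor name) of how the new warning-level look-up failure surfaces from user code:

    val ref = system.actorFor("/user/missing")
    // logged at warning level: look-up of path sequence [/user/missing] failed
    ref ! "hello" // the returned EmptyLocalActorRef turns the message into a dead letter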
diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index 3142d548b5..42ce7e4a77 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -8,9 +8,21 @@
akka {
cluster {
- seed-nodes = []
- seed-node-connection-timeout = 30s
- max-time-to-retry-joining-cluster = 30s
+ # node to join - the full URI defined by a string of the form "akka://system@hostname:port"
+ # leave as empty string if the node should be a singleton cluster
+ node-to-join = ""
+
+ # should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
+ auto-down = on
+
+ # the number of gossip daemon actors
+ nr-of-gossip-daemons = 4
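+ # the number of deputy nodes, i.e. nodes gossiped to in order to facilitate partition healing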
+ nr-of-deputy-nodes = 3
+
+ gossip {
+ initialDelay = 5s
+ frequency = 1s
+ }
# accrual failure detection config
failure-detector {
@@ -24,10 +36,5 @@ akka {
max-sample-size = 1000
}
-
- gossip {
- initial-delay = 5s
- frequency = 1s
- }
}
}
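A minimal sketch (illustrative; the system name, host and port are made up) of overriding the new cluster settings from application code:

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    val config = ConfigFactory.parseString("""
      akka.cluster {
        node-to-join = "akka://MySystem@10.0.0.1:2552"  # "" would keep the node a singleton cluster
        auto-down = off
        gossip.frequency = 2s
      }
      """).withFallback(ConfigFactory.load())

    val system = ActorSystem("MySystem", config)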
diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
index 379bf98a6b..d2dce19a80 100644
--- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
@@ -23,14 +23,17 @@ import System.{ currentTimeMillis ⇒ newTimestamp }
*
* Default threshold is 8, but can be configured in the Akka config.
*/
-class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val maxSampleSize: Int = 1000) {
+class AccrualFailureDetector(system: ActorSystem, address: Address, val threshold: Int = 8, val maxSampleSize: Int = 1000) {
private final val PhiFactor = 1.0 / math.log(10.0)
- private case class FailureStats(mean: Double = 0.0D, variance: Double = 0.0D, deviation: Double = 0.0D)
-
private val log = Logging(system, "FailureDetector")
+ /**
+ * Holds the failure statistics for a specific node Address.
+ */
+ private case class FailureStats(mean: Double = 0.0D, variance: Double = 0.0D, deviation: Double = 0.0D)
+
/**
* Implement using optimistic lockless concurrency, all state is represented
* by this immutable case class and managed by an AtomicReference.
@@ -54,22 +57,26 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma
*/
@tailrec
final def heartbeat(connection: Address) {
- log.debug("Heartbeat from connection [{}] ", connection)
- val oldState = state.get
+ log.debug("Node [{}] - Heartbeat from connection [{}] ", address, connection)
+ val oldState = state.get
+ val oldFailureStats = oldState.failureStats
+ val oldTimestamps = oldState.timestamps
val latestTimestamp = oldState.timestamps.get(connection)
+
if (latestTimestamp.isEmpty) {
// this is heartbeat from a new connection
// add starter records for this new connection
- val failureStats = oldState.failureStats + (connection -> FailureStats())
- val intervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long])
- val timestamps = oldState.timestamps + (connection -> newTimestamp)
+ val newFailureStats = oldFailureStats + (connection -> FailureStats())
+ val newIntervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long])
+ val newTimestamps = oldTimestamps + (connection -> newTimestamp)
- val newState = oldState copy (version = oldState.version + 1,
- failureStats = failureStats,
- intervalHistory = intervalHistory,
- timestamps = timestamps)
+ val newState = oldState copy (
+ version = oldState.version + 1,
+ failureStats = newFailureStats,
+ intervalHistory = newIntervalHistory,
+ timestamps = newTimestamps)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
@@ -79,46 +86,49 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma
val timestamp = newTimestamp
val interval = timestamp - latestTimestamp.get
- val timestamps = oldState.timestamps + (connection -> timestamp) // record new timestamp
+ val newTimestamps = oldTimestamps + (connection -> timestamp) // record new timestamp
- var newIntervalsForConnection =
- oldState.intervalHistory.get(connection).getOrElse(Vector.empty[Long]) :+ interval // append the new interval to history
+ var newIntervalsForConnection = (oldState.intervalHistory.get(connection) match {
+ case Some(history) ⇒ history
+ case _ ⇒ Vector.empty[Long]
+ }) :+ interval
if (newIntervalsForConnection.size > maxSampleSize) {
// reached max history, drop first interval
newIntervalsForConnection = newIntervalsForConnection drop 1
}
- val failureStats =
+ val newFailureStats =
if (newIntervalsForConnection.size > 1) {
- val mean: Double = newIntervalsForConnection.sum / newIntervalsForConnection.size.toDouble
+ val newMean: Double = newIntervalsForConnection.sum / newIntervalsForConnection.size.toDouble
- val oldFailureStats = oldState.failureStats.get(connection).getOrElse(FailureStats())
+ val oldConnectionFailureStats = oldState.failureStats.get(connection) match {
+ case Some(stats) ⇒ stats
+ case _ ⇒ throw new IllegalStateException("Can't calculate new failure statistics due to missing heartbeat history")
+ }
val deviationSum =
newIntervalsForConnection
.map(_.toDouble)
- .foldLeft(0.0D)((x, y) ⇒ x + (y - mean))
+ .foldLeft(0.0D)((x, y) ⇒ x + (y - newMean) * (y - newMean))
- val variance: Double = deviationSum / newIntervalsForConnection.size.toDouble
- val deviation: Double = math.sqrt(variance)
+ val newVariance: Double = deviationSum / newIntervalsForConnection.size.toDouble
+ val newDeviation: Double = math.sqrt(newVariance)
- val newFailureStats = oldFailureStats copy (mean = mean,
- deviation = deviation,
- variance = variance)
+ val newFailureStats = oldConnectionFailureStats copy (mean = newMean, deviation = newDeviation, variance = newVariance)
+ oldFailureStats + (connection -> newFailureStats)
- oldState.failureStats + (connection -> newFailureStats)
} else {
- oldState.failureStats
+ oldFailureStats
}
- val intervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection)
+ val newIntervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection)
val newState = oldState copy (version = oldState.version + 1,
- failureStats = failureStats,
- intervalHistory = intervalHistory,
- timestamps = timestamps)
+ failureStats = newFailureStats,
+ intervalHistory = newIntervalHistory,
+ timestamps = newTimestamps)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
@@ -138,14 +148,23 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma
def phi(connection: Address): Double = {
val oldState = state.get
val oldTimestamp = oldState.timestamps.get(connection)
+
val phi =
if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
else {
val timestampDiff = newTimestamp - oldTimestamp.get
- val mean = oldState.failureStats.get(connection).getOrElse(FailureStats()).mean
- PhiFactor * timestampDiff / mean
+
+ val mean = oldState.failureStats.get(connection) match {
+ case Some(FailureStats(mean, _, _)) ⇒ mean
+ case _ ⇒ throw new IllegalStateException("Can't calculate Failure Detector Phi value for a node that have no heartbeat history")
+ }
+
+ if (mean == 0.0D) 0.0D
+ else PhiFactor * timestampDiff / mean
}
- log.debug("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection)
+
+ // only log if PHI value is starting to get interesting
+ if (phi > 0.0D) log.debug("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection)
phi
}
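A small worked sketch of the phi calculation above (assuming the default threshold of 8 and a mean heartbeat interval of 1000 ms):

    // phi = PhiFactor * timestampDiff / mean, with PhiFactor = 1 / ln(10) ≈ 0.434
    val PhiFactor = 1.0 / math.log(10.0)
    def phi(millisSinceLastHeartbeat: Long, meanIntervalMillis: Double): Double =
      PhiFactor * millisSinceLastHeartbeat / meanIntervalMillis

    phi(8000, 1000.0)  // ≈ 3.5, below the default threshold of 8, so the connection still counts as available
    phi(19000, 1000.0) // ≈ 8.3, above the threshold, so the connection is considered unreachable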
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
index dc081623bc..50b0f5bd0b 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -13,14 +13,15 @@ import akka.actor.AddressFromURIString
class ClusterSettings(val config: Config, val systemName: String) {
import config._
- // cluster config section
val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold")
val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
- val SeedNodeConnectionTimeout = Duration(config.getMilliseconds("akka.cluster.seed-node-connection-timeout"), MILLISECONDS)
- val MaxTimeToRetryJoiningCluster = Duration(config.getMilliseconds("akka.cluster.max-time-to-retry-joining-cluster"), MILLISECONDS)
- val InitialDelayForGossip = Duration(getMilliseconds("akka.cluster.gossip.initial-delay"), MILLISECONDS)
- val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS)
- val SeedNodes = Set.empty[Address] ++ getStringList("akka.cluster.seed-nodes").asScala.collect {
- case AddressFromURIString(addr) ⇒ addr
+ val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match {
+ case "" ⇒ None
+ case AddressFromURIString(addr) ⇒ Some(addr)
}
+ val GossipInitialDelay = Duration(getMilliseconds("akka.cluster.gossip.initialDelay"), MILLISECONDS)
+ val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS)
+ val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons")
+ val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes")
+ val AutoDown = getBoolean("akka.cluster.auto-down")
}
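A short sketch of reading the new settings (assumed usage; `system` is any running ActorSystem):

    val settings = new ClusterSettings(system.settings.config, system.name)

    settings.NodeToJoin      // None for the default "", i.e. the node starts as a singleton cluster
    settings.GossipFrequency // 1 second by default
    settings.AutoDown        // true for the default 'on'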
diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala
deleted file mode 100644
index bb15223842..0000000000
--- a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala
+++ /dev/null
@@ -1,438 +0,0 @@
-/**
- * Copyright (C) 2009-2012 Typesafe Inc.
- */
-
-package akka.cluster
-
-import akka.actor._
-import akka.actor.Status._
-import akka.remote._
-import akka.event.Logging
-import akka.dispatch.Await
-import akka.pattern.ask
-import akka.util._
-import akka.config.ConfigurationException
-
-import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
-import java.util.concurrent.TimeUnit._
-import java.util.concurrent.TimeoutException
-import java.security.SecureRandom
-import System.{ currentTimeMillis ⇒ newTimestamp }
-
-import scala.collection.immutable.{ Map, SortedSet }
-import scala.annotation.tailrec
-
-import com.google.protobuf.ByteString
-
-/**
- * Interface for member membership change listener.
- */
-trait NodeMembershipChangeListener {
- def memberConnected(member: Member)
- def memberDisconnected(member: Member)
-}
-
-/**
- * Base trait for all cluster messages. All ClusterMessage's are serializable.
- */
-sealed trait ClusterMessage extends Serializable
-
-/**
- * Command to join the cluster.
- */
-case object JoinCluster extends ClusterMessage
-
-/**
- * Represents the state of the cluster; cluster ring membership, ring convergence, meta data - all versioned by a vector clock.
- */
-case class Gossip(
- version: VectorClock = VectorClock(),
- member: Address,
- // sorted set of members with their status, sorted by name
- members: SortedSet[Member] = SortedSet.empty[Member](Ordering.fromLessThan[Member](_.address.toString > _.address.toString)),
- unavailableMembers: Set[Member] = Set.empty[Member],
- // for ring convergence
- seen: Map[Member, VectorClock] = Map.empty[Member, VectorClock],
- // for handoff
- //pendingChanges: Option[Vector[PendingPartitioningChange]] = None,
- meta: Option[Map[String, Array[Byte]]] = None)
- extends ClusterMessage // is a serializable cluster message
- with Versioned // has a vector clock as version
-
-/**
- * Represents the address and the current status of a cluster member node.
- */
-case class Member(address: Address, status: MemberStatus) extends ClusterMessage
-
-/**
- * Defines the current status of a cluster member node
- *
- * Can be one of: Joining, Up, Leaving, Exiting and Down.
- */
-sealed trait MemberStatus extends ClusterMessage with Versioned
-object MemberStatus {
- case class Joining(version: VectorClock = VectorClock()) extends MemberStatus
- case class Up(version: VectorClock = VectorClock()) extends MemberStatus
- case class Leaving(version: VectorClock = VectorClock()) extends MemberStatus
- case class Exiting(version: VectorClock = VectorClock()) extends MemberStatus
- case class Down(version: VectorClock = VectorClock()) extends MemberStatus
-}
-
-// sealed trait PendingPartitioningStatus
-// object PendingPartitioningStatus {
-// case object Complete extends PendingPartitioningStatus
-// case object Awaiting extends PendingPartitioningStatus
-// }
-
-// case class PendingPartitioningChange(
-// owner: Address,
-// nextOwner: Address,
-// changes: Vector[VNodeMod],
-// status: PendingPartitioningStatus)
-
-final class ClusterDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor {
- val log = Logging(system, "ClusterDaemon")
-
- def receive = {
- case JoinCluster ⇒ sender ! gossiper.latestGossip
- case gossip: Gossip ⇒
- gossiper.tell(gossip)
-
- case unknown ⇒ log.error("Unknown message sent to cluster daemon [" + unknown + "]")
- }
-}
-
-/**
- * This module is responsible for Gossiping cluster information. The abstraction maintains the list of live
- * and dead members. Periodically i.e. every 1 second this module chooses a random member and initiates a round
- * of Gossip with it. Whenever it gets gossip updates it updates the Failure Detector with the liveness
- * information.
- *
- * During each of these runs the member initiates gossip exchange according to following rules (as defined in the
- * Cassandra documentation [http://wiki.apache.org/cassandra/ArchitectureGossip]:
- *
- * 1) Gossip to random live member (if any)
- * 2) Gossip to random unreachable member with certain probability depending on number of unreachable and live members
- * 3) If the member gossiped to at (1) was not seed, or the number of live members is less than number of seeds,
- * gossip to random seed with certain probability depending on number of unreachable, seed and live members.
- *
- */
-case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
-
- /**
- * Represents the state for this Gossiper. Implemented using optimistic lockless concurrency,
- * all state is represented by this immutable case class and managed by an AtomicReference.
- */
- private case class State(
- currentGossip: Gossip,
- memberMembershipChangeListeners: Set[NodeMembershipChangeListener] = Set.empty[NodeMembershipChangeListener])
-
- val remoteSettings = new RemoteSettings(system.settings.config, system.name)
- val clusterSettings = new ClusterSettings(system.settings.config, system.name)
-
- val protocol = "akka" // TODO should this be hardcoded?
- val address = remote.transport.address
-
- val memberFingerprint = address.##
- val initialDelayForGossip = clusterSettings.InitialDelayForGossip
- val gossipFrequency = clusterSettings.GossipFrequency
- implicit val seedNodeConnectionTimeout = clusterSettings.SeedNodeConnectionTimeout
- implicit val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
-
- // seed members
- private val seeds: Set[Member] = {
- if (clusterSettings.SeedNodes.isEmpty) throw new ConfigurationException(
- "At least one seed member must be defined in the configuration [akka.cluster.seed-members]")
- else clusterSettings.SeedNodes map (address ⇒ Member(address, MemberStatus.Up()))
- }
-
- private val serialization = remote.serialization
- private val failureDetector = new AccrualFailureDetector(system, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)
-
- private val isRunning = new AtomicBoolean(true)
- private val log = Logging(system, "Gossiper")
- private val random = SecureRandom.getInstance("SHA1PRNG")
-
- // Is it right to put this guy under the /system path or should we have a top-level /cluster or something else...?
- private val clusterDaemon = system.systemActorOf(Props(new ClusterDaemon(system, this)), "cluster")
- private val state = new AtomicReference[State](State(currentGossip = newGossip()))
-
- // FIXME manage connections in some other way so we can delete the RemoteConnectionManager (SINCE IT SUCKS!!!)
- private val connectionManager = new RemoteConnectionManager(system, remote, failureDetector, Map.empty[Address, ActorRef])
-
- log.info("Starting cluster Gossiper...")
-
- // join the cluster by connecting to one of the seed members and retrieve current cluster state (Gossip)
- joinCluster(clusterSettings.MaxTimeToRetryJoiningCluster fromNow)
-
- // start periodic gossip and cluster scrutinization
- val initateGossipCanceller = system.scheduler.schedule(initialDelayForGossip, gossipFrequency)(initateGossip())
- val scrutinizeCanceller = system.scheduler.schedule(initialDelayForGossip, gossipFrequency)(scrutinize())
-
- /**
- * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
- */
- def shutdown() {
- if (isRunning.compareAndSet(true, false)) {
- log.info("Shutting down Gossiper for [{}]...", address)
- try connectionManager.shutdown() finally {
- try system.stop(clusterDaemon) finally {
- try initateGossipCanceller.cancel() finally {
- try scrutinizeCanceller.cancel() finally {
- log.info("Gossiper for [{}] is shut down", address)
- }
- }
- }
- }
- }
- }
-
- def latestGossip: Gossip = state.get.currentGossip
-
- /**
- * Tell the gossiper some gossip.
- */
- //@tailrec
- final def tell(newGossip: Gossip) {
- val gossipingNode = newGossip.member
-
- failureDetector heartbeat gossipingNode // update heartbeat in failure detector
-
- // FIXME all below here is WRONG - redesign with cluster convergence in mind
-
- // val oldState = state.get
- // println("-------- NEW VERSION " + newGossip)
- // println("-------- OLD VERSION " + oldState.currentGossip)
- // val latestGossip = VectorClock.latestVersionOf(newGossip, oldState.currentGossip)
- // println("-------- WINNING VERSION " + latestGossip)
-
- // val latestAvailableNodes = latestGossip.members
- // val latestUnavailableNodes = latestGossip.unavailableMembers
- // println("=======>>> gossipingNode: " + gossipingNode)
- // println("=======>>> latestAvailableNodes: " + latestAvailableNodes)
- // if (!(latestAvailableNodes contains gossipingNode) && !(latestUnavailableNodes contains gossipingNode)) {
- // println("-------- NEW NODE")
- // // we have a new member
- // val newGossip = latestGossip copy (availableNodes = latestAvailableNodes + gossipingNode)
- // val newState = oldState copy (currentGossip = incrementVersionForGossip(newGossip))
-
- // println("--------- new GOSSIP " + newGossip.members)
- // println("--------- new STATE " + newState)
- // // if we won the race then update else try again
- // if (!state.compareAndSet(oldState, newState)) tell(newGossip) // recur
- // else {
- // println("---------- WON RACE - setting state")
- // // create connections for all new members in the latest gossip
- // (latestAvailableNodes + gossipingNode) foreach { member ⇒
- // setUpConnectionToNode(member)
- // oldState.memberMembershipChangeListeners foreach (_ memberConnected member) // notify listeners about the new members
- // }
- // }
-
- // } else if (latestUnavailableNodes contains gossipingNode) {
- // // gossip from an old former dead member
-
- // val newUnavailableMembers = latestUnavailableNodes - gossipingNode
- // val newMembers = latestAvailableNodes + gossipingNode
-
- // val newGossip = latestGossip copy (availableNodes = newMembers, unavailableNodes = newUnavailableMembers)
- // val newState = oldState copy (currentGossip = incrementVersionForGossip(newGossip))
-
- // // if we won the race then update else try again
- // if (!state.compareAndSet(oldState, newState)) tell(newGossip) // recur
- // else oldState.memberMembershipChangeListeners foreach (_ memberConnected gossipingNode) // notify listeners on successful update of state
- // }
- }
-
- /**
- * Registers a listener to subscribe to cluster membership changes.
- */
- @tailrec
- final def registerListener(listener: NodeMembershipChangeListener) {
- val oldState = state.get
- val newListeners = oldState.memberMembershipChangeListeners + listener
- val newState = oldState copy (memberMembershipChangeListeners = newListeners)
- if (!state.compareAndSet(oldState, newState)) registerListener(listener) // recur
- }
-
- /**
- * Unsubscribes to cluster membership changes.
- */
- @tailrec
- final def unregisterListener(listener: NodeMembershipChangeListener) {
- val oldState = state.get
- val newListeners = oldState.memberMembershipChangeListeners - listener
- val newState = oldState copy (memberMembershipChangeListeners = newListeners)
- if (!state.compareAndSet(oldState, newState)) unregisterListener(listener) // recur
- }
-
- /**
- * Sets up remote connections to all the members in the argument list.
- */
- private def connectToNodes(members: Seq[Member]) {
- members foreach { member ⇒
- setUpConnectionToNode(member)
- state.get.memberMembershipChangeListeners foreach (_ memberConnected member) // notify listeners about the new members
- }
- }
-
- // FIXME should shuffle list randomly before start traversing to avoid connecting to some member on every member
- @tailrec
- final private def connectToRandomNodeOf(members: Seq[Member]): ActorRef = {
- members match {
- case member :: rest ⇒
- setUpConnectionToNode(member) match {
- case Some(connection) ⇒ connection
- case None ⇒ connectToRandomNodeOf(rest) // recur if
- }
- case Nil ⇒
- throw new RemoteConnectionException(
- "Could not establish connection to any of the members in the argument list")
- }
- }
-
- /**
- * Joins the cluster by connecting to one of the seed members and retrieve current cluster state (Gossip).
- */
- private def joinCluster(deadline: Deadline) {
- val seedNodes = seedNodesWithoutMyself // filter out myself
-
- if (!seedNodes.isEmpty) { // if we have seed members to contact
- connectToNodes(seedNodes)
-
- try {
- log.info("Trying to join cluster through one of the seed members [{}]", seedNodes.mkString(", "))
-
- Await.result(connectToRandomNodeOf(seedNodes) ? JoinCluster, seedNodeConnectionTimeout) match {
- case initialGossip: Gossip ⇒
- // just sets/overwrites the state/gossip regardless of what it was before
- // since it should be treated as the initial state
- state.set(state.get copy (currentGossip = initialGossip))
- log.debug("Received initial gossip [{}] from seed member", initialGossip)
-
- case unknown ⇒
- throw new IllegalStateException("Expected initial gossip from seed, received [" + unknown + "]")
- }
- } catch {
- case e: Exception ⇒
- log.error(
- "Could not join cluster through any of the seed members - retrying for another {} seconds",
- deadline.timeLeft.toSeconds)
-
- // retry joining the cluster unless
- // 1. Gossiper is shut down
- // 2. The connection time window has expired
- if (isRunning.get) {
- if (deadline.timeLeft.toMillis > 0) joinCluster(deadline) // recur
- else throw new RemoteConnectionException(
- "Could not join cluster (any of the seed members) - giving up after trying for " +
- deadline.time.toSeconds + " seconds")
- }
- }
- }
- }
-
- /**
- * Initates a new round of gossip.
- */
- private def initateGossip() {
- val oldState = state.get
- val oldGossip = oldState.currentGossip
-
- val oldMembers = oldGossip.members
- val oldMembersSize = oldMembers.size
-
- val oldUnavailableMembers = oldGossip.unavailableMembers
- val oldUnavailableMembersSize = oldUnavailableMembers.size
-
- // 1. gossip to alive members
- val gossipedToSeed =
- if (oldUnavailableMembersSize > 0) gossipToRandomNodeOf(oldMembers)
- else false
-
- // 2. gossip to dead members
- if (oldUnavailableMembersSize > 0) {
- val probability: Double = oldUnavailableMembersSize / (oldMembersSize + 1)
- if (random.nextDouble() < probability) gossipToRandomNodeOf(oldUnavailableMembers)
- }
-
- // 3. gossip to a seed for facilitating partition healing
- if ((!gossipedToSeed || oldMembersSize < 1) && (seeds.head != address)) {
- if (oldMembersSize == 0) gossipToRandomNodeOf(seeds)
- else {
- val probability = 1.0 / oldMembersSize + oldUnavailableMembersSize
- if (random.nextDouble() <= probability) gossipToRandomNodeOf(seeds)
- }
- }
- }
-
- /**
- * Gossips to a random member in the set of members passed in as argument.
- *
- * @return 'true' if it gossiped to a "seed" member.
- */
- private def gossipToRandomNodeOf(members: Set[Member]): Boolean = {
- val peers = members filter (_.address != address) // filter out myself
- val peer = selectRandomNode(peers)
- val oldState = state.get
- val oldGossip = oldState.currentGossip
- // if connection can't be established/found => ignore it since the failure detector will take care of the potential problem
- setUpConnectionToNode(peer) foreach { _ ! newGossip }
- seeds exists (peer == _)
- }
-
- /**
- * Scrutinizes the cluster; marks members detected by the failure detector as unavailable, and notifies all listeners
- * of the change in the cluster membership.
- */
- @tailrec
- final private def scrutinize() {
- val oldState = state.get
- val oldGossip = oldState.currentGossip
-
- val oldMembers = oldGossip.members
- val oldUnavailableMembers = oldGossip.unavailableMembers
- val newlyDetectedUnavailableMembers = oldMembers filterNot (member ⇒ failureDetector.isAvailable(member.address))
-
- if (!newlyDetectedUnavailableMembers.isEmpty) { // we have newly detected members marked as unavailable
- val newMembers = oldMembers diff newlyDetectedUnavailableMembers
- val newUnavailableMembers = oldUnavailableMembers ++ newlyDetectedUnavailableMembers
-
- val newGossip = oldGossip copy (members = newMembers, unavailableMembers = newUnavailableMembers)
- val newState = oldState copy (currentGossip = incrementVersionForGossip(newGossip))
-
- // if we won the race then update else try again
- if (!state.compareAndSet(oldState, newState)) scrutinize() // recur
- else {
- // notify listeners on successful update of state
- for {
- deadNode ← newUnavailableMembers
- listener ← oldState.memberMembershipChangeListeners
- } listener memberDisconnected deadNode
- }
- }
- }
-
- private def setUpConnectionToNode(member: Member): Option[ActorRef] = {
- val address = member.address
- try {
- Some(
- connectionManager.putIfAbsent(
- address,
- () ⇒ system.actorFor(RootActorPath(Address(protocol, system.name)) / "system" / "cluster")))
- } catch {
- case e: Exception ⇒ None
- }
- }
-
- private def newGossip(): Gossip = Gossip(member = address)
-
- private def incrementVersionForGossip(from: Gossip): Gossip = {
- val newVersion = from.version.increment(memberFingerprint, newTimestamp)
- from copy (version = newVersion)
- }
-
- private def seedNodesWithoutMyself: List[Member] = seeds.filter(_.address != address).toList
-
- private def selectRandomNode(members: Set[Member]): Member = members.toList(random.nextInt(members.size))
-}
diff --git a/akka-cluster/src/main/scala/akka/cluster/Node.scala b/akka-cluster/src/main/scala/akka/cluster/Node.scala
new file mode 100644
index 0000000000..61ad9a10d4
--- /dev/null
+++ b/akka-cluster/src/main/scala/akka/cluster/Node.scala
@@ -0,0 +1,1006 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+
+package akka.cluster
+
+import akka.actor._
+import akka.actor.Status._
+import akka.remote._
+import akka.routing._
+import akka.event.Logging
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util._
+import akka.util.duration._
+import akka.config.ConfigurationException
+
+import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
+import java.util.concurrent.TimeUnit._
+import java.util.concurrent.TimeoutException
+import java.security.SecureRandom
+
+import scala.collection.immutable.{ Map, SortedSet }
+import scala.annotation.tailrec
+
+import com.google.protobuf.ByteString
+
+/**
+ * Interface for membership change listener.
+ */
+trait MembershipChangeListener {
+ def notify(members: SortedSet[Member]): Unit
+}
+
+/**
+ * Interface for meta data change listener.
+ */
+trait MetaDataChangeListener {
+ def notify(meta: Map[String, Array[Byte]]): Unit
+}
+
+/**
+ * Base trait for all cluster messages. All ClusterMessage's are serializable.
+ */
+sealed trait ClusterMessage extends Serializable
+
+/**
+ * Cluster commands sent by the USER.
+ */
+object ClusterAction {
+
+ /**
+ * Command to join the cluster. Sent when a node (represented by 'address')
+ * wants to join another node (the receiver).
+ */
+ case class Join(address: Address) extends ClusterMessage
+
+ /**
+ * Command to set a node to Up (from Joining).
+ */
+ case class Up(address: Address) extends ClusterMessage
+
+ /**
+ * Command to leave the cluster.
+ */
+ case class Leave(address: Address) extends ClusterMessage
+
+ /**
+ * Command to mark a node as temporarily down.
+ */
+ case class Down(address: Address) extends ClusterMessage
+
+ /**
+ * Command to mark a node to be removed from the cluster immediately.
+ */
+ case class Exit(address: Address) extends ClusterMessage
+
+ /**
+ * Command to remove a node from the cluster immediately.
+ */
+ case class Remove(address: Address) extends ClusterMessage
+}
+
+/**
+ * Represents the address and the current status of a cluster member node.
+ */
+class Member(val address: Address, val status: MemberStatus) extends ClusterMessage {
+ override def hashCode = address.##
+ override def equals(other: Any) = Member.unapply(this) == Member.unapply(other)
+ override def toString = "Member(address = %s, status = %s)" format (address, status)
+ def copy(address: Address = this.address, status: MemberStatus = this.status): Member = new Member(address, status)
+}
+
+/**
+ * Factory and Utility module for Member instances.
+ */
+object Member {
+ import MemberStatus._
+
+ implicit val ordering = Ordering.fromLessThan[Member](_.address.toString < _.address.toString)
+
+ def apply(address: Address, status: MemberStatus): Member = new Member(address, status)
+
+ def unapply(other: Any) = other match {
+ case m: Member ⇒ Some(m.address)
+ case _ ⇒ None
+ }
+
+ /**
+ * Picks the Member with the highest "priority" MemberStatus.
+ */
+ def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match {
+ case (Removed, _) ⇒ m1
+ case (_, Removed) ⇒ m2
+ case (Down, _) ⇒ m1
+ case (_, Down) ⇒ m2
+ case (Exiting, _) ⇒ m1
+ case (_, Exiting) ⇒ m2
+ case (Leaving, _) ⇒ m1
+ case (_, Leaving) ⇒ m2
+ case (Up, Joining) ⇒ m1
+ case (Joining, Up) ⇒ m2
+ case (Joining, Joining) ⇒ m1
+ case (Up, Up) ⇒ m1
+ }
+}
+
+/**
+ * Envelope adding a sender address to the gossip.
+ */
+case class GossipEnvelope(sender: Member, gossip: Gossip) extends ClusterMessage
+
+/**
+ * Defines the current status of a cluster member node
+ *
+ * Can be one of: Joining, Up, Leaving, Exiting, Down and Removed.
+ */
+sealed trait MemberStatus extends ClusterMessage
+object MemberStatus {
+ case object Joining extends MemberStatus
+ case object Up extends MemberStatus
+ case object Leaving extends MemberStatus
+ case object Exiting extends MemberStatus
+ case object Down extends MemberStatus
+ case object Removed extends MemberStatus
+
+ def isUnavailable(status: MemberStatus): Boolean = {
+ status == MemberStatus.Down ||
+ status == MemberStatus.Exiting ||
+ status == MemberStatus.Removed ||
+ status == MemberStatus.Leaving
+ }
+}
+
+/**
+ * Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes.
+ */
+case class GossipOverview(
+ seen: Map[Address, VectorClock] = Map.empty[Address, VectorClock],
+ unreachable: Set[Member] = Set.empty[Member]) {
+
+ // FIXME document when nodes are put in 'unreachable' set and removed from 'members'
+
+ override def toString =
+ "GossipOverview(seen = [" + seen.mkString(", ") +
+ "], unreachable = [" + unreachable.mkString(", ") +
+ "])"
+}
+
+/**
+ * Represents the state of the cluster; cluster ring membership, ring convergence, meta data - all versioned by a vector clock.
+ */
+case class Gossip(
+ overview: GossipOverview = GossipOverview(),
+ members: SortedSet[Member], // sorted set of members with their status, sorted by name
+ meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]],
+ version: VectorClock = VectorClock()) // vector clock version
+ extends ClusterMessage // is a serializable cluster message
+ with Versioned[Gossip] {
+
+ /**
+ * Increments the version for this 'Node'.
+ */
+ def +(node: VectorClock.Node): Gossip = copy(version = version + node)
+
+ def +(member: Member): Gossip = {
+ if (members contains member) this
+ else this copy (members = members + member)
+ }
+
+ /**
+ * Marks the gossip as seen by this node (remoteAddress) by updating the address entry in the 'gossip.overview.seen'
+ * Map with the VectorClock for the new gossip.
+ */
+ def seen(address: Address): Gossip = {
+ if (overview.seen.contains(address) && overview.seen(address) == version) this
+ else this copy (overview = overview copy (seen = overview.seen + (address -> version)))
+ }
+
+ /**
+ * Merges two Gossip instances including membership tables, meta-data tables and the VectorClock histories.
+ */
+ def merge(that: Gossip): Gossip = {
+ import Member.ordering
+
+ // 1. merge vector clocks
+ val mergedVClock = this.version merge that.version
+
+ // 2. group all members by Address => Vector[Member]
+ var membersGroupedByAddress = Map.empty[Address, Vector[Member]]
+ (this.members ++ that.members) foreach { m ⇒
+ val ms = membersGroupedByAddress.get(m.address).getOrElse(Vector.empty[Member])
+ membersGroupedByAddress += (m.address -> (ms :+ m))
+ }
+
+ // 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups
+ val mergedMembers =
+ SortedSet.empty[Member] ++
+ membersGroupedByAddress.values.foldLeft(Vector.empty[Member]) { (acc, members) ⇒
+ acc :+ members.reduceLeft(Member.highestPriorityOf(_, _))
+ }
+
+ // 4. merge meta-data
+ val mergedMeta = this.meta ++ that.meta
+
+ // 5. merge gossip overview
+ val mergedOverview = GossipOverview(
+ this.overview.seen ++ that.overview.seen,
+ this.overview.unreachable ++ that.overview.unreachable)
+
+ Gossip(mergedOverview, mergedMembers, mergedMeta, mergedVClock)
+ }
+
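+ // For illustration: merging a gossip in which member A is Up with a concurrent gossip in which
+ // A is Leaving keeps the Leaving entry (Member.highestPriorityOf ranks Leaving above Up), while
+ // the 'seen' and 'unreachable' sets are unioned and the two vector clock histories are merged.
+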
+ override def toString =
+ "Gossip(" +
+ "overview = " + overview +
+ ", members = [" + members.mkString(", ") +
+ "], meta = [" + meta.mkString(", ") +
+ "], version = " + version +
+ ")"
+}
+
+/**
+ * Manages routing of the different cluster commands.
+ * Instantiated as a single instance for each Node - i.e. commands are serialized and applied to the Node one message at a time.
+ */
+final class ClusterCommandDaemon extends Actor {
+ import ClusterAction._
+
+ val node = Node(context.system)
+ val log = Logging(context.system, this)
+
+ def receive = {
+ case Join(address) ⇒ node.joining(address)
+ case Up(address) ⇒ node.up(address)
+ case Down(address) ⇒ node.downing(address)
+ case Leave(address) ⇒ node.leaving(address)
+ case Exit(address) ⇒ node.exiting(address)
+ case Remove(address) ⇒ node.removing(address)
+ }
+
+ override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown)
+}
+
+/**
+ * Pooled and routed with N number of configurable instances.
+ * Concurrent access to Node.
+ */
+final class ClusterGossipDaemon extends Actor {
+ val log = Logging(context.system, this)
+ val node = Node(context.system)
+
+ def receive = {
+ case GossipEnvelope(sender, gossip) ⇒ node.receive(sender, gossip)
+ }
+
+ override def unhandled(unknown: Any) = log.error("[/system/cluster/gossip] can not respond to messages - received [{}]", unknown)
+}
+
+/**
+ * Supervisor managing the different cluster daemons.
+ */
+final class ClusterDaemonSupervisor extends Actor {
+ val log = Logging(context.system, this)
+ val node = Node(context.system)
+
+ private val commands = context.actorOf(Props[ClusterCommandDaemon], "commands")
+ private val gossip = context.actorOf(
+ Props[ClusterGossipDaemon].withRouter(RoundRobinRouter(node.clusterSettings.NrOfGossipDaemons)), "gossip")
+
+ def receive = Actor.emptyBehavior
+
+ override def unhandled(unknown: Any): Unit = log.error("[/system/cluster] can not respond to messages - received [{}]", unknown)
+}
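+
+// Note: the supervisor above is created under the system guardian (see the Node constructor below),
+// so a node's command and gossip daemons live at the "/system/cluster/commands" and
+// "/system/cluster/gossip" paths. The connection helpers used further down
+// (clusterCommandConnectionFor, clusterGossipConnectionFor) are assumed to resolve remote
+// daemons at those paths; their definitions are outside this section.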
+
+/**
+ * Node Extension Id and factory for creating Node extension.
+ * Example:
+ * {{{
+ * val node = Node(system)
+ *
+ * if (node.isLeader) { ... }
+ * }}}
+ */
+object Node extends ExtensionId[Node] with ExtensionIdProvider {
+ override def get(system: ActorSystem): Node = super.get(system)
+
+ override def lookup = Node
+
+ override def createExtension(system: ExtendedActorSystem): Node = new Node(system)
+}
+
+/**
+ * This module is responsible for Gossiping cluster information. The abstraction maintains the list of live
+ * and dead members. Periodically, i.e. every second, this module chooses a random member and initiates a round
+ * of Gossip with it. Whenever it gets gossip updates it updates the Failure Detector with the liveness
+ * information.
+ *
+ * During each of these runs the member initiates gossip exchange according to the following rules
+ * (as defined in the Cassandra documentation [http://wiki.apache.org/cassandra/ArchitectureGossip]):
+ *
+ * 1) Gossip to a random live member (if any)
+ * 2) Gossip to a random unreachable member with a certain probability depending on the number of unreachable and live members
+ * 3) If the member gossiped to at (1) was not a deputy, or the number of live members is less than the number of deputy nodes,
+ *    gossip to a random deputy with a certain probability depending on the number of unreachable, deputy and live members.
+ *
+ *
+ * Example:
+ * {{{
+ * val node = Node(system)
+ *
+ * if (node.isLeader) { ... }
+ * }}}
+ */
+class Node(system: ExtendedActorSystem) extends Extension {
+
+ /**
+ * Represents the state for this Node. Implemented using optimistic lockless concurrency,
+ * all state is represented by this immutable case class and managed by an AtomicReference.
+ */
+ private case class State(
+ latestGossip: Gossip,
+ memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty[MembershipChangeListener])
+
+ if (!system.provider.isInstanceOf[RemoteActorRefProvider])
+ throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration")
+
+ private val remote: RemoteActorRefProvider = system.provider.asInstanceOf[RemoteActorRefProvider]
+
+ val remoteSettings = new RemoteSettings(system.settings.config, system.name)
+ val clusterSettings = new ClusterSettings(system.settings.config, system.name)
+
+ val remoteAddress = remote.transport.address
+ val failureDetector = new AccrualFailureDetector(
+ system, remoteAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)
+
+ private val vclockNode = VectorClock.Node(remoteAddress.toString)
+
+ private val gossipInitialDelay = clusterSettings.GossipInitialDelay
+ private val gossipFrequency = clusterSettings.GossipFrequency
+
+ implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
+
+ private val autoDown = clusterSettings.AutoDown
+ private val nrOfDeputyNodes = clusterSettings.NrOfDeputyNodes
+ private val nrOfGossipDaemons = clusterSettings.NrOfGossipDaemons
+ private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress)
+
+ private val serialization = remote.serialization
+
+ private val isRunning = new AtomicBoolean(true)
+ private val log = Logging(system, "Node")
+ private val random = SecureRandom.getInstance("SHA1PRNG")
+
+ log.info("Node [{}] - is JOINING cluster...", remoteAddress)
+
+ // create supervisor for daemons under the path "/system/cluster"
+ private val clusterDaemons = {
+ val createChild = CreateChild(Props[ClusterDaemonSupervisor], "cluster")
+ Await.result(system.systemGuardian ? createChild, defaultTimeout.duration) match {
+ case a: ActorRef ⇒ a
+ case e: Exception ⇒ throw e
+ }
+ }
+
+ private val state = {
+ val member = Member(remoteAddress, MemberStatus.Joining)
+ val gossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock
+ new AtomicReference[State](State(gossip))
+ }
+
+ // try to join the node defined in the 'akka.cluster.node-to-join' option
+ autoJoin()
+
+ // ========================================================
+ // ===================== WORK DAEMONS =====================
+ // ========================================================
+
+ // start periodic gossip to random nodes in cluster
+ private val gossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
+ gossip()
+ }
+
+ // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
+ private val failureDetectorReaperCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { // TODO: should we use the same gossipFrequency for reaping?
+ reapUnreachableMembers()
+ }
+
+ // start periodic leader action management (only applies for the current leader)
+ private val leaderActionsCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { // TODO: should we use the same gossipFrequency for leaderActions?
+ leaderActions()
+ }
+
+ log.info("Node [{}] - has JOINED cluster successfully", remoteAddress)
+
+ // ======================================================
+ // ===================== PUBLIC API =====================
+ // ======================================================
+
+ def self: Member = latestGossip.members
+ .find(_.address == remoteAddress)
+ .getOrElse(throw new IllegalStateException("Can't find 'this' Member in the cluster membership ring"))
+
+ /**
+ * Latest gossip.
+ */
+ def latestGossip: Gossip = state.get.latestGossip
+
+ /**
+ * Member status for this node.
+ */
+ def status: MemberStatus = self.status
+
+ /**
+ * Is this node the leader?
+ */
+ def isLeader: Boolean = {
+ val members = latestGossip.members
+ !members.isEmpty && (remoteAddress == members.head.address)
+ }
+
+ /**
+ * Is this node a singleton cluster?
+ */
+ def isSingletonCluster: Boolean = isSingletonCluster(state.get)
+
+ /**
+ * Checks if we have a cluster convergence.
+ *
+ * @return Some(convergedGossip) if convergence has been reached and None if not
+ */
+ def convergence: Option[Gossip] = convergence(latestGossip)
+
+ /**
+ * Returns true if the node is UP or JOINING.
+ */
+ def isAvailable: Boolean = !isUnavailable(state.get)
+
+ /**
+ * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
+ */
+ def shutdown() {
+ if (isRunning.compareAndSet(true, false)) {
+ log.info("Node [{}] - Shutting down Node and cluster daemons...", remoteAddress)
+ gossipCanceller.cancel()
+ failureDetectorReaperCanceller.cancel()
+ leaderActionsCanceller.cancel()
+ system.stop(clusterDaemons)
+ }
+ }
+
+ /**
+ * Registers a listener to subscribe to cluster membership changes.
+ */
+ @tailrec
+ final def registerListener(listener: MembershipChangeListener) {
+ val localState = state.get
+ val newListeners = localState.memberMembershipChangeListeners + listener
+ val newState = localState copy (memberMembershipChangeListeners = newListeners)
+ if (!state.compareAndSet(localState, newState)) registerListener(listener) // recur
+ }
+
+ /**
+ * Unsubscribes to cluster membership changes.
+ */
+ @tailrec
+ final def unregisterListener(listener: MembershipChangeListener) {
+ val localState = state.get
+ val newListeners = localState.memberMembershipChangeListeners - listener
+ val newState = localState copy (memberMembershipChangeListeners = newListeners)
+ if (!state.compareAndSet(localState, newState)) unregisterListener(listener) // recur
+ }
+
+ /**
+ * Send command to JOIN one node to another.
+ */
+ def scheduleNodeJoin(address: Address) {
+ clusterCommandDaemon ! ClusterAction.Join(address)
+ }
+
+ /**
+ * Send command to issue state transition to LEAVING.
+ */
+ def scheduleNodeLeave(address: Address) {
+ clusterCommandDaemon ! ClusterAction.Leave(address)
+ }
+
+ /**
+ * Send command to issue state transition to DOWN.
+ */
+ def scheduleNodeDown(address: Address) {
+ clusterCommandDaemon ! ClusterAction.Down(address)
+ }
+
+ /**
+ * Send command to issue state transition to REMOVED.
+ */
+ def scheduleNodeRemove(address: Address) {
+ clusterCommandDaemon ! ClusterAction.Remove(address)
+ }
+
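+ // A minimal usage sketch of the public API above (the system name, host and port are illustrative):
+ //
+ //   val node = Node(system)
+ //   node.registerListener(new MembershipChangeListener {
+ //     def notify(members: SortedSet[Member]): Unit = println("current members: " + members)
+ //   })
+ //   node.scheduleNodeJoin(address) // 'address' is the Address of the node to join,
+ //                                  // e.g. parsed from "akka://MySystem@10.0.0.1:2552"
+ //   if (node.isLeader) println("this node is currently the leader")
+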
+ // ========================================================
+ // ===================== INTERNAL API =====================
+ // ========================================================
+
+ /**
+ * State transition to JOINING.
+ * New node joining.
+ */
+ @tailrec
+ private[cluster] final def joining(node: Address) {
+ log.info("Node [{}] - Node [{}] is JOINING", remoteAddress, node)
+
+ val localState = state.get
+ val localGossip = localState.latestGossip
+ val localMembers = localGossip.members
+ val localOverview = localGossip.overview
+ val localUnreachableMembers = localOverview.unreachable
+
+ // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
+ val newUnreachableMembers = localUnreachableMembers filterNot { _.address == node }
+ val newOverview = localOverview copy (unreachable = newUnreachableMembers)
+
+ val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining
+ val newGossip = localGossip copy (overview = newOverview, members = newMembers)
+
+ val versionedGossip = newGossip + vclockNode
+ val seenVersionedGossip = versionedGossip seen remoteAddress
+
+ val newState = localState copy (latestGossip = seenVersionedGossip)
+
+ if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
+ else {
+ failureDetector heartbeat node // update heartbeat in failure detector
+ if (convergence(newState.latestGossip).isDefined) {
+ newState.memberMembershipChangeListeners foreach { _ notify newMembers }
+ }
+ }
+ }
+
+ /**
+ * State transition to UP.
+ */
+ private[cluster] final def up(address: Address) {
+ log.info("Node [{}] - Marking node [{}] as UP", remoteAddress, address)
+ }
+
+ /**
+ * State transition to LEAVING.
+ */
+ private[cluster] final def leaving(address: Address) {
+ log.info("Node [{}] - Marking node [{}] as LEAVING", remoteAddress, address)
+ }
+
+ /**
+ * State transition to EXITING.
+ */
+ private[cluster] final def exiting(address: Address) {
+ log.info("Node [{}] - Marking node [{}] as EXITING", remoteAddress, address)
+ }
+
+ /**
+ * State transition to REMOVED.
+ */
+ private[cluster] final def removing(address: Address) {
+ log.info("Node [{}] - Marking node [{}] as REMOVED", remoteAddress, address)
+ }
+
+ /**
+ * The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there)
+ * and its status is set to DOWN. The node is also removed from the 'seen' table.
+ *
+ * The node will reside as DOWN in the 'unreachable' set until an explicit JOIN command is sent directly
+ * to this node, after which it will go through the normal JOINING procedure.
+ */
+ @tailrec
+ final private[cluster] def downing(address: Address) {
+ val localState = state.get
+ val localGossip = localState.latestGossip
+ val localMembers = localGossip.members
+ val localOverview = localGossip.overview
+ val localSeen = localOverview.seen
+ val localUnreachableMembers = localOverview.unreachable
+
+ // 1. check if the node to DOWN is in the 'members' set
+ var downedMember: Option[Member] = None
+ val newMembers =
+ localMembers
+ .map { member ⇒
+ if (member.address == address) {
+ log.info("Node [{}] - Marking node [{}] as DOWN", remoteAddress, member.address)
+ val newMember = member copy (status = MemberStatus.Down)
+ downedMember = Some(newMember)
+ newMember
+ } else member
+ }
+ .filter(_.status != MemberStatus.Down)
+
+ // 2. check if the node to DOWN is in the 'unreachable' set
+ val newUnreachableMembers =
+ localUnreachableMembers
+ .filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN
+ .map { member ⇒
+ if (member.address == address) {
+ log.info("Node [{}] - Marking unreachable node [{}] as DOWN", remoteAddress, member.address)
+ member copy (status = MemberStatus.Down)
+ } else member
+ }
+
+ // 3. add the newly DOWNED members from the 'members' (in step 1.) to the 'newUnreachableMembers' set.
+ val newUnreachablePlusNewlyDownedMembers = downedMember match {
+ case Some(member) ⇒ newUnreachableMembers + member
+ case None ⇒ newUnreachableMembers
+ }
+
+ // 4. remove nodes marked as DOWN from the 'seen' table
+ val newSeen = newUnreachablePlusNewlyDownedMembers.foldLeft(localSeen) { (currentSeen, member) ⇒
+ currentSeen - member.address
+ }
+
+ val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) // update gossip overview
+ val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
+ val versionedGossip = newGossip + vclockNode
+ val newState = localState copy (latestGossip = versionedGossip seen remoteAddress)
+
+ if (!state.compareAndSet(localState, newState)) downing(address) // recur if we fail the update
+ else {
+ if (convergence(newState.latestGossip).isDefined) {
+ newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
+ }
+ }
+ }
+
+ /**
+ * Receive new gossip.
+ */
+ @tailrec
+ final private[cluster] def receive(sender: Member, remoteGossip: Gossip) {
+ val localState = state.get
+ val localGossip = localState.latestGossip
+
+ val winningGossip =
+ if (remoteGossip.version <> localGossip.version) {
+ // concurrent
+ val mergedGossip = remoteGossip merge localGossip
+ val versionedMergedGossip = mergedGossip + vclockNode
+
+ log.debug(
+ "Can't establish a causal relationship between \"remote\" gossip [{}] and \"local\" gossip [{}] - merging them into [{}]",
+ remoteGossip, localGossip, versionedMergedGossip)
+
+ versionedMergedGossip
+
+ } else if (remoteGossip.version < localGossip.version) {
+ // local gossip is newer
+ localGossip
+
+ } else {
+ // remote gossip is newer
+ remoteGossip
+ }
+
+ val newState = localState copy (latestGossip = winningGossip seen remoteAddress)
+
+ // if we won the race then update else try again
+ if (!state.compareAndSet(localState, newState)) receive(sender, remoteGossip) // recur if we fail the update
+ else {
+ log.debug("Node [{}] - Receiving gossip from [{}]", remoteAddress, sender.address)
+
+ failureDetector heartbeat sender.address // update heartbeat in failure detector
+
+ if (convergence(newState.latestGossip).isDefined) {
+ newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
+ }
+ }
+ }
+
+ /**
+ * Joins the pre-configured contact point and retrieves current gossip state.
+ */
+ private def autoJoin() = nodeToJoin foreach { address ⇒
+ val connection = clusterCommandConnectionFor(address)
+ val command = ClusterAction.Join(remoteAddress)
+ log.info("Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection)
+ connection ! command
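+ // on the receiving node this Join command is handled by its ClusterCommandDaemon, which calls
+ // joining(remoteAddress) (see above) and thereby adds this node as Joining to that node's gossip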
+ }
+
+ /**
+ * Switches the member status.
+ *
+ * @param newStatus the new member status
+ * @param state the state to change the member status in
+ * @return the updated new state with the new member status
+ */
+ private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = {
+ log.info("Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus)
+
+ val localSelf = self
+
+ val localGossip = state.latestGossip
+ val localMembers = localGossip.members
+
+ // change my state into a "new" self
+ val newSelf = localSelf copy (status = newStatus)
+
+ // change my state in 'gossip.members'
+ val newMembersSet = localMembers map { member ⇒
+ if (member.address == remoteAddress) newSelf
+ else member
+ }
+
+ // ugly crap to work around bug in Scala collections ('val ss: SortedSet[Member] = SortedSet.empty[Member] ++ aSet' does not compile)
+ val newMembersSortedSet = SortedSet[Member](newMembersSet.toList: _*)
+ val newGossip = localGossip copy (members = newMembersSortedSet)
+
+ // version my changes
+ val versionedGossip = newGossip + vclockNode
+ val seenVersionedGossip = versionedGossip seen remoteAddress
+
+ state copy (latestGossip = seenVersionedGossip)
+ }
+
+ /**
+ * Gossips latest gossip to an address.
+ */
+ private def gossipTo(address: Address) {
+ val connection = clusterGossipConnectionFor(address)
+ log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, connection)
+ connection ! GossipEnvelope(self, latestGossip)
+ }
+
+ /**
+ * Gossips latest gossip to a random member in the set of members passed in as argument.
+ *
+ * @return 'true' if it gossiped to a "deputy" member.
+ */
+ private def gossipToRandomNodeOf(addresses: Iterable[Address]): Boolean = {
+ if (addresses.isEmpty) false
+ else {
+ val peers = addresses filter (_ != remoteAddress) // filter out myself
+ val peer = selectRandomNode(peers)
+ gossipTo(peer)
+ deputyNodes exists (peer == _)
+ }
+ }
+
+ /**
+ * Initiates a new round of gossip.
+ */
+ private def gossip() {
+ val localState = state.get
+ val localGossip = localState.latestGossip
+ val localMembers = localGossip.members
+
+ if (!isSingletonCluster(localState) && isAvailable(localState)) {
+ // only gossip if we are a non-singleton cluster and available
+
+ log.debug("Node [{}] - Initiating new round of gossip", remoteAddress)
+
+ val localGossip = localState.latestGossip
+ val localMembers = localGossip.members
+ val localMembersSize = localMembers.size
+
+ val localUnreachableMembers = localGossip.overview.unreachable
+ val localUnreachableSize = localUnreachableMembers.size
+
+ // 1. gossip to alive members
+ val gossipedToDeputy = gossipToRandomNodeOf(localMembers map { _.address })
+
+ // 2. gossip to unreachable members
+ if (localUnreachableSize > 0) {
+ val probability: Double = localUnreachableSize.toDouble / (localMembersSize + 1)
+ if (random.nextDouble() < probability) gossipToRandomNodeOf(localUnreachableMembers.map(_.address))
+ }
+
+ // 3. gossip to a deputy node for facilitating partition healing
+ val deputies = deputyNodes
+ if ((!gossipedToDeputy || localMembersSize < 1) && !deputies.isEmpty) {
+ if (localMembersSize == 0) gossipToRandomNodeOf(deputies)
+ else {
+          val probability = 1.0 / (localMembersSize + localUnreachableSize)
+ if (random.nextDouble() <= probability) gossipToRandomNodeOf(deputies)
+ }
+ }
+ }
+ }
+
+ /**
+ * Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
+ */
+ @tailrec
+ final private def reapUnreachableMembers() {
+ val localState = state.get
+
+ if (!isSingletonCluster(localState) && isAvailable(localState)) {
+ // only scrutinize if we are a non-singleton cluster and available
+
+ val localGossip = localState.latestGossip
+ val localOverview = localGossip.overview
+ val localSeen = localOverview.seen
+ val localMembers = localGossip.members
+ val localUnreachableMembers = localGossip.overview.unreachable
+
+ val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒ failureDetector.isAvailable(member.address) }
+
+ if (!newlyDetectedUnreachableMembers.isEmpty) { // we have newly detected members marked as unavailable
+
+ val newMembers = localMembers diff newlyDetectedUnreachableMembers
+ val newUnreachableMembers: Set[Member] = localUnreachableMembers ++ newlyDetectedUnreachableMembers
+
+ val newOverview = localOverview copy (unreachable = newUnreachableMembers)
+ val newGossip = localGossip copy (overview = newOverview, members = newMembers)
+
+ // updating vclock and 'seen' table
+ val versionedGossip = newGossip + vclockNode
+ val seenVersionedGossip = versionedGossip seen remoteAddress
+
+ val newState = localState copy (latestGossip = seenVersionedGossip)
+
+ // if we won the race then update else try again
+ if (!state.compareAndSet(localState, newState)) reapUnreachableMembers() // recur
+ else {
+ log.info("Node [{}] - Marking node(s) as UNREACHABLE [{}]", remoteAddress, newlyDetectedUnreachableMembers.mkString(", "))
+
+ if (convergence(newState.latestGossip).isDefined) {
+ newState.memberMembershipChangeListeners foreach { _ notify newMembers }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc.
+ */
+ @tailrec
+ final private def leaderActions() {
+ val localState = state.get
+ val localGossip = localState.latestGossip
+ val localMembers = localGossip.members
+
+ val isLeader = !localMembers.isEmpty && (remoteAddress == localMembers.head.address)
+
+ if (isLeader && isAvailable(localState)) {
+ // only run the leader actions if we are the LEADER and available
+
+ val localOverview = localGossip.overview
+ val localSeen = localOverview.seen
+ val localUnreachableMembers = localGossip.overview.unreachable
+
+ // Leader actions are as follows:
+ // 1. Move JOINING => UP
+ // 2. Move EXITING => REMOVED
+ // 3. Move UNREACHABLE => DOWN (auto-downing by leader)
+ // 4. Updating the vclock version for the changes
+ // 5. Updating the 'seen' table
+
+ var hasChangedState = false
+ val newGossip =
+
+ if (convergence(localGossip).isDefined) {
+ // we have convergence - so we can't have unreachable nodes
+
+ val newMembers =
+ localMembers map { member ⇒
+ // 1. Move JOINING => UP
+ if (member.status == MemberStatus.Joining) {
+ log.info("Node [{}] - Leader is moving node [{}] from JOINING to UP", remoteAddress, member.address)
+ hasChangedState = true
+ member copy (status = MemberStatus.Up)
+ } else member
+ } map { member ⇒
+ // 2. Move EXITING => REMOVED
+ if (member.status == MemberStatus.Exiting) {
+ log.info("Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", remoteAddress, member.address)
+ hasChangedState = true
+ member copy (status = MemberStatus.Removed)
+ } else member
+ }
+ localGossip copy (members = newMembers) // update gossip
+
+ } else if (autoDown) {
+ // we don't have convergence - so we might have unreachable nodes
+ // if 'auto-down' is turned on, then try to auto-down any unreachable nodes
+
+ // 3. Move UNREACHABLE => DOWN (auto-downing by leader)
+ val newUnreachableMembers =
+ localUnreachableMembers
+ .filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN
+ .map { member ⇒
+ log.info("Node [{}] - Leader is marking unreachable node [{}] as DOWN", remoteAddress, member.address)
+ hasChangedState = true
+ member copy (status = MemberStatus.Down)
+ }
+
+ // removing nodes marked as DOWN from the 'seen' table
+ val newSeen = localUnreachableMembers.foldLeft(localSeen)((currentSeen, member) ⇒ currentSeen - member.address)
+
+ val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
+ localGossip copy (overview = newOverview) // update gossip
+
+ } else localGossip
+
+ if (hasChangedState) { // we have a change of state - version it and try to update
+
+ // 4. Updating the vclock version for the changes
+ val versionedGossip = newGossip + vclockNode
+
+ // 5. Updating the 'seen' table
+ val seenVersionedGossip = versionedGossip seen remoteAddress
+
+ val newState = localState copy (latestGossip = seenVersionedGossip)
+
+ // if we won the race then update else try again
+ if (!state.compareAndSet(localState, newState)) leaderActions() // recur
+ else {
+ if (convergence(newState.latestGossip).isDefined) {
+            newState.memberMembershipChangeListeners foreach { _ notify newGossip.members }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+   * Checks if we have cluster convergence. If there are any unreachable nodes (not yet marked DOWN) then we can't
+   * have convergence - we are then waiting for the user to act (issuing DOWN) or the leader to act (auto-downing).
+   *
+   * @return Some(convergedGossip) if convergence has been reached and None if not
+ */
+ private def convergence(gossip: Gossip): Option[Gossip] = {
+ val overview = gossip.overview
+ val unreachable = overview.unreachable
+
+ // First check that:
+ // 1. we don't have any members that are unreachable (unreachable.isEmpty == true), or
+    //   2. all unreachable members in the set have status DOWN or REMOVED
+    // Else we can't continue to check for convergence
+    // When that is done we check that all the entries in the 'seen' table have the same vector clock version
+    if (unreachable.isEmpty || !unreachable.exists(m ⇒ m.status != MemberStatus.Down && m.status != MemberStatus.Removed)) {
+ val seen = gossip.overview.seen
+ val views = Set.empty[VectorClock] ++ seen.values
+
+ if (views.size == 1) {
+ log.debug("Node [{}] - Cluster convergence reached", remoteAddress)
+ Some(gossip)
+ } else None
+ } else None
+ }
+
+ private def isAvailable(state: State): Boolean = !isUnavailable(state)
+
+ private def isUnavailable(state: State): Boolean = {
+ val localGossip = state.latestGossip
+ val localOverview = localGossip.overview
+ val localMembers = localGossip.members
+ val localUnreachableMembers = localOverview.unreachable
+ val isUnreachable = localUnreachableMembers exists { _.address == remoteAddress }
+ val hasUnavailableMemberStatus = localMembers exists { m ⇒ (m == self) && MemberStatus.isUnavailable(m.status) }
+ isUnreachable || hasUnavailableMemberStatus
+ }
+
+ /**
+ * Looks up and returns the local cluster command connection.
+ */
+ private def clusterCommandDaemon = system.actorFor(RootActorPath(remoteAddress) / "system" / "cluster" / "commands")
+
+ /**
+ * Looks up and returns the remote cluster command connection for the specific address.
+ */
+ private def clusterCommandConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "commands")
+
+ /**
+ * Looks up and returns the remote cluster gossip connection for the specific address.
+ */
+ private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "gossip")
+
+ /**
+   * Gets an Iterable with the addresses of all the 'deputy' nodes - excluding this node if part of the group.
+ */
+ private def deputyNodes: Iterable[Address] = state.get.latestGossip.members.toIterable map (_.address) drop 1 take nrOfDeputyNodes filter (_ != remoteAddress)
+
+ private def selectRandomNode(addresses: Iterable[Address]): Address = addresses.toSeq(random nextInt addresses.size)
+
+ private def isSingletonCluster(currentState: State): Boolean = currentState.latestGossip.members.size == 1
+}
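The state-changing paths above (joining, reaping unreachable members, the leader actions) all follow the same lock-free pattern: read the AtomicReference-held State, derive a new immutable State, and recurse if compareAndSet loses the race. Below is a minimal, self-contained sketch of that pattern; the Counter type is purely hypothetical and stands in for the real State/Gossip types.

    import java.util.concurrent.atomic.AtomicReference
    import scala.annotation.tailrec

    object CasRetrySketch {
      final case class Counter(value: Int) // stand-in for the immutable cluster State
      private val state = new AtomicReference(Counter(0))

      @tailrec
      def increment(): Counter = {
        val local = state.get                                 // 1. read the current state
        val updated = local copy (value = local.value + 1)    // 2. derive a new immutable state
        if (!state.compareAndSet(local, updated)) increment() // 3. lost the race - recur and try again
        else updated                                          // 4. won the race - safe to run side effects (e.g. notify listeners)
      }
    }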
diff --git a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala
index ef1f1be490..512d29caad 100644
--- a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala
@@ -5,82 +5,36 @@
package akka.cluster
import akka.AkkaException
+import akka.event.Logging
+import akka.actor.ActorSystem
+
+import System.{ currentTimeMillis ⇒ newTimestamp }
+import java.security.MessageDigest
+import java.util.concurrent.atomic.AtomicLong
class VectorClockException(message: String) extends AkkaException(message)
/**
* Trait to be extended by classes that wants to be versioned using a VectorClock.
*/
-trait Versioned {
+trait Versioned[T] {
def version: VectorClock
+ def +(node: VectorClock.Node): T
}
/**
* Utility methods for comparing Versioned instances.
*/
object Versioned {
- def latestVersionOf[T <: Versioned](versioned1: T, versioned2: T): T = {
- (versioned1.version compare versioned2.version) match {
- case VectorClock.Before ⇒ versioned2 // version 1 is BEFORE (older), use version 2
- case VectorClock.After ⇒ versioned1 // version 1 is AFTER (newer), use version 1
- case VectorClock.Concurrent ⇒ versioned1 // can't establish a causal relationship between versions => conflict - keeping version 1
- }
- }
-}
-
-/**
- * Representation of a Vector-based clock (counting clock), inspired by Lamport logical clocks.
- * {{
- * Reference:
- * 1) Leslie Lamport (1978). "Time, clocks, and the ordering of events in a distributed system". Communications of the ACM 21 (7): 558-565.
- * 2) Friedemann Mattern (1988). "Virtual Time and Global States of Distributed Systems". Workshop on Parallel and Distributed Algorithms: pp. 215-226
- * }}
- */
-case class VectorClock(
- versions: Vector[VectorClock.Entry] = Vector.empty[VectorClock.Entry],
- timestamp: Long = System.currentTimeMillis) {
- import VectorClock._
-
- def compare(other: VectorClock): Ordering = VectorClock.compare(this, other)
-
- def increment(fingerprint: Int, timestamp: Long): VectorClock = {
- val newVersions =
- if (versions exists (entry ⇒ entry.fingerprint == fingerprint)) {
- // update existing node entry
- versions map { entry ⇒
- if (entry.fingerprint == fingerprint) entry.increment()
- else entry
- }
- } else {
- // create and append a new node entry
- versions :+ Entry(fingerprint = fingerprint)
- }
- if (newVersions.size > MaxNrOfVersions) throw new VectorClockException("Max number of versions reached")
- copy(versions = newVersions, timestamp = timestamp)
- }
-
- def maxVersion: Long = versions.foldLeft(1L)((max, entry) ⇒ math.max(max, entry.version))
-
- // FIXME Do we need to implement VectorClock.merge?
- def merge(other: VectorClock): VectorClock = {
- sys.error("Not implemented")
- }
-}
-
-/**
- * Module with helper classes and methods.
- */
-object VectorClock {
- final val MaxNrOfVersions = Short.MaxValue
/**
- * The result of comparing two vector clocks.
+ * The result of comparing two Versioned objects.
* Either:
- * {{
- * 1) v1 is BEFORE v2
- * 2) v1 is AFTER t2
- * 3) v1 happens CONCURRENTLY to v2
- * }}
+ * {{{
+ * 1) v1 is BEFORE v2 => Before
+ * 2) v1 is AFTER t2 => After
+ * 3) v1 happens CONCURRENTLY to v2 => Concurrent
+ * }}}
*/
sealed trait Ordering
case object Before extends Ordering
@@ -88,55 +42,153 @@ object VectorClock {
case object Concurrent extends Ordering
/**
- * Versioned entry in a vector clock.
+   * Returns the 'Ordering' for the two 'Versioned' instances.
*/
- case class Entry(fingerprint: Int, version: Long = 1L) {
- def increment(): Entry = copy(version = version + 1L)
+ def compare[T <: Versioned[T]](versioned1: Versioned[T], versioned2: Versioned[T]): Ordering = {
+ if (versioned1.version <> versioned2.version) Concurrent
+ else if (versioned1.version < versioned2.version) Before
+ else After
}
/**
- * Compare two vector clocks. The outcomes will be one of the following:
- *
- * {{
- * 1. Clock 1 is BEFORE clock 2 if there exists an i such that c1(i) <= c(2) and there does not exist a j such that c1(j) > c2(j).
- * 2. Clock 1 is CONCURRENT to clock 2 if there exists an i, j such that c1(i) < c2(i) and c1(j) > c2(j).
- * 3. Clock 1 is AFTER clock 2 otherwise.
- * }}
- *
- * @param v1 The first VectorClock
- * @param v2 The second VectorClock
+   * Returns the Versioned that has the latest version.
*/
- def compare(v1: VectorClock, v2: VectorClock): Ordering = {
- if ((v1 eq null) || (v2 eq null)) throw new IllegalArgumentException("Can't compare null VectorClocks")
-
- // FIXME rewrite to functional style, now uses ugly imperative algorithm
-
- var v1Bigger, v2Bigger = false // We do two checks: v1 <= v2 and v2 <= v1 if both are true then
- var p1, p2 = 0
-
- while (p1 < v1.versions.size && p2 < v2.versions.size) {
- val ver1 = v1.versions(p1)
- val ver2 = v2.versions(p2)
- if (ver1.fingerprint == ver2.fingerprint) {
- if (ver1.version > ver2.version) v1Bigger = true
- else if (ver2.version > ver1.version) v2Bigger = true
- p1 += 1
- p2 += 1
- } else if (ver1.fingerprint > ver2.fingerprint) {
- v2Bigger = true // Since ver1 is bigger that means it is missing a version that ver2 has
- p2 += 1
- } else {
- v1Bigger = true // This means ver2 is bigger which means it is missing a version ver1 has
- p1 += 1
- }
+ def latestVersionOf[T <: Versioned[T]](versioned1: T, versioned2: T): T = {
+ compare(versioned1, versioned2) match {
+ case Concurrent ⇒ versioned2
+ case Before ⇒ versioned2
+ case After ⇒ versioned1
}
-
- if (p1 < v1.versions.size) v1Bigger = true
- else if (p2 < v2.versions.size) v2Bigger = true
-
- if (!v1Bigger && !v2Bigger) Before // This is the case where they are equal, return BEFORE arbitrarily
- else if (v1Bigger && !v2Bigger) After // This is the case where v1 is a successor clock to v2
- else if (!v1Bigger && v2Bigger) Before // This is the case where v2 is a successor clock to v1
- else Concurrent // This is the case where both clocks are parallel to one another
}
}
+
+/**
+ * VectorClock module with helper classes and methods.
+ *
+ * Based on code from the 'vlock' VectorClock library by Coda Hale.
+ */
+object VectorClock {
+
+ /**
+ * Hash representation of a versioned node name.
+ */
+ sealed trait Node extends Serializable
+
+ object Node {
+ private case class NodeImpl(name: String) extends Node {
+ override def toString(): String = "Node(" + name + ")"
+ }
+
+ def apply(name: String): Node = NodeImpl(hash(name))
+
+ private def hash(name: String): String = {
+ val digester = MessageDigest.getInstance("MD5")
+ digester update name.getBytes("UTF-8")
+ digester.digest.map { h ⇒ "%02x".format(0xFF & h) }.mkString
+ }
+ }
+
+ /**
+   * Representation of a unique 'Ordered' timestamp.
+ */
+ case class Timestamp private (time: Long) extends Ordered[Timestamp] {
+ def max(other: Timestamp) = {
+ if (this < other) other
+ else this
+ }
+
+ def compare(other: Timestamp) = time compare other.time
+
+ override def toString = "%016x" format time
+ }
+
+ object Timestamp {
+ private val counter = new AtomicLong(newTimestamp)
+
+ def zero(): Timestamp = Timestamp(0L)
+
+ def apply(): Timestamp = {
+ var newTime: Long = 0L
+ while (newTime == 0) {
+ val last = counter.get
+ val current = newTimestamp
+ val next = if (current > last) current else last + 1
+ if (counter.compareAndSet(last, next)) {
+ newTime = next
+ }
+ }
+ new Timestamp(newTime)
+ }
+ }
+}
+
+/**
+ * Representation of a Vector-based clock (counting clock), inspired by Lamport logical clocks.
+ * {{{
+ * Reference:
+ * 1) Leslie Lamport (1978). "Time, clocks, and the ordering of events in a distributed system". Communications of the ACM 21 (7): 558-565.
+ * 2) Friedemann Mattern (1988). "Virtual Time and Global States of Distributed Systems". Workshop on Parallel and Distributed Algorithms: pp. 215-226
+ * }}}
+ *
+ * Based on code from the 'vlock' VectorClock library by Coda Hale.
+ */
+case class VectorClock(
+ timestamp: VectorClock.Timestamp = VectorClock.Timestamp(),
+ versions: Map[VectorClock.Node, VectorClock.Timestamp] = Map.empty[VectorClock.Node, VectorClock.Timestamp])
+ extends PartiallyOrdered[VectorClock] {
+
+ import VectorClock._
+
+ /**
+ * Increment the version for the node passed as argument. Returns a new VectorClock.
+ */
+ def +(node: Node): VectorClock = copy(versions = versions + (node -> Timestamp()))
+
+ /**
+ * Returns true if this and that are concurrent else false.
+ */
+ def <>(that: VectorClock): Boolean = tryCompareTo(that) == None
+
+ /**
+ * Returns true if this VectorClock has the same history as the 'that' VectorClock else false.
+ */
+ def ==(that: VectorClock): Boolean = versions == that.versions
+
+ /**
+ * For the 'PartiallyOrdered' trait, to allow natural comparisons using <, > and ==.
+ *
+ * Compare two vector clocks. The outcomes will be one of the following:
+ *
+ * {{{
+   *   1. Clock 1 is BEFORE (<) Clock 2 if there exists an i such that c1(i) <= c2(i) and there does not exist a j such that c1(j) > c2(j).
+   *   2. Clock 1 is CONCURRENT (<>) to Clock 2 if there exists an i, j such that c1(i) < c2(i) and c1(j) > c2(j).
+   *   3. Clock 1 is AFTER (>) Clock 2 otherwise.
+ * }}}
+ */
+ def tryCompareTo[V >: VectorClock <% PartiallyOrdered[V]](vclock: V): Option[Int] = {
+ def compare(versions1: Map[Node, Timestamp], versions2: Map[Node, Timestamp]): Boolean = {
+ versions1.forall { case ((n, t)) ⇒ t <= versions2.getOrElse(n, Timestamp.zero) } &&
+ (versions1.exists { case ((n, t)) ⇒ t < versions2.getOrElse(n, Timestamp.zero) } ||
+ (versions1.size < versions2.size))
+ }
+ vclock match {
+ case VectorClock(_, otherVersions) ⇒
+ if (compare(versions, otherVersions)) Some(-1)
+ else if (compare(otherVersions, versions)) Some(1)
+ else if (versions == otherVersions) Some(0)
+ else None
+ case _ ⇒ None
+ }
+ }
+
+ /**
+   * Merges this VectorClock with another VectorClock, i.e. merges their versioned histories.
+ */
+ def merge(that: VectorClock): VectorClock = {
+ val mergedVersions = scala.collection.mutable.Map.empty[Node, Timestamp] ++ that.versions
+ for ((node, time) ← versions) mergedVersions(node) = time max mergedVersions.getOrElse(node, time)
+ VectorClock(timestamp, Map.empty[Node, Timestamp] ++ mergedVersions)
+ }
+
+ override def toString = versions.map { case ((n, t)) ⇒ n + " -> " + t }.mkString("VectorClock(", ", ", ")")
+}
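A short usage sketch of the clock API above (the node names are illustrative only), showing how per-node bumps relate to the PartiallyOrdered operators:

    import akka.cluster.VectorClock
    import akka.cluster.VectorClock.Node

    object VectorClockUsageSketch extends App {
      val base = VectorClock()         // empty clock
      val a = base + Node("nodeA")     // bump nodeA's entry
      val b = base + Node("nodeB")     // bump nodeB's entry on an independent branch

      assert(base < a)                 // base happened before a
      assert(a <> b)                   // independent updates are concurrent
      val merged = a merge b           // keeps the latest entry per node
      assert(a < merged && b < merged) // the merged clock dominates both branches
    }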
diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala
index 4aab105273..275cd32c75 100644
--- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala
@@ -1,3 +1,7 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+
package akka.cluster
import java.net.InetSocketAddress
@@ -5,14 +9,21 @@ import akka.testkit.AkkaSpec
import akka.actor.Address
class AccrualFailureDetectorSpec extends AkkaSpec("""
- akka.loglevel = "DEBUG"
+ akka.loglevel = "INFO"
""") {
"An AccrualFailureDetector" must {
val conn = Address("akka", "", "localhost", 2552)
+ val conn2 = Address("akka", "", "localhost", 2553)
+
+ "return phi value of 0.0D on startup for each address" in {
+ val fd = new AccrualFailureDetector(system, conn)
+ fd.phi(conn) must be(0.0D)
+ fd.phi(conn2) must be(0.0D)
+ }
"mark node as available after a series of successful heartbeats" in {
- val fd = new AccrualFailureDetector(system)
+ val fd = new AccrualFailureDetector(system, conn)
fd.heartbeat(conn)
@@ -27,7 +38,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
// FIXME how should we deal with explicit removal of connection? - if triggered as failure then we have a problem in boostrap - see line 142 in AccrualFailureDetector
"mark node as dead after explicit removal of connection" ignore {
- val fd = new AccrualFailureDetector(system)
+ val fd = new AccrualFailureDetector(system, conn)
fd.heartbeat(conn)
@@ -45,7 +56,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
}
"mark node as dead if heartbeat are missed" in {
- val fd = new AccrualFailureDetector(system, threshold = 3)
+ val fd = new AccrualFailureDetector(system, conn, threshold = 3)
fd.heartbeat(conn)
@@ -63,7 +74,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
}
"mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in {
- val fd = new AccrualFailureDetector(system, threshold = 3)
+ val fd = new AccrualFailureDetector(system, conn, threshold = 3)
fd.heartbeat(conn)
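Based only on the constructor and the methods exercised by the spec above (heartbeat, phi, isAvailable), a minimal sketch of how the accrual failure detector is driven; the address and threshold values are illustrative and the meaning of the Address constructor argument simply mirrors the spec's usage.

    import akka.actor.{ ActorSystem, Address }
    import akka.cluster.AccrualFailureDetector

    object FailureDetectorSketch extends App {
      val system = ActorSystem("sketch")
      val peer = Address("akka", "", "localhost", 2552)

      // a lower threshold means quicker (but less certain) suspicion when heartbeats stop
      val fd = new AccrualFailureDetector(system, peer, threshold = 3)

      fd.phi(peer)         // 0.0 until the first heartbeat is recorded
      fd.heartbeat(peer)   // record heartbeats as they arrive
      fd.heartbeat(peer)
      fd.isAvailable(peer) // true while heartbeats keep arriving in time

      system.shutdown()
    }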
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClientDowningSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClientDowningSpec.scala
new file mode 100644
index 0000000000..16651af9b5
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/ClientDowningSpec.scala
@@ -0,0 +1,186 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+
+package akka.cluster
+
+import akka.testkit._
+import akka.dispatch._
+import akka.actor._
+import akka.remote._
+import akka.util.duration._
+
+import com.typesafe.config._
+
+import java.net.InetSocketAddress
+
+class ClientDowningSpec extends AkkaSpec("""
+ akka {
+ loglevel = "INFO"
+ actor.provider = "akka.remote.RemoteActorRefProvider"
+ cluster {
+ failure-detector.threshold = 3
+ auto-down = off
+ }
+ }
+ """) with ImplicitSender {
+
+ var node1: Node = _
+ var node2: Node = _
+ var node3: Node = _
+ var node4: Node = _
+
+ var system1: ActorSystemImpl = _
+ var system2: ActorSystemImpl = _
+ var system3: ActorSystemImpl = _
+ var system4: ActorSystemImpl = _
+
+ try {
+ "Client of a 4 node cluster" must {
+
+ // ======= NODE 1 ========
+ system1 = ActorSystem("system1", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty {
+ hostname = localhost
+ port=5550
+ }
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
+ node1 = Node(system1)
+ val fd1 = node1.failureDetector
+ val address1 = node1.remoteAddress
+
+ // ======= NODE 2 ========
+ system2 = ActorSystem("system2", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty {
+ hostname = localhost
+ port = 5551
+ }
+ cluster.node-to-join = "akka://system1@localhost:5550"
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
+ node2 = Node(system2)
+ val fd2 = node2.failureDetector
+ val address2 = node2.remoteAddress
+
+ // ======= NODE 3 ========
+ system3 = ActorSystem("system3", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty {
+ hostname = localhost
+ port=5552
+ }
+ cluster.node-to-join = "akka://system1@localhost:5550"
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider]
+ node3 = Node(system3)
+ val fd3 = node3.failureDetector
+ val address3 = node3.remoteAddress
+
+ // ======= NODE 4 ========
+ system4 = ActorSystem("system4", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty {
+ hostname = localhost
+ port=5553
+ }
+ cluster.node-to-join = "akka://system1@localhost:5550"
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote4 = system4.provider.asInstanceOf[RemoteActorRefProvider]
+ node4 = Node(system4)
+ val fd4 = node4.failureDetector
+ val address4 = node4.remoteAddress
+
+ "be able to DOWN a node that is UP" taggedAs LongRunningTest in {
+
+ println("Give the system time to converge...")
+ Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds
+
+ // check cluster convergence
+ node1.convergence must be('defined)
+ node2.convergence must be('defined)
+ node3.convergence must be('defined)
+ node4.convergence must be('defined)
+
+ // shut down node3
+ node3.shutdown()
+ system3.shutdown()
+
+ // wait for convergence
+ println("Give the system time to converge...")
+ Thread.sleep(30.seconds.dilated.toMillis)
+
+ // client marks node3 as DOWN
+ node1.scheduleNodeDown(address3)
+
+ println("Give the system time to converge...")
+ Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds
+
+ // check cluster convergence
+ node1.convergence must be('defined)
+ node2.convergence must be('defined)
+ node4.convergence must be('defined)
+
+ node1.latestGossip.members.size must be(3)
+ node1.latestGossip.members.exists(_.address == address3) must be(false)
+ }
+
+ "be able to DOWN a node that is UNREACHABLE" taggedAs LongRunningTest in {
+
+        // shut down system4
+ node4.shutdown()
+ system4.shutdown()
+
+ // wait for convergence
+ println("Give the system time to converge...")
+ Thread.sleep(30.seconds.dilated.toMillis)
+
+        // client marks node4 as DOWN
+ node2.scheduleNodeDown(address4)
+
+ println("Give the system time to converge...")
+ Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds
+
+ // check cluster convergence
+ node1.convergence must be('defined)
+ node2.convergence must be('defined)
+
+ node1.latestGossip.members.size must be(2)
+ node1.latestGossip.members.exists(_.address == address4) must be(false)
+ node1.latestGossip.members.exists(_.address == address3) must be(false)
+ }
+ }
+ } catch {
+ case e: Exception ⇒
+ e.printStackTrace
+ fail(e.toString)
+ }
+
+ override def atTermination() {
+ if (node1 ne null) node1.shutdown()
+ if (system1 ne null) system1.shutdown()
+
+ if (node2 ne null) node2.shutdown()
+ if (system2 ne null) system2.shutdown()
+
+ if (node3 ne null) node3.shutdown()
+ if (system3 ne null) system3.shutdown()
+
+ if (node4 ne null) node4.shutdown()
+ if (system4 ne null) system4.shutdown()
+ }
+}
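The specs above rely on fixed Thread.sleep calls to let gossip settle. As a sketch of an alternative (using nothing beyond the Node.convergence accessor already exercised here), a small polling helper could return as soon as all nodes report convergence:

    // illustrative helper to paste into a spec body; replaces a fixed Thread.sleep
    def awaitConvergence(nodes: Seq[Node], maxMillis: Long = 60000, intervalMillis: Long = 1000) {
      val deadline = System.currentTimeMillis + maxMillis
      while (!nodes.forall(_.convergence.isDefined)) {
        if (System.currentTimeMillis > deadline)
          throw new AssertionError("Cluster did not converge within " + maxMillis + " ms")
        Thread.sleep(intervalMillis)
      }
    }

    // usage inside a test body:
    // awaitConvergence(Seq(node1, node2, node4))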
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
index 240d1ad3ff..c8fd8e6bda 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
@@ -25,11 +25,12 @@ class ClusterConfigSpec extends AkkaSpec(
import settings._
FailureDetectorThreshold must be(8)
FailureDetectorMaxSampleSize must be(1000)
- SeedNodeConnectionTimeout must be(30 seconds)
- MaxTimeToRetryJoiningCluster must be(30 seconds)
- InitialDelayForGossip must be(5 seconds)
+ NodeToJoin must be(None)
+ GossipInitialDelay must be(5 seconds)
GossipFrequency must be(1 second)
- SeedNodes must be(Set())
+ NrOfGossipDaemons must be(4)
+ NrOfDeputyNodes must be(3)
+ AutoDown must be(true)
}
}
}
diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala
index 6366a9f65e..fb9b8408db 100644
--- a/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala
@@ -1,95 +1,115 @@
-// /**
-// * Copyright (C) 2009-2011 Typesafe Inc.
-// */
-// package akka.cluster
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+package akka.cluster
-// import java.net.InetSocketAddress
+import akka.testkit._
+import akka.dispatch._
+import akka.actor._
+import akka.remote._
+import akka.util.duration._
-// import akka.testkit._
-// import akka.dispatch._
-// import akka.actor._
-// import com.typesafe.config._
+import com.typesafe.config._
-// class GossipingAccrualFailureDetectorSpec extends AkkaSpec("""
-// akka {
-// loglevel = "INFO"
-// actor.provider = "akka.remote.RemoteActorRefProvider"
+import java.net.InetSocketAddress
-// remote.server.hostname = localhost
-// remote.server.port = 5550
-// remote.failure-detector.threshold = 3
-// cluster.seed-nodes = ["akka://localhost:5551"]
-// }
-// """) with ImplicitSender {
+class GossipingAccrualFailureDetectorSpec extends AkkaSpec("""
+ akka {
+ loglevel = "INFO"
+ actor.debug.lifecycle = on
+ actor.debug.autoreceive = on
+ actor.provider = akka.remote.RemoteActorRefProvider
+ remote.netty.hostname = localhost
+ cluster.failure-detector.threshold = 3
+ }
+ """) with ImplicitSender {
-// val conn1 = Address("akka", system.systemName, Some("localhost"), Some(5551))
-// val node1 = ActorSystem("GossiperSpec", ConfigFactory
-// .parseString("akka { remote.server.port=5551, cluster.use-cluster = on }")
-// .withFallback(system.settings.config))
-// val remote1 =
-// node1.asInstanceOf[ActorSystemImpl]
-// .provider.asInstanceOf[RemoteActorRefProvider]
-// .remote
-// val gossiper1 = remote1.gossiper
-// val fd1 = remote1.failureDetector
-// gossiper1 must be('defined)
+ var node1: Node = _
+ var node2: Node = _
+ var node3: Node = _
-// val conn2 = RemoteNettyAddress("localhost", 5552)
-// val node2 = ActorSystem("GossiperSpec", ConfigFactory
-// .parseString("akka { remote.server.port=5552, cluster.use-cluster = on }")
-// .withFallback(system.settings.config))
-// val remote2 =
-// node2.asInstanceOf[ActorSystemImpl]
-// .provider.asInstanceOf[RemoteActorRefProvider]
-// .remote
-// val gossiper2 = remote2.gossiper
-// val fd2 = remote2.failureDetector
-// gossiper2 must be('defined)
+ var system1: ActorSystemImpl = _
+ var system2: ActorSystemImpl = _
+ var system3: ActorSystemImpl = _
-// val conn3 = RemoteNettyAddress("localhost", 5553)
-// val node3 = ActorSystem("GossiperSpec", ConfigFactory
-// .parseString("akka { remote.server.port=5553, cluster.use-cluster = on }")
-// .withFallback(system.settings.config))
-// val remote3 =
-// node3.asInstanceOf[ActorSystemImpl]
-// .provider.asInstanceOf[RemoteActorRefProvider]
-// .remote
-// val gossiper3 = remote3.gossiper
-// val fd3 = remote3.failureDetector
-// gossiper3 must be('defined)
+ try {
+ "A Gossip-driven Failure Detector" must {
-// "A Gossip-driven Failure Detector" must {
+ // ======= NODE 1 ========
+ system1 = ActorSystem("system1", ConfigFactory
+ .parseString("akka.remote.netty.port=5550")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
+ node1 = Node(system1)
+ val fd1 = node1.failureDetector
+ val address1 = node1.remoteAddress
-// "receive gossip heartbeats so that all healthy nodes in the cluster are marked 'available'" ignore {
-// Thread.sleep(5000) // let them gossip for 10 seconds
-// fd1.isAvailable(conn2) must be(true)
-// fd1.isAvailable(conn3) must be(true)
-// fd2.isAvailable(conn1) must be(true)
-// fd2.isAvailable(conn3) must be(true)
-// fd3.isAvailable(conn1) must be(true)
-// fd3.isAvailable(conn2) must be(true)
-// }
+ // ======= NODE 2 ========
+ system2 = ActorSystem("system2", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty.port=5551
+ cluster.node-to-join = "akka://system1@localhost:5550"
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
+ node2 = Node(system2)
+ val fd2 = node2.failureDetector
+ val address2 = node2.remoteAddress
-// "mark node as 'unavailable' if a node in the cluster is shut down and its heartbeats stops" ignore {
-// // kill node 3
-// gossiper3.get.shutdown()
-// node3.shutdown()
-// Thread.sleep(5000) // let them gossip for 10 seconds
+ // ======= NODE 3 ========
+ system3 = ActorSystem("system3", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty.port=5552
+ cluster.node-to-join = "akka://system1@localhost:5550"
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider]
+ node3 = Node(system3)
+ val fd3 = node3.failureDetector
+ val address3 = node3.remoteAddress
-// fd1.isAvailable(conn2) must be(true)
-// fd1.isAvailable(conn3) must be(false)
-// fd2.isAvailable(conn1) must be(true)
-// fd2.isAvailable(conn3) must be(false)
-// }
-// }
+ "receive gossip heartbeats so that all healthy systems in the cluster are marked 'available'" taggedAs LongRunningTest in {
+ println("Let the systems gossip for a while...")
+ Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds
+ fd1.isAvailable(address2) must be(true)
+ fd1.isAvailable(address3) must be(true)
+ fd2.isAvailable(address1) must be(true)
+ fd2.isAvailable(address3) must be(true)
+ fd3.isAvailable(address1) must be(true)
+ fd3.isAvailable(address2) must be(true)
+ }
-// override def atTermination() {
-// gossiper1.get.shutdown()
-// gossiper2.get.shutdown()
-// gossiper3.get.shutdown()
-// node1.shutdown()
-// node2.shutdown()
-// node3.shutdown()
-// // FIXME Ordering problem - If we shut down the ActorSystem before the Gossiper then we get an IllegalStateException
-// }
-// }
+    "mark system as 'unavailable' if a system in the cluster is shut down (and its heartbeats stop)" taggedAs LongRunningTest in {
+ // shut down system3
+ node3.shutdown()
+ system3.shutdown()
+      println("Give the remaining systems time to detect failure...")
+ Thread.sleep(30.seconds.dilated.toMillis) // give them 30 seconds to detect failure of system3
+ fd1.isAvailable(address2) must be(true)
+ fd1.isAvailable(address3) must be(false)
+ fd2.isAvailable(address1) must be(true)
+ fd2.isAvailable(address3) must be(false)
+ }
+ }
+ } catch {
+ case e: Exception ⇒
+ e.printStackTrace
+ fail(e.toString)
+ }
+
+ override def atTermination() {
+ if (node1 ne null) node1.shutdown()
+ if (system1 ne null) system1.shutdown()
+
+ if (node2 ne null) node2.shutdown()
+ if (system2 ne null) system2.shutdown()
+
+ if (node3 ne null) node3.shutdown()
+ if (system3 ne null) system3.shutdown()
+ }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/LeaderDowningSpec.scala b/akka-cluster/src/test/scala/akka/cluster/LeaderDowningSpec.scala
new file mode 100644
index 0000000000..957f8ed4aa
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/LeaderDowningSpec.scala
@@ -0,0 +1,179 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+
+package akka.cluster
+
+import akka.testkit._
+import akka.dispatch._
+import akka.actor._
+import akka.remote._
+import akka.util.duration._
+
+import com.typesafe.config._
+
+import java.net.InetSocketAddress
+
+class LeaderDowningSpec extends AkkaSpec("""
+ akka {
+ loglevel = "INFO"
+ actor.provider = "akka.remote.RemoteActorRefProvider"
+ cluster {
+ failure-detector.threshold = 3
+ auto-down = on
+ }
+ }
+ """) with ImplicitSender {
+
+ var node1: Node = _
+ var node2: Node = _
+ var node3: Node = _
+ var node4: Node = _
+
+ var system1: ActorSystemImpl = _
+ var system2: ActorSystemImpl = _
+ var system3: ActorSystemImpl = _
+ var system4: ActorSystemImpl = _
+
+ try {
+ "The Leader in a 4 node cluster" must {
+
+ // ======= NODE 1 ========
+ system1 = ActorSystem("system1", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty {
+ hostname = localhost
+ port=5550
+ }
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
+ node1 = Node(system1)
+ val fd1 = node1.failureDetector
+ val address1 = node1.remoteAddress
+
+ // ======= NODE 2 ========
+ system2 = ActorSystem("system2", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty {
+ hostname = localhost
+ port = 5551
+ }
+ cluster.node-to-join = "akka://system1@localhost:5550"
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
+ node2 = Node(system2)
+ val fd2 = node2.failureDetector
+ val address2 = node2.remoteAddress
+
+ // ======= NODE 3 ========
+ system3 = ActorSystem("system3", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty {
+ hostname = localhost
+ port=5552
+ }
+ cluster.node-to-join = "akka://system1@localhost:5550"
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider]
+ node3 = Node(system3)
+ val fd3 = node3.failureDetector
+ val address3 = node3.remoteAddress
+
+ // ======= NODE 4 ========
+ system4 = ActorSystem("system4", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty {
+ hostname = localhost
+ port=5553
+ }
+ cluster.node-to-join = "akka://system1@localhost:5550"
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote4 = system4.provider.asInstanceOf[RemoteActorRefProvider]
+ node4 = Node(system4)
+ val fd4 = node4.failureDetector
+ val address4 = node4.remoteAddress
+
+ "be able to DOWN a (last) node that is UNREACHABLE" taggedAs LongRunningTest in {
+
+ println("Give the system time to converge...")
+ Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds
+
+ // check cluster convergence
+ node1.convergence must be('defined)
+ node2.convergence must be('defined)
+ node3.convergence must be('defined)
+ node4.convergence must be('defined)
+
+ // shut down system4
+ node4.shutdown()
+ system4.shutdown()
+
+        // wait for convergence - i.e. for the leader to auto-down the failed node
+ println("Give the system time to converge...")
+ Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds
+
+ // check cluster convergence
+ node1.convergence must be('defined)
+ node2.convergence must be('defined)
+ node3.convergence must be('defined)
+
+ node1.latestGossip.members.size must be(3)
+ node1.latestGossip.members.exists(_.address == address4) must be(false)
+ }
+
+ "be able to DOWN a (middle) node that is UNREACHABLE" taggedAs LongRunningTest in {
+
+ // check cluster convergence
+ node1.convergence must be('defined)
+ node2.convergence must be('defined)
+ node3.convergence must be('defined)
+
+        // shut down system2
+ node2.shutdown()
+ system2.shutdown()
+
+        // wait for convergence - i.e. for the leader to auto-down the failed node
+ println("Give the system time to converge...")
+ Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds
+
+ // check cluster convergence
+ node1.convergence must be('defined)
+ node3.convergence must be('defined)
+
+ node1.latestGossip.members.size must be(2)
+ node1.latestGossip.members.exists(_.address == address4) must be(false)
+ node1.latestGossip.members.exists(_.address == address2) must be(false)
+ }
+ }
+ } catch {
+ case e: Exception ⇒
+ e.printStackTrace
+ fail(e.toString)
+ }
+
+ override def atTermination() {
+ if (node1 ne null) node1.shutdown()
+ if (system1 ne null) system1.shutdown()
+
+ if (node2 ne null) node2.shutdown()
+ if (system2 ne null) system2.shutdown()
+
+ if (node3 ne null) node3.shutdown()
+ if (system3 ne null) system3.shutdown()
+
+ if (node4 ne null) node4.shutdown()
+ if (system4 ne null) system4.shutdown()
+ }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala
new file mode 100644
index 0000000000..08d4201bd3
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala
@@ -0,0 +1,158 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+
+package akka.cluster
+
+import akka.testkit._
+import akka.dispatch._
+import akka.actor._
+import akka.remote._
+import akka.util.duration._
+
+import com.typesafe.config._
+
+import java.net.InetSocketAddress
+
+class LeaderElectionSpec extends AkkaSpec("""
+ akka {
+ loglevel = "INFO"
+ actor.provider = "akka.remote.RemoteActorRefProvider"
+ cluster.failure-detector.threshold = 3
+ }
+ """) with ImplicitSender {
+
+ var node1: Node = _
+ var node2: Node = _
+ var node3: Node = _
+
+ var system1: ActorSystemImpl = _
+ var system2: ActorSystemImpl = _
+ var system3: ActorSystemImpl = _
+
+ try {
+ "A cluster of three nodes" must {
+
+ // ======= NODE 1 ========
+ system1 = ActorSystem("system1", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty {
+ hostname = localhost
+ port=5550
+ }
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
+ node1 = Node(system1)
+ val fd1 = node1.failureDetector
+ val address1 = node1.remoteAddress
+
+ // ======= NODE 2 ========
+ system2 = ActorSystem("system2", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty {
+ hostname = localhost
+ port = 5551
+ }
+ cluster.node-to-join = "akka://system1@localhost:5550"
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
+ node2 = Node(system2)
+ val fd2 = node2.failureDetector
+ val address2 = node2.remoteAddress
+
+ // ======= NODE 3 ========
+ system3 = ActorSystem("system3", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty {
+ hostname = localhost
+ port=5552
+ }
+ cluster.node-to-join = "akka://system1@localhost:5550"
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider]
+ node3 = Node(system3)
+ val fd3 = node3.failureDetector
+ val address3 = node3.remoteAddress
+
+ "be able to 'elect' a single leader" taggedAs LongRunningTest in {
+
+ println("Give the system time to converge...")
+ Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds
+
+ // check cluster convergence
+ node1.convergence must be('defined)
+ node2.convergence must be('defined)
+ node3.convergence must be('defined)
+
+ // check leader
+ node1.isLeader must be(true)
+ node2.isLeader must be(false)
+ node3.isLeader must be(false)
+ }
+
+ "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in {
+
+ // shut down system1 - the leader
+ node1.shutdown()
+ system1.shutdown()
+
+ // user marks node1 as DOWN
+ node2.scheduleNodeDown(address1)
+
+ println("Give the system time to converge...")
+        Thread.sleep(30.seconds.dilated.toMillis) // give them 30 seconds to detect failure of system1
+
+ // check cluster convergence
+ node2.convergence must be('defined)
+ node3.convergence must be('defined)
+
+ // check leader
+ node2.isLeader must be(true)
+ node3.isLeader must be(false)
+ }
+
+ "be able to 're-elect' a single leader after leader has left (again, leaving a single node)" taggedAs LongRunningTest in {
+
+        // shut down system2 - the current leader
+ node2.shutdown()
+ system2.shutdown()
+
+ // user marks node2 as DOWN
+ node3.scheduleNodeDown(address2)
+
+ println("Give the system time to converge...")
+        Thread.sleep(30.seconds.dilated.toMillis) // give them 30 seconds to detect failure of system2
+
+ // check cluster convergence
+ node3.convergence must be('defined)
+
+ // check leader
+ node3.isLeader must be(true)
+ }
+ }
+ } catch {
+ case e: Exception ⇒
+ e.printStackTrace
+ fail(e.toString)
+ }
+
+ override def atTermination() {
+ if (node1 ne null) node1.shutdown()
+ if (system1 ne null) system1.shutdown()
+
+ if (node2 ne null) node2.shutdown()
+ if (system2 ne null) system2.shutdown()
+
+ if (node3 ne null) node3.shutdown()
+ if (system3 ne null) system3.shutdown()
+ }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala
new file mode 100644
index 0000000000..f3f34e19c1
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala
@@ -0,0 +1,135 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+package akka.cluster
+
+import akka.testkit._
+import akka.dispatch._
+import akka.actor._
+import akka.remote._
+import akka.util.duration._
+
+import java.net.InetSocketAddress
+import java.util.concurrent.{ CountDownLatch, TimeUnit }
+
+import scala.collection.immutable.SortedSet
+
+import com.typesafe.config._
+
+class MembershipChangeListenerSpec extends AkkaSpec("""
+ akka {
+ actor.provider = akka.remote.RemoteActorRefProvider
+ remote.netty.hostname = localhost
+ loglevel = "INFO"
+ }
+ """) with ImplicitSender {
+
+ var node0: Node = _
+ var node1: Node = _
+ var node2: Node = _
+
+ var system0: ActorSystemImpl = _
+ var system1: ActorSystemImpl = _
+ var system2: ActorSystemImpl = _
+
+ try {
+ "A set of connected cluster systems" must {
+      "(when two systems) trigger all registered MembershipChangeListeners once cluster convergence updates the membership table" taggedAs LongRunningTest in {
+ system0 = ActorSystem("system0", ConfigFactory
+ .parseString("akka.remote.netty.port=5550")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
+ node0 = Node(system0)
+
+ system1 = ActorSystem("system1", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty.port=5551
+ cluster.node-to-join = "akka://system0@localhost:5550"
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
+ node1 = Node(system1)
+
+ val latch = new CountDownLatch(2)
+
+ node0.registerListener(new MembershipChangeListener {
+ def notify(members: SortedSet[Member]) {
+ latch.countDown()
+ }
+ })
+ node1.registerListener(new MembershipChangeListener {
+ def notify(members: SortedSet[Member]) {
+ latch.countDown()
+ }
+ })
+
+ latch.await(10.seconds.dilated.toMillis, TimeUnit.MILLISECONDS)
+
+ Thread.sleep(10.seconds.dilated.toMillis)
+
+ // check cluster convergence
+ node0.convergence must be('defined)
+ node1.convergence must be('defined)
+ }
+
+      "(when three systems) trigger all registered MembershipChangeListeners once cluster convergence updates the membership table" taggedAs LongRunningTest in {
+
+ // ======= NODE 2 ========
+ system2 = ActorSystem("system2", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty.port=5552
+ cluster.node-to-join = "akka://system0@localhost:5550"
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
+ node2 = Node(system2)
+
+ val latch = new CountDownLatch(3)
+ node0.registerListener(new MembershipChangeListener {
+ def notify(members: SortedSet[Member]) {
+ latch.countDown()
+ }
+ })
+ node1.registerListener(new MembershipChangeListener {
+ def notify(members: SortedSet[Member]) {
+ latch.countDown()
+ }
+ })
+ node2.registerListener(new MembershipChangeListener {
+ def notify(members: SortedSet[Member]) {
+ latch.countDown()
+ }
+ })
+
+ latch.await(30.seconds.dilated.toMillis, TimeUnit.MILLISECONDS)
+
+ Thread.sleep(30.seconds.dilated.toMillis)
+
+ // check cluster convergence
+ node0.convergence must be('defined)
+ node1.convergence must be('defined)
+ node2.convergence must be('defined)
+ }
+ }
+ } catch {
+ case e: Exception ⇒
+ e.printStackTrace
+ fail(e.toString)
+ }
+
+ override def atTermination() {
+ if (node0 ne null) node0.shutdown()
+ if (system0 ne null) system0.shutdown()
+
+ if (node1 ne null) node1.shutdown()
+ if (system1 ne null) system1.shutdown()
+
+ if (node2 ne null) node2.shutdown()
+ if (system2 ne null) system2.shutdown()
+ }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/NodeMembershipSpec.scala
new file mode 100644
index 0000000000..fd3e31e83e
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/NodeMembershipSpec.scala
@@ -0,0 +1,143 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+package akka.cluster
+
+import java.net.InetSocketAddress
+
+import akka.testkit._
+import akka.dispatch._
+import akka.actor._
+import akka.remote._
+import akka.util.duration._
+
+import com.typesafe.config._
+
+class NodeMembershipSpec extends AkkaSpec("""
+ akka {
+ actor.provider = akka.remote.RemoteActorRefProvider
+ remote.netty.hostname = localhost
+ loglevel = "INFO"
+ }
+ """) with ImplicitSender {
+
+ var node0: Node = _
+ var node1: Node = _
+ var node2: Node = _
+
+ var system0: ActorSystemImpl = _
+ var system1: ActorSystemImpl = _
+ var system2: ActorSystemImpl = _
+
+ try {
+ "A set of connected cluster systems" must {
+      "(when two systems) start gossiping to each other so that both systems get the same gossip info" taggedAs LongRunningTest in {
+
+ // ======= NODE 0 ========
+ system0 = ActorSystem("system0", ConfigFactory
+ .parseString("akka.remote.netty.port=5550")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
+ node0 = Node(system0)
+
+ // ======= NODE 1 ========
+ system1 = ActorSystem("system1", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty.port=5551
+ cluster.node-to-join = "akka://system0@localhost:5550"
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
+ node1 = Node(system1)
+
+ Thread.sleep(10.seconds.dilated.toMillis)
+
+ // check cluster convergence
+ node0.convergence must be('defined)
+ node1.convergence must be('defined)
+
+ val members0 = node0.latestGossip.members.toArray
+ members0.size must be(2)
+ members0(0).address.port.get must be(5550)
+ members0(0).status must be(MemberStatus.Up)
+ members0(1).address.port.get must be(5551)
+ members0(1).status must be(MemberStatus.Up)
+
+ val members1 = node1.latestGossip.members.toArray
+ members1.size must be(2)
+ members1(0).address.port.get must be(5550)
+ members1(0).status must be(MemberStatus.Up)
+ members1(1).address.port.get must be(5551)
+ members1(1).status must be(MemberStatus.Up)
+ }
+
+      "(when three systems) start gossiping to each other so that all three systems get the same gossip info" taggedAs LongRunningTest ignore {
+
+ // ======= NODE 2 ========
+ system2 = ActorSystem("system2", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty.port=5552
+ cluster.node-to-join = "akka://system0@localhost:5550"
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
+ node2 = Node(system2)
+
+ Thread.sleep(10.seconds.dilated.toMillis)
+
+ // check cluster convergence
+ node0.convergence must be('defined)
+ node1.convergence must be('defined)
+ node2.convergence must be('defined)
+
+ val members0 = node0.latestGossip.members.toArray
+ val version = node0.latestGossip.version
+ members0.size must be(3)
+ members0(0).address.port.get must be(5550)
+ members0(0).status must be(MemberStatus.Up)
+ members0(1).address.port.get must be(5551)
+ members0(1).status must be(MemberStatus.Up)
+ members0(2).address.port.get must be(5552)
+ members0(2).status must be(MemberStatus.Up)
+
+ val members1 = node1.latestGossip.members.toArray
+ members1.size must be(3)
+ members1(0).address.port.get must be(5550)
+ members1(0).status must be(MemberStatus.Up)
+ members1(1).address.port.get must be(5551)
+ members1(1).status must be(MemberStatus.Up)
+ members1(2).address.port.get must be(5552)
+ members1(2).status must be(MemberStatus.Up)
+
+ val members2 = node2.latestGossip.members.toArray
+ members2.size must be(3)
+ members2(0).address.port.get must be(5550)
+ members2(0).status must be(MemberStatus.Up)
+ members2(1).address.port.get must be(5551)
+ members2(1).status must be(MemberStatus.Up)
+ members2(2).address.port.get must be(5552)
+ members2(2).status must be(MemberStatus.Up)
+ }
+ }
+ } catch {
+ case e: Exception ⇒
+ e.printStackTrace
+ fail(e.toString)
+ }
+
+ override def atTermination() {
+ if (node0 ne null) node0.shutdown()
+ if (system0 ne null) system0.shutdown()
+
+ if (node1 ne null) node1.shutdown()
+ if (system1 ne null) system1.shutdown()
+
+ if (node2 ne null) node2.shutdown()
+ if (system2 ne null) system2.shutdown()
+ }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/NodeStartupSpec.scala b/akka-cluster/src/test/scala/akka/cluster/NodeStartupSpec.scala
new file mode 100644
index 0000000000..640a541971
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/NodeStartupSpec.scala
@@ -0,0 +1,84 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc.
+ */
+package akka.cluster
+
+import java.net.InetSocketAddress
+
+import akka.testkit._
+import akka.dispatch._
+import akka.actor._
+import akka.remote._
+import akka.util.duration._
+
+import com.typesafe.config._
+
+class NodeStartupSpec extends AkkaSpec("""
+ akka {
+ loglevel = "INFO"
+ actor.provider = akka.remote.RemoteActorRefProvider
+ remote.netty.hostname = localhost
+ }
+ """) with ImplicitSender {
+
+ var node0: Node = _
+ var node1: Node = _
+ var system0: ActorSystemImpl = _
+ var system1: ActorSystemImpl = _
+
+ try {
+ "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must {
+ system0 = ActorSystem("system0", ConfigFactory
+ .parseString("akka.remote.netty.port=5550")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
+ node0 = Node(system0)
+
+ "be a singleton cluster when started up" in {
+ Thread.sleep(1.seconds.dilated.toMillis)
+ node0.isSingletonCluster must be(true)
+ }
+
+      "be in 'Joining' phase when started up" in {
+ val members = node0.latestGossip.members
+ val joiningMember = members find (_.address.port.get == 5550)
+ joiningMember must be('defined)
+ joiningMember.get.status must be(MemberStatus.Joining)
+ }
+ }
+
+ "A second cluster node with a 'node-to-join' config defined" must {
+      "join the other node's cluster as 'Joining' when sending a Join command" in {
+ system1 = ActorSystem("system1", ConfigFactory
+ .parseString("""
+ akka {
+ remote.netty.port=5551
+ cluster.node-to-join = "akka://system0@localhost:5550"
+ }""")
+ .withFallback(system.settings.config))
+ .asInstanceOf[ActorSystemImpl]
+ val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
+ node1 = Node(system1)
+
+ Thread.sleep(1.seconds.dilated.toMillis) // give enough time for node1 to JOIN node0
+ val members = node0.latestGossip.members
+ val joiningMember = members find (_.address.port.get == 5551)
+ joiningMember must be('defined)
+ joiningMember.get.status must be(MemberStatus.Joining)
+ }
+ }
+ } catch {
+ case e: Exception ⇒
+ e.printStackTrace
+ fail(e.toString)
+ }
+
+ override def atTermination() {
+ if (node0 ne null) node0.shutdown()
+ if (system0 ne null) system0.shutdown()
+
+ if (node1 ne null) node1.shutdown()
+ if (system1 ne null) system1.shutdown()
+ }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala
index df9cead7f8..d0e4c8da13 100644
--- a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala
@@ -1,7 +1,12 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+
package akka.cluster
import java.net.InetSocketAddress
import akka.testkit.AkkaSpec
+import akka.actor.ActorSystem
class VectorClockSpec extends AkkaSpec {
import VectorClock._
@@ -10,193 +15,266 @@ class VectorClockSpec extends AkkaSpec {
"have zero versions when created" in {
val clock = VectorClock()
- clock.versions must be(Vector())
+ clock.versions must be(Map())
}
- "be able to add Entry if non-existing" in {
- val clock1 = VectorClock()
- clock1.versions must be(Vector())
- val clock2 = clock1.increment(1, System.currentTimeMillis)
- val clock3 = clock2.increment(2, System.currentTimeMillis)
-
- clock3.versions must be(Vector(Entry(1, 1), Entry(2, 1)))
- }
-
- "be able to increment version of existing Entry" in {
- val clock1 = VectorClock()
- val clock2 = clock1.increment(1, System.currentTimeMillis)
- val clock3 = clock2.increment(2, System.currentTimeMillis)
- val clock4 = clock3.increment(1, System.currentTimeMillis)
- val clock5 = clock4.increment(2, System.currentTimeMillis)
- val clock6 = clock5.increment(2, System.currentTimeMillis)
-
- clock6.versions must be(Vector(Entry(1, 2), Entry(2, 3)))
- }
-
- "The empty clock should not happen before itself" in {
+ "not happen before itself" in {
val clock1 = VectorClock()
val clock2 = VectorClock()
- clock1.compare(clock2) must not be (Concurrent)
+ clock1 <> clock2 must be(false)
}
- "not happen before an identical clock" in {
+ "pass misc comparison test 1" in {
val clock1_1 = VectorClock()
- val clock2_1 = clock1_1.increment(1, System.currentTimeMillis)
- val clock3_1 = clock2_1.increment(2, System.currentTimeMillis)
- val clock4_1 = clock3_1.increment(1, System.currentTimeMillis)
+ val clock2_1 = clock1_1 + Node("1")
+ val clock3_1 = clock2_1 + Node("2")
+ val clock4_1 = clock3_1 + Node("1")
val clock1_2 = VectorClock()
- val clock2_2 = clock1_2.increment(1, System.currentTimeMillis)
- val clock3_2 = clock2_2.increment(2, System.currentTimeMillis)
- val clock4_2 = clock3_2.increment(1, System.currentTimeMillis)
+ val clock2_2 = clock1_2 + Node("1")
+ val clock3_2 = clock2_2 + Node("2")
+ val clock4_2 = clock3_2 + Node("1")
- clock4_1.compare(clock4_2) must not be (Concurrent)
+ clock4_1 <> clock4_2 must be(false)
}
- "happen before an identical clock with a single additional event" in {
+ "pass misc comparison test 2" in {
val clock1_1 = VectorClock()
- val clock2_1 = clock1_1.increment(1, System.currentTimeMillis)
- val clock3_1 = clock2_1.increment(2, System.currentTimeMillis)
- val clock4_1 = clock3_1.increment(1, System.currentTimeMillis)
+ val clock2_1 = clock1_1 + Node("1")
+ val clock3_1 = clock2_1 + Node("2")
+ val clock4_1 = clock3_1 + Node("1")
val clock1_2 = VectorClock()
- val clock2_2 = clock1_2.increment(1, System.currentTimeMillis)
- val clock3_2 = clock2_2.increment(2, System.currentTimeMillis)
- val clock4_2 = clock3_2.increment(1, System.currentTimeMillis)
- val clock5_2 = clock4_2.increment(3, System.currentTimeMillis)
+ val clock2_2 = clock1_2 + Node("1")
+ val clock3_2 = clock2_2 + Node("2")
+ val clock4_2 = clock3_2 + Node("1")
+ val clock5_2 = clock4_2 + Node("3")
- clock4_1.compare(clock5_2) must be(Before)
+ clock4_1 < clock5_2 must be(true)
}
- "Two clocks with different events should be concurrent: 1" in {
+ "pass misc comparison test 3" in {
var clock1_1 = VectorClock()
- val clock2_1 = clock1_1.increment(1, System.currentTimeMillis)
+ val clock2_1 = clock1_1 + Node("1")
val clock1_2 = VectorClock()
- val clock2_2 = clock1_2.increment(2, System.currentTimeMillis)
+ val clock2_2 = clock1_2 + Node("2")
- clock2_1.compare(clock2_2) must be(Concurrent)
+ clock2_1 <> clock2_2 must be(true)
}
- "Two clocks with different events should be concurrent: 2" in {
+ "pass misc comparison test 4" in {
val clock1_3 = VectorClock()
- val clock2_3 = clock1_3.increment(1, System.currentTimeMillis)
- val clock3_3 = clock2_3.increment(2, System.currentTimeMillis)
- val clock4_3 = clock3_3.increment(1, System.currentTimeMillis)
+ val clock2_3 = clock1_3 + Node("1")
+ val clock3_3 = clock2_3 + Node("2")
+ val clock4_3 = clock3_3 + Node("1")
val clock1_4 = VectorClock()
- val clock2_4 = clock1_4.increment(1, System.currentTimeMillis)
- val clock3_4 = clock2_4.increment(1, System.currentTimeMillis)
- val clock4_4 = clock3_4.increment(3, System.currentTimeMillis)
+ val clock2_4 = clock1_4 + Node("1")
+ val clock3_4 = clock2_4 + Node("1")
+ val clock4_4 = clock3_4 + Node("3")
- clock4_3.compare(clock4_4) must be(Concurrent)
+ clock4_3 <> clock4_4 must be(true)
}
- ".." in {
+ "pass misc comparison test 5" in {
val clock1_1 = VectorClock()
- val clock2_1 = clock1_1.increment(2, System.currentTimeMillis)
- val clock3_1 = clock2_1.increment(2, System.currentTimeMillis)
+ val clock2_1 = clock1_1 + Node("2")
+ val clock3_1 = clock2_1 + Node("2")
val clock1_2 = VectorClock()
- val clock2_2 = clock1_2.increment(1, System.currentTimeMillis)
- val clock3_2 = clock2_2.increment(2, System.currentTimeMillis)
- val clock4_2 = clock3_2.increment(2, System.currentTimeMillis)
- val clock5_2 = clock4_2.increment(3, System.currentTimeMillis)
+ val clock2_2 = clock1_2 + Node("1")
+ val clock3_2 = clock2_2 + Node("2")
+ val clock4_2 = clock3_2 + Node("2")
+ val clock5_2 = clock4_2 + Node("3")
- clock3_1.compare(clock5_2) must be(Before)
+ clock3_1 < clock5_2 must be(true)
+ clock5_2 > clock3_1 must be(true)
}
- "..." in {
+ "pass misc comparison test 6" in {
val clock1_1 = VectorClock()
- val clock2_1 = clock1_1.increment(1, System.currentTimeMillis)
- val clock3_1 = clock2_1.increment(2, System.currentTimeMillis)
- val clock4_1 = clock3_1.increment(2, System.currentTimeMillis)
- val clock5_1 = clock4_1.increment(3, System.currentTimeMillis)
+ val clock2_1 = clock1_1 + Node("1")
+ val clock3_1 = clock2_1 + Node("2")
val clock1_2 = VectorClock()
- val clock2_2 = clock1_2.increment(2, System.currentTimeMillis)
- val clock3_2 = clock2_2.increment(2, System.currentTimeMillis)
+ val clock2_2 = clock1_2 + Node("1")
+ val clock3_2 = clock2_2 + Node("1")
- clock5_1.compare(clock3_2) must be(After)
+ clock3_1 <> clock3_2 must be(true)
+ clock3_2 <> clock3_1 must be(true)
+ }
+
+ "pass misc comparison test 7" in {
+ val clock1_1 = VectorClock()
+ val clock2_1 = clock1_1 + Node("1")
+ val clock3_1 = clock2_1 + Node("2")
+ val clock4_1 = clock3_1 + Node("2")
+ val clock5_1 = clock4_1 + Node("3")
+
+ val clock1_2 = VectorClock()
+ val clock2_2 = clock1_2 + Node("2")
+ val clock3_2 = clock2_2 + Node("2")
+
+ clock5_1 <> clock3_2 must be(true)
+ clock3_2 <> clock5_1 must be(true)
+ }
+
+ "correctly merge two clocks" in {
+ val node1 = Node("1")
+ val node2 = Node("2")
+ val node3 = Node("3")
+
+ val clock1_1 = VectorClock()
+ val clock2_1 = clock1_1 + node1
+ val clock3_1 = clock2_1 + node2
+ val clock4_1 = clock3_1 + node2
+ val clock5_1 = clock4_1 + node3
+
+ val clock1_2 = VectorClock()
+ val clock2_2 = clock1_2 + node2
+ val clock3_2 = clock2_2 + node2
+
+ val merged1 = clock3_2 merge clock5_1
+ merged1.versions.size must be(3)
+ merged1.versions.contains(node1) must be(true)
+ merged1.versions.contains(node2) must be(true)
+ merged1.versions.contains(node3) must be(true)
+
+ val merged2 = clock5_1 merge clock3_2
+ merged2.versions.size must be(3)
+ merged2.versions.contains(node1) must be(true)
+ merged2.versions.contains(node2) must be(true)
+ merged2.versions.contains(node3) must be(true)
+
+ clock3_2 < merged1 must be(true)
+ clock5_1 < merged1 must be(true)
+
+ clock3_2 < merged2 must be(true)
+ clock5_1 < merged2 must be(true)
+
+ merged1 == merged2 must be(true)
+ }
+
+ "pass blank clock incrementing" in {
+ val node1 = Node("1")
+ val node2 = Node("2")
+ val node3 = Node("3")
+
+ val v1 = VectorClock()
+ val v2 = VectorClock()
+
+ val vv1 = v1 + node1
+ val vv2 = v2 + node2
+
+ (vv1 > v1) must equal(true)
+ (vv2 > v2) must equal(true)
+
+ (vv1 > v2) must equal(true)
+ (vv2 > v1) must equal(true)
+
+ (vv2 > vv1) must equal(false)
+ (vv1 > vv2) must equal(false)
+ }
+
+ "pass merging behavior" in {
+ val node1 = Node("1")
+ val node2 = Node("2")
+ val node3 = Node("3")
+
+ val a = VectorClock()
+ val b = VectorClock()
+
+ val a1 = a + node1
+ val b1 = b + node2
+
+ var a2 = a1 + node1
+ var c = a2.merge(b1)
+ var c1 = c + node3
+
+ (c1 > a2) must equal(true)
+ (c1 > b1) must equal(true)
}
}
- "A Versioned" must {
- class TestVersioned(val version: VectorClock = VectorClock()) extends Versioned {
- def increment(v: Int, time: Long) = new TestVersioned(version.increment(v, time))
+ "An instance of Versioned" must {
+ class TestVersioned(val version: VectorClock = VectorClock()) extends Versioned[TestVersioned] {
+ def +(node: Node): TestVersioned = new TestVersioned(version + node)
}
+ import Versioned.latestVersionOf
+
"have zero versions when created" in {
val versioned = new TestVersioned()
- versioned.version.versions must be(Vector())
+ versioned.version.versions must be(Map())
}
"happen before an identical versioned with a single additional event" in {
val versioned1_1 = new TestVersioned()
- val versioned2_1 = versioned1_1.increment(1, System.currentTimeMillis)
- val versioned3_1 = versioned2_1.increment(2, System.currentTimeMillis)
- val versioned4_1 = versioned3_1.increment(1, System.currentTimeMillis)
+ val versioned2_1 = versioned1_1 + Node("1")
+ val versioned3_1 = versioned2_1 + Node("2")
+ val versioned4_1 = versioned3_1 + Node("1")
val versioned1_2 = new TestVersioned()
- val versioned2_2 = versioned1_2.increment(1, System.currentTimeMillis)
- val versioned3_2 = versioned2_2.increment(2, System.currentTimeMillis)
- val versioned4_2 = versioned3_2.increment(1, System.currentTimeMillis)
- val versioned5_2 = versioned4_2.increment(3, System.currentTimeMillis)
+ val versioned2_2 = versioned1_2 + Node("1")
+ val versioned3_2 = versioned2_2 + Node("2")
+ val versioned4_2 = versioned3_2 + Node("1")
+ val versioned5_2 = versioned4_2 + Node("3")
- Versioned.latestVersionOf[TestVersioned](versioned4_1, versioned5_2) must be(versioned5_2)
+ latestVersionOf[TestVersioned](versioned4_1, versioned5_2) must be(versioned5_2)
}
- "Two versioneds with different events should be concurrent: 1" in {
+ "pass misc comparison test 1" in {
var versioned1_1 = new TestVersioned()
- val versioned2_1 = versioned1_1.increment(1, System.currentTimeMillis)
+ val versioned2_1 = versioned1_1 + Node("1")
val versioned1_2 = new TestVersioned()
- val versioned2_2 = versioned1_2.increment(2, System.currentTimeMillis)
+ val versioned2_2 = versioned1_2 + Node("2")
- Versioned.latestVersionOf[TestVersioned](versioned2_1, versioned2_2) must be(versioned2_1)
+ latestVersionOf[TestVersioned](versioned2_1, versioned2_2) must be(versioned2_2)
}
- "Two versioneds with different events should be concurrent: 2" in {
+ "pass misc comparison test 2" in {
val versioned1_3 = new TestVersioned()
- val versioned2_3 = versioned1_3.increment(1, System.currentTimeMillis)
- val versioned3_3 = versioned2_3.increment(2, System.currentTimeMillis)
- val versioned4_3 = versioned3_3.increment(1, System.currentTimeMillis)
+ val versioned2_3 = versioned1_3 + Node("1")
+ val versioned3_3 = versioned2_3 + Node("2")
+ val versioned4_3 = versioned3_3 + Node("1")
val versioned1_4 = new TestVersioned()
- val versioned2_4 = versioned1_4.increment(1, System.currentTimeMillis)
- val versioned3_4 = versioned2_4.increment(1, System.currentTimeMillis)
- val versioned4_4 = versioned3_4.increment(3, System.currentTimeMillis)
+ val versioned2_4 = versioned1_4 + Node("1")
+ val versioned3_4 = versioned2_4 + Node("1")
+ val versioned4_4 = versioned3_4 + Node("3")
- Versioned.latestVersionOf[TestVersioned](versioned4_3, versioned4_4) must be(versioned4_3)
+ latestVersionOf[TestVersioned](versioned4_3, versioned4_4) must be(versioned4_4)
}
- "be earlier than another versioned if it has an older version" in {
+ "pass misc comparison test 3" in {
val versioned1_1 = new TestVersioned()
- val versioned2_1 = versioned1_1.increment(2, System.currentTimeMillis)
- val versioned3_1 = versioned2_1.increment(2, System.currentTimeMillis)
+ val versioned2_1 = versioned1_1 + Node("2")
+ val versioned3_1 = versioned2_1 + Node("2")
val versioned1_2 = new TestVersioned()
- val versioned2_2 = versioned1_2.increment(1, System.currentTimeMillis)
- val versioned3_2 = versioned2_2.increment(2, System.currentTimeMillis)
- val versioned4_2 = versioned3_2.increment(2, System.currentTimeMillis)
- val versioned5_2 = versioned4_2.increment(3, System.currentTimeMillis)
+ val versioned2_2 = versioned1_2 + Node("1")
+ val versioned3_2 = versioned2_2 + Node("2")
+ val versioned4_2 = versioned3_2 + Node("2")
+ val versioned5_2 = versioned4_2 + Node("3")
- Versioned.latestVersionOf[TestVersioned](versioned3_1, versioned5_2) must be(versioned5_2)
+ latestVersionOf[TestVersioned](versioned3_1, versioned5_2) must be(versioned5_2)
}
- "be later than another versioned if it has an newer version" in {
+ "pass misc comparison test 4" in {
val versioned1_1 = new TestVersioned()
- val versioned2_1 = versioned1_1.increment(1, System.currentTimeMillis)
- val versioned3_1 = versioned2_1.increment(2, System.currentTimeMillis)
- val versioned4_1 = versioned3_1.increment(2, System.currentTimeMillis)
- val versioned5_1 = versioned4_1.increment(3, System.currentTimeMillis)
+ val versioned2_1 = versioned1_1 + Node("1")
+ val versioned3_1 = versioned2_1 + Node("2")
+ val versioned4_1 = versioned3_1 + Node("2")
+ val versioned5_1 = versioned4_1 + Node("3")
val versioned1_2 = new TestVersioned()
- val versioned2_2 = versioned1_2.increment(2, System.currentTimeMillis)
- val versioned3_2 = versioned2_2.increment(2, System.currentTimeMillis)
+ val versioned2_2 = versioned1_2 + Node("2")
+ val versioned3_2 = versioned2_2 + Node("2")
- Versioned.latestVersionOf[TestVersioned](versioned5_1, versioned3_2) must be(versioned5_1)
+ latestVersionOf[TestVersioned](versioned5_1, versioned3_2) must be(versioned3_2)
}
}
}
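
The rewritten spec above exercises a reworked ``VectorClock`` API: clocks are keyed by ``Node`` instead of integer ids, incremented with ``+``, compared with ``<``/``>``/``<>`` (concurrent), and combined with ``merge``. Below is a minimal, self-contained sketch of the semantics the spec appears to assume (per-node counters, pairwise comparison, element-wise maximum on merge); the actual ``akka.cluster.VectorClock`` may differ in details such as timestamps and pruning::

    // Minimal sketch of the vector clock semantics exercised by VectorClockSpec.
    // Names (Node, +, <, >, <>, merge) mirror the spec; this is not the real implementation.
    object VectorClockSketch {

      final case class Node(name: String)

      final case class VectorClock(versions: Map[Node, Long] = Map.empty) {

        // Increment this node's counter, returning a new (immutable) clock.
        def +(node: Node): VectorClock =
          copy(versions = versions.updated(node, versions.getOrElse(node, 0L) + 1L))

        // Happens-before: every counter is <= the other's and the clocks are not identical.
        def <(that: VectorClock): Boolean =
          versions.forall { case (n, v) => v <= that.versions.getOrElse(n, 0L) } &&
            versions != that.versions

        def >(that: VectorClock): Boolean = that < this

        // Concurrent: neither clock dominates the other and they are not identical.
        def <>(that: VectorClock): Boolean =
          !(this < that) && !(that < this) && versions != that.versions

        // Merge keeps the maximum counter seen for each node.
        def merge(that: VectorClock): VectorClock =
          VectorClock((versions.keySet ++ that.versions.keySet).map { n =>
            n -> math.max(versions.getOrElse(n, 0L), that.versions.getOrElse(n, 0L))
          }.toMap)
      }

      def main(args: Array[String]): Unit = {
        val a = VectorClock() + Node("1") + Node("2")
        val b = VectorClock() + Node("1") + Node("2") + Node("3")
        println(a < b)                 // true: b has seen every event a has, plus one more
        println(a <> b)                // false: the histories are ordered, not concurrent
        println((a merge b).versions)  // Map(Node(1) -> 1, Node(2) -> 1, Node(3) -> 1)
      }
    }
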
diff --git a/akka-docs/cluster/cluster.rst b/akka-docs/cluster/cluster.rst
index 9790b6bccd..231830cecb 100644
--- a/akka-docs/cluster/cluster.rst
+++ b/akka-docs/cluster/cluster.rst
@@ -53,17 +53,34 @@ These terms are used throughout the documentation.
A mapping from partition path to a set of instance nodes (where the nodes are
referred to by the ordinal position given the nodes in sorted order).
+**leader**
+  A single node in the cluster that acts as the leader, managing cluster convergence,
+  partitions, fail-over, rebalancing, etc.
+
+**deputy nodes**
+ A set of nodes responsible for breaking logical partitions.
+
Membership
==========
A cluster is made up of a set of member nodes. The identifier for each node is a
-`hostname:port` pair. An Akka application is distributed over a cluster with
+``hostname:port`` pair. An Akka application is distributed over a cluster with
each node hosting some part of the application. Cluster membership and
partitioning of the application are decoupled. A node could be a member of a
cluster without hosting any actors.
+Singleton Cluster
+-----------------
+
+If a node does not have a contact point to join preconfigured in the Akka
+configuration, then it is considered a singleton cluster (single node cluster)
+and will automatically transition from ``joining`` to ``up``. Singleton clusters
+can later explicitly send a ``Join`` message to another node to form an N-node
+cluster. It is also possible to link multiple N-node clusters by ``joining`` them.
+
+
Gossip
------
@@ -71,8 +98,8 @@ The cluster membership used in Akka is based on Amazon's `Dynamo`_ system and
particularly the approach taken in Basho's' `Riak`_ distributed database.
Cluster membership is communicated using a `Gossip Protocol`_, where the current
state of the cluster is gossiped randomly through the cluster. Joining a cluster
-is initiated by specifying a set of ``seed`` nodes with which to begin
-gossiping.
+is initiated by issuing a ``Join`` command to one of the nodes in the cluster
+that should be joined.
.. _Gossip Protocol: http://en.wikipedia.org/wiki/Gossip_protocol
.. _Dynamo: http://www.allthingsdistributed.com/files/amazon-dynamo-sosp2007.pdf
@@ -98,7 +125,7 @@ the `pruning algorithm`_ in Riak.
.. _pruning algorithm: http://wiki.basho.com/Vector-Clocks.html#Vector-Clock-Pruning
-Gossip convergence
+Gossip Convergence
^^^^^^^^^^^^^^^^^^
Information about the cluster converges at certain points of time. This is when
@@ -142,31 +169,45 @@ order to account for network issues that sometimes occur on such platforms.
Leader
^^^^^^
-After gossip convergence a leader for the cluster can be determined. There is no
-leader election process, the leader can always be recognised deterministically
-by any node whenever there is gossip convergence. The leader is simply the first
+After gossip convergence a ``leader`` for the cluster can be determined. There is no
+``leader`` election process, the ``leader`` can always be recognised deterministically
+by any node whenever there is gossip convergence. The ``leader`` is simply the first
node in sorted order that is able to take the leadership role, where the only
-allowed member states for a leader are ``up`` or ``leaving`` (see below for more
+allowed member states for a ``leader`` are ``up`` or ``leaving`` (see below for more
information about member states).
-The role of the leader is to shift members in and out of the cluster, changing
+The role of the ``leader`` is to shift members in and out of the cluster, changing
``joining`` members to the ``up`` state or ``exiting`` members to the
``removed`` state, and to schedule rebalancing across the cluster. Currently
-leader actions are only triggered by receiving a new cluster state with gossip
+``leader`` actions are only triggered by receiving a new cluster state with gossip
convergence but it may also be possible for the user to explicitly rebalance the
cluster by specifying migrations, or to rebalance the cluster automatically
based on metrics from member nodes. Metrics may be spread using the gossip
protocol or possibly more efficiently using a *random chord* method, where the
-leader contacts several random nodes around the cluster ring and each contacted
+``leader`` contacts several random nodes around the cluster ring and each contacted
node gathers information from their immediate neighbours, giving a random
sampling of load information.
-The leader also has the power, if configured so, to "auto-down" a node that
+The ``leader`` also has the power, if configured so, to "auto-down" a node that
according to the Failure Detector is considered unreachable. This means setting
the unreachable node status to ``down`` automatically.
-Gossip protocol
+Deputy Nodes
+^^^^^^^^^^^^
+
+After gossip convergence a set of ``deputy`` nodes for the cluster can be
+determined. As with the ``leader``, there is no ``deputy`` election process,
+the deputies can always be recognised deterministically by any node whenever there
+is gossip convergence. The list of ``deputy`` nodes is simply the next N - 1 nodes
+in sorted order (i.e. starting with the first node after the ``leader``).
+
+The nodes defined as ``deputy`` nodes are just regular member nodes whose only
+"special role" is to help breaking logical partitions as seen in the gossip
+algorithm defined below.
+
+
+Gossip Protocol
^^^^^^^^^^^^^^^
A variation of *push-pull gossip* is used to reduce the amount of gossip
@@ -182,14 +223,14 @@ nodes involved in a gossip exchange.
Periodically, the default is every 1 second, each node chooses another random
node to initiate a round of gossip with. The choice of node is random but can
-also include extra gossiping for unreachable nodes, seed nodes, and nodes with
+also include extra gossiping for unreachable nodes, ``deputy`` nodes, and nodes with
either newer or older state versions.
The gossip overview contains the current state version for all nodes and also a
list of unreachable nodes. Whenever a node receives a gossip overview it updates
the `Failure Detector`_ with the liveness information.
-The nodes defined as ``seed`` nodes are just regular member nodes whose only
+The nodes defined as ``deputy`` nodes are just regular member nodes whose only
"special role" is to function as contact points in the cluster and to help
breaking logical partitions as seen in the gossip algorithm defined below.
@@ -200,9 +241,9 @@ During each round of gossip exchange the following process is used:
2. Gossip to random unreachable node with certain probability depending on the
number of unreachable and live nodes
-3. If the node gossiped to at (1) was not a ``seed`` node, or the number of live
- nodes is less than number of seeds, gossip to random ``seed`` node with
- certain probability depending on number of unreachable, seed, and live nodes.
+3. If the node gossiped to at (1) was not a ``deputy`` node, or the number of live
+ nodes is less than number of ``deputy`` nodes, gossip to random ``deputy`` node with
+ certain probability depending on number of unreachable, ``deputy``, and live nodes.
4. Gossip to random node with newer or older state information, based on the
current gossip overview, with some probability (?)
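
Both the ``leader`` and the ``deputy`` nodes described above are derived deterministically from the sorted member list once there is gossip convergence, with no election step. A hedged sketch of that selection, using a simplified ``Member`` model (the real gossip state carries more information, and what N maps to in configuration is an assumption here)::

    // Sketch of deterministic leader and deputy selection from the sorted member list.
    // Member/MemberStatus are simplified stand-ins for the real cluster membership state.
    object LeaderSelectionSketch {

      sealed trait MemberStatus
      case object Joining extends MemberStatus
      case object Up extends MemberStatus
      case object Leaving extends MemberStatus
      case object Exiting extends MemberStatus
      case object Down extends MemberStatus

      final case class Member(address: String, status: MemberStatus)

      // The leader is the first node in sorted order that may take the leadership
      // role, i.e. whose status is Up or Leaving.
      def leader(sortedMembers: Seq[Member]): Option[Member] =
        sortedMembers.find(m => m.status == Up || m.status == Leaving)

      // The deputies are the N - 1 nodes directly after the leader in sorted order.
      def deputies(sortedMembers: Seq[Member], n: Int): Seq[Member] = {
        val i = sortedMembers.indexWhere(m => m.status == Up || m.status == Leaving)
        if (i < 0) Seq.empty else sortedMembers.slice(i + 1, i + n)
      }

      def main(args: Array[String]): Unit = {
        val members = Seq(
          Member("akka://sys@hostA:2552", Joining),
          Member("akka://sys@hostB:2552", Up),
          Member("akka://sys@hostC:2552", Up),
          Member("akka://sys@hostD:2552", Up))

        println(leader(members))       // Some(Member(akka://sys@hostB:2552,Up)): hostA is still Joining
        println(deputies(members, 3))  // the two Up members after the leader
      }
    }
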
@@ -256,18 +297,18 @@ Some of the other structures used are::
PartitionChangeStatus = Awaiting | Complete
-Membership lifecycle
+Membership Lifecycle
--------------------
A node begins in the ``joining`` state. Once all nodes have seen that the new
-node is joining (through gossip convergence) the leader will set the member
+node is joining (through gossip convergence) the ``leader`` will set the member
state to ``up`` and can start assigning partitions to the new node.
If a node is leaving the cluster in a safe, expected manner then it switches to
-the ``leaving`` state. The leader will reassign partitions across the cluster
-(it is possible for a leaving node to itself be the leader). When all partition
+the ``leaving`` state. The ``leader`` will reassign partitions across the cluster
+(it is possible for a leaving node to itself be the ``leader``). When all partition
handoff has completed then the node will change to the ``exiting`` state. Once
-all nodes have seen the exiting state (convergence) the leader will remove the
+all nodes have seen the exiting state (convergence) the ``leader`` will remove the
node from the cluster, marking it as ``removed``.
A node can also be removed forcefully by moving it directly to the ``removed``
@@ -275,7 +316,7 @@ state using the ``remove`` action. The cluster will rebalance based on the new
cluster membership.
If a node is unreachable then gossip convergence is not possible and therefore
-any leader actions are also not possible (for instance, allowing a node to
+any ``leader`` actions are also not possible (for instance, allowing a node to
become a part of the cluster, or changing actor distribution). To be able to
move forward the state of the unreachable nodes must be changed. If the
unreachable node is experiencing only transient difficulties then it can be
@@ -289,13 +330,13 @@ This means that nodes can join and leave the cluster at any point in time,
e.g. provide cluster elasticity.
-State diagram for the member states
+State Diagram for the Member States
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. image:: images/member-states.png
-Member states
+Member States
^^^^^^^^^^^^^
- **joining**
@@ -314,12 +355,12 @@ Member states
marked as down/offline/unreachable
-User actions
+User Actions
^^^^^^^^^^^^
- **join**
join a single node to a cluster - can be explicit or automatic on
- startup if a list of seed nodes have been specified in the configuration
+   startup if a node to join has been specified in the configuration
- **leave**
tell a node to leave the cluster gracefully
@@ -331,10 +372,10 @@ User actions
remove a node from the cluster immediately
-Leader actions
+Leader Actions
^^^^^^^^^^^^^^
-The leader has the following duties:
+The ``leader`` has the following duties:
- shifting members in and out of the cluster
@@ -360,7 +401,7 @@ set of nodes in the cluster. The actor at the head of the partition is referred
to as the partition point. The mapping from partition path (actor address of the
format "a/b/c") to instance nodes is stored in the partition table and is
maintained as part of the cluster state through the gossip protocol. The
-partition table is only updated by the leader node. Currently the only possible
+partition table is only updated by the ``leader`` node. Currently the only possible
partition points are *routed* actors.
Routed actors can have an instance count greater than one. The instance count is
@@ -371,7 +412,7 @@ Note that in the first implementation there may be a restriction such that only
top-level partitions are possible (the highest possible partition points are
used and sub-partitioning is not allowed). Still to be explored in more detail.
-The cluster leader determines the current instance count for a partition based
+The cluster ``leader`` determines the current instance count for a partition based
on two axes: fault-tolerance and scaling.
Fault-tolerance determines a minimum number of instances for a routed actor
@@ -411,8 +452,8 @@ the following, with all instances on the same physical nodes as before::
B -> { 7, 9, 10 }
C -> { 12, 14, 15, 1, 2 }
-When rebalancing is required the leader will schedule handoffs, gossiping a set
-of pending changes, and when each change is complete the leader will update the
+When rebalancing is required the ``leader`` will schedule handoffs, gossiping a set
+of pending changes, and when each change is complete the ``leader`` will update the
partition table.
@@ -432,7 +473,7 @@ the handoff), given a previous host node ``N1``, a new host node ``N2``, and an
actor partition ``A`` to be migrated from ``N1`` to ``N2``, has this general
structure:
- 1. the leader sets a pending change for ``N1`` to handoff ``A`` to ``N2``
+ 1. the ``leader`` sets a pending change for ``N1`` to handoff ``A`` to ``N2``
2. ``N1`` notices the pending change and sends an initialization message to ``N2``
@@ -441,7 +482,7 @@ structure:
4. after receiving the ready message ``N1`` marks the change as
complete and shuts down ``A``
- 5. the leader sees the migration is complete and updates the partition table
+ 5. the ``leader`` sees the migration is complete and updates the partition table
6. all nodes eventually see the new partitioning and use ``N2``
@@ -453,7 +494,7 @@ There are transition times in the handoff process where different approaches can
be used to give different guarantees.
-Migration transition
+Migration Transition
~~~~~~~~~~~~~~~~~~~~
The first transition starts when ``N1`` initiates the moving of ``A`` and ends
@@ -476,7 +517,7 @@ buffered until the actor is ready, or the messages are simply dropped by
terminating the actor and allowing the normal dead letter process to be used.
-Update transition
+Update Transition
~~~~~~~~~~~~~~~~~
The second transition begins when the migration is marked as complete and ends
@@ -510,12 +551,12 @@ messages sent directly to ``N2`` before the acknowledgement has been forwarded
that will be buffered.
-Graceful handoff
+Graceful Handoff
^^^^^^^^^^^^^^^^
A more complete process for graceful handoff would be:
- 1. the leader sets a pending change for ``N1`` to handoff ``A`` to ``N2``
+ 1. the ``leader`` sets a pending change for ``N1`` to handoff ``A`` to ``N2``
2. ``N1`` notices the pending change and sends an initialization message to
@@ -546,7 +587,7 @@ A more complete process for graceful handoff would be:
becoming dead letters)
- 5. the leader sees the migration is complete and updates the partition table
+ 5. the ``leader`` sees the migration is complete and updates the partition table
6. all nodes eventually see the new partitioning and use ``N2``
@@ -590,7 +631,7 @@ distributed datastore. See the next section for a rough outline on how the
distributed datastore could be implemented.
-Implementing a Dynamo-style distributed database on top of Akka Cluster
+Implementing a Dynamo-style Distributed Database on top of Akka Cluster
-----------------------------------------------------------------------
The missing pieces to implement a full Dynamo-style eventually consistent data
diff --git a/akka-docs/dev/building-akka.rst b/akka-docs/dev/building-akka.rst
index 1189b143b3..88cd31491a 100644
--- a/akka-docs/dev/building-akka.rst
+++ b/akka-docs/dev/building-akka.rst
@@ -10,7 +10,7 @@
This page describes how to build and run Akka from the latest source code.
-Get the source code
+Get the Source Code
===================
Akka uses `Git`_ and is hosted at `Github`_.
@@ -82,7 +82,20 @@ launch script to activate parallel execution::
-Dakka.parallelExecution=true
-Publish to local Ivy repository
+Long Running and Time Sensitive Tests
+-------------------------------------
+
+By default the long running tests (mainly cluster tests) and time sensitive tests (dependent on the
+performance of the machine they are running on) are disabled. You can enable them by adding one of the flags::
+
+ -Dakka.test.tags.include=long-running
+ -Dakka.test.tags.include=timing
+
+Or if you need to enable them both::
+
+ -Dakka.test.tags.include=long-running,timing
+
+Publish to Local Ivy Repository
-------------------------------
If you want to deploy the artifacts to your local Ivy repository (for example,
@@ -91,7 +104,7 @@ to use from an sbt project) use the ``publish-local`` command::
sbt publish-local
-sbt interactive mode
+sbt Interactive Mode
--------------------
Note that in the examples above we are calling ``sbt compile`` and ``sbt test``
@@ -111,7 +124,7 @@ For example, building Akka as above is more commonly done like this::
...
-sbt batch mode
+sbt Batch Mode
--------------
It's also possible to combine commands in a single call. For example, testing,
diff --git a/akka-docs/scala/fsm.rst b/akka-docs/scala/fsm.rst
index 278dcbbd35..0dcc12ed67 100644
--- a/akka-docs/scala/fsm.rst
+++ b/akka-docs/scala/fsm.rst
@@ -154,7 +154,7 @@ Defining States
A state is defined by one or more invocations of the method
:func:`when([, stateTimeout = ])(stateFunction)`.
-
+
The given name must be an object which is type-compatible with the first type
parameter given to the :class:`FSM` trait. This object is used as a hash key,
so you must ensure that it properly implements :meth:`equals` and
@@ -437,7 +437,7 @@ and in the following.
Event Tracing
-------------
-The setting ``akka.actor.debug.fsm`` in `:ref:`configuration` enables logging of an
+The setting ``akka.actor.debug.fsm`` in :ref:`configuration` enables logging of an
event trace by :class:`LoggingFSM` instances::
class MyFSM extends Actor with LoggingFSM[X, Z] {
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteAddress.scala b/akka-remote/src/main/scala/akka/remote/RemoteAddress.scala
deleted file mode 100644
index f7274c2356..0000000000
--- a/akka-remote/src/main/scala/akka/remote/RemoteAddress.scala
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Copyright (C) 2009-2012 Typesafe Inc.
- */
-package akka.remote
-
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala
index 0d55f17915..41f76fea17 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala
@@ -33,7 +33,7 @@ case class RemoteClientError(
@BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.ErrorLevel
override def toString =
- "RemoteClientError@" + remoteAddress + ": Error[" + AkkaException.toStringWithStackTrace(cause) + "]"
+ "RemoteClientError@" + remoteAddress + ": Error[" + cause + "]"
}
case class RemoteClientDisconnected(
@@ -77,7 +77,7 @@ case class RemoteClientWriteFailed(
override def toString =
"RemoteClientWriteFailed@" + remoteAddress +
": MessageClass[" + (if (request ne null) request.getClass.getName else "no message") +
- "] Error[" + AkkaException.toStringWithStackTrace(cause) + "]"
+ "] Error[" + cause + "]"
}
/**
@@ -104,7 +104,7 @@ case class RemoteServerError(
@BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.ErrorLevel
override def toString =
- "RemoteServerError@" + remote + "] Error[" + AkkaException.toStringWithStackTrace(cause) + "]"
+ "RemoteServerError@" + remote + "] Error[" + cause + "]"
}
case class RemoteServerClientConnected(
@@ -191,7 +191,7 @@ abstract class RemoteTransport {
protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = {
system.eventStream.publish(message)
- system.log.log(message.logLevel, "REMOTE: {}", message)
+ system.log.log(message.logLevel, "{}", message)
}
override def toString = address.toString
diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
index 95ce267320..3a0f02c79a 100644
--- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
+++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
@@ -20,6 +20,7 @@ import akka.dispatch.Dispatchers
import akka.pattern.ask
object TimingTest extends Tag("timing")
+object LongRunningTest extends Tag("long-running")
object AkkaSpec {
val testConf: Config = ConfigFactory.parseString("""
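
``LongRunningTest`` is an ordinary ScalaTest ``Tag``, so individual test cases can opt in to the categories that are excluded by default and only run when the corresponding ``-Dakka.test.tags.include=...`` flag is given. A hedged usage sketch (the spec name and test bodies are made up for illustration)::

    import akka.testkit.{ AkkaSpec, LongRunningTest, TimingTest }

    // Hypothetical spec showing how individual cases could be tagged; such tests
    // are skipped unless the matching tag is included via -Dakka.test.tags.include.
    class SomeSlowSpec extends AkkaSpec {

      "A cluster node" must {
        "eventually converge" taggedAs LongRunningTest in {
          // a slow, multi-second assertion would go here
        }

        "stay within its timing budget" taggedAs (TimingTest, LongRunningTest) in {
          // a machine-dependent timing assertion would go here
        }
      }
    }
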
diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala
index 2c96c54ac9..5a62e6bdb5 100644
--- a/project/AkkaBuild.scala
+++ b/project/AkkaBuild.scala
@@ -342,7 +342,7 @@ object AkkaBuild extends Build {
val excludeTestTags = SettingKey[Seq[String]]("exclude-test-tags")
val includeTestTags = SettingKey[Seq[String]]("include-test-tags")
- val defaultExcludedTags = Seq("timing")
+ val defaultExcludedTags = Seq("timing", "long-running")
lazy val defaultSettings = baseSettings ++ formatSettings ++ Seq(
resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/",