2011-10-26 08:48:16 +02:00
/* *
2012-01-19 18:21:06 +01:00
* Copyright ( C ) 2009 - 2012 Typesafe Inc . < http : //www.typesafe.com>
2011-10-26 08:48:16 +02:00
*/
2012-01-31 13:33:04 +01:00
package akka.cluster
2011-10-26 08:48:16 +02:00
import akka.actor._
import akka.actor.Status._
2012-01-31 13:33:04 +01:00
import akka.remote._
2012-02-20 15:26:12 +01:00
import akka.routing._
2011-10-27 12:46:10 +02:00
import akka.event.Logging
2012-01-24 12:09:32 +01:00
import akka.dispatch.Await
2012-01-31 13:33:04 +01:00
import akka.pattern.ask
import akka.util._
2012-03-02 16:20:30 +01:00
import akka.util.duration._
2011-11-25 12:02:25 +01:00
import akka.config.ConfigurationException
2012-01-30 11:41:41 +01:00
import java.util.concurrent.atomic. { AtomicReference , AtomicBoolean }
2012-01-24 12:09:32 +01:00
import java.util.concurrent.TimeUnit._
import java.util.concurrent.TimeoutException
2011-10-27 15:14:15 +02:00
import java.security.SecureRandom
2011-11-25 12:02:25 +01:00
2012-01-30 11:41:41 +01:00
import scala.collection.immutable. { Map , SortedSet }
2011-10-26 08:48:16 +02:00
import scala.annotation.tailrec
2011-11-25 12:02:25 +01:00
2012-01-24 12:09:32 +01:00
import com.google.protobuf.ByteString
2011-10-26 08:48:16 +02:00
/**
 * Callback interface for cluster membership changes.
 *
 * Implementations receive the set of cluster members (sorted by address) whenever the node notifies its
 * registered listeners.
 */
trait MembershipChangeListener {

  // FIXME bad for Java - convert to Array?
  /**
   * Invoked with the current cluster members.
   *
   * @param members the members, sorted by address
   */
  def notify(members: SortedSet[Member]): Unit
}
/**
 * Callback interface for cluster meta data changes.
 */
trait MetaDataChangeListener { // FIXME add management and notification for MetaDataChangeListener

  // FIXME bad for Java - convert to what?
  /**
   * Invoked with the cluster meta data.
   *
   * @param meta meta data entries keyed by name
   */
  def notify(meta: Map[String, Array[Byte]]): Unit
}
2012-02-07 16:53:49 +01:00
// FIXME create Protobuf messages out of all the Gossip stuff - but wait until the protocol is fully stabilized.
2011-10-26 08:48:16 +02:00
/**
 * Marker trait shared by every message exchanged by the cluster.
 *
 * Every ClusterMessage is Serializable so it can be sent to remote nodes.
 */
sealed trait ClusterMessage extends Serializable
2012-01-30 11:41:41 +01:00
/**
 * Cluster commands sent by the USER.
 *
 * Each command carries the address of the node it applies to.
 */
object ClusterAction {

  /**
   * Command to join the cluster. Sent when a node (represented by 'address') wants to join another
   * node (the receiver).
   */
  case class Join(address: Address) extends ClusterMessage

  /** Command to move a node from Joining to Up. */
  case class Up(address: Address) extends ClusterMessage

  /** Command to make a node leave the cluster. */
  case class Leave(address: Address) extends ClusterMessage

  /** Command to mark a node as temporarily down. */
  case class Down(address: Address) extends ClusterMessage

  /** Command to mark a node to be removed from the cluster immediately. */
  case class Exit(address: Address) extends ClusterMessage

  /** Command to remove a node from the cluster immediately. */
  case class Remove(address: Address) extends ClusterMessage
}
2011-10-26 08:48:16 +02:00
2012-01-30 11:41:41 +01:00
/**
 * A cluster member node: its address together with its current status.
 *
 * Equality and hash code are based on the address only; the status is ignored.
 */
class Member(val address: Address, val status: MemberStatus) extends ClusterMessage {

  override def hashCode = address.##

  override def equals(other: Any) = other match {
    case that: Member ⇒ address == that.address
    case _            ⇒ false
  }

  override def toString = "Member(address = %s, status = %s)".format(address, status)

  def copy(address: Address = this.address, status: MemberStatus = this.status): Member =
    Member(address, status)
}
/**
 * Factory, extractor and ordering for Member instances.
 */
object Member {
  import MemberStatus._

  /** Orders members by the string form of their address. */
  implicit val ordering: Ordering[Member] = Ordering.by[Member, String](_.address.toString)

  def apply(address: Address, status: MemberStatus): Member = new Member(address, status)

  /** Extracts the address of a Member; None for anything else. */
  def unapply(other: Any): Option[Address] = other match {
    case member: Member ⇒ Some(member.address)
    case _              ⇒ None
  }

  /** Rank of a status: a higher number means a higher priority when merging. */
  private def statusPriority(status: MemberStatus): Int = status match {
    case Joining ⇒ 0
    case Up      ⇒ 1
    case Leaving ⇒ 2
    case Exiting ⇒ 3
    case Down    ⇒ 4
    case Removed ⇒ 5
  }

  /**
   * Picks the Member with the highest "priority" MemberStatus
   * (Removed > Down > Exiting > Leaving > Up > Joining). On a tie the first argument wins.
   */
  def highestPriorityOf(m1: Member, m2: Member): Member =
    if (statusPriority(m2.status) > statusPriority(m1.status)) m2 else m1
}
2011-10-26 08:48:16 +02:00
2012-02-09 15:59:10 +01:00
/**
 * Wraps a gossip together with the Member that sent it.
 *
 * @param sender the member the gossip comes from
 * @param gossip the gossip itself
 */
case class GossipEnvelope(sender: Member, gossip: Gossip)
  extends ClusterMessage
2012-01-24 12:09:32 +01:00
/**
 * The current status of a cluster member node.
 *
 * One of: Joining, Up, Leaving, Exiting, Down and Removed.
 */
sealed trait MemberStatus extends ClusterMessage

object MemberStatus {
  case object Joining extends MemberStatus
  case object Up extends MemberStatus
  case object Leaving extends MemberStatus
  case object Exiting extends MemberStatus
  case object Down extends MemberStatus
  case object Removed extends MemberStatus

  /**
   * True for Down, Exiting, Removed and Leaving.
   * False for Joining and Up.
   */
  def isUnavailable(status: MemberStatus): Boolean = status match {
    case Down | Exiting | Removed | Leaving ⇒ true
    case _                                  ⇒ false
  }
}
2012-02-09 13:36:39 +01:00
/**
 * Overview part of the gossip.
 *
 * It holds the convergence table ('seen': for each node, the gossip version it has seen) and the set of
 * unreachable members.
 */
case class GossipOverview(
  seen: Map[Address, VectorClock] = Map.empty[Address, VectorClock],
  unreachable: Set[Member] = Set.empty[Member]) {
  // FIXME document when nodes are put in 'unreachable' set and removed from 'members'

  override def toString = {
    val seenEntries = seen.mkString(", ")
    val unreachableMembers = unreachable.mkString(", ")
    "GossipOverview(seen = [" + seenEntries + "], unreachable = [" + unreachableMembers + "])"
  }
}
2012-02-09 13:36:39 +01:00
/**
 * Represents the state of the cluster: ring membership, ring convergence and meta data.
 * All of it is versioned by a vector clock.
 *
 * @param overview the convergence table ('seen') and the set of unreachable members
 * @param members  the members with their status, sorted by address (see `Member.ordering`)
 * @param meta     cluster meta data
 * @param version  the vector clock version of this gossip
 */
case class Gossip(
  overview: GossipOverview = GossipOverview(),
  members: SortedSet[Member], // sorted set of members with their status, sorted by address
  meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]],
  version: VectorClock = VectorClock()) // vector clock version
  extends ClusterMessage // is a serializable cluster message
  with Versioned[Gossip] {

  /**
   * Increments the version for this 'Node'.
   */
  def +(node: VectorClock.Node): Gossip = copy(version = version + node)

  /**
   * Adds a member to the gossip.
   *
   * Returns this gossip unchanged if 'members' already contains an equivalent member. The set is ordered by
   * address only, so that means a member with the same address.
   */
  def +(member: Member): Gossip =
    if (members contains member) this
    else this copy (members = members + member)

  /**
   * Marks the gossip as seen by the node at 'address'.
   *
   * It records the current version for that address in the 'overview.seen' table. Returns this gossip
   * unchanged if exactly this version is already recorded.
   */
  def seen(address: Address): Gossip =
    if (overview.seen.get(address) == Some(version)) this
    else this copy (overview = overview copy (seen = overview.seen + (address -> version)))

  /**
   * Merges two Gossip instances: membership tables, unreachable sets, meta-data tables and the VectorClock
   * histories.
   *
   * For every address present in either gossip, the single Member with the highest priority MemberStatus
   * (see `Member.highestPriorityOf`) is kept. This applies both to 'members' and to 'overview.unreachable'.
   */
  def merge(that: Gossip): Gossip = {
    import Member.ordering

    // 1. merge vector clocks
    val mergedVClock = this.version merge that.version

    // 2. merge members by selecting the single Member with highest MemberStatus per address.
    //    The member sets must NOT be combined with 'this.members ++ that.members'. The SortedSet
    //    ordering compares addresses only, so such a union would silently keep just one of two members with
    //    the same address before their statuses could be compared.
    val mergedMembers = SortedSet[Member](highestPriorityPerAddress(this.members, that.members): _*)

    // 3. merge meta-data
    val mergedMeta = this.meta ++ that.meta

    // 4. merge gossip overview; unreachable members are resolved per address the same way as 'members'
    val mergedUnreachable =
      Set[Member](highestPriorityPerAddress(this.overview.unreachable, that.overview.unreachable): _*)
    val mergedOverview = GossipOverview(this.overview.seen ++ that.overview.seen, mergedUnreachable)

    Gossip(mergedOverview, mergedMembers, mergedMeta, mergedVClock)
  }

  /**
   * Groups the members of both collections by address and keeps, per address, the one with the
   * highest priority MemberStatus.
   *
   * The inputs are converted to lists first so that same-address members from the two sides are never
   * deduplicated before their statuses are compared.
   */
  private def highestPriorityPerAddress(a: Iterable[Member], b: Iterable[Member]): List[Member] =
    (a.toList ++ b.toList)
      .groupBy(_.address)
      .values
      .map(group ⇒ group.reduceLeft(Member.highestPriorityOf(_, _)))
      .toList

  override def toString =
    "Gossip(" +
      "overview = " + overview +
      ", members = [" + members.mkString(", ") +
      "], meta = [" + meta.mkString(", ") +
      "], version = " + version +
      ")"
}
2012-01-24 12:09:32 +01:00
2012-02-20 15:26:12 +01:00
/**
 * Actor that executes cluster commands (see ClusterAction).
 *
 * Each command is dispatched to the corresponding state transition on this system's Node. There is a single
 * instance per Node, so commands are handled one message at a time.
 */
final class ClusterCommandDaemon extends Actor {
  import ClusterAction._

  val node = Node(context.system)
  val log = Logging(context.system, this)

  def receive = {
    case Join(address)   ⇒ node joining address
    case Up(address)     ⇒ node up address
    case Down(address)   ⇒ node downing address
    case Leave(address)  ⇒ node leaving address
    case Exit(address)   ⇒ node exiting address
    case Remove(address) ⇒ node removing address
  }

  // anything that is not a ClusterAction command is logged as an error
  override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown)
}
/**
 * Actor that hands incoming gossip envelopes to this system's Node.
 *
 * Deployed behind a round-robin router with a configurable number of instances, so the Node is accessed
 * concurrently.
 */
final class ClusterGossipDaemon extends Actor {
  val log = Logging(context.system, this)
  val node = Node(context.system)

  def receive = {
    case GossipEnvelope(from, remoteGossip) ⇒ node.receive(from, remoteGossip)
  }

  override def unhandled(unknown: Any) =
    log.error("[/system/cluster/gossip] can not respond to messages - received [{}]", unknown)
}
2012-03-09 12:56:56 +01:00
/**
 * Supervisor of the cluster daemons.
 *
 * It creates the "commands" child (a ClusterCommandDaemon) and the "gossip" child (a round-robin router
 * over NrOfGossipDaemons ClusterGossipDaemon instances). It handles no messages itself.
 */
final class ClusterDaemonSupervisor extends Actor {
  val log = Logging(context.system, this)
  val node = Node(context.system)

  private val commands = context.actorOf(Props[ClusterCommandDaemon], "commands")

  private val gossip = {
    val router = RoundRobinRouter(node.clusterSettings.NrOfGossipDaemons)
    context.actorOf(Props[ClusterGossipDaemon].withRouter(router), "gossip")
  }

  def receive = Actor.emptyBehavior

  override def unhandled(unknown: Any): Unit =
    log.error("/system/cluster can not respond to messages - received [{}]", unknown)
}
2012-02-22 18:40:16 +01:00
/**
 * Extension id and factory for the Node extension.
 *
 * Example:
 * {{{
 * val node = Node(system)
 *
 * if (node.isLeader) { ... }
 * }}}
 */
object Node extends ExtensionId[Node] with ExtensionIdProvider {

  override def createExtension(system: ExtendedActorSystem): Node = new Node(system)

  override def lookup = this

  override def get(system: ActorSystem): Node = super.get(system)
}
2012-02-14 20:50:12 +01:00
2011-10-26 08:48:16 +02:00
/* *
* This module is responsible for Gossiping cluster information . The abstraction maintains the list of live
2012-01-30 11:41:41 +01:00
* and dead members . Periodically i . e . every 1 second this module chooses a random member and initiates a round
2011-10-26 08:48:16 +02:00
* of Gossip with it . Whenever it gets gossip updates it updates the Failure Detector with the liveness
* information .
* < p />
2012-01-30 11:41:41 +01:00
* During each of these runs the member initiates gossip exchange according to following rules ( as defined in the
2011-10-26 08:48:16 +02:00
* Cassandra documentation [ http :// wiki . apache . org / cassandra / ArchitectureGossip ] :
* < pre >
2012-01-30 11:41:41 +01:00
* 1 ) Gossip to random live member ( if any )
* 2 ) Gossip to random unreachable member with certain probability depending on number of unreachable and live members
2012-02-07 16:53:49 +01:00
* 3 ) If the member gossiped to at ( 1 ) was not deputy , or the number of live members is less than number of deputy list ,
* gossip to random deputy with certain probability depending on number of unreachable , deputy and live members .
2011-10-26 08:48:16 +02:00
* </ pre >
2012-02-29 10:02:00 +01:00
*
* Example :
* { { {
2012-03-02 09:55:54 +01:00
* val node = Node ( system )
2012-02-29 10:02:00 +01:00
*
* if ( node . isLeader ) { . . . }
* } } }
2011-10-26 08:48:16 +02:00
*/
2012-03-02 09:55:54 +01:00
class Node ( system : ExtendedActorSystem ) extends Extension {
2012-02-22 18:40:16 +01:00
2011-10-26 08:48:16 +02:00
/**
 * Immutable snapshot of this Node's state.
 *
 * It is held in an AtomicReference and replaced with compare-and-set (optimistic, lock-free concurrency).
 *
 * @param latestGossip the most recent gossip known to this node
 * @param memberMembershipChangeListeners the registered membership change listeners
 */
private case class State(
  latestGossip: Gossip,
  memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty[MembershipChangeListener])
2011-10-26 08:48:16 +02:00
2012-02-29 10:02:00 +01:00
// Clustering is layered on remoting: fail fast unless the ActorSystem uses the RemoteActorRefProvider.
if ( ! system . provider . isInstanceOf [ RemoteActorRefProvider ] )
throw new ConfigurationException ( "ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration" )
private val remote : RemoteActorRefProvider = system . provider . asInstanceOf [ RemoteActorRefProvider ]
2012-01-30 19:40:28 +01:00
2012-03-02 16:20:30 +01:00
// remoting and cluster settings, both read from this ActorSystem's configuration
val remoteSettings = new RemoteSettings ( system . settings . config , system . name )
val clusterSettings = new ClusterSettings ( system . settings . config , system . name )
2012-02-07 16:53:49 +01:00
2012-03-09 12:56:56 +01:00
// this node's address as reported by the remote transport; used as this node's identity in membership,
// vector clocks and leader checks
val remoteAddress = remote . transport . address
val failureDetector = new AccrualFailureDetector (
system , remoteAddress , clusterSettings . FailureDetectorThreshold , clusterSettings . FailureDetectorMaxSampleSize )
2012-02-29 10:02:00 +01:00
// this node's entry in the gossip vector clocks
private val vclockNode = VectorClock . Node ( remoteAddress . toString )
2012-02-07 16:53:49 +01:00
2012-02-29 10:02:00 +01:00
private val gossipInitialDelay = clusterSettings . GossipInitialDelay
private val gossipFrequency = clusterSettings . GossipFrequency
2012-02-08 16:15:31 +01:00
2012-02-29 10:02:00 +01:00
// implicit timeout for the '?' (ask) used below when creating the cluster daemon supervisor
implicit private val defaultTimeout = Timeout ( remoteSettings . RemoteSystemDaemonAckTimeout )
2012-01-24 12:09:32 +01:00
2012-03-09 12:56:56 +01:00
// NOTE(review): autoDown, nrOfDeputyNodes, nrOfGossipDaemons, serialization and random are not referenced
// in the visible part of this class - presumably used further down; verify.
private val autoDown = clusterSettings . AutoDown
2011-01-01 01:50:33 +01:00
private val nrOfDeputyNodes = clusterSettings . NrOfDeputyNodes
2012-02-20 15:26:12 +01:00
private val nrOfGossipDaemons = clusterSettings . NrOfGossipDaemons
2012-02-09 15:59:10 +01:00
// the configured contact point to join on startup; never this node itself
private val nodeToJoin : Option [ Address ] = clusterSettings . NodeToJoin filter ( _ != remoteAddress )
2011-10-26 08:48:16 +02:00
2012-01-31 15:00:46 +01:00
private val serialization = remote . serialization
2012-01-30 11:41:41 +01:00
// flipped to false exactly once by shutdown()
private val isRunning = new AtomicBoolean ( true )
2012-02-20 15:45:50 +01:00
private val log = Logging ( system , "Node" )
2011-10-27 15:14:15 +02:00
private val random = SecureRandom . getInstance ( "SHA1PRNG" )
2012-01-30 11:41:41 +01:00
2012-03-09 12:56:56 +01:00
log . info ( "Node [{}] - Starting cluster Node..." , remoteAddress )
2012-03-02 16:20:30 +01:00
// create supervisor for daemons under path "/system/cluster".
// Blocks (up to defaultTimeout.duration) until the system guardian replies; an Exception reply is rethrown.
// NOTE(review): any other reply type would fail here with a MatchError.
private val clusterDaemons = {
2012-03-03 23:55:48 +01:00
val createChild = CreateChild ( Props [ ClusterDaemonSupervisor ] , "cluster" )
2012-03-02 16:20:30 +01:00
Await . result ( system . systemGuardian ? createChild , defaultTimeout . duration ) match {
case a : ActorRef ⇒ a
case e : Exception ⇒ throw e
}
}
2012-02-20 15:26:12 +01:00
2012-02-08 14:14:01 +01:00
// lock-free node state: initially this node is the only member, in status Joining, with its own
// vector clock entry incremented once
private val state = {
val member = Member ( remoteAddress , MemberStatus . Joining )
2012-02-20 15:45:50 +01:00
val gossip = Gossip ( members = SortedSet . empty [ Member ] + member ) + vclockNode // add me as member and update my vector clock
2012-03-09 12:56:56 +01:00
new AtomicReference [ State ] ( State ( gossip ) )
2012-02-08 14:14:01 +01:00
}
2011-10-26 08:48:16 +02:00
2012-02-08 14:14:01 +01:00
// try to join the node defined in the 'akka.cluster.node-to-join' option
// (runs before any of the periodic tasks below are scheduled)
2012-02-28 17:04:48 +01:00
autoJoin ( )
2012-01-24 12:09:32 +01:00
2012-03-09 12:56:56 +01:00
// ========================================================
// ===================== WORK DAEMONS =====================
// ========================================================
// All three periodic tasks share gossipInitialDelay and gossipFrequency; shutdown() cancels them.
2012-02-14 20:50:12 +01:00
// start periodic gossip to random nodes in cluster
2011-01-01 01:50:33 +01:00
private val gossipCanceller = system . scheduler . schedule ( gossipInitialDelay , gossipFrequency ) {
2012-02-08 14:14:01 +01:00
gossip ( )
}
2012-02-14 20:50:12 +01:00
2012-03-09 12:56:56 +01:00
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
private val failureDetectorReaperCanceller = system . scheduler . schedule ( gossipInitialDelay , gossipFrequency ) { // TODO: should we use the same gossipFrequency for reaping?
reapUnreachableMembers ( )
2012-02-08 14:14:01 +01:00
}
2012-01-24 12:09:32 +01:00
2012-03-09 12:56:56 +01:00
// start periodic leader action management (only applies for the current leader)
private val leaderActionsCanceller = system . scheduler . schedule ( gossipInitialDelay , gossipFrequency ) { // TODO: should we use the same gossipFrequency for leaderActions?
leaderActions ( )
}
log . info ( "Node [{}] - Cluster Node started successfully" , remoteAddress )
2012-02-14 20:50:12 +01:00
// ======================================================
// ===================== PUBLIC API =====================
// ======================================================
2012-03-09 12:56:56 +01:00
/**
 * The Member entry for this node: the member of the latest gossip whose address is 'remoteAddress'.
 * Throws IllegalStateException if this node is not part of the membership ring.
 */
def self: Member = {
  val myself = latestGossip.members find (_.address == remoteAddress)
  myself getOrElse (throw new IllegalStateException("Can't find 'this' Member in the cluster membership ring"))
}

/**
 * Latest gossip.
 */
def latestGossip: Gossip = {
  val current = state.get
  current.latestGossip
}

/**
 * Member status for this node.
 */
def status: MemberStatus = {
  val myself = self
  myself.status
}

/**
 * Is this node the leader? The leader is the first member of the address-sorted membership set.
 */
def isLeader: Boolean = latestGossip.members.headOption exists (_.address == remoteAddress)

/**
 * Is this node a singleton cluster?
 */
def isSingletonCluster: Boolean = {
  val current = state.get
  isSingletonCluster(current)
}

/**
 * Checks if we have a cluster convergence.
 *
 * @return Some(convergedGossip) if convergence has been reached and None if not
 */
def convergence: Option[Gossip] = {
  val current = latestGossip
  convergence(current)
}

/**
 * Returns true if the node is UP or JOINING.
 */
def isAvailable: Boolean = {
  val current = state.get
  !isUnavailable(current)
}
2012-01-24 12:09:32 +01:00
/**
 * Shuts this cluster Node down.
 *
 * It cancels the periodic gossip, failure detector reaper and leader action tasks and stops the cluster
 * daemon supervisor. Only the first call has any effect; later calls do nothing.
 */
def shutdown() {
  // FIXME Cheating for now. Can't just shut down. Node must first gossip an Leave command, wait for Leader to do proper Handoff and then await an Exit command before switching to Removed
  val firstCall = isRunning.compareAndSet(true, false)
  if (firstCall) {
    log.info("Node [{}] - Shutting down Node and cluster daemons...", remoteAddress)
    Seq(gossipCanceller, failureDetectorReaperCanceller, leaderActionsCanceller) foreach (_.cancel())
    system.stop(clusterDaemons)
  }
}
2011-01-01 01:50:33 +01:00
/**
 * Registers a listener to subscribe to cluster membership changes.
 */
final def registerListener(listener: MembershipChangeListener) {
  updateListenersWith(_ + listener)
}

/**
 * Unsubscribes a listener from cluster membership changes.
 */
final def unregisterListener(listener: MembershipChangeListener) {
  updateListenersWith(_ - listener)
}

/**
 * Replaces the registered listeners with 'f(current listeners)'.
 *
 * It uses an optimistic compare-and-set on 'state' and retries whenever a concurrent update got there first.
 */
@tailrec
private def updateListenersWith(f: Set[MembershipChangeListener] ⇒ Set[MembershipChangeListener]) {
  val current = state.get
  val updated = current copy (memberMembershipChangeListeners = f(current.memberMembershipChangeListeners))
  if (!state.compareAndSet(current, updated)) updateListenersWith(f)
}
2012-02-29 10:02:00 +01:00
/**
 * Sends a command to JOIN the node at 'address'.
 *
 * The command goes to the cluster command daemon with a fire-and-forget `!`.
 */
def scheduleNodeJoin(address: Address) {
  clusterCommandDaemon ! ClusterAction.Join(address)
}

/**
 * Sends a command to issue the state transition to LEAVING for the node at 'address'.
 */
def scheduleNodeLeave(address: Address) {
  clusterCommandDaemon ! ClusterAction.Leave(address)
}

/**
 * Sends a command to issue the state transition to DOWN for the node at 'address'.
 */
def scheduleNodeDown(address: Address) {
  clusterCommandDaemon ! ClusterAction.Down(address)
}

/**
 * Sends a command to issue the state transition to REMOVED for the node at 'address'.
 */
def scheduleNodeRemove(address: Address) {
  clusterCommandDaemon ! ClusterAction.Remove(address)
}
2011-01-01 01:50:33 +01:00
// ========================================================
// ===================== INTERNAL API =====================
// ========================================================
2012-02-08 14:14:01 +01:00
/**
 * State transition to JOINING: a new node joins the cluster.
 *
 * - Adds 'node' as a Joining member.
 * - Removes it from the 'unreachable' set, since it may be a DOWN node that is rejoining.
 * - Increments this node's vector clock entry and marks the gossip as seen by this node.
 *
 * The state is swapped with compare-and-set, and the whole operation is retried if a concurrent update wins.
 * On success, the failure detector gets a heartbeat for 'node'. If the new gossip has converged, the
 * membership change listeners are also notified.
 */
@tailrec
private[cluster] final def joining(node: Address) {
  log.info("Node [{}] - Node [{}] is joining", remoteAddress, node)

  val before = state.get
  val oldGossip = before.latestGossip
  val oldOverview = oldGossip.overview

  // a DOWN node that is rejoining must no longer be listed as unreachable
  val overviewWithoutNode = oldOverview copy (unreachable = oldOverview.unreachable filterNot (_.address == node))
  val membersWithNode = oldGossip.members + Member(node, MemberStatus.Joining) // add joining node as Joining

  val updatedGossip = oldGossip copy (overview = overviewWithoutNode, members = membersWithNode)
  val versionedGossip = updatedGossip + vclockNode
  val after = before copy (latestGossip = versionedGossip seen remoteAddress)

  if (state.compareAndSet(before, after)) {
    failureDetector heartbeat node // update heartbeat in failure detector
    if (convergence(after.latestGossip).isDefined)
      after.memberMembershipChangeListeners foreach { _ notify membersWithNode }
  } else joining(node) // lost the race against a concurrent update - retry
}
2012-01-24 12:09:32 +01:00
2012-02-28 17:04:48 +01:00
/**
 * State transition to UP.
 * FIXME implement me - currently a no-op.
 */
private[cluster] final def up(address: Address): Unit = ()

/**
 * State transition to LEAVING.
 * FIXME implement me - currently a no-op.
 */
private[cluster] final def leaving(address: Address): Unit = ()

/**
 * State transition to EXITING.
 * FIXME implement me - currently a no-op.
 */
private[cluster] final def exiting(address: Address): Unit = ()

/**
 * State transition to REMOVED.
 * FIXME implement me - currently a no-op.
 */
private[cluster] final def removing(address: Address): Unit = ()
2012-02-28 17:04:48 +01:00
/**
 * State transition to DOWN for the node at 'address'.
 *
 * - The node is removed from the 'members' set and put in the 'unreachable' set (if not already there)
 *   with its status set to DOWN.
 * - Members that are already DOWN in the 'unreachable' set are kept as they are.
 * - Every DOWN node in the 'unreachable' set is removed from the 'seen' table.
 *
 * The node will reside as DOWN in the 'unreachable' set until an explicit JOIN command is sent directly
 * to this node. It will then go through the normal JOINING procedure.
 *
 * The state is swapped with compare-and-set and the operation is retried if a concurrent update wins.
 * Membership change listeners are notified only if the resulting gossip has converged.
 */
@tailrec
final private[cluster] def downing(address: Address) {
  val localState = state.get
  val localGossip = localState.latestGossip
  val localMembers = localGossip.members
  val localOverview = localGossip.overview
  val localSeen = localOverview.seen
  val localUnreachableMembers = localOverview.unreachable

  // 1. check if the node to DOWN is in the 'members' set; if so it moves out of 'members' as a DOWN member
  val downedMember = localMembers find (_.address == address) map { member ⇒
    log.info("Node [{}] - Marking node [{}] as DOWN", remoteAddress, member.address)
    member copy (status = MemberStatus.Down)
  }
  // drop the node to DOWN as well as any member that is already DOWN
  val newMembers = localMembers filterNot { member ⇒
    member.address == address || member.status == MemberStatus.Down
  }

  // 2. check if the node to DOWN is in the 'unreachable' set. Members that are already DOWN are kept
  //    unchanged: filtering them out would drop previously DOWNed nodes from 'unreachable'.
  val newUnreachableMembers = localUnreachableMembers map { member ⇒
    if (member.address == address && member.status != MemberStatus.Down) {
      log.info("Node [{}] - Marking unreachable node [{}] as DOWN", remoteAddress, member.address)
      member copy (status = MemberStatus.Down)
    } else member
  }

  // 3. add the newly DOWNED member from 'members' (step 1.) to the 'unreachable' set
  val newUnreachablePlusNewlyDownedMembers = downedMember match {
    case Some(member) ⇒ newUnreachableMembers + member
    case None         ⇒ newUnreachableMembers
  }

  // 4. remove only the nodes marked as DOWN from the 'seen' table
  val downedAddresses = newUnreachablePlusNewlyDownedMembers collect {
    case member if member.status == MemberStatus.Down ⇒ member.address
  }
  val newSeen = localSeen -- downedAddresses

  val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) // update gossip overview
  val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
  val versionedGossip = newGossip + vclockNode
  val newState = localState copy (latestGossip = versionedGossip seen remoteAddress)

  if (!state.compareAndSet(localState, newState)) downing(address) // recur if we fail the update
  else {
    if (convergence(newState.latestGossip).isDefined) {
      newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
    }
  }
}
2012-02-28 17:04:48 +01:00
2011-10-26 08:48:16 +02:00
/**
 * Receive new gossip from 'sender'.
 *
 * Resolving against the local gossip:
 * - If the two versions are concurrent, they are merged and the result is versioned with this node's clock.
 * - Otherwise the newer of the two wins.
 *
 * The winner is marked as seen by this node and swapped in with compare-and-set; the operation is retried if
 * a concurrent update wins. On success, the failure detector gets a heartbeat for the sender. If the new
 * gossip has converged, the membership change listeners are also notified.
 */
@tailrec
final private[cluster] def receive(sender: Member, remoteGossip: Gossip) {
  val before = state.get
  val localGossip = before.latestGossip
  val localVersion = localGossip.version

  val winningGossip = remoteGossip.version match {
    case remoteVersion if remoteVersion <> localVersion ⇒
      // concurrent versions - merge them
      val versionedMergedGossip = (remoteGossip merge localGossip) + vclockNode
      log.debug(
        "Can't establish a causal relationship between \"remote\" gossip [{}] and \"local\" gossip [{}] - merging them into [{}]",
        remoteGossip, localGossip, versionedMergedGossip)
      versionedMergedGossip
    case remoteVersion if remoteVersion < localVersion ⇒ localGossip // local gossip is newer
    case _ ⇒ remoteGossip // remote gossip is newer
  }

  val after = before copy (latestGossip = winningGossip seen remoteAddress)

  if (state.compareAndSet(before, after)) {
    log.debug("Node [{}] - Receiving gossip from [{}]", remoteAddress, sender.address)
    failureDetector heartbeat sender.address // update heartbeat in failure detector
    if (convergence(after.latestGossip).isDefined)
      after.memberMembershipChangeListeners foreach { _ notify after.latestGossip.members }
  } else receive(sender, remoteGossip) // lost the race against a concurrent update - retry
}
2012-01-24 12:09:32 +01:00
/**
 * Sends a Join command for this node to the pre-configured contact point ('nodeToJoin'), if one is set.
 */
private def autoJoin() {
  for (address ← nodeToJoin) {
    val command = ClusterAction.Join(remoteAddress)
    val connection = clusterCommandConnectionFor(address)
    log.info("Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection)
    connection ! command
  }
}
2012-02-09 15:59:10 +01:00
/**
 * Switches the member status of this node.
 *
 * Replaces this node's entry in the gossip's member set with a copy that carries the new status,
 * bumps the vector clock for this node and marks the resulting gossip as seen by this node.
 *
 * @param newStatus the new member status
 * @param state the state to change the member status in (note: shadows the `state` field)
 * @return a copy of `state` whose latest gossip has this node with the new member status
 */
private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = {
  log.info("Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus)

  // a "new" self carrying the requested status
  val updatedSelf = self copy (status = newStatus)

  val currentGossip = state.latestGossip

  // swap our own entry in 'gossip.members' for the updated one
  val updatedMembers = currentGossip.members map { m =>
    if (m.address != remoteAddress) m
    else updatedSelf
  }

  // rebuild the SortedSet explicitly - works around a bug in scala collections
  // ('val ss: SortedSet[Member] = SortedSet.empty[Member] ++ aSet' does not compile)
  val sortedMembers = SortedSet[Member](updatedMembers.toList: _*)

  // version my changes and mark them as seen by me
  val versioned = (currentGossip copy (members = sortedMembers)) + vclockNode
  state copy (latestGossip = versioned seen remoteAddress)
}
2011-10-26 08:48:16 +02:00
/**
 * Gossips the latest gossip to an address: wraps `latestGossip` in a `GossipEnvelope`
 * from this node and sends it to the cluster gossip daemon at `address`.
 *
 * @param address the node to gossip to
 */
private def gossipTo(address: Address) {
  val gossipDaemon = clusterGossipConnectionFor(address)
  log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, gossipDaemon)
  val envelope = GossipEnvelope(self, latestGossip)
  gossipDaemon ! envelope
}
/**
 * Gossips the latest gossip to a random member in the set of addresses passed in as argument,
 * never to this node itself.
 *
 * @param addresses the candidate addresses (may contain this node's own address, which is skipped)
 * @return 'true' if it gossiped to a "deputy" member; 'false' if it gossiped to a non-deputy
 *         or if there was no candidate other than this node
 */
private def gossipToRandomNodeOf(addresses: Iterable[Address]): Boolean = {
  // filter out myself BEFORE checking for emptiness: if the only candidate is this node,
  // selectRandomNode would otherwise be asked to pick from an empty collection and throw
  val peers = addresses filter (_ != remoteAddress)
  if (peers.isEmpty) false
  else {
    val peer = selectRandomNode(peers)
    gossipTo(peer)
    deputyNodes exists (peer == _)
  }
}
/**
 * Initiates a new round of gossip.
 *
 * Only runs when this node is part of a non-singleton cluster and is itself available. It then:
 *   1. gossips to a random alive member,
 *   2. with probability unreachable / (members + 1), gossips to a random unreachable member,
 *   3. if step 1 did not hit a deputy, gossips to a random deputy node with probability
 *      1 / (members + unreachable), to facilitate partition healing.
 */
private def gossip() {
  val localState = state.get

  if (!isSingletonCluster(localState) && isAvailable(localState)) {
    // only gossip if we are a non-singleton cluster and available
    log.debug("Node [{}] - Initiating new round of gossip", remoteAddress)

    val localGossip = localState.latestGossip
    val localMembers = localGossip.members
    val localMembersSize = localMembers.size
    val localUnreachableMembers = localGossip.overview.unreachable
    val localUnreachableSize = localUnreachableMembers.size

    // 1. gossip to alive members
    val gossipedToDeputy = gossipToRandomNodeOf(localMembers map { _.address })

    // 2. gossip to unreachable members
    if (localUnreachableSize > 0) {
      // floating-point division: Int / Int would truncate the probability to 0 whenever
      // there are fewer unreachable than alive members, so unreachable nodes would never be contacted
      val probability = localUnreachableSize.toDouble / (localMembersSize + 1)
      if (random.nextDouble() < probability) gossipToRandomNodeOf(localUnreachableMembers map { _.address })
    }

    // 3. gossip to a deputy node for facilitating partition healing
    val deputies = deputyNodes
    if ((!gossipedToDeputy || localMembersSize < 1) && !deputies.isEmpty) {
      if (localMembersSize == 0) gossipToRandomNodeOf(deputies)
      else {
        // parenthesized: '1.0 / members + unreachable' would be >= 1 as soon as any node is unreachable
        val probability = 1.0 / (localMembersSize + localUnreachableSize)
        if (random.nextDouble() <= probability) gossipToRandomNodeOf(deputies)
      }
    }
  }
}
2012-02-07 16:53:49 +01:00
/**
 * Reaps the unreachable members (moves them from the member set to the 'unreachable' set in the
 * cluster overview) according to the failure detector's verdict.
 *
 * Only runs when this node is part of a non-singleton cluster and is itself available. Any change is
 * versioned, marked as seen by this node and installed with a compare-and-set. A lost race retries
 * the whole check. When the new gossip has converged, the membership change listeners are notified.
 */
@tailrec
final private def reapUnreachableMembers() {
  val localState = state.get

  if (!isSingletonCluster(localState) && isAvailable(localState)) {
    // only scrutinize if we are a non-singleton cluster and available
    val localGossip = localState.latestGossip
    val localOverview = localGossip.overview
    val localMembers = localGossip.members
    val localUnreachableMembers = localOverview.unreachable

    // NOTE(review): this node itself is not filtered out here - confirm that
    // failureDetector.isAvailable(remoteAddress) can never report false
    val newlyDetectedUnreachableMembers = localMembers filterNot { member => failureDetector.isAvailable(member.address) }

    if (!newlyDetectedUnreachableMembers.isEmpty) { // we have newly detected members marked as unavailable
      val newMembers = localMembers diff newlyDetectedUnreachableMembers
      val newUnreachableMembers: Set[Member] = localUnreachableMembers ++ newlyDetectedUnreachableMembers

      val newOverview = localOverview copy (unreachable = newUnreachableMembers)
      val newGossip = localGossip copy (overview = newOverview, members = newMembers)

      // updating vclock and 'seen' table
      val versionedGossip = newGossip + vclockNode
      val seenVersionedGossip = versionedGossip seen remoteAddress
      val newState = localState copy (latestGossip = seenVersionedGossip)

      // if we won the race then update else try again
      if (!state.compareAndSet(localState, newState)) reapUnreachableMembers() // recur
      else {
        log.info("Node [{}] - Marking node(s) as UNREACHABLE [{}]", remoteAddress, newlyDetectedUnreachableMembers.mkString(", "))

        if (convergence(newState.latestGossip).isDefined) {
          newState.memberMembershipChangeListeners foreach { _ notify newMembers }
        }
      }
    }
  }
}
2012-02-18 22:14:53 +01:00
/**
 * Runs periodic leader actions, such as auto-downing unreachable nodes.
 *
 * Only does anything if this node is the leader (the first member in the sorted member set) and available:
 *   - with convergence: moves JOINING members to UP and EXITING members to REMOVED;
 *   - without convergence and with 'auto-down' turned on: marks every unreachable member that is not
 *     already DOWN as DOWN (members already DOWN stay in the unreachable set) and removes all unreachable
 *     members from the 'seen' table.
 * If anything changed, the new gossip is versioned, marked as seen by this node and installed with a
 * compare-and-set. A lost race retries the whole action. On success, listeners are notified if the
 * new gossip has converged.
 */
@tailrec
final private def leaderActions() {
  val localState = state.get
  val localGossip = localState.latestGossip
  val localMembers = localGossip.members
  val isLeader = localMembers.headOption exists (_.address == remoteAddress)

  if (isLeader && isAvailable(localState)) {
    // only run the leader actions if we are the LEADER and available
    val localOverview = localGossip.overview
    val localSeen = localOverview.seen
    val localUnreachableMembers = localOverview.unreachable

    // Leader actions are as follows:
    // 1. Move JOINING => UP
    // 2. Move EXITING => REMOVED
    // 3. Move UNREACHABLE => DOWN (auto-downing by leader)
    // 4. Updating the vclock version for the changes
    // 5. Updating the 'seen' table

    var hasChangedState = false
    val newGossip =
      if (convergence(localGossip).isDefined) {
        // we have convergence - so we can't have unreachable nodes
        val newMembers =
          localMembers map { member =>
            // 1. Move JOINING => UP
            if (member.status == MemberStatus.Joining) {
              log.info("Node [{}] - Leader is moving node [{}] from JOINING to UP", remoteAddress, member.address)
              hasChangedState = true
              member copy (status = MemberStatus.Up)
            } else member
          } map { member =>
            // 2. Move EXITING => REMOVED
            if (member.status == MemberStatus.Exiting) {
              log.info("Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", remoteAddress, member.address)
              hasChangedState = true
              member copy (status = MemberStatus.Removed)
            } else member
          }

        localGossip copy (members = newMembers) // update gossip

      } else if (autoDown) {
        // we don't have convergence - so we might have unreachable nodes
        // if 'auto-down' is turned on, then try to auto-down any unreachable nodes
        // FIXME Should we let the leader auto-down every run (as it is now) or just every X seconds? So we can wait for user to invoke explicit DOWN.

        // 3. Move UNREACHABLE => DOWN (auto-downing by leader)
        // Members already DOWN are kept as they are: filtering them out (as before) would drop them
        // from the unreachable set - and thereby from the gossip - whenever another member got downed.
        val newUnreachableMembers =
          localUnreachableMembers map { member =>
            if (member.status == MemberStatus.Down) member // no need to DOWN members already DOWN
            else {
              log.info("Node [{}] - Leader is marking unreachable node [{}] as DOWN", remoteAddress, member.address)
              hasChangedState = true
              member copy (status = MemberStatus.Down)
            }
          }

        // removing nodes marked as DOWN from the 'seen' table
        // FIXME this needs to be done if user issues DOWN as well
        val newSeen = localUnreachableMembers.foldLeft(localSeen)((currentSeen, member) => currentSeen - member.address)

        val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
        localGossip copy (overview = newOverview) // update gossip

      } else localGossip

    if (hasChangedState) { // we have a change of state - version it and try to update
      // 4. Updating the vclock version for the changes
      val versionedGossip = newGossip + vclockNode

      // 5. Updating the 'seen' table
      val seenVersionedGossip = versionedGossip seen remoteAddress

      val newState = localState copy (latestGossip = seenVersionedGossip)

      // if we won the race then update else try again
      if (!state.compareAndSet(localState, newState)) leaderActions() // recur
      else {
        if (convergence(newState.latestGossip).isDefined) {
          // foreach, not map: notification is a pure side effect
          newState.memberMembershipChangeListeners foreach { _ notify newGossip.members }
        }
      }
    }
  }
}
/**
 * Checks if we have a cluster convergence. If there are any unreachable nodes that are not yet DOWN,
 * there can't be a convergence - we are waiting for the user to act (issuing DOWN) or for the leader
 * to act (issuing DOWN through auto-down). Otherwise there is convergence when every entry in the
 * 'seen' table holds the same vector clock version.
 *
 * @param gossip the gossip to check
 * @return Some(gossip) if convergence has been reached and None if not
 */
private def convergence(gossip: Gossip): Option[Gossip] = {
  val overview = gossip.overview

  // every unreachable member (if there are any) must already have status DOWN
  val allUnreachableAreDown = overview.unreachable forall { _.status == MemberStatus.Down }

  if (!allUnreachableAreDown) None
  else {
    // all entries in the 'seen' table must agree on a single vector clock version
    val distinctVersions = overview.seen.values.toSet
    if (distinctVersions.size != 1) None
    else {
      log.debug("Node [{}] - Cluster convergence reached", remoteAddress)
      Some(gossip)
    }
  }
}
/** True unless `isUnavailable(state)` holds. */
private def isAvailable(state: State): Boolean = !isUnavailable(state)

/**
 * True when, according to the given state's gossip, this node is either listed in the overview's
 * 'unreachable' set or has a member entry whose status `MemberStatus.isUnavailable` rejects.
 */
private def isUnavailable(state: State): Boolean = {
  val gossip = state.latestGossip

  // this node has been moved to the 'unreachable' set
  val selfIsUnreachable = gossip.overview.unreachable exists { _.address == remoteAddress }

  // NOTE(review): compares against `self` (not `state`'s view of this node) - depends on how `self`
  // and Member equality are defined elsewhere; confirm this matches the entry in `state`
  val selfHasUnavailableStatus = gossip.members exists { m => (m == self) && MemberStatus.isUnavailable(m.status) }

  selfIsUnreachable || selfHasUnavailableStatus
}
2012-02-07 16:53:49 +01:00
/**
 * Looks up and returns the local cluster command connection, i.e. the actor at
 * '/system/cluster/commands' under this node's own address.
 */
private def clusterCommandDaemon = {
  val commandsPath = RootActorPath(remoteAddress) / "system" / "cluster" / "commands"
  system.actorFor(commandsPath)
}
2012-02-07 16:53:49 +01:00
2012-03-03 23:55:48 +01:00
/**
 * Looks up and returns the remote cluster command connection for the specific address,
 * i.e. the actor at '/system/cluster/commands' on that node.
 *
 * @param address the node whose command daemon to look up
 */
private def clusterCommandConnectionFor(address: Address): ActorRef = {
  val commandsPath = RootActorPath(address) / "system" / "cluster" / "commands"
  system.actorFor(commandsPath)
}
2012-03-03 23:55:48 +01:00
2012-02-07 16:53:49 +01:00
/**
 * Looks up and returns the remote cluster gossip connection for the specific address,
 * i.e. the actor at '/system/cluster/gossip' on that node.
 *
 * @param address the node whose gossip daemon to look up
 */
private def clusterGossipConnectionFor(address: Address): ActorRef = {
  val gossipPath = RootActorPath(address) / "system" / "cluster" / "gossip"
  system.actorFor(gossipPath)
}
2012-01-24 12:09:32 +01:00
2012-03-09 12:56:56 +01:00
/**
 * Gets the addresses of the 'deputy' nodes: up to `nrOfDeputyNodes` members that directly follow the
 * first (leader) member in the sorted member set - excluding this node if part of the group.
 */
private def deputyNodes: Iterable[Address] =
  // Map over a List so the member set's sort order survives the 'map'. Mapping through the static
  // type Iterable uses the generic (plain Set) builder of the SortedSet, which loses the ordering,
  // so 'drop 1' would no longer reliably skip the leader.
  state.get.latestGossip.members.toList map (_.address) drop 1 take nrOfDeputyNodes filter (_ != remoteAddress)
2012-01-24 12:09:32 +01:00
2012-03-02 09:55:54 +01:00
/**
 * Picks a random address out of `addresses` using `random`.
 * Presumably throws if `addresses` is empty (`nextInt(0)` is rejected by java.util.Random) - callers must guard.
 */
private def selectRandomNode(addresses: Iterable[Address]): Address = {
  val index = random nextInt addresses.size
  addresses.toSeq(index)
}
2012-02-14 20:50:12 +01:00
/** True when the given state's gossip lists exactly one member. */
private def isSingletonCluster(currentState: State): Boolean = {
  val knownMembers = currentState.latestGossip.members
  knownMembers.size == 1
}
2011-10-26 08:48:16 +02:00
}