2011-10-26 08:48:16 +02:00
/* *
2012-01-19 18:21:06 +01:00
* Copyright ( C ) 2009 - 2012 Typesafe Inc . < http : //www.typesafe.com>
2011-10-26 08:48:16 +02:00
*/
2012-01-31 13:33:04 +01:00
package akka.cluster
2011-10-26 08:48:16 +02:00
import akka.actor._
import akka.actor.Status._
2012-01-31 13:33:04 +01:00
import akka.remote._
2012-02-20 15:26:12 +01:00
import akka.routing._
2011-10-27 12:46:10 +02:00
import akka.event.Logging
2012-01-24 12:09:32 +01:00
import akka.dispatch.Await
2012-01-31 13:33:04 +01:00
import akka.pattern.ask
import akka.util._
2012-03-02 16:20:30 +01:00
import akka.util.duration._
2011-11-25 12:02:25 +01:00
import akka.config.ConfigurationException
2012-01-30 11:41:41 +01:00
import java.util.concurrent.atomic. { AtomicReference , AtomicBoolean }
2012-01-24 12:09:32 +01:00
import java.util.concurrent.TimeUnit._
import java.util.concurrent.TimeoutException
2011-10-27 15:14:15 +02:00
import java.security.SecureRandom
2011-11-25 12:02:25 +01:00
2012-01-30 11:41:41 +01:00
import scala.collection.immutable. { Map , SortedSet }
2011-10-26 08:48:16 +02:00
import scala.annotation.tailrec
2011-11-25 12:02:25 +01:00
2012-01-24 12:09:32 +01:00
import com.google.protobuf.ByteString
2011-10-26 08:48:16 +02:00
/* *
2012-02-09 15:59:10 +01:00
* Interface for membership change listener .
2011-10-26 08:48:16 +02:00
*/
2012-02-18 22:14:53 +01:00
trait MembershipChangeListener {

  /**
   * Callback invoked with the current sorted set of cluster members.
   *
   * @param members the members handed over by the notifying node
   */
  def notify(members: SortedSet[Member]): Unit
}
/* *
* Interface for meta data change listener .
*/
2012-03-12 19:22:02 +01:00
trait MetaDataChangeListener {

  /**
   * Callback invoked with the meta data table.
   *
   * @param meta meta data entries, keyed by name, each holding raw bytes
   */
  def notify(meta: Map[String, Array[Byte]]): Unit
}
/* *
2012-01-30 11:41:41 +01:00
* Base trait for all cluster messages . All ClusterMessage 's are serializable .
2011-10-26 08:48:16 +02:00
*/
2012-01-30 19:40:28 +01:00
// 'sealed': every direct subtype has to be declared in this source file.
sealed trait ClusterMessage extends Serializable
2012-01-30 11:41:41 +01:00
/* *
2012-02-28 17:04:48 +01:00
* Cluster commands sent by the USER .
2012-01-30 11:41:41 +01:00
*/
2012-02-29 10:02:00 +01:00
object ClusterAction {

  /**
   * Command to join the cluster. Sent when a node (represented by 'address')
   * wants to join another node (the receiver).
   */
  case class Join(address: Address) extends ClusterMessage

  /**
   * Command to set a node to Up (from Joining).
   */
  case class Up(address: Address) extends ClusterMessage

  /**
   * Command to leave the cluster.
   */
  case class Leave(address: Address) extends ClusterMessage

  /**
   * Command to mark a node as temporarily down.
   */
  case class Down(address: Address) extends ClusterMessage

  /**
   * Command to mark a node to be removed from the cluster immediately.
   */
  case class Exit(address: Address) extends ClusterMessage

  /**
   * Command to remove a node from the cluster immediately.
   */
  case class Remove(address: Address) extends ClusterMessage
}
2011-10-26 08:48:16 +02:00
2012-01-30 11:41:41 +01:00
/* *
* Represents the address and the current status of a cluster member node .
*/
2012-03-09 12:56:56 +01:00
class Member(val address: Address, val status: MemberStatus) extends ClusterMessage {

  // Identity is defined by the address only; the status does not take part in hashCode/equals.
  override def hashCode = address.##

  override def equals(other: Any) = other match {
    case that: Member ⇒ this.address == that.address
    case _            ⇒ false
  }

  override def toString = "Member(address = %s, status = %s)".format(address, status)

  /**
   * Creates a new Member, replacing only the fields that are passed explicitly.
   */
  def copy(address: Address = this.address, status: MemberStatus = this.status): Member =
    new Member(address, status)
}
/* *
* Factory and Utility module for Member instances .
*/
object Member {
  import MemberStatus._

  // Members are ordered by the string form of their address.
  implicit val ordering = Ordering.fromLessThan[Member] { (left, right) ⇒
    left.address.toString < right.address.toString
  }

  def apply(address: Address, status: MemberStatus): Member = new Member(address, status)

  // Extracts only the address; anything that is not a Member yields None.
  def unapply(other: Any) = other match {
    case member: Member ⇒ Some(member.address)
    case _              ⇒ None
  }

  // Numeric rank of a status; a higher number wins in 'highestPriorityOf'.
  private def priority(status: MemberStatus): Int = status match {
    case Removed ⇒ 5
    case Down    ⇒ 4
    case Exiting ⇒ 3
    case Leaving ⇒ 2
    case Up      ⇒ 1
    case Joining ⇒ 0
  }

  /**
   * Picks the Member with the highest "priority" MemberStatus.
   * Removed > Down > Exiting > Leaving > Up > Joining; on a tie 'm1' is returned.
   */
  def highestPriorityOf(m1: Member, m2: Member): Member =
    if (priority(m1.status) >= priority(m2.status)) m1 else m2
}
2011-10-26 08:48:16 +02:00
2012-02-09 15:59:10 +01:00
/* *
* Envelope adding a sender address to the gossip .
*/
// 'sender' is a full Member (address and status), not only an address.
case class GossipEnvelope ( sender : Member , gossip : Gossip ) extends ClusterMessage
2012-01-24 12:09:32 +01:00
/* *
2012-01-30 11:41:41 +01:00
* Defines the current status of a cluster member node
*
* Can be one of: Joining, Up, Leaving, Exiting, Down and Removed.
2012-01-24 12:09:32 +01:00
*/
2012-02-07 16:53:49 +01:00
// 'sealed': the complete set of statuses is the case objects declared in this file.
sealed trait MemberStatus extends ClusterMessage
2012-01-30 11:41:41 +01:00
object MemberStatus {
  case object Joining extends MemberStatus
  case object Up extends MemberStatus
  case object Leaving extends MemberStatus
  case object Exiting extends MemberStatus
  case object Down extends MemberStatus
  case object Removed extends MemberStatus

  /**
   * Returns true for Down, Exiting, Removed and Leaving; false for every other status.
   */
  def isUnavailable(status: MemberStatus): Boolean = status match {
    case Down | Exiting | Removed | Leaving ⇒ true
    case _                                  ⇒ false
  }
}
2012-02-09 13:36:39 +01:00
/* *
2012-02-14 20:50:12 +01:00
* Represents the overview of the cluster , holds the cluster convergence table and set with unreachable nodes .
2012-02-09 13:36:39 +01:00
*/
case class GossipOverview(
  seen: Map[Address, VectorClock] = Map.empty[Address, VectorClock],
  unreachable: Set[Member] = Set.empty[Member]) {

  // FIXME document when nodes are put in 'unreachable' set and removed from 'members'

  // Renders both collections comma-separated inside square brackets.
  override def toString =
    seen.mkString("GossipOverview(seen = [", ", ", "], unreachable = [") +
      unreachable.mkString("", ", ", "])")
}
2012-02-09 13:36:39 +01:00
/* *
* Represents the state of the cluster ; cluster ring membership , ring convergence , meta data - all versioned by a vector clock .
*/
case class Gossip(
  overview: GossipOverview = GossipOverview(),
  members: SortedSet[Member], // sorted set of members with their status, ordered by Member.ordering (address string)
  meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]],
  version: VectorClock = VectorClock()) // vector clock version
  extends ClusterMessage // is a serializable cluster message
  with Versioned[Gossip] {

  /**
   * Increments the version for this 'Node'.
   */
  def +(node: VectorClock.Node): Gossip = copy(version = version + node)

  /**
   * Adds the member to the membership set; returns this very instance if the set already contains it.
   */
  def +(member: Member): Gossip = {
    if (members contains member) this
    else this copy (members = members + member)
  }

  /**
   * Marks the gossip as seen by the given address by updating its entry in the 'overview.seen'
   * Map with the VectorClock of this gossip. Returns this very instance if the entry is already current.
   */
  def seen(address: Address): Gossip = {
    if (overview.seen.get(address).exists(_ == version)) this
    else this copy (overview = overview copy (seen = overview.seen + (address -> version)))
  }

  /**
   * Merges two Gossip instances including membership tables, meta-data tables and the VectorClock histories.
   *
   * For every address the single Member with the highest-priority status (see Member.highestPriorityOf)
   * is kept. For meta data, 'seen' entries and unreachable members the standard '++' semantics apply.
   */
  def merge(that: Gossip): Gossip = {
    import Member.ordering

    // 1. merge vector clocks
    val mergedVClock = this.version merge that.version

    // 2. group all members by Address => Seq[Member]
    // The two sets are concatenated as plain sequences on purpose: a SortedSet union would already
    // collapse members with the same address (the ordering only compares addresses), dropping one of
    // the conflicting statuses before step 3 gets a chance to pick the highest-priority one.
    val membersGroupedByAddress = (this.members.toSeq ++ that.members.toSeq) groupBy (_.address)

    // 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups
    val mergedMembers =
      SortedSet.empty[Member] ++
        membersGroupedByAddress.values.foldLeft(Vector.empty[Member]) { (acc, group) ⇒
          acc :+ group.reduceLeft(Member.highestPriorityOf(_, _))
        }

    // 4. merge meta-data
    val mergedMeta = this.meta ++ that.meta

    // 5. merge gossip overview
    val mergedOverview = GossipOverview(
      this.overview.seen ++ that.overview.seen,
      this.overview.unreachable ++ that.overview.unreachable)

    Gossip(mergedOverview, mergedMembers, mergedMeta, mergedVClock)
  }

  override def toString =
    "Gossip(" +
      "overview = " + overview +
      ", members = [" + members.mkString(", ") +
      "], meta = [" + meta.mkString(", ") +
      "], version = " + version +
      ")"
}
2012-01-24 12:09:32 +01:00
2012-02-20 15:26:12 +01:00
/* *
2012-03-09 12:56:56 +01:00
* Manages routing of the different cluster commands .
2012-03-03 23:55:48 +01:00
* Instantiated as a single instance for each Node - e . g . commands are serialized to Node message after message .
2012-02-20 15:26:12 +01:00
*/
2012-03-09 12:56:56 +01:00
final class ClusterCommandDaemon extends Actor {
  import ClusterAction._

  val node = Node(context.system)
  val log = Logging(context.system, this)

  // Every cluster command is forwarded to the matching state-transition method on the Node extension.
  def receive = {
    case Join(joiner)    ⇒ node joining joiner
    case Up(target)      ⇒ node up target
    case Down(target)    ⇒ node downing target
    case Leave(target)   ⇒ node leaving target
    case Exit(target)    ⇒ node exiting target
    case Remove(target)  ⇒ node removing target
  }

  // Anything that is not one of the commands above is logged as an error.
  override def unhandled(unknown: Any) =
    log.error("Illegal command [{}]", unknown)
}
/* *
2012-03-03 23:55:48 +01:00
* Pooled and routed with N number of configurable instances .
2012-02-20 15:45:50 +01:00
* Concurrent access to Node .
2012-02-20 15:26:12 +01:00
*/
2012-03-03 23:55:48 +01:00
final class ClusterGossipDaemon extends Actor {
  val log = Logging(context.system, this)
  val node = Node(context.system)

  // Unwraps the envelope and hands sender and gossip over to the Node extension.
  def receive = {
    case GossipEnvelope(from, payload) ⇒ node.receive(from, payload)
  }

  // Any other message is logged as an error.
  override def unhandled(unknown: Any) =
    log.error("[/system/cluster/gossip] can not respond to messages - received [{}]", unknown)
}
2012-03-09 12:56:56 +01:00
/* *
* Supervisor managing the different cluster daemons.
*/
2012-03-03 23:55:48 +01:00
final class ClusterDaemonSupervisor extends Actor {
  val log = Logging(context.system, this)
  val node = Node(context.system)

  // single command daemon, created as child "commands"
  private val commands = context.actorOf(Props[ClusterCommandDaemon], "commands")

  // gossip daemons behind a round-robin router, created as child "gossip";
  // the number of routees is read from the cluster settings
  private val gossip = {
    val router = RoundRobinRouter(node.clusterSettings.NrOfGossipDaemons)
    context.actorOf(Props[ClusterGossipDaemon].withRouter(router), "gossip")
  }

  // the supervisor itself handles no messages
  def receive = Actor.emptyBehavior

  override def unhandled(unknown: Any): Unit =
    log.error("[/system/cluster] can not respond to messages - received [{}]", unknown)
}
2012-02-22 18:40:16 +01:00
/* *
* Node Extension Id and factory for creating Node extension .
* Example :
* { { {
2012-03-02 09:55:54 +01:00
* val node = Node ( system )
2012-02-29 10:02:00 +01:00
*
* if ( node . isLeader ) { . . . }
* } } }
2012-02-22 18:40:16 +01:00
*/
2012-03-02 09:55:54 +01:00
object Node extends ExtensionId[Node] with ExtensionIdProvider {

  // Creates the Node extension instance for the given actor system.
  override def createExtension(system: ExtendedActorSystem): Node = new Node(system)

  // This object is its own extension id.
  override def lookup = Node

  // Narrows the return type of the inherited 'get' to Node.
  override def get(system: ActorSystem): Node = super.get(system)
}
2012-02-14 20:50:12 +01:00
2011-10-26 08:48:16 +02:00
/* *
* This module is responsible for Gossiping cluster information . The abstraction maintains the list of live
2012-01-30 11:41:41 +01:00
* and dead members . Periodically i . e . every 1 second this module chooses a random member and initiates a round
2011-10-26 08:48:16 +02:00
* of Gossip with it . Whenever it gets gossip updates it updates the Failure Detector with the liveness
* information .
* < p />
2012-01-30 11:41:41 +01:00
* During each of these runs the member initiates gossip exchange according to following rules ( as defined in the
2011-10-26 08:48:16 +02:00
* Cassandra documentation [ http :// wiki . apache . org / cassandra / ArchitectureGossip ] :
* < pre >
2012-01-30 11:41:41 +01:00
* 1 ) Gossip to random live member ( if any )
* 2 ) Gossip to random unreachable member with certain probability depending on number of unreachable and live members
2012-02-07 16:53:49 +01:00
* 3 ) If the member gossiped to at ( 1 ) was not deputy , or the number of live members is less than number of deputy list ,
* gossip to random deputy with certain probability depending on number of unreachable , deputy and live members .
2011-10-26 08:48:16 +02:00
* </ pre >
2012-02-29 10:02:00 +01:00
*
* Example :
* { { {
2012-03-02 09:55:54 +01:00
* val node = Node ( system )
2012-02-29 10:02:00 +01:00
*
* if ( node . isLeader ) { . . . }
* } } }
2011-10-26 08:48:16 +02:00
*/
2012-03-02 09:55:54 +01:00
class Node ( system : ExtendedActorSystem ) extends Extension {
2012-02-22 18:40:16 +01:00
2011-10-26 08:48:16 +02:00
/* *
2012-02-20 15:45:50 +01:00
* Represents the state for this Node . Implemented using optimistic lockless concurrency ,
2011-10-26 08:48:16 +02:00
* all state is represented by this immutable case class and managed by an AtomicReference .
*/
private case class State(
  // most recent gossip known to this node
  latestGossip: Gossip,
  // listeners to notify about membership changes; empty by default
  memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty[MembershipChangeListener])
2011-10-26 08:48:16 +02:00
2012-02-29 10:02:00 +01:00
// The extension only works with a remote-enabled actor system; fail fast during construction otherwise.
private val remote: RemoteActorRefProvider = system.provider match {
  case provider: RemoteActorRefProvider ⇒ provider
  case _ ⇒
    throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration")
}
2012-01-30 19:40:28 +01:00
2012-03-02 16:20:30 +01:00
// settings, both built from the system configuration and the system name
val remoteSettings = new RemoteSettings(system.settings.config, system.name)
val clusterSettings = new ClusterSettings(system.settings.config, system.name)

// address of this node as reported by the remote transport
val remoteAddress = remote.transport.address

val failureDetector = new AccrualFailureDetector(
  system,
  remoteAddress,
  clusterSettings.FailureDetectorThreshold,
  clusterSettings.FailureDetectorMaxSampleSize)

// this node's entry in the vector clock, named after its address
private val vclockNode = VectorClock.Node(remoteAddress.toString)

private val gossipInitialDelay = clusterSettings.GossipInitialDelay
private val gossipFrequency = clusterSettings.GossipFrequency

// implicit timeout picked up by the 'ask' pattern used further down
implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)

private val autoDown = clusterSettings.AutoDown
private val nrOfDeputyNodes = clusterSettings.NrOfDeputyNodes
private val nrOfGossipDaemons = clusterSettings.NrOfGossipDaemons

// a configured node-to-join equal to our own address is ignored
private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress)

private val serialization = remote.serialization

// flipped to false exactly once by shutdown()
private val isRunning = new AtomicBoolean(true)
private val log = Logging(system, "Node")
private val random = SecureRandom.getInstance("SHA1PRNG")

log.info("Node [{}] - is JOINING cluster...", remoteAddress)
2012-03-09 12:56:56 +01:00
2012-03-02 16:20:30 +01:00
// create supervisor for daemons under path "/system/cluster"
private val clusterDaemons = {
  // ask the system guardian to create the supervisor as child "cluster" and block for the reply
  val request = CreateChild(Props[ClusterDaemonSupervisor], "cluster")
  val reply = Await.result(system.systemGuardian ? request, defaultTimeout.duration)
  reply match {
    case supervisor: ActorRef ⇒ supervisor
    case failure: Exception   ⇒ throw failure // creation failed: rethrow out of the constructor
  }
}
2012-02-20 15:26:12 +01:00
2012-02-08 14:14:01 +01:00
private val state = {
  // the initial gossip contains only this node, in status Joining ...
  val me = Member(remoteAddress, MemberStatus.Joining)
  val initialMembers = SortedSet.empty[Member] + me
  // ... and is versioned with this node's vector clock entry
  val initialGossip = Gossip(members = initialMembers) + vclockNode
  new AtomicReference[State](State(initialGossip))
}
2011-10-26 08:48:16 +02:00
2012-02-08 14:14:01 +01:00
// try to join the node defined in the 'akka.cluster.node-to-join' option
2012-02-28 17:04:48 +01:00
autoJoin()

// ========================================================
// ===================== WORK DAEMONS =====================
// ========================================================

// start periodic gossip to random nodes in cluster
private val gossipCanceller =
  system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
    gossip()
  }

// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
// TODO: should we use the same gossipFrequency for reaping?
private val failureDetectorReaperCanceller =
  system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
    reapUnreachableMembers()
  }

// start periodic leader action management (only applies for the current leader)
// TODO: should we use the same gossipFrequency for leaderActions?
private val leaderActionsCanceller =
  system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
    leaderActions()
  }

// NOTE(review): logged unconditionally once the tasks are scheduled; the Join command sent by
// autoJoin() is fire-and-forget, so this does not confirm that the contact node accepted the join.
log.info("Node [{}] - has JOINED cluster successfully", remoteAddress)
2012-03-09 12:56:56 +01:00
2012-02-14 20:50:12 +01:00
// ======================================================
// ===================== PUBLIC API =====================
// ======================================================
2012-03-09 12:56:56 +01:00
// The Member entry for this node, looked up by address in the latest gossip.
// Throws IllegalStateException if this node is not part of the membership set.
def self: Member = {
  val me = latestGossip.members find (_.address == remoteAddress)
  me match {
    case Some(member) ⇒ member
    case None         ⇒ throw new IllegalStateException("Can't find 'this' Member in the cluster membership ring")
  }
}
2012-02-14 20:50:12 +01:00
/* *
* Latest gossip .
*/
// Reads the gossip out of the current State snapshot held by the 'state' AtomicReference.
def latestGossip : Gossip = state . get . latestGossip
/* *
* Member status for this node .
*/
2012-03-09 12:56:56 +01:00
// Delegates to 'self', so it fails with the same IllegalStateException if this node is not in the member set.
def status : MemberStatus = self . status
2012-02-14 20:50:12 +01:00
2012-02-20 17:22:07 +01:00
/* *
* Is this node the leader ?
*/
// The leader is the first member of the sorted member set; an empty set has no leader.
def isLeader: Boolean =
  latestGossip.members.headOption exists { first ⇒ remoteAddress == first.address }
2012-02-14 20:50:12 +01:00
/* *
* Is this node a singleton cluster ?
*/
// Delegates to the overload taking a State (defined elsewhere in this class), passing the current snapshot.
def isSingletonCluster : Boolean = isSingletonCluster ( state . get )
2011-01-01 01:50:33 +01:00
/* *
* Checks if we have a cluster convergence .
*
* @return Some(convergedGossip) if convergence has been reached and None if not
*/
// Delegates to the overload taking a Gossip (defined elsewhere in this class), passing the latest gossip.
def convergence : Option [ Gossip ] = convergence ( latestGossip )
2012-03-09 12:56:56 +01:00
/* *
* Returns true if the node is UP or JOINING .
*/
// Negation of 'isUnavailable' (defined elsewhere in this class) applied to the current state snapshot.
def isAvailable : Boolean = ! isUnavailable ( state . get )
2012-01-24 12:09:32 +01:00
/* *
2012-01-30 11:41:41 +01:00
* Shuts down all connections to other members , the cluster daemon and the periodic gossip and cleanup tasks .
2012-01-24 12:09:32 +01:00
*/
def shutdown(): Unit = {
  // the compare-and-set guarantees that only the first caller performs the shutdown
  val firstCaller = isRunning.compareAndSet(true, false)
  if (firstCaller) {
    log.info("Node [{}] - Shutting down Node and cluster daemons...", remoteAddress)

    // cancel the three periodic tasks, then stop the daemon supervisor
    gossipCanceller.cancel()
    failureDetectorReaperCanceller.cancel()
    leaderActionsCanceller.cancel()
    system.stop(clusterDaemons)
  }
}
2011-01-01 01:50:33 +01:00
/* *
* Registers a listener to subscribe to cluster membership changes .
*/
@tailrec
final def registerListener(listener: MembershipChangeListener): Unit = {
  val snapshot = state.get
  val updated = snapshot copy (
    memberMembershipChangeListeners = snapshot.memberMembershipChangeListeners + listener)

  // optimistic update: retry from a fresh snapshot if another thread changed the state meanwhile
  if (state.compareAndSet(snapshot, updated)) ()
  else registerListener(listener)
}
/* *
* Unsubscribes to cluster membership changes .
*/
@tailrec
final def unregisterListener(listener: MembershipChangeListener): Unit = {
  val snapshot = state.get
  val updated = snapshot copy (
    memberMembershipChangeListeners = snapshot.memberMembershipChangeListeners - listener)

  // optimistic update: retry from a fresh snapshot if another thread changed the state meanwhile
  if (state.compareAndSet(snapshot, updated)) ()
  else unregisterListener(listener)
}
2012-02-29 10:02:00 +01:00
/* *
* Send command to JOIN one node to another .
*/
2012-03-09 12:56:56 +01:00
def scheduleNodeJoin(address: Address): Unit =
  clusterCommandDaemon ! ClusterAction.Join(address) // fire-and-forget send to the command daemon
/* *
* Send command to issue state transition to LEAVING .
*/
2012-03-09 12:56:56 +01:00
def scheduleNodeLeave(address: Address): Unit =
  clusterCommandDaemon ! ClusterAction.Leave(address) // fire-and-forget send to the command daemon
/* *
* Send command to issue state transition to DOWN.
*/
2012-03-09 12:56:56 +01:00
def scheduleNodeDown(address: Address): Unit =
  clusterCommandDaemon ! ClusterAction.Down(address) // fire-and-forget send to the command daemon
/* *
* Send command to issue state transition to REMOVED .
*/
2012-03-09 12:56:56 +01:00
def scheduleNodeRemove(address: Address): Unit =
  clusterCommandDaemon ! ClusterAction.Remove(address) // fire-and-forget send to the command daemon
2011-01-01 01:50:33 +01:00
// ========================================================
// ===================== INTERNAL API =====================
// ========================================================
2012-02-08 14:14:01 +01:00
/* *
2012-02-28 17:04:48 +01:00
* State transition to JOINING .
2012-02-08 14:14:01 +01:00
* New node joining .
*/
@tailrec
private[cluster] final def joining(node: Address): Unit = {
  log.info("Node [{}] - Node [{}] is JOINING", remoteAddress, node)

  val current = state.get
  val gossipBefore = current.latestGossip
  val overviewBefore = gossipBefore.overview

  // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
  val stillUnreachable = overviewBefore.unreachable filterNot { _.address == node }

  // add joining node as Joining
  val membersAfter = gossipBefore.members + Member(node, MemberStatus.Joining)

  val gossipAfter = gossipBefore copy (
    overview = overviewBefore copy (unreachable = stillUnreachable),
    members = membersAfter)

  // bump our vector clock entry and mark the new gossip as seen by ourselves
  val updated = current copy (latestGossip = (gossipAfter + vclockNode) seen remoteAddress)

  if (state.compareAndSet(current, updated)) {
    failureDetector heartbeat node // update heartbeat in failure detector
    if (convergence(updated.latestGossip).isDefined)
      updated.memberMembershipChangeListeners foreach { _ notify membersAfter }
  } else joining(node) // recur if we failed update
}
2012-01-24 12:09:32 +01:00
2012-02-28 17:04:48 +01:00
/* *
* State transition to UP .
*/
2012-03-09 12:56:56 +01:00
// Currently a stub: the transition is only logged, no state is modified here.
private[cluster] final def up(address: Address): Unit =
  log.info("Node [{}] - Marking node [{}] as UP", remoteAddress, address)
2012-02-28 17:04:48 +01:00
/* *
* State transition to LEAVING .
*/
2012-03-09 12:56:56 +01:00
// Currently a stub: the transition is only logged, no state is modified here.
private[cluster] final def leaving(address: Address): Unit =
  log.info("Node [{}] - Marking node [{}] as LEAVING", remoteAddress, address)
2012-02-28 17:04:48 +01:00
/* *
* State transition to EXITING .
*/
2012-03-09 12:56:56 +01:00
// Currently a stub: the transition is only logged, no state is modified here.
private[cluster] final def exiting(address: Address): Unit =
  log.info("Node [{}] - Marking node [{}] as EXITING", remoteAddress, address)
2012-02-28 17:04:48 +01:00
/* *
* State transition to REMOVED .
*/
2012-03-09 12:56:56 +01:00
// Currently a stub: the transition is only logged, no state is modified here.
private[cluster] final def removing(address: Address): Unit =
  log.info("Node [{}] - Marking node [{}] as REMOVED", remoteAddress, address)
2012-02-28 17:04:48 +01:00
/* *
2012-03-09 12:56:56 +01:00
* The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there)
* and its status is set to DOWN. The node is also removed from the 'seen' table.
*
* The node will reside as DOWN in the 'unreachable' set until an explicit command JOIN command is sent directly
* to this node and it will then go through the normal JOINING procedure .
2012-02-28 17:04:48 +01:00
*/
2012-03-09 12:56:56 +01:00
@tailrec
final private[cluster] def downing(address: Address) {
  val localState = state.get
  val localGossip = localState.latestGossip
  val localMembers = localGossip.members
  val localOverview = localGossip.overview
  val localSeen = localOverview.seen
  val localUnreachableMembers = localOverview.unreachable

  // 1. check if the node to DOWN is in the 'members' set; if so, create its DOWN replacement
  val downedMember: Option[Member] = localMembers find (_.address == address) map { member ⇒
    log.info("Node [{}] - Marking node [{}] as DOWN", remoteAddress, member.address)
    member copy (status = MemberStatus.Down)
  }

  // the downed node, and any member already DOWN, leaves the 'members' set
  val newMembers = localMembers filter { member ⇒
    member.address != address && member.status != MemberStatus.Down
  }

  // 2. check if the node to DOWN is in the 'unreachable' set
  // Members that are already DOWN are left untouched, but they stay in the set: they have to
  // reside there as DOWN until they explicitly re-join.
  val newUnreachableMembers = localUnreachableMembers map { member ⇒
    if (member.address == address && member.status != MemberStatus.Down) {
      log.info("Node [{}] - Marking unreachable node [{}] as DOWN", remoteAddress, member.address)
      member copy (status = MemberStatus.Down)
    } else member
  }

  // 3. add the newly DOWNED members from the 'members' (in step 1.) to the 'newUnreachableMembers' set.
  val newUnreachablePlusNewlyDownedMembers = downedMember match {
    case Some(member) ⇒ newUnreachableMembers + member
    case None         ⇒ newUnreachableMembers
  }

  // 4. remove nodes marked as DOWN from the 'seen' table
  // (unreachable members that are not DOWN keep their 'seen' entry)
  val newSeen = newUnreachablePlusNewlyDownedMembers.foldLeft(localSeen) { (currentSeen, member) ⇒
    if (member.status == MemberStatus.Down) currentSeen - member.address
    else currentSeen
  }

  val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) // update gossip overview
  val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
  val versionedGossip = newGossip + vclockNode
  val newState = localState copy (latestGossip = versionedGossip seen remoteAddress)

  if (!state.compareAndSet(localState, newState)) downing(address) // recur if we fail the update
  else {
    if (convergence(newState.latestGossip).isDefined) {
      newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
    }
  }
}
2012-02-28 17:04:48 +01:00
2011-10-26 08:48:16 +02:00
/* *
2012-02-08 14:14:01 +01:00
* Receive new gossip .
2011-10-26 08:48:16 +02:00
*/
2012-02-09 15:59:10 +01:00
@tailrec
final private[cluster] def receive(sender: Member, remoteGossip: Gossip): Unit = {
  val current = state.get
  val mine = current.latestGossip

  // decide which gossip to keep by comparing the vector clocks
  val winner: Gossip =
    if (remoteGossip.version <> mine.version) {
      // concurrent versions: merge both and stamp the result with our own vector clock entry
      val merged = (remoteGossip merge mine) + vclockNode
      log.debug(
        "Can't establish a causal relationship between \"remote\" gossip [{}] and \"local\" gossip [{}] - merging them into [{}]",
        remoteGossip, mine, merged)
      merged
    } else if (remoteGossip.version < mine.version) mine // local gossip is newer
    else remoteGossip // remote gossip is newer

  val updated = current copy (latestGossip = winner seen remoteAddress)

  // if we won the race then update else try again
  if (state.compareAndSet(current, updated)) {
    log.debug("Node [{}] - Receiving gossip from [{}]", remoteAddress, sender.address)
    failureDetector heartbeat sender.address // update heartbeat in failure detector
    if (convergence(updated.latestGossip).isDefined)
      updated.memberMembershipChangeListeners foreach { _ notify updated.latestGossip.members }
  } else receive(sender, remoteGossip) // recur if we fail the update
}
2012-01-24 12:09:32 +01:00
/* *
2012-02-07 16:53:49 +01:00
* Joins the pre - configured contact point and retrieves current gossip state .
2012-01-24 12:09:32 +01:00
*/
2012-02-28 17:04:48 +01:00
private def autoJoin(): Unit =
  // nothing happens when no node to join has been configured
  for (address ← nodeToJoin) {
    val joinCommand = ClusterAction.Join(remoteAddress)
    val commandConnection = clusterCommandConnectionFor(address)
    log.info("Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, joinCommand, address, commandConnection)
    commandConnection ! joinCommand
  }
2012-02-09 15:59:10 +01:00
/* *
2012-02-28 17:04:48 +01:00
* Switches the member status .
*
* @param newStatus the new member status
* @param state the state to change the member status in
* @return the updated new state with the new member status
2012-02-09 15:59:10 +01:00
*/
2012-02-28 17:04:48 +01:00
private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = {
  log.info("Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus)

  // the "new" self is the current self carrying the requested status
  val updatedSelf = self copy (status = newStatus)
  val currentGossip = state.latestGossip

  // swap the entry for this node in 'gossip.members'; every other member is kept as is
  val swappedMembers = currentGossip.members map { member ⇒
    if (member.address == remoteAddress) updatedSelf else member
  }

  // rebuilt through a List to work around a bug in scala collections
  // ('val ss: SortedSet[Member] = SortedSet.empty[Member] ++ aSet' does not compile)
  val sortedMembers = SortedSet[Member](swappedMembers.toList: _*)

  // version the change, then record that this node has seen the new version
  val versioned = (currentGossip copy (members = sortedMembers)) + vclockNode
  state copy (latestGossip = versioned seen remoteAddress)
}
2011-10-26 08:48:16 +02:00
/* *
2012-02-09 15:59:10 +01:00
* Gossips latest gossip to an address .
2012-02-08 14:14:01 +01:00
*/
2012-02-09 15:59:10 +01:00
private def gossipTo(address: Address): Unit = {
  val gossipConnection = clusterGossipConnectionFor(address)
  log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, gossipConnection)

  // the envelope carries this node as the sender together with its latest gossip
  val envelope = GossipEnvelope(self, latestGossip)
  gossipConnection ! envelope
}
/* *
* Gossips latest gossip to a random member in the set of members passed in as argument .
2012-01-24 12:09:32 +01:00
*
2012-02-07 16:53:49 +01:00
* @return 'true' if it gossiped to a "deputy" member .
2011-10-26 08:48:16 +02:00
*/
2012-03-02 09:55:54 +01:00
private def gossipToRandomNodeOf(addresses: Iterable[Address]): Boolean = {
  // Filter out myself BEFORE checking for emptiness: a non-empty input that only
  // contains this node would otherwise leave no peers to pick from, and
  // 'selectRandomNode' fails on an empty collection ('random nextInt 0' throws).
  val peers = addresses filterNot (_ == remoteAddress)

  if (peers.isEmpty) false // nobody to gossip to
  else {
    val peer = selectRandomNode(peers)
    gossipTo(peer)
    deputyNodes exists (peer == _) // 'true' if the chosen peer is one of the deputy nodes
  }
}
/* *
* Initiates a new round of gossip.
*/
private def gossip() {
  val localState = state.get

  if (!isSingletonCluster(localState) && isAvailable(localState)) {
    // only gossip if we are a non-singleton cluster and available
    log.debug("Node [{}] - Initiating new round of gossip", remoteAddress)

    val localGossip = localState.latestGossip
    val localMembers = localGossip.members
    val localMembersSize = localMembers.size

    val localUnreachableMembers = localGossip.overview.unreachable
    val localUnreachableSize = localUnreachableMembers.size

    // 1. gossip to alive members
    val gossipedToDeputy = gossipToRandomNodeOf(localMembers map { _.address })

    // 2. gossip to unreachable members
    if (localUnreachableSize > 0) {
      // force floating point division - with integer division the probability is 0
      // unless there are more unreachable nodes than members
      val probability: Double = localUnreachableSize.toDouble / (localMembersSize + 1)
      if (random.nextDouble() < probability) gossipToRandomNodeOf(localUnreachableMembers.map(_.address))
    }

    // 3. gossip to a deputy nodes for facilitating partition healing
    val deputies = deputyNodes
    if ((!gossipedToDeputy || localMembersSize < 1) && !deputies.isEmpty) {
      if (localMembersSize == 0) gossipToRandomNodeOf(deputies)
      else {
        // parenthesized so that the sum is the divisor; without the parentheses the
        // unreachable count is added to the quotient and the result is >= 1 as soon
        // as a single node is unreachable
        val probability = 1.0 / (localMembersSize + localUnreachableSize)
        if (random.nextDouble() <= probability) gossipToRandomNodeOf(deputies)
      }
    }
  }
}
2012-02-07 16:53:49 +01:00
/* *
2012-03-09 12:56:56 +01:00
* Reaps the unreachable members ( moves them to the 'unreachable' list in the cluster overview ) according to the failure detector 's verdict .
2011-10-26 08:48:16 +02:00
*/
@tailrec
final private def reapUnreachableMembers(): Unit = {
  val snapshot = state.get

  // only scrutinize if we are a non-singleton cluster and available
  if (!isSingletonCluster(snapshot) && isAvailable(snapshot)) {
    val currentGossip = snapshot.latestGossip
    val currentOverview = currentGossip.overview
    val currentMembers = currentGossip.members
    val alreadyUnreachable = currentOverview.unreachable

    // members the failure detector no longer considers available
    val newlyDetectedUnreachableMembers =
      currentMembers filterNot { member ⇒ failureDetector.isAvailable(member.address) }

    if (!newlyDetectedUnreachableMembers.isEmpty) {
      // move the newly detected members from 'members' to the 'unreachable' set
      val newMembers = currentMembers diff newlyDetectedUnreachableMembers
      val unreachableNow: Set[Member] = alreadyUnreachable ++ newlyDetectedUnreachableMembers
      val movedGossip = currentGossip copy (
        overview = currentOverview copy (unreachable = unreachableNow),
        members = newMembers)

      // updating vclock and 'seen' table
      val updated = snapshot copy (latestGossip = (movedGossip + vclockNode) seen remoteAddress)

      if (state.compareAndSet(snapshot, updated)) {
        // we won the race - the new state is in place
        log.info("Node [{}] - Marking node(s) as UNREACHABLE [{}]", remoteAddress, newlyDetectedUnreachableMembers.mkString(", "))

        if (convergence(updated.latestGossip).isDefined) {
          updated.memberMembershipChangeListeners foreach { _ notify newMembers }
        }
      } else reapUnreachableMembers() // lost the race - recur and try again
    }
  }
}
2012-02-18 22:14:53 +01:00
/* *
2012-03-09 12:56:56 +01:00
* Runs periodic leader actions , such as auto - downing unreachable nodes , assigning partitions etc .
*/
@tailrec
final private def leaderActions() {
  val localState = state.get
  val localGossip = localState.latestGossip
  val localMembers = localGossip.members

  // this node is the leader when it is the first member of the (sorted) member set
  val isLeader = !localMembers.isEmpty && (remoteAddress == localMembers.head.address)

  if (isLeader && isAvailable(localState)) {
    // only run the leader actions if we are the LEADER and available

    val localOverview = localGossip.overview
    val localSeen = localOverview.seen
    val localUnreachableMembers = localOverview.unreachable

    // Leader actions are as follows:
    //   1. Move JOINING     => UP
    //   2. Move EXITING     => REMOVED
    //   3. Move UNREACHABLE => DOWN (auto-downing by leader)
    //   4. Updating the vclock version for the changes
    //   5. Updating the 'seen' table

    // flipped by the transformations below as soon as any member actually changes status
    var hasChangedState = false

    val newGossip =
      if (convergence(localGossip).isDefined) {
        // we have convergence - move members forward in their life-cycle;
        // a member has exactly one status, so a single pass covers both transitions
        val newMembers = localMembers map { member ⇒
          if (member.status == MemberStatus.Joining) {
            // 1. Move JOINING => UP
            log.info("Node [{}] - Leader is moving node [{}] from JOINING to UP", remoteAddress, member.address)
            hasChangedState = true
            member copy (status = MemberStatus.Up)
          } else if (member.status == MemberStatus.Exiting) {
            // 2. Move EXITING => REMOVED
            log.info("Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", remoteAddress, member.address)
            hasChangedState = true
            member copy (status = MemberStatus.Removed)
          } else member
        }
        localGossip copy (members = newMembers) // update gossip

      } else if (autoDown) {
        // we don't have convergence - so we might have unreachable nodes
        // if 'auto-down' is turned on, then try to auto-down any unreachable nodes

        // 3. Move UNREACHABLE => DOWN (auto-downing by leader)
        // Members that are already DOWN are kept untouched in the set; filtering them
        // out before mapping would silently drop them from the 'unreachable' set.
        val newUnreachableMembers = localUnreachableMembers map { member ⇒
          if (member.status == MemberStatus.Down) member // no need to DOWN members already DOWN
          else {
            log.info("Node [{}] - Leader is marking unreachable node [{}] as DOWN", remoteAddress, member.address)
            hasChangedState = true
            member copy (status = MemberStatus.Down)
          }
        }

        // removing nodes marked as DOWN from the 'seen' table
        val newSeen = localUnreachableMembers.foldLeft(localSeen)((currentSeen, member) ⇒ currentSeen - member.address)

        val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
        localGossip copy (overview = newOverview) // update gossip

      } else localGossip

    if (hasChangedState) { // we have a change of state - version it and try to update

      // 4. Updating the vclock version for the changes
      val versionedGossip = newGossip + vclockNode

      // 5. Updating the 'seen' table
      val seenVersionedGossip = versionedGossip seen remoteAddress

      val newState = localState copy (latestGossip = seenVersionedGossip)

      // if we won the race then update else try again
      if (!state.compareAndSet(localState, newState)) leaderActions() // recur
      else {
        if (convergence(newState.latestGossip).isDefined) {
          // 'foreach' rather than 'map': the listeners are called for their side effect only
          newState.memberMembershipChangeListeners foreach { _ notify newGossip.members }
        }
      }
    }
  }
}
/* *
* Checks if we have a cluster convergence . If there are any unreachable nodes then we can 't have a convergence -
* waiting for user to act ( issuing DOWN ) or leader to act ( issuing DOWN through auto - down ) .
2012-02-18 22:14:53 +01:00
*
* @return Some(convergedGossip) if convergence has been reached and None if not
*/
private def convergence(gossip: Gossip): Option[Gossip] = {
  val overview = gossip.overview
  val unreachable = overview.unreachable

  // First check that:
  //   1. we don't have any members that are unreachable ('forall' holds for an empty set), or
  //   2. all unreachable members in the set have status DOWN or REMOVED
  // Else we can't continue to check for convergence.
  //
  // The check is written positively on purpose: the negated form
  // '!exists(status != Down || status != Removed)' can never hold for a non-empty set,
  // since no member has both statuses at once.
  val unreachableAreDownOrRemoved = unreachable forall { m ⇒
    m.status == MemberStatus.Down || m.status == MemberStatus.Removed
  }

  if (unreachableAreDownOrRemoved) {
    // When that is done we check that all the entries in the 'seen' table have the same vector clock version
    val seen = overview.seen
    val views = Set.empty[VectorClock] ++ seen.values
    if (views.size == 1) {
      log.debug("Node [{}] - Cluster convergence reached", remoteAddress)
      Some(gossip)
    } else None
  } else None
}
/**
 * Checks if this node is available in the given state; the exact negation of 'isUnavailable'.
 */
private def isAvailable(state: State): Boolean = {
  val unavailable = isUnavailable(state)
  !unavailable
}
/**
 * Checks if this node is unavailable in the given state: either it is listed among the
 * unreachable members, or its own member entry has a status that 'MemberStatus.isUnavailable' reports.
 */
private def isUnavailable(state: State): Boolean = {
  val gossip = state.latestGossip

  // is this node's address listed in the 'unreachable' set of the overview?
  val listedAsUnreachable = gossip.overview.unreachable exists { _.address == remoteAddress }

  // does the member entry equal to 'self' carry an unavailable status?
  val selfHasUnavailableStatus = gossip.members exists { member ⇒
    (member == self) && MemberStatus.isUnavailable(member.status)
  }

  listedAsUnreachable || selfHasUnavailableStatus
}
2012-02-07 16:53:49 +01:00
/* *
2012-03-09 12:56:56 +01:00
* Looks up and returns the local cluster command connection .
2012-02-07 16:53:49 +01:00
*/
2012-03-09 12:56:56 +01:00
private def clusterCommandDaemon = {
  // the path is rooted at this node's own address
  val daemonPath = RootActorPath(remoteAddress) / "system" / "cluster" / "commands"
  system.actorFor(daemonPath)
}
2012-02-07 16:53:49 +01:00
2012-03-03 23:55:48 +01:00
/* *
2012-03-09 12:56:56 +01:00
* Looks up and returns the remote cluster command connection for the specific address .
2012-03-03 23:55:48 +01:00
*/
2012-03-09 12:56:56 +01:00
private def clusterCommandConnectionFor(address: Address): ActorRef = {
  // the path is rooted at the given address
  val commandsPath = RootActorPath(address) / "system" / "cluster" / "commands"
  system.actorFor(commandsPath)
}
2012-03-03 23:55:48 +01:00
2012-02-07 16:53:49 +01:00
/* *
2012-03-09 12:56:56 +01:00
* Looks up and returns the remote cluster gossip connection for the specific address .
2012-02-07 16:53:49 +01:00
*/
2012-03-02 16:20:30 +01:00
private def clusterGossipConnectionFor(address: Address): ActorRef = {
  // the path is rooted at the given address
  val gossipPath = RootActorPath(address) / "system" / "cluster" / "gossip"
  system.actorFor(gossipPath)
}
2012-01-24 12:09:32 +01:00
2012-03-09 12:56:56 +01:00
/* *
* Gets an Iterable with the addresses of all the 'deputy' nodes - excluding this node if part of the group.
*/
2012-03-02 09:55:54 +01:00
private def deputyNodes: Iterable[Address] = {
  val memberAddresses = state.get.latestGossip.members.toIterable map (_.address)

  // skip the first member, then keep at most 'nrOfDeputyNodes' of the following ones
  val candidates = memberAddresses drop 1 take nrOfDeputyNodes

  // this node is never reported as its own deputy
  candidates filter (_ != remoteAddress)
}
2012-01-24 12:09:32 +01:00
2012-03-02 09:55:54 +01:00
/**
 * Picks one of the given addresses at a random index.
 * The collection must not be empty.
 */
private def selectRandomNode(addresses: Iterable[Address]): Address = {
  val candidates = addresses.toSeq
  val index = random nextInt addresses.size
  candidates(index)
}
2012-02-14 20:50:12 +01:00
/**
 * Checks if the latest gossip of the given state holds exactly one member.
 */
private def isSingletonCluster(currentState: State): Boolean = {
  val memberCount = currentState.latestGossip.members.size
  memberCount == 1
}
2011-10-26 08:48:16 +02:00
}