2011-10-26 08:48:16 +02:00
/* *
2012-01-19 18:21:06 +01:00
* Copyright ( C ) 2009 - 2012 Typesafe Inc . < http : //www.typesafe.com>
2011-10-26 08:48:16 +02:00
*/
2012-01-31 13:33:04 +01:00
package akka.cluster
2011-10-26 08:48:16 +02:00
import akka.actor._
import akka.actor.Status._
2012-01-31 13:33:04 +01:00
import akka.remote._
2011-10-27 12:46:10 +02:00
import akka.event.Logging
2012-01-24 12:09:32 +01:00
import akka.dispatch.Await
2012-01-31 13:33:04 +01:00
import akka.pattern.ask
import akka.util._
2011-11-25 12:02:25 +01:00
import akka.config.ConfigurationException
2012-01-30 11:41:41 +01:00
import java.util.concurrent.atomic. { AtomicReference , AtomicBoolean }
2012-01-24 12:09:32 +01:00
import java.util.concurrent.TimeUnit._
import java.util.concurrent.TimeoutException
2011-10-27 15:14:15 +02:00
import java.security.SecureRandom
2011-10-26 08:48:16 +02:00
import System. { currentTimeMillis ⇒ newTimestamp }
2011-11-25 12:02:25 +01:00
2012-01-30 11:41:41 +01:00
import scala.collection.immutable. { Map , SortedSet }
2011-10-26 08:48:16 +02:00
import scala.annotation.tailrec
2011-11-25 12:02:25 +01:00
2012-01-24 12:09:32 +01:00
import com.google.protobuf.ByteString
2011-10-26 08:48:16 +02:00
/**
 * Callback interface for observers of cluster membership changes.
 * Instances are (un)registered through `Gossiper.registerListener` / `Gossiper.unregisterListener`.
 */
trait NodeMembershipChangeListener {

  /** Called when `member` is considered connected to the cluster. */
  def memberConnected(member: Member): Unit

  /** Called when `member` has been detected as unreachable and is considered disconnected. */
  def memberDisconnected(member: Member): Unit
}
2012-02-07 16:53:49 +01:00
// FIXME create Protobuf messages out of all the Gossip stuff - but wait until the protocol is fully stabilized.
2011-10-26 08:48:16 +02:00
/**
 * Common supertype of every message exchanged by the cluster machinery.
 * Extends `Serializable` so that instances can be shipped to remote nodes; sealed so that the
 * complete set of cluster messages is defined in this file.
 */
sealed trait ClusterMessage extends Serializable
2012-01-30 11:41:41 +01:00
/**
 * Command asking the receiving cluster node to add `node` to the cluster as a joining member.
 */
case class Join(node: Address) extends ClusterMessage
2011-10-26 08:48:16 +02:00
2012-01-30 11:41:41 +01:00
/**
 * Command asking for `node` to leave the cluster.
 */
case class Leave(node: Address) extends ClusterMessage
/**
 * Command asking for `node` to be marked as temporarily down.
 */
case class Down(node: Address) extends ClusterMessage
/**
 * Command asking for `node` to be removed from the cluster immediately.
 */
case class Remove(node: Address) extends ClusterMessage
2011-10-26 08:48:16 +02:00
2012-01-30 11:41:41 +01:00
/**
 * A cluster member node: its remote `address` together with its current membership `status`.
 */
case class Member(address: Address, status: MemberStatus) extends ClusterMessage
2011-10-26 08:48:16 +02:00
2012-01-24 12:09:32 +01:00
/**
 * Membership status of a cluster member node.
 *
 * One of [[MemberStatus.Joining]], [[MemberStatus.Up]], [[MemberStatus.Leaving]],
 * [[MemberStatus.Exiting]] or [[MemberStatus.Down]].
 */
sealed trait MemberStatus extends ClusterMessage

object MemberStatus {
  /** The member is in the process of joining the cluster. */
  case object Joining extends MemberStatus

  /** The member is up and part of the cluster. */
  case object Up extends MemberStatus

  /** The member is in the process of leaving the cluster. */
  case object Leaving extends MemberStatus

  /** The member is exiting the cluster. */
  case object Exiting extends MemberStatus

  /** The member is marked as down. */
  case object Down extends MemberStatus
}
2012-02-09 13:36:39 +01:00
// sealed trait PartitioningStatus
// object PartitioningStatus {
// case object Complete extends PartitioningStatus
// case object Awaiting extends PartitioningStatus
2012-01-30 11:41:41 +01:00
// }
2012-01-24 12:09:32 +01:00
2012-02-09 13:36:39 +01:00
// case class PartitioningChange(
// from: Address,
// to: Address,
// path: PartitionPath,
// status: PartitioningStatus)
/**
 * Overview part of the cluster gossip.
 *
 * @param seen        the cluster convergence table: a vector clock per member
 *                    (presumably the latest gossip version that member has seen — confirm once convergence is implemented)
 * @param unreachable the members currently considered unreachable
 */
case class GossipOverview(
  seen: Map[Member, VectorClock] = Map.empty[Member, VectorClock],
  unreachable: Set[Member] = Set.empty[Member])
/**
 * The cluster state that is gossiped between nodes: ring membership, ring convergence and meta data,
 * all versioned by a vector clock.
 *
 * NOTE(review): `meta` holds `Array[Byte]` values, and arrays compare by reference, so two `Gossip`
 * instances with equal meta content are not `==` to each other.
 *
 * @param overview convergence table and unreachable members
 * @param self     the member that created/sent this gossip
 * @param members  the members with their status; ordered by whatever implicit `Ordering[Member]` was in
 *                 scope when the set was built
 * @param meta     arbitrary binary meta data keyed by name
 * @param version  vector clock version of this gossip
 */
case class Gossip(
  overview: GossipOverview = GossipOverview(),
  self: Member,
  members: SortedSet[Member],
  //partitions: Tree[PartitionPath, Node] = Tree.empty[PartitionPath, Node],
  //pending: Set[PartitioningChange] = Set.empty[PartitioningChange],
  meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]],
  version: VectorClock = VectorClock())
  extends ClusterMessage // serializable, can be sent between nodes
  with Versioned // carries a vector clock as its version
2012-01-24 12:09:32 +01:00
2012-01-30 11:41:41 +01:00
/**
 * Actor that receives cluster messages and forwards them to its [[Gossiper]].
 *
 * Gossip and Join messages are dispatched to the gossiper; Leave, Down and Remove are accepted but
 * currently ignored (their handlers are not implemented yet). Any other message is logged as an error.
 */
final class ClusterDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor {
  val log = Logging(system, "ClusterDaemon")

  def receive = {
    case incoming: Gossip ⇒ gossiper receive incoming
    case Join(node)       ⇒ gossiper joining node
    case Leave(_)         ⇒ // not implemented yet: gossiper.leaving(address)
    case Down(_)          ⇒ // not implemented yet: gossiper.downing(address)
    case Remove(_)        ⇒ // not implemented yet: gossiper.removing(address)
    case other            ⇒ log.error("Unknown message sent to cluster daemon [" + other + "]")
  }
}
2011-10-26 08:48:16 +02:00
/**
 * This module is responsible for Gossiping cluster information. The abstraction maintains the list of live
 * and dead members. Periodically (every `ClusterSettings.GossipFrequency`) this module chooses a random member
 * and initiates a round of Gossip with it. Whenever it gets gossip updates it updates the Failure Detector with
 * the liveness information.
 * <p/>
 * During each of these runs the member initiates gossip exchange according to the following rules (as defined in the
 * Cassandra documentation [http://wiki.apache.org/cassandra/ArchitectureGossip]):
 * <pre>
 *   1) Gossip to random live member (if any)
 *   2) Gossip to random unreachable member with certain probability depending on number of unreachable and live members
 *   3) If the member gossiped to at (1) was not deputy, or the number of live members is less than number of deputy list,
 *       gossip to random deputy with certain probability depending on number of unreachable, deputy and live members.
 * </pre>
 */
case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {

  /**
   * Represents the state for this Gossiper. Implemented using optimistic lockless concurrency,
   * all state is represented by this immutable case class and managed by an AtomicReference.
   */
  private case class State(
    latestGossip: Gossip,
    isSingletonCluster: Boolean = true, // starts as singleton cluster
    memberMembershipChangeListeners: Set[NodeMembershipChangeListener] = Set.empty[NodeMembershipChangeListener])

  val remoteSettings = new RemoteSettings(system.settings.config, system.name)
  val clusterSettings = new ClusterSettings(system.settings.config, system.name)

  val remoteAddress = remote.transport.address
  val memberFingerprint = remoteAddress.##

  val gossipInitialDelay = clusterSettings.GossipInitialDelay
  val gossipFrequency = clusterSettings.GossipFrequency

  // members are ordered by the string form of their address, in descending order ('>' as less-than)
  implicit val memberOrdering = Ordering.fromLessThan[Member](_.address.toString > _.address.toString)

  implicit val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)

  private val nodeToJoin: Option[Member] =
    clusterSettings.NodeToJoin filter (_ != remoteAddress) map (address ⇒ Member(address, MemberStatus.Joining))

  private val serialization = remote.serialization
  private val failureDetector = new AccrualFailureDetector(
    system, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)

  private val isRunning = new AtomicBoolean(true)
  private val log = Logging(system, "Gossiper")
  private val random = SecureRandom.getInstance("SHA1PRNG")

  private val state = {
    val member = Member(remoteAddress, MemberStatus.Joining)
    val gossip = Gossip(self = member, members = SortedSet.empty[Member] + member)
    new AtomicReference[State](State(gossip))
  }

  // FIXME manage connections in some other way so we can delete the RemoteConnectionManager (SINCE IT SUCKS!!!)
  private val connectionManager = new RemoteConnectionManager(system, remote, failureDetector, Map.empty[Address, ActorRef])

  // The daemon forwards incoming messages to this Gossiper (e.g. Join -> joining, which reads 'state'), so it is
  // created only after 'state' and 'connectionManager' are initialized; otherwise an early message could hit null fields.
  // Is it right to put this guy under the /system path or should we have a top-level /cluster or something else...?
  private val clusterDaemon = system.systemActorOf(Props(new ClusterDaemon(system, this)), "cluster")

  log.info("Node [{}] - Starting cluster Gossiper...", remoteAddress)

  // try to join the node defined in the 'akka.cluster.node-to-join' option
  nodeToJoin match {
    case None         ⇒ switchStatusTo(MemberStatus.Up) // if we are singleton cluster then we are already considered to be UP
    case Some(member) ⇒ join(member)
  }

  // start periodic gossip and cluster scrutinization
  val gossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
    gossip()
  }
  val scrutinizeCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
    scrutinize()
  }

  /**
   * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
   * Only the first invocation has an effect; subsequent calls are no-ops.
   */
  def shutdown() {
    // FIXME Cheating. Can't just shut down. Node must first gossip a Leave command, wait for Leader to do proper Handoff and then await an Exit command before switching to Removed
    if (isRunning.compareAndSet(true, false)) {
      log.info("Node [{}] - Shutting down Gossiper and ClusterDaemon", remoteAddress)
      try connectionManager.shutdown() finally {
        try system.stop(clusterDaemon) finally {
          try gossipCanceller.cancel() finally {
            try scrutinizeCanceller.cancel() finally {
              log.info("Node [{}] - Gossiper is shut down", remoteAddress)
            }
          }
        }
      }
    }
  }

  /**
   * Latest gossip.
   */
  def latestGossip: Gossip = state.get.latestGossip

  /**
   * Member status for this node.
   */
  def self: Member = latestGossip.self

  /**
   * Is this node a singleton cluster?
   */
  def isSingletonCluster: Boolean = state.get.isSingletonCluster

  /**
   * New node joining: adds `node` to the members of the latest gossip with status `Joining` and
   * increments the gossip version. Retries until the state update wins the compare-and-set race.
   */
  @tailrec
  final def joining(node: Address) {
    log.debug("Node [{}] - Node [{}] is joining", remoteAddress, node)
    val oldState = state.get
    val oldGossip = oldState.latestGossip
    val oldMembers = oldGossip.members
    val newGossip = oldGossip copy (members = oldMembers + Member(node, MemberStatus.Joining)) // add joining node as Joining
    val newState = oldState copy (latestGossip = incrementVersionForGossip(newGossip))
    // FIXME set flag state.isSingletonCluster = false (if true)
    if (!state.compareAndSet(oldState, newState)) joining(node) // recur if we failed update
  }

  /**
   * Receive new gossip.
   *
   * Currently this only records a heartbeat for the sending node in the failure detector; merging the
   * received gossip into the local state is not implemented yet (see the FIXME below).
   */
  //@tailrec
  final def receive(newGossip: Gossip) {
    val from = newGossip.self
    log.debug("Node [{}] - Receiving gossip from [{}]", remoteAddress, from.address)
    failureDetector heartbeat from.address // update heartbeat in failure detector

    // FIXME set flag state.isSingletonCluster = false (if true)

    // FIXME all below here is WRONG - redesign with cluster convergence in mind
    // val oldState = state.get
    // println("-------- NEW VERSION " + newGossip)
    // println("-------- OLD VERSION " + oldState.latestGossip)
    // val gossip = VectorClock.latestVersionOf(newGossip, oldState.latestGossip)
    // println("-------- WINNING VERSION " + gossip)

    // val latestMembers = gossip.members
    // val latestUnreachableMembers = gossip.overview.unreachable
    // println("=======>>> myself: " + myself)
    // println("=======>>> latestMembers: " + latestMembers)
    // if (!(latestMembers contains myself) && !(latestUnreachableMembers contains myself)) {
    //   println("-------- NEW NODE")
    //   // we have a new member
    //   val newGossip = gossip copy (availableNodes = latestMembers + myself)
    //   val newState = oldState copy (latestGossip = incrementVersionForGossip(newGossip))
    //   println("--------- new GOSSIP " + newGossip.members)
    //   println("--------- new STATE " + newState)
    //   // if we won the race then update else try again
    //   if (!state.compareAndSet(oldState, newState)) receive(newGossip) // recur
    //   else {
    //     println("---------- WON RACE - setting state")
    //     // create connections for all new members in the latest gossip
    //     (latestMembers + myself) foreach { member ⇒
    //       setUpConnectionTo(member)
    //       oldState.memberMembershipChangeListeners foreach (_ memberConnected member) // notify listeners about the new members
    //     }
    //   }
    // } else if (latestUnreachableMembers contains myself) {
    //   // gossip from an old former dead member
    //   val newUnreachableMembers = latestUnreachableMembers - myself
    //   val newMembers = latestMembers + myself

    //   val newGossip = gossip copy (availableNodes = newMembers, unavailableNodes = newUnreachableMembers)
    //   val newState = oldState copy (latestGossip = incrementVersionForGossip(newGossip))
    //   // if we won the race then update else try again
    //   if (!state.compareAndSet(oldState, newState)) receive(newGossip) // recur
    //   else oldState.memberMembershipChangeListeners foreach (_ memberConnected myself) // notify listeners on successful update of state
    // }
  }

  /**
   * Registers a listener to subscribe to cluster membership changes.
   */
  @tailrec
  final def registerListener(listener: NodeMembershipChangeListener) {
    val oldState = state.get
    val newListeners = oldState.memberMembershipChangeListeners + listener
    val newState = oldState copy (memberMembershipChangeListeners = newListeners)
    if (!state.compareAndSet(oldState, newState)) registerListener(listener) // recur
  }

  /**
   * Unsubscribes to cluster membership changes.
   */
  @tailrec
  final def unregisterListener(listener: NodeMembershipChangeListener) {
    val oldState = state.get
    val newListeners = oldState.memberMembershipChangeListeners - listener
    val newState = oldState copy (memberMembershipChangeListeners = newListeners)
    if (!state.compareAndSet(oldState, newState)) unregisterListener(listener) // recur
  }

  /**
   * Joins the pre-configured contact point by sending it a [[Join]] command for this node.
   * Logs an error if no connection to the contact point can be set up.
   */
  private def join(member: Member) {
    setUpConnectionTo(member) match {
      case Some(connection) ⇒
        val command = Join(remoteAddress)
        log.info("Node [{}] - Sending [{}] to [{}]", remoteAddress, command, member.address)
        connection ! command
      case None ⇒
        log.error("Node [{}] - Could not set up connection to join node [{}] defined in the configuration",
          remoteAddress, member.address)
    }
  }

  /**
   * Initiates a new round of gossip (see the class documentation for the three rules).
   * Does nothing while this node is a singleton cluster.
   */
  private def gossip() {
    val oldState = state.get
    if (!oldState.isSingletonCluster) { // do not gossip if we are a singleton cluster
      val oldGossip = oldState.latestGossip
      val oldMembers = oldGossip.members
      val oldMembersSize = oldMembers.size
      val oldUnreachableMembers = oldGossip.overview.unreachable
      val oldUnreachableSize = oldUnreachableMembers.size

      // 1. gossip to a random live member (if any)
      val gossipedToDeputy = gossipToRandomNodeOf(oldMembers)

      // 2. gossip to a random unreachable member with a probability depending on number of unreachable and live members
      if (oldUnreachableSize > 0) {
        // floating point division: Int / Int would truncate the probability to 0 in the common case
        val probability = oldUnreachableSize.toDouble / (oldMembersSize + 1)
        if (random.nextDouble() < probability) gossipToRandomNodeOf(oldUnreachableMembers)
      }

      // 3. gossip to a deputy node for facilitating partition healing
      val deputies = deputyNodesWithoutMyself
      if ((!gossipedToDeputy || oldMembersSize < deputies.size) && !deputies.isEmpty) {
        if (oldMembersSize == 0) gossipToRandomNodeOf(deputies)
        else {
          val probability = 1.0 / (oldMembersSize + oldUnreachableSize)
          if (random.nextDouble() <= probability) gossipToRandomNodeOf(deputies)
        }
      }
    }
  }

  /**
   * Switches the membership status of this node to `newStatus` in the latest gossip (both `self` and the
   * entry in `members`) and increments the gossip version. Retries until the compare-and-set wins.
   */
  @tailrec
  final private def switchStatusTo(newStatus: MemberStatus) {
    log.info("Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus)
    val oldState = state.get
    val oldGossip = oldState.latestGossip
    val oldSelf = oldGossip.self
    val oldMembers = oldGossip.members
    val newSelf = oldSelf copy (status = newStatus)
    val newMembersSet = oldMembers map { member ⇒
      if (member.address == remoteAddress) newSelf
      else member
    }
    // ugly crap to work around bug in scala collections ('val ss: SortedSet[Member] = SortedSet.empty[Member] ++ aSet' does not compile)
    val newMembersSortedSet = SortedSet[Member](newMembersSet.toList: _*)
    val newGossip = oldGossip copy (self = newSelf, members = newMembersSortedSet)
    val newState = oldState copy (latestGossip = incrementVersionForGossip(newGossip))
    if (!state.compareAndSet(oldState, newState)) switchStatusTo(newStatus) // recur if we failed update
  }

  /**
   * Gossips latest gossip to a member.
   */
  private def gossipTo(member: Member) {
    setUpConnectionTo(member) foreach { _ ! latestGossip }
  }

  /**
   * Gossips latest gossip to a random member in the set of members passed in as argument.
   * This node itself is never chosen; if no other member is available, nothing is sent.
   *
   * @return 'true' if it gossiped to a "deputy" member.
   */
  private def gossipToRandomNodeOf(members: Seq[Member]): Boolean = {
    val peers = members filter (_.address != remoteAddress) // filter out myself
    if (peers.isEmpty) false // nobody to gossip to; selectRandomNode would fail on an empty Seq
    else {
      val peer = selectRandomNode(peers)
      // if connection can't be established/found => ignore it since the failure detector will take care of the potential problem
      gossipTo(peer)
      deputyNodesWithoutMyself exists (peer == _)
    }
  }

  /**
   * Gossips to a random member in the set of members passed in as argument.
   *
   * @return 'true' if it gossiped to a "deputy" member.
   */
  private def gossipToRandomNodeOf(members: Set[Member]): Boolean = gossipToRandomNodeOf(members.toList)

  /**
   * Scrutinizes the cluster; marks members detected by the failure detector as unavailable, and notifies all listeners
   * of the change in the cluster membership. Only members detected in this run are reported; members already in
   * the unreachable set are not reported again. Does nothing while this node is a singleton cluster.
   */
  @tailrec
  final private def scrutinize() {
    val oldState = state.get
    if (!oldState.isSingletonCluster) { // do not scrutinize if we are a singleton cluster
      val oldGossip = oldState.latestGossip
      val oldOverview = oldGossip.overview
      val oldMembers = oldGossip.members
      val oldUnreachableMembers = oldOverview.unreachable
      val newlyDetectedUnreachableMembers = oldMembers filterNot (member ⇒ failureDetector.isAvailable(member.address))

      if (!newlyDetectedUnreachableMembers.isEmpty) { // we have newly detected members marked as unavailable
        val newMembers = oldMembers diff newlyDetectedUnreachableMembers
        val newUnreachableMembers = oldUnreachableMembers ++ newlyDetectedUnreachableMembers

        val newOverview = oldOverview copy (unreachable = newUnreachableMembers)
        val newGossip = oldGossip copy (overview = newOverview, members = newMembers)
        val newState = oldState copy (latestGossip = incrementVersionForGossip(newGossip))

        // if we won the race then update else try again
        if (!state.compareAndSet(oldState, newState)) scrutinize() // recur
        else {
          // notify listeners on successful update of state, only about the members detected in this run
          for {
            deadNode ← newlyDetectedUnreachableMembers
            listener ← oldState.memberMembershipChangeListeners
          } listener memberDisconnected deadNode
        }
      }
    }
  }

  // FIXME should shuffle list randomly before start traversing to avoid connecting to some member on every member
  /**
   * Returns a connection to the first member in `members` that a connection can be set up to.
   * Works for any `Seq` (not only `List`).
   *
   * @throws RemoteConnectionException if no connection could be established to any of the members
   */
  @tailrec
  final private def connectToRandomNodeOf(members: Seq[Member]): ActorRef = members.headOption match {
    case Some(member) ⇒
      setUpConnectionTo(member) match {
        case Some(connection) ⇒ connection
        case None             ⇒ connectToRandomNodeOf(members.tail) // recur with the remaining members
      }
    case None ⇒
      throw new RemoteConnectionException(
        "Could not establish connection to any of the members in the argument list")
  }

  /**
   * Sets up remote connections to all the members in the argument list.
   */
  private def setUpConnectionsTo(members: Seq[Member]): Seq[Option[ActorRef]] = members map { setUpConnectionTo(_) }

  /**
   * Sets up remote connection to the cluster daemon ("/system/cluster") of the given member.
   *
   * @return the connection, or None (after logging a warning) if it could not be set up
   */
  private def setUpConnectionTo(member: Member): Option[ActorRef] = {
    val address = member.address
    try {
      Some(connectionManager.putIfAbsent(address, () ⇒ system.actorFor(RootActorPath(address) / "system" / "cluster")))
    } catch {
      case e: Exception ⇒
        log.warning("Node [{}] - Could not set up connection to [{}] due to [{}]", remoteAddress, address, e)
        None
    }
  }

  /**
   * Returns a copy of `from` whose vector clock is incremented for this node at the current time.
   */
  private def incrementVersionForGossip(from: Gossip): Gossip = {
    from copy (version = from.version.increment(memberFingerprint, newTimestamp))
  }

  private def deputyNodesWithoutMyself: Seq[Member] = Seq.empty[Member] filter (_.address != remoteAddress) // FIXME read in deputy nodes from gossip data - now empty seq

  /**
   * Picks a random member; `members` must be non-empty.
   */
  private def selectRandomNode(members: Seq[Member]): Member = members(random.nextInt(members.size))
}