2011-10-26 08:48:16 +02:00
/* *
2012-01-19 18:21:06 +01:00
* Copyright ( C ) 2009 - 2012 Typesafe Inc . < http : //www.typesafe.com>
2011-10-26 08:48:16 +02:00
*/
2012-01-31 13:33:04 +01:00
package akka.cluster
2011-10-26 08:48:16 +02:00
import akka.actor._
import akka.actor.Status._
2012-01-31 13:33:04 +01:00
import akka.remote._
2012-02-20 15:26:12 +01:00
import akka.routing._
2011-10-27 12:46:10 +02:00
import akka.event.Logging
2012-01-24 12:09:32 +01:00
import akka.dispatch.Await
2012-01-31 13:33:04 +01:00
import akka.pattern.ask
import akka.util._
2011-11-25 12:02:25 +01:00
import akka.config.ConfigurationException
2012-01-30 11:41:41 +01:00
import java.util.concurrent.atomic. { AtomicReference , AtomicBoolean }
2012-01-24 12:09:32 +01:00
import java.util.concurrent.TimeUnit._
import java.util.concurrent.TimeoutException
2011-10-27 15:14:15 +02:00
import java.security.SecureRandom
2011-11-25 12:02:25 +01:00
2012-01-30 11:41:41 +01:00
import scala.collection.immutable. { Map , SortedSet }
2011-10-26 08:48:16 +02:00
import scala.annotation.tailrec
2011-11-25 12:02:25 +01:00
2012-01-24 12:09:32 +01:00
import com.google.protobuf.ByteString
2011-10-26 08:48:16 +02:00
/**
 * Callback contract for components that want to observe cluster membership.
 */
trait MembershipChangeListener {

  /**
   * Called with the membership set the listener should be told about.
   *
   * @param members sorted set of cluster members
   */
  def notify(members: SortedSet[Member]): Unit
}
/**
 * Callback contract for components that want to observe cluster meta data.
 */
trait MetaDataChangeListener { // FIXME add management and notification for MetaDataChangeListener

  /**
   * Called with the meta data table the listener should be told about.
   *
   * @param meta meta data entries, keyed by name, with raw byte values
   */
  def notify(meta: Map[String, Array[Byte]]): Unit
}
2012-02-07 16:53:49 +01:00
// FIXME create Protobuf messages out of all the Gossip stuff - but wait until the protocol is fully stabilized.
2011-10-26 08:48:16 +02:00
/**
 * Root of the cluster message hierarchy.
 * Because it extends `Serializable`, every cluster message is serializable.
 */
sealed trait ClusterMessage extends Serializable
2012-01-30 11:41:41 +01:00
/**
 * Cluster command: the given node wants to join the cluster.
 *
 * @param node address of the joining node
 */
case class Join(node: Address) extends ClusterMessage
2011-10-26 08:48:16 +02:00
2012-01-30 11:41:41 +01:00
/**
 * Cluster command: the given node wants to leave the cluster.
 *
 * @param node address of the leaving node
 */
case class Leave(node: Address) extends ClusterMessage
/**
 * Cluster command: mark the given node as temporarily down.
 *
 * @param node address of the node to mark as down
 */
case class Down(node: Address) extends ClusterMessage
/**
 * Cluster command: remove the given node from the cluster immediately.
 *
 * @param node address of the node to remove
 */
case class Remove(node: Address) extends ClusterMessage
2011-10-26 08:48:16 +02:00
2012-01-30 11:41:41 +01:00
/**
 * A cluster member node: its address paired with its current membership status.
 *
 * @param address address of the member node
 * @param status  current status of the member node
 */
case class Member(address: Address, status: MemberStatus) extends ClusterMessage
2011-10-26 08:48:16 +02:00
2012-02-09 15:59:10 +01:00
/**
 * Wraps a gossip together with the member that sent it.
 *
 * @param sender the member sending the gossip
 * @param gossip the gossip payload
 */
case class GossipEnvelope(sender: Member, gossip: Gossip) extends ClusterMessage
2012-01-24 12:09:32 +01:00
/**
 * Status of a cluster member node.
 *
 * The possible values are the case objects in the companion:
 * Joining, Up, Leaving, Exiting and Down.
 */
sealed trait MemberStatus extends ClusterMessage

/**
 * Enumerates every [[MemberStatus]] value.
 */
object MemberStatus {
  case object Joining extends MemberStatus
  case object Up extends MemberStatus
  case object Leaving extends MemberStatus
  case object Exiting extends MemberStatus
  case object Down extends MemberStatus
}
2012-02-09 13:36:39 +01:00
// sealed trait PartitioningStatus
// object PartitioningStatus {
// case object Complete extends PartitioningStatus
// case object Awaiting extends PartitioningStatus
2012-01-30 11:41:41 +01:00
// }
2012-01-24 12:09:32 +01:00
2012-02-09 13:36:39 +01:00
// case class PartitioningChange(
// from: Address,
// to: Address,
// path: PartitionPath,
// status: PartitioningStatus)
/**
 * Overview of the cluster: the convergence table (`seen`) and the set of unreachable nodes.
 *
 * @param seen        for each address, the vector clock recorded for it
 * @param unreachable addresses currently marked as unreachable
 */
case class GossipOverview(
  seen: Map[Address, VectorClock] = Map.empty[Address, VectorClock],
  unreachable: Set[Address] = Set.empty[Address]) {

  /** Renders both tables as comma-separated lists. */
  override def toString = {
    val seenPart = seen.mkString("seen = [", ", ", "]")
    val unreachablePart = unreachable.mkString("unreachable = [", ", ", "]")
    "GossipOverview(" + seenPart + ", " + unreachablePart + ")"
  }
}
2012-02-09 13:36:39 +01:00
/**
 * The state of the cluster as seen by one node: membership, convergence overview and
 * meta data, all versioned by a vector clock.
 *
 * @param overview convergence table and unreachable set
 * @param members  sorted set of members with their status
 * @param meta     meta data entries keyed by name
 * @param version  vector clock version of this gossip
 */
case class Gossip(
  overview: GossipOverview = GossipOverview(),
  members: SortedSet[Member], // sorted set of members with their status, sorted by name
  //partitions: Tree[PartitionPath, Node] = Tree.empty[PartitionPath, Node], // name/partition service
  //pending: Set[PartitioningChange] = Set.empty[PartitioningChange],
  meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]],
  version: VectorClock = VectorClock()) // vector clock version
  extends ClusterMessage // is a serializable cluster message
  with Versioned[Gossip] {

  /**
   * Returns a copy whose vector clock has been incremented for the given clock node.
   */
  def +(node: VectorClock.Node): Gossip = {
    val bumpedVersion = version + node
    copy(version = bumpedVersion)
  }

  /**
   * Returns a gossip that contains the given member; `this` if it is already present.
   */
  def +(member: Member): Gossip =
    if (members contains member) this
    else copy(members = members + member)

  /**
   * Marks this gossip as seen by `address`: the entry for that address in
   * `overview.seen` is set to this gossip's vector clock.
   */
  def seen(address: Address): Gossip = {
    val updatedSeen = overview.seen + (address -> version)
    copy(overview = overview.copy(seen = updatedSeen))
  }

  override def toString =
    "Gossip(" +
      "overview = " + overview +
      members.mkString(", members = [", ", ", "]") +
      meta.mkString(", meta = [", ", ", "]") +
      ", version = " + version +
      ")"
}
2012-01-24 12:09:32 +01:00
2012-02-20 15:26:12 +01:00
// FIXME ClusterCommandDaemon with FSM trait
/**
 * Single instance. FSM managing the different cluster nodes states.
 * Serialized access to Node.
 *
 * Only `Join` is acted upon at present; `Leave`, `Down` and `Remove` are accepted and ignored.
 */
final class ClusterCommandDaemon(system: ActorSystem, node: Node) extends Actor {
  val log = Logging(system, "ClusterCommandDaemon")

  def receive = {
    case Join(joiningAddress) ⇒ node.joining(joiningAddress)
    case Leave(_)             ⇒ () // not wired up yet: node.leaving(address)
    case Down(_)              ⇒ () // not wired up yet: node.downing(address)
    case Remove(_)            ⇒ () // not wired up yet: node.removing(address)
    case other                ⇒ log.error("Unknown message sent to cluster daemon [" + other + "]")
  }
}
/**
 * Pooled and routed with N number of configurable instances.
 * Concurrent access to Node.
 *
 * Hands every incoming gossip envelope over to the node.
 */
final class ClusterGossipDaemon(system: ActorSystem, node: Node) extends Actor {
  val log = Logging(system, "ClusterGossipDaemon")

  def receive = {
    case GossipEnvelope(from, payload) ⇒ node.receive(from, payload)
    case other                         ⇒ log.error("Unknown message sent to cluster daemon [" + other + "]")
  }
}
2012-02-14 20:50:12 +01:00
// FIXME Cluster public API should be an Extension
2012-02-20 15:45:50 +01:00
// FIXME Add cluster Node class and refactor out all non-gossip related stuff out of Node
2012-02-14 20:50:12 +01:00
2011-10-26 08:48:16 +02:00
/* *
* This module is responsible for Gossiping cluster information . The abstraction maintains the list of live
2012-01-30 11:41:41 +01:00
* and dead members . Periodically i . e . every 1 second this module chooses a random member and initiates a round
2011-10-26 08:48:16 +02:00
* of Gossip with it . Whenever it gets gossip updates it updates the Failure Detector with the liveness
* information .
* < p />
2012-01-30 11:41:41 +01:00
* During each of these runs the member initiates gossip exchange according to following rules ( as defined in the
2011-10-26 08:48:16 +02:00
* Cassandra documentation [http://wiki.apache.org/cassandra/ArchitectureGossip]):
* < pre >
2012-01-30 11:41:41 +01:00
* 1 ) Gossip to random live member ( if any )
* 2 ) Gossip to random unreachable member with certain probability depending on number of unreachable and live members
2012-02-07 16:53:49 +01:00
* 3 ) If the member gossiped to at ( 1 ) was not deputy , or the number of live members is less than number of deputy list ,
* gossip to random deputy with certain probability depending on number of unreachable , deputy and live members .
2011-10-26 08:48:16 +02:00
* </ pre >
*/
2012-02-20 15:45:50 +01:00
case class Node ( system : ActorSystemImpl , remote : RemoteActorRefProvider ) {
2011-10-26 08:48:16 +02:00
/**
 * Immutable snapshot of everything this Node tracks. The snapshot is held in an
 * AtomicReference and replaced as a whole (optimistic, lock-free updates).
 *
 * @param self                            this node's own member entry
 * @param latestGossip                    the most recent gossip known to this node
 * @param memberMembershipChangeListeners registered membership change listeners
 */
private case class State(
  self: Member,
  latestGossip: Gossip,
  memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty)
2011-10-26 08:48:16 +02:00
2012-01-31 15:00:46 +01:00
// ----- settings -----
val remoteSettings = new RemoteSettings(system.settings.config, system.name)
val clusterSettings = new ClusterSettings(system.settings.config, system.name)

val remoteAddress = remote.transport.address
val vclockNode = VectorClock.Node(remoteAddress.toString)

val gossipInitialDelay = clusterSettings.GossipInitialDelay
val gossipFrequency = clusterSettings.GossipFrequency

// members are ordered by the string form of their address
implicit val memberOrdering: Ordering[Member] =
  Ordering.fromLessThan[Member](_.address.toString < _.address.toString)

implicit val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)

val failureDetector = new AccrualFailureDetector(
  system, remoteAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)

private val nrOfGossipDaemons = clusterSettings.NrOfGossipDaemons
// a node never joins itself
private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress)

private val serialization = remote.serialization

private val isRunning = new AtomicBoolean(true)
private val log = Logging(system, "Node")
private val random = SecureRandom.getInstance("SHA1PRNG")

// The state MUST be initialised before the daemons below are started: they are handed `this`
// and call back into joining()/receive(), which read `state`. Starting them first would let an
// early message observe a null `state`.
private val state = {
  val member = Member(remoteAddress, MemberStatus.Joining)
  val gossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock
  new AtomicReference[State](State(member, gossip))
}

// Is it right to put this guy under the /system path or should we have a top-level /cluster or something else...?
// FIXME should be defined as a router so we get concurrency here
private val clusterCommandDaemon = system.systemActorOf(
  Props(new ClusterCommandDaemon(system, this)), "clusterCommand")

private val clusterGossipDaemon = system.systemActorOf(
  Props(new ClusterGossipDaemon(system, this)).withRouter(RoundRobinRouter(nrOfGossipDaemons)), "clusterGossip")

import Versioned.latestVersionOf

log.info("Node [{}] - Starting cluster Node...", remoteAddress)

// try to join the node defined in the 'akka.cluster.node-to-join' option
join()

// start periodic gossip to random nodes in cluster
val gossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
  gossip()
}

// start periodic cluster scrutinization (moving nodes condemned by the failure detector to unreachable list)
val scrutinizeCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
  scrutinize()
}
2012-01-24 12:09:32 +01:00
2012-02-14 20:50:12 +01:00
// ======================================================
// ===================== PUBLIC API =====================
// ======================================================
/**
 * The latest gossip held in this node's current state.
 */
def latestGossip: Gossip = {
  val current = state.get
  current.latestGossip
}
/**
 * This node's own member entry (address and status) from the current state.
 */
def self: Member = {
  val current = state.get
  current.self
}
2012-02-20 17:22:07 +01:00
/**
 * Is this node the leader?
 *
 * The leader is the first member of the sorted member set of the latest gossip.
 *
 * @return `true` if the first member has this node's address; `false` otherwise,
 *         including when the member set is empty (instead of throwing)
 */
def isLeader: Boolean =
  state.get.latestGossip.members.headOption exists (_.address == remoteAddress)
2012-02-14 20:50:12 +01:00
/**
 * Does the current state describe a singleton cluster?
 */
def isSingletonCluster: Boolean = {
  val current = state.get
  isSingletonCluster(current)
}
2012-01-24 12:09:32 +01:00
/**
 * Stops both cluster daemons and cancels the periodic gossip and scrutinize tasks.
 * Only the first call has any effect; later calls are no-ops.
 */
def shutdown(): Unit = {

  // FIXME Cheating for now. Can't just shut down. Node must first gossip an Leave command, wait for Leader to do proper Handoff and then await an Exit command before switching to Removed

  val firstShutdownCall = isRunning.compareAndSet(true, false)
  if (firstShutdownCall) {
    log.info("Node [{}] - Shutting down Node and ClusterDaemon...", remoteAddress)

    // each step runs in the `finally` of the previous one, so a failure does not skip the rest
    try {
      system.stop(clusterCommandDaemon)
    } finally {
      try {
        system.stop(clusterGossipDaemon)
      } finally {
        try {
          gossipCanceller.cancel()
        } finally {
          try {
            scrutinizeCanceller.cancel()
          } finally {
            log.info("Node [{}] - Node and ClusterDaemon shut down successfully", remoteAddress)
          }
        }
      }
    }
  }
}
2012-02-08 14:14:01 +01:00
/**
 * New node joining.
 *
 * Records a failure-detector heartbeat for the node, adds it to the membership as `Joining`,
 * bumps the vector clock and marks the new gossip as seen by this node. Registered listeners
 * are notified if the resulting gossip has converged.
 *
 * @param node address of the joining node
 */
final def joining(node: Address) {
  // Logged and heartbeated exactly once per call; only the state update below is retried,
  // so a contended CAS does not register extra heartbeats with the failure detector.
  log.info("Node [{}] - Node [{}] is joining", remoteAddress, node)
  failureDetector heartbeat node // update heartbeat in failure detector

  @tailrec
  def addJoiningMember() {
    val localState = state.get
    val localGossip = localState.latestGossip
    val localMembers = localGossip.members

    val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining
    val newGossip = localGossip copy (members = newMembers)
    val versionedGossip = newGossip + vclockNode
    val seenVersionedGossip = versionedGossip seen remoteAddress

    val newState = localState copy (latestGossip = seenVersionedGossip)

    if (!state.compareAndSet(localState, newState)) addJoiningMember() // recur if we failed update
    else if (convergence(newState.latestGossip).isDefined) {
      // foreach, not map: we only want the side effect
      newState.memberMembershipChangeListeners foreach { _ notify newMembers } // FIXME should check for cluster convergence before triggering listeners
    }
  }

  addJoiningMember()
}
2012-01-24 12:09:32 +01:00
2011-10-26 08:48:16 +02:00
/**
 * Receive new gossip.
 *
 * Records a failure-detector heartbeat for the sender, then picks the winning gossip:
 * concurrent versions are merged (and the merged clock incremented), otherwise the newer of
 * local and remote wins. The winner is marked as seen by this node and stored; registered
 * listeners are notified if it has converged.
 *
 * @param sender       the member that sent the gossip
 * @param remoteGossip the gossip received
 */
final def receive(sender: Member, remoteGossip: Gossip) {
  // Logged and heartbeated exactly once per received gossip; only the state update below is
  // retried, so a contended CAS does not register extra heartbeats with the failure detector.
  log.debug("Node [{}] - Receiving gossip from [{}]", remoteAddress, sender.address)
  failureDetector heartbeat sender.address // update heartbeat in failure detector

  @tailrec
  def storeWinningGossip() {
    val localState = state.get
    val localGossip = localState.latestGossip

    val winningGossip =
      if (remoteGossip.version <> localGossip.version) {
        // concurrent
        val mergedGossip = merge(remoteGossip, localGossip)
        val versionedMergedGossip = mergedGossip + vclockNode

        log.debug(
          "Can't establish a causal relationship between \"remote\" gossip [{}] and \"local\" gossip [{}] - merging them into [{}]",
          remoteGossip, localGossip, versionedMergedGossip)

        versionedMergedGossip
      } else if (remoteGossip.version < localGossip.version) {
        // local gossip is newer
        localGossip
      } else {
        // remote gossip is newer
        remoteGossip
      }

    val newState = localState copy (latestGossip = winningGossip seen remoteAddress)

    // if we won the race then update else try again
    if (!state.compareAndSet(localState, newState)) storeWinningGossip() // recur if we fail the update
    else if (convergence(newState.latestGossip).isDefined) {
      // foreach, not map: we only want the side effect
      newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members } // FIXME should check for cluster convergence before triggering listeners
    }
  }

  storeWinningGossip()
}
2012-01-24 12:09:32 +01:00
/**
 * Adds a listener for cluster membership changes.
 * Retries until the compare-and-set on the state succeeds.
 *
 * @param listener the listener to add
 */
@tailrec
final def registerListener(listener: MembershipChangeListener): Unit = {
  val current = state.get
  val updated = current copy (
    memberMembershipChangeListeners = current.memberMembershipChangeListeners + listener)
  val swapped = state.compareAndSet(current, updated)
  if (!swapped) registerListener(listener) // lost the race - try again
}
2012-01-24 12:09:32 +01:00
/**
 * Removes a listener for cluster membership changes.
 * Retries until the compare-and-set on the state succeeds.
 *
 * @param listener the listener to remove
 */
@tailrec
final def unregisterListener(listener: MembershipChangeListener): Unit = {
  val current = state.get
  val updated = current copy (
    memberMembershipChangeListeners = current.memberMembershipChangeListeners - listener)
  val swapped = state.compareAndSet(current, updated)
  if (!swapped) unregisterListener(listener) // lost the race - try again
}
2012-02-18 22:14:53 +01:00
/**
 * Checks whether the latest gossip of this node has converged.
 *
 * @return Some(convergedGossip) if convergence has been reached, None otherwise
 */
def convergence: Option[Gossip] = {
  val current = state.get.latestGossip
  convergence(current)
}
2012-02-14 20:50:12 +01:00
// ========================================================
// ===================== INTERNAL API =====================
// ========================================================
2012-01-24 12:09:32 +01:00
/**
 * Sends a `Join` command for this node to the configured contact point, if there is one.
 */
private def join(): Unit =
  for (address ← nodeToJoin) {
    val command = Join(remoteAddress)
    val connection = clusterCommandConnectionFor(address)
    log.info("Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection)
    connection ! command
  }
2011-10-26 08:48:16 +02:00
/**
 * Initiates a new round of gossip. Does nothing if this node is a singleton cluster.
 *
 * A round consists of:
 *  1. gossiping to a random member,
 *  2. if there are unreachable nodes, gossiping to a random one of them with probability
 *     `unreachable / (members + 1)`,
 *  3. if step 1 did not reach a deputy (or there are no members) and deputies exist,
 *     gossiping to a random deputy - always when there are no members, otherwise with
 *     probability `1 / (members + unreachable)`.
 */
private def gossip() {
  val localState = state.get

  if (!isSingletonCluster(localState)) { // do not gossip if we are a singleton cluster
    log.debug("Node [{}] - Initiating new round of gossip", remoteAddress)

    val localGossip = localState.latestGossip
    val localMembers = localGossip.members
    val localMembersSize = localMembers.size

    val localUnreachableAddresses = localGossip.overview.unreachable
    val localUnreachableSize = localUnreachableAddresses.size

    // 1. gossip to alive members
    val gossipedToDeputy = gossipToRandomNodeOf(localMembers.toList map { _.address })

    // 2. gossip to unreachable members
    if (localUnreachableSize > 0) {
      // toDouble: integer division would truncate the probability to 0
      val probability: Double = localUnreachableSize.toDouble / (localMembersSize + 1)
      if (random.nextDouble() < probability) gossipToRandomNodeOf(localUnreachableAddresses.toList)
    }

    // 3. gossip to a deputy nodes for facilitating partition healing
    val deputies = deputyNodesWithoutMyself
    if ((!gossipedToDeputy || localMembersSize < 1) && !deputies.isEmpty) {
      if (localMembersSize == 0) gossipToRandomNodeOf(deputies)
      else {
        // parenthesised: without it the unreachable count is ADDED to 1/members,
        // giving a "probability" >= 1 as soon as one node is unreachable
        val probability = 1.0 / (localMembersSize + localUnreachableSize)
        if (random.nextDouble() <= probability) gossipToRandomNodeOf(deputies)
      }
    }
  }
}
2012-02-14 20:50:12 +01:00
/**
 * Combines two gossips: vector clocks are merged, member sets are unioned and meta data
 * tables are concatenated (entries of `gossip2` win on key clashes). The overview of
 * `gossip2` is carried over as-is.
 */
private def merge(gossip1: Gossip, gossip2: Gossip): Gossip =
  Gossip(
    overview = gossip2.overview,
    members = gossip1.members union gossip2.members,
    meta = gossip1.meta ++ gossip2.meta,
    version = gossip1.version merge gossip2.version)
2012-02-09 15:59:10 +01:00
/**
 * Changes this node's own membership status.
 *
 * Replaces this node's entry in the member set, bumps the vector clock, marks the gossip as
 * seen by this node and stores the result; retries until the compare-and-set succeeds.
 *
 * @param newStatus the status to switch to
 */
@tailrec
final private def switchStatusTo(newStatus: MemberStatus): Unit = {
  log.info("Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus)

  val current = state.get
  val currentGossip = current.latestGossip
  val updatedSelf = current.self copy (status = newStatus)

  // swap in the updated entry for this node, keep everyone else untouched
  val remapped = currentGossip.members map { member ⇒
    if (member.address == remoteAddress) updatedSelf else member
  }

  // rebuilt via a List to work around a Scala collections bug
  // ('val ss: SortedSet[Member] = SortedSet.empty[Member] ++ aSet' does not compile)
  val updatedMembers = SortedSet[Member](remapped.toList: _*)

  val versioned = currentGossip.copy(members = updatedMembers) + vclockNode
  val seenByMe = versioned seen remoteAddress

  val updated = current copy (self = updatedSelf, latestGossip = seenByMe)
  if (!state.compareAndSet(current, updated)) switchStatusTo(newStatus) // lost the race - try again
}
2011-10-26 08:48:16 +02:00
/**
 * Sends this node's member entry and latest gossip to the gossip daemon at `address`.
 *
 * @param address address of the node to gossip to
 */
private def gossipTo(address: Address): Unit = {
  val target = clusterGossipConnectionFor(address)
  log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, target)
  val envelope = GossipEnvelope(self, latestGossip)
  target ! envelope
}
/**
 * Gossips latest gossip to a random node among the given addresses, never to this node itself.
 *
 * If no address other than this node's own is given, nothing is sent and `false` is returned
 * (instead of failing while trying to pick a random element of an empty sequence).
 *
 * @param addresses candidate addresses to gossip to
 * @return 'true' if it gossiped to a "deputy" member.
 */
private def gossipToRandomNodeOf(addresses: Seq[Address]): Boolean = {
  val peers = addresses filter (_ != remoteAddress) // filter out myself
  if (peers.isEmpty) false
  else {
    val peer = selectRandomNode(peers)
    // if connection can't be established/found => ignore it since the failure detector will take care of the potential problem
    gossipTo(peer)
    deputyNodesWithoutMyself exists (peer == _)
  }
}
2012-02-07 16:53:49 +01:00
/**
 * Scrutinizes the cluster: members the failure detector reports as unavailable are moved from
 * the member set to the unreachable set and dropped from the `seen` table. Does nothing for a
 * singleton cluster or when no member is unavailable. Retries until the compare-and-set
 * succeeds; listeners are notified if the resulting gossip has converged.
 */
@tailrec
final private def scrutinize(): Unit = {
  val current = state.get

  if (!isSingletonCluster(current)) { // do not scrutinize if we are a singleton cluster
    val currentGossip = current.latestGossip
    val currentOverview = currentGossip.overview
    val currentMembers = currentGossip.members

    val unavailableMembers = currentMembers filterNot { member ⇒ failureDetector.isAvailable(member.address) }
    val unavailableAddresses = unavailableMembers map { _.address }

    if (!unavailableAddresses.isEmpty) { // we have newly detected members marked as unavailable
      val remainingMembers = currentMembers diff unavailableMembers
      val allUnreachable: Set[Address] = currentOverview.unreachable ++ unavailableAddresses

      log.info("Node [{}] - Marking node(s) an unreachable [{}]", remoteAddress, unavailableAddresses.mkString(", "))

      // unreachable nodes no longer take part in the convergence table
      val prunedSeen = currentOverview.seen -- allUnreachable
      val updatedOverview = currentOverview copy (seen = prunedSeen, unreachable = allUnreachable)

      val versioned = currentGossip.copy(overview = updatedOverview, members = remainingMembers) + vclockNode
      val seenByMe = versioned seen remoteAddress

      val updated = current copy (latestGossip = seenByMe)

      // if we won the race then update else try again
      if (!state.compareAndSet(current, updated)) scrutinize() // recur
      else if (convergence(updated.latestGossip).isDefined) {
        updated.memberMembershipChangeListeners foreach { _ notify remainingMembers } // FIXME should check for cluster convergence before triggering listeners
      }
    }
  }
}
2012-02-18 22:14:53 +01:00
/**
 * Checks whether the given gossip has converged, i.e. whether all entries of its `seen`
 * table hold one and the same vector clock.
 *
 * @param gossip the gossip to inspect
 * @return Some(gossip) if convergence has been reached, None otherwise
 */
private def convergence(gossip: Gossip): Option[Gossip] = {
  val distinctViews = gossip.overview.seen.values.toSet
  if (distinctViews.size != 1) None
  else {
    log.debug("Node [{}] - Cluster convergence reached", remoteAddress)
    Some(gossip)
  }
}
2012-02-07 16:53:49 +01:00
/**
 * Looks up the cluster command daemon ("/system/clusterCommand") at the given address.
 */
private def clusterCommandConnectionFor(address: Address): ActorRef = {
  val daemonPath = RootActorPath(address) / "system" / "clusterCommand"
  system.actorFor(daemonPath)
}
2012-02-07 16:53:49 +01:00
/**
 * Looks up the cluster gossip daemon ("/system/clusterGossip") at the given address.
 */
private def clusterGossipConnectionFor(address: Address): ActorRef = {
  val daemonPath = RootActorPath(address) / "system" / "clusterGossip"
  system.actorFor(daemonPath)
}
2012-01-24 12:09:32 +01:00
2012-02-09 15:59:10 +01:00
// Deputy node addresses, excluding this node. Currently always empty.
// FIXME read in deputy nodes from gossip data - now empty seq
private def deputyNodesWithoutMyself: Seq[Address] = {
  val deputies = Seq.empty[Address]
  deputies filterNot (_ == remoteAddress)
}
2012-01-24 12:09:32 +01:00
2012-02-09 15:59:10 +01:00
// Picks one of the given addresses at random; the sequence must not be empty.
private def selectRandomNode(addresses: Seq[Address]): Address = {
  val index = random.nextInt(addresses.size)
  addresses(index)
}
2012-02-14 20:50:12 +01:00
// A singleton cluster is one whose latest gossip lists exactly one member.
private def isSingletonCluster(currentState: State): Boolean = {
  val memberCount = currentState.latestGossip.members.size
  memberCount == 1
}
2011-10-26 08:48:16 +02:00
}