2011-10-26 08:48:16 +02:00
/* *
2012-01-19 18:21:06 +01:00
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
2011-10-26 08:48:16 +02:00
*/
2012-01-31 13:33:04 +01:00
package akka.cluster
2011-10-26 08:48:16 +02:00
import akka.actor._
import akka.actor.Status._
2012-06-16 00:00:19 +02:00
import akka.ConfigurationException
2012-01-24 12:09:32 +01:00
import akka.dispatch.Await
2012-06-16 00:00:19 +02:00
import akka.dispatch.MonitorableThreadFactory
import akka.event.Logging
import akka.jsr166y.ThreadLocalRandom
2012-01-31 13:33:04 +01:00
import akka.pattern.ask
2012-06-16 00:00:19 +02:00
import akka.remote._
import akka.routing._
2012-01-31 13:33:04 +01:00
import akka.util._
2012-03-02 16:20:30 +01:00
import akka.util.duration._
2012-06-16 00:00:19 +02:00
import akka.util.internal.HashedWheelTimer
import com.google.protobuf.ByteString
import java.io.Closeable
import java.lang.management.ManagementFactory
2012-01-30 11:41:41 +01:00
import java.util.concurrent.atomic. { AtomicReference , AtomicBoolean }
2012-01-24 12:09:32 +01:00
import java.util.concurrent.TimeoutException
2012-06-16 00:00:19 +02:00
import java.util.concurrent.TimeUnit._
2012-04-14 20:06:03 +02:00
import javax.management._
2012-06-13 15:33:38 +02:00
import MemberStatus._
2012-06-16 00:00:19 +02:00
import scala.annotation.tailrec
import scala.collection.immutable. { Map , SortedSet }
2012-01-24 12:09:32 +01:00
2011-10-26 08:48:16 +02:00
/**
 * Listener for cluster membership changes.
 *
 * Instances are registered with `Cluster.registerListener` and removed again with
 * `Cluster.unregisterListener`.
 */
trait MembershipChangeListener {

  /**
   * Callback receiving the cluster members.
   *
   * @param members the members, as a sorted set
   */
  def notify(members: SortedSet[Member]): Unit
}
/**
 * Listener for changes to the cluster meta data (the `meta` map carried by `Gossip`).
 */
trait MetaDataChangeListener {

  /**
   * Callback receiving the meta data.
   *
   * @param meta the meta data entries: `String` keys mapped to raw byte-array values
   */
  def notify(meta: Map[String, Array[Byte]]): Unit
}
/**
 * Root of the cluster message hierarchy. Every `ClusterMessage` is `Serializable`.
 *
 * The trait is sealed, so all cluster messages are declared in this file.
 *
 * FIXME Protobuf all ClusterMessages
 */
sealed trait ClusterMessage extends Serializable
2012-01-30 11:41:41 +01:00
/**
 * Cluster commands sent by the USER.
 */
object ClusterUserAction {

  /**
   * Command to join the cluster. Sent when a node (represented by `address`)
   * wants to join another node (the receiver).
   *
   * @param address the address of the node that wants to join
   */
  case class Join(address: Address) extends ClusterMessage

  /**
   * Command to leave the cluster.
   *
   * @param address the address of the node that should leave
   */
  case class Leave(address: Address) extends ClusterMessage

  /**
   * Command to mark a node as temporarily down.
   *
   * @param address the address of the node to mark as down
   */
  case class Down(address: Address) extends ClusterMessage
}
/**
 * Cluster commands sent by the LEADER.
 */
object ClusterLeaderAction {

  /**
   * INTERNAL API.
   *
   * Command to mark a node to be removed from the cluster immediately.
   * Can only be sent by the leader.
   *
   * @param address the address of the node to mark
   */
  private[cluster] case class Exit(address: Address) extends ClusterMessage

  /**
   * INTERNAL API.
   *
   * Command to remove a node from the cluster immediately.
   *
   * @param address the address of the node to remove
   */
  private[cluster] case class Remove(address: Address) extends ClusterMessage
}
2011-10-26 08:48:16 +02:00
2012-01-30 11:41:41 +01:00
/**
 * Represents the address and the current status of a cluster member node.
 *
 * Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus`.
 * Two `Member`s with the same address but different statuses are therefore equal.
 */
class Member(val address: Address, val status: MemberStatus) extends ClusterMessage {

  // Identity is the address alone, so the hash must ignore `status` to stay consistent with `equals`.
  override def hashCode = address.##

  override def equals(other: Any) = other match {
    case that: Member ⇒ address == that.address
    case _            ⇒ false
  }

  override def toString = "Member(address = %s, status = %s)".format(address, status)

  /** Returns a new `Member`, replacing the given fields. */
  def copy(address: Address = this.address, status: MemberStatus = this.status): Member =
    new Member(address, status)
}
/**
 * Module with factory and ordering methods for Member instances.
 */
object Member {

  /**
   * `Address` ordering type class, sorts addresses by host and port.
   *
   * A missing host is ordered as the empty string and a missing port as `0`. Both the equality
   * test and the comparison use these normalized values. That keeps the relation a strict weak
   * ordering: a `None` host and a `Some("")` host compare equal and fall through to the port
   * comparison, instead of being treated as unordered regardless of their ports.
   */
  implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒
    val hostComparison = a.host.getOrElse("").compareTo(b.host.getOrElse(""))
    if (hostComparison != 0) hostComparison < 0
    else a.port.getOrElse(0) < b.port.getOrElse(0)
  }

  /**
   * `Member` ordering type class, sorts members by host and port with the exception that
   * it puts all members that are in MemberStatus.EXITING last.
   */
  implicit val ordering: Ordering[Member] = Ordering.fromLessThan[Member] { (a, b) ⇒
    if (a.status == Exiting && b.status != Exiting) false
    else if (a.status != Exiting && b.status == Exiting) true
    else addressOrdering.compare(a.address, b.address) < 0
  }

  /** Creates a `Member` with the given address and status. */
  def apply(address: Address, status: MemberStatus): Member = new Member(address, status)

  /** Extracts the address of a `Member`; `None` for anything else. */
  def unapply(other: Any) = other match {
    case m: Member ⇒ Some(m.address)
    case _         ⇒ None
  }

  /**
   * Merges two member sets. For every address that appears in either set, keeps the single
   * member whose status has the highest priority, as decided by `highestPriorityOf`.
   */
  def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = {
    // group all members by Address => Seq[Member]
    val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address)
    // Reduce each group to its highest-priority member. The groups have distinct addresses, so
    // no two results are equal and none collide in the resulting set.
    groupedByAddress.values.map(_.reduceLeft(highestPriorityOf)).toSet
  }

  /**
   * Picks the Member with the highest "priority" MemberStatus.
   *
   * Priority, highest first: Removed, Down, Exiting, Leaving, Joining, Up.
   * Note that `Joining` outranks `Up`. When both statuses are equal, `m1` is returned.
   */
  def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match {
    case (Removed, _)       ⇒ m1
    case (_, Removed)       ⇒ m2
    case (Down, _)          ⇒ m1
    case (_, Down)          ⇒ m2
    case (Exiting, _)       ⇒ m1
    case (_, Exiting)       ⇒ m2
    case (Leaving, _)       ⇒ m1
    case (_, Leaving)       ⇒ m2
    case (Up, Joining)      ⇒ m2
    case (Joining, Up)      ⇒ m1
    case (Joining, Joining) ⇒ m1
    case (Up, Up)           ⇒ m1
  }
}
2011-10-26 08:48:16 +02:00
2012-02-09 15:59:10 +01:00
/**
 * Envelope pairing a gossip with the address of the node that sent it.
 *
 * @param from   the sender's address
 * @param gossip the gossip being delivered
 */
case class GossipEnvelope(from: Address, gossip: Gossip) extends ClusterMessage
2012-02-09 15:59:10 +01:00
2012-01-24 12:09:32 +01:00
/**
 * Defines the current status of a cluster member node.
 *
 * Can be one of: Joining, Up, Leaving, Exiting, Down and Removed.
 */
sealed trait MemberStatus extends ClusterMessage {

  /**
   * Using the same notion for 'unavailable' as 'non-convergence': DOWN.
   * Only `MemberStatus.Down` reports itself as unavailable.
   */
  def isUnavailable: Boolean = this == MemberStatus.Down
}

/**
 * The possible member statuses.
 */
object MemberStatus {
  case object Joining extends MemberStatus
  case object Up extends MemberStatus
  case object Leaving extends MemberStatus
  case object Exiting extends MemberStatus
  case object Down extends MemberStatus
  case object Removed extends MemberStatus
}
2012-02-09 13:36:39 +01:00
/**
 * Represents the overview of the cluster. Holds the cluster convergence table (`seen`) and the
 * set of unreachable nodes.
 *
 * @param seen        the vector clock each node has marked as seen, keyed by node address
 * @param unreachable members that are currently unreachable
 */
case class GossipOverview(
  seen: Map[Address, VectorClock] = Map.empty[Address, VectorClock],
  unreachable: Set[Member] = Set.empty[Member]) {

  override def toString =
    seen.mkString("GossipOverview(seen = [", ", ", "]") +
      unreachable.mkString(", unreachable = [", ", ", "])")
}
2012-02-09 13:36:39 +01:00
2012-06-11 21:12:57 +02:00
object Gossip {

  /** An empty member set, ordered by the implicit `Ordering[Member]` (`Member.ordering`). */
  val emptyMembers: SortedSet[Member] = SortedSet.empty[Member]
}
2012-02-09 13:36:39 +01:00
/**
 * Represents the state of the cluster: cluster ring membership, ring convergence and meta data,
 * all versioned by a vector clock.
 *
 * When a node is joining, its `Member`, with status `Joining`, is added to `members`.
 * If the joining node was downed, it is moved from `overview.unreachable` (status `Down`)
 * to `members` (status `Joining`). It cannot rejoin if it was not first downed.
 *
 * When convergence is reached, the leader changes the status of `members` from `Joining`
 * to `Up`.
 *
 * When the failure detector considers a node unavailable, the node is moved from
 * `members` to `overview.unreachable`.
 *
 * When a node is downed, either manually or automatically, its status is changed to `Down`.
 * It is also removed from the `overview.seen` table. The node resides as `Down` in the
 * `overview.unreachable` set until it joins again, and then it goes through the normal
 * joining procedure.
 *
 * When a `Gossip` is received, its version (vector clock) is used to determine whether the
 * received `Gossip` is newer or older than the current local `Gossip`. The received `Gossip`
 * and the local `Gossip` are merged in case of conflicting versions, i.e. vector clocks without
 * the same history. When they are merged, the seen table is cleared.
 *
 * When a node is told by the user to leave the cluster, the leader moves it to `Leaving`.
 * The leader then rebalances and repartitions the cluster and starts hand-off by migrating the
 * actors from the leaving node to the new partitions. Once this process is complete, the leader
 * moves the node to the `Exiting` state. Once convergence is complete, it moves the node to
 * `Removed` by removing it from the `members` set and sending a `Removed` command to the
 * removed node, telling it to shut itself down.
 *
 * Construction validates these invariants and throws `IllegalArgumentException` if any is violated:
 *  - no node is both in `members` and in `overview.unreachable`;
 *  - live members only have status `Joining`, `Up`, `Leaving` or `Exiting`;
 *  - every address in `overview.seen` belongs to a member or to an unreachable node.
 */
case class Gossip(
  overview: GossipOverview = GossipOverview(),
  members: SortedSet[Member], // sorted set of members with their status
  meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]],
  version: VectorClock = VectorClock()) // vector clock version
  extends ClusterMessage // is a serializable cluster message
  with Versioned[Gossip] {

  // FIXME can be disabled as optimization
  assertInvariants

  private def assertInvariants: Unit = {
    def violated(template: String, args: Any*): Nothing =
      throw new IllegalArgumentException(template.format(args: _*))

    val unreachableAndLive = members intersect overview.unreachable
    if (!unreachableAndLive.isEmpty)
      violated("Same nodes in both members and unreachable is not allowed, got [%s]",
        unreachableAndLive.mkString(", "))

    val allowedLiveMemberStatuses: Set[MemberStatus] = Set(Joining, Up, Leaving, Exiting)
    val membersWithIllegalStatus = members filterNot (m ⇒ allowedLiveMemberStatuses contains m.status)
    if (!membersWithIllegalStatus.isEmpty)
      violated("Live members must have status [%s], got [%s]",
        allowedLiveMemberStatuses.mkString(", "), membersWithIllegalStatus.mkString(", "))

    val seenButNotMember = overview.seen.keySet -- members.map(_.address) -- overview.unreachable.map(_.address)
    if (!seenButNotMember.isEmpty)
      violated("Nodes not part of cluster have marked the Gossip as seen, got [%s]",
        seenButNotMember.mkString(", "))
  }

  /**
   * Increments the version for this 'Node'.
   */
  def :+(node: VectorClock.Node): Gossip = copy(version = version :+ node)

  /**
   * Adds a member to the member node ring. Returns this gossip unchanged if `members`
   * already contains the member.
   */
  def :+(member: Member): Gossip =
    if (members contains member) this
    else copy(members = members + member)

  /**
   * Marks the gossip as seen by this node (address) by updating the address entry in the
   * `overview.seen` map with the vector clock (version) of this gossip. Returns this gossip
   * unchanged if the entry already holds the current version.
   */
  def seen(address: Address): Gossip =
    if (overview.seen.get(address) == Some(version)) this
    else copy(overview = overview.copy(seen = overview.seen + (address -> version)))

  /**
   * Merges two Gossip instances, including membership tables, meta-data tables and the VectorClock histories.
   */
  def merge(that: Gossip): Gossip = {
    import Member.ordering

    // 1. merge vector clocks
    val mergedVClock = version merge that.version

    // 2. merge meta-data (entries from `that` win on key clashes)
    val mergedMeta = meta ++ that.meta

    // 3. merge unreachable by selecting the single Member with highest MemberStatus out of each Member group
    val mergedUnreachable = Member.pickHighestPriority(overview.unreachable, that.overview.unreachable)

    // 4. merge members the same way, excluding everything that ended up unreachable
    val mergedMembers = Gossip.emptyMembers ++
      Member.pickHighestPriority(members, that.members).filterNot(mergedUnreachable.contains)

    // 5. start from a fresh seen table
    Gossip(
      overview = GossipOverview(Map.empty[Address, VectorClock], mergedUnreachable),
      members = mergedMembers,
      meta = mergedMeta,
      version = mergedVClock)
  }

  override def toString =
    "Gossip(overview = " + overview +
      members.mkString(", members = [", ", ", "]") +
      meta.mkString(", meta = [", ", ", "]") +
      ", version = " + version + ")"
}
2012-01-24 12:09:32 +01:00
2012-06-11 21:12:57 +02:00
/**
 * Sent at regular intervals for failure detection.
 *
 * @param from the address of the node sending the heartbeat
 */
case class Heartbeat(from: Address) extends ClusterMessage
2012-06-11 14:59:34 +02:00
2012-02-20 15:26:12 +01:00
/**
 * INTERNAL API.
 *
 * Manages routing of the different cluster commands. Each user or leader command is forwarded
 * to the corresponding method on `cluster`.
 * Instantiated as a single instance for each Cluster, so commands are serialized to Cluster
 * message after message.
 */
private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Actor {
  import ClusterUserAction.{ Join, Down, Leave }
  import ClusterLeaderAction.{ Exit, Remove }

  val log = Logging(context.system, this)

  def receive = {
    case Join(address)   ⇒ cluster.joining(address)
    case Down(address)   ⇒ cluster.downing(address)
    case Leave(address)  ⇒ cluster.leaving(address)
    case Exit(address)   ⇒ cluster.exiting(address)
    case Remove(address) ⇒ cluster.removing(address)
  }

  // Anything that is not a known command is logged as an error.
  override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown)
}
/**
 * INTERNAL API.
 *
 * Receives heartbeats and gossip and hands them to `cluster`.
 * Runs as a pool of configurable size behind a router, so several instances may call into
 * `cluster` concurrently.
 */
private[cluster] final class ClusterGossipDaemon(cluster: Cluster) extends Actor {
  val log = Logging(context.system, this)

  def receive = {
    case Heartbeat(sender)               ⇒ cluster.receiveHeartbeat(sender)
    case GossipEnvelope(sender, payload) ⇒ cluster.receiveGossip(sender, payload)
  }

  override def unhandled(unknown: Any) =
    log.error("[/system/cluster/gossip] can not respond to messages - received [{}]", unknown)
}
2012-03-09 12:56:56 +01:00
/**
 * INTERNAL API.
 *
 * Supervisor managing the different Cluster daemons. It creates a single "commands" child
 * (`ClusterCommandDaemon`) and a round-robin routed "gossip" child (`ClusterGossipDaemon`),
 * sized by `NrOfGossipDaemons`. It handles no messages itself.
 */
private[cluster] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor {
  val log = Logging(context.system, this)

  private val commands = context.actorOf(Props(new ClusterCommandDaemon(cluster)), "commands")

  private val gossip = {
    val gossipRouter = RoundRobinRouter(cluster.clusterSettings.NrOfGossipDaemons)
    context.actorOf(Props(new ClusterGossipDaemon(cluster)).withRouter(gossipRouter), "gossip")
  }

  def receive = Actor.emptyBehavior

  override def unhandled(unknown: Any): Unit =
    log.error("[/system/cluster] can not respond to messages - received [{}]", unknown)
}
2012-02-22 18:40:16 +01:00
/**
 * Cluster Extension Id and factory for creating Cluster extension.
 * Example:
 * {{{
 *  if (Cluster(system).isLeader) { ... }
 * }}}
 */
object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
  override def get(system: ActorSystem): Cluster = super.get(system)

  override def lookup = Cluster

  /**
   * Creates the `Cluster` extension.
   *
   * The failure detector class named by `FailureDetectorImplementationClass` is instantiated
   * reflectively with `(ActorSystem, ClusterSettings)` constructor arguments.
   *
   * @throws ConfigurationException if the failure detector cannot be created; the underlying
   *         error is attached as the exception's cause so the original stack trace is kept.
   */
  override def createExtension(system: ExtendedActorSystem): Cluster = {
    val clusterSettings = new ClusterSettings(system.settings.config, system.name)
    val failureDetector = {
      import clusterSettings.{ FailureDetectorImplementationClass ⇒ fqcn }
      system.dynamicAccess.createInstanceFor[FailureDetector](
        fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> clusterSettings)).fold(
          e ⇒ throw new ConfigurationException(
            "Could not create custom failure detector [" + fqcn + "] due to: " + e.toString, e),
          identity)
    }
    new Cluster(system, failureDetector)
  }
}
2012-02-14 20:50:12 +01:00
2012-04-14 20:06:03 +02:00
/**
 * Interface for the cluster JMX MBean.
 *
 * The `join`, `leave` and `down` operations take a node address as a `String`.
 */
trait ClusterNodeMBean {
  def getMemberStatus: String
  def getClusterStatus: String
  def getLeader: String

  def isSingleton: Boolean
  def isConvergence: Boolean
  def isAvailable: Boolean
  def isRunning: Boolean

  // Explicit `: Unit` instead of the deprecated procedure syntax; the signatures are unchanged.
  def join(address: String): Unit
  def leave(address: String): Unit
  def down(address: String): Unit
}
2011-10-26 08:48:16 +02:00
/* *
* This module is responsible for Gossiping cluster information . The abstraction maintains the list of live
2012-01-30 11:41:41 +01:00
* and dead members . Periodically i . e . every 1 second this module chooses a random member and initiates a round
2011-10-26 08:48:16 +02:00
* of Gossip with it . Whenever it gets gossip updates it updates the Failure Detector with the liveness
* information .
* < p />
2012-01-30 11:41:41 +01:00
* During each of these runs the member initiates gossip exchange according to following rules ( as defined in the
2011-10-26 08:48:16 +02:00
* Cassandra documentation [http://wiki.apache.org/cassandra/ArchitectureGossip]:
* < pre >
2012-01-30 11:41:41 +01:00
* 1 ) Gossip to random live member ( if any )
* 2 ) Gossip to random unreachable member with certain probability depending on number of unreachable and live members
2012-02-07 16:53:49 +01:00
* 3) If the member gossiped to at (1) was not a deputy, or the number of live members is less than the number of deputies,
*    gossip to a random deputy with certain probability depending on the number of unreachable, deputy and live members.
2011-10-26 08:48:16 +02:00
* </ pre >
2012-02-29 10:02:00 +01:00
*
* Example :
* { { {
2012-04-12 12:25:39 +02:00
* if ( Cluster ( system ) . isLeader ) { . . . }
2012-02-29 10:02:00 +01:00
* } } }
2011-10-26 08:48:16 +02:00
*/
2012-06-10 16:52:33 +02:00
class Cluster ( system : ExtendedActorSystem , val failureDetector : FailureDetector ) extends Extension { clusterNode ⇒
2012-02-22 18:40:16 +01:00
2011-10-26 08:48:16 +02:00
/**
 * Represents the state for this Cluster. Implemented using optimistic lockless concurrency:
 * all state lives in this immutable case class, which is swapped inside an AtomicReference.
 *
 * @param latestGossip                    the most recent gossip known to this node
 * @param memberMembershipChangeListeners listeners added through `registerListener`
 */
private case class State(
  latestGossip: Gossip,
  memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty[MembershipChangeListener])
2011-10-26 08:48:16 +02:00
2012-02-29 10:02:00 +01:00
// The cluster is built on remoting, so the actor system must use a RemoteActorRefProvider.
private val remote: RemoteActorRefProvider = system.provider match {
  case remoteProvider: RemoteActorRefProvider ⇒ remoteProvider
  case _ ⇒
    throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration")
}

val remoteSettings = new RemoteSettings(system.settings.config, system.name)
val clusterSettings = new ClusterSettings(system.settings.config, system.name)
// Brings the settings (NodeToJoin, GossipInterval, ...) into scope for the rest of the class body.
import clusterSettings._

val selfAddress = remote.transport.address
private val selfHeartbeat = Heartbeat(selfAddress)

private val vclockNode = VectorClock.Node(selfAddress.toString)

private implicit val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)

// A configured node-to-join that is this very node is ignored.
private val nodeToJoin: Option[Address] = NodeToJoin.filterNot(_ == selfAddress)

private val serialization = remote.serialization
private val _isRunning = new AtomicBoolean(true)
private val log = Logging(system, "Node")

private val mBeanServer = ManagementFactory.getPlatformMBeanServer
private val clusterMBeanName = new ObjectName("akka:type=Cluster")

log.info("Cluster Node [{}] - is starting up...", selfAddress)
2012-03-09 12:56:56 +01:00
2012-06-11 14:59:34 +02:00
// Create the supervisor for the daemons under path "/system/cluster". Blocks until the system
// guardian replies (bounded by `defaultTimeout`) and rethrows the reply if it is an exception.
private val clusterDaemons = {
  val reply = Await.result(
    system.systemGuardian ? CreateChild(Props(new ClusterDaemonSupervisor(this)), "cluster"),
    defaultTimeout.duration)
  reply match {
    case supervisor: ActorRef ⇒ supervisor
    case failure: Exception   ⇒ throw failure
  }
}

// Initial state: this node is the only member, with status Joining. Its vector clock entry is
// incremented and the gossip is marked as seen by this node.
private val state = {
  val initialGossip = (Gossip(members = Gossip.emptyMembers + Member(selfAddress, Joining)) :+ vclockNode) seen selfAddress
  new AtomicReference[State](State(initialGossip))
}

// try to join the node defined in the 'akka.cluster.node-to-join' option
autoJoin()
2012-01-24 12:09:32 +01:00
2012-03-09 12:56:56 +01:00
// ========================================================
// ===================== WORK DAEMONS =====================
// ========================================================

/**
 * Scheduler driving the periodic cluster tasks.
 *
 * If the system scheduler ticks more coarsely than the cluster's `SchedulerTickDuration`, a
 * dedicated `HashedWheelTimer`-based scheduler is created. Otherwise `system.scheduler` is reused
 * behind a wrapper whose `close()` does nothing, because this class does not own that scheduler.
 */
private val clusterScheduler: Scheduler with Closeable =
  if (system.settings.SchedulerTickDuration > SchedulerTickDuration) {
    log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " +
      "with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].",
      system.settings.SchedulerTickDuration.toMillis, SchedulerTickDuration.toMillis)
    // Give the timer thread a distinct name when the system factory is a MonitorableThreadFactory.
    val factory = system.threadFactory match {
      case monitorable: MonitorableThreadFactory ⇒ monitorable.copy(name = monitorable.name + "-cluster-scheduler")
      case other                                 ⇒ other
    }
    val timer = new HashedWheelTimer(log, factory, SchedulerTickDuration, SchedulerTicksPerWheel)
    new DefaultScheduler(timer, log, system.dispatcher)
  } else {
    // delegate to system.scheduler, but don't close
    val delegate = system.scheduler
    new Scheduler with Closeable {
      // we are using system.scheduler, which we are not responsible for closing
      def close(): Unit = ()

      def schedule(initialDelay: Duration, frequency: Duration, receiver: ActorRef, message: Any): Cancellable =
        delegate.schedule(initialDelay, frequency, receiver, message)

      def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable =
        delegate.schedule(initialDelay, frequency)(f)

      def schedule(initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable =
        delegate.schedule(initialDelay, frequency, runnable)

      def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable =
        delegate.scheduleOnce(delay, runnable)

      def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable =
        delegate.scheduleOnce(delay, receiver, message)

      def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable =
        delegate.scheduleOnce(delay)(f)
    }
  }
2012-06-11 22:12:45 +02:00
2012-02-14 20:50:12 +01:00
// Periodic tasks, all driven by `clusterScheduler`. Each task's first run happens after
// `PeriodicTasksInitialDelay`, or after its own interval if that is longer.

// start periodic gossip to random nodes in cluster
private val gossipTask =
  FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay max GossipInterval, GossipInterval) {
    gossip()
  }

// start periodic heartbeat to all nodes in cluster
private val heartbeatTask =
  FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay max HeartbeatInterval, HeartbeatInterval) {
    heartbeat()
  }

// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
private val failureDetectorReaperTask =
  FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay max UnreachableNodesReaperInterval, UnreachableNodesReaperInterval) {
    reapUnreachableMembers()
  }

// start periodic leader action management (only applies for the current leader)
private val leaderActionsTask =
  FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay max LeaderActionsInterval, LeaderActionsInterval) {
    leaderActions()
  }

createMBean()
// `shutdown()` is passed by name and runs when the actor system terminates.
system.registerOnTermination(shutdown())
log.info("Cluster Node [{}] - has started up successfully", selfAddress)
2012-03-09 12:56:56 +01:00
2012-02-14 20:50:12 +01:00
// ======================================================
// ===================== PUBLIC API =====================
// ======================================================

/**
 * The `Member` entry for this node in the latest gossip. It is looked up by `selfAddress`,
 * first among the live `members` and then in `overview.unreachable`.
 *
 * @throws IllegalStateException if this node is in neither set
 */
def self: Member = {
  val gossip = latestGossip
  def lookup(candidates: Iterable[Member]) = candidates.find(_.address == selfAddress)
  lookup(gossip.members) orElse lookup(gossip.overview.unreachable) getOrElse {
    throw new IllegalStateException("Can't find 'this' Member [" + selfAddress + "] in the cluster membership ring or in the unreachable set")
  }
}
2012-03-09 12:56:56 +01:00
2012-06-16 00:00:19 +02:00
/**
 * Returns true if the cluster node is up and running, false if it is shut down.
 */
def isRunning: Boolean = _isRunning.get()

/**
 * Latest gossip, read from the current `State`.
 */
def latestGossip: Gossip = state.get().latestGossip
/**
 * Member status for this node (`MemberStatus`).
 *
 * NOTE: If the node has been removed from the cluster (and shut down), its status is set to the
 * 'REMOVED' tombstone state, and it is no longer present in the node ring or any other part of
 * the gossiping state. To keep the model and the semantics the user would expect, this method
 * returns `MemberStatus.Removed` in that situation (i.e. whenever `isRunning` is false).
 */
def status: MemberStatus =
  if (!isRunning) MemberStatus.Removed
  else self.status
2012-02-14 20:50:12 +01:00
2012-02-20 17:22:07 +01:00
/**
 * Is this node the leader? True when this node's address is the address of the first member
 * of the latest gossip's (sorted) member set, false when that set is empty.
 */
def isLeader: Boolean =
  latestGossip.members.headOption.exists(selfAddress == _.address)

/**
 * Get the address of the current leader, i.e. the first member of the latest gossip.
 * Calls `head` on the member set, so it throws `NoSuchElementException` if that set is empty.
 */
def leader: Address = {
  val firstMember = latestGossip.members.head
  firstMember.address
}
2012-02-14 20:50:12 +01:00
/**
 * Is this node a singleton cluster? Delegates to the `isSingletonCluster(State)` overload
 * with the current state.
 */
def isSingletonCluster: Boolean = isSingletonCluster(state.get())

/**
 * Checks if we have a cluster convergence for the latest gossip.
 *
 * @return Some(convergedGossip) if convergence has been reached and None if not
 */
def convergence: Option[Gossip] = convergence(latestGossip)

/**
 * Returns true unless `isUnavailable` (defined elsewhere in this class) reports the current
 * state as unavailable. The intent is "the node is UP or JOINING".
 * NOTE(review): confirm that intent against `isUnavailable(State)`.
 */
def isAvailable: Boolean = !isUnavailable(state.get())
2011-01-01 01:50:33 +01:00
/**
 * Registers a listener to subscribe to cluster membership changes.
 *
 * The listener is added to the shared state with a compare-and-set; a lost race is retried.
 */
@tailrec
final def registerListener(listener: MembershipChangeListener): Unit = {
  val before = state.get
  val after = before.copy(memberMembershipChangeListeners = before.memberMembershipChangeListeners + listener)
  val updated = state.compareAndSet(before, after)
  if (!updated) registerListener(listener) // lost the race - try again
}
/**
 * Unsubscribes to cluster membership changes.
 *
 * The listener is removed from the shared state with a compare-and-set; a lost race is retried.
 */
@tailrec
final def unregisterListener(listener: MembershipChangeListener): Unit = {
  val before = state.get
  val after = before.copy(memberMembershipChangeListeners = before.memberMembershipChangeListeners - listener)
  val updated = state.compareAndSet(before, after)
  if (!updated) unregisterListener(listener) // lost the race - try again
}
2012-02-29 10:02:00 +01:00
/**
 * Try to join this cluster node with the node specified by 'address'.
 * A 'Join(thisNodeAddress)' command is sent to the node to join, via its cluster command connection.
 */
def join(address: Address): Unit = {
  val target = clusterCommandConnectionFor(address)
  log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, target)
  target ! ClusterUserAction.Join(selfAddress)
}
/**
 * Send command to issue state transition to LEAVING for the node specified by 'address'.
 * The `Leave` command is sent to `clusterCommandDaemon`.
 */
def leave(address: Address): Unit =
  clusterCommandDaemon ! ClusterUserAction.Leave(address)
/**
 * Send command to DOWN the node specified by 'address'.
 * The `Down` command is sent to `clusterCommandDaemon`.
 */
def down(address: Address): Unit =
  clusterCommandDaemon ! ClusterUserAction.Down(address)
2011-01-01 01:50:33 +01:00
// ========================================================
// ===================== INTERNAL API =====================
// ========================================================
2012-02-08 14:14:01 +01:00
/**
 * INTERNAL API.
 *
 * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
 * Only the call that flips `_isRunning` from true to false does any work; later calls are no-ops.
 *
 * Should not be called by the user. The user can issue a LEAVE command which will tell the node
 * to go through graceful handoff process `LEAVE -> EXITING -> REMOVED -> SHUTDOWN`.
 */
private[cluster] def shutdown(): Unit = {
  val wasRunning = _isRunning.compareAndSet(true, false)
  if (wasRunning) {
    log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)

    // cancel the periodic tasks, note that otherwise they will be run when scheduler is shutdown
    gossipTask.cancel()
    heartbeatTask.cancel()
    failureDetectorReaperTask.cancel()
    leaderActionsTask.cancel()
    clusterScheduler.close()

    // FIXME isTerminated check can be removed when ticket #2221 is fixed
    // now it prevents logging if system is shutdown (or in progress of shutdown)
    if (!clusterDaemons.isTerminated) system.stop(clusterDaemons)

    try mBeanServer.unregisterMBean(clusterMBeanName)
    catch {
      // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
      case _: InstanceNotFoundException ⇒
    }

    log.info("Cluster Node [{}] - Cluster node successfully shut down", selfAddress)
  }
}
/**
 * INTERNAL API.
 *
 * State transition to JOINING - new node joining.
 *
 * The node is added to the member ring with status `Joining`, unless it is already a member or it is in the
 * 'unreachable' set with a status other than DOWN. A DOWN node that is rejoining is dropped from 'unreachable'.
 * The new gossip is versioned and marked as seen by this node; a lost compare-and-set race is retried.
 */
@tailrec
private[cluster] final def joining(node: Address): Unit = {
  val localState = state.get
  val localGossip = localState.latestGossip
  val localUnreachable = localGossip.overview.unreachable

  val alreadyMember = localGossip.members exists { _.address == node }
  val isUnreachable = localUnreachable exists { m ⇒ m.address == node && m.status != Down }

  if (!(alreadyMember || isUnreachable)) {
    // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
    val newOverview = localGossip.overview.copy(unreachable = localUnreachable filterNot { _.address == node })
    // add joining node as Joining
    val newGossip = localGossip.copy(overview = newOverview, members = localGossip.members + Member(node, Joining))
    val newState = localState.copy(latestGossip = (newGossip :+ vclockNode) seen selfAddress)

    if (state.compareAndSet(localState, newState)) {
      log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
      // treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
      if (node != selfAddress) failureDetector heartbeat node
      notifyMembershipChangeListeners(localState, newState)
    } else joining(node) // recur if we failed update
  }
}
2012-01-24 12:09:32 +01:00
2012-02-28 17:04:48 +01:00
/**
 * INTERNAL API.
 *
 * State transition to LEAVING.
 *
 * Only updates state if the node is present in the member ring; otherwise this is a no-op.
 * The new gossip is versioned and marked as seen by this node; a lost compare-and-set race is retried.
 */
@tailrec
private[cluster] final def leaving(address: Address): Unit = {
  val localState = state.get
  val localGossip = localState.latestGossip

  if (localGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring)
    // mark node as LEAVING
    val newMembers = localGossip.members map { member ⇒ if (member.address == address) Member(address, Leaving) else member }
    val newGossip = localGossip copy (members = newMembers)

    val versionedGossip = newGossip :+ vclockNode
    val seenVersionedGossip = versionedGossip seen selfAddress

    val newState = localState copy (latestGossip = seenVersionedGossip)

    if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update
    else {
      log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address)
      notifyMembershipChangeListeners(localState, newState)
    }
  }
}
2012-02-28 17:04:48 +01:00
/**
 * INTERNAL API.
 *
 * State transition to EXITING.
 *
 * For now this only logs the transition; no state is changed.
 */
private[cluster] final def exiting(address: Address): Unit = {
  // FIXME implement when we implement hand-off
  log.info("Cluster Node [{}] - Marked node [{}] as EXITING", selfAddress, address)
}
2012-02-28 17:04:48 +01:00
/**
 * INTERNAL API.
 *
 * State transition to REMOVED.
 *
 * This method is for now only called after the LEADER has sent a Removed message - telling the node
 * to shut itself down. The `address` argument is currently not used: this node always shuts itself down.
 *
 * In the future we might change this to allow the USER to send a Removed(address) message telling an
 * arbitrary node to be moved directly from UP -> REMOVED.
 */
private[cluster] final def removing(address: Address): Unit = {
  log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
  shutdown()
}
2012-02-28 17:04:48 +01:00
/**
 * INTERNAL API.
 *
 * The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there)
 * and its status is set to DOWN. The node is also removed from the 'seen' table.
 *
 * The node will reside as DOWN in the 'unreachable' set until an explicit JOIN command is sent directly
 * to this node and it will then go through the normal JOINING procedure.
 *
 * The "Marking ... as DOWN" log entries are only written after the new state has been installed, so a lost
 * compare-and-set race (and the resulting retry) does not produce premature or duplicated log output.
 */
@tailrec
final private[cluster] def downing(address: Address): Unit = {
  val localState = state.get
  val localGossip = localState.latestGossip
  val localMembers = localGossip.members
  val localOverview = localGossip.overview
  val localSeen = localOverview.seen
  val localUnreachableMembers = localOverview.unreachable

  // 1. check if the node to DOWN is in the 'members' set
  val downedMember: Option[Member] = localMembers.collectFirst {
    case m if m.address == address ⇒ m.copy(status = Down)
  }
  // NOTE(review): removing the DOWN copy only drops the ring entry if Member equality ignores status
  // (presumably address-based) - same assumption as before this change
  val newMembers = downedMember match {
    case Some(m) ⇒ localMembers - m
    case None    ⇒ localMembers
  }

  // 2. check if the node to DOWN is in the 'unreachable' set - no need to DOWN members already DOWN
  def shouldBeDowned(m: Member): Boolean = m.address == address && m.status != Down
  val downedUnreachableMembers = localUnreachableMembers filter shouldBeDowned
  val newUnreachableMembers = localUnreachableMembers map { member ⇒
    if (shouldBeDowned(member)) member copy (status = Down) else member
  }

  // 3. add the newly DOWNED members from the 'members' (in step 1.) to the 'newUnreachableMembers' set.
  val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember

  // 4. remove nodes marked as DOWN from the 'seen' table
  val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect {
    case m if m.status == Down ⇒ m.address
  }

  // update gossip overview
  val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers)
  val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
  val versionedGossip = newGossip :+ vclockNode
  val newState = localState copy (latestGossip = versionedGossip seen selfAddress)

  if (!state.compareAndSet(localState, newState)) downing(address) // recur if we fail the update
  else {
    // side effects only once the new state is actually installed
    downedMember foreach { m ⇒
      log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, m.address)
    }
    downedUnreachableMembers foreach { m ⇒
      log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, m.address)
    }
    notifyMembershipChangeListeners(localState, newState)
  }
}
2012-02-28 17:04:48 +01:00
2011-10-26 08:48:16 +02:00
/**
 * INTERNAL API.
 *
 * Receive new gossip.
 *
 * The winning gossip is chosen by comparing versions: concurrent versions are merged and the merge is versioned
 * by this node; otherwise the newer of the local and remote gossip wins. The winner is marked as seen by this node
 * and installed with a compare-and-set; a lost race is retried.
 */
@tailrec
final private[cluster] def receiveGossip(from: Address, remoteGossip: Gossip): Unit = {
  val localState = state.get
  val localGossip = localState.latestGossip
  val remoteVersion = remoteGossip.version
  val localVersion = localGossip.version

  val winningGossip =
    if (remoteVersion <> localVersion) {
      // concurrent - merge and version the result
      val versionedMergedGossip = (remoteGossip merge localGossip) :+ vclockNode
      log.debug(
        """Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""",
        remoteGossip, localGossip, versionedMergedGossip)
      versionedMergedGossip
    } else if (remoteVersion < localVersion) localGossip // local gossip is newer
    else remoteGossip // remote gossip is newer

  val newState = localState copy (latestGossip = winningGossip seen selfAddress)

  // if we won the race then update else try again
  if (state.compareAndSet(localState, newState)) {
    log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
    notifyMembershipChangeListeners(localState, newState)
  } else receiveGossip(from, remoteGossip) // recur if we fail the update
}
2012-01-24 12:09:32 +01:00
/**
 * INTERNAL API.
 *
 * Records a heartbeat from `from` in the failure detector.
 */
private[cluster] def receiveHeartbeat(from: Address): Unit = {
  failureDetector.heartbeat(from)
}
2012-01-24 12:09:32 +01:00
2012-02-09 15:59:10 +01:00
/**
 * Joins the pre-configured contact point: calls `join` for each address held by `nodeToJoin`.
 */
private def autoJoin(): Unit = nodeToJoin foreach { contactPoint ⇒ join(contactPoint) }
2012-02-08 15:11:06 +01:00
2011-10-26 08:48:16 +02:00
/**
 * INTERNAL API.
 *
 * Gossips latest gossip to an address, wrapped in a `GossipEnvelope` that carries this node's address.
 */
private[cluster] def gossipTo(address: Address): Unit = {
  val gossipConnection = clusterGossipConnectionFor(address)
  log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, gossipConnection)
  val envelope = GossipEnvelope(selfAddress, latestGossip)
  gossipConnection ! envelope
}
/**
 * Gossips latest gossip to a randomly selected address among `addresses`, never to this node itself.
 *
 * @return the used [[akka.actor.Address]] if any
 */
private def gossipToRandomNodeOf(addresses: IndexedSeq[Address]): Option[Address] = {
  log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", selfAddress, addresses.mkString(", "))
  val candidates = addresses filterNot (_ == selfAddress) // filter out myself
  selectRandomNode(candidates) map { chosen ⇒
    gossipTo(chosen)
    chosen
  }
}
/**
 * INTERNAL API.
 *
 * Probability of gossiping to an unreachable member: the fraction of unreachable members among all known
 * members (member ring + unreachable set), or 0.0 when there are no known members at all.
 */
private[cluster] def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double = {
  val total = membersSize + unreachableSize
  if (total == 0) 0.0
  else unreachableSize.toDouble / total
}
/**
 * INTERNAL API.
 *
 * Probability of gossiping to a deputy node:
 *  - 1.0 if more deputy nodes are requested than there are members,
 *  - 0.0 if no deputy nodes are requested, or there are no known members at all,
 *  - otherwise `(nrOfDeputyNodes + unreachableSize) / (membersSize + unreachableSize)`.
 */
private[cluster] def gossipToDeputyProbablity(membersSize: Int, unreachableSize: Int, nrOfDeputyNodes: Int): Double = {
  val total = membersSize + unreachableSize
  if (nrOfDeputyNodes > membersSize) 1.0
  else if (nrOfDeputyNodes == 0 || total == 0) 0.0
  else (nrOfDeputyNodes + unreachableSize).toDouble / total
}
/**
 * INTERNAL API.
 *
 * Initiates a new round of gossip.
 *
 * Does nothing unless this node is available and not a singleton cluster. A round consists of:
 *  1. gossip to a random alive member,
 *  2. with probability `gossipToUnreachableProbablity`, gossip to a random unreachable member,
 *  3. with probability `gossipToDeputyProbablity`, gossip to a random deputy node (facilitates partition healing).
 *     Step 3 is skipped if there are no deputies, or if step 1 already hit a deputy and there are at least
 *     `NrOfDeputyNodes` members.
 */
private[cluster] def gossip(): Unit = {
  val localState = state.get

  log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)

  if (!isSingletonCluster(localState) && isAvailable(localState)) {
    val localGossip = localState.latestGossip
    // important to not accidentally use `map` of the SortedSet, since the original order is not preserved
    val localMembers = localGossip.members.toIndexedSeq
    val localMembersSize = localMembers.size
    val localMemberAddresses = localMembers map { _.address }

    val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq
    val localUnreachableSize = localUnreachableMembers.size

    // 1. gossip to alive members
    val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses)

    // 2. gossip to unreachable members
    if (localUnreachableSize > 0) {
      val probability = gossipToUnreachableProbablity(localMembersSize, localUnreachableSize)
      if (ThreadLocalRandom.current.nextDouble() < probability)
        gossipToRandomNodeOf(localUnreachableMembers.map(_.address))
    }

    // 3. gossip to a deputy node for facilitating partition healing
    val deputies = deputyNodes(localMemberAddresses)
    val alreadyGossipedToDeputy = gossipedToAlive exists { deputies contains _ }
    if ((!alreadyGossipedToDeputy || localMembersSize < NrOfDeputyNodes) && deputies.nonEmpty) {
      val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, NrOfDeputyNodes)
      if (ThreadLocalRandom.current.nextDouble() < probability)
        gossipToRandomNodeOf(deputies)
    }
  }
}
2012-06-11 14:59:34 +02:00
/**
 * INTERNAL API.
 *
 * Sends `selfHeartbeat` over the gossip connection of every other member in the member ring.
 * Does nothing for a singleton cluster.
 */
private[cluster] def heartbeat(): Unit = {
  val localState = state.get
  if (!isSingletonCluster(localState)) {
    val otherMembers = localState.latestGossip.members.toIndexedSeq filterNot { _.address == selfAddress }
    otherMembers foreach { member ⇒
      val connection = clusterGossipConnectionFor(member.address)
      log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection)
      connection ! selfHeartbeat
    }
  }
}
2012-02-07 16:53:49 +01:00
/**
 * INTERNAL API.
 *
 * Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the
 * failure detector's verdict.
 *
 * Only runs when this node is available and not a singleton cluster. Members the failure detector reports as not
 * available are moved from the member ring into the 'unreachable' set; the new gossip is versioned, marked as seen
 * by this node and installed with a compare-and-set (a lost race is retried).
 */
@tailrec
final private[cluster] def reapUnreachableMembers(): Unit = {
  val localState = state.get
  // only scrutinize if we are a non-singleton cluster and available
  if (!isSingletonCluster(localState) && isAvailable(localState)) {
    val localGossip = localState.latestGossip
    val localMembers = localGossip.members
    val newlyDetectedUnreachableMembers = localMembers filter { member ⇒ !failureDetector.isAvailable(member.address) }

    // we have newly detected members marked as unavailable
    if (newlyDetectedUnreachableMembers.nonEmpty) {
      val localOverview = localGossip.overview
      val newOverview = localOverview copy (unreachable = localOverview.unreachable ++ newlyDetectedUnreachableMembers)
      val newGossip = localGossip copy (overview = newOverview, members = localMembers -- newlyDetectedUnreachableMembers)
      // updating vclock and 'seen' table
      val newState = localState copy (latestGossip = (newGossip :+ vclockNode) seen selfAddress)

      // if we won the race then update else try again
      if (state.compareAndSet(localState, newState)) {
        log.info("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", "))
        notifyMembershipChangeListeners(localState, newState)
      } else reapUnreachableMembers() // recur
    }
  }
}
2012-02-18 22:14:53 +01:00
/**
 * INTERNAL API.
 *
 * Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc.
 *
 * Only acts when this node is the leader (first member of the member ring) and available. With convergence it
 * moves EXITING -> REMOVED, JOINING -> UP and LEAVING -> EXITING; without convergence, and only if `AutoDown`
 * is on, it marks every unreachable member as DOWN. The resulting gossip is versioned and installed with a
 * compare-and-set. A lost race retries the whole computation. Side effects (logging, sending Remove/Exit
 * commands, notifying listeners) run only after a successful update.
 */
@tailrec
2012-06-16 00:00:19 +02:00
final private [ cluster ] def leaderActions ( ) : Unit = {
2012-03-09 12:56:56 +01:00
val localState = state . get
val localGossip = localState . latestGossip
val localMembers = localGossip . members
2012-05-31 17:19:49 +02:00
// same rule as the public 'isLeader', but evaluated against the 'localState' snapshot
val isLeader = localMembers . nonEmpty && ( selfAddress == localMembers . head . address )
2012-03-09 12:56:56 +01:00
if ( isLeader && isAvailable ( localState ) ) {
// only run the leader actions if we are the LEADER and available
val localOverview = localGossip . overview
val localSeen = localOverview . seen
2012-05-31 14:38:44 +02:00
val localUnreachableMembers = localOverview . unreachable
2012-06-18 13:53:49 +02:00
val hasPartionHandoffCompletedSuccessfully : Boolean = {
// FIXME implement partition handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
true
}
2012-03-09 12:56:56 +01:00
// Leader actions are as follows:
2012-06-16 00:00:19 +02:00
// 1. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table
2012-06-08 11:51:34 +02:00
// 2. Move JOINING => UP -- When a node joins the cluster
2012-06-01 16:49:50 +02:00
// 3. Move LEAVING => EXITING -- When all partition handoff has completed
// 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader
2012-06-18 13:53:49 +02:00
// 5. Store away all stuff needed for the side-effecting processing in 10.
// 6. Updating the vclock version for the changes
// 7. Updating the 'seen' table
// 8. Try to update the state with the new gossip
// 9. If failure - retry
// 10. If success - run all the side-effecting processing
// Pure part: compute the new gossip, whether anything changed, and the members whose transitions need
// side effects (logging / commands) once the new state has been installed.
val (
newGossip : Gossip ,
hasChangedState : Boolean ,
2012-06-18 15:25:17 +02:00
upMembers ,
exitingMembers ,
removedMembers ,
unreachableButNotDownedMembers ) =
2012-03-09 12:56:56 +01:00
if ( convergence ( localGossip ) . isDefined ) {
// we have convergence - so any unreachable nodes must already be DOWN (otherwise there would be no convergence)
2012-06-18 13:53:49 +02:00
// transform the node member ring - filterNot/map/map
2012-03-09 12:56:56 +01:00
val newMembers =
2012-06-18 13:53:49 +02:00
localMembers filterNot { member ⇒
// ----------------------
// 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table
// ----------------------
member . status == MemberStatus . Exiting
2012-06-01 16:49:50 +02:00
2012-03-09 12:56:56 +01:00
} map { member ⇒
2012-06-01 16:49:50 +02:00
// ----------------------
2012-06-16 00:00:19 +02:00
// 2. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
2012-06-01 16:49:50 +02:00
// ----------------------
2012-06-18 13:53:49 +02:00
if ( member . status == Joining ) member copy ( status = Up )
else member
2012-06-01 16:49:50 +02:00
} map { member ⇒
// ----------------------
// 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
// ----------------------
2012-06-18 13:53:49 +02:00
if ( member . status == Leaving && hasPartionHandoffCompletedSuccessfully ) member copy ( status = Exiting )
else member
2012-03-09 12:56:56 +01:00
}
2012-06-08 11:51:34 +02:00
2012-06-18 13:53:49 +02:00
// ----------------------
// 5. Store away all stuff needed for the side-effecting processing in 10.
// ----------------------
// Check for the need to do side-effecting on successful state change
// Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED
// to check for state-changes and to store away removed and exiting members for later notification
// 1. check for state-changes to update
// 2. store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending
val ( removedMembers , newMembers1 ) = localMembers partition ( _ . status == Exiting )
val ( upMembers , newMembers2 ) = newMembers1 partition ( _ . status == Joining )
val ( exitingMembers , newMembers3 ) = newMembers2 partition ( _ . status == Leaving && hasPartionHandoffCompletedSuccessfully )
val hasChangedState = removedMembers . nonEmpty || upMembers . nonEmpty || exitingMembers . nonEmpty
2012-06-16 00:00:19 +02:00
// removing REMOVED nodes from the 'seen' table
2012-06-18 13:53:49 +02:00
val newSeen = localSeen -- removedMembers . map ( _ . address )
2012-06-16 00:00:19 +02:00
// removing REMOVED nodes from the 'unreachable' set
2012-06-18 13:53:49 +02:00
val newUnreachableMembers = localUnreachableMembers -- removedMembers
2012-06-16 00:00:19 +02:00
val newOverview = localOverview copy ( seen = newSeen , unreachable = newUnreachableMembers ) // update gossip overview
2012-06-18 13:53:49 +02:00
val newGossip = localGossip copy ( members = newMembers , overview = newOverview ) // update gossip
( newGossip , hasChangedState , upMembers , exitingMembers , removedMembers , Set . empty [ Member ] )
2012-03-09 12:56:56 +01:00
2012-06-05 14:13:28 +02:00
} else if ( AutoDown ) {
2012-03-09 12:56:56 +01:00
// we don't have convergence - so we might have unreachable nodes
2012-06-18 13:53:49 +02:00
2012-03-09 12:56:56 +01:00
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes
2012-06-18 13:53:49 +02:00
val newUnreachableMembers = localUnreachableMembers . map { member ⇒
// ----------------------
// 4. Move UNREACHABLE => DOWN (auto-downing by leader)
// ----------------------
if ( member . status == Down ) member // no need to DOWN members already DOWN
else member copy ( status = Down )
}
2012-03-09 12:56:56 +01:00
2012-06-18 13:53:49 +02:00
// Check for the need to do side-effecting on successful state change
// (only members that were not DOWN before count as a change of state)
val ( unreachableButNotDownedMembers , _ ) = localUnreachableMembers partition ( _ . status != Down )
2012-03-09 12:56:56 +01:00
// removing nodes marked as DOWN from the 'seen' table
2012-06-16 00:00:19 +02:00
val newSeen = localSeen -- newUnreachableMembers . collect { case m if m . status == Down ⇒ m . address }
2012-03-09 12:56:56 +01:00
val newOverview = localOverview copy ( seen = newSeen , unreachable = newUnreachableMembers ) // update gossip overview
2012-06-18 13:53:49 +02:00
val newGossip = localGossip copy ( overview = newOverview ) // update gossip
2012-03-09 12:56:56 +01:00
2012-06-18 13:53:49 +02:00
( newGossip , unreachableButNotDownedMembers . nonEmpty , Set . empty [ Member ] , Set . empty [ Member ] , Set . empty [ Member ] , unreachableButNotDownedMembers )
2012-03-09 12:56:56 +01:00
2012-06-18 13:53:49 +02:00
// no convergence and no auto-down: nothing to do
} else ( localGossip , false , Set . empty [ Member ] , Set . empty [ Member ] , Set . empty [ Member ] , Set . empty [ Member ] )
2012-03-09 12:56:56 +01:00
2012-06-18 13:53:49 +02:00
if ( hasChangedState ) { // we have a change of state - version it and try to update
2012-06-01 16:49:50 +02:00
// ----------------------
2012-06-18 13:53:49 +02:00
// 6. Updating the vclock version for the changes
2012-06-01 16:49:50 +02:00
// ----------------------
2012-06-12 16:15:05 +02:00
val versionedGossip = newGossip : + vclockNode
2012-03-09 12:56:56 +01:00
2012-06-01 16:49:50 +02:00
// ----------------------
2012-06-18 13:53:49 +02:00
// 7. Updating the 'seen' table
2012-06-16 00:00:19 +02:00
// Unless the leader (this node) is part of the removed members, i.e. the leader has moved itself from EXITING -> REMOVED
2012-06-01 16:49:50 +02:00
// ----------------------
2012-06-16 00:00:19 +02:00
val seenVersionedGossip =
if ( removedMembers . exists ( _ . address == selfAddress ) ) versionedGossip
else versionedGossip seen selfAddress
2012-03-09 12:56:56 +01:00
val newState = localState copy ( latestGossip = seenVersionedGossip )
2012-06-18 13:53:49 +02:00
// ----------------------
// 8. Try to update the state with the new gossip
// ----------------------
if ( ! state . compareAndSet ( localState , newState ) ) {
2012-06-16 00:00:19 +02:00
2012-06-18 13:53:49 +02:00
// ----------------------
// 9. Failure - retry
// ----------------------
leaderActions ( ) // recur
} else {
// ----------------------
// 10. Success - run all the side-effecting processing
// ----------------------
// log the move of members from joining to up
upMembers foreach { member ⇒ log . info ( "Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP" , selfAddress , member . address ) }
2012-06-16 00:00:19 +02:00
// tell all removed members to remove and shut down themselves
2012-06-18 13:53:49 +02:00
removedMembers foreach { member ⇒
val address = member . address
log . info ( "Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - and removing node from node ring" , selfAddress , address )
2012-06-16 00:00:19 +02:00
clusterCommandConnectionFor ( address ) ! ClusterLeaderAction . Remove ( address )
}
// tell all exiting members to exit
2012-06-18 13:53:49 +02:00
exitingMembers foreach { member ⇒
val address = member . address
log . info ( "Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING" , selfAddress , address )
2012-06-16 00:00:19 +02:00
clusterCommandConnectionFor ( address ) ! ClusterLeaderAction . Exit ( address ) // FIXME should use ? to await completion of handoff?
}
2012-06-18 13:53:49 +02:00
// log the auto-downing of the unreachable nodes
unreachableButNotDownedMembers foreach { member ⇒
log . info ( "Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN" , selfAddress , member . address )
}
2012-06-07 15:14:38 +02:00
notifyMembershipChangeListeners ( localState , newState )
2012-03-09 12:56:56 +01:00
}
}
}
}
/**
 * Checks if we have a cluster convergence. If there are any unreachable nodes then we can't have a convergence -
 * waiting for user to act (issuing DOWN) or leader to act (issuing DOWN through auto-down).
 *
 * Convergence requires that:
 *  1. every unreachable member (if any) has status DOWN,
 *  2. every member of the member ring has an entry in the 'seen' table, and
 *  3. all entries in the 'seen' table hold the same vector clock version.
 *
 * @return Some(convergedGossip) if convergence has been reached and None if not
 */
private def convergence(gossip: Gossip): Option[Gossip] = {
  val overview = gossip.overview
  val seen = overview.seen

  // DOWN unreachable members are tolerated; any other unreachable member blocks convergence
  val blockingUnreachable = overview.unreachable filter { _.status != Down }

  if (blockingUnreachable.nonEmpty) {
    log.debug("Cluster Node [{}] - No cluster convergence, due to unreachable nodes [{}].", selfAddress, blockingUnreachable)
    None
  } else if (!gossip.members.forall(m ⇒ seen.contains(m.address))) {
    log.debug("Cluster Node [{}] - No cluster convergence, due to members not in seen table [{}].", selfAddress,
      gossip.members.map(_.address) -- seen.keySet)
    None
  } else {
    // number of distinct versions present in the 'seen' table
    val views = seen.values.toSet.size

    if (views == 1) {
      log.debug("Cluster Node [{}] - Cluster convergence reached: [{}]", selfAddress, gossip.members.mkString(", "))
      Some(gossip)
    } else {
      log.debug("Cluster Node [{}] - No cluster convergence, since not all nodes have seen the same state yet. [{} of {}]",
        selfAddress, views, seen.values.size)
      None
    }
  }
}
/**
 * Returns true if this node is available in the given state, i.e. the negation of `isUnavailable(state)`.
 */
private def isAvailable(state: State): Boolean = !isUnavailable(state)

/**
 * Returns true if this node is unavailable in the given state. That is the case when either
 *  - this node's address is listed in the gossip overview's unreachable set, or
 *  - this node's entry in the member ring has a status for which `status.isUnavailable` is true.
 *
 * Both lookups identify this node by `selfAddress` within the passed-in `state` only.
 */
private def isUnavailable(state: State): Boolean = {
  val localGossip = state.latestGossip
  val isUnreachable = localGossip.overview.unreachable exists { _.address == selfAddress }
  // match this node's ring entry by address, like the unreachable check above, instead of comparing
  // against `self`, which is resolved outside of `state` and does not take it into account
  val hasUnavailableMemberStatus = localGossip.members exists { m ⇒ (m.address == selfAddress) && m.status.isUnavailable }
  isUnreachable || hasUnavailableMemberStatus
}
2012-06-16 00:00:19 +02:00
/**
 * Notifies every registered membership change listener of `newState` with the new member ring,
 * but only when the set of (address, status) pairs differs between `oldState` and `newState`.
 */
private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit = {
  // project a state onto the (address, status) pairs of its members - the only thing compared
  def addressesWithStatus(state: State) = state.latestGossip.members.map(m ⇒ (m.address, m.status))

  val membershipChanged = addressesWithStatus(oldState) != addressesWithStatus(newState)
  if (membershipChanged) {
    newState.memberMembershipChangeListeners foreach { listener ⇒
      listener notify newState.latestGossip.members
    }
  }
}
2012-02-07 16:53:49 +01:00
/**
 * Looks up and returns the cluster command connection of this node. This is the same lookup that
 * `clusterCommandConnectionFor` performs, applied to `selfAddress`.
 */
private def clusterCommandDaemon: ActorRef = clusterCommandConnectionFor(selfAddress)
2012-02-07 16:53:49 +01:00
2012-03-03 23:55:48 +01:00
/**
 * Looks up and returns the cluster command connection for the given address, i.e. the actor at
 * path `/system/cluster/commands` under the root path of `address`.
 */
private def clusterCommandConnectionFor(address: Address): ActorRef = {
  val commandsPath = RootActorPath(address) / "system" / "cluster" / "commands"
  system.actorFor(commandsPath)
}
2012-03-03 23:55:48 +01:00
2012-02-07 16:53:49 +01:00
/**
 * Looks up and returns the cluster gossip connection for the given address, i.e. the actor at
 * path `/system/cluster/gossip` under the root path of `address`.
 */
private def clusterGossipConnectionFor(address: Address): ActorRef = {
  val gossipPath = RootActorPath(address) / "system" / "cluster" / "gossip"
  system.actorFor(gossipPath)
}
2012-01-24 12:09:32 +01:00
2012-03-09 12:56:56 +01:00
/**
 * Gets the addresses of all the 'deputy' nodes: the (at most) `NrOfDeputyNodes` addresses that
 * follow the first entry of `addresses`, with this node's own address removed if it is among them.
 */
private def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] = {
  // skip the first entry, then keep up to NrOfDeputyNodes of the following ones
  val candidates = addresses.drop(1).take(NrOfDeputyNodes)
  candidates.filter(_ != selfAddress)
}
2012-01-24 12:09:32 +01:00
2012-06-05 22:16:15 +02:00
/**
 * INTERNAL API.
 *
 * Picks one of the given addresses at random using `ThreadLocalRandom`, or returns None when
 * `addresses` is empty.
 */
private[cluster] def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
  addresses.size match {
    case 0 ⇒ None
    case n ⇒ Some(addresses(ThreadLocalRandom.current.nextInt(n)))
  }
2012-02-14 20:50:12 +01:00
/**
 * Returns true when the latest gossip of the given state lists exactly one member.
 */
private def isSingletonCluster(currentState: State): Boolean = {
  val memberCount = currentState.latestGossip.members.size
  memberCount == 1
}
2012-04-14 20:06:03 +02:00
/**
 * Creates the cluster JMX MBean and registers it in the MBean server under `clusterMBeanName`.
 *
 * If an MBean is already registered under that name, registration is skipped and a debug message
 * is logged. This happens, for example, when several cluster nodes run in the same JVM during tests.
 */
private def createMBean(): Unit = {
  val mbean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean {

    // JMX attributes (bean-style)

    /*
     * Sends a string to the JMX client that will list all nodes in the node ring as follows:
     * {{{
     * Members:
     *   Member(address = akka://system0@localhost:5550, status = Up)
     *   Member(address = akka://system1@localhost:5551, status = Up)
     * Unreachable:
     *   Member(address = akka://system2@localhost:5553, status = Down)
     * Meta Data:
     *   some-key -> 16 bytes
     * }}}
     * The 'Unreachable' and 'Meta Data' sections are only included when non-empty.
     * Meta data values are byte arrays, whose own toString is only an identity hash,
     * so each entry is rendered as its key and the size of its value.
     */
    def getClusterStatus: String = {
      val gossip = clusterNode.latestGossip
      val unreachable = gossip.overview.unreachable
      val metaData = gossip.meta
      val metaDataEntries = metaData map { case (key, value) ⇒ key + " -> " + value.length + " bytes" }
      "\nMembers:\n\t" + gossip.members.mkString("\n\t") +
        { if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } +
        { if (metaData.nonEmpty) "\nMeta Data:\n\t" + metaDataEntries.mkString("\n\t") else "" }
    }

    def getMemberStatus: String = clusterNode.status.toString

    def getLeader: String = clusterNode.leader.toString

    def isSingleton: Boolean = clusterNode.isSingletonCluster

    def isConvergence: Boolean = clusterNode.convergence.isDefined

    def isAvailable: Boolean = clusterNode.isAvailable

    def isRunning: Boolean = clusterNode.isRunning

    // JMX commands

    def join(address: String) = clusterNode.join(AddressFromURIString(address))

    def leave(address: String) = clusterNode.leave(AddressFromURIString(address))

    def down(address: String) = clusterNode.down(AddressFromURIString(address))
  }

  log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", selfAddress, clusterMBeanName)

  try {
    mBeanServer.registerMBean(mbean, clusterMBeanName)
  } catch {
    case _: InstanceAlreadyExistsException ⇒
      // deliberate best-effort: we are running multiple cluster nodes in the same JVM (probably for testing)
      log.debug("Cluster Node [{}] - cluster JMX MBean [{}] is already registered, skipping registration",
        selfAddress, clusterMBeanName)
  }
}
2011-10-26 08:48:16 +02:00
}