/**
 * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster

import scala.collection.immutable.SortedSet
import scala.concurrent.util.{ Deadline, Duration }
import scala.concurrent.util.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, Scheduler }
import akka.actor.Status.Failure
import akka.event.EventStream
import akka.pattern.ask
import akka.util.Timeout
import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
import language.existentials
import language.postfixOps
import scala.concurrent.util.FiniteDuration
/**
 * Base trait for all cluster messages. All ClusterMessages are serializable.
 *
 * FIXME Protobuf all ClusterMessages
 */
trait ClusterMessage extends Serializable

/**
 * Cluster commands sent by the USER.
 */
object ClusterUserAction {

  /**
   * Command to join the cluster. Sent when a node (represented by 'address')
   * wants to join another node (the receiver).
   */
  case class Join(address: Address) extends ClusterMessage

  /**
   * Command to leave the cluster.
   */
  case class Leave(address: Address) extends ClusterMessage

  /**
   * Command to mark node as temporarily down.
   */
  case class Down(address: Address) extends ClusterMessage
}
/**
 * INTERNAL API
 */
private[cluster] object InternalClusterAction {

  /**
   * Command to initiate join to another node (represented by 'address').
   * Join will be sent to the other node.
   */
  case class JoinTo(address: Address) extends ClusterMessage

  /**
   * Command to initiate the process to join the specified
   * seed nodes.
   */
  case class JoinSeedNodes(seedNodes: IndexedSeq[Address])

  /**
   * Start message of the process to join one of the seed nodes.
   * The node sends `InitJoin` to all seed nodes, which reply
   * with `InitJoinAck`. The first reply is used, the others are discarded.
   * The node sends the `Join` command to the seed node that replied first.
   */
  case object JoinSeedNode extends ClusterMessage

  /**
   * @see JoinSeedNode
   */
  case object InitJoin extends ClusterMessage

  /**
   * @see JoinSeedNode
   */
  case class InitJoinAck(address: Address) extends ClusterMessage

  /**
   * Marker interface for periodic tick messages
   */
  sealed trait Tick

  case object GossipTick extends Tick

  case object HeartbeatTick extends Tick

  case object ReapUnreachableTick extends Tick

  case object MetricsTick extends Tick

  case object LeaderActionsTick extends Tick

  case object PublishStatsTick extends Tick

  case class SendClusterMessage(to: Address, msg: ClusterMessage)

  case class SendGossipTo(address: Address)

  case object GetClusterCoreRef

  sealed trait SubscriptionMessage
  case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage
  case class Unsubscribe(subscriber: ActorRef) extends SubscriptionMessage

  /**
   * @param receiver if `receiver` is defined the event will only be sent to that
   *   actor, otherwise it will be sent to all subscribers via the `eventStream`.
   */
  case class PublishCurrentClusterState(receiver: Option[ActorRef]) extends SubscriptionMessage

  case class PublishChanges(oldGossip: Gossip, newGossip: Gossip)
  case object PublishDone

}
/**
 * INTERNAL API.
 *
 * Cluster commands sent by the LEADER.
 */
private[cluster] object ClusterLeaderAction {

  /**
   * Command to mark a node to be removed from the cluster immediately.
   * Can only be sent by the leader.
   */
  case class Exit(address: Address) extends ClusterMessage

  /**
   * Command to remove a node from the cluster immediately.
   */
  case class Remove(address: Address) extends ClusterMessage
}
/**
 * INTERNAL API.
 *
 * Supervisor managing the different Cluster daemons.
 */
private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Actor with ActorLogging {

  // Important - don't use Cluster(context.system) here because that would
  // cause deadlock. The Cluster extension is currently being created and is waiting
  // for response from GetClusterCoreRef in its constructor.

  val core = context.actorOf(Props[ClusterCoreDaemon].
    withDispatcher(context.props.dispatcher), name = "core")
  val heartbeat = context.actorOf(Props[ClusterHeartbeatDaemon].
    withDispatcher(context.props.dispatcher), name = "heartbeat")
  if (settings.MetricsEnabled) context.actorOf(Props[ClusterMetricsCollector].
    withDispatcher(context.props.dispatcher), name = "metrics")

  def receive = {
    case InternalClusterAction.GetClusterCoreRef ⇒ sender ! core
  }

}
/**
 * INTERNAL API.
 */
private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
  import ClusterLeaderAction._
  import InternalClusterAction._
  import ClusterHeartbeatSender._

  val cluster = Cluster(context.system)
  import cluster.{ selfAddress, scheduler, failureDetector }
  import cluster.settings._

  val vclockNode = VectorClock.Node(selfAddress.toString)
  val selfHeartbeat = Heartbeat(selfAddress)

  // note that self is not initially member,
  // and the Gossip is not versioned for this 'Node' yet
  var latestGossip: Gossip = Gossip()
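  // join attempts that have been sent but are not yet reflected in gossip,
  // keyed by target address with a deadline based on JoinTimeout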
  var joinInProgress: Map[Address, Deadline] = Map.empty
  var stats = ClusterStats()

  val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
    withDispatcher(UseDispatcher), name = "heartbeatSender")
  val coreSender = context.actorOf(Props[ClusterCoreSender].
    withDispatcher(UseDispatcher), name = "coreSender")
  val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
    withDispatcher(UseDispatcher), name = "publisher")

  import context.dispatcher

  // start periodic gossip to random nodes in cluster
  val gossipTask =
    FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(GossipInterval).asInstanceOf[FiniteDuration], GossipInterval) {
      self ! GossipTick
    }

  // start periodic heartbeat to all nodes in cluster
  val heartbeatTask =
    FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) {
      self ! HeartbeatTick
    }

  // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
  val failureDetectorReaperTask =
    FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration], UnreachableNodesReaperInterval) {
      self ! ReapUnreachableTick
    }

  // start periodic leader action management (only applies for the current leader)
  private val leaderActionsTask =
    FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval).asInstanceOf[FiniteDuration], LeaderActionsInterval) {
      self ! LeaderActionsTick
    }

  // start periodic publish of current stats
  private val publishStatsTask: Option[Cancellable] =
    if (PublishStatsInterval == Duration.Zero) None
    else Some(FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval).asInstanceOf[FiniteDuration], PublishStatsInterval) {
      self ! PublishStatsTick
    })

  override def preStart(): Unit = {
    if (AutoJoin) self ! JoinSeedNodes(SeedNodes)
  }

  override def postStop(): Unit = {
    gossipTask.cancel()
    heartbeatTask.cancel()
    failureDetectorReaperTask.cancel()
    leaderActionsTask.cancel()
    publishStatsTask foreach { _.cancel() }
  }
  def uninitialized: Actor.Receive = {
    case InitJoin                 ⇒ // skip, not ready yet
    case JoinTo(address)          ⇒ join(address)
    case JoinSeedNodes(seedNodes) ⇒ joinSeedNodes(seedNodes)
    case msg: SubscriptionMessage ⇒ publisher forward msg
    case _: Tick                  ⇒ // ignore periodic tasks until initialized
  }

  def initialized: Actor.Receive = {
    case msg: GossipEnvelope              ⇒ receiveGossip(msg)
    case msg: GossipMergeConflict         ⇒ receiveGossipMerge(msg)
    case GossipTick                       ⇒ gossip()
    case HeartbeatTick                    ⇒ heartbeat()
    case ReapUnreachableTick              ⇒ reapUnreachableMembers()
    case LeaderActionsTick                ⇒ leaderActions()
    case PublishStatsTick                 ⇒ publishInternalStats()
    case InitJoin                         ⇒ initJoin()
    case JoinTo(address)                  ⇒ join(address)
    case ClusterUserAction.Join(address)  ⇒ joining(address)
    case ClusterUserAction.Down(address)  ⇒ downing(address)
    case ClusterUserAction.Leave(address) ⇒ leaving(address)
    case Exit(address)                    ⇒ exiting(address)
    case Remove(address)                  ⇒ removing(address)
    case SendGossipTo(address)            ⇒ gossipTo(address)
    case msg: SubscriptionMessage         ⇒ publisher forward msg
  }

  def removed: Actor.Receive = {
    case msg: SubscriptionMessage ⇒ publisher forward msg
    case _: Tick                  ⇒ // ignore periodic tasks
  }
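  // starts in 'uninitialized', becomes 'initialized' when this node has joined a cluster,
  // and 'removed' after the leader has told this node to shut down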
  def receive = uninitialized

  def initJoin(): Unit = sender ! InitJoinAck(selfAddress)

  def joinSeedNodes(seedNodes: IndexedSeq[Address]): Unit = {
    // only the node which is named first in the list of seed nodes will join itself
    if (seedNodes.isEmpty || seedNodes.head == selfAddress)
      self ! JoinTo(selfAddress)
    else
      context.actorOf(Props(new JoinSeedNodeProcess(seedNodes)).
        withDispatcher(UseDispatcher), name = "joinSeedNodeProcess")
  }
  /**
   * Try to join this cluster node with the node specified by 'address'.
   * A 'Join(thisNodeAddress)' command is sent to the node to join.
   */
  def join(address: Address): Unit = {
    if (!latestGossip.members.exists(_.address == address)) {
      val localGossip = latestGossip
      // wipe our state since a node that joins a cluster must be empty
      latestGossip = Gossip()
      joinInProgress = Map(address -> (Deadline.now + JoinTimeout))

      // wipe the failure detector since we are starting fresh and shouldn't care about the past
      failureDetector.reset()

      publish(localGossip)

      context.become(initialized)
      if (address == selfAddress)
        joining(address)
      else
        coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress))
    }
  }
  /**
   * State transition to JOINING - new node joining.
   */
  def joining(node: Address): Unit = {
    val localGossip = latestGossip
    val localMembers = localGossip.members
    val localUnreachable = localGossip.overview.unreachable

    val alreadyMember = localMembers.exists(_.address == node)
    val isUnreachable = localGossip.overview.isNonDownUnreachable(node)

    if (!alreadyMember && !isUnreachable) {
      // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
      val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node }
      val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers)

      // remove the node from the failure detector if it is a DOWN node that is rejoining cluster
      if (rejoiningMember.nonEmpty) failureDetector.remove(node)

      // add joining node as Joining
      // add self in case someone else joins before self has joined (Set discards duplicates)
      val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining)
      val newGossip = localGossip copy (overview = newOverview, members = newMembers)

      val versionedGossip = newGossip :+ vclockNode
      val seenVersionedGossip = versionedGossip seen selfAddress

      latestGossip = seenVersionedGossip

      log.debug("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
      // treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
      if (node != selfAddress) {
        failureDetector heartbeat node
        gossipTo(node)
      }
      publish(localGossip)
    }
  }
  /**
   * State transition to LEAVING.
   */
  def leaving(address: Address): Unit = {
    val localGossip = latestGossip
    if (localGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring)
      val newMembers = localGossip.members map { member ⇒ if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING
      val newGossip = localGossip copy (members = newMembers)

      val versionedGossip = newGossip :+ vclockNode
      val seenVersionedGossip = versionedGossip seen selfAddress

      latestGossip = seenVersionedGossip

      log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address)
      publish(localGossip)
    }
  }

  /**
   * State transition to EXITING.
   */
  def exiting(address: Address): Unit = {
    log.info("Cluster Node [{}] - Marked node [{}] as EXITING", selfAddress, address)
    // FIXME implement when we implement hand-off
  }
  /**
   * State transition to REMOVED.
   *
   * This method is for now only called after the LEADER has sent a Removed message - telling the node
   * to shut itself down.
   *
   * In the future we might change this to allow the USER to send a Removed(address) message telling an
   * arbitrary node to be moved directly from UP -> REMOVED.
   */
  def removing(address: Address): Unit = {
    log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
    val localGossip = latestGossip
    // just cleaning up the gossip state
    latestGossip = Gossip()
    publish(localGossip)
    context.become(removed)
    // make sure the final (removed) state is published
    // before shutting down
    implicit val timeout = Timeout(5 seconds)
    publisher ? PublishDone onComplete { case _ ⇒ cluster.shutdown() }
  }
  /**
   * The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there)
   * and its status is set to DOWN. The node is also removed from the 'seen' table.
   *
   * The node will reside as DOWN in the 'unreachable' set until an explicit JOIN command is sent directly
   * to this node and it will then go through the normal JOINING procedure.
   */
  def downing(address: Address): Unit = {
    val localGossip = latestGossip
    val localMembers = localGossip.members
    val localOverview = localGossip.overview
    val localSeen = localOverview.seen
    val localUnreachableMembers = localOverview.unreachable

    // 1. check if the node to DOWN is in the 'members' set
    val downedMember: Option[Member] =
      localMembers.collectFirst { case m if m.address == address ⇒ m.copy(status = Down) }

    val newMembers = downedMember match {
      case Some(m) ⇒
        log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, m.address)
        localMembers - m
      case None ⇒ localMembers
    }

    // 2. check if the node to DOWN is in the 'unreachable' set
    val newUnreachableMembers =
      localUnreachableMembers.map { member ⇒
        // no need to DOWN members already DOWN
        if (member.address == address && member.status != Down) {
          log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address)
          member copy (status = Down)
        } else member
      }

    // 3. add the newly DOWNED members from the 'members' (in step 1.) to the 'newUnreachableMembers' set
    val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember

    // 4. remove nodes marked as DOWN from the 'seen' table
    val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { case m if m.status == Down ⇒ m.address }

    // update gossip overview
    val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers)
    val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
    val versionedGossip = newGossip :+ vclockNode
    latestGossip = versionedGossip seen selfAddress

    publish(localGossip)
  }
  /**
   * When conflicting versions of received and local [[akka.cluster.Gossip]] are detected
   * the conflict is forwarded to the leader for resolution. Trying to simultaneously
   * resolve conflicts at several nodes creates new conflicts. Therefore the leader resolves
   * conflicts to limit divergence. To avoid overload there is also a configurable rate
   * limit of how many conflicts are handled per second. If the limit is
   * exceeded the conflicting gossip messages are dropped and will reappear later.
   */
  def receiveGossipMerge(merge: GossipMergeConflict): Unit = {
    stats = stats.incrementMergeConflictCount
    val rate = mergeRate(stats.mergeConflictCount)
    if (rate <= MaxGossipMergeRate) {
      receiveGossip(merge.a.copy(conversation = false))
      receiveGossip(merge.b.copy(conversation = false))

      // use one-way gossip from leader to reduce load of leader
      def sendBack(to: Address): Unit = {
        if (to != selfAddress && !latestGossip.overview.unreachable.exists(_.address == to))
          oneWayGossipTo(to)
      }

      sendBack(merge.a.from)
      sendBack(merge.b.from)
    } else {
      log.debug("Dropping gossip merge conflict due to rate [{}] / s", rate)
    }
  }
  /**
   * Receive new gossip.
   */
  def receiveGossip(envelope: GossipEnvelope): Unit = {
    val from = envelope.from
    val remoteGossip = envelope.gossip
    val localGossip = latestGossip

    if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) {
      // FIXME how should we handle this situation?
      log.debug("Received gossip with self as unreachable, from [{}]", from)
    } else if (!localGossip.overview.isNonDownUnreachable(from)) {

      // leader handles merge conflicts, or when the nodes have different views of who is leader
      val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader
      val conflict = remoteGossip.version <> localGossip.version

      if (conflict && !handleMerge) {
        // delegate merge resolution to leader to reduce number of simultaneous resolves,
        // which will result in new conflicts
        stats = stats.incrementMergeDetectedCount
        log.debug("Merge conflict [{}] detected [{}] <> [{}]", stats.mergeDetectedCount, selfAddress, from)

        stats = stats.incrementMergeConflictCount
        val rate = mergeRate(stats.mergeConflictCount)
        if (rate <= MaxGossipMergeRate)
          coreSender ! SendClusterMessage(to = localGossip.leader.get, msg = GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope))
        else
          log.debug("Skipping gossip merge conflict due to rate [{}] / s", rate)

      } else {

        val winningGossip =
          if (conflict) (remoteGossip merge localGossip) :+ vclockNode // conflicting versions, merge, and new version
          else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer
          else remoteGossip // remote gossip is newer

        val newJoinInProgress =
          if (joinInProgress.isEmpty) joinInProgress
          else joinInProgress -- winningGossip.members.map(_.address) -- winningGossip.overview.unreachable.map(_.address)

        latestGossip = winningGossip seen selfAddress
        joinInProgress = newJoinInProgress

        // for all new joining nodes we remove them from the failure detector
        (latestGossip.members -- localGossip.members).foreach {
          node ⇒ if (node.status == Joining) failureDetector.remove(node.address)
        }

        log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)

        if (conflict) {
          stats = stats.incrementMergeCount
          log.debug(
            """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
            remoteGossip, localGossip, winningGossip)
        }

        stats = stats.incrementReceivedGossipCount
        publish(localGossip)

        if (envelope.conversation &&
          (conflict || (winningGossip ne remoteGossip) || (latestGossip ne remoteGossip))) {
          // send back gossip to sender when sender had different view, i.e. merge, or sender had
          // older or sender had newer
          gossipTo(from)
        }
      }
    }
  }
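  // converts the number of merge conflicts seen during the current gossip interval
  // to a per-second rate, which is compared against MaxGossipMergeRate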
  def mergeRate(count: Long): Double = (count * 1000.0) / GossipInterval.toMillis

  /**
   * Initiates a new round of gossip.
   */
  def gossip(): Unit = {
    stats = stats.copy(mergeConflictCount = 0)

    log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)

    if (!isSingletonCluster && isAvailable) {
      val localGossip = latestGossip

      val preferredGossipTargets =
        if (ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) { // If it's time to try to gossip to some nodes with a different view
          // gossip to a random alive member with preference to a member with older or newer gossip version
          val localMemberAddressesSet = localGossip.members map { _.address }
          val nodesWithDifferentView = for {
            (address, version) ← localGossip.overview.seen
            if localMemberAddressesSet contains address
            if version != localGossip.version
          } yield address
          nodesWithDifferentView.toIndexedSeq
        } else Vector.empty[Address]

      gossipToRandomNodeOf(
        if (preferredGossipTargets.nonEmpty) preferredGossipTargets
        else localGossip.members.toIndexedSeq.map(_.address) // Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved
      )
    }
  }
  /**
   * Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc.
   */
  def leaderActions(): Unit = {
    val localGossip = latestGossip
    val localMembers = localGossip.members

    val isLeader = localGossip.isLeader(selfAddress)

    if (isLeader && isAvailable) {
      // only run the leader actions if we are the LEADER and available

      val localOverview = localGossip.overview
      val localSeen = localOverview.seen
      val localUnreachableMembers = localOverview.unreachable
      val hasPartionHandoffCompletedSuccessfully: Boolean = {
        // FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
        true
      }

      // Leader actions are as follows:
      //   1. Move JOINING     => UP          -- When a node joins the cluster
      //   2. Move LEAVING     => EXITING     -- When all partition handoff has completed
      //   3. Non-exiting remain              -- When all partition handoff has completed
      //   4. Move EXITING     => REMOVED     -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table
      //   5. Move UNREACHABLE => DOWN        -- When the node is in the UNREACHABLE set it can be auto-down by leader
      //   6. Updating the vclock version for the changes
      //   7. Updating the 'seen' table
      //   8. Try to update the state with the new gossip
      //   9. If success - run all the side-effecting processing

      val (
        newGossip: Gossip,
        hasChangedState: Boolean,
        upMembers,
        exitingMembers,
        removedMembers,
        unreachableButNotDownedMembers) =

        if (localGossip.convergence) {
          // we have convergence - so we can't have unreachable nodes

          // transform the node member ring
          val newMembers = localMembers collect {
            // 1. Move JOINING => UP (once all nodes have seen that this node is JOINING, e.g. we have a convergence)
            case member if member.status == Joining ⇒ member copy (status = Up)
            // 2. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
            case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully ⇒ member copy (status = Exiting)
            // 3. Everyone else that is not Exiting stays as they are
            case member if member.status != Exiting ⇒ member
            // 4. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table
          }

          // ----------------------
          // 5. Store away all stuff needed for the side-effecting processing in 9.
          // ----------------------

          // Check for the need to do side-effecting on successful state change
          // Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED
          // to check for state-changes and to store away removed and exiting members for later notification
          //    1. check for state-changes to update
          //    2. store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending
          val (removedMembers, newMembers1) = localMembers partition (_.status == Exiting)
          val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining)
          val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully)
          val hasChangedState = removedMembers.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty

          // removing REMOVED nodes from the 'seen' table
          val newSeen = localSeen -- removedMembers.map(_.address)

          // removing REMOVED nodes from the 'unreachable' set
          val newUnreachableMembers = localUnreachableMembers -- removedMembers

          val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
          val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip

          (newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Member.none)

        } else if (AutoDown) {
          // we don't have convergence - so we might have unreachable nodes

          // if 'auto-down' is turned on, then try to auto-down any unreachable nodes
          val newUnreachableMembers = localUnreachableMembers collect {
            // ----------------------
            // 5. Move UNREACHABLE => DOWN (auto-downing by leader)
            // ----------------------
            case member if member.status != Down ⇒ member copy (status = Down)
            case downMember                      ⇒ downMember // no need to DOWN members already DOWN
          }

          // Check for the need to do side-effecting on successful state change
          val unreachableButNotDownedMembers = localUnreachableMembers filter (_.status != Down)

          // removing nodes marked as DOWN from the 'seen' table
          val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down ⇒ m.address }

          val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
          val newGossip = localGossip copy (overview = newOverview) // update gossip

          (newGossip, unreachableButNotDownedMembers.nonEmpty, Member.none, Member.none, Member.none, unreachableButNotDownedMembers)

        } else (localGossip, false, Member.none, Member.none, Member.none, Member.none)

      if (hasChangedState) { // we have a change of state - version it and try to update
        // ----------------------
        // 6. Updating the vclock version for the changes
        // ----------------------
        val versionedGossip = newGossip :+ vclockNode

        // ----------------------
        // 7. Updating the 'seen' table
        //    Unless the leader (this node) is part of the removed members, i.e. the leader has moved itself from EXITING -> REMOVED
        // ----------------------
        val seenVersionedGossip =
          if (removedMembers.exists(_.address == selfAddress)) versionedGossip
          else versionedGossip seen selfAddress

        // ----------------------
        // 8. Update the state with the new gossip
        // ----------------------
        latestGossip = seenVersionedGossip

        // ----------------------
        // 9. Run all the side-effecting processing
        // ----------------------

        // log the move of members from joining to up
        upMembers foreach { member ⇒ log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) }

        // tell all removed members to remove and shut down themselves
        removedMembers foreach { member ⇒
          val address = member.address
          log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - and removing node from node ring", selfAddress, address)
          coreSender ! SendClusterMessage(
            to = address,
            msg = ClusterLeaderAction.Remove(address))
        }

        // tell all exiting members to exit
        exitingMembers foreach { member ⇒
          val address = member.address
          log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, address)
          coreSender ! SendClusterMessage(
            to = address,
            msg = ClusterLeaderAction.Exit(address)) // FIXME should use ? to await completion of handoff?
        }

        // log the auto-downing of the unreachable nodes
        unreachableButNotDownedMembers foreach { member ⇒
          log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
        }

        publish(localGossip)
      }
    }
  }
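  /**
   * Sends heartbeat to all other members and to the nodes with join in progress.
   */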
  def heartbeat(): Unit = {
    removeOverdueJoinInProgress()

    val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys

    val deadline = Deadline.now + HeartbeatInterval
    beatTo.foreach { address ⇒ if (address != selfAddress) heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline) }
  }

  /**
   * Removes overdue joinInProgress from state.
   */
  def removeOverdueJoinInProgress(): Unit = {
    joinInProgress --= joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address }
  }

  /**
   * Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
   */
  def reapUnreachableMembers(): Unit = {
    if (!isSingletonCluster && isAvailable) {
      // only scrutinize if we are a non-singleton cluster and available

      val localGossip = latestGossip
      val localOverview = localGossip.overview
      val localMembers = localGossip.members
      val localUnreachableMembers = localGossip.overview.unreachable

      val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒
        member.address == selfAddress || failureDetector.isAvailable(member.address)
      }

      if (newlyDetectedUnreachableMembers.nonEmpty) {
        val newMembers = localMembers -- newlyDetectedUnreachableMembers
        val newUnreachableMembers = localUnreachableMembers ++ newlyDetectedUnreachableMembers

        val newOverview = localOverview copy (unreachable = newUnreachableMembers)
        val newGossip = localGossip copy (overview = newOverview, members = newMembers)

        // updating vclock and 'seen' table
        val versionedGossip = newGossip :+ vclockNode
        val seenVersionedGossip = versionedGossip seen selfAddress

        latestGossip = seenVersionedGossip

        log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", "))

        publish(localGossip)
      }
    }
  }
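  /**
   * Picks a random address from the given sequence, or None if it is empty.
   */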
  def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
    if (addresses.isEmpty) None
    else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))

  def isSingletonCluster: Boolean = latestGossip.isSingletonCluster

  def isAvailable: Boolean = latestGossip.isAvailable(selfAddress)

  /**
   * Gossips latest gossip to a random member in the set of members passed in as argument.
   *
   * @return the used [[akka.actor.Address]] if any
   */
  private def gossipToRandomNodeOf(addresses: IndexedSeq[Address]): Option[Address] = {
    log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", selfAddress, addresses.mkString(", "))
    // filter out myself
    val peer = selectRandomNode(addresses filterNot (_ == selfAddress))
    peer foreach gossipTo
    peer
  }

  /**
   * Gossips latest gossip to an address.
   */
  def gossipTo(address: Address): Unit =
    gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = true))
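  /**
   * Gossips latest gossip to an address without requesting gossip back from the receiver (conversation = false).
   */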
  def oneWayGossipTo(address: Address): Unit =
    gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false))

  def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress)
    coreSender ! SendClusterMessage(address, gossipMsg)
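  /**
   * Publishes the change from `oldGossip` to the current `latestGossip` to the event publisher,
   * and publishes internal stats immediately when periodic stats publishing is disabled.
   */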
  def publish(oldGossip: Gossip): Unit = {
    publisher ! PublishChanges(oldGossip, latestGossip)
    if (PublishStatsInterval == Duration.Zero) publishInternalStats()
  }

  def publishInternalStats(): Unit = publisher ! CurrentInternalStats(stats)

}
/**
 * INTERNAL API.
 *
 * Sends InitJoin to all seed nodes (except itself) and expects
 * an InitJoinAck reply back. The seed node that replied first
 * will be used and joined to. InitJoinAck replies received after the
 * first one are ignored.
 *
 * Retries if no InitJoinAck replies are received within the
 * SeedNodeTimeout.
 * When at least one reply has been received it stops itself after
 * an idle SeedNodeTimeout.
 *
 * The seed nodes can be started in any order, but they will not be "active"
 * until they have been able to join another seed node (seed1).
 * They will retry the join procedure.
 * So one possible startup scenario is:
 * 1. seed2 started, but doesn't get any ack from seed1 or seed3
 * 2. seed3 started, doesn't get any ack from seed1 or seed2 (seed2 doesn't reply)
 * 3. seed1 is started and joins itself
 * 4. seed2 retries the join procedure and gets an ack from seed1, and then joins to seed1
 * 5. seed3 retries the join procedure and gets acks from seed2 first, and then joins to seed2
 *
 */
private[cluster] final class JoinSeedNodeProcess(seedNodes: IndexedSeq[Address]) extends Actor with ActorLogging {
  import InternalClusterAction._

  def selfAddress = Cluster(context.system).selfAddress

  if (seedNodes.isEmpty || seedNodes.head == selfAddress)
    throw new IllegalArgumentException("Join seed node should not be done")

  context.setReceiveTimeout(Cluster(context.system).settings.SeedNodeTimeout)

  override def preStart(): Unit = self ! JoinSeedNode

  def receive = {
    case JoinSeedNode ⇒
      // send InitJoin to all seed nodes (except myself)
      seedNodes.collect {
        case a if a != selfAddress ⇒ context.actorFor(context.parent.path.toStringWithAddress(a))
      } foreach { _ ! InitJoin }
    case InitJoinAck(address) ⇒
      // first InitJoinAck reply
      context.parent ! JoinTo(address)
      context.become(done)
    case ReceiveTimeout ⇒
      // no InitJoinAck received, try again
      self ! JoinSeedNode
  }

  def done: Actor.Receive = {
    case InitJoinAck(_) ⇒ // already received one, skip rest
    case ReceiveTimeout ⇒ context.stop(self)
  }
}
/**
 * INTERNAL API.
 */
private[cluster] final class ClusterCoreSender extends Actor with ActorLogging {
  import InternalClusterAction._

  val selfAddress = Cluster(context.system).selfAddress

  /**
   * Looks up and returns the remote cluster command connection for the specific address.
   */
  private def clusterCoreConnectionFor(address: Address): ActorRef =
    context.actorFor(RootActorPath(address) / "system" / "cluster" / "core")

  def receive = {
    case SendClusterMessage(to, msg) ⇒
      log.debug("Cluster Node [{}] - Trying to send [{}] to [{}]", selfAddress, msg.getClass.getSimpleName, to)
      clusterCoreConnectionFor(to) ! msg
  }
}
/**
 * INTERNAL API
 */
private[cluster] case class ClusterStats(
  receivedGossipCount: Long = 0L,
  mergeConflictCount: Long = 0L,
  mergeCount: Long = 0L,
  mergeDetectedCount: Long = 0L) {

  def incrementReceivedGossipCount(): ClusterStats =
    copy(receivedGossipCount = receivedGossipCount + 1)

  def incrementMergeConflictCount(): ClusterStats =
    copy(mergeConflictCount = mergeConflictCount + 1)

  def incrementMergeCount(): ClusterStats =
    copy(mergeCount = mergeCount + 1)

  def incrementMergeDetectedCount(): ClusterStats =
    copy(mergeDetectedCount = mergeDetectedCount + 1)
}