Improve publish of domain events, see #2202

* Gossip is not exposed in user api
* Better and more events
* Snapshot event sent to new subscriber
* Updated tests
* Periodic publish only for internal stats
This commit is contained in:
Patrik Nordwall 2012-08-15 16:47:34 +02:00
parent bc4d8fc7c5
commit 06f81f4373
21 changed files with 294 additions and 197 deletions

View file

@ -89,19 +89,19 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
log.info("Cluster Node [{}] - is starting up...", selfAddress)
/**
* Read only view of cluster state, updated periodically by
* ClusterCoreDaemon. Access with `latestGossip`.
* Read view of cluster state, updated via subscription of
* cluster events published on the event bus.
*/
@volatile
private[cluster] var _latestGossip: Gossip = Gossip()
private var state: CurrentClusterState = CurrentClusterState()
/**
* INTERNAL API
* Read only view of internal cluster stats, updated periodically by
* ClusterCoreDaemon. Access with `latestStats`.
* ClusterCoreDaemon via event bus. Access with `latestStats`.
*/
@volatile
private[cluster] var _latestStats = ClusterStats()
private var _latestStats = ClusterStats()
// ========================================================
// ===================== WORK DAEMONS =====================
@ -155,20 +155,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
}
// create actor that subscribes to the cluster eventBus to update current read view state
private val eventBusListener: ActorRef = {
val listener = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor {
def receive = {
case MembershipGossipChanged(gossip) _latestGossip = gossip
case InternalStatsChanged(stats) _latestStats = stats
case _ // ignore, not interesting
}
}).withDispatcher(UseDispatcher), name = "clusterEventBusListener")
subscribe(listener, classOf[ClusterDomainEvent])
listener
}
// create supervisor for daemons under path "/system/cluster"
private val clusterDaemons: ActorRef = {
system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new ClusterDaemon(this)).
@ -183,6 +169,24 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
Await.result((clusterDaemons ? InternalClusterAction.GetClusterCoreRef).mapTo[ActorRef], timeout.duration)
}
// create actor that subscribes to the cluster eventBus to update current read view state
private val eventBusListener: ActorRef = {
system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor {
override def preStart(): Unit = subscribe(self, classOf[ClusterDomainEvent])
override def postStop(): Unit = unsubscribe(self)
def receive = {
case s: CurrentClusterState state = s
case MembersChanged(members) state = state.copy(members = members)
case UnreachableMembersChanged(unreachable) state = state.copy(unreachable = unreachable)
case LeaderChanged(leader) state = state.copy(leader = leader)
case SeenChanged(convergence, seenBy) state = state.copy(convergence = convergence, seenBy = seenBy)
case CurrentInternalStats(stats) _latestStats = stats
case _ // ignore, not interesting
}
}).withDispatcher(UseDispatcher), name = "clusterEventBusListener")
}
system.registerOnTermination(shutdown())
private val clusterJmx = new ClusterJmx(this, log)
@ -194,7 +198,10 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// ===================== PUBLIC API =====================
// ======================================================
def self: Member = latestGossip.member(selfAddress)
def self: Member = {
state.members.find(_.address == selfAddress).orElse(state.unreachable.find(_.address == selfAddress)).
getOrElse(Member(selfAddress, MemberStatus.Removed))
}
/**
* Returns true if the cluster node is up and running, false if it is shut down.
@ -202,9 +209,14 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
def isRunning: Boolean = _isRunning.get
/**
* Latest gossip.
* Current cluster members, sorted with leader first.
*/
def latestGossip: Gossip = _latestGossip
def members: SortedSet[Member] = state.members
/**
* Members that has been detected as unreachable.
*/
def unreachableMembers: Set[Member] = state.unreachable
/**
* Member status for this node ([[akka.cluster.MemberStatus]]).
@ -218,35 +230,35 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
/**
* Is this node the leader?
*/
def isLeader: Boolean = latestGossip.isLeader(selfAddress)
def isLeader: Boolean = leader == Some(selfAddress)
/**
* Get the address of the current leader.
*/
def leader: Address = latestGossip.leader match {
case Some(x) x
case None throw new IllegalStateException("There is no leader in this cluster")
}
def leader: Option[Address] = state.leader
/**
* Is this node a singleton cluster?
*/
def isSingletonCluster: Boolean = latestGossip.isSingletonCluster
def isSingletonCluster: Boolean = members.size == 1
/**
* Checks if we have a cluster convergence.
*
* @return Some(convergedGossip) if convergence have been reached and None if not
*/
def convergence: Option[Gossip] = latestGossip match {
case gossip if gossip.convergence Some(gossip)
case _ None
}
def convergence: Boolean = state.convergence
/**
* The nodes that has seen current version of the Gossip.
*/
def seenBy: Set[Address] = state.seenBy
/**
* Returns true if the node is UP or JOINING.
*/
def isAvailable: Boolean = latestGossip.isAvailable(selfAddress)
def isAvailable: Boolean = {
val myself = self
!unreachableMembers.contains(myself) && !myself.status.isUnavailable
}
/**
* Make it possible to override/configure seedNodes from tests without
@ -257,14 +269,17 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
/**
* Subscribe to cluster domain events.
* The `to` Class can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
* or subclass.
* or subclass. A snapshot of [[akka.cluster.ClusterEvent.CurrentClusterState]]
* will also be sent to the subscriber.
*/
def subscribe(subscriber: ActorRef, to: Class[_]): Unit = system.eventStream.subscribe(subscriber, to)
def subscribe(subscriber: ActorRef, to: Class[_]): Unit =
clusterCore ! InternalClusterAction.Subscribe(subscriber, to)
/**
* Subscribe to cluster domain events.
* Unsubscribe to cluster domain events.
*/
def unsubscribe(subscriber: ActorRef): Unit = system.eventStream.unsubscribe(subscriber)
def unsubscribe(subscriber: ActorRef): Unit =
clusterCore ! InternalClusterAction.Unsubscribe(subscriber)
/**
* Try to join this cluster node with the node specified by 'address'.
@ -289,6 +304,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// ===================== INTERNAL API =====================
// ========================================================
/**
* INTERNAL API
*/
private[cluster] def latestStats: ClusterStats = _latestStats
/**
* INTERNAL API.
*
@ -301,9 +321,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
if (_isRunning.compareAndSet(true, false)) {
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
system.stop(clusterDaemons)
unsubscribe(eventBusListener)
system.stop(eventBusListener)
system.stop(clusterDaemons)
scheduler.close()
@ -313,32 +332,5 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
}
/**
* INTERNAL API
*/
private[cluster] def latestStats: ClusterStats = _latestStats
}
/**
* Domain events published to the cluster event bus.
*/
object ClusterEvent {
/**
* Marker interface for cluster domain events.
*/
trait ClusterDomainEvent
/**
* Set of cluster members, or their status has changed.
*/
case class MembersChanged(members: SortedSet[Member]) extends ClusterDomainEvent
case class MembershipGossipChanged(gossip: Gossip) extends ClusterDomainEvent
/**
* INTERNAL API
*/
private[cluster] case class InternalStatsChanged(stats: ClusterStats) extends ClusterDomainEvent
}