Publish cluster changes to event bus, see #2202

* ClusterEventBus
* Removed register listener and related
* Removed Gossip.meta because it doesn't handle version conflicts
This commit is contained in:
Patrik Nordwall 2012-08-14 10:58:30 +02:00
parent 9094199011
commit e38dd80f38
14 changed files with 209 additions and 159 deletions

View file

@ -71,11 +71,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
*/
class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension with ClusterEnvironment {
/**
* Represents the state for this Cluster. Implemented using optimistic lockless concurrency.
* All state is represented by this immutable case class and managed by an AtomicReference.
*/
private case class State(memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty)
import ClusterEvent._
if (!system.provider.isInstanceOf[RemoteActorRefProvider])
throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration")
@ -92,8 +88,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
log.info("Cluster Node [{}] - is starting up...", selfAddress)
private val state = new AtomicReference[State](State())
/**
* Read only view of cluster state, updated periodically by
* ClusterCoreDaemon. Access with `latestGossip`.
@ -109,6 +103,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
@volatile
private[cluster] var _latestStats = ClusterStats()
private[cluster] val eventBus: ClusterEventBus = new ClusterEventBus
// ========================================================
// ===================== WORK DAEMONS =====================
// ========================================================
@ -161,6 +157,20 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
}
// create actor that subscribes to the cluster eventBus to update current read view state
private val eventBusListener: ActorRef = {
val listener = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor {
def receive = {
case MembershipGossipChanged(gossip) _latestGossip = gossip
case InternalStatsChanged(stats) _latestStats = stats
case _ // ignore, not interesting
}
}).withDispatcher(UseDispatcher), name = "clusterEventBusListener")
eventBus.subscribe(listener, classOf[ClusterDomainEvent])
listener
}
// create supervisor for daemons under path "/system/cluster"
private val clusterDaemons: ActorRef = {
system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new ClusterDaemon(this)).
@ -247,26 +257,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
def seedNodes: IndexedSeq[Address] = SeedNodes
/**
* Registers a listener to subscribe to cluster membership changes.
* Subscribe to cluster domain events.
* The `to` Class can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
* or subclass.
*/
@tailrec
final def registerListener(listener: MembershipChangeListener): Unit = {
val localState = state.get
val newListeners = localState.memberMembershipChangeListeners + listener
val newState = localState copy (memberMembershipChangeListeners = newListeners)
if (!state.compareAndSet(localState, newState)) registerListener(listener) // recur
}
/**
* Unsubscribes to cluster membership changes.
*/
@tailrec
final def unregisterListener(listener: MembershipChangeListener): Unit = {
val localState = state.get
val newListeners = localState.memberMembershipChangeListeners - listener
val newState = localState copy (memberMembershipChangeListeners = newListeners)
if (!state.compareAndSet(localState, newState)) unregisterListener(listener) // recur
}
def subscribe(subscriber: ActorRef, to: Class[_]): Unit = eventBus.subscribe(subscriber, to)
/**
* Try to join this cluster node with the node specified by 'address'.
@ -303,10 +298,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
if (_isRunning.compareAndSet(true, false)) {
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
// FIXME isTerminated check can be removed when ticket #2221 is fixed
// now it prevents logging if system is shutdown (or in progress of shutdown)
if (!clusterDaemons.isTerminated)
system.stop(clusterDaemons)
system.stop(clusterDaemons)
eventBus.unsubscribe(eventBusListener)
system.stop(eventBusListener)
scheduler.close()
@ -316,41 +310,32 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
}
/**
* INTERNAL API
*/
private[cluster] def notifyMembershipChangeListeners(members: SortedSet[Member]): Unit = {
// FIXME run callbacks async (to not block the cluster)
state.get.memberMembershipChangeListeners foreach { _ notify members }
}
/**
* INTERNAL API
*/
private[cluster] def latestStats: ClusterStats = _latestStats
/**
* INTERNAL API
*/
private[cluster] def publishLatestGossip(gossip: Gossip): Unit = _latestGossip = gossip
/**
* INTERNAL API
*/
private[cluster] def publishLatestStats(stats: ClusterStats): Unit = _latestStats = stats
}
/**
* Interface for membership change listener.
* Domain events published to the cluster event bus.
*/
trait MembershipChangeListener {
def notify(members: SortedSet[Member]): Unit
object ClusterEvent {
/**
* Marker interface for cluster domain events.
*/
trait ClusterDomainEvent
/**
* Set of cluster members, or their status has changed.
*/
case class MembersChanged(members: SortedSet[Member]) extends ClusterDomainEvent
case class MembershipGossipChanged(gossip: Gossip) extends ClusterDomainEvent
/**
* INTERNAL API
*/
private[cluster] case class InternalStatsChanged(stats: ClusterStats) extends ClusterDomainEvent
}
/**
* Interface for meta data change listener.
*/
trait MetaDataChangeListener {
def notify(meta: Map[String, Array[Byte]]): Unit
}