Fine-grained events, see #2202

* Defined the domain events in the ClusterEvent.scala file
* Produce events from the gossip diff and publish them to the event bus
  from a separate actor, ClusterDomainEventPublisher (a subscriber sketch
  follows below)
* Adjusted tests
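
For orientation, here is a minimal sketch of an application actor consuming the new fine-grained events. It assumes the Cluster extension exposes subscribe/unsubscribe helpers that route the Subscribe/Unsubscribe messages shown in the diff below to the new publisher; the ClusterListener name and the logging details are illustrative only and not part of this commit:

import akka.actor.{ Actor, ActorLogging }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

// Hypothetical subscriber: registers for the fine-grained cluster domain events.
class ClusterListener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  // Subscribing delivers a CurrentClusterState snapshot first,
  // followed by incremental change events from the publisher actor.
  override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case state: CurrentClusterState ⇒
      log.info("Initial state: {} members, leader={}", state.members.size, state.leader)
    case MembersChanged(members) ⇒
      log.info("Members changed: {}", members)
    case UnreachableMembersChanged(unreachable) ⇒
      log.info("Unreachable: {}", unreachable)
    case LeaderChanged(leader, convergence) ⇒
      log.info("Leader: {} (convergence={})", leader, convergence)
  }
}

Routing subscriptions through the dedicated ClusterDomainEventPublisher keeps the gossip-processing path in ClusterCoreDaemon free of event-diffing and publishing work.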
Patrik Nordwall 2012-08-19 20:15:22 +02:00
parent 6389e1097b
commit 20a038fdfd
15 changed files with 458 additions and 214 deletions


@@ -5,14 +5,17 @@ package akka.cluster
import scala.collection.immutable.SortedSet
import scala.concurrent.util.{ Deadline, Duration }
import scala.concurrent.util.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, PoisonPill, Scheduler }
import akka.actor.Status.Failure
import akka.event.EventStream
import akka.pattern.ask
import akka.util.Timeout
import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
import language.existentials
import language.postfixOps
/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
@@ -94,8 +97,12 @@ private[cluster] object InternalClusterAction {
case object GetClusterCoreRef
case class Subscribe(subscriber: ActorRef, to: Class[_])
case class Unsubscribe(subscriber: ActorRef)
sealed trait SubscriptionMessage
case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage
case class Unsubscribe(subscriber: ActorRef) extends SubscriptionMessage
case class PublishChanges(oldGossip: Gossip, newGossip: Gossip)
case object PublishDone
case class Ping(timestamp: Long = System.currentTimeMillis) extends ClusterMessage
case class Pong(ping: Ping, timestamp: Long = System.currentTimeMillis) extends ClusterMessage
@@ -183,6 +190,8 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
withDispatcher(UseDispatcher), name = "heartbeatSender")
val coreSender = context.actorOf(Props(new ClusterCoreSender(selfAddress)).
withDispatcher(UseDispatcher), name = "coreSender")
val publisher = context.actorOf(Props(new ClusterDomainEventPublisher(environment)).
withDispatcher(UseDispatcher), name = "publisher")
import context.dispatcher
@@ -237,11 +246,10 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
}
def uninitialized: Actor.Receive = {
case InitJoin ⇒ // skip, not ready yet
case JoinTo(address) ⇒ join(address)
case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to)
case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber)
case _: Tick ⇒ // ignore periodic tasks until initialized
case InitJoin ⇒ // skip, not ready yet
case JoinTo(address) ⇒ join(address)
case msg: SubscriptionMessage ⇒ publisher forward msg
case _: Tick ⇒ // ignore periodic tasks until initialized
}
def initialized: Actor.Receive = {
@@ -260,12 +268,16 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
case Exit(address) ⇒ exiting(address)
case Remove(address) ⇒ removing(address)
case SendGossipTo(address) ⇒ gossipTo(address)
case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to)
case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber)
case msg: SubscriptionMessage ⇒ publisher forward msg
case p: Ping ⇒ ping(p)
}
def removed: Actor.Receive = {
case msg: SubscriptionMessage ⇒ publisher forward msg
case _: Tick ⇒ // ignore periodic tasks
}
def receive = uninitialized
def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
@@ -275,21 +287,23 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
* A 'Join(thisNodeAddress)' command is sent to the node to join.
*/
def join(address: Address): Unit = {
val localGossip = latestGossip
// wipe our state since a node that joins a cluster must be empty
latestGossip = Gossip()
joinInProgress = Map(address -> (Deadline.now + JoinTimeout))
if (!latestGossip.members.exists(_.address == address)) {
val localGossip = latestGossip
// wipe our state since a node that joins a cluster must be empty
latestGossip = Gossip()
joinInProgress = Map(address -> (Deadline.now + JoinTimeout))
// wipe the failure detector since we are starting fresh and shouldn't care about the past
failureDetector.reset()
// wipe the failure detector since we are starting fresh and shouldn't care about the past
failureDetector.reset()
publish(localGossip)
publish(localGossip)
context.become(initialized)
if (address == selfAddress)
joining(address)
else
coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress))
context.become(initialized)
if (address == selfAddress)
joining(address)
else
coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress))
}
}
/**
@@ -374,9 +388,12 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
val localGossip = latestGossip
// just cleaning up the gossip state
latestGossip = Gossip()
// make sure the final (removed) state is always published
publish(localGossip)
environment.shutdown()
context.become(removed)
// make sure the final (removed) state is published
// before shutting down
implicit val timeout = Timeout(5 seconds)
publisher ? PublishDone onComplete { case _ ⇒ environment.shutdown() }
}
/**
@@ -591,12 +608,10 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
// 3. Non-exiting remain -- When all partition handoff has completed
// 4. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table
// 5. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader
// 5. Store away all stuff needed for the side-effecting processing in 10.
// 6. Updating the vclock version for the changes
// 7. Updating the 'seen' table
// 8. Try to update the state with the new gossip
// 9. If failure - retry
// 10. If success - run all the side-effecting processing
// 9. If success - run all the side-effecting processing
val (
newGossip: Gossip,
@@ -816,64 +831,12 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress)
coreSender ! SendClusterMessage(address, gossipMsg)
def subscribe(subscriber: ActorRef, to: Class[_]): Unit = {
subscriber ! CurrentClusterState(
members = latestGossip.members,
unreachable = latestGossip.overview.unreachable,
convergence = latestGossip.convergence,
seenBy = latestGossip.seenBy,
leader = latestGossip.leader)
eventStream.subscribe(subscriber, to)
}
def unsubscribe(subscriber: ActorRef): Unit =
eventStream.unsubscribe(subscriber)
def publish(oldGossip: Gossip): Unit = {
publishMembers(oldGossip)
publishUnreachableMembers(oldGossip)
publishLeader(oldGossip)
publishSeen(oldGossip)
publisher ! PublishChanges(oldGossip, latestGossip)
if (PublishStatsInterval == Duration.Zero) publishInternalStats()
}
def publishMembers(oldGossip: Gossip): Unit = {
if (!isSame(oldGossip.members, latestGossip.members))
eventStream publish MembersChanged(latestGossip.members)
}
def publishUnreachableMembers(oldGossip: Gossip): Unit = {
if (!isSame(oldGossip.overview.unreachable, latestGossip.overview.unreachable))
eventStream publish UnreachableMembersChanged(latestGossip.overview.unreachable)
}
def isSame(oldMembers: Set[Member], newMembers: Set[Member]): Boolean = {
def oldMembersStatus = oldMembers.map(m ⇒ (m.address, m.status))
def newMembersStatus = newMembers.map(m ⇒ (m.address, m.status))
(newMembers eq oldMembers) || ((newMembers.size == oldMembers.size) && (newMembersStatus == oldMembersStatus))
}
def publishLeader(oldGossip: Gossip): Unit = {
if (latestGossip.leader != oldGossip.leader || latestGossip.convergence != oldGossip.convergence)
eventStream publish LeaderChanged(latestGossip.leader, latestGossip.convergence)
}
def publishSeen(oldGossip: Gossip): Unit = {
val oldConvergence = oldGossip.convergence
val newConvergence = latestGossip.convergence
val oldSeenBy = oldGossip.seenBy
val newSeenBy = latestGossip.seenBy
if (newConvergence != oldConvergence || newSeenBy != oldSeenBy) {
eventStream publish SeenChanged(newConvergence, newSeenBy)
}
}
def publishInternalStats(): Unit = {
eventStream publish CurrentInternalStats(stats)
}
def eventStream: EventStream = context.system.eventStream
def publishInternalStats(): Unit = publisher ! CurrentInternalStats(stats)
def ping(p: Ping): Unit = sender ! Pong(p)
}
@@ -944,53 +907,6 @@ private[cluster] final class ClusterCoreSender(selfAddress: Address) extends Act
}
}
/**
* Domain events published to the event bus.
*/
object ClusterEvent {
/**
* Marker interface for cluster domain events.
*/
sealed trait ClusterDomainEvent
/**
* Current snapshot state of the cluster. Sent to new subscriber.
*/
case class CurrentClusterState(
members: SortedSet[Member] = SortedSet.empty,
unreachable: Set[Member] = Set.empty,
convergence: Boolean = false,
seenBy: Set[Address] = Set.empty,
leader: Option[Address] = None) extends ClusterDomainEvent
/**
* Set of cluster members or their status have changed.
*/
case class MembersChanged(members: SortedSet[Member]) extends ClusterDomainEvent
/**
* Set of unreachable cluster members or their status have changed.
*/
case class UnreachableMembersChanged(unreachable: Set[Member]) extends ClusterDomainEvent
/**
* Leader of the cluster members changed, and/or convergence status.
*/
case class LeaderChanged(leader: Option[Address], convergence: Boolean) extends ClusterDomainEvent
/**
* INTERNAL API
* The nodes that have seen current version of the Gossip.
*/
private[cluster] case class SeenChanged(convergence: Boolean, seenBy: Set[Address]) extends ClusterDomainEvent
/**
* INTERNAL API
*/
private[cluster] case class CurrentInternalStats(stats: ClusterStats) extends ClusterDomainEvent
}
/**
* INTERNAL API
*/