Incorparate review comments, see #2803
This commit is contained in:
parent
d07f331e78
commit
79303a1785
1 changed files with 21 additions and 22 deletions
|
|
@ -147,8 +147,8 @@ object ClusterEvent {
|
|||
}
|
||||
|
||||
/**
|
||||
* Marker interface for membership events published immediately.
|
||||
* All other members might not have seen the state.
|
||||
* Marker interface for membership events published immediately when
|
||||
* it happened. All other members might not have seen the state.
|
||||
*/
|
||||
sealed trait InstantMemberEvent extends ClusterDomainEvent {
|
||||
def member: Member
|
||||
|
|
@ -299,15 +299,13 @@ object ClusterEvent {
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[cluster] def convertToInstantMemberEvents(memberEvents: immutable.Seq[MemberEvent]): immutable.Seq[InstantMemberEvent] =
|
||||
memberEvents map { event ⇒
|
||||
event match {
|
||||
case MemberJoined(m) ⇒ InstantMemberJoined(m)
|
||||
case MemberUp(m) ⇒ InstantMemberUp(m)
|
||||
case MemberDowned(m) ⇒ InstantMemberDowned(m)
|
||||
case MemberLeft(m) ⇒ InstantMemberLeft(m)
|
||||
case MemberExited(m) ⇒ InstantMemberExited(m)
|
||||
case MemberRemoved(m) ⇒ InstantMemberRemoved(m)
|
||||
}
|
||||
memberEvents map {
|
||||
case MemberJoined(m) ⇒ InstantMemberJoined(m)
|
||||
case MemberUp(m) ⇒ InstantMemberUp(m)
|
||||
case MemberDowned(m) ⇒ InstantMemberDowned(m)
|
||||
case MemberLeft(m) ⇒ InstantMemberLeft(m)
|
||||
case MemberExited(m) ⇒ InstantMemberExited(m)
|
||||
case MemberRemoved(m) ⇒ InstantMemberRemoved(m)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -356,9 +354,11 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
|
||||
def eventStream: EventStream = context.system.eventStream
|
||||
|
||||
/**
|
||||
* The current snapshot state that is a mix of converged and latest gossip
|
||||
* to mimic what you would have seen if you where listening to the events.
|
||||
*/
|
||||
def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = {
|
||||
// The state is a mix of converged and latest gossip to mimic what you
|
||||
// would have seen if you where listening to the events.
|
||||
val state = CurrentClusterState(
|
||||
members = latestConvergedGossip.members,
|
||||
unreachable = latestGossip.overview.unreachable,
|
||||
|
|
@ -370,19 +370,18 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
}
|
||||
}
|
||||
|
||||
def publishInstantClusterState(receiver: ActorRef): Unit = {
|
||||
// The state is based on latest gossip to mimic what you
|
||||
// would have seen if you where listening to the InstantMemberEvent stream.
|
||||
/**
|
||||
* Publish the snapshot state that is based on latest gossip to mimic what you
|
||||
* would have seen if you where listening to the InstantMemberEvent stream.
|
||||
*/
|
||||
def publishInstantClusterState(receiver: ActorRef): Unit =
|
||||
receiver ! InstantClusterState(members = latestGossip.members)
|
||||
}
|
||||
|
||||
def subscribe(subscriber: ActorRef, to: Class[_]): Unit = {
|
||||
if (classOf[ClusterDomainEvent] == to) {
|
||||
val isInstantMemberEvent = classOf[InstantMemberEvent].isAssignableFrom(to)
|
||||
if (classOf[ClusterDomainEvent] == to || isInstantMemberEvent)
|
||||
publishInstantClusterState(subscriber)
|
||||
publishCurrentClusterState(Some(subscriber))
|
||||
} else if (classOf[InstantMemberEvent].isAssignableFrom(to))
|
||||
publishInstantClusterState(subscriber)
|
||||
else
|
||||
if (!isInstantMemberEvent)
|
||||
publishCurrentClusterState(Some(subscriber))
|
||||
|
||||
eventStream.subscribe(subscriber, to)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue