diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index f804f1f5ac..509536b3d8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -147,8 +147,8 @@ object ClusterEvent { } /** - * Marker interface for membership events published immediately. - * All other members might not have seen the state. + * Marker interface for membership events published immediately when + * it happened. All other members might not have seen the state. */ sealed trait InstantMemberEvent extends ClusterDomainEvent { def member: Member @@ -299,15 +299,13 @@ object ClusterEvent { * INTERNAL API */ private[cluster] def convertToInstantMemberEvents(memberEvents: immutable.Seq[MemberEvent]): immutable.Seq[InstantMemberEvent] = - memberEvents map { event ⇒ - event match { - case MemberJoined(m) ⇒ InstantMemberJoined(m) - case MemberUp(m) ⇒ InstantMemberUp(m) - case MemberDowned(m) ⇒ InstantMemberDowned(m) - case MemberLeft(m) ⇒ InstantMemberLeft(m) - case MemberExited(m) ⇒ InstantMemberExited(m) - case MemberRemoved(m) ⇒ InstantMemberRemoved(m) - } + memberEvents map { + case MemberJoined(m) ⇒ InstantMemberJoined(m) + case MemberUp(m) ⇒ InstantMemberUp(m) + case MemberDowned(m) ⇒ InstantMemberDowned(m) + case MemberLeft(m) ⇒ InstantMemberLeft(m) + case MemberExited(m) ⇒ InstantMemberExited(m) + case MemberRemoved(m) ⇒ InstantMemberRemoved(m) } /** @@ -356,9 +354,11 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto def eventStream: EventStream = context.system.eventStream + /** + * The current snapshot state that is a mix of converged and latest gossip + * to mimic what you would have seen if you where listening to the events. + */ def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = { - // The state is a mix of converged and latest gossip to mimic what you - // would have seen if you where listening to the events. val state = CurrentClusterState( members = latestConvergedGossip.members, unreachable = latestGossip.overview.unreachable, @@ -370,19 +370,18 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto } } - def publishInstantClusterState(receiver: ActorRef): Unit = { - // The state is based on latest gossip to mimic what you - // would have seen if you where listening to the InstantMemberEvent stream. + /** + * Publish the snapshot state that is based on latest gossip to mimic what you + * would have seen if you where listening to the InstantMemberEvent stream. + */ + def publishInstantClusterState(receiver: ActorRef): Unit = receiver ! InstantClusterState(members = latestGossip.members) - } def subscribe(subscriber: ActorRef, to: Class[_]): Unit = { - if (classOf[ClusterDomainEvent] == to) { + val isInstantMemberEvent = classOf[InstantMemberEvent].isAssignableFrom(to) + if (classOf[ClusterDomainEvent] == to || isInstantMemberEvent) publishInstantClusterState(subscriber) - publishCurrentClusterState(Some(subscriber)) - } else if (classOf[InstantMemberEvent].isAssignableFrom(to)) - publishInstantClusterState(subscriber) - else + if (!isInstantMemberEvent) publishCurrentClusterState(Some(subscriber)) eventStream.subscribe(subscriber, to)