Request send/publish of CurrentClusterState, see #2438
* Added publishCurrentClusterState and sendCurrentClusterState * Removed Ping/Pong that was used for some tests, since awaitCond is now needed anyway, since publish to eventStream is done afterwards
This commit is contained in:
parent
e55cde2591
commit
50d0efe7d4
5 changed files with 57 additions and 31 deletions
|
|
@ -177,6 +177,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
def receive = {
|
||||
case PublishChanges(oldGossip, newGossip) ⇒ publishChanges(oldGossip, newGossip)
|
||||
case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats)
|
||||
case PublishCurrentClusterState(receiver) ⇒ publishCurrentClusterState(receiver)
|
||||
case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to)
|
||||
case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber)
|
||||
case PublishDone ⇒ sender ! PublishDone
|
||||
|
|
@ -184,13 +185,21 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
|
||||
def eventStream: EventStream = context.system.eventStream
|
||||
|
||||
def subscribe(subscriber: ActorRef, to: Class[_]): Unit = {
|
||||
subscriber ! CurrentClusterState(
|
||||
def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = {
|
||||
val state = CurrentClusterState(
|
||||
members = latestGossip.members,
|
||||
unreachable = latestGossip.overview.unreachable,
|
||||
convergence = latestGossip.convergence,
|
||||
seenBy = latestGossip.seenBy,
|
||||
leader = latestGossip.leader)
|
||||
receiver match {
|
||||
case Some(ref) ⇒ ref ! state
|
||||
case None ⇒ eventStream publish state
|
||||
}
|
||||
}
|
||||
|
||||
def subscribe(subscriber: ActorRef, to: Class[_]): Unit = {
|
||||
publishCurrentClusterState(Some(subscriber))
|
||||
eventStream.subscribe(subscriber, to)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue