Publish cluster metrics through the publisher actor.
* To avoid ordering surprises metrics should be published via the same actor that handles the subscriptions and publishes other cluster domain events. * Added missing publish in case of removal of member (had a test failure for that)
This commit is contained in:
parent
3dea880ca3
commit
49b9ec6c2c
3 changed files with 21 additions and 18 deletions
|
|
@ -116,6 +116,7 @@ private[cluster] object InternalClusterAction {
|
|||
case class PublishCurrentClusterState(receiver: Option[ActorRef]) extends SubscriptionMessage
|
||||
|
||||
case class PublishChanges(oldGossip: Gossip, newGossip: Gossip)
|
||||
case class PublishEvent(event: ClusterDomainEvent)
|
||||
case object PublishDone
|
||||
|
||||
}
|
||||
|
|
@ -150,11 +151,13 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
|
|||
// cause deadlock. The Cluster extension is currently being created and is waiting
|
||||
// for response from GetClusterCoreRef in its constructor.
|
||||
|
||||
val core = context.actorOf(Props[ClusterCoreDaemon].
|
||||
val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
|
||||
withDispatcher(context.props.dispatcher), name = "publisher")
|
||||
val core = context.actorOf(Props(new ClusterCoreDaemon(publisher)).
|
||||
withDispatcher(context.props.dispatcher), name = "core")
|
||||
val heartbeat = context.actorOf(Props[ClusterHeartbeatDaemon].
|
||||
context.actorOf(Props[ClusterHeartbeatDaemon].
|
||||
withDispatcher(context.props.dispatcher), name = "heartbeat")
|
||||
if (settings.MetricsEnabled) context.actorOf(Props[ClusterMetricsCollector].
|
||||
if (settings.MetricsEnabled) context.actorOf(Props(new ClusterMetricsCollector(publisher)).
|
||||
withDispatcher(context.props.dispatcher), name = "metrics")
|
||||
|
||||
def receive = {
|
||||
|
|
@ -166,7 +169,7 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
|
|||
/**
|
||||
* INTERNAL API.
|
||||
*/
|
||||
private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
|
||||
private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging {
|
||||
import ClusterLeaderAction._
|
||||
import InternalClusterAction._
|
||||
import ClusterHeartbeatSender._
|
||||
|
|
@ -189,8 +192,6 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
|
|||
withDispatcher(UseDispatcher), name = "heartbeatSender")
|
||||
val coreSender = context.actorOf(Props[ClusterCoreSender].
|
||||
withDispatcher(UseDispatcher), name = "coreSender")
|
||||
val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
|
||||
withDispatcher(UseDispatcher), name = "publisher")
|
||||
|
||||
import context.dispatcher
|
||||
|
||||
|
|
|
|||
|
|
@ -193,6 +193,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
case PublishCurrentClusterState(receiver) ⇒ publishCurrentClusterState(receiver)
|
||||
case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to)
|
||||
case Unsubscribe(subscriber, to) ⇒ unsubscribe(subscriber, to)
|
||||
case PublishEvent(event) ⇒ publish(event)
|
||||
case PublishDone ⇒ sender ! PublishDone
|
||||
}
|
||||
|
||||
|
|
@ -207,7 +208,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
leader = latestGossip.leader)
|
||||
receiver match {
|
||||
case Some(ref) ⇒ ref ! state
|
||||
case None ⇒ eventStream publish state
|
||||
case None ⇒ publish(state)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -232,7 +233,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
case x @ LeaderChanged(_) if oldGossip.convergence && newGossip.convergence ⇒
|
||||
// leader changed and immediate convergence
|
||||
leaderChangedState = Some(Right(x))
|
||||
eventStream publish x
|
||||
publish(x)
|
||||
|
||||
case x: LeaderChanged ⇒
|
||||
// publish later, when convergence
|
||||
|
|
@ -243,25 +244,25 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
leaderChangedState match {
|
||||
case Some(Left(x)) ⇒
|
||||
leaderChangedState = Some(Right(x))
|
||||
eventStream publish x
|
||||
publish(x)
|
||||
|
||||
case _ ⇒ // nothing stashed
|
||||
}
|
||||
eventStream publish event
|
||||
publish(event)
|
||||
|
||||
case MemberUnreachable(m) ⇒
|
||||
eventStream publish event
|
||||
publish(event)
|
||||
// notify DeathWatch about unreachable node
|
||||
eventStream publish AddressTerminated(m.address)
|
||||
publish(AddressTerminated(m.address))
|
||||
|
||||
case _ ⇒
|
||||
// all other events
|
||||
eventStream publish event
|
||||
publish(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def publishInternalStats(currentStats: CurrentInternalStats): Unit = {
|
||||
eventStream publish currentStats
|
||||
}
|
||||
def publishInternalStats(currentStats: CurrentInternalStats): Unit = publish(currentStats)
|
||||
|
||||
def publish(event: AnyRef): Unit = eventStream publish event
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
|
|||
*
|
||||
* @author Helena Edelson
|
||||
*/
|
||||
private[cluster] class ClusterMetricsCollector extends Actor with ActorLogging {
|
||||
private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Actor with ActorLogging {
|
||||
|
||||
import InternalClusterAction._
|
||||
import ClusterEvent._
|
||||
|
|
@ -107,6 +107,7 @@ private[cluster] class ClusterMetricsCollector extends Actor with ActorLogging {
|
|||
def removeMember(event: MemberEvent): Unit = {
|
||||
nodes -= event.member.address
|
||||
latestGossip = latestGossip remove event.member.address
|
||||
publish()
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -155,7 +156,7 @@ private[cluster] class ClusterMetricsCollector extends Actor with ActorLogging {
|
|||
/**
|
||||
* Publishes to the event stream.
|
||||
*/
|
||||
def publish(): Unit = context.system.eventStream publish ClusterMetricsChanged(latestGossip.nodes)
|
||||
def publish(): Unit = publisher ! PublishEvent(ClusterMetricsChanged(latestGossip.nodes))
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue