diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index 5be014ab2e..6012c48f45 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -116,6 +116,7 @@ private[cluster] object InternalClusterAction {
   case class PublishCurrentClusterState(receiver: Option[ActorRef]) extends SubscriptionMessage
 
   case class PublishChanges(oldGossip: Gossip, newGossip: Gossip)
+  case class PublishEvent(event: ClusterDomainEvent)
   case object PublishDone
 
 }
@@ -150,11 +151,13 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
 
   // cause deadlock. The Cluster extension is currently being created and is waiting
   // for response from GetClusterCoreRef in its constructor.
-  val core = context.actorOf(Props[ClusterCoreDaemon].
+  val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
+    withDispatcher(context.props.dispatcher), name = "publisher")
+  val core = context.actorOf(Props(new ClusterCoreDaemon(publisher)).
     withDispatcher(context.props.dispatcher), name = "core")
-  val heartbeat = context.actorOf(Props[ClusterHeartbeatDaemon].
+  context.actorOf(Props[ClusterHeartbeatDaemon].
     withDispatcher(context.props.dispatcher), name = "heartbeat")
-  if (settings.MetricsEnabled) context.actorOf(Props[ClusterMetricsCollector].
+  if (settings.MetricsEnabled) context.actorOf(Props(new ClusterMetricsCollector(publisher)).
     withDispatcher(context.props.dispatcher), name = "metrics")
 
   def receive = {
@@ -166,7 +169,7 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
 /**
  * INTERNAL API.
  */
-private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
+private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging {
   import ClusterLeaderAction._
   import InternalClusterAction._
   import ClusterHeartbeatSender._
@@ -189,8 +192,6 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
     withDispatcher(UseDispatcher), name = "heartbeatSender")
   val coreSender = context.actorOf(Props[ClusterCoreSender].
     withDispatcher(UseDispatcher), name = "coreSender")
-  val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
-    withDispatcher(UseDispatcher), name = "publisher")
 
   import context.dispatcher
 
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
index 821117e5e7..0695180ac2 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
@@ -193,6 +193,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
     case PublishCurrentClusterState(receiver) ⇒ publishCurrentClusterState(receiver)
     case Subscribe(subscriber, to)            ⇒ subscribe(subscriber, to)
     case Unsubscribe(subscriber, to)          ⇒ unsubscribe(subscriber, to)
+    case PublishEvent(event)                  ⇒ publish(event)
     case PublishDone                          ⇒ sender ! PublishDone
   }
 
@@ -207,7 +208,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
       leader = latestGossip.leader)
     receiver match {
       case Some(ref) ⇒ ref ! state
-      case None      ⇒ eventStream publish state
+      case None      ⇒ publish(state)
     }
   }
 
@@ -232,7 +233,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
 
         case x @ LeaderChanged(_) if oldGossip.convergence && newGossip.convergence ⇒
           // leader changed and immediate convergence
          leaderChangedState = Some(Right(x))
-          eventStream publish x
+          publish(x)
        case x: LeaderChanged ⇒
          // publish later, when convergence
@@ -243,25 +244,25 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
          leaderChangedState match {
            case Some(Left(x)) ⇒
              leaderChangedState = Some(Right(x))
-              eventStream publish x
+              publish(x)
            case _ ⇒ // nothing stashed
          }
-          eventStream publish event
+          publish(event)
 
        case MemberUnreachable(m) ⇒
-          eventStream publish event
+          publish(event)
          // notify DeathWatch about unreachable node
-          eventStream publish AddressTerminated(m.address)
+          publish(AddressTerminated(m.address))
 
        case _ ⇒
          // all other events
-          eventStream publish event
+          publish(event)
      }
    }
  }
 
-  def publishInternalStats(currentStats: CurrentInternalStats): Unit = {
-    eventStream publish currentStats
-  }
+  def publishInternalStats(currentStats: CurrentInternalStats): Unit = publish(currentStats)
+
+  def publish(event: AnyRef): Unit = eventStream publish event
 
 }
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
index 1e0c314f21..ba086b600c 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
@@ -36,7 +36,7 @@ import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
  *
  * @author Helena Edelson
  */
-private[cluster] class ClusterMetricsCollector extends Actor with ActorLogging {
+private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Actor with ActorLogging {
   import InternalClusterAction._
   import ClusterEvent._
 
@@ -107,6 +107,7 @@ private[cluster] class ClusterMetricsCollector extends Actor with ActorLogging {
   def removeMember(event: MemberEvent): Unit = {
     nodes -= event.member.address
     latestGossip = latestGossip remove event.member.address
+    publish()
   }
 
   /**
@@ -155,7 +156,7 @@ private[cluster] class ClusterMetricsCollector extends Actor with ActorLogging {
 
   /**
    * Publishes to the event stream.
   */
-  def publish(): Unit = context.system.eventStream publish ClusterMetricsChanged(latestGossip.nodes)
+  def publish(): Unit = publisher ! PublishEvent(ClusterMetricsChanged(latestGossip.nodes))
 
 }
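
Note on the wiring introduced above: producers such as ClusterMetricsCollector no longer publish to the system event stream directly. ClusterDaemon now creates the single ClusterDomainEventPublisher and hands its ActorRef to the child actors via their constructors; producers send an InternalClusterAction.PublishEvent message to it, and the publisher remains the only place that calls eventStream publish. The following is a minimal, self-contained sketch of that pattern, not the Akka internals themselves: the names SamplePublisher, SampleCollector, SampleEvent and PublishEvent are illustrative stand-ins, and it assumes a recent Akka version with the classic actor API (ActorSystem#terminate).

// Illustrative stand-ins: PublishEvent mirrors InternalClusterAction.PublishEvent,
// SampleEvent stands in for a domain event such as ClusterMetricsChanged.
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }

final case class PublishEvent(event: AnyRef)
final case class SampleEvent(payload: String)

// Single funnel onto the event stream, in the role of ClusterDomainEventPublisher.
class SamplePublisher extends Actor {
  def receive = {
    case PublishEvent(event) ⇒ context.system.eventStream publish event
  }
}

// A producer that is handed the publisher ref in its constructor,
// in the role of ClusterMetricsCollector(publisher: ActorRef).
class SampleCollector(publisher: ActorRef) extends Actor {
  def receive = {
    case payload: String ⇒ publisher ! PublishEvent(SampleEvent(payload))
  }
}

object PublisherWiringDemo extends App {
  val system = ActorSystem("demo")

  // The parent creates the publisher first and passes it to the producing children,
  // mirroring the new child-actor wiring in ClusterDaemon.
  val publisher = system.actorOf(Props[SamplePublisher], name = "publisher")
  val collector = system.actorOf(Props(new SampleCollector(publisher)), name = "collector")

  // Subscribers on the event stream still see the events exactly as before.
  val listener = system.actorOf(Props(new Actor {
    def receive = { case SampleEvent(p) ⇒ println(s"got event: $p") }
  }), name = "listener")
  system.eventStream.subscribe(listener, classOf[SampleEvent])

  collector ! "hello"

  // Give the asynchronous publish a moment, then shut the system down.
  Thread.sleep(1000)
  system.terminate()
}

The design point the sketch illustrates is that every published event now flows through one actor, so ordering and testability of event publication are controlled in a single place instead of each producer touching context.system.eventStream on its own.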