diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 647f335b8c..9a4dcaa62b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -189,6 +189,24 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { def unsubscribe(subscriber: ActorRef): Unit = clusterCore ! InternalClusterAction.Unsubscribe(subscriber) + /** + * Publish current (full) state of the cluster to subscribers, + * that are subscribing to [[akka.cluster.ClusterEvent.ClusterDomainEvent]] + * or [[akka.cluster.ClusterEvent.CurrentClusterState]]. + * If you want this to happen periodically you need to schedule a call to + * this method yourself. + */ + def publishCurrentClusterState(): Unit = + clusterCore ! InternalClusterAction.PublishCurrentClusterState(None) + + /** + * Publish current (full) state of the cluster to the specified + * receiver. If you want this to happen periodically you need to schedule + * a call to this method yourself. + */ + def sendCurrentClusterState(receiver: ActorRef): Unit = + clusterCore ! InternalClusterAction.PublishCurrentClusterState(Some(receiver)) + /** * Try to join this cluster node with the node specified by 'address'. * A 'Join(thisNodeAddress)' command is sent to the node to join. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index f84bf100ea..9557a20f7a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -106,13 +106,15 @@ private[cluster] object InternalClusterAction { sealed trait SubscriptionMessage case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage case class Unsubscribe(subscriber: ActorRef) extends SubscriptionMessage + /** + * @param receiver if `receiver` is defined the event will only be sent to that + * actor, otherwise it will be sent to all subscribers via the `eventStream`. + */ + case class PublishCurrentClusterState(receiver: Option[ActorRef]) extends SubscriptionMessage case class PublishChanges(oldGossip: Gossip, newGossip: Gossip) case object PublishDone - case class Ping(timestamp: Long = System.currentTimeMillis) extends ClusterMessage - case class Pong(ping: Ping, timestamp: Long = System.currentTimeMillis) extends ClusterMessage - } /** @@ -255,7 +257,6 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging { case Remove(address) ⇒ removing(address) case SendGossipTo(address) ⇒ gossipTo(address) case msg: SubscriptionMessage ⇒ publisher forward msg - case p: Ping ⇒ ping(p) } @@ -831,7 +832,6 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging { def publishInternalStats(): Unit = publisher ! CurrentInternalStats(stats) - def ping(p: Ping): Unit = sender ! Pong(p) } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 4bb0105413..ff23f7b12f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -177,6 +177,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto def receive = { case PublishChanges(oldGossip, newGossip) ⇒ publishChanges(oldGossip, newGossip) case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats) + case PublishCurrentClusterState(receiver) ⇒ publishCurrentClusterState(receiver) case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to) case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) case PublishDone ⇒ sender ! PublishDone @@ -184,13 +185,21 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto def eventStream: EventStream = context.system.eventStream - def subscribe(subscriber: ActorRef, to: Class[_]): Unit = { - subscriber ! CurrentClusterState( + def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = { + val state = CurrentClusterState( members = latestGossip.members, unreachable = latestGossip.overview.unreachable, convergence = latestGossip.convergence, seenBy = latestGossip.seenBy, leader = latestGossip.leader) + receiver match { + case Some(ref) ⇒ ref ! state + case None ⇒ eventStream publish state + } + } + + def subscribe(subscriber: ActorRef, to: Class[_]): Unit = { + publishCurrentClusterState(Some(subscriber)) eventStream.subscribe(subscriber, to) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 4df6032116..c57a03bc8e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -67,21 +67,11 @@ abstract class TransitionSpec memberStatus(address) == status } - def leaderActions(): Unit = { + def leaderActions(): Unit = cluster.clusterCore ! LeaderActionsTick - awaitPing() - } - def reapUnreachable(): Unit = { + def reapUnreachable(): Unit = cluster.clusterCore ! ReapUnreachableTick - awaitPing() - } - - def awaitPing(): Unit = { - val ping = Ping() - cluster.clusterCore ! ping - expectMsgPF() { case pong @ Pong(`ping`, _) ⇒ pong } - } // DSL sugar for `role1 gossipTo role2` implicit def roleExtras(role: RoleName): RoleWrapper = new RoleWrapper(role) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 6666e38cce..8edbdd1669 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -6,10 +6,8 @@ package akka.cluster import language.postfixOps import language.reflectiveCalls - import scala.concurrent.util.duration._ import scala.concurrent.util.Duration - import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender import akka.actor.ExtendedActorSystem @@ -17,6 +15,7 @@ import akka.actor.Address import akka.cluster.InternalClusterAction._ import java.lang.management.ManagementFactory import javax.management.ObjectName +import akka.actor.ActorRef object ClusterSpec { val config = """ @@ -45,16 +44,8 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { val cluster = Cluster(system) def clusterView = cluster.readView - def leaderActions(): Unit = { + def leaderActions(): Unit = cluster.clusterCore ! LeaderActionsTick - awaitPing() - } - - def awaitPing(): Unit = { - val ping = Ping() - cluster.clusterCore ! ping - expectMsgPF() { case pong @ Pong(`ping`, _) ⇒ pong } - } "A Cluster" must { @@ -79,7 +70,25 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { clusterView.status must be(MemberStatus.Joining) clusterView.convergence must be(true) leaderActions() - clusterView.status must be(MemberStatus.Up) + awaitCond(clusterView.status == MemberStatus.Up) + } + + "publish CurrentClusterState to subscribers when requested" in { + try { + cluster.subscribe(testActor, classOf[ClusterEvent.ClusterDomainEvent]) + // first, is in response to the subscription + expectMsgClass(classOf[ClusterEvent.ClusterDomainEvent]) + + cluster.publishCurrentClusterState() + expectMsgClass(classOf[ClusterEvent.ClusterDomainEvent]) + } finally { + cluster.unsubscribe(testActor) + } + } + + "send CurrentClusterState to one receiver when requested" in { + cluster.sendCurrentClusterState(testActor) + expectMsgClass(classOf[ClusterEvent.ClusterDomainEvent]) } }