diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 1467df7e92..4d21d8108d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -249,24 +249,13 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { clusterCore ! InternalClusterAction.Unsubscribe(subscriber, Some(to)) /** - * Publish current (full) state of the cluster to subscribers, - * that are subscribing to [[akka.cluster.ClusterEvent.ClusterDomainEvent]] - * or [[akka.cluster.ClusterEvent.CurrentClusterState]]. - * If you want this to happen periodically you need to schedule a call to - * this method yourself. - */ - @deprecated("Use sendCurrentClusterState instead of publishCurrentClusterState", "2.3") - def publishCurrentClusterState(): Unit = - clusterCore ! InternalClusterAction.PublishCurrentClusterState(None) - - /** - * Publish current (full) state of the cluster to the specified + * Send current (full) state of the cluster to the specified * receiver. If you want this to happen periodically you need to schedule * a call to this method yourself. Note that you can also retrieve the current * state with [[#state]]. */ def sendCurrentClusterState(receiver: ActorRef): Unit = - clusterCore ! InternalClusterAction.PublishCurrentClusterState(Some(receiver)) + clusterCore ! InternalClusterAction.SendCurrentClusterState(receiver) /** * Try to join this cluster node with the node specified by 'address'. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index d2b91e3f12..08e1a1b6bd 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -137,10 +137,9 @@ private[cluster] object InternalClusterAction { final case class Subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Set[Class[_]]) extends SubscriptionMessage final case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage /** - * @param receiver if `receiver` is defined the event will only be sent to that - * actor, otherwise it will be sent to all subscribers via the `eventStream`. + * @param receiver [[akka.cluster.ClusterEvent.CurrentClusterState]] will be sent to the `receiver` */ - final case class PublishCurrentClusterState(receiver: Option[ActorRef]) extends SubscriptionMessage + final case class SendCurrentClusterState(receiver: ActorRef) extends SubscriptionMessage sealed trait PublishMessage final case class PublishChanges(newGossip: Gossip) extends PublishMessage diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index f90261a65a..0076821323 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -328,12 +328,12 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto } def receive = { - case PublishChanges(newGossip) ⇒ publishChanges(newGossip) - case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats) - case PublishCurrentClusterState(receiver) ⇒ publishCurrentClusterState(receiver) - case Subscribe(subscriber, initMode, to) ⇒ subscribe(subscriber, initMode, to) - case Unsubscribe(subscriber, to) ⇒ unsubscribe(subscriber, to) - case PublishEvent(event) ⇒ publish(event) + case PublishChanges(newGossip) ⇒ publishChanges(newGossip) + case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats) + case SendCurrentClusterState(receiver) ⇒ sendCurrentClusterState(receiver) + case Subscribe(subscriber, initMode, to) ⇒ subscribe(subscriber, initMode, to) + case Unsubscribe(subscriber, to) ⇒ unsubscribe(subscriber, to) + case PublishEvent(event) ⇒ publish(event) } def eventStream: EventStream = context.system.eventStream @@ -342,17 +342,14 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto * The current snapshot state corresponding to latest gossip * to mimic what you would have seen if you were listening to the events. */ - def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = { + def sendCurrentClusterState(receiver: ActorRef): Unit = { val state = CurrentClusterState( members = latestGossip.members, unreachable = latestGossip.overview.reachability.allUnreachableOrTerminated map latestGossip.member, seenBy = latestGossip.seenBy.map(_.address), leader = latestGossip.leader.map(_.address), roleLeaderMap = latestGossip.allRoles.map(r ⇒ r -> latestGossip.roleLeader(r).map(_.address))(collection.breakOut)) - receiver match { - case Some(ref) ⇒ ref ! state - case None ⇒ publish(state) - } + receiver ! state } def subscribe(subscriber: ActorRef, initMode: SubscriptionInitialStateMode, to: Set[Class[_]]): Unit = { @@ -364,7 +361,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto } publishDiff(Gossip.empty, latestGossip, pub) case InitialStateAsSnapshot ⇒ - publishCurrentClusterState(Some(subscriber)) + sendCurrentClusterState(subscriber) } to foreach { eventStream.subscribe(subscriber, _) } @@ -397,12 +394,6 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto def publish(event: AnyRef): Unit = eventStream publish event - def publishStart(): Unit = - if (latestGossip ne Gossip.empty) { - clearState() - publishCurrentClusterState(None) - } - def clearState(): Unit = { latestGossip = Gossip.empty } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 58b5975d52..0aa2a67bb3 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -86,19 +86,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { } } - "publish CurrentClusterState to subscribers when requested" in { - try { - cluster.subscribe(testActor, classOf[ClusterEvent.ClusterDomainEvent], classOf[ClusterEvent.CurrentClusterState]) - // first, is in response to the subscription - expectMsgClass(classOf[ClusterEvent.CurrentClusterState]) - - cluster.publishCurrentClusterState() - expectMsgClass(classOf[ClusterEvent.CurrentClusterState]) - } finally { - cluster.unsubscribe(testActor) - } - } - "send CurrentClusterState to one receiver when requested" in { cluster.sendCurrentClusterState(testActor) expectMsgClass(classOf[ClusterEvent.CurrentClusterState]) diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index ba539ef13a..361ef2752e 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -27,3 +27,4 @@ The following, previously deprecated, features have been removed: * akka-dataflow * akka-transactor * durable mailboxes (akka-mailboxes-common, akka-file-mailbox) +* Cluster.publishCurrentClusterState