!clu #3920 Remove deprecated Cluster.publishCurrentClusterState
This commit is contained in:
parent
4b977361eb
commit
503c4ced8f
5 changed files with 14 additions and 47 deletions
|
|
@ -249,24 +249,13 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
||||||
clusterCore ! InternalClusterAction.Unsubscribe(subscriber, Some(to))
|
clusterCore ! InternalClusterAction.Unsubscribe(subscriber, Some(to))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Publish current (full) state of the cluster to subscribers,
|
* Send current (full) state of the cluster to the specified
|
||||||
* that are subscribing to [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
|
|
||||||
* or [[akka.cluster.ClusterEvent.CurrentClusterState]].
|
|
||||||
* If you want this to happen periodically you need to schedule a call to
|
|
||||||
* this method yourself.
|
|
||||||
*/
|
|
||||||
@deprecated("Use sendCurrentClusterState instead of publishCurrentClusterState", "2.3")
|
|
||||||
def publishCurrentClusterState(): Unit =
|
|
||||||
clusterCore ! InternalClusterAction.PublishCurrentClusterState(None)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Publish current (full) state of the cluster to the specified
|
|
||||||
* receiver. If you want this to happen periodically you need to schedule
|
* receiver. If you want this to happen periodically you need to schedule
|
||||||
* a call to this method yourself. Note that you can also retrieve the current
|
* a call to this method yourself. Note that you can also retrieve the current
|
||||||
* state with [[#state]].
|
* state with [[#state]].
|
||||||
*/
|
*/
|
||||||
def sendCurrentClusterState(receiver: ActorRef): Unit =
|
def sendCurrentClusterState(receiver: ActorRef): Unit =
|
||||||
clusterCore ! InternalClusterAction.PublishCurrentClusterState(Some(receiver))
|
clusterCore ! InternalClusterAction.SendCurrentClusterState(receiver)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Try to join this cluster node with the node specified by 'address'.
|
* Try to join this cluster node with the node specified by 'address'.
|
||||||
|
|
|
||||||
|
|
@ -137,10 +137,9 @@ private[cluster] object InternalClusterAction {
|
||||||
final case class Subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Set[Class[_]]) extends SubscriptionMessage
|
final case class Subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Set[Class[_]]) extends SubscriptionMessage
|
||||||
final case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage
|
final case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage
|
||||||
/**
|
/**
|
||||||
* @param receiver if `receiver` is defined the event will only be sent to that
|
* @param receiver [[akka.cluster.ClusterEvent.CurrentClusterState]] will be sent to the `receiver`
|
||||||
* actor, otherwise it will be sent to all subscribers via the `eventStream`.
|
|
||||||
*/
|
*/
|
||||||
final case class PublishCurrentClusterState(receiver: Option[ActorRef]) extends SubscriptionMessage
|
final case class SendCurrentClusterState(receiver: ActorRef) extends SubscriptionMessage
|
||||||
|
|
||||||
sealed trait PublishMessage
|
sealed trait PublishMessage
|
||||||
final case class PublishChanges(newGossip: Gossip) extends PublishMessage
|
final case class PublishChanges(newGossip: Gossip) extends PublishMessage
|
||||||
|
|
|
||||||
|
|
@ -328,12 +328,12 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
||||||
}
|
}
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case PublishChanges(newGossip) ⇒ publishChanges(newGossip)
|
case PublishChanges(newGossip) ⇒ publishChanges(newGossip)
|
||||||
case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats)
|
case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats)
|
||||||
case PublishCurrentClusterState(receiver) ⇒ publishCurrentClusterState(receiver)
|
case SendCurrentClusterState(receiver) ⇒ sendCurrentClusterState(receiver)
|
||||||
case Subscribe(subscriber, initMode, to) ⇒ subscribe(subscriber, initMode, to)
|
case Subscribe(subscriber, initMode, to) ⇒ subscribe(subscriber, initMode, to)
|
||||||
case Unsubscribe(subscriber, to) ⇒ unsubscribe(subscriber, to)
|
case Unsubscribe(subscriber, to) ⇒ unsubscribe(subscriber, to)
|
||||||
case PublishEvent(event) ⇒ publish(event)
|
case PublishEvent(event) ⇒ publish(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
def eventStream: EventStream = context.system.eventStream
|
def eventStream: EventStream = context.system.eventStream
|
||||||
|
|
@ -342,17 +342,14 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
||||||
* The current snapshot state corresponding to latest gossip
|
* The current snapshot state corresponding to latest gossip
|
||||||
* to mimic what you would have seen if you were listening to the events.
|
* to mimic what you would have seen if you were listening to the events.
|
||||||
*/
|
*/
|
||||||
def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = {
|
def sendCurrentClusterState(receiver: ActorRef): Unit = {
|
||||||
val state = CurrentClusterState(
|
val state = CurrentClusterState(
|
||||||
members = latestGossip.members,
|
members = latestGossip.members,
|
||||||
unreachable = latestGossip.overview.reachability.allUnreachableOrTerminated map latestGossip.member,
|
unreachable = latestGossip.overview.reachability.allUnreachableOrTerminated map latestGossip.member,
|
||||||
seenBy = latestGossip.seenBy.map(_.address),
|
seenBy = latestGossip.seenBy.map(_.address),
|
||||||
leader = latestGossip.leader.map(_.address),
|
leader = latestGossip.leader.map(_.address),
|
||||||
roleLeaderMap = latestGossip.allRoles.map(r ⇒ r -> latestGossip.roleLeader(r).map(_.address))(collection.breakOut))
|
roleLeaderMap = latestGossip.allRoles.map(r ⇒ r -> latestGossip.roleLeader(r).map(_.address))(collection.breakOut))
|
||||||
receiver match {
|
receiver ! state
|
||||||
case Some(ref) ⇒ ref ! state
|
|
||||||
case None ⇒ publish(state)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def subscribe(subscriber: ActorRef, initMode: SubscriptionInitialStateMode, to: Set[Class[_]]): Unit = {
|
def subscribe(subscriber: ActorRef, initMode: SubscriptionInitialStateMode, to: Set[Class[_]]): Unit = {
|
||||||
|
|
@ -364,7 +361,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
||||||
}
|
}
|
||||||
publishDiff(Gossip.empty, latestGossip, pub)
|
publishDiff(Gossip.empty, latestGossip, pub)
|
||||||
case InitialStateAsSnapshot ⇒
|
case InitialStateAsSnapshot ⇒
|
||||||
publishCurrentClusterState(Some(subscriber))
|
sendCurrentClusterState(subscriber)
|
||||||
}
|
}
|
||||||
|
|
||||||
to foreach { eventStream.subscribe(subscriber, _) }
|
to foreach { eventStream.subscribe(subscriber, _) }
|
||||||
|
|
@ -397,12 +394,6 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
||||||
|
|
||||||
def publish(event: AnyRef): Unit = eventStream publish event
|
def publish(event: AnyRef): Unit = eventStream publish event
|
||||||
|
|
||||||
def publishStart(): Unit =
|
|
||||||
if (latestGossip ne Gossip.empty) {
|
|
||||||
clearState()
|
|
||||||
publishCurrentClusterState(None)
|
|
||||||
}
|
|
||||||
|
|
||||||
def clearState(): Unit = {
|
def clearState(): Unit = {
|
||||||
latestGossip = Gossip.empty
|
latestGossip = Gossip.empty
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -86,19 +86,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"publish CurrentClusterState to subscribers when requested" in {
|
|
||||||
try {
|
|
||||||
cluster.subscribe(testActor, classOf[ClusterEvent.ClusterDomainEvent], classOf[ClusterEvent.CurrentClusterState])
|
|
||||||
// first, is in response to the subscription
|
|
||||||
expectMsgClass(classOf[ClusterEvent.CurrentClusterState])
|
|
||||||
|
|
||||||
cluster.publishCurrentClusterState()
|
|
||||||
expectMsgClass(classOf[ClusterEvent.CurrentClusterState])
|
|
||||||
} finally {
|
|
||||||
cluster.unsubscribe(testActor)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"send CurrentClusterState to one receiver when requested" in {
|
"send CurrentClusterState to one receiver when requested" in {
|
||||||
cluster.sendCurrentClusterState(testActor)
|
cluster.sendCurrentClusterState(testActor)
|
||||||
expectMsgClass(classOf[ClusterEvent.CurrentClusterState])
|
expectMsgClass(classOf[ClusterEvent.CurrentClusterState])
|
||||||
|
|
|
||||||
|
|
@ -27,3 +27,4 @@ The following, previously deprecated, features have been removed:
|
||||||
* akka-dataflow
|
* akka-dataflow
|
||||||
* akka-transactor
|
* akka-transactor
|
||||||
* durable mailboxes (akka-mailboxes-common, akka-file-mailbox)
|
* durable mailboxes (akka-mailboxes-common, akka-file-mailbox)
|
||||||
|
* Cluster.publishCurrentClusterState
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue