From 51ff9ce6d15a46eb3e1a0b70363d3ab7e91107af Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 28 Sep 2012 13:09:36 +0200 Subject: [PATCH] Cluster.unsubscribe with class parameter, see #2567 --- .../src/main/scala/akka/cluster/Cluster.scala | 11 +++++++++-- .../src/main/scala/akka/cluster/ClusterDaemon.scala | 2 +- .../src/main/scala/akka/cluster/ClusterEvent.scala | 8 +++++--- .../cluster/ClusterDomainEventPublisherSpec.scala | 8 +++++++- 4 files changed, 22 insertions(+), 7 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 48e30080be..25b1cd684b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -185,10 +185,17 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { clusterCore ! InternalClusterAction.Subscribe(subscriber, to) /** - * Unsubscribe to cluster domain events. + * Unsubscribe to all cluster domain events. */ def unsubscribe(subscriber: ActorRef): Unit = - clusterCore ! InternalClusterAction.Unsubscribe(subscriber) + clusterCore ! InternalClusterAction.Unsubscribe(subscriber, None) + + /** + * Unsubscribe to a specific type of cluster domain events, + * matching previous `subscribe` registration. + */ + def unsubscribe(subscriber: ActorRef, to: Class[_]): Unit = + clusterCore ! InternalClusterAction.Unsubscribe(subscriber, Some(to)) /** * Publish current (full) state of the cluster to subscribers, diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index a2d88b55ad..5be014ab2e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -108,7 +108,7 @@ private[cluster] object InternalClusterAction { sealed trait SubscriptionMessage case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage - case class Unsubscribe(subscriber: ActorRef) extends SubscriptionMessage + case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage /** * @param receiver if `receiver` is defined the event will only be sent to that * actor, otherwise it will be sent to all subscribers via the `eventStream`. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 3ccf32307b..821117e5e7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -192,7 +192,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats) case PublishCurrentClusterState(receiver) ⇒ publishCurrentClusterState(receiver) case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to) - case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) + case Unsubscribe(subscriber, to) ⇒ unsubscribe(subscriber, to) case PublishDone ⇒ sender ! PublishDone } @@ -216,8 +216,10 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto eventStream.subscribe(subscriber, to) } - def unsubscribe(subscriber: ActorRef): Unit = - eventStream.unsubscribe(subscriber) + def unsubscribe(subscriber: ActorRef, to: Option[Class[_]]): Unit = to match { + case None ⇒ eventStream.unsubscribe(subscriber) + case Some(c) ⇒ eventStream.unsubscribe(subscriber, c) + } def publishChanges(oldGossip: Gossip, newGossip: Gossip): Unit = { // keep the latestGossip to be sent to new subscribers diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index c29f237be7..5b615a61af 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -54,7 +54,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish } override def afterEach(): Unit = { - publisher ! Unsubscribe(testActor) + publisher ! Unsubscribe(testActor, None) system.stop(publisher) } @@ -116,6 +116,12 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish expectMsgType[SeenChanged] } + "support unsubscribe" in { + publisher ! Unsubscribe(testActor, Some(classOf[ClusterDomainEvent])) + publisher ! PublishChanges(g1, g2) + expectNoMsg + } + } }