Cluster.unsubscribe with class parameter, see #2567

This commit is contained in:
Patrik Nordwall 2012-09-28 13:09:36 +02:00
parent ddde23576f
commit 51ff9ce6d1
4 changed files with 22 additions and 7 deletions

View file

@ -185,10 +185,17 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
clusterCore ! InternalClusterAction.Subscribe(subscriber, to) clusterCore ! InternalClusterAction.Subscribe(subscriber, to)
/** /**
* Unsubscribe to cluster domain events. * Unsubscribe to all cluster domain events.
*/ */
def unsubscribe(subscriber: ActorRef): Unit = def unsubscribe(subscriber: ActorRef): Unit =
clusterCore ! InternalClusterAction.Unsubscribe(subscriber) clusterCore ! InternalClusterAction.Unsubscribe(subscriber, None)
/**
* Unsubscribe to a specific type of cluster domain events,
* matching previous `subscribe` registration.
*/
def unsubscribe(subscriber: ActorRef, to: Class[_]): Unit =
clusterCore ! InternalClusterAction.Unsubscribe(subscriber, Some(to))
/** /**
* Publish current (full) state of the cluster to subscribers, * Publish current (full) state of the cluster to subscribers,

View file

@ -108,7 +108,7 @@ private[cluster] object InternalClusterAction {
sealed trait SubscriptionMessage sealed trait SubscriptionMessage
case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage
case class Unsubscribe(subscriber: ActorRef) extends SubscriptionMessage case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage
/** /**
* @param receiver if `receiver` is defined the event will only be sent to that * @param receiver if `receiver` is defined the event will only be sent to that
* actor, otherwise it will be sent to all subscribers via the `eventStream`. * actor, otherwise it will be sent to all subscribers via the `eventStream`.

View file

@ -192,7 +192,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
case currentStats: CurrentInternalStats publishInternalStats(currentStats) case currentStats: CurrentInternalStats publishInternalStats(currentStats)
case PublishCurrentClusterState(receiver) publishCurrentClusterState(receiver) case PublishCurrentClusterState(receiver) publishCurrentClusterState(receiver)
case Subscribe(subscriber, to) subscribe(subscriber, to) case Subscribe(subscriber, to) subscribe(subscriber, to)
case Unsubscribe(subscriber) unsubscribe(subscriber) case Unsubscribe(subscriber, to) unsubscribe(subscriber, to)
case PublishDone sender ! PublishDone case PublishDone sender ! PublishDone
} }
@ -216,8 +216,10 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
eventStream.subscribe(subscriber, to) eventStream.subscribe(subscriber, to)
} }
def unsubscribe(subscriber: ActorRef): Unit = def unsubscribe(subscriber: ActorRef, to: Option[Class[_]]): Unit = to match {
eventStream.unsubscribe(subscriber) case None eventStream.unsubscribe(subscriber)
case Some(c) eventStream.unsubscribe(subscriber, c)
}
def publishChanges(oldGossip: Gossip, newGossip: Gossip): Unit = { def publishChanges(oldGossip: Gossip, newGossip: Gossip): Unit = {
// keep the latestGossip to be sent to new subscribers // keep the latestGossip to be sent to new subscribers

View file

@ -54,7 +54,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
} }
override def afterEach(): Unit = { override def afterEach(): Unit = {
publisher ! Unsubscribe(testActor) publisher ! Unsubscribe(testActor, None)
system.stop(publisher) system.stop(publisher)
} }
@ -116,6 +116,12 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
expectMsgType[SeenChanged] expectMsgType[SeenChanged]
} }
"support unsubscribe" in {
publisher ! Unsubscribe(testActor, Some(classOf[ClusterDomainEvent]))
publisher ! PublishChanges(g1, g2)
expectNoMsg
}
} }
} }