diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index cd89a26f8d..66b0aa0c29 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -137,25 +137,18 @@ trait SubchannelClassification { this: EventBus ⇒ def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscriptions.synchronized { val diff = subscriptions.addValue(to, subscriber) - if (diff.isEmpty) false - else { - addToCache(diff) - true - } + addToCache(diff) + diff.nonEmpty } def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscriptions.synchronized { val diff = subscriptions.removeValue(from, subscriber) - if (diff.isEmpty) false - else { - cache ++= diff - true - } + cache ++= diff // FIXME What is the reason this isn't calling removeFromCache? + diff.nonEmpty } def unsubscribe(subscriber: Subscriber): Unit = subscriptions.synchronized { - val diff = subscriptions.removeValue(subscriber) - if (diff.nonEmpty) removeFromCache(diff) + removeFromCache(subscriptions.removeValue(subscriber)) } def publish(event: Event): Unit = { @@ -174,12 +167,8 @@ trait SubchannelClassification { this: EventBus ⇒ // we can only let keys that already exist in the cache get updated private def removeFromCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit = - cache ++= (List.empty[(Classifier, Set[Subscriber])] /: changes) { - case (cl, (c, cs)) ⇒ - cache.get(c) match { - case None ⇒ cl - case Some(ss) ⇒ (c, ss -- cs) :: cl - } + cache = (cache /: changes) { + case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) -- cs) } private def addToCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit =