From 07f28cd26a380f7265a9e7b06a49d80ae4de75c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Thu, 27 Sep 2012 13:15:31 +0200 Subject: [PATCH 1/4] EventBus now handles adding and removal better. See #2561 --- .../scala/akka/event/EventStreamSpec.scala | 92 +++++++++++++------ .../src/main/scala/akka/event/EventBus.scala | 25 +++-- .../scala/akka/util/SubclassifiedIndex.scala | 77 ++++++++++++---- 3 files changed, 145 insertions(+), 49 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index 5447a1eb74..745f4ca2b8 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -56,9 +56,11 @@ object EventStreamSpec { trait T trait AT extends T + trait ATT extends AT trait BT extends T - class TA - class TAATBT extends TA with AT with BT + trait BTT extends BT + class CC + class CCATBT extends CC with ATT with BTT } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -151,75 +153,75 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { "manage sub-channels using classes and traits (update on subscribe)" in { val es = new EventStream(false) - val tm1 = new TA - val tm2 = new TAATBT + val tm1 = new CC + val tm2 = new CCATBT val a1, a2, a3, a4 = TestProbe() es.subscribe(a1.ref, classOf[AT]) must be === true es.subscribe(a2.ref, classOf[BT]) must be === true - es.subscribe(a3.ref, classOf[TA]) must be === true - es.subscribe(a4.ref, classOf[TAATBT]) must be === true + es.subscribe(a3.ref, classOf[CC]) must be === true + es.subscribe(a4.ref, classOf[CCATBT]) must be === true es.publish(tm1) es.publish(tm2) a1.expectMsgType[AT] must be === tm2 a2.expectMsgType[BT] must be === tm2 - a3.expectMsgType[TA] must be === tm1 - a3.expectMsgType[TA] must be === tm2 - a4.expectMsgType[TAATBT] must be === tm2 + a3.expectMsgType[CC] must be === tm1 + a3.expectMsgType[CC] must be === tm2 + a4.expectMsgType[CCATBT] must be === tm2 es.unsubscribe(a1.ref, classOf[AT]) must be === true es.unsubscribe(a2.ref, classOf[BT]) must be === true - es.unsubscribe(a3.ref, classOf[TA]) must be === true - es.unsubscribe(a4.ref, classOf[TAATBT]) must be === true + es.unsubscribe(a3.ref, classOf[CC]) must be === true + es.unsubscribe(a4.ref, classOf[CCATBT]) must be === true } "manage sub-channels using classes and traits (update on unsubscribe)" in { val es = new EventStream(false) - val tm1 = new TA - val tm2 = new TAATBT + val tm1 = new CC + val tm2 = new CCATBT val a1, a2, a3, a4 = TestProbe() es.subscribe(a1.ref, classOf[AT]) must be === true es.subscribe(a2.ref, classOf[BT]) must be === true - es.subscribe(a3.ref, classOf[TA]) must be === true - es.subscribe(a4.ref, classOf[TAATBT]) must be === true - es.unsubscribe(a3.ref, classOf[TA]) must be === true + es.subscribe(a3.ref, classOf[CC]) must be === true + es.subscribe(a4.ref, classOf[CCATBT]) must be === true + es.unsubscribe(a3.ref, classOf[CC]) must be === true es.publish(tm1) es.publish(tm2) a1.expectMsgType[AT] must be === tm2 a2.expectMsgType[BT] must be === tm2 a3.expectNoMsg(1 second) - a4.expectMsgType[TAATBT] must be === tm2 + a4.expectMsgType[CCATBT] must be === tm2 es.unsubscribe(a1.ref, classOf[AT]) must be === true es.unsubscribe(a2.ref, classOf[BT]) must be === true - es.unsubscribe(a4.ref, classOf[TAATBT]) must be === true + es.unsubscribe(a4.ref, classOf[CCATBT]) must be === true } "manage sub-channels using classes and traits (update on unsubscribe all)" in { val es = new EventStream(false) - val tm1 = new TA - val tm2 = new TAATBT + val tm1 = new CC + val tm2 = new CCATBT val a1, a2, a3, a4 = TestProbe() es.subscribe(a1.ref, classOf[AT]) must be === true es.subscribe(a2.ref, classOf[BT]) must be === true - es.subscribe(a3.ref, classOf[TA]) must be === true - es.subscribe(a4.ref, classOf[TAATBT]) must be === true + es.subscribe(a3.ref, classOf[CC]) must be === true + es.subscribe(a4.ref, classOf[CCATBT]) must be === true es.unsubscribe(a3.ref) es.publish(tm1) es.publish(tm2) a1.expectMsgType[AT] must be === tm2 a2.expectMsgType[BT] must be === tm2 a3.expectNoMsg(1 second) - a4.expectMsgType[TAATBT] must be === tm2 + a4.expectMsgType[CCATBT] must be === tm2 es.unsubscribe(a1.ref, classOf[AT]) must be === true es.unsubscribe(a2.ref, classOf[BT]) must be === true - es.unsubscribe(a4.ref, classOf[TAATBT]) must be === true + es.unsubscribe(a4.ref, classOf[CCATBT]) must be === true } "manage sub-channels using classes and traits (update on publish)" in { val es = new EventStream(false) - val tm1 = new TA - val tm2 = new TAATBT + val tm1 = new CC + val tm2 = new CCATBT val a1, a2 = TestProbe() es.subscribe(a1.ref, classOf[AT]) must be === true @@ -232,6 +234,44 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { es.unsubscribe(a2.ref, classOf[BT]) must be === true } + "manage sub-channels using classes and traits (unsubscribe classes used with trait)" in { + val es = new EventStream(false) + val tm1 = new CC + val tm2 = new CCATBT + val a1, a2, a3 = TestProbe() + + es.subscribe(a1.ref, classOf[AT]) must be === true + es.subscribe(a2.ref, classOf[BT]) must be === true + es.subscribe(a2.ref, classOf[CC]) must be === true + es.subscribe(a3.ref, classOf[CC]) must be === true + es.unsubscribe(a2.ref, classOf[CC]) must be === true + es.unsubscribe(a3.ref, classOf[CCATBT]) must be === true + es.publish(tm1) + es.publish(tm2) + a1.expectMsgType[AT] must be === tm2 + a2.expectMsgType[BT] must be === tm2 + a3.expectMsgType[CC] must be === tm1 + es.unsubscribe(a1.ref, classOf[AT]) must be === true + es.unsubscribe(a2.ref, classOf[BT]) must be === true + es.unsubscribe(a3.ref, classOf[CC]) must be === true + } + + "manage sub-channels using classes and traits (subscribe after publish)" in { + val es = new EventStream(false) + val tm1 = new CCATBT + val a1, a2 = TestProbe() + + es.subscribe(a1.ref, classOf[AT]) must be === true + es.publish(tm1) + a1.expectMsgType[AT] must be === tm1 + a2.expectNoMsg(1 second) + es.subscribe(a2.ref, classOf[BTT]) must be === true + es.publish(tm1) + a1.expectMsgType[AT] must be === tm1 + a2.expectMsgType[BTT] must be === tm1 + es.unsubscribe(a1.ref, classOf[AT]) must be === true + es.unsubscribe(a2.ref, classOf[BTT]) must be === true + } } private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) { diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index 63436723bd..cd89a26f8d 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -139,7 +139,7 @@ trait SubchannelClassification { this: EventBus ⇒ val diff = subscriptions.addValue(to, subscriber) if (diff.isEmpty) false else { - cache ++= diff + addToCache(diff) true } } @@ -148,7 +148,7 @@ trait SubchannelClassification { this: EventBus ⇒ val diff = subscriptions.removeValue(from, subscriber) if (diff.isEmpty) false else { - removeFromCache(diff) + cache ++= diff true } } @@ -158,9 +158,6 @@ trait SubchannelClassification { this: EventBus ⇒ if (diff.nonEmpty) removeFromCache(diff) } - private def removeFromCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit = - cache ++= changes map { case (c, s) ⇒ (c, cache.getOrElse(c, Set[Subscriber]()) -- s) } - def publish(event: Event): Unit = { val c = classify(event) val recv = @@ -168,13 +165,27 @@ trait SubchannelClassification { this: EventBus ⇒ else subscriptions.synchronized { if (cache contains c) cache(c) else { - val diff = subscriptions.addKey(c) - cache ++= diff + addToCache(subscriptions.addKey(c)) cache(c) } } recv foreach (publish(event, _)) } + + // we can only let keys that already exist in the cache get updated + private def removeFromCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit = + cache ++= (List.empty[(Classifier, Set[Subscriber])] /: changes) { + case (cl, (c, cs)) ⇒ + cache.get(c) match { + case None ⇒ cl + case Some(ss) ⇒ (c, ss -- cs) :: cl + } + } + + private def addToCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit = + cache = (cache /: changes) { + case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) ++ cs) + } } /** diff --git a/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala index 45a70be27c..23cd6096d2 100644 --- a/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala +++ b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala @@ -17,9 +17,9 @@ trait Subclassification[K] { def isSubclass(x: K, y: K): Boolean } -object SubclassifiedIndex { +private[akka] object SubclassifiedIndex { - class Nonroot[K, V](val key: K, _values: Set[V])(implicit sc: Subclassification[K]) extends SubclassifiedIndex[K, V](_values) { + class Nonroot[K, V](override val root: SubclassifiedIndex[K, V], val key: K, _values: Set[V])(implicit sc: Subclassification[K]) extends SubclassifiedIndex[K, V](_values) { override def innerAddValue(key: K, value: V): Changes = { // break the recursion on super when key is found and transition to recursive add-to-set @@ -34,6 +34,7 @@ object SubclassifiedIndex { } else kids } + // this will return the keys and values to be removed from the cache override def innerRemoveValue(key: K, value: V): Changes = { // break the recursion on super when key is found and transition to recursive remove-from-set if (sc.isEqual(key, this.key)) removeValue(value) else super.innerRemoveValue(key, value) @@ -47,9 +48,14 @@ object SubclassifiedIndex { } else kids } + override def innerFindValues(key: K): Set[V] = + if (sc.isEqual(key, this.key)) values else super.innerFindValues(key) + override def toString = subkeys.mkString("Nonroot(" + key + ", " + values + ",\n", ",\n", ")") } + private[SubclassifiedIndex] def emptyFindSet[V] = internalEmptyFindSet.asInstanceOf[Set[V]] + private[this] val internalEmptyFindSet = Set.empty[AnyRef] private[SubclassifiedIndex] def emptyMergeMap[K, V] = internalEmptyMergeMap.asInstanceOf[Map[K, Set[V]]] private[this] val internalEmptyMergeMap = Map[AnyRef, Set[AnyRef]]().withDefault(_ ⇒ Set[AnyRef]()) } @@ -66,7 +72,7 @@ object SubclassifiedIndex { * cache, e.g. HashMap, is faster than tree traversal which must use linear * scan at each level. Therefore, no value traversals are published. */ -class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: Subclassification[K]) { +private[akka] class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: Subclassification[K]) { import SubclassifiedIndex._ @@ -76,9 +82,13 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: def this()(implicit sc: Subclassification[K]) = this(Set.empty) + protected val root = this + /** * Add key to this index which inherits its value set from the most specific * super-class which is known. + * + * @return the diff that should be added to the cache */ def addKey(key: K): Changes = mergeChangesByKey(innerAddKey(key)) @@ -94,14 +104,15 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: } else Nil } if (!found) { - integrate(new Nonroot(key, values)) - Seq((key, values)) + integrate(new Nonroot(root, key, values)) :+ ((key, values)) } else ch } /** * Add value to all keys which are subclasses of the given key. If the key * is not known yet, it is inserted as if using addKey. + * + * @return the diff that should be added to the cache */ def addValue(key: K, value: V): Changes = mergeChangesByKey(innerAddValue(key, value)) @@ -115,17 +126,22 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: } if (!found) { val v = values + value - val n = new Nonroot(key, v) - integrate(n) - n.innerAddValue(key, value) :+ (key -> v) + val n = new Nonroot(root, key, v) + integrate(n) ++ n.innerAddValue(key, value) :+ (key -> v) } else ch } /** * Remove value from all keys which are subclasses of the given key. - * @return The keys and values that have been removed. + * + * @return the complete changes that should be inserted in the cache */ - def removeValue(key: K, value: V): Changes = mergeChangesByKey(innerRemoveValue(key, value)) + def removeValue(key: K, value: V): Changes = + mergeChangesByKey(innerRemoveValue(key, value)) map { + case (k, v) ⇒ (k, findValues(k)) + } + + // this will return the keys and values to be removed from the cache protected def innerRemoveValue(key: K, value: V): Changes = { var found = false val ch = subkeys flatMap { n ⇒ @@ -135,25 +151,52 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: } else Nil } if (!found) { - val n = new Nonroot(key, values) - integrate(n) - n.removeValue(value) + val n = new Nonroot(root, key, values) + integrate(n) ++ n.removeValue(value) } else ch } /** * Remove value from all keys in the index. + * + * @return the diff that should be removed from the cache */ def removeValue(value: V): Changes = mergeChangesByKey(subkeys flatMap (_ removeValue value)) + /** + * Find all values for a given key in the index. + */ + protected final def findValues(key: K): Set[V] = root.innerFindValues(key) + protected def innerFindValues(key: K): Set[V] = + (emptyFindSet[V] /: subkeys) { (s, n) ⇒ + if (sc.isSubclass(key, n.key)) + s ++ n.innerFindValues(key) + else + s + } + + /** + * Find all subkeys of a given key in the index excluding some subkeys. + */ + protected final def findSubKeysExcept(key: K, except: Vector[Nonroot[K, V]]): Set[K] = root.innerFindSubKeys(key, except) + protected def innerFindSubKeys(key: K, except: Vector[Nonroot[K, V]]): Set[K] = + (emptyFindSet[K] /: subkeys) { (s, n) ⇒ + if (sc.isEqual(key, n.key)) s + else n.innerFindSubKeys(key, except) ++ { + if (sc.isSubclass(n.key, key) && !except.exists(e ⇒ sc.isEqual(key, e.key))) + s + n.key + else + s + } + } + override def toString = subkeys.mkString("SubclassifiedIndex(" + values + ",\n", ",\n", ")") /** * Add new Nonroot below this node and check all existing nodes for subclass relationship. - * - * @return true if and only if the new node has received subkeys during this operation. + * Also needs to find subkeys in other parts of the tree to compensate for multiple inheritance. */ - private def integrate(n: Nonroot[K, V]) { + private def integrate(n: Nonroot[K, V]): Changes = { val (subsub, sub) = subkeys partition (k ⇒ sc.isSubclass(k.key, n.key)) if (sub.size == subkeys.size) { subkeys :+= n @@ -161,6 +204,8 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: n.subkeys = subsub subkeys = sub :+ n } + n.subkeys ++= findSubKeysExcept(n.key, n.subkeys).map(k ⇒ new Nonroot(root, k, values)) + n.subkeys.map(n ⇒ (n.key, n.values.toSet)) } private def mergeChangesByKey(changes: Changes): Changes = From 036811aeaf9f0e502727182264bbccf57c3a5408 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 27 Sep 2012 16:51:00 +0200 Subject: [PATCH 2/4] Minor code restructuring for SubchannelClassification --- .../src/main/scala/akka/event/EventBus.scala | 25 ++++++------------- 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index cd89a26f8d..66b0aa0c29 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -137,25 +137,18 @@ trait SubchannelClassification { this: EventBus ⇒ def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscriptions.synchronized { val diff = subscriptions.addValue(to, subscriber) - if (diff.isEmpty) false - else { - addToCache(diff) - true - } + addToCache(diff) + diff.nonEmpty } def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscriptions.synchronized { val diff = subscriptions.removeValue(from, subscriber) - if (diff.isEmpty) false - else { - cache ++= diff - true - } + cache ++= diff // FIXME What is the reason this isn't calling removeFromCache? + diff.nonEmpty } def unsubscribe(subscriber: Subscriber): Unit = subscriptions.synchronized { - val diff = subscriptions.removeValue(subscriber) - if (diff.nonEmpty) removeFromCache(diff) + removeFromCache(subscriptions.removeValue(subscriber)) } def publish(event: Event): Unit = { @@ -174,12 +167,8 @@ trait SubchannelClassification { this: EventBus ⇒ // we can only let keys that already exist in the cache get updated private def removeFromCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit = - cache ++= (List.empty[(Classifier, Set[Subscriber])] /: changes) { - case (cl, (c, cs)) ⇒ - cache.get(c) match { - case None ⇒ cl - case Some(ss) ⇒ (c, ss -- cs) :: cl - } + cache = (cache /: changes) { + case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) -- cs) } private def addToCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit = From b7d5056dfb70317fa562eda1cefcdcbbea4edae5 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 27 Sep 2012 17:09:48 +0200 Subject: [PATCH 3/4] Minor cleanup in SubclassifiedIndex --- .../main/scala/akka/util/SubclassifiedIndex.scala | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala index 23cd6096d2..bff7a0ffee 100644 --- a/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala +++ b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala @@ -54,8 +54,6 @@ private[akka] object SubclassifiedIndex { override def toString = subkeys.mkString("Nonroot(" + key + ", " + values + ",\n", ",\n", ")") } - private[SubclassifiedIndex] def emptyFindSet[V] = internalEmptyFindSet.asInstanceOf[Set[V]] - private[this] val internalEmptyFindSet = Set.empty[AnyRef] private[SubclassifiedIndex] def emptyMergeMap[K, V] = internalEmptyMergeMap.asInstanceOf[Map[K, Set[V]]] private[this] val internalEmptyMergeMap = Map[AnyRef, Set[AnyRef]]().withDefault(_ ⇒ Set[AnyRef]()) } @@ -168,7 +166,7 @@ private[akka] class SubclassifiedIndex[K, V] private (private var values: Set[V] */ protected final def findValues(key: K): Set[V] = root.innerFindValues(key) protected def innerFindValues(key: K): Set[V] = - (emptyFindSet[V] /: subkeys) { (s, n) ⇒ + (Set.empty[V] /: subkeys) { (s, n) ⇒ if (sc.isSubclass(key, n.key)) s ++ n.innerFindValues(key) else @@ -180,7 +178,7 @@ private[akka] class SubclassifiedIndex[K, V] private (private var values: Set[V] */ protected final def findSubKeysExcept(key: K, except: Vector[Nonroot[K, V]]): Set[K] = root.innerFindSubKeys(key, except) protected def innerFindSubKeys(key: K, except: Vector[Nonroot[K, V]]): Set[K] = - (emptyFindSet[K] /: subkeys) { (s, n) ⇒ + (Set.empty[K] /: subkeys) { (s, n) ⇒ if (sc.isEqual(key, n.key)) s else n.innerFindSubKeys(key, except) ++ { if (sc.isSubclass(n.key, key) && !except.exists(e ⇒ sc.isEqual(key, e.key))) @@ -198,12 +196,8 @@ private[akka] class SubclassifiedIndex[K, V] private (private var values: Set[V] */ private def integrate(n: Nonroot[K, V]): Changes = { val (subsub, sub) = subkeys partition (k ⇒ sc.isSubclass(k.key, n.key)) - if (sub.size == subkeys.size) { - subkeys :+= n - } else { - n.subkeys = subsub - subkeys = sub :+ n - } + subkeys = sub :+ n + n.subkeys = if (subsub.nonEmpty) subsub else n.subkeys n.subkeys ++= findSubKeysExcept(n.key, n.subkeys).map(k ⇒ new Nonroot(root, k, values)) n.subkeys.map(n ⇒ (n.key, n.values.toSet)) } From 208c0cc4ebaa58281799ad3405dcfbbad6c42232 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Fri, 28 Sep 2012 15:18:05 +0200 Subject: [PATCH 4/4] Review feedback fixes. #2561 --- akka-actor/src/main/scala/akka/event/EventBus.scala | 5 +++-- .../src/main/scala/akka/util/SubclassifiedIndex.scala | 7 +++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index 66b0aa0c29..cb83fbe806 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -143,7 +143,9 @@ trait SubchannelClassification { this: EventBus ⇒ def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscriptions.synchronized { val diff = subscriptions.removeValue(from, subscriber) - cache ++= diff // FIXME What is the reason this isn't calling removeFromCache? + // removeValue(K, V) does not return the diff to remove from or add to the cache + // but instead the whole set of keys and values that should be updated in the cache + cache ++= diff diff.nonEmpty } @@ -165,7 +167,6 @@ trait SubchannelClassification { this: EventBus ⇒ recv foreach (publish(event, _)) } - // we can only let keys that already exist in the cache get updated private def removeFromCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit = cache = (cache /: changes) { case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) -- cs) diff --git a/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala index bff7a0ffee..ae82da6407 100644 --- a/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala +++ b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala @@ -132,11 +132,14 @@ private[akka] class SubclassifiedIndex[K, V] private (private var values: Set[V] /** * Remove value from all keys which are subclasses of the given key. * - * @return the complete changes that should be inserted in the cache + * @return the complete changes that should be updated in the cache */ def removeValue(key: K, value: V): Changes = + // the reason for not using the values in the returned diff is that we need to + // go through the whole tree to find all values for the "changed" keys in other + // parts of the tree as well, since new nodes might have been created mergeChangesByKey(innerRemoveValue(key, value)) map { - case (k, v) ⇒ (k, findValues(k)) + case (k, _) ⇒ (k, findValues(k)) } // this will return the keys and values to be removed from the cache