EventBus now handles adding and removal better. See #2561

This commit is contained in:
Björn Antonsson 2012-09-27 13:15:31 +02:00
parent 4714e90679
commit 07f28cd26a
3 changed files with 145 additions and 49 deletions

View file

@ -56,9 +56,11 @@ object EventStreamSpec {
trait T trait T
trait AT extends T trait AT extends T
trait ATT extends AT
trait BT extends T trait BT extends T
class TA trait BTT extends BT
class TAATBT extends TA with AT with BT class CC
class CCATBT extends CC with ATT with BTT
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -151,75 +153,75 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
"manage sub-channels using classes and traits (update on subscribe)" in { "manage sub-channels using classes and traits (update on subscribe)" in {
val es = new EventStream(false) val es = new EventStream(false)
val tm1 = new TA val tm1 = new CC
val tm2 = new TAATBT val tm2 = new CCATBT
val a1, a2, a3, a4 = TestProbe() val a1, a2, a3, a4 = TestProbe()
es.subscribe(a1.ref, classOf[AT]) must be === true es.subscribe(a1.ref, classOf[AT]) must be === true
es.subscribe(a2.ref, classOf[BT]) must be === true es.subscribe(a2.ref, classOf[BT]) must be === true
es.subscribe(a3.ref, classOf[TA]) must be === true es.subscribe(a3.ref, classOf[CC]) must be === true
es.subscribe(a4.ref, classOf[TAATBT]) must be === true es.subscribe(a4.ref, classOf[CCATBT]) must be === true
es.publish(tm1) es.publish(tm1)
es.publish(tm2) es.publish(tm2)
a1.expectMsgType[AT] must be === tm2 a1.expectMsgType[AT] must be === tm2
a2.expectMsgType[BT] must be === tm2 a2.expectMsgType[BT] must be === tm2
a3.expectMsgType[TA] must be === tm1 a3.expectMsgType[CC] must be === tm1
a3.expectMsgType[TA] must be === tm2 a3.expectMsgType[CC] must be === tm2
a4.expectMsgType[TAATBT] must be === tm2 a4.expectMsgType[CCATBT] must be === tm2
es.unsubscribe(a1.ref, classOf[AT]) must be === true es.unsubscribe(a1.ref, classOf[AT]) must be === true
es.unsubscribe(a2.ref, classOf[BT]) must be === true es.unsubscribe(a2.ref, classOf[BT]) must be === true
es.unsubscribe(a3.ref, classOf[TA]) must be === true es.unsubscribe(a3.ref, classOf[CC]) must be === true
es.unsubscribe(a4.ref, classOf[TAATBT]) must be === true es.unsubscribe(a4.ref, classOf[CCATBT]) must be === true
} }
"manage sub-channels using classes and traits (update on unsubscribe)" in { "manage sub-channels using classes and traits (update on unsubscribe)" in {
val es = new EventStream(false) val es = new EventStream(false)
val tm1 = new TA val tm1 = new CC
val tm2 = new TAATBT val tm2 = new CCATBT
val a1, a2, a3, a4 = TestProbe() val a1, a2, a3, a4 = TestProbe()
es.subscribe(a1.ref, classOf[AT]) must be === true es.subscribe(a1.ref, classOf[AT]) must be === true
es.subscribe(a2.ref, classOf[BT]) must be === true es.subscribe(a2.ref, classOf[BT]) must be === true
es.subscribe(a3.ref, classOf[TA]) must be === true es.subscribe(a3.ref, classOf[CC]) must be === true
es.subscribe(a4.ref, classOf[TAATBT]) must be === true es.subscribe(a4.ref, classOf[CCATBT]) must be === true
es.unsubscribe(a3.ref, classOf[TA]) must be === true es.unsubscribe(a3.ref, classOf[CC]) must be === true
es.publish(tm1) es.publish(tm1)
es.publish(tm2) es.publish(tm2)
a1.expectMsgType[AT] must be === tm2 a1.expectMsgType[AT] must be === tm2
a2.expectMsgType[BT] must be === tm2 a2.expectMsgType[BT] must be === tm2
a3.expectNoMsg(1 second) a3.expectNoMsg(1 second)
a4.expectMsgType[TAATBT] must be === tm2 a4.expectMsgType[CCATBT] must be === tm2
es.unsubscribe(a1.ref, classOf[AT]) must be === true es.unsubscribe(a1.ref, classOf[AT]) must be === true
es.unsubscribe(a2.ref, classOf[BT]) must be === true es.unsubscribe(a2.ref, classOf[BT]) must be === true
es.unsubscribe(a4.ref, classOf[TAATBT]) must be === true es.unsubscribe(a4.ref, classOf[CCATBT]) must be === true
} }
"manage sub-channels using classes and traits (update on unsubscribe all)" in { "manage sub-channels using classes and traits (update on unsubscribe all)" in {
val es = new EventStream(false) val es = new EventStream(false)
val tm1 = new TA val tm1 = new CC
val tm2 = new TAATBT val tm2 = new CCATBT
val a1, a2, a3, a4 = TestProbe() val a1, a2, a3, a4 = TestProbe()
es.subscribe(a1.ref, classOf[AT]) must be === true es.subscribe(a1.ref, classOf[AT]) must be === true
es.subscribe(a2.ref, classOf[BT]) must be === true es.subscribe(a2.ref, classOf[BT]) must be === true
es.subscribe(a3.ref, classOf[TA]) must be === true es.subscribe(a3.ref, classOf[CC]) must be === true
es.subscribe(a4.ref, classOf[TAATBT]) must be === true es.subscribe(a4.ref, classOf[CCATBT]) must be === true
es.unsubscribe(a3.ref) es.unsubscribe(a3.ref)
es.publish(tm1) es.publish(tm1)
es.publish(tm2) es.publish(tm2)
a1.expectMsgType[AT] must be === tm2 a1.expectMsgType[AT] must be === tm2
a2.expectMsgType[BT] must be === tm2 a2.expectMsgType[BT] must be === tm2
a3.expectNoMsg(1 second) a3.expectNoMsg(1 second)
a4.expectMsgType[TAATBT] must be === tm2 a4.expectMsgType[CCATBT] must be === tm2
es.unsubscribe(a1.ref, classOf[AT]) must be === true es.unsubscribe(a1.ref, classOf[AT]) must be === true
es.unsubscribe(a2.ref, classOf[BT]) must be === true es.unsubscribe(a2.ref, classOf[BT]) must be === true
es.unsubscribe(a4.ref, classOf[TAATBT]) must be === true es.unsubscribe(a4.ref, classOf[CCATBT]) must be === true
} }
"manage sub-channels using classes and traits (update on publish)" in { "manage sub-channels using classes and traits (update on publish)" in {
val es = new EventStream(false) val es = new EventStream(false)
val tm1 = new TA val tm1 = new CC
val tm2 = new TAATBT val tm2 = new CCATBT
val a1, a2 = TestProbe() val a1, a2 = TestProbe()
es.subscribe(a1.ref, classOf[AT]) must be === true es.subscribe(a1.ref, classOf[AT]) must be === true
@ -232,6 +234,44 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
es.unsubscribe(a2.ref, classOf[BT]) must be === true es.unsubscribe(a2.ref, classOf[BT]) must be === true
} }
"manage sub-channels using classes and traits (unsubscribe classes used with trait)" in {
val es = new EventStream(false)
val tm1 = new CC
val tm2 = new CCATBT
val a1, a2, a3 = TestProbe()
es.subscribe(a1.ref, classOf[AT]) must be === true
es.subscribe(a2.ref, classOf[BT]) must be === true
es.subscribe(a2.ref, classOf[CC]) must be === true
es.subscribe(a3.ref, classOf[CC]) must be === true
es.unsubscribe(a2.ref, classOf[CC]) must be === true
es.unsubscribe(a3.ref, classOf[CCATBT]) must be === true
es.publish(tm1)
es.publish(tm2)
a1.expectMsgType[AT] must be === tm2
a2.expectMsgType[BT] must be === tm2
a3.expectMsgType[CC] must be === tm1
es.unsubscribe(a1.ref, classOf[AT]) must be === true
es.unsubscribe(a2.ref, classOf[BT]) must be === true
es.unsubscribe(a3.ref, classOf[CC]) must be === true
}
"manage sub-channels using classes and traits (subscribe after publish)" in {
val es = new EventStream(false)
val tm1 = new CCATBT
val a1, a2 = TestProbe()
es.subscribe(a1.ref, classOf[AT]) must be === true
es.publish(tm1)
a1.expectMsgType[AT] must be === tm1
a2.expectNoMsg(1 second)
es.subscribe(a2.ref, classOf[BTT]) must be === true
es.publish(tm1)
a1.expectMsgType[AT] must be === tm1
a2.expectMsgType[BTT] must be === tm1
es.unsubscribe(a1.ref, classOf[AT]) must be === true
es.unsubscribe(a2.ref, classOf[BTT]) must be === true
}
} }
private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) { private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) {

View file

@ -139,7 +139,7 @@ trait SubchannelClassification { this: EventBus ⇒
val diff = subscriptions.addValue(to, subscriber) val diff = subscriptions.addValue(to, subscriber)
if (diff.isEmpty) false if (diff.isEmpty) false
else { else {
cache ++= diff addToCache(diff)
true true
} }
} }
@ -148,7 +148,7 @@ trait SubchannelClassification { this: EventBus ⇒
val diff = subscriptions.removeValue(from, subscriber) val diff = subscriptions.removeValue(from, subscriber)
if (diff.isEmpty) false if (diff.isEmpty) false
else { else {
removeFromCache(diff) cache ++= diff
true true
} }
} }
@ -158,9 +158,6 @@ trait SubchannelClassification { this: EventBus ⇒
if (diff.nonEmpty) removeFromCache(diff) if (diff.nonEmpty) removeFromCache(diff)
} }
private def removeFromCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit =
cache ++= changes map { case (c, s) (c, cache.getOrElse(c, Set[Subscriber]()) -- s) }
def publish(event: Event): Unit = { def publish(event: Event): Unit = {
val c = classify(event) val c = classify(event)
val recv = val recv =
@ -168,13 +165,27 @@ trait SubchannelClassification { this: EventBus ⇒
else subscriptions.synchronized { else subscriptions.synchronized {
if (cache contains c) cache(c) if (cache contains c) cache(c)
else { else {
val diff = subscriptions.addKey(c) addToCache(subscriptions.addKey(c))
cache ++= diff
cache(c) cache(c)
} }
} }
recv foreach (publish(event, _)) recv foreach (publish(event, _))
} }
// we can only let keys that already exist in the cache get updated
private def removeFromCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit =
cache ++= (List.empty[(Classifier, Set[Subscriber])] /: changes) {
case (cl, (c, cs))
cache.get(c) match {
case None cl
case Some(ss) (c, ss -- cs) :: cl
}
}
private def addToCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit =
cache = (cache /: changes) {
case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) ++ cs)
}
} }
/** /**

View file

@ -17,9 +17,9 @@ trait Subclassification[K] {
def isSubclass(x: K, y: K): Boolean def isSubclass(x: K, y: K): Boolean
} }
object SubclassifiedIndex { private[akka] object SubclassifiedIndex {
class Nonroot[K, V](val key: K, _values: Set[V])(implicit sc: Subclassification[K]) extends SubclassifiedIndex[K, V](_values) { class Nonroot[K, V](override val root: SubclassifiedIndex[K, V], val key: K, _values: Set[V])(implicit sc: Subclassification[K]) extends SubclassifiedIndex[K, V](_values) {
override def innerAddValue(key: K, value: V): Changes = { override def innerAddValue(key: K, value: V): Changes = {
// break the recursion on super when key is found and transition to recursive add-to-set // break the recursion on super when key is found and transition to recursive add-to-set
@ -34,6 +34,7 @@ object SubclassifiedIndex {
} else kids } else kids
} }
// this will return the keys and values to be removed from the cache
override def innerRemoveValue(key: K, value: V): Changes = { override def innerRemoveValue(key: K, value: V): Changes = {
// break the recursion on super when key is found and transition to recursive remove-from-set // break the recursion on super when key is found and transition to recursive remove-from-set
if (sc.isEqual(key, this.key)) removeValue(value) else super.innerRemoveValue(key, value) if (sc.isEqual(key, this.key)) removeValue(value) else super.innerRemoveValue(key, value)
@ -47,9 +48,14 @@ object SubclassifiedIndex {
} else kids } else kids
} }
override def innerFindValues(key: K): Set[V] =
if (sc.isEqual(key, this.key)) values else super.innerFindValues(key)
override def toString = subkeys.mkString("Nonroot(" + key + ", " + values + ",\n", ",\n", ")") override def toString = subkeys.mkString("Nonroot(" + key + ", " + values + ",\n", ",\n", ")")
} }
private[SubclassifiedIndex] def emptyFindSet[V] = internalEmptyFindSet.asInstanceOf[Set[V]]
private[this] val internalEmptyFindSet = Set.empty[AnyRef]
private[SubclassifiedIndex] def emptyMergeMap[K, V] = internalEmptyMergeMap.asInstanceOf[Map[K, Set[V]]] private[SubclassifiedIndex] def emptyMergeMap[K, V] = internalEmptyMergeMap.asInstanceOf[Map[K, Set[V]]]
private[this] val internalEmptyMergeMap = Map[AnyRef, Set[AnyRef]]().withDefault(_ Set[AnyRef]()) private[this] val internalEmptyMergeMap = Map[AnyRef, Set[AnyRef]]().withDefault(_ Set[AnyRef]())
} }
@ -66,7 +72,7 @@ object SubclassifiedIndex {
* cache, e.g. HashMap, is faster than tree traversal which must use linear * cache, e.g. HashMap, is faster than tree traversal which must use linear
* scan at each level. Therefore, no value traversals are published. * scan at each level. Therefore, no value traversals are published.
*/ */
class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: Subclassification[K]) { private[akka] class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: Subclassification[K]) {
import SubclassifiedIndex._ import SubclassifiedIndex._
@ -76,9 +82,13 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc:
def this()(implicit sc: Subclassification[K]) = this(Set.empty) def this()(implicit sc: Subclassification[K]) = this(Set.empty)
protected val root = this
/** /**
* Add key to this index which inherits its value set from the most specific * Add key to this index which inherits its value set from the most specific
* super-class which is known. * super-class which is known.
*
* @return the diff that should be added to the cache
*/ */
def addKey(key: K): Changes = mergeChangesByKey(innerAddKey(key)) def addKey(key: K): Changes = mergeChangesByKey(innerAddKey(key))
@ -94,14 +104,15 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc:
} else Nil } else Nil
} }
if (!found) { if (!found) {
integrate(new Nonroot(key, values)) integrate(new Nonroot(root, key, values)) :+ ((key, values))
Seq((key, values))
} else ch } else ch
} }
/** /**
* Add value to all keys which are subclasses of the given key. If the key * Add value to all keys which are subclasses of the given key. If the key
* is not known yet, it is inserted as if using addKey. * is not known yet, it is inserted as if using addKey.
*
* @return the diff that should be added to the cache
*/ */
def addValue(key: K, value: V): Changes = mergeChangesByKey(innerAddValue(key, value)) def addValue(key: K, value: V): Changes = mergeChangesByKey(innerAddValue(key, value))
@ -115,17 +126,22 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc:
} }
if (!found) { if (!found) {
val v = values + value val v = values + value
val n = new Nonroot(key, v) val n = new Nonroot(root, key, v)
integrate(n) integrate(n) ++ n.innerAddValue(key, value) :+ (key -> v)
n.innerAddValue(key, value) :+ (key -> v)
} else ch } else ch
} }
/** /**
* Remove value from all keys which are subclasses of the given key. * Remove value from all keys which are subclasses of the given key.
* @return The keys and values that have been removed. *
* @return the complete changes that should be inserted in the cache
*/ */
def removeValue(key: K, value: V): Changes = mergeChangesByKey(innerRemoveValue(key, value)) def removeValue(key: K, value: V): Changes =
mergeChangesByKey(innerRemoveValue(key, value)) map {
case (k, v) (k, findValues(k))
}
// this will return the keys and values to be removed from the cache
protected def innerRemoveValue(key: K, value: V): Changes = { protected def innerRemoveValue(key: K, value: V): Changes = {
var found = false var found = false
val ch = subkeys flatMap { n val ch = subkeys flatMap { n
@ -135,25 +151,52 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc:
} else Nil } else Nil
} }
if (!found) { if (!found) {
val n = new Nonroot(key, values) val n = new Nonroot(root, key, values)
integrate(n) integrate(n) ++ n.removeValue(value)
n.removeValue(value)
} else ch } else ch
} }
/** /**
* Remove value from all keys in the index. * Remove value from all keys in the index.
*
* @return the diff that should be removed from the cache
*/ */
def removeValue(value: V): Changes = mergeChangesByKey(subkeys flatMap (_ removeValue value)) def removeValue(value: V): Changes = mergeChangesByKey(subkeys flatMap (_ removeValue value))
/**
* Find all values for a given key in the index.
*/
protected final def findValues(key: K): Set[V] = root.innerFindValues(key)
protected def innerFindValues(key: K): Set[V] =
(emptyFindSet[V] /: subkeys) { (s, n)
if (sc.isSubclass(key, n.key))
s ++ n.innerFindValues(key)
else
s
}
/**
* Find all subkeys of a given key in the index excluding some subkeys.
*/
protected final def findSubKeysExcept(key: K, except: Vector[Nonroot[K, V]]): Set[K] = root.innerFindSubKeys(key, except)
protected def innerFindSubKeys(key: K, except: Vector[Nonroot[K, V]]): Set[K] =
(emptyFindSet[K] /: subkeys) { (s, n)
if (sc.isEqual(key, n.key)) s
else n.innerFindSubKeys(key, except) ++ {
if (sc.isSubclass(n.key, key) && !except.exists(e sc.isEqual(key, e.key)))
s + n.key
else
s
}
}
override def toString = subkeys.mkString("SubclassifiedIndex(" + values + ",\n", ",\n", ")") override def toString = subkeys.mkString("SubclassifiedIndex(" + values + ",\n", ",\n", ")")
/** /**
* Add new Nonroot below this node and check all existing nodes for subclass relationship. * Add new Nonroot below this node and check all existing nodes for subclass relationship.
* * Also needs to find subkeys in other parts of the tree to compensate for multiple inheritance.
* @return true if and only if the new node has received subkeys during this operation.
*/ */
private def integrate(n: Nonroot[K, V]) { private def integrate(n: Nonroot[K, V]): Changes = {
val (subsub, sub) = subkeys partition (k sc.isSubclass(k.key, n.key)) val (subsub, sub) = subkeys partition (k sc.isSubclass(k.key, n.key))
if (sub.size == subkeys.size) { if (sub.size == subkeys.size) {
subkeys :+= n subkeys :+= n
@ -161,6 +204,8 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc:
n.subkeys = subsub n.subkeys = subsub
subkeys = sub :+ n subkeys = sub :+ n
} }
n.subkeys ++= findSubKeysExcept(n.key, n.subkeys).map(k new Nonroot(root, k, values))
n.subkeys.map(n (n.key, n.values.toSet))
} }
private def mergeChangesByKey(changes: Changes): Changes = private def mergeChangesByKey(changes: Changes): Changes =