diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index c027d805d1..5447a1eb74 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -5,13 +5,13 @@ package akka.event import language.postfixOps -import akka.testkit.AkkaSpec import scala.concurrent.util.duration._ import akka.actor.{ Actor, ActorRef, ActorSystemImpl, ActorSystem, Props, UnhandledMessage } import com.typesafe.config.ConfigFactory import scala.collection.JavaConverters._ import akka.event.Logging.InitializeLogger import akka.pattern.gracefulStop +import akka.testkit.{ TestProbe, AkkaSpec } object EventStreamSpec { @@ -53,6 +53,12 @@ object EventStreamSpec { class B1 extends A class B2 extends A class C extends B1 + + trait T + trait AT extends T + trait BT extends T + class TA + class TAATBT extends TA with AT with BT } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -117,7 +123,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { } } - "manage sub-channels" in { + "manage sub-channels using classes" in { val a = new A val b1 = new B1 val b2 = new B2 @@ -143,6 +149,89 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { } } + "manage sub-channels using classes and traits (update on subscribe)" in { + val es = new EventStream(false) + val tm1 = new TA + val tm2 = new TAATBT + val a1, a2, a3, a4 = TestProbe() + + es.subscribe(a1.ref, classOf[AT]) must be === true + es.subscribe(a2.ref, classOf[BT]) must be === true + es.subscribe(a3.ref, classOf[TA]) must be === true + es.subscribe(a4.ref, classOf[TAATBT]) must be === true + es.publish(tm1) + es.publish(tm2) + a1.expectMsgType[AT] must be === tm2 + a2.expectMsgType[BT] must be === tm2 + a3.expectMsgType[TA] must be === tm1 + a3.expectMsgType[TA] must be === tm2 + a4.expectMsgType[TAATBT] must be === tm2 + es.unsubscribe(a1.ref, classOf[AT]) must be === true + es.unsubscribe(a2.ref, classOf[BT]) must be === true + es.unsubscribe(a3.ref, classOf[TA]) must be === true + es.unsubscribe(a4.ref, classOf[TAATBT]) must be === true + } + + "manage sub-channels using classes and traits (update on unsubscribe)" in { + val es = new EventStream(false) + val tm1 = new TA + val tm2 = new TAATBT + val a1, a2, a3, a4 = TestProbe() + + es.subscribe(a1.ref, classOf[AT]) must be === true + es.subscribe(a2.ref, classOf[BT]) must be === true + es.subscribe(a3.ref, classOf[TA]) must be === true + es.subscribe(a4.ref, classOf[TAATBT]) must be === true + es.unsubscribe(a3.ref, classOf[TA]) must be === true + es.publish(tm1) + es.publish(tm2) + a1.expectMsgType[AT] must be === tm2 + a2.expectMsgType[BT] must be === tm2 + a3.expectNoMsg(1 second) + a4.expectMsgType[TAATBT] must be === tm2 + es.unsubscribe(a1.ref, classOf[AT]) must be === true + es.unsubscribe(a2.ref, classOf[BT]) must be === true + es.unsubscribe(a4.ref, classOf[TAATBT]) must be === true + } + + "manage sub-channels using classes and traits (update on unsubscribe all)" in { + val es = new EventStream(false) + val tm1 = new TA + val tm2 = new TAATBT + val a1, a2, a3, a4 = TestProbe() + + es.subscribe(a1.ref, classOf[AT]) must be === true + es.subscribe(a2.ref, classOf[BT]) must be === true + es.subscribe(a3.ref, classOf[TA]) must be === true + es.subscribe(a4.ref, classOf[TAATBT]) must be === true + es.unsubscribe(a3.ref) + es.publish(tm1) + es.publish(tm2) + a1.expectMsgType[AT] must be === tm2 + a2.expectMsgType[BT] must be === tm2 + a3.expectNoMsg(1 second) + a4.expectMsgType[TAATBT] must be === tm2 + es.unsubscribe(a1.ref, classOf[AT]) must be === true + es.unsubscribe(a2.ref, classOf[BT]) must be === true + es.unsubscribe(a4.ref, classOf[TAATBT]) must be === true + } + + "manage sub-channels using classes and traits (update on publish)" in { + val es = new EventStream(false) + val tm1 = new TA + val tm2 = new TAATBT + val a1, a2 = TestProbe() + + es.subscribe(a1.ref, classOf[AT]) must be === true + es.subscribe(a2.ref, classOf[BT]) must be === true + es.publish(tm1) + es.publish(tm2) + a1.expectMsgType[AT] must be === tm2 + a2.expectMsgType[BT] must be === tm2 + es.unsubscribe(a1.ref, classOf[AT]) must be === true + es.unsubscribe(a2.ref, classOf[BT]) must be === true + } + } private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) { @@ -150,7 +239,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { val allmsg = Seq(Debug("", null, "debug"), Info("", null, "info"), Warning("", null, "warning"), Error("", null, "error")) val msg = allmsg filter (_.level <= level) allmsg foreach bus.publish - msg foreach (x ⇒ expectMsg(x)) + msg foreach (expectMsg(_)) } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index cad7351bbb..63436723bd 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -139,7 +139,7 @@ trait SubchannelClassification { this: EventBus ⇒ val diff = subscriptions.addValue(to, subscriber) if (diff.isEmpty) false else { - cache = cache ++ diff + cache ++= diff true } } @@ -148,16 +148,19 @@ trait SubchannelClassification { this: EventBus ⇒ val diff = subscriptions.removeValue(from, subscriber) if (diff.isEmpty) false else { - cache = cache ++ diff + removeFromCache(diff) true } } def unsubscribe(subscriber: Subscriber): Unit = subscriptions.synchronized { val diff = subscriptions.removeValue(subscriber) - if (diff.nonEmpty) cache = cache ++ diff + if (diff.nonEmpty) removeFromCache(diff) } + private def removeFromCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit = + cache ++= changes map { case (c, s) ⇒ (c, cache.getOrElse(c, Set[Subscriber]()) -- s) } + def publish(event: Event): Unit = { val c = classify(event) val recv = @@ -166,7 +169,7 @@ trait SubchannelClassification { this: EventBus ⇒ if (cache contains c) cache(c) else { val diff = subscriptions.addKey(c) - cache = cache ++ diff + cache ++= diff cache(c) } } diff --git a/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala index f3623003ed..45a70be27c 100644 --- a/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala +++ b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala @@ -21,9 +21,9 @@ object SubclassifiedIndex { class Nonroot[K, V](val key: K, _values: Set[V])(implicit sc: Subclassification[K]) extends SubclassifiedIndex[K, V](_values) { - override def addValue(key: K, value: V): Changes = { + override def innerAddValue(key: K, value: V): Changes = { // break the recursion on super when key is found and transition to recursive add-to-set - if (sc.isEqual(key, this.key)) addValue(value) else super.addValue(key, value) + if (sc.isEqual(key, this.key)) addValue(value) else super.innerAddValue(key, value) } private def addValue(value: V): Changes = { @@ -34,23 +34,24 @@ object SubclassifiedIndex { } else kids } - override def removeValue(key: K, value: V): Changes = { + override def innerRemoveValue(key: K, value: V): Changes = { // break the recursion on super when key is found and transition to recursive remove-from-set - if (sc.isEqual(key, this.key)) removeValue(value) else super.removeValue(key, value) + if (sc.isEqual(key, this.key)) removeValue(value) else super.innerRemoveValue(key, value) } override def removeValue(value: V): Changes = { val kids = subkeys flatMap (_ removeValue value) if (values contains value) { values -= value - kids :+ ((key, values)) + kids :+ ((key, Set(value))) } else kids } override def toString = subkeys.mkString("Nonroot(" + key + ", " + values + ",\n", ",\n", ")") - } + private[SubclassifiedIndex] def emptyMergeMap[K, V] = internalEmptyMergeMap.asInstanceOf[Map[K, Set[V]]] + private[this] val internalEmptyMergeMap = Map[AnyRef, Set[AnyRef]]().withDefault(_ ⇒ Set[AnyRef]()) } /** @@ -79,44 +80,58 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: * Add key to this index which inherits its value set from the most specific * super-class which is known. */ - def addKey(key: K): Changes = - subkeys collectFirst { - case n if sc.isEqual(n.key, key) ⇒ Nil - case n if sc.isSubclass(key, n.key) ⇒ n.addKey(key) - } getOrElse { - integrate(new Nonroot(key, values)) - List((key, values)) + def addKey(key: K): Changes = mergeChangesByKey(innerAddKey(key)) + + protected def innerAddKey(key: K): Changes = { + var found = false + val ch = subkeys flatMap { n ⇒ + if (sc.isEqual(key, n.key)) { + found = true + Nil + } else if (sc.isSubclass(key, n.key)) { + found = true + n.innerAddKey(key) + } else Nil } + if (!found) { + integrate(new Nonroot(key, values)) + Seq((key, values)) + } else ch + } /** * Add value to all keys which are subclasses of the given key. If the key * is not known yet, it is inserted as if using addKey. */ - def addValue(key: K, value: V): Changes = { + def addValue(key: K, value: V): Changes = mergeChangesByKey(innerAddValue(key, value)) + + protected def innerAddValue(key: K, value: V): Changes = { var found = false val ch = subkeys flatMap { n ⇒ if (sc.isSubclass(key, n.key)) { found = true - n.addValue(key, value) + n.innerAddValue(key, value) } else Nil } if (!found) { val v = values + value val n = new Nonroot(key, v) integrate(n) - n.addValue(key, value) :+ ((key, v)) + n.innerAddValue(key, value) :+ (key -> v) } else ch } /** * Remove value from all keys which are subclasses of the given key. + * @return The keys and values that have been removed. */ - def removeValue(key: K, value: V): Changes = { + def removeValue(key: K, value: V): Changes = mergeChangesByKey(innerRemoveValue(key, value)) + protected def innerRemoveValue(key: K, value: V): Changes = { var found = false val ch = subkeys flatMap { n ⇒ if (sc.isSubclass(key, n.key)) { found = true - n.removeValue(key, value) + n.innerRemoveValue(key, value) } else Nil } if (!found) { @@ -129,7 +144,7 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: /** * Remove value from all keys in the index. */ - def removeValue(value: V): Changes = subkeys flatMap (_ removeValue value) + def removeValue(value: V): Changes = mergeChangesByKey(subkeys flatMap (_ removeValue value)) override def toString = subkeys.mkString("SubclassifiedIndex(" + values + ",\n", ",\n", ")") @@ -148,4 +163,8 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: } } -} \ No newline at end of file + private def mergeChangesByKey(changes: Changes): Changes = + (emptyMergeMap[K, V] /: changes) { + case (m, (k, s)) ⇒ m.updated(k, m(k) ++ s) + }.toSeq +}