From b2d548bd0e136602dee7657d98d200c421ff5d47 Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 10 Nov 2011 00:26:53 +0100 Subject: [PATCH] implement SubchannelClassification in MainBus, fixes #1340 --- .../test/scala/akka/event/MainBusSpec.scala | 33 ++++ .../src/main/scala/akka/event/EventBus.scala | 73 ++++++++- .../src/main/scala/akka/event/MainBus.scala | 18 +-- .../src/main/scala/akka/util/Index.scala | 1 - .../scala/akka/util/SubclassifiedIndex.scala | 151 ++++++++++++++++++ 5 files changed, 261 insertions(+), 15 deletions(-) create mode 100644 akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala diff --git a/akka-actor-tests/src/test/scala/akka/event/MainBusSpec.scala b/akka-actor-tests/src/test/scala/akka/event/MainBusSpec.scala index 16e735542b..2116f49151 100644 --- a/akka-actor-tests/src/test/scala/akka/event/MainBusSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/MainBusSpec.scala @@ -21,6 +21,12 @@ object MainBusSpec { case e: Logging.LogEvent ⇒ dst ! e } } + + // class hierarchy for subchannel test + class A + class B1 extends A + class B2 extends A + class C extends B1 } class MainBusSpec extends AkkaSpec(Configuration( @@ -62,6 +68,33 @@ class MainBusSpec extends AkkaSpec(Configuration( } } + "manage sub-channels" in { + val a = new A + val b1 = new B1 + val b2 = new B2 + val c = new C + val bus = new MainBus(false) + bus.start(app) + within(2 seconds) { + bus.subscribe(testActor, classOf[B2]) === true + bus.publish(c) + bus.publish(b2) + expectMsg(b2) + bus.subscribe(testActor, classOf[A]) === true + bus.publish(c) + expectMsg(c) + bus.publish(b1) + expectMsg(b1) + bus.unsubscribe(testActor, classOf[B1]) === true + bus.publish(c) + bus.publish(b2) + bus.publish(a) + expectMsg(b2) + expectMsg(a) + expectNoMsg + } + } + } private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) { diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index a337fd1cbf..b76eaab9d1 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -8,6 +8,7 @@ import akka.actor.ActorRef import akka.util.Index import java.util.concurrent.ConcurrentSkipListSet import java.util.Comparator +import akka.util.{ Subclassification, SubclassifiedIndex } /** * Represents the base type for EventBuses @@ -54,14 +55,14 @@ trait ActorEventBus extends EventBus { /** * Can be mixed into an EventBus to specify that the Classifier type is ActorRef */ -trait ActorClassifier { self: EventBus ⇒ +trait ActorClassifier { this: EventBus ⇒ type Classifier = ActorRef } /** * Can be mixed into an EventBus to specify that the Classifier type is a Function from Event to Boolean (predicate) */ -trait PredicateClassifier { self: EventBus ⇒ +trait PredicateClassifier { this: EventBus ⇒ type Classifier = Event ⇒ Boolean } @@ -71,7 +72,7 @@ trait PredicateClassifier { self: EventBus ⇒ * * The compareSubscribers need to provide a total ordering of the Subscribers */ -trait LookupClassification { self: EventBus ⇒ +trait LookupClassification { this: EventBus ⇒ protected final val subscribers = new Index[Classifier, Subscriber](mapSize(), new Comparator[Subscriber] { def compare(a: Subscriber, b: Subscriber): Int = compareSubscribers(a, b) @@ -109,6 +110,70 @@ trait LookupClassification { self: EventBus ⇒ } } +/** + * Classification which respects relationships between channels: subscribing + * to one channel automatically and idempotently subscribes to all sub-channels. + */ +trait SubchannelClassification { this: EventBus ⇒ + + implicit val subclassification: Subclassification[Classifier] + + // must be lazy to avoid initialization order problem with subclassification + private lazy val subscriptions = new SubclassifiedIndex[Classifier, Subscriber]() + + @volatile + private var cache = Map.empty[Classifier, Set[Subscriber]] + + protected def subscribers = cache.values.flatten + + /** + * Returns the Classifier associated with the given Event + */ + protected def classify(event: Event): Classifier + + /** + * Publishes the given Event to the given Subscriber + */ + protected def publish(event: Event, subscriber: Subscriber): Unit + + def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscriptions.synchronized { + val diff = subscriptions.addValue(to, subscriber) + if (diff.isEmpty) false + else { + cache = cache ++ diff + true + } + } + + def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscriptions.synchronized { + val diff = subscriptions.removeValue(from, subscriber) + if (diff.isEmpty) false + else { + cache = cache ++ diff + true + } + } + + def unsubscribe(subscriber: Subscriber): Unit = subscriptions.synchronized { + val diff = subscriptions.removeValue(subscriber) + if (diff.nonEmpty) cache = cache ++ diff + } + + def publish(event: Event): Unit = { + val c = classify(event) + val recv = cache get c getOrElse { + subscriptions.synchronized { + cache get c getOrElse { + val diff = subscriptions.addKey(c) + cache = cache ++ diff + cache(c) + } + } + } + recv foreach (publish(event, _)) + } +} + /** * Maps Classifiers to Subscribers and selects which Subscriber should receive which publication through scanning through all Subscribers * through the matches(classifier, event) method @@ -169,7 +234,7 @@ trait ScanningClassification { self: EventBus ⇒ /** * Maps ActorRefs to ActorRefs to form an EventBus where ActorRefs can listen to other ActorRefs */ -trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒ +trait ActorClassification { this: ActorEventBus with ActorClassifier ⇒ import java.util.concurrent.ConcurrentHashMap import scala.annotation.tailrec diff --git a/akka-actor/src/main/scala/akka/event/MainBus.scala b/akka-actor/src/main/scala/akka/event/MainBus.scala index 8a628e6edb..3dd33aa5eb 100644 --- a/akka-actor/src/main/scala/akka/event/MainBus.scala +++ b/akka-actor/src/main/scala/akka/event/MainBus.scala @@ -6,17 +6,21 @@ package akka.event import akka.actor.{ ActorRef, Actor, Props } import akka.AkkaApplication import akka.actor.Terminated +import akka.util.Subclassification -class MainBus(debug: Boolean = false) extends LoggingBus with LookupClassification { +class MainBus(debug: Boolean = false) extends LoggingBus with SubchannelClassification { type Event = AnyRef type Classifier = Class[_] + val subclassification = new Subclassification[Class[_]] { + def isEqual(x: Class[_], y: Class[_]) = x == y + def isSubclass(x: Class[_], y: Class[_]) = y isAssignableFrom x + } + @volatile private var reaper: ActorRef = _ - protected def mapSize = 16 - protected def classify(event: AnyRef): Class[_] = event.getClass protected def publish(event: AnyRef, subscriber: ActorRef) = subscriber ! event @@ -44,13 +48,7 @@ class MainBus(debug: Boolean = false) extends LoggingBus with LookupClassificati case Terminated(ref) ⇒ unsubscribe(ref) } }), "MainBusReaper") - subscribers.values foreach (reaper ! _) - } - - def printSubscribers: String = { - val sb = new StringBuilder - for (c ← subscribers.keys) sb.append(c + " -> " + subscribers.valueIterator(c).mkString("[", ", ", "]")) - sb.toString + subscribers foreach (reaper ! _) } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/util/Index.scala b/akka-actor/src/main/scala/akka/util/Index.scala index d87103b93c..3b0f68eabe 100644 --- a/akka-actor/src/main/scala/akka/util/Index.scala +++ b/akka-actor/src/main/scala/akka/util/Index.scala @@ -1,7 +1,6 @@ /** * Copyright (C) 2009-2011 Typesafe Inc. */ - package akka.util import annotation.tailrec diff --git a/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala new file mode 100644 index 0000000000..7768a73fd2 --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala @@ -0,0 +1,151 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.util + +/** + * Typeclass which describes a classification hierarchy. Observe the contract between `isEqual` and `isSubclass`! + */ +trait Subclassification[K] { + /** + * True if and only if x and y are of the same class. + */ + def isEqual(x: K, y: K): Boolean + /** + * True if and only if x is a subclass of y; equal classes must be considered sub-classes! + */ + def isSubclass(x: K, y: K): Boolean +} + +object SubclassifiedIndex { + + class Nonroot[K, V](val key: K, _values: Set[V])(implicit sc: Subclassification[K]) extends SubclassifiedIndex[K, V](_values) { + + override def addValue(key: K, value: V): Changes = { + // break the recursion on super when key is found and transition to recursive add-to-set + if (sc.isEqual(key, this.key)) addValue(value) else super.addValue(key, value) + } + + private def addValue(value: V): Changes = { + val kids = subkeys flatMap (_ addValue value) + if (!(values contains value)) { + values += value + kids :+ (key, values) + } else kids + } + + override def removeValue(key: K, value: V): Changes = { + // break the recursion on super when key is found and transition to recursive remove-from-set + if (sc.isEqual(key, this.key)) removeValue(value) else super.removeValue(key, value) + } + + override def removeValue(value: V): Changes = { + val kids = subkeys flatMap (_ removeValue value) + if (values contains value) { + values -= value + kids :+ (key, values) + } else kids + } + + override def toString = subkeys.mkString("Nonroot(" + key + ", " + values + ",\n", ",\n", ")") + + } + +} + +/** + * Mutable index which respects sub-class relationships between keys: + * + * - adding a key inherits from super-class + * - adding values applies to all sub-classes + * - removing values applies to all sub-classes + * + * Currently this class is only used to keep the tree and return changed key- + * value-sets upon modification, since looking up the keys in an external + * cache, e.g. HashMap, is faster than tree traversal which must use linear + * scan at each level. Therefore, no value traversals are published. + */ +class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: Subclassification[K]) { + + import SubclassifiedIndex._ + + type Changes = Seq[(K, Set[V])] + + protected var subkeys = Vector.empty[Nonroot[K, V]] + + def this()(implicit sc: Subclassification[K]) = this(Set.empty) + + /** + * Add key to this index which inherits its value set from the most specific + * super-class which is known. + */ + def addKey(key: K): Changes = { + for (n ← subkeys) { + if (sc.isEqual(n.key, key)) return Nil + else if (sc.isSubclass(key, n.key)) return n.addKey(key) + } + integrate(new Nonroot(key, values)) + (key, values) :: Nil + } + + /** + * Add value to all keys which are subclasses of the given key. If the key + * is not known yet, it is inserted as if using addKey. + */ + def addValue(key: K, value: V): Changes = { + var found = false + val ch = subkeys flatMap { n ⇒ + if (sc.isSubclass(key, n.key)) { + found = true + n.addValue(key, value) + } else Nil + } + if (!found) { + val v = values + value + val n = new Nonroot(key, v) + integrate(n) + n.addValue(key, value) :+ (key, v) + } else ch + } + + /** + * Remove value from all keys which are subclasses of the given key. + */ + def removeValue(key: K, value: V): Changes = { + var found = false + val ch = subkeys flatMap { n ⇒ + if (sc.isSubclass(key, n.key)) { + found = true + n.removeValue(key, value) + } else Nil + } + if (!found) { + val n = new Nonroot(key, values) + integrate(n) + n.removeValue(value) + } else ch + } + + /** + * Remove value from all keys in the index. + */ + def removeValue(value: V): Changes = subkeys flatMap (_ removeValue value) + + override def toString = subkeys.mkString("SubclassifiedIndex(" + values + ",\n", ",\n", ")") + + /** + * Add new Nonroot below this node and check all existing nodes for subclass relationship. + * + * @return true if and only if the new node has received subkeys during this operation. + */ + private def integrate(n: Nonroot[K, V]) { + val (subsub, sub) = subkeys partition (k ⇒ sc.isSubclass(k.key, n.key)) + if (sub.size == subkeys.size) { + subkeys :+= n + } else { + n.subkeys = subsub + subkeys = sub :+ n + } + } + +} \ No newline at end of file