implement SubchannelClassification in MainBus, fixes #1340

This commit is contained in:
Roland 2011-11-10 00:26:53 +01:00
parent 70ae4e1984
commit b2d548bd0e
5 changed files with 261 additions and 15 deletions

View file

@ -21,6 +21,12 @@ object MainBusSpec {
case e: Logging.LogEvent dst ! e
}
}
// class hierarchy for subchannel test
class A
class B1 extends A
class B2 extends A
class C extends B1
}
class MainBusSpec extends AkkaSpec(Configuration(
@ -62,6 +68,33 @@ class MainBusSpec extends AkkaSpec(Configuration(
}
}
"manage sub-channels" in {
val a = new A
val b1 = new B1
val b2 = new B2
val c = new C
val bus = new MainBus(false)
bus.start(app)
within(2 seconds) {
bus.subscribe(testActor, classOf[B2]) === true
bus.publish(c)
bus.publish(b2)
expectMsg(b2)
bus.subscribe(testActor, classOf[A]) === true
bus.publish(c)
expectMsg(c)
bus.publish(b1)
expectMsg(b1)
bus.unsubscribe(testActor, classOf[B1]) === true
bus.publish(c)
bus.publish(b2)
bus.publish(a)
expectMsg(b2)
expectMsg(a)
expectNoMsg
}
}
}
private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) {

View file

@ -8,6 +8,7 @@ import akka.actor.ActorRef
import akka.util.Index
import java.util.concurrent.ConcurrentSkipListSet
import java.util.Comparator
import akka.util.{ Subclassification, SubclassifiedIndex }
/**
* Represents the base type for EventBuses
@ -54,14 +55,14 @@ trait ActorEventBus extends EventBus {
/**
* Can be mixed into an EventBus to specify that the Classifier type is ActorRef
*/
trait ActorClassifier { self: EventBus
trait ActorClassifier { this: EventBus
type Classifier = ActorRef
}
/**
* Can be mixed into an EventBus to specify that the Classifier type is a Function from Event to Boolean (predicate)
*/
trait PredicateClassifier { self: EventBus
trait PredicateClassifier { this: EventBus
type Classifier = Event Boolean
}
@ -71,7 +72,7 @@ trait PredicateClassifier { self: EventBus ⇒
*
* The compareSubscribers need to provide a total ordering of the Subscribers
*/
trait LookupClassification { self: EventBus
trait LookupClassification { this: EventBus
protected final val subscribers = new Index[Classifier, Subscriber](mapSize(), new Comparator[Subscriber] {
def compare(a: Subscriber, b: Subscriber): Int = compareSubscribers(a, b)
@ -109,6 +110,70 @@ trait LookupClassification { self: EventBus ⇒
}
}
/**
* Classification which respects relationships between channels: subscribing
* to one channel automatically and idempotently subscribes to all sub-channels.
*/
trait SubchannelClassification { this: EventBus
implicit val subclassification: Subclassification[Classifier]
// must be lazy to avoid initialization order problem with subclassification
private lazy val subscriptions = new SubclassifiedIndex[Classifier, Subscriber]()
@volatile
private var cache = Map.empty[Classifier, Set[Subscriber]]
protected def subscribers = cache.values.flatten
/**
* Returns the Classifier associated with the given Event
*/
protected def classify(event: Event): Classifier
/**
* Publishes the given Event to the given Subscriber
*/
protected def publish(event: Event, subscriber: Subscriber): Unit
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscriptions.synchronized {
val diff = subscriptions.addValue(to, subscriber)
if (diff.isEmpty) false
else {
cache = cache ++ diff
true
}
}
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscriptions.synchronized {
val diff = subscriptions.removeValue(from, subscriber)
if (diff.isEmpty) false
else {
cache = cache ++ diff
true
}
}
def unsubscribe(subscriber: Subscriber): Unit = subscriptions.synchronized {
val diff = subscriptions.removeValue(subscriber)
if (diff.nonEmpty) cache = cache ++ diff
}
def publish(event: Event): Unit = {
val c = classify(event)
val recv = cache get c getOrElse {
subscriptions.synchronized {
cache get c getOrElse {
val diff = subscriptions.addKey(c)
cache = cache ++ diff
cache(c)
}
}
}
recv foreach (publish(event, _))
}
}
/**
* Maps Classifiers to Subscribers and selects which Subscriber should receive which publication through scanning through all Subscribers
* through the matches(classifier, event) method
@ -169,7 +234,7 @@ trait ScanningClassification { self: EventBus ⇒
/**
* Maps ActorRefs to ActorRefs to form an EventBus where ActorRefs can listen to other ActorRefs
*/
trait ActorClassification { self: ActorEventBus with ActorClassifier
trait ActorClassification { this: ActorEventBus with ActorClassifier
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec

View file

@ -6,17 +6,21 @@ package akka.event
import akka.actor.{ ActorRef, Actor, Props }
import akka.AkkaApplication
import akka.actor.Terminated
import akka.util.Subclassification
class MainBus(debug: Boolean = false) extends LoggingBus with LookupClassification {
class MainBus(debug: Boolean = false) extends LoggingBus with SubchannelClassification {
type Event = AnyRef
type Classifier = Class[_]
val subclassification = new Subclassification[Class[_]] {
def isEqual(x: Class[_], y: Class[_]) = x == y
def isSubclass(x: Class[_], y: Class[_]) = y isAssignableFrom x
}
@volatile
private var reaper: ActorRef = _
protected def mapSize = 16
protected def classify(event: AnyRef): Class[_] = event.getClass
protected def publish(event: AnyRef, subscriber: ActorRef) = subscriber ! event
@ -44,13 +48,7 @@ class MainBus(debug: Boolean = false) extends LoggingBus with LookupClassificati
case Terminated(ref) unsubscribe(ref)
}
}), "MainBusReaper")
subscribers.values foreach (reaper ! _)
}
def printSubscribers: String = {
val sb = new StringBuilder
for (c subscribers.keys) sb.append(c + " -> " + subscribers.valueIterator(c).mkString("[", ", ", "]"))
sb.toString
subscribers foreach (reaper ! _)
}
}

View file

@ -1,7 +1,6 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util
import annotation.tailrec

View file

@ -0,0 +1,151 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util
/**
* Typeclass which describes a classification hierarchy. Observe the contract between `isEqual` and `isSubclass`!
*/
trait Subclassification[K] {
/**
* True if and only if x and y are of the same class.
*/
def isEqual(x: K, y: K): Boolean
/**
* True if and only if x is a subclass of y; equal classes must be considered sub-classes!
*/
def isSubclass(x: K, y: K): Boolean
}
object SubclassifiedIndex {
class Nonroot[K, V](val key: K, _values: Set[V])(implicit sc: Subclassification[K]) extends SubclassifiedIndex[K, V](_values) {
override def addValue(key: K, value: V): Changes = {
// break the recursion on super when key is found and transition to recursive add-to-set
if (sc.isEqual(key, this.key)) addValue(value) else super.addValue(key, value)
}
private def addValue(value: V): Changes = {
val kids = subkeys flatMap (_ addValue value)
if (!(values contains value)) {
values += value
kids :+ (key, values)
} else kids
}
override def removeValue(key: K, value: V): Changes = {
// break the recursion on super when key is found and transition to recursive remove-from-set
if (sc.isEqual(key, this.key)) removeValue(value) else super.removeValue(key, value)
}
override def removeValue(value: V): Changes = {
val kids = subkeys flatMap (_ removeValue value)
if (values contains value) {
values -= value
kids :+ (key, values)
} else kids
}
override def toString = subkeys.mkString("Nonroot(" + key + ", " + values + ",\n", ",\n", ")")
}
}
/**
* Mutable index which respects sub-class relationships between keys:
*
* - adding a key inherits from super-class
* - adding values applies to all sub-classes
* - removing values applies to all sub-classes
*
* Currently this class is only used to keep the tree and return changed key-
* value-sets upon modification, since looking up the keys in an external
* cache, e.g. HashMap, is faster than tree traversal which must use linear
* scan at each level. Therefore, no value traversals are published.
*/
class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: Subclassification[K]) {
import SubclassifiedIndex._
type Changes = Seq[(K, Set[V])]
protected var subkeys = Vector.empty[Nonroot[K, V]]
def this()(implicit sc: Subclassification[K]) = this(Set.empty)
/**
* Add key to this index which inherits its value set from the most specific
* super-class which is known.
*/
def addKey(key: K): Changes = {
for (n subkeys) {
if (sc.isEqual(n.key, key)) return Nil
else if (sc.isSubclass(key, n.key)) return n.addKey(key)
}
integrate(new Nonroot(key, values))
(key, values) :: Nil
}
/**
* Add value to all keys which are subclasses of the given key. If the key
* is not known yet, it is inserted as if using addKey.
*/
def addValue(key: K, value: V): Changes = {
var found = false
val ch = subkeys flatMap { n
if (sc.isSubclass(key, n.key)) {
found = true
n.addValue(key, value)
} else Nil
}
if (!found) {
val v = values + value
val n = new Nonroot(key, v)
integrate(n)
n.addValue(key, value) :+ (key, v)
} else ch
}
/**
* Remove value from all keys which are subclasses of the given key.
*/
def removeValue(key: K, value: V): Changes = {
var found = false
val ch = subkeys flatMap { n
if (sc.isSubclass(key, n.key)) {
found = true
n.removeValue(key, value)
} else Nil
}
if (!found) {
val n = new Nonroot(key, values)
integrate(n)
n.removeValue(value)
} else ch
}
/**
* Remove value from all keys in the index.
*/
def removeValue(value: V): Changes = subkeys flatMap (_ removeValue value)
override def toString = subkeys.mkString("SubclassifiedIndex(" + values + ",\n", ",\n", ")")
/**
* Add new Nonroot below this node and check all existing nodes for subclass relationship.
*
* @return true if and only if the new node has received subkeys during this operation.
*/
private def integrate(n: Nonroot[K, V]) {
val (subsub, sub) = subkeys partition (k sc.isSubclass(k.key, n.key))
if (sub.size == subkeys.size) {
subkeys :+= n
} else {
n.subkeys = subsub
subkeys = sub :+ n
}
}
}