diff --git a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala index b567ef30ab..df36a59c09 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala @@ -144,7 +144,10 @@ abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers } object ActorEventBusSpec { - class ComposedActorEventBus extends ActorEventBus with LookupClassification with EventType[Int] with ClassifierType[String] { + class ComposedActorEventBus extends ActorEventBus with LookupClassification { + type Event = Int + type Classifier = String + def classify(event: Event) = event.toString protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = a compareTo b protected def mapSize = 32 diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index d6b2168046..33319fbb13 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -9,62 +9,111 @@ import akka.util.Index import java.util.concurrent.ConcurrentSkipListSet import java.util.Comparator +/** + * Represents the base type for EventBuses + * Internally has an Event type, a Classifier type and a Subscriber type + * + * For the Java API, @see akka.event.japi.* + */ trait EventBus { type Event type Classifier type Subscriber + /** + * Attempts to register the subscriber to the specified Classifier + * @returns true if successful and false if not (because it was already subscribed to that Classifier, or otherwise) + */ def subscribe(subscriber: Subscriber, to: Classifier): Boolean + + /** + * Attempts to deregister the subscriber from the specified Classifier + * @returns true if successful and false if not (because it wasn't subscribed to that Classifier, or otherwise) + */ def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean + + /** + * Attempts to deregister the subscriber from all Classifiers it may be subscribed to + */ def unsubscribe(subscriber: Subscriber): Unit + /** + * Publishes the specified Event to this bus + */ def publish(event: Event): Unit } +/** + * Represents an EventBus where the Subscriber type is ActorRef + */ trait ActorEventBus extends EventBus { type Subscriber = ActorRef } +/** + * Can be mixed into an EventBus to specify that the Classifier type is ActorRef + */ trait ActorClassifier { self: EventBus ⇒ type Classifier = ActorRef } +/** + * Can be mixed into an EventBus to specify that the Classifier type is a Function from Event to Boolean (predicate) + */ trait PredicateClassifier { self: EventBus ⇒ type Classifier = Event ⇒ Boolean } -trait EventType[T] { self: EventBus ⇒ - type Event = T -} - -trait ClassifierType[T] { self: EventBus ⇒ - type Classifier = T -} - +/** + * Maps Subscribers to Classifiers using equality on Classifier to store a Set of Subscribers (hence the need for compareSubscribers) + * Maps Events to Classifiers through the classify-method (so it knows who to publish to) + * + * The compareSubscribers need to provide a total ordering of the Subscribers + */ trait LookupClassification { self: EventBus ⇒ protected final val subscribers = new Index[Classifier, Subscriber](mapSize(), new Comparator[Subscriber] { def compare(a: Subscriber, b: Subscriber): Int = compareSubscribers(a, b) }) + /** + * This is a size hint for the number of Classifiers you expect to have (use powers of 2) + */ protected def mapSize(): Int + /** + * Provides a total ordering of Subscribers (think java.util.Comparator.compare) + */ protected def compareSubscribers(a: Subscriber, b: Subscriber): Int - def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.put(to, subscriber) - def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove(from, subscriber) - def unsubscribe(subscriber: Subscriber): Unit = subscribers.removeValue(subscriber) - + /** + * Returns the Classifier associated with the given Event + */ protected def classify(event: Event): Classifier + /** + * Publishes the given Event to the given Subscriber + */ protected def publish(event: Event, subscriber: Subscriber): Unit + def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.put(to, subscriber) + + def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove(from, subscriber) + + def unsubscribe(subscriber: Subscriber): Unit = subscribers.removeValue(subscriber) + def publish(event: Event): Unit = { val i = subscribers.valueIterator(classify(event)) while (i.hasNext) publish(event, i.next()) } } +/** + * Maps Classifiers to Subscribers and selects which Subscriber should receive which publication through scanning through all Subscribers + * through the matches(classifier, event) method + * + * Note: the compareClassifiers and compareSubscribers must together form an absolute ordering (think java.util.Comparator.compare) + */ trait ScanningClassification { self: EventBus ⇒ protected final val subscribers = new ConcurrentSkipListSet[(Classifier, Subscriber)](new Comparator[(Classifier, Subscriber)] { def compare(a: (Classifier, Subscriber), b: (Classifier, Subscriber)): Int = { @@ -74,8 +123,30 @@ trait ScanningClassification { self: EventBus ⇒ } }) + /** + * Provides a total ordering of Classifiers (think java.util.Comparator.compare) + */ + protected def compareClassifiers(a: Classifier, b: Classifier): Int + + /** + * Provides a total ordering of Subscribers (think java.util.Comparator.compare) + */ + protected def compareSubscribers(a: Subscriber, b: Subscriber): Int + + /** + * Returns whether the specified Classifier matches the specified Event + */ + protected def matches(classifier: Classifier, event: Event): Boolean + + /** + * Publishes the specified Event to the specified Subscriber + */ + protected def publish(event: Event, subscriber: Subscriber): Unit + def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.add((to, subscriber)) + def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove((from, subscriber)) + def unsubscribe(subscriber: Subscriber): Unit = { val i = subscribers.iterator() while (i.hasNext) { @@ -84,14 +155,6 @@ trait ScanningClassification { self: EventBus ⇒ } } - protected def compareClassifiers(a: Classifier, b: Classifier): Int - - protected def compareSubscribers(a: Subscriber, b: Subscriber): Int - - protected def matches(classifier: Classifier, event: Event): Boolean - - protected def publish(event: Event, subscriber: Subscriber): Unit - def publish(event: Event): Unit = { val currentSubscribers = subscribers.iterator() while (currentSubscribers.hasNext) { @@ -102,12 +165,13 @@ trait ScanningClassification { self: EventBus ⇒ } } +/** + * Maps ActorRefs to ActorRefs to form an EventBus where ActorRefs can listen to other ActorRefs + */ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒ import java.util.concurrent.ConcurrentHashMap import scala.annotation.tailrec - def mapSize: Int - protected val mappings = new ConcurrentHashMap[ActorRef, Vector[ActorRef]](mapSize) @tailrec @@ -188,8 +252,16 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒ } } + /** + * Returns the Classifier associated with the specified Event + */ protected def classify(event: Event): Classifier + /** + * This is a size hint for the number of Classifiers you expect to have (use powers of 2) + */ + protected def mapSize: Int + def publish(event: Event): Unit = mappings.get(classify(event)) match { case null ⇒ case raw: Vector[_] ⇒ diff --git a/akka-actor/src/main/scala/akka/event/EventBusJavaAPI.scala b/akka-actor/src/main/scala/akka/event/EventBusJavaAPI.scala index caf0005d25..669198c187 100644 --- a/akka-actor/src/main/scala/akka/event/EventBusJavaAPI.scala +++ b/akka-actor/src/main/scala/akka/event/EventBusJavaAPI.scala @@ -2,18 +2,37 @@ package akka.event.japi import akka.event._ +/** + * See documentation for akka.event.LookupClassification + * E is the Event type + * S is the Subscriber type + * C is the Classifier type + */ abstract class LookupEventBus[E, S, C] extends EventBus with LookupClassification { type Event = E type Subscriber = S type Classifier = C } +/** + * See documentation for akka.event.ScanningClassification + * E is the Event type + * S is the Subscriber type + * C is the Classifier type + */ abstract class ScanningEventBus[E, S, C] extends EventBus with ScanningClassification { type Event = E type Subscriber = S type Classifier = C } +/** + * See documentation for akka.event.ActorClassification + * An EventBus where the Subscribers are ActorRefs and the Classifier is ActorRef + * Means that ActorRefs "listen" to other ActorRefs + * E is the Event type + */ + abstract class ActorEventBus[E] extends akka.event.ActorEventBus with ActorClassification with ActorClassifier { } \ No newline at end of file