Documenting the EventBus API and removing some superflous/premature traits
This commit is contained in:
parent
aa1c636a6b
commit
44e1562350
3 changed files with 117 additions and 23 deletions
|
|
@ -144,7 +144,10 @@ abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers
|
|||
}
|
||||
|
||||
object ActorEventBusSpec {
|
||||
class ComposedActorEventBus extends ActorEventBus with LookupClassification with EventType[Int] with ClassifierType[String] {
|
||||
class ComposedActorEventBus extends ActorEventBus with LookupClassification {
|
||||
type Event = Int
|
||||
type Classifier = String
|
||||
|
||||
def classify(event: Event) = event.toString
|
||||
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = a compareTo b
|
||||
protected def mapSize = 32
|
||||
|
|
|
|||
|
|
@ -9,62 +9,111 @@ import akka.util.Index
|
|||
import java.util.concurrent.ConcurrentSkipListSet
|
||||
import java.util.Comparator
|
||||
|
||||
/**
|
||||
* Represents the base type for EventBuses
|
||||
* Internally has an Event type, a Classifier type and a Subscriber type
|
||||
*
|
||||
* For the Java API, @see akka.event.japi.*
|
||||
*/
|
||||
trait EventBus {
|
||||
type Event
|
||||
type Classifier
|
||||
type Subscriber
|
||||
|
||||
/**
|
||||
* Attempts to register the subscriber to the specified Classifier
|
||||
* @returns true if successful and false if not (because it was already subscribed to that Classifier, or otherwise)
|
||||
*/
|
||||
def subscribe(subscriber: Subscriber, to: Classifier): Boolean
|
||||
|
||||
/**
|
||||
* Attempts to deregister the subscriber from the specified Classifier
|
||||
* @returns true if successful and false if not (because it wasn't subscribed to that Classifier, or otherwise)
|
||||
*/
|
||||
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean
|
||||
|
||||
/**
|
||||
* Attempts to deregister the subscriber from all Classifiers it may be subscribed to
|
||||
*/
|
||||
def unsubscribe(subscriber: Subscriber): Unit
|
||||
|
||||
/**
|
||||
* Publishes the specified Event to this bus
|
||||
*/
|
||||
def publish(event: Event): Unit
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents an EventBus where the Subscriber type is ActorRef
|
||||
*/
|
||||
trait ActorEventBus extends EventBus {
|
||||
type Subscriber = ActorRef
|
||||
}
|
||||
|
||||
/**
|
||||
* Can be mixed into an EventBus to specify that the Classifier type is ActorRef
|
||||
*/
|
||||
trait ActorClassifier { self: EventBus ⇒
|
||||
type Classifier = ActorRef
|
||||
}
|
||||
|
||||
/**
|
||||
* Can be mixed into an EventBus to specify that the Classifier type is a Function from Event to Boolean (predicate)
|
||||
*/
|
||||
trait PredicateClassifier { self: EventBus ⇒
|
||||
type Classifier = Event ⇒ Boolean
|
||||
}
|
||||
|
||||
trait EventType[T] { self: EventBus ⇒
|
||||
type Event = T
|
||||
}
|
||||
|
||||
trait ClassifierType[T] { self: EventBus ⇒
|
||||
type Classifier = T
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps Subscribers to Classifiers using equality on Classifier to store a Set of Subscribers (hence the need for compareSubscribers)
|
||||
* Maps Events to Classifiers through the classify-method (so it knows who to publish to)
|
||||
*
|
||||
* The compareSubscribers need to provide a total ordering of the Subscribers
|
||||
*/
|
||||
trait LookupClassification { self: EventBus ⇒
|
||||
|
||||
protected final val subscribers = new Index[Classifier, Subscriber](mapSize(), new Comparator[Subscriber] {
|
||||
def compare(a: Subscriber, b: Subscriber): Int = compareSubscribers(a, b)
|
||||
})
|
||||
|
||||
/**
|
||||
* This is a size hint for the number of Classifiers you expect to have (use powers of 2)
|
||||
*/
|
||||
protected def mapSize(): Int
|
||||
|
||||
/**
|
||||
* Provides a total ordering of Subscribers (think java.util.Comparator.compare)
|
||||
*/
|
||||
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int
|
||||
|
||||
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.put(to, subscriber)
|
||||
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove(from, subscriber)
|
||||
def unsubscribe(subscriber: Subscriber): Unit = subscribers.removeValue(subscriber)
|
||||
|
||||
/**
|
||||
* Returns the Classifier associated with the given Event
|
||||
*/
|
||||
protected def classify(event: Event): Classifier
|
||||
|
||||
/**
|
||||
* Publishes the given Event to the given Subscriber
|
||||
*/
|
||||
protected def publish(event: Event, subscriber: Subscriber): Unit
|
||||
|
||||
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.put(to, subscriber)
|
||||
|
||||
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove(from, subscriber)
|
||||
|
||||
def unsubscribe(subscriber: Subscriber): Unit = subscribers.removeValue(subscriber)
|
||||
|
||||
def publish(event: Event): Unit = {
|
||||
val i = subscribers.valueIterator(classify(event))
|
||||
while (i.hasNext) publish(event, i.next())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps Classifiers to Subscribers and selects which Subscriber should receive which publication through scanning through all Subscribers
|
||||
* through the matches(classifier, event) method
|
||||
*
|
||||
* Note: the compareClassifiers and compareSubscribers must together form an absolute ordering (think java.util.Comparator.compare)
|
||||
*/
|
||||
trait ScanningClassification { self: EventBus ⇒
|
||||
protected final val subscribers = new ConcurrentSkipListSet[(Classifier, Subscriber)](new Comparator[(Classifier, Subscriber)] {
|
||||
def compare(a: (Classifier, Subscriber), b: (Classifier, Subscriber)): Int = {
|
||||
|
|
@ -74,8 +123,30 @@ trait ScanningClassification { self: EventBus ⇒
|
|||
}
|
||||
})
|
||||
|
||||
/**
|
||||
* Provides a total ordering of Classifiers (think java.util.Comparator.compare)
|
||||
*/
|
||||
protected def compareClassifiers(a: Classifier, b: Classifier): Int
|
||||
|
||||
/**
|
||||
* Provides a total ordering of Subscribers (think java.util.Comparator.compare)
|
||||
*/
|
||||
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int
|
||||
|
||||
/**
|
||||
* Returns whether the specified Classifier matches the specified Event
|
||||
*/
|
||||
protected def matches(classifier: Classifier, event: Event): Boolean
|
||||
|
||||
/**
|
||||
* Publishes the specified Event to the specified Subscriber
|
||||
*/
|
||||
protected def publish(event: Event, subscriber: Subscriber): Unit
|
||||
|
||||
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.add((to, subscriber))
|
||||
|
||||
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove((from, subscriber))
|
||||
|
||||
def unsubscribe(subscriber: Subscriber): Unit = {
|
||||
val i = subscribers.iterator()
|
||||
while (i.hasNext) {
|
||||
|
|
@ -84,14 +155,6 @@ trait ScanningClassification { self: EventBus ⇒
|
|||
}
|
||||
}
|
||||
|
||||
protected def compareClassifiers(a: Classifier, b: Classifier): Int
|
||||
|
||||
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int
|
||||
|
||||
protected def matches(classifier: Classifier, event: Event): Boolean
|
||||
|
||||
protected def publish(event: Event, subscriber: Subscriber): Unit
|
||||
|
||||
def publish(event: Event): Unit = {
|
||||
val currentSubscribers = subscribers.iterator()
|
||||
while (currentSubscribers.hasNext) {
|
||||
|
|
@ -102,12 +165,13 @@ trait ScanningClassification { self: EventBus ⇒
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps ActorRefs to ActorRefs to form an EventBus where ActorRefs can listen to other ActorRefs
|
||||
*/
|
||||
trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import scala.annotation.tailrec
|
||||
|
||||
def mapSize: Int
|
||||
|
||||
protected val mappings = new ConcurrentHashMap[ActorRef, Vector[ActorRef]](mapSize)
|
||||
|
||||
@tailrec
|
||||
|
|
@ -188,8 +252,16 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Classifier associated with the specified Event
|
||||
*/
|
||||
protected def classify(event: Event): Classifier
|
||||
|
||||
/**
|
||||
* This is a size hint for the number of Classifiers you expect to have (use powers of 2)
|
||||
*/
|
||||
protected def mapSize: Int
|
||||
|
||||
def publish(event: Event): Unit = mappings.get(classify(event)) match {
|
||||
case null ⇒
|
||||
case raw: Vector[_] ⇒
|
||||
|
|
|
|||
|
|
@ -2,18 +2,37 @@ package akka.event.japi
|
|||
|
||||
import akka.event._
|
||||
|
||||
/**
|
||||
* See documentation for akka.event.LookupClassification
|
||||
* E is the Event type
|
||||
* S is the Subscriber type
|
||||
* C is the Classifier type
|
||||
*/
|
||||
abstract class LookupEventBus[E, S, C] extends EventBus with LookupClassification {
|
||||
type Event = E
|
||||
type Subscriber = S
|
||||
type Classifier = C
|
||||
}
|
||||
|
||||
/**
|
||||
* See documentation for akka.event.ScanningClassification
|
||||
* E is the Event type
|
||||
* S is the Subscriber type
|
||||
* C is the Classifier type
|
||||
*/
|
||||
abstract class ScanningEventBus[E, S, C] extends EventBus with ScanningClassification {
|
||||
type Event = E
|
||||
type Subscriber = S
|
||||
type Classifier = C
|
||||
}
|
||||
|
||||
/**
|
||||
* See documentation for akka.event.ActorClassification
|
||||
* An EventBus where the Subscribers are ActorRefs and the Classifier is ActorRef
|
||||
* Means that ActorRefs "listen" to other ActorRefs
|
||||
* E is the Event type
|
||||
*/
|
||||
|
||||
abstract class ActorEventBus[E] extends akka.event.ActorEventBus with ActorClassification with ActorClassifier {
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue