Adding a Java API to EventBus and adding tests for the Java configurations
This commit is contained in:
parent
5318763e52
commit
d34e3d69ec
3 changed files with 118 additions and 30 deletions
|
|
@ -12,9 +12,15 @@ import akka.testkit._
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
import java.util.concurrent.atomic._
|
import java.util.concurrent.atomic._
|
||||||
import akka.actor.{ Props, Actor, ActorRef }
|
import akka.actor.{ Props, Actor, ActorRef }
|
||||||
|
import java.util.Comparator
|
||||||
|
import akka.japi.{ Procedure, Function }
|
||||||
|
|
||||||
object EventBusSpec {
|
object EventBusSpec {
|
||||||
|
class TestActorWrapperActor(testActor: ActorRef) extends Actor {
|
||||||
|
def receive = {
|
||||||
|
case x ⇒ testActor forward x
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers with TestKit with BeforeAndAfterEach {
|
abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers with TestKit with BeforeAndAfterEach {
|
||||||
|
|
@ -87,6 +93,7 @@ abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers
|
||||||
bus.subscribe(subscriber, classifier)
|
bus.subscribe(subscriber, classifier)
|
||||||
bus.publish(event)
|
bus.publish(event)
|
||||||
expectMsg(event)
|
expectMsg(event)
|
||||||
|
expectNoMsg(1 second)
|
||||||
bus.unsubscribe(subscriber, classifier)
|
bus.unsubscribe(subscriber, classifier)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -98,15 +105,17 @@ abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers
|
||||||
expectMsg(event)
|
expectMsg(event)
|
||||||
expectMsg(event)
|
expectMsg(event)
|
||||||
expectMsg(event)
|
expectMsg(event)
|
||||||
|
expectNoMsg(1 second)
|
||||||
bus.unsubscribe(subscriber, classifier)
|
bus.unsubscribe(subscriber, classifier)
|
||||||
}
|
}
|
||||||
|
|
||||||
"publish the given event to all intended subscribers" in {
|
"publish the given event to all intended subscribers" in {
|
||||||
val subscribers = Vector.fill(10)(createNewSubscriber())
|
val range = 0 until 10
|
||||||
|
val subscribers = range map (_ ⇒ createNewSubscriber())
|
||||||
subscribers foreach { s ⇒ bus.subscribe(s, classifier) must be === true }
|
subscribers foreach { s ⇒ bus.subscribe(s, classifier) must be === true }
|
||||||
bus.publish(event)
|
bus.publish(event)
|
||||||
(1 to 10) foreach { _ ⇒ expectMsg(event) }
|
range foreach { _ ⇒ expectMsg(event) }
|
||||||
subscribers foreach disposeSubscriber
|
subscribers foreach { s ⇒ bus.unsubscribe(s, classifier) must be === true; disposeSubscriber(s) }
|
||||||
}
|
}
|
||||||
|
|
||||||
"not publish the given event to any other subscribers than the intended ones" in {
|
"not publish the given event to any other subscribers than the intended ones" in {
|
||||||
|
|
@ -135,29 +144,80 @@ abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers
|
||||||
}
|
}
|
||||||
|
|
||||||
object ActorEventBusSpec {
|
object ActorEventBusSpec {
|
||||||
class ComposedActorEventBus extends ActorEventBus with LookupClassification with EventType[String] with ClassifierType[String] {
|
class ComposedActorEventBus extends ActorEventBus with LookupClassification with EventType[Int] with ClassifierType[String] {
|
||||||
def classify(event: String) = event
|
def classify(event: Event) = event.toString
|
||||||
def publish(event: String, subscriber: ActorRef) = subscriber ! event
|
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = a compareTo b
|
||||||
}
|
protected def mapSize = 32
|
||||||
|
def publish(event: Event, subscriber: Subscriber) = subscriber ! event
|
||||||
class TestActorWrapperActor(testActor: ActorRef) extends Actor {
|
|
||||||
def receive = {
|
|
||||||
case x ⇒ testActor forward x
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class ActorEventBusSpec extends EventBusSpec("ActorEventBus") {
|
class ActorEventBusSpec extends EventBusSpec("ActorEventBus") {
|
||||||
import akka.event.ActorEventBusSpec._
|
import akka.event.ActorEventBusSpec._
|
||||||
|
import EventBusSpec.TestActorWrapperActor
|
||||||
|
|
||||||
type BusType = ComposedActorEventBus
|
type BusType = ComposedActorEventBus
|
||||||
def createNewEventBus(): BusType = new ComposedActorEventBus
|
def createNewEventBus(): BusType = new ComposedActorEventBus
|
||||||
|
|
||||||
def createEvents(numberOfEvents: Int) = (0 until numberOfEvents) map { _.toString }
|
def createEvents(numberOfEvents: Int) = (0 until numberOfEvents)
|
||||||
|
|
||||||
def createSubscriber(pipeTo: ActorRef) = actorOf(Props(new TestActorWrapperActor(pipeTo)))
|
def createSubscriber(pipeTo: ActorRef) = actorOf(Props(new TestActorWrapperActor(pipeTo)))
|
||||||
|
|
||||||
def classifierFor(event: BusType#Event) = event
|
def classifierFor(event: BusType#Event) = event.toString
|
||||||
|
|
||||||
def disposeSubscriber(subscriber: BusType#Subscriber): Unit = subscriber.stop()
|
def disposeSubscriber(subscriber: BusType#Subscriber): Unit = subscriber.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
object ScanningEventBusSpec {
|
||||||
|
import akka.event.japi.ScanningEventBus
|
||||||
|
|
||||||
|
class MyScanningEventBus extends ScanningEventBus[Int, akka.japi.Procedure[Int], String] {
|
||||||
|
protected def compareClassifiers(a: Classifier, b: Classifier): Int = a compareTo b
|
||||||
|
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b)
|
||||||
|
|
||||||
|
protected def matches(classifier: Classifier, event: Event): Boolean = event.toString == classifier
|
||||||
|
|
||||||
|
protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") {
|
||||||
|
import ScanningEventBusSpec._
|
||||||
|
|
||||||
|
type BusType = MyScanningEventBus
|
||||||
|
|
||||||
|
def createNewEventBus(): BusType = new MyScanningEventBus
|
||||||
|
|
||||||
|
def createEvents(numberOfEvents: Int) = (0 until numberOfEvents)
|
||||||
|
|
||||||
|
def createSubscriber(pipeTo: ActorRef) = new Procedure[Int] { def apply(i: Int) = pipeTo ! i }
|
||||||
|
|
||||||
|
def classifierFor(event: BusType#Event) = event.toString
|
||||||
|
|
||||||
|
def disposeSubscriber(subscriber: BusType#Subscriber): Unit = ()
|
||||||
|
}
|
||||||
|
|
||||||
|
object LookupEventBusSpec {
|
||||||
|
class MyLookupEventBus extends akka.event.japi.LookupEventBus[Int, akka.japi.Procedure[Int], String] {
|
||||||
|
protected def classify(event: Event): Classifier = event.toString
|
||||||
|
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b)
|
||||||
|
protected def mapSize = 32
|
||||||
|
protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class LookupEventBusSpec extends EventBusSpec("LookupEventBus") {
|
||||||
|
import LookupEventBusSpec._
|
||||||
|
|
||||||
|
type BusType = MyLookupEventBus
|
||||||
|
|
||||||
|
def createNewEventBus(): BusType = new MyLookupEventBus
|
||||||
|
|
||||||
|
def createEvents(numberOfEvents: Int) = (0 until numberOfEvents)
|
||||||
|
|
||||||
|
def createSubscriber(pipeTo: ActorRef) = new Procedure[Int] { def apply(i: Int) = pipeTo ! i }
|
||||||
|
|
||||||
|
def classifierFor(event: BusType#Event) = event.toString
|
||||||
|
|
||||||
|
def disposeSubscriber(subscriber: BusType#Subscriber): Unit = ()
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -42,7 +42,14 @@ trait ClassifierType[T] { self: EventBus ⇒
|
||||||
}
|
}
|
||||||
|
|
||||||
trait LookupClassification { self: EventBus ⇒
|
trait LookupClassification { self: EventBus ⇒
|
||||||
protected final val subscribers = new Index[Classifier, Subscriber]
|
|
||||||
|
protected final val subscribers = new Index[Classifier, Subscriber](mapSize(), new Comparator[Subscriber] {
|
||||||
|
def compare(a: Subscriber, b: Subscriber): Int = compareSubscribers(a, b)
|
||||||
|
})
|
||||||
|
|
||||||
|
protected def mapSize(): Int
|
||||||
|
|
||||||
|
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int
|
||||||
|
|
||||||
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.put(to, subscriber)
|
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.put(to, subscriber)
|
||||||
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove(from, subscriber)
|
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove(from, subscriber)
|
||||||
|
|
@ -52,12 +59,20 @@ trait LookupClassification { self: EventBus ⇒
|
||||||
|
|
||||||
protected def publish(event: Event, subscriber: Subscriber): Unit
|
protected def publish(event: Event, subscriber: Subscriber): Unit
|
||||||
|
|
||||||
def publish(event: Event): Unit =
|
def publish(event: Event): Unit = {
|
||||||
subscribers.valueIterator(classify(event)).foreach(publish(event, _))
|
val i = subscribers.valueIterator(classify(event))
|
||||||
|
while (i.hasNext) publish(event, i.next())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trait ScanningClassification { self: EventBus ⇒
|
trait ScanningClassification { self: EventBus ⇒
|
||||||
protected final val subscribers = new ConcurrentSkipListSet[(Classifier, Subscriber)](ordering)
|
protected final val subscribers = new ConcurrentSkipListSet[(Classifier, Subscriber)](new Comparator[(Classifier, Subscriber)] {
|
||||||
|
def compare(a: (Classifier, Subscriber), b: (Classifier, Subscriber)): Int = {
|
||||||
|
val cM = compareClassifiers(a._1, b._1)
|
||||||
|
if (cM != 0) cM
|
||||||
|
else compareSubscribers(a._2, b._2)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.add((to, subscriber))
|
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.add((to, subscriber))
|
||||||
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove((from, subscriber))
|
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove((from, subscriber))
|
||||||
|
|
@ -65,11 +80,13 @@ trait ScanningClassification { self: EventBus ⇒
|
||||||
val i = subscribers.iterator()
|
val i = subscribers.iterator()
|
||||||
while (i.hasNext) {
|
while (i.hasNext) {
|
||||||
val e = i.next()
|
val e = i.next()
|
||||||
if (subscriber == e._2) i.remove()
|
if (compareSubscribers(subscriber, e._2) == 0) i.remove()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected def ordering: Comparator[(Classifier, Subscriber)]
|
protected def compareClassifiers(a: Classifier, b: Classifier): Int
|
||||||
|
|
||||||
|
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int
|
||||||
|
|
||||||
protected def matches(classifier: Classifier, event: Event): Boolean
|
protected def matches(classifier: Classifier, event: Event): Boolean
|
||||||
|
|
||||||
|
|
@ -79,7 +96,8 @@ trait ScanningClassification { self: EventBus ⇒
|
||||||
val currentSubscribers = subscribers.iterator()
|
val currentSubscribers = subscribers.iterator()
|
||||||
while (currentSubscribers.hasNext) {
|
while (currentSubscribers.hasNext) {
|
||||||
val (classifier, subscriber) = currentSubscribers.next()
|
val (classifier, subscriber) = currentSubscribers.next()
|
||||||
if (matches(classifier, event)) publish(event, subscriber)
|
if (matches(classifier, event))
|
||||||
|
publish(event, subscriber)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,19 @@
|
||||||
package akka.event
|
package akka.event.japi
|
||||||
|
|
||||||
/*
|
import akka.event._
|
||||||
* Created by IntelliJ IDEA.
|
|
||||||
* User: viktorklang
|
abstract class LookupEventBus[E, S, C] extends EventBus with LookupClassification {
|
||||||
* Date: 10/12/11
|
type Event = E
|
||||||
* Time: 9:14 AM
|
type Subscriber = S
|
||||||
*/
|
type Classifier = C
|
||||||
public scala class { }
|
}
|
||||||
|
|
||||||
|
abstract class ScanningEventBus[E, S, C] extends EventBus with ScanningClassification {
|
||||||
|
type Event = E
|
||||||
|
type Subscriber = S
|
||||||
|
type Classifier = C
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract class ActorEventBus[E] extends akka.event.ActorEventBus with ActorClassification with ActorClassifier {
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue