diff --git a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala index f4646a4b1d..b567ef30ab 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala @@ -12,9 +12,15 @@ import akka.testkit._ import akka.util.duration._ import java.util.concurrent.atomic._ import akka.actor.{ Props, Actor, ActorRef } +import java.util.Comparator +import akka.japi.{ Procedure, Function } object EventBusSpec { - + class TestActorWrapperActor(testActor: ActorRef) extends Actor { + def receive = { + case x ⇒ testActor forward x + } + } } abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers with TestKit with BeforeAndAfterEach { @@ -87,6 +93,7 @@ abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers bus.subscribe(subscriber, classifier) bus.publish(event) expectMsg(event) + expectNoMsg(1 second) bus.unsubscribe(subscriber, classifier) } @@ -98,15 +105,17 @@ abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers expectMsg(event) expectMsg(event) expectMsg(event) + expectNoMsg(1 second) bus.unsubscribe(subscriber, classifier) } "publish the given event to all intended subscribers" in { - val subscribers = Vector.fill(10)(createNewSubscriber()) + val range = 0 until 10 + val subscribers = range map (_ ⇒ createNewSubscriber()) subscribers foreach { s ⇒ bus.subscribe(s, classifier) must be === true } bus.publish(event) - (1 to 10) foreach { _ ⇒ expectMsg(event) } - subscribers foreach disposeSubscriber + range foreach { _ ⇒ expectMsg(event) } + subscribers foreach { s ⇒ bus.unsubscribe(s, classifier) must be === true; disposeSubscriber(s) } } "not publish the given event to any other subscribers than the intended ones" in { @@ -135,29 +144,80 @@ abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers } object ActorEventBusSpec { - class ComposedActorEventBus extends ActorEventBus with LookupClassification with EventType[String] with ClassifierType[String] { - def classify(event: String) = event - def publish(event: String, subscriber: ActorRef) = subscriber ! event - } - - class TestActorWrapperActor(testActor: ActorRef) extends Actor { - def receive = { - case x ⇒ testActor forward x - } + class ComposedActorEventBus extends ActorEventBus with LookupClassification with EventType[Int] with ClassifierType[String] { + def classify(event: Event) = event.toString + protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = a compareTo b + protected def mapSize = 32 + def publish(event: Event, subscriber: Subscriber) = subscriber ! event } } class ActorEventBusSpec extends EventBusSpec("ActorEventBus") { import akka.event.ActorEventBusSpec._ + import EventBusSpec.TestActorWrapperActor type BusType = ComposedActorEventBus def createNewEventBus(): BusType = new ComposedActorEventBus - def createEvents(numberOfEvents: Int) = (0 until numberOfEvents) map { _.toString } + def createEvents(numberOfEvents: Int) = (0 until numberOfEvents) def createSubscriber(pipeTo: ActorRef) = actorOf(Props(new TestActorWrapperActor(pipeTo))) - def classifierFor(event: BusType#Event) = event + def classifierFor(event: BusType#Event) = event.toString def disposeSubscriber(subscriber: BusType#Subscriber): Unit = subscriber.stop() } + +object ScanningEventBusSpec { + import akka.event.japi.ScanningEventBus + + class MyScanningEventBus extends ScanningEventBus[Int, akka.japi.Procedure[Int], String] { + protected def compareClassifiers(a: Classifier, b: Classifier): Int = a compareTo b + protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b) + + protected def matches(classifier: Classifier, event: Event): Boolean = event.toString == classifier + + protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event) + } +} + +class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") { + import ScanningEventBusSpec._ + + type BusType = MyScanningEventBus + + def createNewEventBus(): BusType = new MyScanningEventBus + + def createEvents(numberOfEvents: Int) = (0 until numberOfEvents) + + def createSubscriber(pipeTo: ActorRef) = new Procedure[Int] { def apply(i: Int) = pipeTo ! i } + + def classifierFor(event: BusType#Event) = event.toString + + def disposeSubscriber(subscriber: BusType#Subscriber): Unit = () +} + +object LookupEventBusSpec { + class MyLookupEventBus extends akka.event.japi.LookupEventBus[Int, akka.japi.Procedure[Int], String] { + protected def classify(event: Event): Classifier = event.toString + protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = System.identityHashCode(a) - System.identityHashCode(b) + protected def mapSize = 32 + protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event) + } +} + +class LookupEventBusSpec extends EventBusSpec("LookupEventBus") { + import LookupEventBusSpec._ + + type BusType = MyLookupEventBus + + def createNewEventBus(): BusType = new MyLookupEventBus + + def createEvents(numberOfEvents: Int) = (0 until numberOfEvents) + + def createSubscriber(pipeTo: ActorRef) = new Procedure[Int] { def apply(i: Int) = pipeTo ! i } + + def classifierFor(event: BusType#Event) = event.toString + + def disposeSubscriber(subscriber: BusType#Subscriber): Unit = () +} diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index 29bf3cd5a1..d6b2168046 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -42,7 +42,14 @@ trait ClassifierType[T] { self: EventBus ⇒ } trait LookupClassification { self: EventBus ⇒ - protected final val subscribers = new Index[Classifier, Subscriber] + + protected final val subscribers = new Index[Classifier, Subscriber](mapSize(), new Comparator[Subscriber] { + def compare(a: Subscriber, b: Subscriber): Int = compareSubscribers(a, b) + }) + + protected def mapSize(): Int + + protected def compareSubscribers(a: Subscriber, b: Subscriber): Int def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.put(to, subscriber) def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove(from, subscriber) @@ -52,12 +59,20 @@ trait LookupClassification { self: EventBus ⇒ protected def publish(event: Event, subscriber: Subscriber): Unit - def publish(event: Event): Unit = - subscribers.valueIterator(classify(event)).foreach(publish(event, _)) + def publish(event: Event): Unit = { + val i = subscribers.valueIterator(classify(event)) + while (i.hasNext) publish(event, i.next()) + } } trait ScanningClassification { self: EventBus ⇒ - protected final val subscribers = new ConcurrentSkipListSet[(Classifier, Subscriber)](ordering) + protected final val subscribers = new ConcurrentSkipListSet[(Classifier, Subscriber)](new Comparator[(Classifier, Subscriber)] { + def compare(a: (Classifier, Subscriber), b: (Classifier, Subscriber)): Int = { + val cM = compareClassifiers(a._1, b._1) + if (cM != 0) cM + else compareSubscribers(a._2, b._2) + } + }) def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.add((to, subscriber)) def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove((from, subscriber)) @@ -65,11 +80,13 @@ trait ScanningClassification { self: EventBus ⇒ val i = subscribers.iterator() while (i.hasNext) { val e = i.next() - if (subscriber == e._2) i.remove() + if (compareSubscribers(subscriber, e._2) == 0) i.remove() } } - protected def ordering: Comparator[(Classifier, Subscriber)] + protected def compareClassifiers(a: Classifier, b: Classifier): Int + + protected def compareSubscribers(a: Subscriber, b: Subscriber): Int protected def matches(classifier: Classifier, event: Event): Boolean @@ -79,7 +96,8 @@ trait ScanningClassification { self: EventBus ⇒ val currentSubscribers = subscribers.iterator() while (currentSubscribers.hasNext) { val (classifier, subscriber) = currentSubscribers.next() - if (matches(classifier, event)) publish(event, subscriber) + if (matches(classifier, event)) + publish(event, subscriber) } } } diff --git a/akka-actor/src/main/scala/akka/event/EventBusJavaAPI.scala b/akka-actor/src/main/scala/akka/event/EventBusJavaAPI.scala index 62b39e0c58..caf0005d25 100644 --- a/akka-actor/src/main/scala/akka/event/EventBusJavaAPI.scala +++ b/akka-actor/src/main/scala/akka/event/EventBusJavaAPI.scala @@ -1,9 +1,19 @@ -package akka.event +package akka.event.japi -/* - * Created by IntelliJ IDEA. - * User: viktorklang - * Date: 10/12/11 - * Time: 9:14 AM - */ -public scala class { } \ No newline at end of file +import akka.event._ + +abstract class LookupEventBus[E, S, C] extends EventBus with LookupClassification { + type Event = E + type Subscriber = S + type Classifier = C +} + +abstract class ScanningEventBus[E, S, C] extends EventBus with ScanningClassification { + type Event = E + type Subscriber = S + type Classifier = C +} + +abstract class ActorEventBus[E] extends akka.event.ActorEventBus with ActorClassification with ActorClassifier { + +} \ No newline at end of file