2011-10-11 17:41:25 +02:00
|
|
|
/**
 * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
 */
|
|
|
|
|
|
|
|
|
|
package akka.event
|
|
|
|
|
|
|
|
|
|
import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach }
|
|
|
|
|
import org.scalatest.matchers.MustMatchers
|
|
|
|
|
|
|
|
|
|
import akka.actor.Actor._
|
|
|
|
|
import akka.testkit._
|
|
|
|
|
import akka.util.duration._
|
|
|
|
|
import java.util.concurrent.atomic._
|
2011-10-11 18:12:57 +02:00
|
|
|
import akka.actor.{ Props, Actor, ActorRef }
|
2011-10-11 17:41:25 +02:00
|
|
|
|
|
|
|
|
/**
 * Companion object of the abstract `EventBusSpec` class.
 *
 * It declares no members at present; the spec class imports `EventBusSpec._`,
 * so anything added here becomes visible inside the spec.
 */
object EventBusSpec
|
|
|
|
|
|
|
|
|
|
/**
 * Reusable behavioural test-suite for `EventBus` implementations.
 *
 * A concrete spec fixes `BusType` and implements the factory methods below;
 * the test cases registered in the constructor are then run against the bus
 * returned by `createNewEventBus()`.
 *
 * All test cases share ONE bus, ONE list of events and ONE `subscriber`, so
 * every test case has to leave the bus without subscriptions of its own when
 * it finishes. The first two cases are deliberately order dependent:
 * "allow to unsubscribe already existing subscriber" removes the subscription
 * made by "allow subscribers".
 *
 * @param busName name of the bus under test, used as the subject of the spec
 */
abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers with TestKit with BeforeAndAfterEach {
  import EventBusSpec._

  /** The concrete bus type exercised by the spec. */
  type BusType <: EventBus

  /** Creates the bus instance that all test cases operate on. */
  def createNewEventBus(): BusType

  /** Creates `numberOfEvents` events that can be published on the bus. */
  def createEvents(numberOfEvents: Int): Iterable[BusType#Event]

  /**
   * Creates a new subscriber. The spec always passes the TestKit `testActor`
   * as `pipeTo` and then uses `expectMsg` / `expectNoMsg` to check deliveries.
   */
  def createSubscriber(pipeTo: ActorRef): BusType#Subscriber

  /** Returns the classifier under which `event` is expected to be delivered. */
  def classifierFor(event: BusType#Event): BusType#Classifier

  /** Releases whatever resources `subscriber` holds. */
  def disposeSubscriber(subscriber: BusType#Subscriber): Unit

  busName must {

    val bus = createNewEventBus()

    // The factory methods are typed with type projections (BusType#...);
    // these helpers cast their results to the path-dependent types of `bus`
    // so that they can be handed to bus.subscribe / bus.publish.
    def createNewSubscriber() = createSubscriber(testActor).asInstanceOf[bus.Subscriber]
    def getClassifierFor(event: BusType#Event) = classifierFor(event).asInstanceOf[bus.Classifier]
    def createNewEvents(numberOfEvents: Int): Iterable[bus.Event] = createEvents(numberOfEvents).asInstanceOf[Iterable[bus.Event]]

    val events = createNewEvents(100)
    val event = events.head
    val classifier = getClassifierFor(event)
    val subscriber = createNewSubscriber()

    "allow subscribers" in {
      // The subscription is intentionally left in place for the next test case.
      bus.subscribe(subscriber, classifier) must be === true
    }

    "allow to unsubscribe already existing subscriber" in {
      bus.unsubscribe(subscriber, classifier) must be === true
    }

    "not allow to unsubscribe non-existing subscriber" in {
      val sub = createNewSubscriber()
      bus.unsubscribe(sub, classifier) must be === false
      disposeSubscriber(sub)
    }

    "not allow for the same subscriber to subscribe to the same channel twice" in {
      bus.subscribe(subscriber, classifier) must be === true
      bus.subscribe(subscriber, classifier) must be === false
      bus.unsubscribe(subscriber, classifier) must be === true
    }

    "not allow for the same subscriber to unsubscribe to the same channel twice" in {
      bus.subscribe(subscriber, classifier) must be === true
      bus.unsubscribe(subscriber, classifier) must be === true
      bus.unsubscribe(subscriber, classifier) must be === false
    }

    "allow to add multiple subscribers" in {
      val subscribers = (1 to 10) map { _ ⇒ createNewSubscriber() }
      val events = createEvents(10)
      val classifiers = events map getClassifierFor
      subscribers.zip(classifiers) forall { case (s, c) ⇒ bus.subscribe(s, c) } must be === true
      subscribers.zip(classifiers) forall { case (s, c) ⇒ bus.unsubscribe(s, c) } must be === true

      subscribers foreach disposeSubscriber
    }

    "publishing events without any subscribers shouldn't be a problem" in {
      bus.publish(event)
    }

    "publish the given event to the only subscriber" in {
      bus.subscribe(subscriber, classifier)
      bus.publish(event)
      expectMsg(event)
      bus.unsubscribe(subscriber, classifier)
    }

    "publish the given event to all intended subscribers" in {
      val subscribers = Vector.fill(10)(createNewSubscriber())
      subscribers foreach { s ⇒ bus.subscribe(s, classifier) must be === true }
      bus.publish(event)
      // Exactly one delivery is expected per subscriber.
      subscribers foreach { _ ⇒ expectMsg(event) }
      // Unsubscribe before disposing: the bus is shared with the test cases
      // below, which publish under the same classifier and must not reach
      // already disposed subscribers.
      subscribers foreach { s ⇒ bus.unsubscribe(s, classifier) must be === true }
      subscribers foreach disposeSubscriber
    }

    "not publish the given event to any other subscribers than the intended ones" in {
      val otherSubscriber = createNewSubscriber()
      val otherClassifier = getClassifierFor(events.drop(1).head)
      bus.subscribe(subscriber, classifier)
      bus.subscribe(otherSubscriber, otherClassifier)
      bus.publish(event)
      expectMsg(event)
      bus.unsubscribe(subscriber, classifier)
      bus.unsubscribe(otherSubscriber, otherClassifier)
      // Both subscribers were created with testActor as target, so a delivery
      // to otherSubscriber would show up here as an additional message.
      expectNoMsg(1 second)
      // The extra subscriber is local to this test case; release it here.
      disposeSubscriber(otherSubscriber)
    }

    "not publish the given event to a former subscriber" in {
      bus.subscribe(subscriber, classifier)
      bus.unsubscribe(subscriber, classifier)
      bus.publish(event)
      expectNoMsg(1 second)
    }

    "cleanup subscriber" in {
      // Registered last: releases the subscriber shared by the cases above.
      disposeSubscriber(subscriber)
    }
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Fixtures used by the `ActorEventBusSpec` class.
 */
object ActorEventBusSpec {

  /**
   * Bus under test: String events, String classifiers, lookup classification.
   */
  class ComposedActorEventBus extends ActorEventBus with LookupClassification with EventType[String] with ClassifierType[String] {

    /** An event is classified by its first character, as a one-character String. */
    def classify(event: String): String = String.valueOf(event.charAt(0))

    /** Delivers `event` by sending it to `subscriber` as a message. */
    def publish(event: String, subscriber: ActorRef): Unit = {
      subscriber ! event
    }
  }

  /**
   * Actor that forwards every message it receives to `testActor`.
   */
  class TestActorWrapperActor(testActor: ActorRef) extends Actor {
    def receive = {
      case message ⇒ testActor.forward(message)
    }
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Runs the generic `EventBusSpec` test cases against `ComposedActorEventBus`.
 */
class ActorEventBusSpec extends EventBusSpec("ActorEventBus") {
  import akka.event.ActorEventBusSpec._

  type BusType = ComposedActorEventBus

  /** Every call yields a fresh bus instance. */
  def createNewEventBus(): BusType = {
    new ComposedActorEventBus
  }

  /** Events are the decimal strings "0", "1", ... up to `numberOfEvents - 1`. */
  def createEvents(numberOfEvents: Int) =
    for (index <- 0 until numberOfEvents) yield index.toString

  /** Subscribers are `TestActorWrapperActor` instances wrapping `pipeTo`. */
  def createSubscriber(pipeTo: ActorRef) = {
    val wrapperProps = Props(new TestActorWrapperActor(pipeTo))
    actorOf(wrapperProps)
  }

  /** The classifier of an event is its first character, as a String. */
  def classifierFor(event: BusType#Event) = String.valueOf(event.charAt(0))

  /** Disposing a subscriber stops the actor. */
  def disposeSubscriber(subscriber: BusType#Subscriber): Unit = {
    subscriber.stop()
  }
}
|