pekko/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala

296 lines
8.1 KiB
Scala
Raw Normal View History

2011-07-28 15:48:03 +03:00
package akka.routing
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.routing._
import java.util.concurrent.atomic.AtomicInteger
2011-07-28 15:48:03 +03:00
import akka.actor.Actor._
2011-07-28 16:56:35 +03:00
import akka.actor.{ ActorRef, Actor }
2011-07-28 15:48:03 +03:00
import collection.mutable.LinkedList
import akka.routing.Routing.Broadcast
2011-07-28 16:56:35 +03:00
import java.util.concurrent.{ CountDownLatch, TimeUnit }
/**
 * Shared fixtures for [[RoutingSpec]].
 */
object RoutingSpec {

  /**
   * Minimal actor that prints "Hello" for every message it receives.
   *
   * The "be started when constructed" tests use it as a connection when they
   * only need some actor to route to.
   */
  class TestActor extends Actor with Serializable {
    def receive = {
      case _ =>
        println("Hello")
    }
  }
}
/**
 * Tests for routed actors created with `Routing.actorOf` from a `RoutedProps`,
 * covering the `DirectRouter`, `RoundRobinRouter` and `RandomRouter` strategies.
 *
 * Most tests follow the same pattern: each connection counts the messages it
 * receives and counts down a latch when it sees the marker message "end". The
 * test sends "end" last and waits on the latch before checking the counters.
 * NOTE(review): this relies on each connection processing its messages in the
 * order they were sent - confirm against the dispatcher in use.
 */
class RoutingSpec extends WordSpec with MustMatchers {

  import akka.routing.RoutingSpec._

  "direct router" must {

    "be started when constructed" in {
      val actor1 = Actor.actorOf[TestActor]

      val props = RoutedProps(() => new DirectRouter, List(actor1))
      val actor = Routing.actorOf(props, "foo")
      actor.isShutdown must be(false)
    }

    "throw IllegalArgumentException at construction when no connections" in {
      // intercept fails the test if nothing (or a different exception) is thrown.
      intercept[IllegalArgumentException] {
        val props = RoutedProps(() => new DirectRouter, List())
        Routing.actorOf(props, "foo")
      }
    }

    "send message to connection" in {
      val doneLatch = new CountDownLatch(1)

      val counter = new AtomicInteger(0)
      val connection1 = actorOf(new Actor {
        def receive = {
          case "end" => doneLatch.countDown()
          case _     => counter.incrementAndGet
        }
      })

      val props = RoutedProps(() => new DirectRouter, List(connection1))
      val routedActor = Routing.actorOf(props, "foo")
      routedActor ! "hello"
      routedActor ! "end"

      doneLatch.await(5, TimeUnit.SECONDS) must be(true)

      // Only "hello" hits the catch-all case; "end" is handled separately.
      counter.get must be(1)
    }

    "deliver a broadcast message" in {
      val doneLatch = new CountDownLatch(1)

      val counter1 = new AtomicInteger
      val connection1 = actorOf(new Actor {
        def receive = {
          case "end"    => doneLatch.countDown()
          case msg: Int => counter1.addAndGet(msg)
        }
      })

      val props = RoutedProps(() => new DirectRouter, List(connection1))
      val actor = Routing.actorOf(props, "foo")

      actor ! Broadcast(1)
      actor ! "end"

      doneLatch.await(5, TimeUnit.SECONDS) must be(true)

      counter1.get must be(1)
    }
  }

  "round robin router" must {

    "be started when constructed" in {
      val actor1 = Actor.actorOf[TestActor]

      val props = RoutedProps(() => new RoundRobinRouter, List(actor1))
      val actor = Routing.actorOf(props, "foo")
      actor.isShutdown must be(false)
    }

    "throw IllegalArgumentException at construction when no connections" in {
      // intercept fails the test if nothing (or a different exception) is thrown.
      intercept[IllegalArgumentException] {
        val props = RoutedProps(() => new RoundRobinRouter, List())
        Routing.actorOf(props, "foo")
      }
    }

    // In this test a number of connections are created, each with its own counter.
    // To exercise round robin, the routed actor is sent the sequence 1 2 3 .. n,
    // repeated for every iteration, and each connection adds the value it receives
    // to its counter. After `iterationCount` iterations the first connection's
    // counter must therefore be 1 * iterationCount, the second's 2 * iterationCount,
    // and so on.
    "deliver messages in a round robin fashion" in {
      val connectionCount = 10
      val iterationCount = 10
      val doneLatch = new CountDownLatch(connectionCount)

      // One counter per connection; each connection closes over its own counter.
      val counters = List.fill(connectionCount)(new AtomicInteger())
      val connections = counters.map { counter =>
        actorOf(new Actor {
          def receive = {
            case "end"    => doneLatch.countDown()
            case msg: Int => counter.addAndGet(msg)
          }
        })
      }

      // Create the routed actor.
      val props = RoutedProps(() => new RoundRobinRouter, connections)
      val actor = Routing.actorOf(props, "foo")

      // Send the sequence 1..connectionCount, iterationCount times.
      for {
        _ <- 0 until iterationCount
        k <- 0 until connectionCount
      } actor ! (k + 1)

      // Every connection must see "end" before the counters are inspected.
      actor ! Broadcast("end")
      doneLatch.await(5, TimeUnit.SECONDS) must be(true)

      counters.zipWithIndex.foreach {
        case (counter, i) =>
          counter.get must be((iterationCount * (i + 1)))
      }
    }

    "deliver a broadcast message using the !" in {
      val doneLatch = new CountDownLatch(2)

      val counter1 = new AtomicInteger
      val connection1 = actorOf(new Actor {
        def receive = {
          case "end"    => doneLatch.countDown()
          case msg: Int => counter1.addAndGet(msg)
        }
      })

      val counter2 = new AtomicInteger
      val connection2 = actorOf(new Actor {
        def receive = {
          case "end"    => doneLatch.countDown()
          case msg: Int => counter2.addAndGet(msg)
        }
      })

      val props = RoutedProps(() => new RoundRobinRouter, List(connection1, connection2))
      val actor = Routing.actorOf(props, "foo")

      actor ! Broadcast(1)
      actor ! Broadcast("end")

      doneLatch.await(5, TimeUnit.SECONDS) must be(true)

      counter1.get must be(1)
      counter2.get must be(1)
    }

    "fail to deliver a broadcast message using the ?" in {
      val doneLatch = new CountDownLatch(1)

      val counter1 = new AtomicInteger
      val connection1 = actorOf(new Actor {
        def receive = {
          case "end" => doneLatch.countDown()
          case _     => counter1.incrementAndGet()
        }
      })

      val props = RoutedProps(() => new RoundRobinRouter, List(connection1))
      val actor = Routing.actorOf(props, "foo")

      // The test expects the ask itself to throw.
      intercept[RoutingException] {
        actor ? Broadcast(1)
      }

      actor ! "end"
      doneLatch.await(5, TimeUnit.SECONDS) must be(true)
      // The rejected broadcast must not have reached the connection.
      counter1.get must be(0)
    }
  }

  "random router" must {

    "be started when constructed" in {
      val actor1 = Actor.actorOf[TestActor]

      val props = RoutedProps(() => new RandomRouter, List(actor1))
      val actor = Routing.actorOf(props, "foo")
      actor.isShutdown must be(false)
    }

    "throw IllegalArgumentException at construction when no connections" in {
      // intercept fails the test if nothing (or a different exception) is thrown.
      intercept[IllegalArgumentException] {
        val props = RoutedProps(() => new RandomRouter, List())
        Routing.actorOf(props, "foo")
      }
    }

    // Placeholder: no assertions have been written for random distribution yet.
    "deliver messages in a random fashion" ignore {
    }

    "deliver a broadcast message" in {
      val doneLatch = new CountDownLatch(2)

      val counter1 = new AtomicInteger
      val connection1 = actorOf(new Actor {
        def receive = {
          case "end"    => doneLatch.countDown()
          case msg: Int => counter1.addAndGet(msg)
        }
      })

      val counter2 = new AtomicInteger
      val connection2 = actorOf(new Actor {
        def receive = {
          case "end"    => doneLatch.countDown()
          case msg: Int => counter2.addAndGet(msg)
        }
      })

      val props = RoutedProps(() => new RandomRouter, List(connection1, connection2))
      val actor = Routing.actorOf(props, "foo")

      actor ! Broadcast(1)
      actor ! Broadcast("end")

      doneLatch.await(5, TimeUnit.SECONDS) must be(true)

      counter1.get must be(1)
      counter2.get must be(1)
    }

    "fail to deliver a broadcast message using the ?" in {
      val doneLatch = new CountDownLatch(1)

      val counter1 = new AtomicInteger
      val connection1 = actorOf(new Actor {
        def receive = {
          case "end" => doneLatch.countDown()
          case _     => counter1.incrementAndGet()
        }
      })

      val props = RoutedProps(() => new RandomRouter, List(connection1))
      val actor = Routing.actorOf(props, "foo")

      // The test expects the ask itself to throw.
      intercept[RoutingException] {
        actor ? Broadcast(1)
      }

      actor ! "end"
      doneLatch.await(5, TimeUnit.SECONDS) must be(true)
      // The rejected broadcast must not have reached the connection.
      counter1.get must be(0)
    }
  }
}