diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala new file mode 100644 index 0000000000..d050f37046 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -0,0 +1,343 @@ +package akka.routing + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers + +import akka.actor._ +import Actor._ +import DeploymentConfig._ +import akka.routing._ +import Routing.Broadcast + +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{ CountDownLatch, TimeUnit } + +class ConfiguredLocalRoutingSpec extends WordSpec with MustMatchers { + + "direct router" must { + + "be able to shut down its instance" in { + val address = "direct-0" + + Deployer.deploy( + Deploy( + address, + None, + Direct, + ReplicationFactor(1), + RemoveConnectionOnFirstFailureLocalFailureDetector, + LocalScope)) + + val helloLatch = new CountDownLatch(1) + val stopLatch = new CountDownLatch(1) + + val actor = actorOf(new Actor { + def receive = { + case "hello" ⇒ helloLatch.countDown() + } + + override def postStop() { + stopLatch.countDown() + } + }, address) + + actor ! "hello" + + helloLatch.await(5, TimeUnit.SECONDS) must be(true) + + actor.stop() + + stopLatch.await(5, TimeUnit.SECONDS) must be(true) + } + + "send message to connection" in { + val address = "direct-1" + + Deployer.deploy( + Deploy( + address, + None, + Direct, + ReplicationFactor(1), + RemoveConnectionOnFirstFailureLocalFailureDetector, + LocalScope)) + + val doneLatch = new CountDownLatch(1) + + val counter = new AtomicInteger(0) + val actor = actorOf(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case _ ⇒ counter.incrementAndGet() + } + }, address) + + actor ! "hello" + actor ! "end" + + doneLatch.await(5, TimeUnit.SECONDS) must be(true) + + counter.get must be(1) + } + + "deliver a broadcast message" in { + val address = "direct-2" + + Deployer.deploy( + Deploy( + address, + None, + Direct, + ReplicationFactor(1), + RemoveConnectionOnFirstFailureLocalFailureDetector, + LocalScope)) + + val doneLatch = new CountDownLatch(1) + + val counter1 = new AtomicInteger + val actor = actorOf(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ counter1.addAndGet(msg) + } + }, address) + + actor ! Broadcast(1) + actor ! "end" + + doneLatch.await(5, TimeUnit.SECONDS) must be(true) + + counter1.get must be(1) + } + } + + "round robin router" must { + + "be able to shut down its instance" in { + val address = "round-robin-0" + + Deployer.deploy( + Deploy( + address, + None, + RoundRobin, + ReplicationFactor(5), + RemoveConnectionOnFirstFailureLocalFailureDetector, + LocalScope)) + + val helloLatch = new CountDownLatch(5) + val stopLatch = new CountDownLatch(5) + + val actor = actorOf(new Actor { + def receive = { + case "hello" ⇒ helloLatch.countDown() + } + + override def postStop() { + stopLatch.countDown() + } + }, address) + + actor ! "hello" + actor ! "hello" + actor ! "hello" + actor ! "hello" + actor ! "hello" + helloLatch.await(5, TimeUnit.SECONDS) must be(true) + + actor.stop() + stopLatch.await(5, TimeUnit.SECONDS) must be(true) + } + + "deliver messages in a round robin fashion" in { + val address = "round-robin-1" + + Deployer.deploy( + Deploy( + address, + None, + RoundRobin, + ReplicationFactor(10), + RemoveConnectionOnFirstFailureLocalFailureDetector, + LocalScope)) + + val connectionCount = 10 + val iterationCount = 10 + val doneLatch = new CountDownLatch(connectionCount) + + val counter = new AtomicInteger + var replies = Map.empty[Int, Int] + for (i ← 0 until connectionCount) { + replies = replies + (i -> 0) + } + + val actor = actorOf(new Actor { + lazy val id = counter.getAndIncrement() + def receive = { + case "hit" ⇒ reply(id) + case "end" ⇒ doneLatch.countDown() + } + }, address) + + for (i ← 0 until iterationCount) { + for (k ← 0 until connectionCount) { + val id = (actor ? "hit").as[Int].getOrElse(fail("No id returned by actor")) + replies = replies + (id -> (replies(id) + 1)) + } + } + + counter.get must be(connectionCount) + + actor ! Broadcast("end") + doneLatch.await(5, TimeUnit.SECONDS) must be(true) + + replies.values foreach { _ must be(10) } + } + + "deliver a broadcast message using the !" in { + val address = "round-robin-2" + + Deployer.deploy( + Deploy( + address, + None, + RoundRobin, + ReplicationFactor(5), + RemoveConnectionOnFirstFailureLocalFailureDetector, + LocalScope)) + + val helloLatch = new CountDownLatch(5) + val stopLatch = new CountDownLatch(5) + + val actor = actorOf(new Actor { + def receive = { + case "hello" ⇒ helloLatch.countDown() + } + + override def postStop() { + stopLatch.countDown() + } + }, address) + + actor ! Broadcast("hello") + helloLatch.await(5, TimeUnit.SECONDS) must be(true) + + actor.stop() + stopLatch.await(5, TimeUnit.SECONDS) must be(true) + } + } + + "random router" must { + + "be able to shut down its instance" in { + val address = "random-0" + + Deployer.deploy( + Deploy( + address, + None, + Random, + ReplicationFactor(7), + RemoveConnectionOnFirstFailureLocalFailureDetector, + LocalScope)) + + val stopLatch = new CountDownLatch(7) + + val actor = actorOf(new Actor { + def receive = { + case "hello" ⇒ {} + } + + override def postStop() { + stopLatch.countDown() + } + }, address) + + actor ! "hello" + actor ! "hello" + actor ! "hello" + actor ! "hello" + actor ! "hello" + + actor.stop() + stopLatch.await(5, TimeUnit.SECONDS) must be(true) + } + + "deliver messages in a random fashion" in { + val address = "random-1" + + Deployer.deploy( + Deploy( + address, + None, + Random, + ReplicationFactor(10), + RemoveConnectionOnFirstFailureLocalFailureDetector, + LocalScope)) + + val connectionCount = 10 + val iterationCount = 10 + val doneLatch = new CountDownLatch(connectionCount) + + val counter = new AtomicInteger + var replies = Map.empty[Int, Int] + for (i ← 0 until connectionCount) { + replies = replies + (i -> 0) + } + + val actor = actorOf(new Actor { + lazy val id = counter.getAndIncrement() + def receive = { + case "hit" ⇒ reply(id) + case "end" ⇒ doneLatch.countDown() + } + }, address) + + for (i ← 0 until iterationCount) { + for (k ← 0 until connectionCount) { + val id = (actor ? "hit").as[Int].getOrElse(fail("No id returned by actor")) + replies = replies + (id -> (replies(id) + 1)) + } + } + + counter.get must be(connectionCount) + + actor ! Broadcast("end") + doneLatch.await(5, TimeUnit.SECONDS) must be(true) + + replies.values foreach { _ must be > (0) } + } + + "deliver a broadcast message using the !" in { + val address = "random-2" + + Deployer.deploy( + Deploy( + address, + None, + Random, + ReplicationFactor(6), + RemoveConnectionOnFirstFailureLocalFailureDetector, + LocalScope)) + + val helloLatch = new CountDownLatch(6) + val stopLatch = new CountDownLatch(6) + + val actor = actorOf(new Actor { + def receive = { + case "hello" ⇒ helloLatch.countDown() + } + + override def postStop() { + stopLatch.countDown() + } + }, address) + + actor ! Broadcast("hello") + helloLatch.await(5, TimeUnit.SECONDS) must be(true) + + actor.stop() + stopLatch.await(5, TimeUnit.SECONDS) must be(true) + } + } +}