2012-01-31 21:19:28 +01:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
2011-09-28 18:15:39 +02:00
|
|
|
package akka.routing
|
|
|
|
|
|
2012-06-21 16:09:14 +02:00
|
|
|
import language.postfixOps
|
|
|
|
|
|
2011-09-28 18:15:39 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicInteger
|
2012-01-31 21:19:28 +01:00
|
|
|
import org.junit.runner.RunWith
|
2012-06-13 17:57:56 +02:00
|
|
|
import akka.actor.{ Props, Deploy, Actor, ActorRef }
|
2012-05-16 17:04:13 +02:00
|
|
|
import akka.ConfigurationException
|
2011-12-12 22:50:08 +01:00
|
|
|
import akka.dispatch.Await
|
2012-04-03 16:04:14 +02:00
|
|
|
import akka.pattern.{ ask, gracefulStop }
|
2012-01-31 21:19:28 +01:00
|
|
|
import akka.testkit.{ TestLatch, ImplicitSender, DefaultTimeout, AkkaSpec }
|
2012-06-29 14:07:38 +02:00
|
|
|
import scala.concurrent.util.duration.intToDurationInt
|
2012-06-13 17:57:56 +02:00
|
|
|
import akka.actor.UnstartedCell
|
2011-09-28 18:15:39 +02:00
|
|
|
|
2012-01-17 17:28:57 +01:00
|
|
|
/**
 * Companion holding the configuration that the `ConfiguredLocalRoutingSpec`
 * class passes to `AkkaSpec`.
 */
object ConfiguredLocalRoutingSpec {

  /**
   * HOCON configuration text for the spec's actor system.
   *
   * It contains:
   *  - a default dispatcher backed by a `thread-pool-executor` whose core pool
   *    size is bounded between 8 and 16;
   *  - a deployment entry for `/config` (random router, 4 instances);
   *  - a deployment entry for `/weird` (round-robin router, 3 instances);
   *  - a wildcard deployment entry `"/weird/*"` (round-robin router, 2 instances)
   *    that targets the children of `/weird`.
   *
   * The literal must contain nothing but configuration text: every character
   * between the triple quotes is handed to the configuration parser.
   */
  val config: String = """
    akka {
      actor {
        default-dispatcher {
          executor = "thread-pool-executor"
          thread-pool-executor {
            core-pool-size-min = 8
            core-pool-size-max = 16
          }
        }
        deployment {
          /config {
            router = random
            nr-of-instances = 4
          }
          /weird {
            router = round-robin
            nr-of-instances = 3
          }
          "/weird/*" {
            router = round-robin
            nr-of-instances = 2
          }
        }
      }
    }
  """
}
|
|
|
|
|
|
2011-10-21 17:01:22 +02:00
|
|
|
/**
 * Checks where a locally deployed router takes its `RouterConfig` from
 * (`Props`, an explicit `Deploy`, or the deployment section of the
 * configuration) and exercises basic delivery, broadcast and shutdown of
 * round-robin and random routers.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.config) with DefaultTimeout with ImplicitSender {

  /**
   * Looks up the router configuration behind a routed actor reference.
   *
   * If the reference is still backed by an `UnstartedCell`, this waits for
   * `isStarted` (at most one second, polling every 10 milliseconds) and then
   * inspects the reference again.
   *
   * @param ref reference to inspect; the match only covers `RoutedActorRef`,
   *            so any other kind of reference ends in a `MatchError`
   * @return the `routerConfig` of the underlying `RoutedActorCell`
   */
  def routerConfig(ref: ActorRef): RouterConfig = ref match {
    case routed: RoutedActorRef ⇒
      routed.underlying match {
        case cell: RoutedActorCell ⇒ cell.routerConfig
        case _: UnstartedCell ⇒
          awaitCond(routed.isStarted, 1 second, 10 millis)
          routerConfig(routed)
      }
  }

  /** Builds `Props` for an actor that replies to `"get"` with its `context.props`. */
  private def propsReporter: Props = Props(new Actor {
    def receive = {
      case "get" ⇒ sender ! context.props
    }
  })

  /** Requests a graceful stop of `ref` and blocks up to three seconds for the result. */
  private def stopAndAwait(ref: ActorRef): Unit =
    Await.result(gracefulStop(ref, 3 seconds), 3 seconds)

  "RouterConfig" must {

    "be picked up from Props" in {
      // "someOther" has no deployment entry, so the router given in Props is expected.
      val actor = system.actorOf(propsReporter.withRouter(RoundRobinRouter(12)), "someOther")
      routerConfig(actor) must be === RoundRobinRouter(12)
      stopAndAwait(actor)
    }

    "be overridable in config" in {
      // The /config deployment entry (random, 4 instances) is expected to win over Props.
      val actor = system.actorOf(propsReporter.withRouter(RoundRobinRouter(12)), "config")
      routerConfig(actor) must be === RandomRouter(4)
      stopAndAwait(actor)
    }

    "be overridable in explicit deployment" in {
      val actor = system.actorOf(
        propsReporter.withRouter(FromConfig).withDeploy(Deploy(routerConfig = RoundRobinRouter(12))),
        "someOther")
      routerConfig(actor) must be === RoundRobinRouter(12)
      stopAndAwait(actor)
    }

    "be overridable in config even with explicit deployment" in {
      val actor = system.actorOf(
        propsReporter.withRouter(FromConfig).withDeploy(Deploy(routerConfig = RoundRobinRouter(12))),
        "config")
      routerConfig(actor) must be === RandomRouter(4)
      stopAndAwait(actor)
    }

    "fail with an exception if not correct" in {
      // FromConfig without a name has no deployment entry to resolve against.
      intercept[ConfigurationException] {
        system.actorOf(Props.empty.withRouter(FromConfig))
      }
    }

    "not get confused when trying to wildcard-configure children" in {
      // Each routee announces itself to the test actor from its constructor.
      system.actorOf(Props(new Actor {
        testActor ! self
        def receive = { case _ ⇒ }
      }).withRouter(FromConfig), "weird")

      // /weird asks for 3 routees; the "/weird/*" entry must not add any more.
      val received = (for (_ ← 1 to 3) yield expectMsgType[ActorRef]).toSet
      val expected = Set('a', 'b', 'c') map (suffix ⇒ system.actorFor("/user/weird/$" + suffix))
      received must be(expected)
      expectNoMsg(1 second)
    }

  }

  "round robin router" must {

    "be able to shut down its instance" in {
      val helloLatch = new TestLatch(5)
      val stopLatch = new TestLatch(5)

      val actor = system.actorOf(Props(new Actor {
        def receive = {
          case "hello" ⇒ helloLatch.countDown()
        }

        override def postStop(): Unit = stopLatch.countDown()
      }).withRouter(RoundRobinRouter(5)), "round-robin-shutdown")

      for (_ ← 1 to 5) actor ! "hello"
      Await.ready(helloLatch, 5 seconds)

      // Stopping the router must stop every routee (one postStop per routee).
      system.stop(actor)
      Await.ready(stopLatch, 5 seconds)
    }

    "deliver messages in a round robin fashion" in {
      val connectionCount = 10
      val iterationCount = 10
      val doneLatch = new TestLatch(connectionCount)
      val counter = new AtomicInteger

      val actor = system.actorOf(Props(new Actor {
        // Assigned on the first "hit" this routee handles.
        lazy val id = counter.getAndIncrement()
        def receive = {
          case "hit" ⇒ sender ! id
          case "end" ⇒ doneLatch.countDown()
        }
      }).withRouter(RoundRobinRouter(connectionCount)), "round-robin")

      // Start every expected id at zero, then count one reply per request.
      val noReplies: Map[Int, Int] = (0 until connectionCount).map(_ -> 0).toMap
      val replies = (0 until iterationCount * connectionCount).foldLeft(noReplies) { (tally, _) ⇒
        val id = Await.result((actor ? "hit").mapTo[Int], timeout.duration)
        tally.updated(id, tally(id) + 1)
      }

      // Every routee must have handled at least one "hit" to claim an id.
      counter.get must be(connectionCount)

      actor ! Broadcast("end")
      Await.ready(doneLatch, 5 seconds)

      replies.values foreach { _ must be(iterationCount) }
    }

    "deliver a broadcast message using the !" in {
      val helloLatch = new TestLatch(5)
      val stopLatch = new TestLatch(5)

      val actor = system.actorOf(Props(new Actor {
        def receive = {
          case "hello" ⇒ helloLatch.countDown()
        }

        override def postStop(): Unit = stopLatch.countDown()
      }).withRouter(RoundRobinRouter(5)), "round-robin-broadcast")

      // A single Broadcast must reach all five routees.
      actor ! Broadcast("hello")
      Await.ready(helloLatch, 5 seconds)

      system.stop(actor)
      Await.ready(stopLatch, 5 seconds)
    }

  }

  "random router" must {

    "be able to shut down its instance" in {
      val stopLatch = new TestLatch(7)

      val actor = system.actorOf(Props(new Actor {
        def receive = {
          case "hello" ⇒ sender ! "world"
        }

        override def postStop(): Unit = stopLatch.countDown()
      }).withRouter(RandomRouter(7)), "random-shutdown")

      for (_ ← 1 to 5) actor ! "hello"

      within(2 seconds) {
        for (_ ← 1 to 5) expectMsg("world")
      }

      // Stopping the router must stop every routee (one postStop per routee).
      system.stop(actor)
      Await.ready(stopLatch, 5 seconds)
    }

    "deliver messages in a random fashion" in {
      val connectionCount = 10
      val iterationCount = 10
      val doneLatch = new TestLatch(connectionCount)
      val counter = new AtomicInteger

      val actor = system.actorOf(Props(new Actor {
        // Assigned on the first "hit" this routee handles.
        lazy val id = counter.getAndIncrement()
        def receive = {
          case "hit" ⇒ sender ! id
          case "end" ⇒ doneLatch.countDown()
        }
      }).withRouter(RandomRouter(connectionCount)), "random")

      // Start every expected id at zero, then count one reply per request.
      val noReplies: Map[Int, Int] = (0 until connectionCount).map(_ -> 0).toMap
      val replies = (0 until iterationCount * connectionCount).foldLeft(noReplies) { (tally, _) ⇒
        val id = Await.result((actor ? "hit").mapTo[Int], timeout.duration)
        tally.updated(id, tally(id) + 1)
      }

      // Every routee must have handled at least one "hit" to claim an id.
      counter.get must be(connectionCount)

      actor ! Broadcast("end")
      Await.ready(doneLatch, 5 seconds)

      replies.values foreach { _ must be > (0) }
      replies.values.sum must be === iterationCount * connectionCount
    }

    "deliver a broadcast message using the !" in {
      val helloLatch = new TestLatch(6)
      val stopLatch = new TestLatch(6)

      val actor = system.actorOf(Props(new Actor {
        def receive = {
          case "hello" ⇒ helloLatch.countDown()
        }

        override def postStop(): Unit = stopLatch.countDown()
      }).withRouter(RandomRouter(6)), "random-broadcast")

      // A single Broadcast must reach all six routees.
      actor ! Broadcast("hello")
      Await.ready(helloLatch, 5 seconds)

      system.stop(actor)
      Await.ready(stopLatch, 5 seconds)
    }

  }

}
|