2011-12-13 01:09:05 +01:00
|
|
|
/**
|
2012-01-19 18:21:06 +01:00
|
|
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
2011-12-13 01:09:05 +01:00
|
|
|
*/
|
2011-07-28 15:48:03 +03:00
|
|
|
package akka.routing
|
2010-08-24 23:21:28 +02:00
|
|
|
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger
|
2011-11-11 19:57:27 +01:00
|
|
|
import akka.actor._
|
2012-01-05 17:59:19 +01:00
|
|
|
import scala.collection.mutable.LinkedList
|
2011-10-07 15:42:55 +02:00
|
|
|
import akka.testkit._
|
2011-12-13 16:05:56 +01:00
|
|
|
import akka.util.duration._
|
2011-12-12 22:50:08 +01:00
|
|
|
import akka.dispatch.Await
|
2011-12-20 19:57:42 +01:00
|
|
|
import akka.util.Duration
|
2011-12-29 17:11:21 +01:00
|
|
|
import akka.config.ConfigurationException
|
|
|
|
|
import com.typesafe.config.ConfigFactory
|
2012-01-18 10:18:51 +01:00
|
|
|
import akka.pattern.ask
|
2012-01-11 13:56:38 +01:00
|
|
|
import java.util.concurrent.ConcurrentHashMap
|
2012-01-12 16:37:08 +01:00
|
|
|
import com.typesafe.config.Config
|
2012-02-10 14:13:40 +01:00
|
|
|
import akka.dispatch.Dispatchers
|
2011-05-23 17:08:45 +02:00
|
|
|
|
|
|
|
|
object RoutingSpec {
|
|
|
|
|
|
2012-01-10 17:50:17 +01:00
|
|
|
val config = """
|
|
|
|
|
akka.actor.deployment {
|
|
|
|
|
/router1 {
|
|
|
|
|
router = round-robin
|
|
|
|
|
nr-of-instances = 3
|
|
|
|
|
}
|
2012-01-12 16:37:08 +01:00
|
|
|
/myrouter {
|
|
|
|
|
router = "akka.routing.RoutingSpec$MyRouter"
|
|
|
|
|
foo = bar
|
|
|
|
|
}
|
2012-01-10 17:50:17 +01:00
|
|
|
}
|
|
|
|
|
"""
|
|
|
|
|
|
2011-12-13 16:05:56 +01:00
|
|
|
class TestActor extends Actor {
|
2012-01-20 12:30:19 +01:00
|
|
|
def receive = { case _ ⇒ }
|
2011-05-23 17:08:45 +02:00
|
|
|
}
|
2011-12-11 22:34:38 +01:00
|
|
|
|
2011-12-13 16:05:56 +01:00
|
|
|
class Echo extends Actor {
|
|
|
|
|
def receive = {
|
|
|
|
|
case _ ⇒ sender ! self
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2012-01-12 16:37:08 +01:00
|
|
|
class MyRouter(config: Config) extends RouterConfig {
|
|
|
|
|
val foo = config.getString("foo")
|
2012-01-17 08:45:07 +01:00
|
|
|
def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = {
|
|
|
|
|
val routees = IndexedSeq(routeeProvider.context.actorOf(Props[Echo]))
|
|
|
|
|
routeeProvider.registerRoutees(routees)
|
2012-01-12 16:37:08 +01:00
|
|
|
|
|
|
|
|
{
|
|
|
|
|
case (sender, message) ⇒ Nil
|
|
|
|
|
}
|
|
|
|
|
}
|
2012-02-10 14:13:40 +01:00
|
|
|
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
|
2012-02-18 22:15:39 +01:00
|
|
|
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
|
2012-01-12 16:37:08 +01:00
|
|
|
}
|
|
|
|
|
|
2011-05-23 17:08:45 +02:00
|
|
|
}
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-10-21 17:01:22 +02:00
|
|
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
2012-01-10 17:50:17 +01:00
|
|
|
class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with ImplicitSender {
|
2011-11-16 17:18:36 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
import akka.routing.RoutingSpec._
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-12-13 16:05:56 +01:00
|
|
|
"routers in general" must {
|
|
|
|
|
|
|
|
|
|
"evict terminated routees" in {
|
|
|
|
|
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)))
|
|
|
|
|
router ! ""
|
|
|
|
|
router ! ""
|
|
|
|
|
val c1, c2 = expectMsgType[ActorRef]
|
|
|
|
|
watch(router)
|
|
|
|
|
watch(c2)
|
2011-12-14 00:06:36 +01:00
|
|
|
system.stop(c2)
|
2011-12-13 16:05:56 +01:00
|
|
|
expectMsg(Terminated(c2))
|
|
|
|
|
// it might take a while until the Router has actually processed the Terminated message
|
|
|
|
|
awaitCond {
|
|
|
|
|
router ! ""
|
|
|
|
|
router ! ""
|
|
|
|
|
val res = receiveWhile(100 millis, messages = 2) {
|
|
|
|
|
case x: ActorRef ⇒ x
|
|
|
|
|
}
|
|
|
|
|
res == Seq(c1, c1)
|
|
|
|
|
}
|
2011-12-14 00:06:36 +01:00
|
|
|
system.stop(c1)
|
2011-12-13 16:05:56 +01:00
|
|
|
expectMsg(Terminated(router))
|
|
|
|
|
}
|
|
|
|
|
|
2011-12-19 13:47:49 +01:00
|
|
|
"be able to send their routees" in {
|
2011-12-20 19:57:42 +01:00
|
|
|
val doneLatch = new TestLatch(1)
|
2011-12-19 13:47:49 +01:00
|
|
|
|
|
|
|
|
class TheActor extends Actor {
|
|
|
|
|
val routee1 = context.actorOf(Props[TestActor], "routee1")
|
|
|
|
|
val routee2 = context.actorOf(Props[TestActor], "routee2")
|
|
|
|
|
val routee3 = context.actorOf(Props[TestActor], "routee3")
|
2011-12-20 19:57:42 +01:00
|
|
|
val router = context.actorOf(Props[TestActor].withRouter(
|
|
|
|
|
ScatterGatherFirstCompletedRouter(
|
|
|
|
|
routees = List(routee1, routee2, routee3),
|
|
|
|
|
within = 5 seconds)))
|
2011-12-19 13:47:49 +01:00
|
|
|
|
|
|
|
|
def receive = {
|
|
|
|
|
case RouterRoutees(iterable) ⇒
|
2011-12-19 16:35:35 +01:00
|
|
|
iterable.exists(_.path.name == "routee1") must be(true)
|
|
|
|
|
iterable.exists(_.path.name == "routee2") must be(true)
|
2011-12-19 18:35:48 +01:00
|
|
|
iterable.exists(_.path.name == "routee3") must be(true)
|
2011-12-19 13:47:49 +01:00
|
|
|
doneLatch.countDown()
|
|
|
|
|
case "doIt" ⇒
|
|
|
|
|
router ! CurrentRoutees
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val theActor = system.actorOf(Props(new TheActor), "theActor")
|
|
|
|
|
theActor ! "doIt"
|
2011-12-20 19:57:42 +01:00
|
|
|
Await.ready(doneLatch, 1 seconds)
|
2011-12-19 13:47:49 +01:00
|
|
|
}
|
|
|
|
|
|
2012-01-10 17:50:17 +01:00
|
|
|
"use configured nr-of-instances when FromConfig" in {
|
|
|
|
|
val router = system.actorOf(Props[TestActor].withRouter(FromConfig), "router1")
|
|
|
|
|
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3)
|
|
|
|
|
system.stop(router)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"use configured nr-of-instances when router is specified" in {
|
|
|
|
|
val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 2)), "router1")
|
|
|
|
|
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3)
|
|
|
|
|
system.stop(router)
|
|
|
|
|
}
|
|
|
|
|
|
2012-02-18 22:15:39 +01:00
|
|
|
"set supplied supervisorStrategy" in {
|
|
|
|
|
//#supervision
|
|
|
|
|
val escalator = OneForOneStrategy() {
|
|
|
|
|
//#custom-strategy
|
|
|
|
|
case e ⇒ testActor ! e; SupervisorStrategy.Escalate
|
|
|
|
|
//#custom-strategy
|
|
|
|
|
}
|
|
|
|
|
val router = system.actorOf(Props.empty.withRouter(
|
2012-02-18 22:32:41 +01:00
|
|
|
RoundRobinRouter(1, supervisorStrategy = escalator)))
|
2012-02-18 22:15:39 +01:00
|
|
|
//#supervision
|
|
|
|
|
router ! CurrentRoutees
|
|
|
|
|
EventFilter[ActorKilledException](occurrences = 2) intercept {
|
|
|
|
|
expectMsgType[RouterRoutees].routees.head ! Kill
|
|
|
|
|
}
|
|
|
|
|
expectMsgType[ActorKilledException]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"default to all-for-one-always-escalate strategy" in {
|
|
|
|
|
val restarter = OneForOneStrategy() {
|
|
|
|
|
case e ⇒ testActor ! e; SupervisorStrategy.Restart
|
|
|
|
|
}
|
|
|
|
|
val supervisor = system.actorOf(Props(new Supervisor(restarter)))
|
|
|
|
|
supervisor ! Props(new Actor {
|
|
|
|
|
def receive = {
|
|
|
|
|
case x: String ⇒ throw new Exception(x)
|
|
|
|
|
}
|
|
|
|
|
override def postRestart(reason: Throwable): Unit = testActor ! "restarted"
|
|
|
|
|
}).withRouter(RoundRobinRouter(3))
|
|
|
|
|
val router = expectMsgType[ActorRef]
|
|
|
|
|
EventFilter[Exception]("die", occurrences = 2) intercept {
|
|
|
|
|
router ! "die"
|
|
|
|
|
}
|
|
|
|
|
expectMsgType[Exception].getMessage must be("die")
|
|
|
|
|
expectMsg("restarted")
|
|
|
|
|
expectMsg("restarted")
|
|
|
|
|
expectMsg("restarted")
|
|
|
|
|
}
|
|
|
|
|
|
2011-12-13 16:05:56 +01:00
|
|
|
}
|
|
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
"no router" must {
|
2011-07-28 15:48:03 +03:00
|
|
|
"be started when constructed" in {
|
2011-12-14 14:05:44 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(NoRouter))
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor.isTerminated must be(false)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"send message to connection" in {
|
2011-12-19 15:05:33 +01:00
|
|
|
val doneLatch = new TestLatch(1)
|
2011-07-28 15:48:03 +03:00
|
|
|
|
|
|
|
|
val counter = new AtomicInteger(0)
|
2011-12-11 22:34:38 +01:00
|
|
|
|
|
|
|
|
class Actor1 extends Actor {
|
2011-04-19 17:45:01 +12:00
|
|
|
def receive = {
|
2011-07-28 16:56:35 +03:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case _ ⇒ counter.incrementAndGet
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
2011-12-11 22:34:38 +01:00
|
|
|
}
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-12-13 11:32:24 +01:00
|
|
|
val routedActor = system.actorOf(Props(new Actor1).withRouter(NoRouter))
|
2011-07-28 15:48:03 +03:00
|
|
|
routedActor ! "hello"
|
|
|
|
|
routedActor ! "end"
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-12-19 15:05:33 +01:00
|
|
|
Await.ready(doneLatch, 5 seconds)
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
counter.get must be(1)
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
"round robin router" must {
|
|
|
|
|
"be started when constructed" in {
|
2011-12-14 14:05:44 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 1)))
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor.isTerminated must be(false)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-07-13 19:18:04 +02:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
//In this test a bunch of actors are created and each actor has its own counter.
|
|
|
|
|
//to test round robin, the routed actor receives the following sequence of messages 1 2 3 .. 1 2 3 .. 1 2 3 which it
|
|
|
|
|
//uses to increment his counter.
|
|
|
|
|
//So after n iteration, the first actor his counter should be 1*n, the second 2*n etc etc.
|
|
|
|
|
"deliver messages in a round robin fashion" in {
|
|
|
|
|
val connectionCount = 10
|
|
|
|
|
val iterationCount = 10
|
2011-12-19 15:05:33 +01:00
|
|
|
val doneLatch = new TestLatch(connectionCount)
|
2011-07-28 15:48:03 +03:00
|
|
|
|
|
|
|
|
//lets create some connections.
|
2011-12-11 22:34:38 +01:00
|
|
|
var actors = new LinkedList[ActorRef]
|
2011-07-28 15:48:03 +03:00
|
|
|
var counters = new LinkedList[AtomicInteger]
|
2011-07-28 16:56:35 +03:00
|
|
|
for (i ← 0 until connectionCount) {
|
2011-07-28 15:48:03 +03:00
|
|
|
counters = counters :+ new AtomicInteger()
|
|
|
|
|
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor = system.actorOf(Props(new Actor {
|
2011-07-28 15:48:03 +03:00
|
|
|
def receive = {
|
2011-07-28 16:56:35 +03:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒ counters.get(i).get.addAndGet(msg)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-12-11 22:34:38 +01:00
|
|
|
actors = actors :+ actor
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-07-13 19:18:04 +02:00
|
|
|
|
2011-12-17 16:33:29 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(routees = actors)))
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
//send messages to the actor.
|
2011-07-28 16:56:35 +03:00
|
|
|
for (i ← 0 until iterationCount) {
|
|
|
|
|
for (k ← 0 until connectionCount) {
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor ! (k + 1)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
|
|
|
|
}
|
2011-07-13 19:18:04 +02:00
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor ! Broadcast("end")
|
2011-07-28 15:48:03 +03:00
|
|
|
//now wait some and do validations.
|
2011-12-19 15:05:33 +01:00
|
|
|
Await.ready(doneLatch, 5 seconds)
|
2011-03-16 12:37:48 +01:00
|
|
|
|
2011-07-28 16:56:35 +03:00
|
|
|
for (i ← 0 until connectionCount) {
|
2011-07-28 15:48:03 +03:00
|
|
|
val counter = counters.get(i).get
|
|
|
|
|
counter.get must be((iterationCount * (i + 1)))
|
2011-06-02 22:54:38 +02:00
|
|
|
}
|
2011-03-16 12:37:48 +01:00
|
|
|
}
|
|
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
"deliver a broadcast message using the !" in {
|
2011-12-19 15:05:33 +01:00
|
|
|
val doneLatch = new TestLatch(2)
|
2011-03-16 12:37:48 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
val counter1 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor1 = system.actorOf(Props(new Actor {
|
2011-07-28 15:48:03 +03:00
|
|
|
def receive = {
|
2011-07-28 16:56:35 +03:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒ counter1.addAndGet(msg)
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-03-16 12:37:48 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
val counter2 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor2 = system.actorOf(Props(new Actor {
|
2011-04-19 17:45:01 +12:00
|
|
|
def receive = {
|
2011-07-28 16:56:35 +03:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒ counter2.addAndGet(msg)
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-12-17 16:33:29 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(routees = List(actor1, actor2))))
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor ! Broadcast(1)
|
|
|
|
|
routedActor ! Broadcast("end")
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-12-19 15:05:33 +01:00
|
|
|
Await.ready(doneLatch, 5 seconds)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
counter1.get must be(1)
|
|
|
|
|
counter2.get must be(1)
|
2011-03-16 13:20:00 +01:00
|
|
|
}
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
"random router" must {
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
"be started when constructed" in {
|
2011-12-14 14:05:44 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(RandomRouter(nrOfInstances = 1)))
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor.isTerminated must be(false)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
"deliver a broadcast message" in {
|
2011-12-19 15:05:33 +01:00
|
|
|
val doneLatch = new TestLatch(2)
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
val counter1 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor1 = system.actorOf(Props(new Actor {
|
2011-07-28 15:48:03 +03:00
|
|
|
def receive = {
|
2011-07-28 16:56:35 +03:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒ counter1.addAndGet(msg)
|
2011-02-28 22:55:02 +01:00
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
val counter2 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor2 = system.actorOf(Props(new Actor {
|
2011-07-28 15:48:03 +03:00
|
|
|
def receive = {
|
2011-07-28 16:56:35 +03:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒ counter2.addAndGet(msg)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-12-17 16:33:29 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(RandomRouter(routees = List(actor1, actor2))))
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor ! Broadcast(1)
|
|
|
|
|
routedActor ! Broadcast("end")
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-12-19 15:05:33 +01:00
|
|
|
Await.ready(doneLatch, 5 seconds)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
counter1.get must be(1)
|
|
|
|
|
counter2.get must be(1)
|
2011-03-16 13:20:00 +01:00
|
|
|
}
|
2011-12-11 22:34:38 +01:00
|
|
|
}
|
|
|
|
|
|
2012-01-11 13:56:38 +01:00
|
|
|
"smallest mailbox router" must {
|
|
|
|
|
"be started when constructed" in {
|
|
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(SmallestMailboxRouter(nrOfInstances = 1)))
|
|
|
|
|
routedActor.isTerminated must be(false)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"deliver messages to idle actor" in {
|
|
|
|
|
val usedActors = new ConcurrentHashMap[Int, String]()
|
|
|
|
|
val router = system.actorOf(Props(new Actor {
|
|
|
|
|
def receive = {
|
2012-01-12 09:53:53 +01:00
|
|
|
case (busy: TestLatch, receivedLatch: TestLatch) ⇒
|
2012-01-11 13:56:38 +01:00
|
|
|
usedActors.put(0, self.path.toString)
|
2012-01-12 09:53:53 +01:00
|
|
|
self ! "another in busy mailbox"
|
|
|
|
|
receivedLatch.countDown()
|
2012-01-11 13:56:38 +01:00
|
|
|
Await.ready(busy, TestLatch.DefaultTimeout)
|
|
|
|
|
case (msg: Int, receivedLatch: TestLatch) ⇒
|
|
|
|
|
usedActors.put(msg, self.path.toString)
|
|
|
|
|
receivedLatch.countDown()
|
2012-01-12 09:53:53 +01:00
|
|
|
case s: String ⇒
|
2012-01-11 13:56:38 +01:00
|
|
|
}
|
|
|
|
|
}).withRouter(SmallestMailboxRouter(3)))
|
|
|
|
|
|
|
|
|
|
val busy = TestLatch(1)
|
2012-01-12 09:53:53 +01:00
|
|
|
val received0 = TestLatch(1)
|
|
|
|
|
router ! (busy, received0)
|
|
|
|
|
Await.ready(received0, TestLatch.DefaultTimeout)
|
2012-01-11 13:56:38 +01:00
|
|
|
|
|
|
|
|
val received1 = TestLatch(1)
|
2012-01-12 09:53:53 +01:00
|
|
|
router ! (1, received1)
|
2012-01-11 13:56:38 +01:00
|
|
|
Await.ready(received1, TestLatch.DefaultTimeout)
|
|
|
|
|
|
|
|
|
|
val received2 = TestLatch(1)
|
2012-01-12 09:53:53 +01:00
|
|
|
router ! (2, received2)
|
2012-01-11 13:56:38 +01:00
|
|
|
Await.ready(received2, TestLatch.DefaultTimeout)
|
|
|
|
|
|
|
|
|
|
val received3 = TestLatch(1)
|
2012-01-12 09:53:53 +01:00
|
|
|
router ! (3, received3)
|
2012-01-11 13:56:38 +01:00
|
|
|
Await.ready(received3, TestLatch.DefaultTimeout)
|
|
|
|
|
|
|
|
|
|
busy.countDown()
|
|
|
|
|
|
|
|
|
|
val busyPath = usedActors.get(0)
|
|
|
|
|
busyPath must not be (null)
|
|
|
|
|
|
|
|
|
|
val path1 = usedActors.get(1)
|
|
|
|
|
val path2 = usedActors.get(2)
|
|
|
|
|
val path3 = usedActors.get(3)
|
|
|
|
|
|
|
|
|
|
path1 must not be (busyPath)
|
|
|
|
|
path2 must not be (busyPath)
|
|
|
|
|
path3 must not be (busyPath)
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
"broadcast router" must {
|
|
|
|
|
"be started when constructed" in {
|
2011-12-14 14:05:44 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(nrOfInstances = 1)))
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor.isTerminated must be(false)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"broadcast message using !" in {
|
2011-12-19 15:05:33 +01:00
|
|
|
val doneLatch = new TestLatch(2)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
val counter1 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor1 = system.actorOf(Props(new Actor {
|
2011-07-28 15:48:03 +03:00
|
|
|
def receive = {
|
2011-12-11 22:34:38 +01:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒ counter1.addAndGet(msg)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
val counter2 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor2 = system.actorOf(Props(new Actor {
|
2011-12-11 22:34:38 +01:00
|
|
|
def receive = {
|
|
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒ counter2.addAndGet(msg)
|
|
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-12-17 16:33:29 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(routees = List(actor1, actor2))))
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor ! 1
|
|
|
|
|
routedActor ! "end"
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-12-19 15:05:33 +01:00
|
|
|
Await.ready(doneLatch, 5 seconds)
|
2011-12-11 22:34:38 +01:00
|
|
|
|
|
|
|
|
counter1.get must be(1)
|
|
|
|
|
counter2.get must be(1)
|
2011-03-16 13:20:00 +01:00
|
|
|
}
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
"broadcast message using ?" in {
|
2011-12-19 15:05:33 +01:00
|
|
|
val doneLatch = new TestLatch(2)
|
2011-12-11 22:34:38 +01:00
|
|
|
|
|
|
|
|
val counter1 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor1 = system.actorOf(Props(new Actor {
|
2011-12-11 22:34:38 +01:00
|
|
|
def receive = {
|
|
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒
|
|
|
|
|
counter1.addAndGet(msg)
|
|
|
|
|
sender ! "ack"
|
|
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-12-11 22:34:38 +01:00
|
|
|
|
|
|
|
|
val counter2 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor2 = system.actorOf(Props(new Actor {
|
2011-12-11 22:34:38 +01:00
|
|
|
def receive = {
|
|
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒ counter2.addAndGet(msg)
|
|
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-17 16:33:29 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(routees = List(actor1, actor2))))
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor ? 1
|
|
|
|
|
routedActor ! "end"
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-19 15:05:33 +01:00
|
|
|
Await.ready(doneLatch, 5 seconds)
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
counter1.get must be(1)
|
|
|
|
|
counter2.get must be(1)
|
|
|
|
|
}
|
|
|
|
|
}
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-12 15:06:40 +01:00
|
|
|
"Scatter-gather router" must {
|
2011-10-07 15:42:55 +02:00
|
|
|
|
|
|
|
|
"be started when constructed" in {
|
2011-12-20 19:57:42 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(
|
|
|
|
|
ScatterGatherFirstCompletedRouter(routees = List(newActor(0)), within = 1 seconds)))
|
2011-12-12 15:06:40 +01:00
|
|
|
routedActor.isTerminated must be(false)
|
2011-10-07 15:42:55 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"deliver a broadcast message using the !" in {
|
|
|
|
|
val doneLatch = new TestLatch(2)
|
|
|
|
|
|
|
|
|
|
val counter1 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor1 = system.actorOf(Props(new Actor {
|
2011-10-07 15:42:55 +02:00
|
|
|
def receive = {
|
2011-12-12 15:06:40 +01:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
2011-10-07 15:42:55 +02:00
|
|
|
case msg: Int ⇒ counter1.addAndGet(msg)
|
|
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-10-07 15:42:55 +02:00
|
|
|
|
|
|
|
|
val counter2 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor2 = system.actorOf(Props(new Actor {
|
2011-10-07 15:42:55 +02:00
|
|
|
def receive = {
|
2011-12-12 15:06:40 +01:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
2011-10-07 15:42:55 +02:00
|
|
|
case msg: Int ⇒ counter2.addAndGet(msg)
|
|
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-20 19:57:42 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(
|
|
|
|
|
ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2), within = 1 seconds)))
|
2011-12-12 15:06:40 +01:00
|
|
|
routedActor ! Broadcast(1)
|
|
|
|
|
routedActor ! Broadcast("end")
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-19 15:05:33 +01:00
|
|
|
Await.ready(doneLatch, TestLatch.DefaultTimeout)
|
2011-10-07 15:42:55 +02:00
|
|
|
|
|
|
|
|
counter1.get must be(1)
|
|
|
|
|
counter2.get must be(1)
|
|
|
|
|
}
|
|
|
|
|
|
2011-12-12 15:06:40 +01:00
|
|
|
"return response, even if one of the actors has stopped" in {
|
|
|
|
|
val shutdownLatch = new TestLatch(1)
|
|
|
|
|
val actor1 = newActor(1, Some(shutdownLatch))
|
2011-12-20 19:57:42 +01:00
|
|
|
val actor2 = newActor(14, Some(shutdownLatch))
|
|
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(
|
|
|
|
|
ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2), within = 3 seconds)))
|
2011-12-12 15:06:40 +01:00
|
|
|
|
|
|
|
|
routedActor ! Broadcast(Stop(Some(1)))
|
2011-12-19 15:05:33 +01:00
|
|
|
Await.ready(shutdownLatch, TestLatch.DefaultTimeout)
|
2011-12-20 19:57:42 +01:00
|
|
|
Await.result(routedActor ? Broadcast(0), timeout.duration) must be(14)
|
2011-12-12 15:06:40 +01:00
|
|
|
}
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-12 15:06:40 +01:00
|
|
|
case class Stop(id: Option[Int] = None)
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-12 15:06:40 +01:00
|
|
|
def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(Props(new Actor {
|
|
|
|
|
def receive = {
|
2011-12-14 00:06:36 +01:00
|
|
|
case Stop(None) ⇒ context.stop(self)
|
|
|
|
|
case Stop(Some(_id)) if (_id == id) ⇒ context.stop(self)
|
2011-12-12 15:06:40 +01:00
|
|
|
case _id: Int if (_id == id) ⇒
|
|
|
|
|
case x ⇒ {
|
|
|
|
|
Thread sleep 100 * id
|
|
|
|
|
sender.tell(id)
|
|
|
|
|
}
|
2011-10-07 15:42:55 +02:00
|
|
|
}
|
2011-11-24 16:35:37 +01:00
|
|
|
|
2011-12-12 15:06:40 +01:00
|
|
|
override def postStop = {
|
|
|
|
|
shudownLatch foreach (_.countDown())
|
|
|
|
|
}
|
|
|
|
|
}), "Actor:" + id)
|
|
|
|
|
}
|
2011-12-15 15:28:21 +01:00
|
|
|
|
2011-12-29 17:11:21 +01:00
|
|
|
"router FromConfig" must {
|
|
|
|
|
"throw suitable exception when not configured" in {
|
|
|
|
|
intercept[ConfigurationException] {
|
|
|
|
|
system.actorOf(Props.empty.withRouter(FromConfig))
|
|
|
|
|
}.getMessage.contains("application.conf") must be(true)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"allow external configuration" in {
|
|
|
|
|
val sys = ActorSystem("FromConfig", ConfigFactory
|
|
|
|
|
.parseString("akka.actor.deployment./routed.router=round-robin")
|
|
|
|
|
.withFallback(system.settings.config))
|
|
|
|
|
try {
|
|
|
|
|
sys.actorOf(Props.empty.withRouter(FromConfig), "routed")
|
|
|
|
|
} finally {
|
|
|
|
|
sys.shutdown()
|
|
|
|
|
}
|
|
|
|
|
}
|
2012-01-12 16:37:08 +01:00
|
|
|
"support custom router" in {
|
|
|
|
|
val myrouter = system.actorOf(Props().withRouter(FromConfig), "myrouter")
|
|
|
|
|
myrouter.isTerminated must be(false)
|
|
|
|
|
}
|
2011-12-29 17:11:21 +01:00
|
|
|
}
|
|
|
|
|
|
2011-12-15 15:28:21 +01:00
|
|
|
"custom router" must {
|
|
|
|
|
"be started when constructed" in {
|
2012-01-11 11:30:32 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(VoteCountRouter()))
|
2011-12-15 15:28:21 +01:00
|
|
|
routedActor.isTerminated must be(false)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"count votes as intended - not as in Florida" in {
|
2012-01-11 11:30:32 +01:00
|
|
|
val routedActor = system.actorOf(Props().withRouter(VoteCountRouter()))
|
2011-12-15 15:28:21 +01:00
|
|
|
routedActor ! DemocratVote
|
|
|
|
|
routedActor ! DemocratVote
|
|
|
|
|
routedActor ! RepublicanVote
|
|
|
|
|
routedActor ! DemocratVote
|
|
|
|
|
routedActor ! RepublicanVote
|
|
|
|
|
val democratsResult = (routedActor ? DemocratCountResult)
|
|
|
|
|
val republicansResult = (routedActor ? RepublicanCountResult)
|
|
|
|
|
|
2011-12-15 20:42:51 +01:00
|
|
|
Await.result(democratsResult, 1 seconds) === 3
|
|
|
|
|
Await.result(republicansResult, 1 seconds) === 2
|
2011-12-15 15:28:21 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// DO NOT CHANGE THE COMMENTS BELOW AS THEY ARE USED IN THE DOCUMENTATION
|
|
|
|
|
|
|
|
|
|
//#CustomRouter
|
|
|
|
|
//#crMessages
|
|
|
|
|
case object DemocratVote
|
|
|
|
|
case object DemocratCountResult
|
|
|
|
|
case object RepublicanVote
|
|
|
|
|
case object RepublicanCountResult
|
|
|
|
|
//#crMessages
|
|
|
|
|
|
|
|
|
|
//#crActors
|
|
|
|
|
class DemocratActor extends Actor {
|
2011-12-15 18:19:40 +01:00
|
|
|
var counter = 0
|
2011-12-15 15:28:21 +01:00
|
|
|
|
|
|
|
|
def receive = {
|
2011-12-15 18:19:40 +01:00
|
|
|
case DemocratVote ⇒ counter += 1
|
|
|
|
|
case DemocratCountResult ⇒ sender ! counter
|
2011-12-15 15:28:21 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class RepublicanActor extends Actor {
|
2011-12-15 18:19:40 +01:00
|
|
|
var counter = 0
|
2011-12-15 15:28:21 +01:00
|
|
|
|
|
|
|
|
def receive = {
|
2011-12-15 18:19:40 +01:00
|
|
|
case RepublicanVote ⇒ counter += 1
|
|
|
|
|
case RepublicanCountResult ⇒ sender ! counter
|
2011-12-15 15:28:21 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
//#crActors
|
|
|
|
|
|
|
|
|
|
//#crRouter
|
2012-01-11 11:30:32 +01:00
|
|
|
case class VoteCountRouter() extends RouterConfig {
|
2011-12-15 15:28:21 +01:00
|
|
|
|
2012-02-10 14:13:40 +01:00
|
|
|
def routerDispatcher: String = Dispatchers.DefaultDispatcherId
|
2012-02-18 22:15:39 +01:00
|
|
|
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
|
2012-02-10 14:13:40 +01:00
|
|
|
|
2011-12-15 15:28:21 +01:00
|
|
|
//#crRoute
|
2012-01-17 08:45:07 +01:00
|
|
|
def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = {
|
|
|
|
|
val democratActor = routeeProvider.context.actorOf(Props(new DemocratActor()), "d")
|
|
|
|
|
val republicanActor = routeeProvider.context.actorOf(Props(new RepublicanActor()), "r")
|
2011-12-15 15:28:21 +01:00
|
|
|
val routees = Vector[ActorRef](democratActor, republicanActor)
|
|
|
|
|
|
|
|
|
|
//#crRegisterRoutees
|
2012-01-17 08:45:07 +01:00
|
|
|
routeeProvider.registerRoutees(routees)
|
2011-12-15 15:28:21 +01:00
|
|
|
//#crRegisterRoutees
|
|
|
|
|
|
|
|
|
|
//#crRoutingLogic
|
|
|
|
|
{
|
|
|
|
|
case (sender, message) ⇒
|
|
|
|
|
message match {
|
|
|
|
|
case DemocratVote | DemocratCountResult ⇒
|
|
|
|
|
List(Destination(sender, democratActor))
|
|
|
|
|
case RepublicanVote | RepublicanCountResult ⇒
|
|
|
|
|
List(Destination(sender, republicanActor))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
//#crRoutingLogic
|
|
|
|
|
}
|
|
|
|
|
//#crRoute
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
//#crRouter
|
|
|
|
|
//#CustomRouter
|
|
|
|
|
}
|
2010-08-24 23:21:28 +02:00
|
|
|
}
|