2011-12-13 01:09:05 +01:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
2011-07-28 15:48:03 +03:00
|
|
|
package akka.routing
|
2010-08-24 23:21:28 +02:00
|
|
|
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger
|
2011-11-11 19:57:27 +01:00
|
|
|
import akka.actor._
|
2011-07-28 15:48:03 +03:00
|
|
|
import collection.mutable.LinkedList
|
2011-07-28 16:56:35 +03:00
|
|
|
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
2011-10-07 15:42:55 +02:00
|
|
|
import akka.testkit._
|
2011-12-13 16:05:56 +01:00
|
|
|
import akka.util.duration._
|
2011-12-12 22:50:08 +01:00
|
|
|
import akka.dispatch.Await
|
2011-05-23 17:08:45 +02:00
|
|
|
|
|
|
|
|
/**
 * Actors shared by the routing tests below.
 */
object RoutingSpec {

  /** Prints "Hello" for every message it receives; never replies. */
  class TestActor extends Actor {
    def receive = { case _ ⇒ println("Hello") }
  }

  /** Answers every message by sending its own ActorRef back to the sender. */
  class Echo extends Actor {
    def receive = { case _ ⇒ sender ! self }
  }

}
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-10-21 17:01:22 +02:00
|
|
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
2011-12-13 16:05:56 +01:00
|
|
|
class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-11-16 17:18:36 +01:00
|
|
|
val impl = system.asInstanceOf[ActorSystemImpl]
|
|
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
import akka.routing.RoutingSpec._
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-12-13 16:05:56 +01:00
|
|
|
"routers in general" must {
|
|
|
|
|
|
|
|
|
|
"evict terminated routees" in {
|
|
|
|
|
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)))
|
|
|
|
|
router ! ""
|
|
|
|
|
router ! ""
|
|
|
|
|
val c1, c2 = expectMsgType[ActorRef]
|
|
|
|
|
watch(router)
|
|
|
|
|
watch(c2)
|
2011-12-14 00:06:36 +01:00
|
|
|
system.stop(c2)
|
2011-12-13 16:05:56 +01:00
|
|
|
expectMsg(Terminated(c2))
|
|
|
|
|
// it might take a while until the Router has actually processed the Terminated message
|
|
|
|
|
awaitCond {
|
|
|
|
|
router ! ""
|
|
|
|
|
router ! ""
|
|
|
|
|
val res = receiveWhile(100 millis, messages = 2) {
|
|
|
|
|
case x: ActorRef ⇒ x
|
|
|
|
|
}
|
|
|
|
|
res == Seq(c1, c1)
|
|
|
|
|
}
|
2011-12-14 00:06:36 +01:00
|
|
|
system.stop(c1)
|
2011-12-13 16:05:56 +01:00
|
|
|
expectMsg(Terminated(router))
|
|
|
|
|
}
|
|
|
|
|
|
2011-12-19 13:47:49 +01:00
|
|
|
"be able to send their routees" in {
|
|
|
|
|
val doneLatch = new CountDownLatch(1)
|
|
|
|
|
|
|
|
|
|
class TheActor extends Actor {
|
|
|
|
|
val routee1 = context.actorOf(Props[TestActor], "routee1")
|
|
|
|
|
val routee2 = context.actorOf(Props[TestActor], "routee2")
|
|
|
|
|
val routee3 = context.actorOf(Props[TestActor], "routee3")
|
|
|
|
|
val router = context.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(routee1, routee2, routee3))))
|
|
|
|
|
|
|
|
|
|
// Stop one of the routees - which should exclude it from the router's list of active routees
|
|
|
|
|
routee3 ! PoisonPill
|
|
|
|
|
|
|
|
|
|
def receive = {
|
|
|
|
|
case RouterRoutees(iterable) ⇒
|
|
|
|
|
iterable.exists(_ == "routee1") must be(true)
|
|
|
|
|
iterable.exists(_ == "routee2") must be(true)
|
|
|
|
|
iterable.exists(_ == "routee3") must be(false)
|
|
|
|
|
doneLatch.countDown()
|
|
|
|
|
case "doIt" ⇒
|
|
|
|
|
router ! CurrentRoutees
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val theActor = system.actorOf(Props(new TheActor), "theActor")
|
|
|
|
|
theActor ! "doIt"
|
|
|
|
|
doneLatch.await(1, TimeUnit.SECONDS) must be(true)
|
|
|
|
|
}
|
|
|
|
|
|
2011-12-13 16:05:56 +01:00
|
|
|
}
|
|
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
"no router" must {
|
2011-07-28 15:48:03 +03:00
|
|
|
"be started when constructed" in {
|
2011-12-14 14:05:44 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(NoRouter))
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor.isTerminated must be(false)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"send message to connection" in {
|
|
|
|
|
val doneLatch = new CountDownLatch(1)
|
|
|
|
|
|
|
|
|
|
val counter = new AtomicInteger(0)
|
2011-12-11 22:34:38 +01:00
|
|
|
|
|
|
|
|
class Actor1 extends Actor {
|
2011-04-19 17:45:01 +12:00
|
|
|
def receive = {
|
2011-07-28 16:56:35 +03:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case _ ⇒ counter.incrementAndGet
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
2011-12-11 22:34:38 +01:00
|
|
|
}
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-12-13 11:32:24 +01:00
|
|
|
val routedActor = system.actorOf(Props(new Actor1).withRouter(NoRouter))
|
2011-07-28 15:48:03 +03:00
|
|
|
routedActor ! "hello"
|
|
|
|
|
routedActor ! "end"
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
counter.get must be(1)
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
"round robin router" must {
|
|
|
|
|
"be started when constructed" in {
|
2011-12-14 14:05:44 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 1)))
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor.isTerminated must be(false)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-07-13 19:18:04 +02:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
// Each routee owns its own counter. The router is sent the sequence 1, 2, ..., connectionCount
// over and over (iterationCount times). With round-robin delivery routee i always receives
// the value i + 1, so after all iterations its counter must equal iterationCount * (i + 1).
"deliver messages in a round robin fashion" in {
  val connectionCount = 10
  val iterationCount = 10
  val doneLatch = new CountDownLatch(connectionCount)

  // One counter per routee, built up-front as immutable vectors so the actors close over
  // a fixed value instead of a mutable, growing list.
  val counters = Vector.fill(connectionCount)(new AtomicInteger)
  val actors = counters map { counter ⇒
    system.actorOf(Props(new Actor {
      def receive = {
        case "end"    ⇒ doneLatch.countDown()
        case msg: Int ⇒ counter.addAndGet(msg)
      }
    }))
  }

  val routedActor = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(routees = actors)))

  // Send the sequence 1..connectionCount, iterationCount times.
  for (_ ← 0 until iterationCount; k ← 0 until connectionCount) {
    routedActor ! (k + 1)
  }

  // Every routee counts the latch down once it has seen the broadcast "end".
  routedActor ! Broadcast("end")
  doneLatch.await(5, TimeUnit.SECONDS) must be(true)

  for ((counter, i) ← counters.zipWithIndex) {
    counter.get must be(iterationCount * (i + 1))
  }
}
|
|
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
"deliver a broadcast message using the !" in {
|
|
|
|
|
val doneLatch = new CountDownLatch(2)
|
2011-03-16 12:37:48 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
val counter1 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor1 = system.actorOf(Props(new Actor {
|
2011-07-28 15:48:03 +03:00
|
|
|
def receive = {
|
2011-07-28 16:56:35 +03:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒ counter1.addAndGet(msg)
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-03-16 12:37:48 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
val counter2 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor2 = system.actorOf(Props(new Actor {
|
2011-04-19 17:45:01 +12:00
|
|
|
def receive = {
|
2011-07-28 16:56:35 +03:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒ counter2.addAndGet(msg)
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-12-17 16:33:29 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(routees = List(actor1, actor2))))
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor ! Broadcast(1)
|
|
|
|
|
routedActor ! Broadcast("end")
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
counter1.get must be(1)
|
|
|
|
|
counter2.get must be(1)
|
2011-03-16 13:20:00 +01:00
|
|
|
}
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
"random router" must {
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
"be started when constructed" in {
|
2011-12-14 14:05:44 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(RandomRouter(nrOfInstances = 1)))
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor.isTerminated must be(false)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
"deliver a broadcast message" in {
|
|
|
|
|
val doneLatch = new CountDownLatch(2)
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
val counter1 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor1 = system.actorOf(Props(new Actor {
|
2011-07-28 15:48:03 +03:00
|
|
|
def receive = {
|
2011-07-28 16:56:35 +03:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒ counter1.addAndGet(msg)
|
2011-02-28 22:55:02 +01:00
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
val counter2 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor2 = system.actorOf(Props(new Actor {
|
2011-07-28 15:48:03 +03:00
|
|
|
def receive = {
|
2011-07-28 16:56:35 +03:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒ counter2.addAndGet(msg)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-12-17 16:33:29 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(RandomRouter(routees = List(actor1, actor2))))
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor ! Broadcast(1)
|
|
|
|
|
routedActor ! Broadcast("end")
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
counter1.get must be(1)
|
|
|
|
|
counter2.get must be(1)
|
2011-03-16 13:20:00 +01:00
|
|
|
}
|
2011-12-11 22:34:38 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"broadcast router" must {
|
|
|
|
|
"be started when constructed" in {
|
2011-12-14 14:05:44 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(nrOfInstances = 1)))
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor.isTerminated must be(false)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"broadcast message using !" in {
|
|
|
|
|
val doneLatch = new CountDownLatch(2)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
val counter1 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor1 = system.actorOf(Props(new Actor {
|
2011-07-28 15:48:03 +03:00
|
|
|
def receive = {
|
2011-12-11 22:34:38 +01:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒ counter1.addAndGet(msg)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
val counter2 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor2 = system.actorOf(Props(new Actor {
|
2011-12-11 22:34:38 +01:00
|
|
|
def receive = {
|
|
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒ counter2.addAndGet(msg)
|
|
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-12-17 16:33:29 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(routees = List(actor1, actor2))))
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor ! 1
|
|
|
|
|
routedActor ! "end"
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
2011-12-11 22:34:38 +01:00
|
|
|
|
|
|
|
|
counter1.get must be(1)
|
|
|
|
|
counter2.get must be(1)
|
2011-03-16 13:20:00 +01:00
|
|
|
}
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
"broadcast message using ?" in {
|
|
|
|
|
val doneLatch = new CountDownLatch(2)
|
|
|
|
|
|
|
|
|
|
val counter1 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor1 = system.actorOf(Props(new Actor {
|
2011-12-11 22:34:38 +01:00
|
|
|
def receive = {
|
|
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒
|
|
|
|
|
counter1.addAndGet(msg)
|
|
|
|
|
sender ! "ack"
|
|
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-12-11 22:34:38 +01:00
|
|
|
|
|
|
|
|
val counter2 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor2 = system.actorOf(Props(new Actor {
|
2011-12-11 22:34:38 +01:00
|
|
|
def receive = {
|
|
|
|
|
case "end" ⇒ doneLatch.countDown()
|
|
|
|
|
case msg: Int ⇒ counter2.addAndGet(msg)
|
|
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-17 16:33:29 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(routees = List(actor1, actor2))))
|
2011-12-11 22:34:38 +01:00
|
|
|
routedActor ? 1
|
|
|
|
|
routedActor ! "end"
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
counter1.get must be(1)
|
|
|
|
|
counter2.get must be(1)
|
|
|
|
|
}
|
|
|
|
|
}
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-12 15:06:40 +01:00
|
|
|
"Scatter-gather router" must {
|
2011-10-07 15:42:55 +02:00
|
|
|
|
|
|
|
|
"be started when constructed" in {
|
2011-12-17 16:33:29 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(newActor(0)))))
|
2011-12-12 15:06:40 +01:00
|
|
|
routedActor.isTerminated must be(false)
|
2011-10-07 15:42:55 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"deliver a broadcast message using the !" in {
|
|
|
|
|
val doneLatch = new TestLatch(2)
|
|
|
|
|
|
|
|
|
|
val counter1 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor1 = system.actorOf(Props(new Actor {
|
2011-10-07 15:42:55 +02:00
|
|
|
def receive = {
|
2011-12-12 15:06:40 +01:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
2011-10-07 15:42:55 +02:00
|
|
|
case msg: Int ⇒ counter1.addAndGet(msg)
|
|
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-10-07 15:42:55 +02:00
|
|
|
|
|
|
|
|
val counter2 = new AtomicInteger
|
2011-12-13 16:59:43 +01:00
|
|
|
val actor2 = system.actorOf(Props(new Actor {
|
2011-10-07 15:42:55 +02:00
|
|
|
def receive = {
|
2011-12-12 15:06:40 +01:00
|
|
|
case "end" ⇒ doneLatch.countDown()
|
2011-10-07 15:42:55 +02:00
|
|
|
case msg: Int ⇒ counter2.addAndGet(msg)
|
|
|
|
|
}
|
2011-12-13 14:09:40 +01:00
|
|
|
}))
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-17 16:33:29 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2))))
|
2011-12-12 15:06:40 +01:00
|
|
|
routedActor ! Broadcast(1)
|
|
|
|
|
routedActor ! Broadcast("end")
|
2011-10-07 15:42:55 +02:00
|
|
|
|
|
|
|
|
doneLatch.await
|
|
|
|
|
|
|
|
|
|
counter1.get must be(1)
|
|
|
|
|
counter2.get must be(1)
|
|
|
|
|
}
|
|
|
|
|
|
2011-12-12 15:06:40 +01:00
|
|
|
"return response, even if one of the actors has stopped" in {
|
|
|
|
|
val shutdownLatch = new TestLatch(1)
|
|
|
|
|
val actor1 = newActor(1, Some(shutdownLatch))
|
|
|
|
|
val actor2 = newActor(22, Some(shutdownLatch))
|
2011-12-17 16:33:29 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2))))
|
2011-12-12 15:06:40 +01:00
|
|
|
|
|
|
|
|
routedActor ! Broadcast(Stop(Some(1)))
|
|
|
|
|
shutdownLatch.await
|
2011-12-14 17:26:18 +01:00
|
|
|
Await.result(routedActor ? Broadcast(0), timeout.duration) must be(22)
|
2011-12-12 15:06:40 +01:00
|
|
|
}
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-12 15:06:40 +01:00
|
|
|
// Test message asking routees created by newActor to stop: Stop(None) stops any routee,
// Stop(Some(id)) stops only the routee that was created with that id.
case class Stop(id: Option[Int] = None)
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-12 15:06:40 +01:00
|
|
|
/**
 * Creates a test routee named "Actor:" + id.
 *
 * The routee stops itself on Stop(None) or on Stop(Some(id)) carrying its own id, ignores
 * an Int equal to its own id, and answers every other message with its id after sleeping
 * for 100 * id milliseconds.
 *
 * @param id           identifier of the routee, also used as its reply and to scale its delay
 * @param shudownLatch optional latch that is counted down from postStop
 */
def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = {
  val routeeProps = Props(new Actor {
    def receive = {
      case Stop(None)                            ⇒ context.stop(self)
      case Stop(Some(target)) if (target == id)  ⇒ context.stop(self)
      case own: Int if (own == id)               ⇒ // deliberately no reply for the own id
      case _ ⇒
        Thread.sleep(100 * id)
        sender.tell(id)
    }

    override def postStop = {
      shudownLatch foreach (_.countDown())
    }
  })
  system.actorOf(routeeProps, "Actor:" + id)
}
|
|
|
|
|
}
|
2011-12-15 15:28:21 +01:00
|
|
|
|
|
|
|
|
"custom router" must {
|
|
|
|
|
"be started when constructed" in {
|
2011-12-15 18:19:40 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(VoteCountRouter()))
|
2011-12-15 15:28:21 +01:00
|
|
|
routedActor.isTerminated must be(false)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"count votes as intended - not as in Florida" in {
|
2011-12-15 18:19:40 +01:00
|
|
|
val routedActor = system.actorOf(Props[TestActor].withRouter(VoteCountRouter()))
|
2011-12-15 15:28:21 +01:00
|
|
|
routedActor ! DemocratVote
|
|
|
|
|
routedActor ! DemocratVote
|
|
|
|
|
routedActor ! RepublicanVote
|
|
|
|
|
routedActor ! DemocratVote
|
|
|
|
|
routedActor ! RepublicanVote
|
|
|
|
|
val democratsResult = (routedActor ? DemocratCountResult)
|
|
|
|
|
val republicansResult = (routedActor ? RepublicanCountResult)
|
|
|
|
|
|
2011-12-15 20:42:51 +01:00
|
|
|
Await.result(democratsResult, 1 seconds) === 3
|
|
|
|
|
Await.result(republicansResult, 1 seconds) === 2
|
2011-12-15 15:28:21 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// DO NOT CHANGE THE COMMENTS BELOW AS THEY ARE USED IN THE DOCUMENTATION
|
|
|
|
|
|
|
|
|
|
//#CustomRouter
|
|
|
|
|
//#crMessages
|
|
|
|
|
// A vote for the democrats, and the request for the current democrat vote count.
case object DemocratVote
case object DemocratCountResult
// A vote for the republicans, and the request for the current republican vote count.
case object RepublicanVote
case object RepublicanCountResult
|
|
|
|
|
//#crMessages
|
|
|
|
|
|
|
|
|
|
//#crActors
|
|
|
|
|
// Counts DemocratVote messages and reports the running total on DemocratCountResult.
class DemocratActor extends Actor {
  var counter = 0

  def receive = {
    case DemocratVote ⇒
      counter = counter + 1
    case DemocratCountResult ⇒
      sender ! counter
  }
}
|
|
|
|
|
|
|
|
|
|
// Counts RepublicanVote messages and reports the running total on RepublicanCountResult.
class RepublicanActor extends Actor {
  var counter = 0

  def receive = {
    case RepublicanVote ⇒
      counter = counter + 1
    case RepublicanCountResult ⇒
      sender ! counter
  }
}
|
|
|
|
|
//#crActors
|
|
|
|
|
|
|
|
|
|
//#crRouter
|
2011-12-17 16:33:29 +01:00
|
|
|
case class VoteCountRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil)
  extends RouterConfig {

  //#crRoute
  // Creates one DemocratActor ("d") and one RepublicanActor ("r") as routees and returns a
  // route that sends each vote or count request to the actor of the matching party.
  def createRoute(props: Props,
                  actorContext: ActorContext,
                  ref: RoutedActorRef): Route = {
    val democratActor = actorContext.actorOf(Props(new DemocratActor()), "d")
    val republicanActor = actorContext.actorOf(Props(new RepublicanActor()), "r")

    //#crRegisterRoutees
    registerRoutees(actorContext, Vector[ActorRef](democratActor, republicanActor))
    //#crRegisterRoutees

    //#crRoutingLogic
    {
      case (sender, message) ⇒
        // Pick the party actor first, then wrap it in the single destination.
        val target = message match {
          case DemocratVote | DemocratCountResult     ⇒ democratActor
          case RepublicanVote | RepublicanCountResult ⇒ republicanActor
        }
        List(Destination(sender, target))
    }
    //#crRoutingLogic
  }
  //#crRoute

}
|
|
|
|
|
//#crRouter
|
|
|
|
|
//#CustomRouter
|
|
|
|
|
}
|
2010-08-24 23:21:28 +02:00
|
|
|
}
|