2011-12-13 01:09:05 +01:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
2011-07-28 15:48:03 +03:00
|
|
|
package akka.routing
|
2010-08-24 23:21:28 +02:00
|
|
|
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger
|
2011-11-11 19:57:27 +01:00
|
|
|
import akka.actor._
|
2012-01-05 17:59:19 +01:00
|
|
|
import scala.collection.mutable.LinkedList
|
2011-10-07 15:42:55 +02:00
|
|
|
import akka.testkit._
|
2011-12-13 16:05:56 +01:00
|
|
|
import akka.util.duration._
|
2011-12-12 22:50:08 +01:00
|
|
|
import akka.dispatch.Await
|
2011-12-20 19:57:42 +01:00
|
|
|
import akka.util.Duration
|
2011-12-29 17:11:21 +01:00
|
|
|
import akka.config.ConfigurationException
|
|
|
|
|
import com.typesafe.config.ConfigFactory
|
2011-05-23 17:08:45 +02:00
|
|
|
|
|
|
|
|
/**
 * Fixtures shared by the routing tests: the configuration string handed to the
 * test actor system and two minimal actors used as routees.
 */
object RoutingSpec {

  /** Declares a deployment for `/router1`: a round-robin router with three instances. */
  val config = """
    akka.actor.deployment {
      /router1 {
        router = round-robin
        nr-of-instances = 3
      }
    }
    """

  /** Prints "Hello" for every message it receives, whatever the message is. */
  class TestActor extends Actor {
    def receive = {
      case _ ⇒ {
        println("Hello")
      }
    }
  }

  /** Answers every message by sending its own `ActorRef` back to the sender. */
  class Echo extends Actor {
    def receive = {
      case _ ⇒ {
        sender ! self
      }
    }
  }

}
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-10-21 17:01:22 +02:00
|
|
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
2012-01-10 17:50:17 +01:00
|
|
|
class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with ImplicitSender {
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
import akka.routing.RoutingSpec._
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-12-13 16:05:56 +01:00
|
|
|
"routers in general" must {
|
|
|
|
|
|
|
|
|
"evict terminated routees" in {
  val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)))

  // Sends two empty strings through the router; every Echo that gets one replies with its own ref.
  def pingTwice(): Unit = {
    router ! ""
    router ! ""
  }

  pingTwice()
  val survivor = expectMsgType[ActorRef]
  val victim = expectMsgType[ActorRef]

  watch(router)
  watch(victim)
  system.stop(victim)
  expectMsg(Terminated(victim))

  // it might take a while until the Router has actually processed the Terminated message
  awaitCond {
    pingTwice()
    val replies = receiveWhile(100 millis, messages = 2) {
      case ref: ActorRef ⇒ ref
    }
    // both replies must now come from the routee that is still alive
    replies == Seq(survivor, survivor)
  }

  // the test expects the router itself to terminate once its remaining routee is stopped
  system.stop(survivor)
  expectMsg(Terminated(router))
}
|
|
|
|
|
|
2011-12-19 13:47:49 +01:00
|
|
"be able to send their routees" in {
  val doneLatch = new TestLatch(1)
  val routeeNames = List("routee1", "routee2", "routee3")

  // Parent of three named routees and of a scatter-gather router that is built on top of them.
  class TheActor extends Actor {
    val children = routeeNames map { name ⇒ context.actorOf(Props[TestActor], name) }

    val router = context.actorOf(Props[TestActor].withRouter(
      ScatterGatherFirstCompletedRouter(routees = children, within = 5 seconds)))

    def receive = {
      case RouterRoutees(iterable) ⇒
        // every routee handed to the router must be reported back, identified by its path name
        for (expected ← routeeNames) {
          iterable.exists(_.path.name == expected) must be(true)
        }
        doneLatch.countDown()
      case "doIt" ⇒
        router ! CurrentRoutees
    }
  }

  val theActor = system.actorOf(Props(new TheActor), "theActor")
  theActor ! "doIt"

  // the latch only opens after all three name checks inside the actor have passed
  Await.ready(doneLatch, 1 seconds)
}
|
|
|
|
|
|
2012-01-10 17:50:17 +01:00
|
|
"use configured nr-of-instances when FromConfig" in {
  // "/router1" is declared in RoutingSpec.config as round-robin with nr-of-instances = 3
  val router = system.actorOf(Props[TestActor].withRouter(FromConfig), "router1")
  Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3)

  // Stopping is asynchronous. Wait for the Terminated notification so that the name
  // "router1" is guaranteed to be free again before another test creates an actor with it.
  watch(router)
  system.stop(router)
  expectMsg(Terminated(router))
}
|
|
|
|
|
|
|
|
|
"use configured nr-of-instances when router is specified" in {
  // The router is built in code with two instances, but it is deployed under the name
  // "router1", for which RoutingSpec.config declares three; the test expects three routees.
  val programmaticRouter = RoundRobinRouter(nrOfInstances = 2)
  val router = system.actorOf(Props[TestActor].withRouter(programmaticRouter), "router1")

  val reply = Await.result(router ? CurrentRoutees, 5 seconds)
  reply.asInstanceOf[RouterRoutees].routees.size must be(3)

  system.stop(router)
}
|
|
|
|
|
|
2011-12-13 16:05:56 +01:00
|
|
|
}
|
|
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
|
"no router" must {
|
2011-07-28 15:48:03 +03:00
|
|
"be started when constructed" in {
  // an actor created with NoRouter must be alive right after actorOf returns
  val unroutedProps = Props[TestActor].withRouter(NoRouter)
  system.actorOf(unroutedProps).isTerminated must be(false)
}
|
|
|
|
|
|
|
|
|
"send message to connection" in {
  val doneLatch = new TestLatch(1)
  val counter = new AtomicInteger(0)

  // Counts every message except "end", which opens the latch instead.
  val countingProps = Props(new Actor {
    def receive = {
      case "end" ⇒ doneLatch.countDown()
      case _     ⇒ counter.incrementAndGet
    }
  })

  val routedActor = system.actorOf(countingProps.withRouter(NoRouter))
  routedActor ! "hello"
  routedActor ! "end"

  Await.ready(doneLatch, 5 seconds)

  // only "hello" is counted
  counter.get must be(1)
}
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
"round robin router" must {
|
|
|
|
"be started when constructed" in {
  // a round-robin router with a single instance must be alive right after creation
  val singleInstance = RoundRobinRouter(nrOfInstances = 1)
  system.actorOf(Props[TestActor].withRouter(singleInstance)).isTerminated must be(false)
}
|
2011-07-13 19:18:04 +02:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
// In this test a bunch of actors is created and each actor has its own counter.
// To test round robin, the routed actor receives the message sequence 1 2 3 .. 1 2 3 .. 1 2 3,
// and every actor adds the numbers it receives to its counter.
// So after n iterations the first actor's counter should be 1*n, the second one's 2*n, etc.
"deliver messages in a round robin fashion" in {
  val connectionCount = 10
  val iterationCount = 10
  val doneLatch = new TestLatch(connectionCount)

  // One counter per connection, allocated up front in an immutable, indexable collection.
  val counters = Vector.fill(connectionCount)(new AtomicInteger)

  // Let's create the connections: actor i adds every Int it receives to counters(i).
  val actors = counters map { counter ⇒
    system.actorOf(Props(new Actor {
      def receive = {
        case "end"    ⇒ doneLatch.countDown()
        case msg: Int ⇒ counter.addAndGet(msg)
      }
    }))
  }

  val routedActor = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(routees = actors)))

  // Send the sequence 1..connectionCount to the router, iterationCount times over.
  for (i ← 0 until iterationCount) {
    for (k ← 0 until connectionCount) {
      routedActor ! (k + 1)
    }
  }

  // Every actor counts the latch down once it has seen "end".
  routedActor ! Broadcast("end")

  // Now wait for all actors to finish and validate the counters.
  Await.ready(doneLatch, 5 seconds)

  for ((counter, i) ← counters.zipWithIndex) {
    counter.get must be((iterationCount * (i + 1)))
  }
}
|
|
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
"deliver a broadcast message using the !" in {
  val doneLatch = new TestLatch(2)

  // Routee that adds every Int it receives to `total` and counts the latch down on "end".
  def summingRoutee(total: AtomicInteger): ActorRef = system.actorOf(Props(new Actor {
    def receive = {
      case "end"    ⇒ doneLatch.countDown()
      case msg: Int ⇒ total.addAndGet(msg)
    }
  }))

  val counter1 = new AtomicInteger
  val actor1 = summingRoutee(counter1)

  val counter2 = new AtomicInteger
  val actor2 = summingRoutee(counter2)

  val roundRobin = RoundRobinRouter(routees = List(actor1, actor2))
  val routedActor = system.actorOf(Props[TestActor].withRouter(roundRobin))

  // both payloads are wrapped in Broadcast, so each routee is expected to see them
  routedActor ! Broadcast(1)
  routedActor ! Broadcast("end")

  Await.ready(doneLatch, 5 seconds)

  counter1.get must be(1)
  counter2.get must be(1)
}
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
"random router" must {
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
"be started when constructed" in {
  // a random router with a single instance must be alive right after creation
  val singleInstance = RandomRouter(nrOfInstances = 1)
  system.actorOf(Props[TestActor].withRouter(singleInstance)).isTerminated must be(false)
}
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
"deliver a broadcast message" in {
  val doneLatch = new TestLatch(2)

  // Routee that adds every Int it receives to `total` and counts the latch down on "end".
  def summingRoutee(total: AtomicInteger): ActorRef = system.actorOf(Props(new Actor {
    def receive = {
      case "end"    ⇒ doneLatch.countDown()
      case msg: Int ⇒ total.addAndGet(msg)
    }
  }))

  val counter1 = new AtomicInteger
  val actor1 = summingRoutee(counter1)

  val counter2 = new AtomicInteger
  val actor2 = summingRoutee(counter2)

  val random = RandomRouter(routees = List(actor1, actor2))
  val routedActor = system.actorOf(Props[TestActor].withRouter(random))

  // both payloads are wrapped in Broadcast, so each routee is expected to see them
  routedActor ! Broadcast(1)
  routedActor ! Broadcast("end")

  Await.ready(doneLatch, 5 seconds)

  counter1.get must be(1)
  counter2.get must be(1)
}
|
2011-12-11 22:34:38 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"broadcast router" must {
|
|
|
|
"be started when constructed" in {
  // a broadcast router with a single instance must be alive right after creation
  val singleInstance = BroadcastRouter(nrOfInstances = 1)
  system.actorOf(Props[TestActor].withRouter(singleInstance)).isTerminated must be(false)
}
|
|
|
|
|
|
|
|
|
"broadcast message using !" in {
  val doneLatch = new TestLatch(2)

  // Routee that adds every Int it receives to `total` and counts the latch down on "end".
  def summingRoutee(total: AtomicInteger): ActorRef = system.actorOf(Props(new Actor {
    def receive = {
      case "end"    ⇒ doneLatch.countDown()
      case msg: Int ⇒ total.addAndGet(msg)
    }
  }))

  val counter1 = new AtomicInteger
  val actor1 = summingRoutee(counter1)

  val counter2 = new AtomicInteger
  val actor2 = summingRoutee(counter2)

  val broadcast = BroadcastRouter(routees = List(actor1, actor2))
  val routedActor = system.actorOf(Props[TestActor].withRouter(broadcast))

  // plain messages, no Broadcast wrapper: the test expects both routees to receive each of them
  routedActor ! 1
  routedActor ! "end"

  Await.ready(doneLatch, 5 seconds)

  counter1.get must be(1)
  counter2.get must be(1)
}
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-11 22:34:38 +01:00
|
|
"broadcast message using ?" in {
  val doneLatch = new TestLatch(2)

  // Routee that adds every Int it receives to `total`, optionally replies "ack" to the
  // sender of that Int, and counts the latch down on "end".
  def summingRoutee(total: AtomicInteger, acknowledge: Boolean): ActorRef =
    system.actorOf(Props(new Actor {
      def receive = {
        case "end" ⇒ doneLatch.countDown()
        case msg: Int ⇒
          total.addAndGet(msg)
          if (acknowledge) sender ! "ack"
      }
    }))

  // only the first routee answers the ask
  val counter1 = new AtomicInteger
  val actor1 = summingRoutee(counter1, acknowledge = true)

  val counter2 = new AtomicInteger
  val actor2 = summingRoutee(counter2, acknowledge = false)

  val broadcast = BroadcastRouter(routees = List(actor1, actor2))
  val routedActor = system.actorOf(Props[TestActor].withRouter(broadcast))

  // the returned future is not inspected; the test only checks the counters below
  routedActor ? 1
  routedActor ! "end"

  Await.ready(doneLatch, 5 seconds)

  counter1.get must be(1)
  counter2.get must be(1)
}
|
|
|
|
|
}
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-12 15:06:40 +01:00
|
|
|
"Scatter-gather router" must {
|
2011-10-07 15:42:55 +02:00
|
|
|
|
|
|
|
"be started when constructed" in {
  // a scatter-gather router over one freshly created routee must be alive right after creation
  val scatterGather = ScatterGatherFirstCompletedRouter(routees = List(newActor(0)), within = 1 seconds)
  system.actorOf(Props[TestActor].withRouter(scatterGather)).isTerminated must be(false)
}
|
|
|
|
|
|
|
|
|
"deliver a broadcast message using the !" in {
  val doneLatch = new TestLatch(2)

  // Routee that adds every Int it receives to `total` and counts the latch down on "end".
  def summingRoutee(total: AtomicInteger): ActorRef = system.actorOf(Props(new Actor {
    def receive = {
      case "end"    ⇒ doneLatch.countDown()
      case msg: Int ⇒ total.addAndGet(msg)
    }
  }))

  val counter1 = new AtomicInteger
  val actor1 = summingRoutee(counter1)

  val counter2 = new AtomicInteger
  val actor2 = summingRoutee(counter2)

  val scatterGather = ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2), within = 1 seconds)
  val routedActor = system.actorOf(Props[TestActor].withRouter(scatterGather))

  // both payloads are wrapped in Broadcast, so each routee is expected to see them
  routedActor ! Broadcast(1)
  routedActor ! Broadcast("end")

  Await.ready(doneLatch, TestLatch.DefaultTimeout)

  counter1.get must be(1)
  counter2.get must be(1)
}
|
|
|
|
|
|
2011-12-12 15:06:40 +01:00
|
|
"return response, even if one of the actors has stopped" in {
  val shutdownLatch = new TestLatch(1)
  val stoppable = Some(shutdownLatch)

  val actor1 = newActor(1, stoppable)
  val actor2 = newActor(14, stoppable)

  val scatterGather = ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2), within = 3 seconds)
  val routedActor = system.actorOf(Props[TestActor].withRouter(scatterGather))

  // Stop(Some(1)) only stops the actor created with id 1; its postStop opens the latch
  routedActor ! Broadcast(Stop(Some(1)))
  Await.ready(shutdownLatch, TestLatch.DefaultTimeout)

  // with actor 1 gone, the answer has to be the id of the remaining actor
  val answer = Await.result(routedActor ? Broadcast(0), timeout.duration)
  answer must be(14)
}
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-12 15:06:40 +01:00
|
|
|
// Control message for the actors built by newActor: Stop(None) stops whichever actor
// receives it, Stop(Some(id)) stops only the actor that was created with that id.
case class Stop(id: Option[Int] = None)
|
2011-10-07 15:42:55 +02:00
|
|
|
|
2011-12-12 15:06:40 +01:00
|
|
|
// Creates an actor named "Actor:<id>" for the scatter-gather tests.
//
// id           - identity of the actor; also scales how long it sleeps before replying
// shudownLatch - if defined, counted down from postStop, i.e. once the actor has stopped
//
// Behaviour:
//  - Stop(None), or Stop(Some(i)) with i equal to this actor's id: the actor stops itself
//  - an Int equal to this actor's id: consumed without any reply
//  - anything else (including a Stop addressed to another id): sleeps 100 * id milliseconds,
//    then replies to the sender with its id
def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = {
  val behaviour = Props(new Actor {
    def receive = {
      case Stop(None)                         ⇒ context.stop(self)
      case Stop(Some(target)) if target == id ⇒ context.stop(self)
      case number: Int if number == id        ⇒ // own id: deliberately no reply
      case other ⇒
        Thread.sleep(100 * id)
        sender.tell(id)
    }

    override def postStop = {
      shudownLatch.foreach(latch ⇒ latch.countDown())
    }
  })

  system.actorOf(behaviour, "Actor:" + id)
}
|
|
|
|
|
}
|
2011-12-15 15:28:21 +01:00
|
|
|
|
2011-12-29 17:11:21 +01:00
|
|
|
"router FromConfig" must {
|
|
|
|
"throw suitable exception when not configured" in {
  // no name is given, so there is no deployment entry that FromConfig could resolve
  val thrown = intercept[ConfigurationException] {
    system.actorOf(Props.empty.withRouter(FromConfig))
  }
  // the error message is expected to mention application.conf
  thrown.getMessage.contains("application.conf") must be(true)
}
|
|
|
|
|
|
|
|
|
"allow external configuration" in {
  // separate actor system whose config declares "/routed" as round-robin and otherwise
  // falls back to the configuration of the test system
  val externalConfig = ConfigFactory
    .parseString("akka.actor.deployment./routed.router=round-robin")
    .withFallback(system.settings.config)
  val sys = ActorSystem("FromConfig", externalConfig)

  try {
    // creating the router must succeed; nothing is asserted about the returned ref
    sys.actorOf(Props.empty.withRouter(FromConfig), "routed")
  } finally {
    // always shut the extra system down, even if actorOf throws
    sys.shutdown()
  }
}
|
|
|
|
|
}
|
|
|
|
|
|
2011-12-15 15:28:21 +01:00
|
|
|
"custom router" must {
|
|
|
|
"be started when constructed" in {
  // an actor created with the custom VoteCountRouter must be alive right after creation
  val votingProps = Props[TestActor].withRouter(VoteCountRouter)
  system.actorOf(votingProps).isTerminated must be(false)
}
|
|
|
|
|
|
|
|
|
"count votes as intended - not as in Florida" in {
  val routedActor = system.actorOf(Props().withRouter(VoteCountRouter))

  // three democrat votes and two republican votes, interleaved
  routedActor ! DemocratVote
  routedActor ! DemocratVote
  routedActor ! RepublicanVote
  routedActor ! DemocratVote
  routedActor ! RepublicanVote

  val democratsResult = (routedActor ? DemocratCountResult)
  val republicansResult = (routedActor ? RepublicanCountResult)

  // A bare `x === y` statement only produces a value that is thrown away, so it can never
  // fail the test; use a matcher so that the vote counts are actually asserted.
  Await.result(democratsResult, 1 seconds) must be(3)
  Await.result(republicansResult, 1 seconds) must be(2)
}
|
|
|
|
|
|
|
|
|
|
// DO NOT CHANGE THE COMMENTS BELOW AS THEY ARE USED IN THE DOCUMENTATION
|
|
|
|
|
|
|
|
|
|
//#CustomRouter
|
|
|
|
|
//#crMessages
|
|
|
|
|
// A single vote; DemocratActor increments its counter for each one it receives.
case object DemocratVote
|
|
|
|
|
// Query; DemocratActor answers it by sending its current counter to the sender.
case object DemocratCountResult
|
|
|
|
|
// A single vote; RepublicanActor increments its counter for each one it receives.
case object RepublicanVote
|
|
|
|
|
// Query; RepublicanActor answers it by sending its current counter to the sender.
case object RepublicanCountResult
|
|
|
|
|
//#crMessages
|
|
|
|
|
|
|
|
|
|
//#crActors
|
|
|
|
|
// Keeps a running count of DemocratVote messages and reports it on DemocratCountResult.
class DemocratActor extends Actor {
  var counter = 0

  def receive = {
    case DemocratVote ⇒ {
      counter = counter + 1
    }
    case DemocratCountResult ⇒ {
      sender ! counter
    }
  }
}
|
|
|
|
|
|
|
|
|
|
// Keeps a running count of RepublicanVote messages and reports it on RepublicanCountResult.
class RepublicanActor extends Actor {
  var counter = 0

  def receive = {
    case RepublicanVote ⇒ {
      counter = counter + 1
    }
    case RepublicanCountResult ⇒ {
      sender ! counter
    }
  }
}
|
|
|
|
|
//#crActors
|
|
|
|
|
|
|
|
|
|
//#crRouter
|
2011-12-21 11:46:39 +01:00
|
|
|
// Custom router configuration: creates one DemocratActor ("d") and one RepublicanActor ("r")
// as children of the given context, registers both as routees, and returns a route that
// sends democrat messages to the former and republican messages to the latter.
object VoteCountRouter extends RouterConfig {

  //#crRoute
  def createRoute(props: Props,
                  actorContext: ActorContext,
                  ref: RoutedActorRef): Route = {
    val democratActor = actorContext.actorOf(Props(new DemocratActor()), "d")
    val republicanActor = actorContext.actorOf(Props(new RepublicanActor()), "r")

    //#crRegisterRoutees
    registerRoutees(actorContext, Vector[ActorRef](democratActor, republicanActor))
    //#crRegisterRoutees

    //#crRoutingLogic
    {
      case (from, msg) ⇒
        // pick the single destination by message type; the original sender is passed along
        msg match {
          case DemocratVote | DemocratCountResult     ⇒ List(Destination(from, democratActor))
          case RepublicanVote | RepublicanCountResult ⇒ List(Destination(from, republicanActor))
        }
    }
    //#crRoutingLogic
  }
  //#crRoute

}
|
|
|
|
|
//#crRouter
|
|
|
|
|
//#CustomRouter
|
|
|
|
|
}
|
2010-08-24 23:21:28 +02:00
|
|
|
}
|