ticket #889
This commit is contained in:
parent
21fee0fa34
commit
0fcc35d4b0
13 changed files with 1303 additions and 1103 deletions
|
|
@ -1,721 +1,321 @@
|
|||
package akka.actor.routing
|
||||
package akka.routing
|
||||
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
import akka.testkit._
|
||||
import akka.testkit.Testing.sleepFor
|
||||
import akka.util.duration._
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
import akka.routing._
|
||||
import akka.event.EventHandler
|
||||
|
||||
import akka.routing.Router
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import akka.dispatch.{ KeptPromise, Future }
|
||||
import akka.actor.Actor._
|
||||
import akka.actor.{ActorRef, Actor}
|
||||
import collection.mutable.LinkedList
|
||||
import akka.routing.Routing.Broadcast
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
|
||||
object RoutingSpec {
|
||||
trait Foo {
|
||||
def sq(x: Int, sleep: Long): Future[Int]
|
||||
}
|
||||
|
||||
class FooImpl extends Foo {
|
||||
def sq(x: Int, sleep: Long): Future[Int] = {
|
||||
if (sleep > 0) Thread.sleep(sleep)
|
||||
new KeptPromise(Right(x * x))
|
||||
class TestActor extends Actor with Serializable {
|
||||
def receive = {
|
||||
case _ =>
|
||||
println("Hello")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class RoutingSpec extends WordSpec with MustMatchers {
|
||||
import Routing._
|
||||
|
||||
"Routing" must {
|
||||
import akka.routing.RoutingSpec._
|
||||
|
||||
"dispatch" in {
|
||||
val Test1 = "test1"
|
||||
val Test2 = "test2"
|
||||
val Test3 = "test3"
|
||||
"direct router" must {
|
||||
"be started when constructed" in {
|
||||
val actor1 = Actor.actorOf[TestActor].start
|
||||
|
||||
val t1 = actorOf(new Actor {
|
||||
def receive = {
|
||||
case Test1 ⇒ self.reply(3)
|
||||
case Test2 ⇒ self.reply(7)
|
||||
}
|
||||
}).start()
|
||||
|
||||
val t2 = actorOf(new Actor() {
|
||||
def receive = {
|
||||
case Test3 ⇒ self.reply(11)
|
||||
}
|
||||
}).start()
|
||||
|
||||
val d = routerActor {
|
||||
case Test1 | Test2 ⇒ t1
|
||||
case Test3 ⇒ t2
|
||||
}.start()
|
||||
|
||||
implicit val timeout = Actor.Timeout((5 seconds).dilated)
|
||||
val result = for {
|
||||
a ← (d ? (Test1)).as[Int]
|
||||
b ← (d ? (Test2)).as[Int]
|
||||
c ← (d ? (Test3)).as[Int]
|
||||
} yield a + b + c
|
||||
|
||||
result.isDefined must be(true)
|
||||
result.get must be(21)
|
||||
|
||||
for (a ← List(t1, t2, d)) a.stop()
|
||||
val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.Direct)
|
||||
actor.isRunning must be(true)
|
||||
}
|
||||
|
||||
"have messages logged" in {
|
||||
val msgs = new java.util.concurrent.ConcurrentSkipListSet[Any]
|
||||
val latch = TestLatch(2)
|
||||
|
||||
val actor = actorOf(new Actor {
|
||||
def receive = { case _ ⇒ }
|
||||
}).start()
|
||||
|
||||
val logger = loggerActor(actor, x ⇒ { msgs.add(x); latch.countDown() }).start()
|
||||
|
||||
val foo: Any = "foo"
|
||||
val bar: Any = "bar"
|
||||
|
||||
logger ! foo
|
||||
logger ! bar
|
||||
|
||||
latch.await
|
||||
|
||||
msgs must (have size (2) and contain(foo) and contain(bar))
|
||||
|
||||
actor.stop()
|
||||
logger.stop()
|
||||
}
|
||||
|
||||
"dispatch to smallest mailbox" in {
|
||||
val t1Count = new AtomicInteger(0)
|
||||
val t2Count = new AtomicInteger(0)
|
||||
val latch1 = TestLatch(2501)
|
||||
val latch2 = TestLatch(2499)
|
||||
|
||||
val t1 = actorOf(new Actor {
|
||||
def receive = {
|
||||
case x ⇒
|
||||
t1Count.incrementAndGet
|
||||
latch1.countDown()
|
||||
}
|
||||
}).start()
|
||||
|
||||
t1.dispatcher.suspend(t1)
|
||||
|
||||
for (i <- 1 to 2501) t1 ! i
|
||||
|
||||
val t2 = actorOf(new Actor {
|
||||
def receive = {
|
||||
case x ⇒
|
||||
t2Count.incrementAndGet
|
||||
latch2.countDown()
|
||||
}
|
||||
}).start()
|
||||
|
||||
val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) //Will pick the last with the smallest mailbox, so make sure t1 is last
|
||||
|
||||
for (i ← 1 to 2499 ) d ! i
|
||||
|
||||
latch2.await(20 seconds)
|
||||
|
||||
t1.dispatcher.resume(t1)
|
||||
|
||||
"throw IllegalArgumentException at construction when no connections" in {
|
||||
try {
|
||||
latch1.await(20 seconds)
|
||||
} finally {
|
||||
// because t1 is much slower and thus has a bigger mailbox all the time
|
||||
t1Count.get must be === 2501
|
||||
t2Count.get must be === 2499
|
||||
for (a ← List(t1, t2, d)) a.stop()
|
||||
Routing.newRoutedActorRef("foo", List(), RouterType.Direct)
|
||||
fail()
|
||||
} catch {
|
||||
case e: IllegalArgumentException =>
|
||||
}
|
||||
}
|
||||
|
||||
"listen" in {
|
||||
val fooLatch = TestLatch(2)
|
||||
val barLatch = TestLatch(2)
|
||||
val barCount = new AtomicInteger(0)
|
||||
"send message to connection" in {
|
||||
val doneLatch = new CountDownLatch(1)
|
||||
|
||||
val broadcast = actorOf(new Actor with Listeners {
|
||||
def receive = listenerManagement orElse {
|
||||
case "foo" ⇒ gossip("bar")
|
||||
}
|
||||
}).start()
|
||||
|
||||
def newListener = actorOf(new Actor {
|
||||
val counter = new AtomicInteger(0)
|
||||
val connection1 = actorOf(new Actor {
|
||||
def receive = {
|
||||
case "bar" ⇒
|
||||
barCount.incrementAndGet
|
||||
barLatch.countDown()
|
||||
case "foo" ⇒
|
||||
fooLatch.countDown()
|
||||
case "end" => doneLatch.countDown()
|
||||
case _ => counter.incrementAndGet
|
||||
}
|
||||
}).start()
|
||||
|
||||
val a1 = newListener
|
||||
val a2 = newListener
|
||||
val a3 = newListener
|
||||
|
||||
broadcast ! Listen(a1)
|
||||
broadcast ! Listen(a2)
|
||||
broadcast ! Listen(a3)
|
||||
val routedActor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Direct)
|
||||
routedActor ! "hello"
|
||||
routedActor ! "end"
|
||||
|
||||
broadcast ! Deafen(a3)
|
||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
|
||||
broadcast ! WithListeners(_ ! "foo")
|
||||
broadcast ! "foo"
|
||||
counter.get must be(1)
|
||||
}
|
||||
|
||||
barLatch.await
|
||||
barCount.get must be(2)
|
||||
"deliver a broadcast message" in {
|
||||
val doneLatch = new CountDownLatch(1)
|
||||
|
||||
fooLatch.await
|
||||
val counter1 = new AtomicInteger
|
||||
val connection1 = actorOf(new Actor {
|
||||
def receive = {
|
||||
case "end" => doneLatch.countDown()
|
||||
case msg: Int => counter1.addAndGet(msg)
|
||||
}
|
||||
}).start()
|
||||
|
||||
for (a ← List(broadcast, a1, a2, a3)) a.stop()
|
||||
val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Direct)
|
||||
|
||||
actor ! Broadcast(1)
|
||||
actor ! "end"
|
||||
|
||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
|
||||
counter1.get must be(1)
|
||||
}
|
||||
}
|
||||
|
||||
"Actor Pool" must {
|
||||
"round robin router" must {
|
||||
|
||||
"have expected capacity" in {
|
||||
val latch = TestLatch(2)
|
||||
val count = new AtomicInteger(0)
|
||||
"be started when constructed" in {
|
||||
val actor1 = Actor.actorOf[TestActor].start
|
||||
|
||||
val pool = actorOf(
|
||||
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with SmallestMailboxSelector {
|
||||
def factory = actorOf(new Actor {
|
||||
def receive = {
|
||||
case _ ⇒
|
||||
count.incrementAndGet
|
||||
latch.countDown()
|
||||
self tryReply "success"
|
||||
}
|
||||
}).start()
|
||||
val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.RoundRobin)
|
||||
actor.isRunning must be(true)
|
||||
}
|
||||
|
||||
def limit = 2
|
||||
def selectionCount = 1
|
||||
def partialFill = true
|
||||
def instance = factory
|
||||
def receive = _route
|
||||
"throw IllegalArgumentException at construction when no connections" in {
|
||||
try {
|
||||
Routing.newRoutedActorRef("foo", List(), RouterType.RoundRobin)
|
||||
fail()
|
||||
} catch {
|
||||
case e: IllegalArgumentException =>
|
||||
}
|
||||
}
|
||||
|
||||
//In this test a bunch of actors are created and each actor has its own counter.
|
||||
//to test round robin, the routed actor receives the following sequence of messages 1 2 3 .. 1 2 3 .. 1 2 3 which it
|
||||
//uses to increment his counter.
|
||||
//So after n iteration, the first actor his counter should be 1*n, the second 2*n etc etc.
|
||||
"deliver messages in a round robin fashion" in {
|
||||
val connectionCount = 10
|
||||
val iterationCount = 10
|
||||
val doneLatch = new CountDownLatch(connectionCount)
|
||||
|
||||
//lets create some connections.
|
||||
var connections = new LinkedList[ActorRef]
|
||||
var counters = new LinkedList[AtomicInteger]
|
||||
for (i <- 0 until connectionCount) {
|
||||
counters = counters :+ new AtomicInteger()
|
||||
|
||||
val connection = actorOf(new Actor {
|
||||
def receive = {
|
||||
case "end"=>doneLatch.countDown()
|
||||
case msg: Int => counters.get(i).get.addAndGet(msg)
|
||||
}
|
||||
}).start()
|
||||
connections = connections :+ connection
|
||||
}
|
||||
|
||||
val successes = TestLatch(2)
|
||||
val successCounter = actorOf(new Actor {
|
||||
//create the routed actor.
|
||||
val actor = Routing.newRoutedActorRef("foo", connections, RouterType.RoundRobin)
|
||||
|
||||
//send messages to the actor.
|
||||
for (i <- 0 until iterationCount) {
|
||||
for (k <- 0 until connectionCount) {
|
||||
actor ! (k + 1)
|
||||
}
|
||||
}
|
||||
|
||||
actor ! Broadcast("end")
|
||||
//now wait some and do validations.
|
||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
|
||||
for (i <- 0 until connectionCount) {
|
||||
val counter = counters.get(i).get
|
||||
counter.get must be((iterationCount * (i + 1)))
|
||||
}
|
||||
}
|
||||
|
||||
"deliver a broadcast message using the !" in {
|
||||
val doneLatch = new CountDownLatch(2)
|
||||
|
||||
val counter1 = new AtomicInteger
|
||||
val connection1 = actorOf(new Actor {
|
||||
def receive = {
|
||||
case "success" ⇒ successes.countDown()
|
||||
case "end" => doneLatch.countDown()
|
||||
case msg: Int => counter1.addAndGet(msg)
|
||||
}
|
||||
}).start()
|
||||
|
||||
implicit val replyTo = successCounter
|
||||
pool ! "a"
|
||||
pool ! "b"
|
||||
val counter2 = new AtomicInteger
|
||||
val connection2 = actorOf(new Actor {
|
||||
def receive = {
|
||||
case "end" => doneLatch.countDown()
|
||||
case msg: Int => counter2.addAndGet(msg)
|
||||
}
|
||||
}).start()
|
||||
|
||||
latch.await
|
||||
successes.await
|
||||
val actor = Routing.newRoutedActorRef("foo", List(connection1, connection2), RouterType.RoundRobin)
|
||||
|
||||
count.get must be(2)
|
||||
actor ! Broadcast(1)
|
||||
actor ! Broadcast("end")
|
||||
|
||||
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
|
||||
pool.stop()
|
||||
counter1.get must be(1)
|
||||
counter2.get must be(1)
|
||||
}
|
||||
|
||||
"pass ticket #705" in {
|
||||
val pool = actorOf(
|
||||
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicFilter {
|
||||
def lowerBound = 2
|
||||
def upperBound = 20
|
||||
def rampupRate = 0.1
|
||||
def backoffRate = 0.1
|
||||
def backoffThreshold = 0.5
|
||||
def partialFill = true
|
||||
def selectionCount = 1
|
||||
def instance = factory
|
||||
def receive = _route
|
||||
def pressureThreshold = 1
|
||||
def factory = actorOf(new Actor {
|
||||
def receive = {
|
||||
case req: String ⇒ {
|
||||
sleepFor(10 millis)
|
||||
self.tryReply("Response")
|
||||
}
|
||||
}
|
||||
})
|
||||
}).start()
|
||||
"fail to deliver a broadcast message using the ?" in {
|
||||
val doneLatch = new CountDownLatch(1)
|
||||
|
||||
val counter1 = new AtomicInteger
|
||||
val connection1 = actorOf(new Actor {
|
||||
def receive = {
|
||||
case "end" => doneLatch.countDown()
|
||||
case _ => counter1.incrementAndGet()
|
||||
}
|
||||
}).start()
|
||||
|
||||
|
||||
val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.RoundRobin)
|
||||
|
||||
try {
|
||||
(for (count ← 1 to 500) yield pool.?("Test", 20000)) foreach {
|
||||
_.await.resultOrException.get must be("Response")
|
||||
}
|
||||
} finally {
|
||||
pool.stop()
|
||||
}
|
||||
}
|
||||
|
||||
"grow as needed under pressure" in {
|
||||
// make sure the pool starts at the expected lower limit and grows to the upper as needed
|
||||
// as influenced by the backlog of blocking pooled actors
|
||||
|
||||
var latch = TestLatch(3)
|
||||
val count = new AtomicInteger(0)
|
||||
|
||||
val pool = actorOf(
|
||||
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
|
||||
def factory = actorOf(new Actor {
|
||||
def receive = {
|
||||
case n: Int ⇒
|
||||
sleepFor(n millis)
|
||||
count.incrementAndGet
|
||||
latch.countDown()
|
||||
}
|
||||
})
|
||||
|
||||
def lowerBound = 2
|
||||
def upperBound = 4
|
||||
def rampupRate = 0.1
|
||||
def partialFill = true
|
||||
def selectionCount = 1
|
||||
def instance = factory
|
||||
def receive = _route
|
||||
}).start()
|
||||
|
||||
// first message should create the minimum number of delgates
|
||||
|
||||
pool ! 1
|
||||
|
||||
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
|
||||
var loops = 0
|
||||
def loop(t: Int) = {
|
||||
latch = TestLatch(loops)
|
||||
count.set(0)
|
||||
for (m ← 0 until loops) {
|
||||
pool ? t
|
||||
sleepFor(50 millis)
|
||||
}
|
||||
actor ? Broadcast(1)
|
||||
fail()
|
||||
} catch {
|
||||
case e: RoutingException =>
|
||||
}
|
||||
|
||||
// 2 more should go thru without triggering more
|
||||
|
||||
loops = 2
|
||||
|
||||
loop(500)
|
||||
latch.await
|
||||
count.get must be(loops)
|
||||
|
||||
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
|
||||
// a whole bunch should max it out
|
||||
|
||||
loops = 10
|
||||
loop(500)
|
||||
latch.await
|
||||
count.get must be(loops)
|
||||
|
||||
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(4)
|
||||
|
||||
pool.stop()
|
||||
}
|
||||
|
||||
"grow as needed under mailbox pressure" in {
|
||||
// make sure the pool starts at the expected lower limit and grows to the upper as needed
|
||||
// as influenced by the backlog of messages in the delegate mailboxes
|
||||
|
||||
var latch = TestLatch(3)
|
||||
val count = new AtomicInteger(0)
|
||||
|
||||
val pool = actorOf(
|
||||
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
|
||||
def factory = actorOf(new Actor {
|
||||
def receive = {
|
||||
case n: Int ⇒
|
||||
sleepFor(n millis)
|
||||
count.incrementAndGet
|
||||
latch.countDown()
|
||||
}
|
||||
})
|
||||
|
||||
def lowerBound = 2
|
||||
def upperBound = 4
|
||||
def pressureThreshold = 3
|
||||
def rampupRate = 0.1
|
||||
def partialFill = true
|
||||
def selectionCount = 1
|
||||
def instance = factory
|
||||
def receive = _route
|
||||
}).start()
|
||||
|
||||
var loops = 0
|
||||
def loop(t: Int) = {
|
||||
latch = TestLatch(loops)
|
||||
count.set(0)
|
||||
for (m ← 0 until loops) {
|
||||
pool ! t
|
||||
}
|
||||
}
|
||||
|
||||
// send a few messages and observe pool at its lower bound
|
||||
loops = 3
|
||||
loop(500)
|
||||
latch.await
|
||||
count.get must be(loops)
|
||||
|
||||
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
|
||||
// send a bunch over the theshold and observe an increment
|
||||
loops = 15
|
||||
loop(500)
|
||||
|
||||
latch.await(10 seconds)
|
||||
count.get must be(loops)
|
||||
|
||||
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be >= (3)
|
||||
|
||||
pool.stop()
|
||||
}
|
||||
|
||||
"round robin" in {
|
||||
val latch1 = TestLatch(2)
|
||||
val delegates = new java.util.concurrent.ConcurrentHashMap[String, String]
|
||||
|
||||
val pool1 = actorOf(
|
||||
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
|
||||
def factory = actorOf(new Actor {
|
||||
def receive = {
|
||||
case _ ⇒
|
||||
delegates put (self.uuid.toString, "")
|
||||
latch1.countDown()
|
||||
}
|
||||
})
|
||||
|
||||
def limit = 1
|
||||
def selectionCount = 1
|
||||
def rampupRate = 0.1
|
||||
def partialFill = true
|
||||
def instance = factory
|
||||
def receive = _route
|
||||
}).start()
|
||||
|
||||
pool1 ! "a"
|
||||
pool1 ! "b"
|
||||
|
||||
latch1.await
|
||||
delegates.size must be(1)
|
||||
|
||||
pool1.stop()
|
||||
|
||||
val latch2 = TestLatch(2)
|
||||
delegates.clear()
|
||||
|
||||
val pool2 = actorOf(
|
||||
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
|
||||
def factory = actorOf(new Actor {
|
||||
def receive = {
|
||||
case _ ⇒
|
||||
delegates put (self.uuid.toString, "")
|
||||
latch2.countDown()
|
||||
}
|
||||
})
|
||||
|
||||
def limit = 2
|
||||
def selectionCount = 1
|
||||
def rampupRate = 0.1
|
||||
def partialFill = false
|
||||
def instance = factory
|
||||
def receive = _route
|
||||
}).start()
|
||||
|
||||
pool2 ! "a"
|
||||
pool2 ! "b"
|
||||
|
||||
latch2.await
|
||||
delegates.size must be(2)
|
||||
|
||||
pool2.stop()
|
||||
}
|
||||
|
||||
"backoff" in {
|
||||
val latch = TestLatch(10)
|
||||
|
||||
val pool = actorOf(
|
||||
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
|
||||
def factory = actorOf(new Actor {
|
||||
def receive = {
|
||||
case n: Int ⇒
|
||||
sleepFor(n millis)
|
||||
latch.countDown()
|
||||
}
|
||||
})
|
||||
|
||||
def lowerBound = 1
|
||||
def upperBound = 5
|
||||
def pressureThreshold = 1
|
||||
def partialFill = true
|
||||
def selectionCount = 1
|
||||
def rampupRate = 0.1
|
||||
def backoffRate = 0.50
|
||||
def backoffThreshold = 0.50
|
||||
def instance = factory
|
||||
def receive = _route
|
||||
}).start()
|
||||
|
||||
// put some pressure on the pool
|
||||
|
||||
for (m ← 0 to 10) pool ! 250
|
||||
|
||||
sleepFor(5 millis)
|
||||
|
||||
val z = (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size
|
||||
|
||||
z must be >= (2)
|
||||
|
||||
// let it cool down
|
||||
|
||||
for (m ← 0 to 3) {
|
||||
pool ! 1
|
||||
sleepFor(500 millis)
|
||||
}
|
||||
|
||||
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be <= (z)
|
||||
|
||||
pool.stop()
|
||||
}
|
||||
|
||||
"support typed actors" in {
|
||||
import RoutingSpec._
|
||||
import TypedActor._
|
||||
def createPool = new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
|
||||
def lowerBound = 1
|
||||
def upperBound = 5
|
||||
def pressureThreshold = 1
|
||||
def partialFill = true
|
||||
def selectionCount = 1
|
||||
def rampupRate = 0.1
|
||||
def backoffRate = 0.50
|
||||
def backoffThreshold = 0.50
|
||||
def instance = getActorRefFor(typedActorOf[Foo, FooImpl]())
|
||||
def receive = _route
|
||||
}
|
||||
|
||||
val pool = createProxy[Foo](createPool)
|
||||
|
||||
val results = for (i ← 1 to 100) yield (i, pool.sq(i, 100))
|
||||
|
||||
for ((i, r) ← results) r.get must equal(i * i)
|
||||
}
|
||||
|
||||
"provide default supervision of pooled actors" in {
|
||||
import akka.config.Supervision._
|
||||
val pingCount = new AtomicInteger(0)
|
||||
val deathCount = new AtomicInteger(0)
|
||||
var keepDying = false
|
||||
|
||||
val pool1 = actorOf(
|
||||
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
|
||||
def lowerBound = 2
|
||||
def upperBound = 5
|
||||
def rampupRate = 0.1
|
||||
def backoffRate = 0.1
|
||||
def backoffThreshold = 0.5
|
||||
def partialFill = true
|
||||
def selectionCount = 1
|
||||
def instance = factory
|
||||
def receive = _route
|
||||
def pressureThreshold = 1
|
||||
def factory = actorOf(new Actor {
|
||||
if (deathCount.get > 5) deathCount.set(0)
|
||||
if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
|
||||
def receive = {
|
||||
case akka.Die ⇒
|
||||
if (keepDying) deathCount.incrementAndGet
|
||||
throw new RuntimeException
|
||||
case _ => pingCount.incrementAndGet
|
||||
}
|
||||
}).start()
|
||||
}).start()
|
||||
|
||||
val pool2 = actorOf(
|
||||
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
|
||||
def lowerBound = 2
|
||||
def upperBound = 5
|
||||
def rampupRate = 0.1
|
||||
def backoffRate = 0.1
|
||||
def backoffThreshold = 0.5
|
||||
def partialFill = true
|
||||
def selectionCount = 1
|
||||
def instance = factory
|
||||
def receive = _route
|
||||
def pressureThreshold = 1
|
||||
def factory = actorOf(new Actor {
|
||||
self.lifeCycle = Permanent
|
||||
if (deathCount.get > 5) deathCount.set(0)
|
||||
if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
|
||||
def receive = {
|
||||
case akka.Die ⇒
|
||||
if (keepDying) deathCount.incrementAndGet
|
||||
throw new RuntimeException
|
||||
case _ => pingCount.incrementAndGet
|
||||
}
|
||||
}).start()
|
||||
}).start()
|
||||
|
||||
val pool3 = actorOf(
|
||||
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter {
|
||||
def lowerBound = 2
|
||||
def upperBound = 5
|
||||
def rampupRate = 0.1
|
||||
def backoffRate = 0.1
|
||||
def backoffThreshold = 0.5
|
||||
def partialFill = true
|
||||
def selectionCount = 1
|
||||
def instance = factory
|
||||
def receive = _route
|
||||
def pressureThreshold = 1
|
||||
def factory = actorOf(new Actor {
|
||||
self.lifeCycle = Temporary
|
||||
if (deathCount.get > 5) deathCount.set(0)
|
||||
if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
|
||||
def receive = {
|
||||
case akka.Die ⇒
|
||||
if (keepDying) deathCount.incrementAndGet
|
||||
throw new RuntimeException
|
||||
case _ => pingCount.incrementAndGet
|
||||
}
|
||||
}).start()
|
||||
}).start()
|
||||
|
||||
// default lifecycle
|
||||
// actor comes back right away
|
||||
pingCount.set(0)
|
||||
keepDying = false
|
||||
pool1 ! "ping"
|
||||
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pool1 ! akka.Die
|
||||
sleepFor(2 seconds)
|
||||
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pingCount.get must be (1)
|
||||
|
||||
// default lifecycle
|
||||
// actor dies completely
|
||||
pingCount.set(0)
|
||||
keepDying = true
|
||||
pool1 ! "ping"
|
||||
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pool1 ! akka.Die
|
||||
sleepFor(2 seconds)
|
||||
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
|
||||
pool1 ! "ping"
|
||||
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pingCount.get must be (2)
|
||||
|
||||
// permanent lifecycle
|
||||
// actor comes back right away
|
||||
pingCount.set(0)
|
||||
keepDying = false
|
||||
pool2 ! "ping"
|
||||
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pool2 ! akka.Die
|
||||
sleepFor(2 seconds)
|
||||
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pingCount.get must be (1)
|
||||
|
||||
// permanent lifecycle
|
||||
// actor dies completely
|
||||
pingCount.set(0)
|
||||
keepDying = true
|
||||
pool2 ! "ping"
|
||||
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pool2 ! akka.Die
|
||||
sleepFor(2 seconds)
|
||||
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
|
||||
pool2 ! "ping"
|
||||
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pingCount.get must be (2)
|
||||
|
||||
// temporary lifecycle
|
||||
pingCount.set(0)
|
||||
keepDying = false
|
||||
pool3 ! "ping"
|
||||
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pool3 ! akka.Die
|
||||
sleepFor(2 seconds)
|
||||
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
|
||||
pool3 ! "ping"
|
||||
pool3 ! "ping"
|
||||
pool3 ! "ping"
|
||||
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pingCount.get must be (4)
|
||||
}
|
||||
|
||||
"support customizable supervision config of pooled actors" in {
|
||||
import akka.config.Supervision._
|
||||
val pingCount = new AtomicInteger(0)
|
||||
val deathCount = new AtomicInteger(0)
|
||||
var keepDying = false
|
||||
|
||||
trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig {
|
||||
def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000)
|
||||
}
|
||||
|
||||
object BadState
|
||||
|
||||
val pool1 = actorOf(
|
||||
new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
|
||||
def lowerBound = 2
|
||||
def upperBound = 5
|
||||
def rampupRate = 0.1
|
||||
def backoffRate = 0.1
|
||||
def backoffThreshold = 0.5
|
||||
def partialFill = true
|
||||
def selectionCount = 1
|
||||
def instance = factory
|
||||
def receive = _route
|
||||
def pressureThreshold = 1
|
||||
def factory = actorOf(new Actor {
|
||||
if (deathCount.get > 5) deathCount.set(0)
|
||||
if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
|
||||
def receive = {
|
||||
case BadState ⇒
|
||||
if (keepDying) deathCount.incrementAndGet
|
||||
throw new IllegalStateException
|
||||
case akka.Die =>
|
||||
throw new RuntimeException
|
||||
case _ => pingCount.incrementAndGet
|
||||
}
|
||||
}).start()
|
||||
}).start()
|
||||
|
||||
|
||||
// actor comes back right away
|
||||
pingCount.set(0)
|
||||
keepDying = false
|
||||
pool1 ! "ping"
|
||||
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pool1 ! BadState
|
||||
sleepFor(2 seconds)
|
||||
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pingCount.get must be (1)
|
||||
|
||||
// actor dies completely
|
||||
pingCount.set(0)
|
||||
keepDying = true
|
||||
pool1 ! "ping"
|
||||
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pool1 ! BadState
|
||||
sleepFor(2 seconds)
|
||||
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
|
||||
pool1 ! "ping"
|
||||
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pingCount.get must be (2)
|
||||
|
||||
// kill it
|
||||
intercept[RuntimeException](pool1.?(akka.Die).get)
|
||||
actor ! "end"
|
||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
counter1.get must be(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"random router" must {
|
||||
|
||||
"be started when constructed" in {
|
||||
|
||||
val actor1 = Actor.actorOf[TestActor].start
|
||||
|
||||
val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.Random)
|
||||
actor.isRunning must be(true)
|
||||
}
|
||||
|
||||
"throw IllegalArgumentException at construction when no connections" in {
|
||||
try {
|
||||
Routing.newRoutedActorRef("foo", List(), RouterType.Random)
|
||||
fail()
|
||||
} catch {
|
||||
case e: IllegalArgumentException =>
|
||||
}
|
||||
}
|
||||
|
||||
"deliver messages in a random fashion" in{
|
||||
|
||||
}
|
||||
|
||||
"deliver a broadcast message" in {
|
||||
val doneLatch = new CountDownLatch(2)
|
||||
|
||||
val counter1 = new AtomicInteger
|
||||
val connection1 = actorOf(new Actor {
|
||||
def receive = {
|
||||
case "end" => doneLatch.countDown()
|
||||
case msg: Int => counter1.addAndGet(msg)
|
||||
}
|
||||
}).start()
|
||||
|
||||
val counter2 = new AtomicInteger
|
||||
val connection2 = actorOf(new Actor {
|
||||
def receive = {
|
||||
case "end" => doneLatch.countDown()
|
||||
case msg: Int => counter2.addAndGet(msg)
|
||||
}
|
||||
}).start()
|
||||
|
||||
val actor = Routing.newRoutedActorRef("foo", List(connection1, connection2), RouterType.Random)
|
||||
|
||||
actor ! Broadcast(1)
|
||||
actor ! Broadcast("end")
|
||||
|
||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
|
||||
counter1.get must be(1)
|
||||
counter2.get must be(1)
|
||||
}
|
||||
|
||||
"fail to deliver a broadcast message using the ?" in {
|
||||
val doneLatch = new CountDownLatch(1)
|
||||
|
||||
val counter1 = new AtomicInteger
|
||||
val connection1 = actorOf(new Actor {
|
||||
def receive = {
|
||||
case "end" => doneLatch.countDown()
|
||||
case _ => counter1.incrementAndGet()
|
||||
}
|
||||
}).start()
|
||||
|
||||
val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Random)
|
||||
|
||||
try {
|
||||
actor ? Broadcast(1)
|
||||
fail()
|
||||
} catch {
|
||||
case e: RoutingException =>
|
||||
}
|
||||
|
||||
actor ! "end"
|
||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
counter1.get must be(0)
|
||||
}
|
||||
}
|
||||
|
||||
"least cpu router" must {
|
||||
"throw IllegalArgumentException when constructed" in {
|
||||
val actor1 = Actor.actorOf[TestActor].start
|
||||
|
||||
try {
|
||||
Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastCPU)
|
||||
} catch {
|
||||
case e: IllegalArgumentException =>
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"least ram router" must {
|
||||
"throw IllegalArgumentException when constructed" in {
|
||||
val actor1 = Actor.actorOf[TestActor].start
|
||||
|
||||
try {
|
||||
Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastRAM)
|
||||
} catch {
|
||||
case e: IllegalArgumentException =>
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"smallest mailbox" must {
|
||||
"throw IllegalArgumentException when constructed" in {
|
||||
val actor1 = Actor.actorOf[TestActor].start
|
||||
|
||||
try {
|
||||
Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastMessages)
|
||||
} catch {
|
||||
case e: IllegalArgumentException =>
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue