2010-10-26 12:49:25 +02:00
|
|
|
package akka.actor.routing
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
import org.scalatest.WordSpec
|
|
|
|
|
import org.scalatest.matchers.MustMatchers
|
|
|
|
|
|
2011-06-17 22:19:17 +02:00
|
|
|
import akka.testkit._
|
|
|
|
|
import akka.testkit.Testing.sleepFor
|
2011-04-19 17:45:01 +12:00
|
|
|
import akka.util.duration._
|
|
|
|
|
|
2011-06-13 14:59:22 +02:00
|
|
|
import akka.actor._
|
2010-10-26 12:49:25 +02:00
|
|
|
import akka.actor.Actor._
|
2011-04-19 17:45:01 +12:00
|
|
|
import akka.routing._
|
2011-06-26 21:33:27 +02:00
|
|
|
import akka.event.EventHandler
|
2010-08-24 23:21:28 +02:00
|
|
|
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger
|
2011-05-23 17:08:45 +02:00
|
|
|
import akka.dispatch.{ KeptPromise, Future }
|
|
|
|
|
|
|
|
|
|
object RoutingSpec {
|
|
|
|
|
trait Foo {
|
|
|
|
|
def sq(x: Int, sleep: Long): Future[Int]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class FooImpl extends Foo {
|
|
|
|
|
def sq(x: Int, sleep: Long): Future[Int] = {
|
|
|
|
|
if (sleep > 0) Thread.sleep(sleep)
|
|
|
|
|
new KeptPromise(Right(x * x))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
class RoutingSpec extends WordSpec with MustMatchers {
|
|
|
|
|
import Routing._
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
"Routing" must {
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
"dispatch" in {
|
|
|
|
|
val Test1 = "test1"
|
|
|
|
|
val Test2 = "test2"
|
|
|
|
|
val Test3 = "test3"
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
val t1 = actorOf(new Actor {
|
|
|
|
|
def receive = {
|
2011-05-18 17:25:30 +02:00
|
|
|
case Test1 ⇒ self.reply(3)
|
|
|
|
|
case Test2 ⇒ self.reply(7)
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
|
|
|
|
}).start()
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
val t2 = actorOf(new Actor() {
|
2011-04-19 17:45:01 +12:00
|
|
|
def receive = {
|
2011-05-18 17:25:30 +02:00
|
|
|
case Test3 ⇒ self.reply(11)
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
|
|
|
|
}).start()
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-05-20 12:29:48 +02:00
|
|
|
val d = routerActor {
|
2011-05-18 17:25:30 +02:00
|
|
|
case Test1 | Test2 ⇒ t1
|
|
|
|
|
case Test3 ⇒ t2
|
2011-04-19 17:45:01 +12:00
|
|
|
}.start()
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-06-28 15:20:31 -06:00
|
|
|
implicit val timeout = Timeout((5 seconds).dilated)
|
2011-04-19 17:45:01 +12:00
|
|
|
val result = for {
|
2011-06-14 00:19:54 +02:00
|
|
|
a ← (d ? (Test1)).as[Int]
|
|
|
|
|
b ← (d ? (Test2)).as[Int]
|
|
|
|
|
c ← (d ? (Test3)).as[Int]
|
2011-04-19 17:45:01 +12:00
|
|
|
} yield a + b + c
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
result.isDefined must be(true)
|
|
|
|
|
result.get must be(21)
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
for (a ← List(t1, t2, d)) a.stop()
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
"have messages logged" in {
|
|
|
|
|
val msgs = new java.util.concurrent.ConcurrentSkipListSet[Any]
|
|
|
|
|
val latch = TestLatch(2)
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
val actor = actorOf(new Actor {
|
2011-05-18 17:25:30 +02:00
|
|
|
def receive = { case _ ⇒ }
|
2011-04-19 17:45:01 +12:00
|
|
|
}).start()
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
val logger = loggerActor(actor, x ⇒ { msgs.add(x); latch.countDown() }).start()
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
val foo: Any = "foo"
|
|
|
|
|
val bar: Any = "bar"
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
logger ! foo
|
|
|
|
|
logger ! bar
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
latch.await
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
msgs must (have size (2) and contain(foo) and contain(bar))
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
actor.stop()
|
|
|
|
|
logger.stop()
|
|
|
|
|
}
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
"dispatch to smallest mailbox" in {
|
|
|
|
|
val t1Count = new AtomicInteger(0)
|
|
|
|
|
val t2Count = new AtomicInteger(0)
|
2011-07-13 19:18:04 +02:00
|
|
|
val latch1 = TestLatch(2501)
|
|
|
|
|
val latch2 = TestLatch(2499)
|
2010-08-24 23:21:28 +02:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
val t1 = actorOf(new Actor {
|
|
|
|
|
def receive = {
|
2011-05-18 17:25:30 +02:00
|
|
|
case x ⇒
|
2011-04-19 17:45:01 +12:00
|
|
|
t1Count.incrementAndGet
|
2011-07-13 19:18:04 +02:00
|
|
|
latch1.countDown()
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
|
|
|
|
}).start()
|
2011-02-01 14:56:34 -08:00
|
|
|
|
2011-07-13 19:18:04 +02:00
|
|
|
t1.dispatcher.suspend(t1)
|
|
|
|
|
|
2011-07-26 18:33:59 +12:00
|
|
|
for (i ← 1 to 2501) t1 ! i
|
2011-07-13 19:18:04 +02:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
val t2 = actorOf(new Actor {
|
2011-03-16 12:37:48 +01:00
|
|
|
def receive = {
|
2011-05-18 17:25:30 +02:00
|
|
|
case x ⇒
|
2011-04-19 17:45:01 +12:00
|
|
|
t2Count.incrementAndGet
|
2011-07-13 19:18:04 +02:00
|
|
|
latch2.countDown()
|
2011-02-28 22:55:02 +01:00
|
|
|
}
|
2011-04-19 17:45:01 +12:00
|
|
|
}).start()
|
|
|
|
|
|
2011-07-13 19:18:04 +02:00
|
|
|
val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) //Will pick the last with the smallest mailbox, so make sure t1 is last
|
|
|
|
|
|
2011-07-26 18:33:59 +12:00
|
|
|
for (i ← 1 to 2499) d ! i
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-07-13 19:18:04 +02:00
|
|
|
latch2.await(20 seconds)
|
|
|
|
|
|
|
|
|
|
t1.dispatcher.resume(t1)
|
2011-03-16 12:37:48 +01:00
|
|
|
|
2011-06-02 22:54:38 +02:00
|
|
|
try {
|
2011-07-13 19:18:04 +02:00
|
|
|
latch1.await(20 seconds)
|
2011-06-02 22:54:38 +02:00
|
|
|
} finally {
|
|
|
|
|
// because t1 is much slower and thus has a bigger mailbox all the time
|
2011-07-13 19:18:04 +02:00
|
|
|
t1Count.get must be === 2501
|
|
|
|
|
t2Count.get must be === 2499
|
|
|
|
|
for (a ← List(t1, t2, d)) a.stop()
|
2011-06-02 22:54:38 +02:00
|
|
|
}
|
2011-03-16 12:37:48 +01:00
|
|
|
}
|
|
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
"listen" in {
|
|
|
|
|
val fooLatch = TestLatch(2)
|
|
|
|
|
val barLatch = TestLatch(2)
|
|
|
|
|
val barCount = new AtomicInteger(0)
|
2011-03-16 12:37:48 +01:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
val broadcast = actorOf(new Actor with Listeners {
|
|
|
|
|
def receive = listenerManagement orElse {
|
2011-05-18 17:25:30 +02:00
|
|
|
case "foo" ⇒ gossip("bar")
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
|
|
|
|
}).start()
|
2011-03-16 12:37:48 +01:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
def newListener = actorOf(new Actor {
|
|
|
|
|
def receive = {
|
2011-05-18 17:25:30 +02:00
|
|
|
case "bar" ⇒
|
2011-04-19 17:45:01 +12:00
|
|
|
barCount.incrementAndGet
|
|
|
|
|
barLatch.countDown()
|
2011-05-18 17:25:30 +02:00
|
|
|
case "foo" ⇒
|
2011-04-19 17:45:01 +12:00
|
|
|
fooLatch.countDown()
|
|
|
|
|
}
|
|
|
|
|
}).start()
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
val a1 = newListener
|
|
|
|
|
val a2 = newListener
|
|
|
|
|
val a3 = newListener
|
|
|
|
|
|
|
|
|
|
broadcast ! Listen(a1)
|
|
|
|
|
broadcast ! Listen(a2)
|
|
|
|
|
broadcast ! Listen(a3)
|
|
|
|
|
|
|
|
|
|
broadcast ! Deafen(a3)
|
|
|
|
|
|
|
|
|
|
broadcast ! WithListeners(_ ! "foo")
|
|
|
|
|
broadcast ! "foo"
|
|
|
|
|
|
|
|
|
|
barLatch.await
|
2011-05-18 17:25:30 +02:00
|
|
|
barCount.get must be(2)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
|
|
|
|
fooLatch.await
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
for (a ← List(broadcast, a1, a2, a3)) a.stop()
|
2011-03-16 13:20:00 +01:00
|
|
|
}
|
|
|
|
|
}
|
2011-04-19 17:45:01 +12:00
|
|
|
|
|
|
|
|
"Actor Pool" must {
|
|
|
|
|
|
|
|
|
|
"have expected capacity" in {
|
|
|
|
|
val latch = TestLatch(2)
|
|
|
|
|
val count = new AtomicInteger(0)
|
|
|
|
|
|
|
|
|
|
val pool = actorOf(
|
2011-07-15 14:25:09 -07:00
|
|
|
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with SmallestMailboxSelector {
|
2011-04-19 17:45:01 +12:00
|
|
|
def factory = actorOf(new Actor {
|
|
|
|
|
def receive = {
|
2011-05-18 17:25:30 +02:00
|
|
|
case _ ⇒
|
2011-04-19 17:45:01 +12:00
|
|
|
count.incrementAndGet
|
|
|
|
|
latch.countDown()
|
2011-07-17 09:02:36 +03:00
|
|
|
self tryReply "success"
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
|
|
|
|
}).start()
|
|
|
|
|
|
|
|
|
|
def limit = 2
|
|
|
|
|
def selectionCount = 1
|
|
|
|
|
def partialFill = true
|
|
|
|
|
def instance = factory
|
|
|
|
|
def receive = _route
|
|
|
|
|
}).start()
|
|
|
|
|
|
|
|
|
|
val successes = TestLatch(2)
|
2011-06-13 22:36:46 +02:00
|
|
|
val successCounter = actorOf(new Actor {
|
2011-03-16 13:20:00 +01:00
|
|
|
def receive = {
|
2011-05-18 17:25:30 +02:00
|
|
|
case "success" ⇒ successes.countDown()
|
2011-02-28 22:55:02 +01:00
|
|
|
}
|
2011-06-13 22:36:46 +02:00
|
|
|
}).start()
|
2011-04-19 17:45:01 +12:00
|
|
|
|
|
|
|
|
implicit val replyTo = successCounter
|
|
|
|
|
pool ! "a"
|
|
|
|
|
pool ! "b"
|
|
|
|
|
|
|
|
|
|
latch.await
|
|
|
|
|
successes.await
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
count.get must be(2)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-06-13 15:29:35 +02:00
|
|
|
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
|
|
|
|
pool.stop()
|
2011-03-16 13:20:00 +01:00
|
|
|
}
|
|
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
"pass ticket #705" in {
|
|
|
|
|
val pool = actorOf(
|
2011-07-15 14:25:09 -07:00
|
|
|
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicFilter {
|
2011-04-19 17:45:01 +12:00
|
|
|
def lowerBound = 2
|
|
|
|
|
def upperBound = 20
|
|
|
|
|
def rampupRate = 0.1
|
|
|
|
|
def backoffRate = 0.1
|
|
|
|
|
def backoffThreshold = 0.5
|
|
|
|
|
def partialFill = true
|
|
|
|
|
def selectionCount = 1
|
|
|
|
|
def instance = factory
|
|
|
|
|
def receive = _route
|
|
|
|
|
def pressureThreshold = 1
|
|
|
|
|
def factory = actorOf(new Actor {
|
|
|
|
|
def receive = {
|
2011-05-18 17:25:30 +02:00
|
|
|
case req: String ⇒ {
|
2011-04-19 17:45:01 +12:00
|
|
|
sleepFor(10 millis)
|
2011-07-17 09:02:36 +03:00
|
|
|
self.tryReply("Response")
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}).start()
|
|
|
|
|
|
|
|
|
|
try {
|
2011-06-14 14:26:13 +02:00
|
|
|
(for (count ← 1 to 500) yield pool.?("Test", 20000)) foreach {
|
2011-05-18 17:25:30 +02:00
|
|
|
_.await.resultOrException.get must be("Response")
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
pool.stop()
|
2011-03-16 13:20:00 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
"grow as needed under pressure" in {
|
|
|
|
|
// make sure the pool starts at the expected lower limit and grows to the upper as needed
|
|
|
|
|
// as influenced by the backlog of blocking pooled actors
|
|
|
|
|
|
|
|
|
|
var latch = TestLatch(3)
|
|
|
|
|
val count = new AtomicInteger(0)
|
|
|
|
|
|
|
|
|
|
val pool = actorOf(
|
2011-07-15 14:25:09 -07:00
|
|
|
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
|
2011-04-19 17:45:01 +12:00
|
|
|
def factory = actorOf(new Actor {
|
|
|
|
|
def receive = {
|
2011-05-18 17:25:30 +02:00
|
|
|
case n: Int ⇒
|
2011-04-19 17:45:01 +12:00
|
|
|
sleepFor(n millis)
|
|
|
|
|
count.incrementAndGet
|
|
|
|
|
latch.countDown()
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
def lowerBound = 2
|
|
|
|
|
def upperBound = 4
|
|
|
|
|
def rampupRate = 0.1
|
|
|
|
|
def partialFill = true
|
|
|
|
|
def selectionCount = 1
|
|
|
|
|
def instance = factory
|
|
|
|
|
def receive = _route
|
|
|
|
|
}).start()
|
|
|
|
|
|
|
|
|
|
// first message should create the minimum number of delgates
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
pool ! 1
|
|
|
|
|
|
2011-06-13 15:29:35 +02:00
|
|
|
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
|
|
|
|
var loops = 0
|
|
|
|
|
def loop(t: Int) = {
|
|
|
|
|
latch = TestLatch(loops)
|
|
|
|
|
count.set(0)
|
2011-05-18 17:25:30 +02:00
|
|
|
for (m ← 0 until loops) {
|
2011-06-13 13:43:21 +02:00
|
|
|
pool ? t
|
2011-04-19 17:45:01 +12:00
|
|
|
sleepFor(50 millis)
|
2011-02-28 22:55:02 +01:00
|
|
|
}
|
2011-04-19 17:45:01 +12:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 2 more should go thru without triggering more
|
|
|
|
|
|
|
|
|
|
loops = 2
|
|
|
|
|
|
|
|
|
|
loop(500)
|
|
|
|
|
latch.await
|
2011-05-18 17:25:30 +02:00
|
|
|
count.get must be(loops)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-06-13 15:29:35 +02:00
|
|
|
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
|
|
|
|
// a whole bunch should max it out
|
|
|
|
|
|
|
|
|
|
loops = 10
|
|
|
|
|
loop(500)
|
|
|
|
|
latch.await
|
2011-05-18 17:25:30 +02:00
|
|
|
count.get must be(loops)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-06-13 15:29:35 +02:00
|
|
|
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(4)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
|
|
|
|
pool.stop()
|
2011-03-16 13:20:00 +01:00
|
|
|
}
|
|
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
"grow as needed under mailbox pressure" in {
|
|
|
|
|
// make sure the pool starts at the expected lower limit and grows to the upper as needed
|
|
|
|
|
// as influenced by the backlog of messages in the delegate mailboxes
|
|
|
|
|
|
|
|
|
|
var latch = TestLatch(3)
|
|
|
|
|
val count = new AtomicInteger(0)
|
|
|
|
|
|
|
|
|
|
val pool = actorOf(
|
2011-07-15 14:25:09 -07:00
|
|
|
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
|
2011-04-19 17:45:01 +12:00
|
|
|
def factory = actorOf(new Actor {
|
|
|
|
|
def receive = {
|
2011-05-18 17:25:30 +02:00
|
|
|
case n: Int ⇒
|
2011-04-19 17:45:01 +12:00
|
|
|
sleepFor(n millis)
|
|
|
|
|
count.incrementAndGet
|
|
|
|
|
latch.countDown()
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
def lowerBound = 2
|
|
|
|
|
def upperBound = 4
|
|
|
|
|
def pressureThreshold = 3
|
|
|
|
|
def rampupRate = 0.1
|
|
|
|
|
def partialFill = true
|
|
|
|
|
def selectionCount = 1
|
|
|
|
|
def instance = factory
|
|
|
|
|
def receive = _route
|
|
|
|
|
}).start()
|
2011-03-16 13:20:00 +01:00
|
|
|
|
|
|
|
|
var loops = 0
|
2011-04-19 17:45:01 +12:00
|
|
|
def loop(t: Int) = {
|
|
|
|
|
latch = TestLatch(loops)
|
|
|
|
|
count.set(0)
|
2011-05-18 17:25:30 +02:00
|
|
|
for (m ← 0 until loops) {
|
2011-04-19 17:45:01 +12:00
|
|
|
pool ! t
|
2011-02-28 22:55:02 +01:00
|
|
|
}
|
2011-03-16 13:20:00 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// send a few messages and observe pool at its lower bound
|
|
|
|
|
loops = 3
|
|
|
|
|
loop(500)
|
2011-04-19 17:45:01 +12:00
|
|
|
latch.await
|
2011-05-18 17:25:30 +02:00
|
|
|
count.get must be(loops)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
2011-06-13 15:29:35 +02:00
|
|
|
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
2011-03-16 13:20:00 +01:00
|
|
|
|
|
|
|
|
// send a bunch over the theshold and observe an increment
|
|
|
|
|
loops = 15
|
|
|
|
|
loop(500)
|
|
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
latch.await(10 seconds)
|
2011-05-18 17:25:30 +02:00
|
|
|
count.get must be(loops)
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-06-13 15:29:35 +02:00
|
|
|
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be >= (3)
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
pool.stop()
|
2011-03-16 13:20:00 +01:00
|
|
|
}
|
|
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
"round robin" in {
|
|
|
|
|
val latch1 = TestLatch(2)
|
|
|
|
|
val delegates = new java.util.concurrent.ConcurrentHashMap[String, String]
|
|
|
|
|
|
|
|
|
|
val pool1 = actorOf(
|
2011-07-15 14:25:09 -07:00
|
|
|
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
|
2011-04-19 17:45:01 +12:00
|
|
|
def factory = actorOf(new Actor {
|
|
|
|
|
def receive = {
|
2011-05-18 17:25:30 +02:00
|
|
|
case _ ⇒
|
|
|
|
|
delegates put (self.uuid.toString, "")
|
2011-04-19 17:45:01 +12:00
|
|
|
latch1.countDown()
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
def limit = 1
|
2011-04-20 16:12:51 +12:00
|
|
|
def selectionCount = 1
|
2011-04-19 17:45:01 +12:00
|
|
|
def rampupRate = 0.1
|
|
|
|
|
def partialFill = true
|
|
|
|
|
def instance = factory
|
|
|
|
|
def receive = _route
|
|
|
|
|
}).start()
|
|
|
|
|
|
|
|
|
|
pool1 ! "a"
|
|
|
|
|
pool1 ! "b"
|
|
|
|
|
|
|
|
|
|
latch1.await
|
2011-05-18 17:25:30 +02:00
|
|
|
delegates.size must be(1)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
|
|
|
|
pool1.stop()
|
|
|
|
|
|
|
|
|
|
val latch2 = TestLatch(2)
|
|
|
|
|
delegates.clear()
|
|
|
|
|
|
|
|
|
|
val pool2 = actorOf(
|
2011-07-15 14:25:09 -07:00
|
|
|
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
|
2011-04-19 17:45:01 +12:00
|
|
|
def factory = actorOf(new Actor {
|
|
|
|
|
def receive = {
|
2011-05-18 17:25:30 +02:00
|
|
|
case _ ⇒
|
|
|
|
|
delegates put (self.uuid.toString, "")
|
2011-04-19 17:45:01 +12:00
|
|
|
latch2.countDown()
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
def limit = 2
|
2011-04-20 16:12:51 +12:00
|
|
|
def selectionCount = 1
|
2011-04-19 17:45:01 +12:00
|
|
|
def rampupRate = 0.1
|
|
|
|
|
def partialFill = false
|
|
|
|
|
def instance = factory
|
|
|
|
|
def receive = _route
|
|
|
|
|
}).start()
|
|
|
|
|
|
|
|
|
|
pool2 ! "a"
|
|
|
|
|
pool2 ! "b"
|
|
|
|
|
|
|
|
|
|
latch2.await
|
2011-05-18 17:25:30 +02:00
|
|
|
delegates.size must be(2)
|
2011-04-19 17:45:01 +12:00
|
|
|
|
|
|
|
|
pool2.stop()
|
2011-03-16 13:20:00 +01:00
|
|
|
}
|
|
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
"backoff" in {
|
|
|
|
|
val latch = TestLatch(10)
|
|
|
|
|
|
|
|
|
|
val pool = actorOf(
|
2011-07-15 14:25:09 -07:00
|
|
|
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
|
2011-04-19 17:45:01 +12:00
|
|
|
def factory = actorOf(new Actor {
|
|
|
|
|
def receive = {
|
2011-05-18 17:25:30 +02:00
|
|
|
case n: Int ⇒
|
2011-04-19 17:45:01 +12:00
|
|
|
sleepFor(n millis)
|
|
|
|
|
latch.countDown()
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
def lowerBound = 1
|
|
|
|
|
def upperBound = 5
|
|
|
|
|
def pressureThreshold = 1
|
|
|
|
|
def partialFill = true
|
|
|
|
|
def selectionCount = 1
|
|
|
|
|
def rampupRate = 0.1
|
|
|
|
|
def backoffRate = 0.50
|
|
|
|
|
def backoffThreshold = 0.50
|
|
|
|
|
def instance = factory
|
|
|
|
|
def receive = _route
|
|
|
|
|
}).start()
|
|
|
|
|
|
|
|
|
|
// put some pressure on the pool
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
for (m ← 0 to 10) pool ! 250
|
2011-04-19 17:45:01 +12:00
|
|
|
|
|
|
|
|
sleepFor(5 millis)
|
|
|
|
|
|
2011-06-13 15:29:35 +02:00
|
|
|
val z = (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size
|
2011-04-19 17:45:01 +12:00
|
|
|
|
|
|
|
|
z must be >= (2)
|
|
|
|
|
|
|
|
|
|
// let it cool down
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
for (m ← 0 to 3) {
|
2011-04-19 17:45:01 +12:00
|
|
|
pool ! 1
|
|
|
|
|
sleepFor(500 millis)
|
|
|
|
|
}
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-06-13 15:29:35 +02:00
|
|
|
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be <= (z)
|
2011-03-16 13:20:00 +01:00
|
|
|
|
2011-04-19 17:45:01 +12:00
|
|
|
pool.stop()
|
2011-03-16 13:20:00 +01:00
|
|
|
}
|
2011-05-23 17:08:45 +02:00
|
|
|
|
|
|
|
|
"support typed actors" in {
|
|
|
|
|
import RoutingSpec._
|
|
|
|
|
import TypedActor._
|
2011-07-15 14:25:09 -07:00
|
|
|
def createPool = new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
|
2011-05-23 17:08:45 +02:00
|
|
|
def lowerBound = 1
|
|
|
|
|
def upperBound = 5
|
|
|
|
|
def pressureThreshold = 1
|
|
|
|
|
def partialFill = true
|
|
|
|
|
def selectionCount = 1
|
|
|
|
|
def rampupRate = 0.1
|
|
|
|
|
def backoffRate = 0.50
|
|
|
|
|
def backoffThreshold = 0.50
|
|
|
|
|
def instance = getActorRefFor(typedActorOf[Foo, FooImpl]())
|
|
|
|
|
def receive = _route
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val pool = createProxy[Foo](createPool)
|
|
|
|
|
|
|
|
|
|
val results = for (i ← 1 to 100) yield (i, pool.sq(i, 100))
|
|
|
|
|
|
|
|
|
|
for ((i, r) ← results) r.get must equal(i * i)
|
|
|
|
|
}
|
2011-07-15 14:25:09 -07:00
|
|
|
|
|
|
|
|
"provide default supervision of pooled actors" in {
|
|
|
|
|
import akka.config.Supervision._
|
|
|
|
|
val pingCount = new AtomicInteger(0)
|
|
|
|
|
val deathCount = new AtomicInteger(0)
|
|
|
|
|
var keepDying = false
|
|
|
|
|
|
|
|
|
|
val pool1 = actorOf(
|
|
|
|
|
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
|
|
|
|
|
def lowerBound = 2
|
|
|
|
|
def upperBound = 5
|
|
|
|
|
def rampupRate = 0.1
|
|
|
|
|
def backoffRate = 0.1
|
|
|
|
|
def backoffThreshold = 0.5
|
|
|
|
|
def partialFill = true
|
|
|
|
|
def selectionCount = 1
|
|
|
|
|
def instance = factory
|
|
|
|
|
def receive = _route
|
|
|
|
|
def pressureThreshold = 1
|
|
|
|
|
def factory = actorOf(new Actor {
|
|
|
|
|
if (deathCount.get > 5) deathCount.set(0)
|
2011-07-26 18:33:59 +12:00
|
|
|
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
|
2011-07-15 14:25:09 -07:00
|
|
|
def receive = {
|
2011-07-26 18:33:59 +12:00
|
|
|
case akka.Die ⇒
|
2011-07-15 14:25:09 -07:00
|
|
|
if (keepDying) deathCount.incrementAndGet
|
|
|
|
|
throw new RuntimeException
|
2011-07-26 18:33:59 +12:00
|
|
|
case _ ⇒ pingCount.incrementAndGet
|
2011-07-15 14:25:09 -07:00
|
|
|
}
|
|
|
|
|
}).start()
|
|
|
|
|
}).start()
|
|
|
|
|
|
|
|
|
|
val pool2 = actorOf(
|
|
|
|
|
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
|
|
|
|
|
def lowerBound = 2
|
|
|
|
|
def upperBound = 5
|
|
|
|
|
def rampupRate = 0.1
|
|
|
|
|
def backoffRate = 0.1
|
|
|
|
|
def backoffThreshold = 0.5
|
|
|
|
|
def partialFill = true
|
|
|
|
|
def selectionCount = 1
|
|
|
|
|
def instance = factory
|
|
|
|
|
def receive = _route
|
|
|
|
|
def pressureThreshold = 1
|
|
|
|
|
def factory = actorOf(new Actor {
|
|
|
|
|
self.lifeCycle = Permanent
|
|
|
|
|
if (deathCount.get > 5) deathCount.set(0)
|
2011-07-26 18:33:59 +12:00
|
|
|
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
|
2011-07-15 14:25:09 -07:00
|
|
|
def receive = {
|
2011-07-26 18:33:59 +12:00
|
|
|
case akka.Die ⇒
|
2011-07-15 14:25:09 -07:00
|
|
|
if (keepDying) deathCount.incrementAndGet
|
|
|
|
|
throw new RuntimeException
|
2011-07-26 18:33:59 +12:00
|
|
|
case _ ⇒ pingCount.incrementAndGet
|
2011-07-15 14:25:09 -07:00
|
|
|
}
|
|
|
|
|
}).start()
|
|
|
|
|
}).start()
|
|
|
|
|
|
|
|
|
|
val pool3 = actorOf(
|
|
|
|
|
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter {
|
|
|
|
|
def lowerBound = 2
|
|
|
|
|
def upperBound = 5
|
|
|
|
|
def rampupRate = 0.1
|
|
|
|
|
def backoffRate = 0.1
|
|
|
|
|
def backoffThreshold = 0.5
|
|
|
|
|
def partialFill = true
|
|
|
|
|
def selectionCount = 1
|
|
|
|
|
def instance = factory
|
|
|
|
|
def receive = _route
|
|
|
|
|
def pressureThreshold = 1
|
|
|
|
|
def factory = actorOf(new Actor {
|
|
|
|
|
self.lifeCycle = Temporary
|
|
|
|
|
if (deathCount.get > 5) deathCount.set(0)
|
2011-07-26 18:33:59 +12:00
|
|
|
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
|
2011-07-15 14:25:09 -07:00
|
|
|
def receive = {
|
2011-07-26 18:33:59 +12:00
|
|
|
case akka.Die ⇒
|
2011-07-15 14:25:09 -07:00
|
|
|
if (keepDying) deathCount.incrementAndGet
|
|
|
|
|
throw new RuntimeException
|
2011-07-26 18:33:59 +12:00
|
|
|
case _ ⇒ pingCount.incrementAndGet
|
2011-07-15 14:25:09 -07:00
|
|
|
}
|
|
|
|
|
}).start()
|
|
|
|
|
}).start()
|
|
|
|
|
|
|
|
|
|
// default lifecycle
|
|
|
|
|
// actor comes back right away
|
|
|
|
|
pingCount.set(0)
|
|
|
|
|
keepDying = false
|
|
|
|
|
pool1 ! "ping"
|
|
|
|
|
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
|
|
|
|
pool1 ! akka.Die
|
|
|
|
|
sleepFor(2 seconds)
|
|
|
|
|
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
2011-07-26 18:33:59 +12:00
|
|
|
pingCount.get must be(1)
|
2011-07-15 14:25:09 -07:00
|
|
|
|
|
|
|
|
// default lifecycle
|
|
|
|
|
// actor dies completely
|
|
|
|
|
pingCount.set(0)
|
2011-07-26 18:33:59 +12:00
|
|
|
keepDying = true
|
2011-07-15 14:25:09 -07:00
|
|
|
pool1 ! "ping"
|
|
|
|
|
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
|
|
|
|
pool1 ! akka.Die
|
|
|
|
|
sleepFor(2 seconds)
|
|
|
|
|
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
|
|
|
|
|
pool1 ! "ping"
|
|
|
|
|
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
2011-07-26 18:33:59 +12:00
|
|
|
pingCount.get must be(2)
|
2011-07-15 14:25:09 -07:00
|
|
|
|
|
|
|
|
// permanent lifecycle
|
|
|
|
|
// actor comes back right away
|
|
|
|
|
pingCount.set(0)
|
2011-07-26 18:33:59 +12:00
|
|
|
keepDying = false
|
2011-07-15 14:25:09 -07:00
|
|
|
pool2 ! "ping"
|
|
|
|
|
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
|
|
|
|
pool2 ! akka.Die
|
|
|
|
|
sleepFor(2 seconds)
|
|
|
|
|
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
2011-07-26 18:33:59 +12:00
|
|
|
pingCount.get must be(1)
|
2011-07-15 14:25:09 -07:00
|
|
|
|
|
|
|
|
// permanent lifecycle
|
|
|
|
|
// actor dies completely
|
|
|
|
|
pingCount.set(0)
|
2011-07-26 18:33:59 +12:00
|
|
|
keepDying = true
|
2011-07-15 14:25:09 -07:00
|
|
|
pool2 ! "ping"
|
|
|
|
|
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
|
|
|
|
pool2 ! akka.Die
|
|
|
|
|
sleepFor(2 seconds)
|
|
|
|
|
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
|
|
|
|
|
pool2 ! "ping"
|
2011-07-26 18:33:59 +12:00
|
|
|
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
|
|
|
|
pingCount.get must be(2)
|
2011-07-15 14:25:09 -07:00
|
|
|
|
|
|
|
|
// temporary lifecycle
|
|
|
|
|
pingCount.set(0)
|
2011-07-26 18:33:59 +12:00
|
|
|
keepDying = false
|
2011-07-15 14:25:09 -07:00
|
|
|
pool3 ! "ping"
|
|
|
|
|
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
|
|
|
|
pool3 ! akka.Die
|
|
|
|
|
sleepFor(2 seconds)
|
|
|
|
|
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
|
|
|
|
|
pool3 ! "ping"
|
|
|
|
|
pool3 ! "ping"
|
|
|
|
|
pool3 ! "ping"
|
|
|
|
|
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
2011-07-26 18:33:59 +12:00
|
|
|
pingCount.get must be(4)
|
2011-07-15 14:25:09 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"support customizable supervision config of pooled actors" in {
|
|
|
|
|
import akka.config.Supervision._
|
|
|
|
|
val pingCount = new AtomicInteger(0)
|
|
|
|
|
val deathCount = new AtomicInteger(0)
|
|
|
|
|
var keepDying = false
|
|
|
|
|
|
|
|
|
|
trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig {
|
|
|
|
|
def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
object BadState
|
|
|
|
|
|
|
|
|
|
val pool1 = actorOf(
|
|
|
|
|
new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
|
|
|
|
|
def lowerBound = 2
|
|
|
|
|
def upperBound = 5
|
|
|
|
|
def rampupRate = 0.1
|
|
|
|
|
def backoffRate = 0.1
|
|
|
|
|
def backoffThreshold = 0.5
|
|
|
|
|
def partialFill = true
|
|
|
|
|
def selectionCount = 1
|
|
|
|
|
def instance = factory
|
|
|
|
|
def receive = _route
|
|
|
|
|
def pressureThreshold = 1
|
|
|
|
|
def factory = actorOf(new Actor {
|
|
|
|
|
if (deathCount.get > 5) deathCount.set(0)
|
2011-07-26 18:33:59 +12:00
|
|
|
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
|
2011-07-15 14:25:09 -07:00
|
|
|
def receive = {
|
2011-07-26 18:33:59 +12:00
|
|
|
case BadState ⇒
|
2011-07-15 14:25:09 -07:00
|
|
|
if (keepDying) deathCount.incrementAndGet
|
|
|
|
|
throw new IllegalStateException
|
2011-07-26 18:33:59 +12:00
|
|
|
case akka.Die ⇒
|
2011-07-15 14:25:09 -07:00
|
|
|
throw new RuntimeException
|
2011-07-26 18:33:59 +12:00
|
|
|
case _ ⇒ pingCount.incrementAndGet
|
2011-07-15 14:25:09 -07:00
|
|
|
}
|
|
|
|
|
}).start()
|
|
|
|
|
}).start()
|
|
|
|
|
|
|
|
|
|
// actor comes back right away
|
|
|
|
|
pingCount.set(0)
|
|
|
|
|
keepDying = false
|
|
|
|
|
pool1 ! "ping"
|
|
|
|
|
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
|
|
|
|
pool1 ! BadState
|
|
|
|
|
sleepFor(2 seconds)
|
|
|
|
|
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
2011-07-26 18:33:59 +12:00
|
|
|
pingCount.get must be(1)
|
2011-07-15 14:25:09 -07:00
|
|
|
|
|
|
|
|
// actor dies completely
|
|
|
|
|
pingCount.set(0)
|
2011-07-26 18:33:59 +12:00
|
|
|
keepDying = true
|
2011-07-15 14:25:09 -07:00
|
|
|
pool1 ! "ping"
|
|
|
|
|
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
|
|
|
|
pool1 ! BadState
|
|
|
|
|
sleepFor(2 seconds)
|
|
|
|
|
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
|
|
|
|
|
pool1 ! "ping"
|
|
|
|
|
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
2011-07-26 18:33:59 +12:00
|
|
|
pingCount.get must be(2)
|
2011-07-15 14:25:09 -07:00
|
|
|
|
|
|
|
|
// kill it
|
|
|
|
|
intercept[RuntimeException](pool1.?(akka.Die).get)
|
|
|
|
|
}
|
2011-03-16 13:20:00 +01:00
|
|
|
}
|
2010-08-24 23:21:28 +02:00
|
|
|
}
|
2011-04-19 17:45:01 +12:00
|
|
|
|