ticket #889 after merge

This commit is contained in:
Peter Veentjer 2011-07-28 16:56:35 +03:00
commit 4b4f38c973
14 changed files with 1263 additions and 1045 deletions

View file

@ -12,32 +12,65 @@ import akka.actor.TypedActor._
import akka.japi.{ Option JOption }
import akka.util.Duration
import akka.dispatch.{ Dispatchers, Future, KeptPromise }
import akka.routing.CyclicIterator
import java.io.{ ObjectInputStream, ObjectOutputStream, ByteArrayOutputStream }
import akka.actor.TypedActorSpec.Foo
import java.util.concurrent.atomic.AtomicReference
import annotation.tailrec
object TypedActorSpec {
class CyclicIterator[T](val items: Seq[T]) extends Iterator[T] {
private[this] val current: AtomicReference[Seq[T]] = new AtomicReference(items)
def hasNext = items != Nil
def next: T = {
@tailrec
def findNext: T = {
val currentItems = current.get
val newItems = currentItems match {
case Nil items
case xs xs
}
if (current.compareAndSet(currentItems, newItems.tail)) newItems.head
else findNext
}
findNext
}
override def exists(f: T Boolean): Boolean = items exists f
}
trait Foo {
def pigdog(): String
def self = TypedActor.self[Foo]
def futurePigdog(): Future[String]
def futurePigdog(delay: Long): Future[String]
def futurePigdog(delay: Long, numbered: Int): Future[String]
def futureComposePigdogFrom(foo: Foo): Future[String]
def failingFuturePigdog(): Future[String] = throw new IllegalStateException("expected")
def failingOptionPigdog(): Option[String] = throw new IllegalStateException("expected")
def failingJOptionPigdog(): JOption[String] = throw new IllegalStateException("expected")
def failingPigdog(): Unit = throw new IllegalStateException("expected")
def optionPigdog(): Option[String]
def optionPigdog(delay: Long): Option[String]
def joptionPigdog(delay: Long): JOption[String]
def incr()
def read(): Int
def testMethodCallSerialization(foo: Foo, s: String, i: Int): Unit = throw new IllegalStateException("expected")
@ -48,6 +81,7 @@ object TypedActorSpec {
def pigdog = "Pigdog"
def futurePigdog(): Future[String] = new KeptPromise(Right(pigdog))
def futurePigdog(delay: Long): Future[String] = {
Thread.sleep(delay)
futurePigdog
@ -92,6 +126,7 @@ object TypedActorSpec {
trait Stacked extends Stackable1 with Stackable2 {
def stacked: String = stackable1 + stackable2
def notOverriddenStacked: String = stackable1 + stackable2
}

View file

@ -0,0 +1,566 @@
package akka.routing
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.dispatch.{ KeptPromise, Future }
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Actor._
import akka.testkit.Testing._
import akka.actor.{ TypedActor, Actor }
import akka.testkit.TestLatch
import akka.util.duration._
object ActorPoolSpec {
trait Foo {
def sq(x: Int, sleep: Long): Future[Int]
}
class FooImpl extends Foo {
def sq(x: Int, sleep: Long): Future[Int] = {
if (sleep > 0) Thread.sleep(sleep)
new KeptPromise(Right(x * x))
}
}
}
class ActorPoolSpec extends WordSpec with MustMatchers {
import akka.routing.ActorPoolSpec._
"Actor Pool" must {
"have expected capacity" in {
val latch = TestLatch(2)
val count = new AtomicInteger(0)
val pool = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with SmallestMailboxSelector {
def factory = actorOf(new Actor {
def receive = {
case _
count.incrementAndGet
latch.countDown()
self tryReply "success"
}
}).start()
def limit = 2
def selectionCount = 1
def partialFill = true
def instance = factory
def receive = _route
}).start()
val successes = TestLatch(2)
val successCounter = actorOf(new Actor {
def receive = {
case "success" successes.countDown()
}
}).start()
implicit val replyTo = successCounter
pool ! "a"
pool ! "b"
latch.await
successes.await
count.get must be(2)
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool.stop()
}
"pass ticket #705" in {
val pool = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 20
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
def receive = {
case req: String {
sleepFor(10 millis)
self.tryReply("Response")
}
}
})
}).start()
try {
(for (count 1 to 500) yield pool.?("Test", 20000)) foreach {
_.await.resultOrException.get must be("Response")
}
} finally {
pool.stop()
}
}
"grow as needed under pressure" in {
// make sure the pool starts at the expected lower limit and grows to the upper as needed
// as influenced by the backlog of blocking pooled actors
var latch = TestLatch(3)
val count = new AtomicInteger(0)
val pool = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case n: Int
sleepFor(n millis)
count.incrementAndGet
latch.countDown()
}
})
def lowerBound = 2
def upperBound = 4
def rampupRate = 0.1
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
}).start()
// first message should create the minimum number of delgates
pool ! 1
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
var loops = 0
def loop(t: Int) = {
latch = TestLatch(loops)
count.set(0)
for (m 0 until loops) {
pool ? t
sleepFor(50 millis)
}
}
// 2 more should go thru without triggering more
loops = 2
loop(500)
latch.await
count.get must be(loops)
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
// a whole bunch should max it out
loops = 10
loop(500)
latch.await
count.get must be(loops)
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(4)
pool.stop()
}
"grow as needed under mailbox pressure" in {
// make sure the pool starts at the expected lower limit and grows to the upper as needed
// as influenced by the backlog of messages in the delegate mailboxes
var latch = TestLatch(3)
val count = new AtomicInteger(0)
val pool = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case n: Int
sleepFor(n millis)
count.incrementAndGet
latch.countDown()
}
})
def lowerBound = 2
def upperBound = 4
def pressureThreshold = 3
def rampupRate = 0.1
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
}).start()
var loops = 0
def loop(t: Int) = {
latch = TestLatch(loops)
count.set(0)
for (m 0 until loops) {
pool ! t
}
}
// send a few messages and observe pool at its lower bound
loops = 3
loop(500)
latch.await
count.get must be(loops)
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
// send a bunch over the theshold and observe an increment
loops = 15
loop(500)
latch.await(10 seconds)
count.get must be(loops)
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be >= (3)
pool.stop()
}
"round robin" in {
val latch1 = TestLatch(2)
val delegates = new java.util.concurrent.ConcurrentHashMap[String, String]
val pool1 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case _
delegates put (self.uuid.toString, "")
latch1.countDown()
}
})
def limit = 1
def selectionCount = 1
def rampupRate = 0.1
def partialFill = true
def instance = factory
def receive = _route
}).start()
pool1 ! "a"
pool1 ! "b"
latch1.await
delegates.size must be(1)
pool1.stop()
val latch2 = TestLatch(2)
delegates.clear()
val pool2 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case _
delegates put (self.uuid.toString, "")
latch2.countDown()
}
})
def limit = 2
def selectionCount = 1
def rampupRate = 0.1
def partialFill = false
def instance = factory
def receive = _route
}).start()
pool2 ! "a"
pool2 ! "b"
latch2.await
delegates.size must be(2)
pool2.stop()
}
"backoff" in {
val latch = TestLatch(10)
val pool = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
def factory = actorOf(new Actor {
def receive = {
case n: Int
sleepFor(n millis)
latch.countDown()
}
})
def lowerBound = 1
def upperBound = 5
def pressureThreshold = 1
def partialFill = true
def selectionCount = 1
def rampupRate = 0.1
def backoffRate = 0.50
def backoffThreshold = 0.50
def instance = factory
def receive = _route
}).start()
// put some pressure on the pool
for (m 0 to 10) pool ! 250
sleepFor(5 millis)
val z = (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size
z must be >= (2)
// let it cool down
for (m 0 to 3) {
pool ! 1
sleepFor(500 millis)
}
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be <= (z)
pool.stop()
}
"support typed actors" in {
import RoutingSpec._
import TypedActor._
def createPool = new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
def lowerBound = 1
def upperBound = 5
def pressureThreshold = 1
def partialFill = true
def selectionCount = 1
def rampupRate = 0.1
def backoffRate = 0.50
def backoffThreshold = 0.50
def instance = getActorRefFor(typedActorOf[Foo, FooImpl]())
def receive = _route
}
val pool = createProxy[Foo](createPool)
val results = for (i 1 to 100) yield (i, pool.sq(i, 100))
for ((i, r) results) r.get must equal(i * i)
}
"provide default supervision of pooled actors" in {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
val pool1 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
val pool2 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Permanent
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
val pool3 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Temporary
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
// default lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// default lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// permanent lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// permanent lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// temporary lifecycle
pingCount.set(0)
keepDying = false
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool3 ! akka.Die
sleepFor(2 seconds)
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool3 ! "ping"
pool3 ! "ping"
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(4)
}
"support customizable supervision config of pooled actors" in {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig {
def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000)
}
object BadState
val pool1 = actorOf(
new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case BadState
if (keepDying) deathCount.incrementAndGet
throw new IllegalStateException
case akka.Die
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// kill it
intercept[RuntimeException](pool1.?(akka.Die).get)
}
}
}

View file

@ -0,0 +1,58 @@
package akka.actor.routing
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.testkit._
import akka.actor._
import akka.actor.Actor._
import akka.routing._
import java.util.concurrent.atomic.AtomicInteger
class ListenerSpec extends WordSpec with MustMatchers {
"Listener" must {
"listen" in {
val fooLatch = TestLatch(2)
val barLatch = TestLatch(2)
val barCount = new AtomicInteger(0)
val broadcast = actorOf(new Actor with Listeners {
def receive = listenerManagement orElse {
case "foo" gossip("bar")
}
}).start()
def newListener = actorOf(new Actor {
def receive = {
case "bar"
barCount.incrementAndGet
barLatch.countDown()
case "foo"
fooLatch.countDown()
}
}).start()
val a1 = newListener
val a2 = newListener
val a3 = newListener
broadcast ! Listen(a1)
broadcast ! Listen(a2)
broadcast ! Listen(a3)
broadcast ! Deafen(a3)
broadcast ! WithListeners(_ ! "foo")
broadcast ! "foo"
barLatch.await
barCount.get must be(2)
fooLatch.await
for (a List(broadcast, a1, a2, a3)) a.stop()
}
}
}

View file

@ -1,720 +1,319 @@
package akka.actor.routing
package akka.routing
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.testkit._
import akka.testkit.Testing.sleepFor
import akka.util.duration._
import akka.actor._
import akka.actor.Actor._
import akka.routing._
import akka.event.EventHandler
import akka.routing.Router
import java.util.concurrent.atomic.AtomicInteger
import akka.dispatch.{ KeptPromise, Future }
import akka.actor.Actor._
import akka.actor.{ ActorRef, Actor }
import collection.mutable.LinkedList
import akka.routing.Routing.Broadcast
import java.util.concurrent.{ CountDownLatch, TimeUnit }
object RoutingSpec {
trait Foo {
def sq(x: Int, sleep: Long): Future[Int]
}
class FooImpl extends Foo {
def sq(x: Int, sleep: Long): Future[Int] = {
if (sleep > 0) Thread.sleep(sleep)
new KeptPromise(Right(x * x))
class TestActor extends Actor with Serializable {
def receive = {
case _
println("Hello")
}
}
}
class RoutingSpec extends WordSpec with MustMatchers {
import Routing._
"Routing" must {
import akka.routing.RoutingSpec._
"dispatch" in {
val Test1 = "test1"
val Test2 = "test2"
val Test3 = "test3"
"direct router" must {
"be started when constructed" in {
val actor1 = Actor.actorOf[TestActor].start
val t1 = actorOf(new Actor {
val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.Direct)
actor.isRunning must be(true)
}
"throw IllegalArgumentException at construction when no connections" in {
try {
Routing.newRoutedActorRef("foo", List(), RouterType.Direct)
fail()
} catch {
case e: IllegalArgumentException
}
}
"send message to connection" in {
val doneLatch = new CountDownLatch(1)
val counter = new AtomicInteger(0)
val connection1 = actorOf(new Actor {
def receive = {
case Test1 self.reply(3)
case Test2 self.reply(7)
case "end" doneLatch.countDown()
case _ counter.incrementAndGet
}
}).start()
val t2 = actorOf(new Actor() {
val routedActor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Direct)
routedActor ! "hello"
routedActor ! "end"
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
counter.get must be(1)
}
"deliver a broadcast message" in {
val doneLatch = new CountDownLatch(1)
val counter1 = new AtomicInteger
val connection1 = actorOf(new Actor {
def receive = {
case Test3 self.reply(11)
case "end" doneLatch.countDown()
case msg: Int counter1.addAndGet(msg)
}
}).start()
val d = routerActor {
case Test1 | Test2 t1
case Test3 t2
}.start()
val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Direct)
implicit val timeout = Timeout((5 seconds).dilated)
val result = for {
a (d ? (Test1)).as[Int]
b (d ? (Test2)).as[Int]
c (d ? (Test3)).as[Int]
} yield a + b + c
actor ! Broadcast(1)
actor ! "end"
result.isDefined must be(true)
result.get must be(21)
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
for (a List(t1, t2, d)) a.stop()
counter1.get must be(1)
}
}
"have messages logged" in {
val msgs = new java.util.concurrent.ConcurrentSkipListSet[Any]
val latch = TestLatch(2)
"round robin router" must {
val actor = actorOf(new Actor {
def receive = { case _ }
}).start()
"be started when constructed" in {
val actor1 = Actor.actorOf[TestActor].start
val logger = loggerActor(actor, x { msgs.add(x); latch.countDown() }).start()
val foo: Any = "foo"
val bar: Any = "bar"
logger ! foo
logger ! bar
latch.await
msgs must (have size (2) and contain(foo) and contain(bar))
actor.stop()
logger.stop()
val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.RoundRobin)
actor.isRunning must be(true)
}
"dispatch to smallest mailbox" in {
val t1Count = new AtomicInteger(0)
val t2Count = new AtomicInteger(0)
val latch1 = TestLatch(2501)
val latch2 = TestLatch(2499)
"throw IllegalArgumentException at construction when no connections" in {
try {
Routing.newRoutedActorRef("foo", List(), RouterType.RoundRobin)
fail()
} catch {
case e: IllegalArgumentException
}
}
val t1 = actorOf(new Actor {
//In this test a bunch of actors are created and each actor has its own counter.
//to test round robin, the routed actor receives the following sequence of messages 1 2 3 .. 1 2 3 .. 1 2 3 which it
//uses to increment his counter.
//So after n iteration, the first actor his counter should be 1*n, the second 2*n etc etc.
"deliver messages in a round robin fashion" in {
val connectionCount = 10
val iterationCount = 10
val doneLatch = new CountDownLatch(connectionCount)
//lets create some connections.
var connections = new LinkedList[ActorRef]
var counters = new LinkedList[AtomicInteger]
for (i 0 until connectionCount) {
counters = counters :+ new AtomicInteger()
val connection = actorOf(new Actor {
def receive = {
case x
t1Count.incrementAndGet
latch1.countDown()
case "end" doneLatch.countDown()
case msg: Int counters.get(i).get.addAndGet(msg)
}
}).start()
connections = connections :+ connection
}
t1.dispatcher.suspend(t1)
//create the routed actor.
val actor = Routing.newRoutedActorRef("foo", connections, RouterType.RoundRobin)
for (i 1 to 2501) t1 ! i
//send messages to the actor.
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
actor ! (k + 1)
}
}
val t2 = actorOf(new Actor {
actor ! Broadcast("end")
//now wait some and do validations.
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
for (i 0 until connectionCount) {
val counter = counters.get(i).get
counter.get must be((iterationCount * (i + 1)))
}
}
"deliver a broadcast message using the !" in {
val doneLatch = new CountDownLatch(2)
val counter1 = new AtomicInteger
val connection1 = actorOf(new Actor {
def receive = {
case x
t2Count.incrementAndGet
latch2.countDown()
case "end" doneLatch.countDown()
case msg: Int counter1.addAndGet(msg)
}
}).start()
val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) //Will pick the last with the smallest mailbox, so make sure t1 is last
val counter2 = new AtomicInteger
val connection2 = actorOf(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter2.addAndGet(msg)
}
}).start()
for (i 1 to 2499) d ! i
val actor = Routing.newRoutedActorRef("foo", List(connection1, connection2), RouterType.RoundRobin)
latch2.await(20 seconds)
actor ! Broadcast(1)
actor ! Broadcast("end")
t1.dispatcher.resume(t1)
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
counter1.get must be(1)
counter2.get must be(1)
}
"fail to deliver a broadcast message using the ?" in {
val doneLatch = new CountDownLatch(1)
val counter1 = new AtomicInteger
val connection1 = actorOf(new Actor {
def receive = {
case "end" doneLatch.countDown()
case _ counter1.incrementAndGet()
}
}).start()
val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.RoundRobin)
try {
latch1.await(20 seconds)
} finally {
// because t1 is much slower and thus has a bigger mailbox all the time
t1Count.get must be === 2501
t2Count.get must be === 2499
for (a List(t1, t2, d)) a.stop()
actor ? Broadcast(1)
fail()
} catch {
case e: RoutingException
}
actor ! "end"
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
counter1.get must be(0)
}
}
"listen" in {
val fooLatch = TestLatch(2)
val barLatch = TestLatch(2)
val barCount = new AtomicInteger(0)
"random router" must {
val broadcast = actorOf(new Actor with Listeners {
def receive = listenerManagement orElse {
case "foo" gossip("bar")
"be started when constructed" in {
val actor1 = Actor.actorOf[TestActor].start
val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.Random)
actor.isRunning must be(true)
}
}).start()
def newListener = actorOf(new Actor {
"throw IllegalArgumentException at construction when no connections" in {
try {
Routing.newRoutedActorRef("foo", List(), RouterType.Random)
fail()
} catch {
case e: IllegalArgumentException
}
}
"deliver messages in a random fashion" in {
}
"deliver a broadcast message" in {
val doneLatch = new CountDownLatch(2)
val counter1 = new AtomicInteger
val connection1 = actorOf(new Actor {
def receive = {
case "bar"
barCount.incrementAndGet
barLatch.countDown()
case "foo"
fooLatch.countDown()
case "end" doneLatch.countDown()
case msg: Int counter1.addAndGet(msg)
}
}).start()
val a1 = newListener
val a2 = newListener
val a3 = newListener
broadcast ! Listen(a1)
broadcast ! Listen(a2)
broadcast ! Listen(a3)
broadcast ! Deafen(a3)
broadcast ! WithListeners(_ ! "foo")
broadcast ! "foo"
barLatch.await
barCount.get must be(2)
fooLatch.await
for (a List(broadcast, a1, a2, a3)) a.stop()
}
}
"Actor Pool" must {
"have expected capacity" in {
val latch = TestLatch(2)
val count = new AtomicInteger(0)
val pool = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with SmallestMailboxSelector {
def factory = actorOf(new Actor {
val counter2 = new AtomicInteger
val connection2 = actorOf(new Actor {
def receive = {
case _
count.incrementAndGet
latch.countDown()
self tryReply "success"
case "end" doneLatch.countDown()
case msg: Int counter2.addAndGet(msg)
}
}).start()
def limit = 2
def selectionCount = 1
def partialFill = true
def instance = factory
def receive = _route
}).start()
val actor = Routing.newRoutedActorRef("foo", List(connection1, connection2), RouterType.Random)
val successes = TestLatch(2)
val successCounter = actorOf(new Actor {
actor ! Broadcast(1)
actor ! Broadcast("end")
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
counter1.get must be(1)
counter2.get must be(1)
}
"fail to deliver a broadcast message using the ?" in {
val doneLatch = new CountDownLatch(1)
val counter1 = new AtomicInteger
val connection1 = actorOf(new Actor {
def receive = {
case "success" successes.countDown()
case "end" doneLatch.countDown()
case _ counter1.incrementAndGet()
}
}).start()
implicit val replyTo = successCounter
pool ! "a"
pool ! "b"
latch.await
successes.await
count.get must be(2)
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool.stop()
}
"pass ticket #705" in {
val pool = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 20
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
def receive = {
case req: String {
sleepFor(10 millis)
self.tryReply("Response")
}
}
})
}).start()
val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Random)
try {
(for (count 1 to 500) yield pool.?("Test", 20000)) foreach {
_.await.resultOrException.get must be("Response")
actor ? Broadcast(1)
fail()
} catch {
case e: RoutingException
}
} finally {
pool.stop()
actor ! "end"
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
counter1.get must be(0)
}
}
"grow as needed under pressure" in {
// make sure the pool starts at the expected lower limit and grows to the upper as needed
// as influenced by the backlog of blocking pooled actors
var latch = TestLatch(3)
val count = new AtomicInteger(0)
val pool = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case n: Int
sleepFor(n millis)
count.incrementAndGet
latch.countDown()
}
})
def lowerBound = 2
def upperBound = 4
def rampupRate = 0.1
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
}).start()
// first message should create the minimum number of delgates
pool ! 1
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
var loops = 0
def loop(t: Int) = {
latch = TestLatch(loops)
count.set(0)
for (m 0 until loops) {
pool ? t
sleepFor(50 millis)
}
}
// 2 more should go thru without triggering more
loops = 2
loop(500)
latch.await
count.get must be(loops)
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
// a whole bunch should max it out
loops = 10
loop(500)
latch.await
count.get must be(loops)
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(4)
pool.stop()
}
"grow as needed under mailbox pressure" in {
// make sure the pool starts at the expected lower limit and grows to the upper as needed
// as influenced by the backlog of messages in the delegate mailboxes
var latch = TestLatch(3)
val count = new AtomicInteger(0)
val pool = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case n: Int
sleepFor(n millis)
count.incrementAndGet
latch.countDown()
}
})
def lowerBound = 2
def upperBound = 4
def pressureThreshold = 3
def rampupRate = 0.1
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
}).start()
var loops = 0
def loop(t: Int) = {
latch = TestLatch(loops)
count.set(0)
for (m 0 until loops) {
pool ! t
}
}
// send a few messages and observe pool at its lower bound
loops = 3
loop(500)
latch.await
count.get must be(loops)
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
// send a bunch over the theshold and observe an increment
loops = 15
loop(500)
latch.await(10 seconds)
count.get must be(loops)
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be >= (3)
pool.stop()
}
"round robin" in {
val latch1 = TestLatch(2)
val delegates = new java.util.concurrent.ConcurrentHashMap[String, String]
val pool1 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case _
delegates put (self.uuid.toString, "")
latch1.countDown()
}
})
def limit = 1
def selectionCount = 1
def rampupRate = 0.1
def partialFill = true
def instance = factory
def receive = _route
}).start()
pool1 ! "a"
pool1 ! "b"
latch1.await
delegates.size must be(1)
pool1.stop()
val latch2 = TestLatch(2)
delegates.clear()
val pool2 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case _
delegates put (self.uuid.toString, "")
latch2.countDown()
}
})
def limit = 2
def selectionCount = 1
def rampupRate = 0.1
def partialFill = false
def instance = factory
def receive = _route
}).start()
pool2 ! "a"
pool2 ! "b"
latch2.await
delegates.size must be(2)
pool2.stop()
}
"backoff" in {
val latch = TestLatch(10)
val pool = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
def factory = actorOf(new Actor {
def receive = {
case n: Int
sleepFor(n millis)
latch.countDown()
}
})
def lowerBound = 1
def upperBound = 5
def pressureThreshold = 1
def partialFill = true
def selectionCount = 1
def rampupRate = 0.1
def backoffRate = 0.50
def backoffThreshold = 0.50
def instance = factory
def receive = _route
}).start()
// put some pressure on the pool
for (m 0 to 10) pool ! 250
sleepFor(5 millis)
val z = (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size
z must be >= (2)
// let it cool down
for (m 0 to 3) {
pool ! 1
sleepFor(500 millis)
}
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be <= (z)
pool.stop()
}
"support typed actors" in {
import RoutingSpec._
import TypedActor._
def createPool = new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
def lowerBound = 1
def upperBound = 5
def pressureThreshold = 1
def partialFill = true
def selectionCount = 1
def rampupRate = 0.1
def backoffRate = 0.50
def backoffThreshold = 0.50
def instance = getActorRefFor(typedActorOf[Foo, FooImpl]())
def receive = _route
}
val pool = createProxy[Foo](createPool)
val results = for (i 1 to 100) yield (i, pool.sq(i, 100))
for ((i, r) results) r.get must equal(i * i)
}
"provide default supervision of pooled actors" in {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
val pool1 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
val pool2 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Permanent
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
val pool3 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Temporary
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
// default lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// default lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// permanent lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// permanent lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// temporary lifecycle
pingCount.set(0)
keepDying = false
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool3 ! akka.Die
sleepFor(2 seconds)
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool3 ! "ping"
pool3 ! "ping"
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(4)
}
"support customizable supervision config of pooled actors" in {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig {
def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000)
}
object BadState
val pool1 = actorOf(
new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case BadState
if (keepDying) deathCount.incrementAndGet
throw new IllegalStateException
case akka.Die
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// kill it
intercept[RuntimeException](pool1.?(akka.Die).get)
"least cpu router" must {
"throw IllegalArgumentException when constructed" in {
val actor1 = Actor.actorOf[TestActor].start
try {
Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastCPU)
} catch {
case e: IllegalArgumentException
}
}
}
"least ram router" must {
"throw IllegalArgumentException when constructed" in {
val actor1 = Actor.actorOf[TestActor].start
try {
Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastRAM)
} catch {
case e: IllegalArgumentException
}
}
}
"smallest mailbox" must {
"throw IllegalArgumentException when constructed" in {
val actor1 = Actor.actorOf[TestActor].start
try {
Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastMessages)
} catch {
case e: IllegalArgumentException
}
}
}
}

View file

@ -649,7 +649,8 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
def mailbox: AnyRef = _mailbox
protected[akka] def mailbox_=(value: AnyRef): AnyRef = {
_mailbox = value; value
_mailbox = value;
value
}
/**
@ -1257,3 +1258,57 @@ case class SerializedActorRef(uuid: Uuid,
"] but it's not found in the local registry and remoting is not enabled.")
}
}
/**
* Trait for ActorRef implementations where most of the methods are not supported.
*/
trait UnsupportedActorRef extends ActorRef with ScalaActorRef {
def dispatcher_=(md: MessageDispatcher) {
unsupported
}
def dispatcher: MessageDispatcher = unsupported
def link(actorRef: ActorRef) {
unsupported
}
def unlink(actorRef: ActorRef) {
unsupported
}
def startLink(actorRef: ActorRef): ActorRef = unsupported
def supervisor: Option[ActorRef] = unsupported
def linkedActors: JMap[Uuid, ActorRef] = unsupported
protected[akka] def mailbox: AnyRef = unsupported
protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
unsupported
}
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
unsupported
}
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
unsupported
}
protected[akka] def invoke(messageHandle: MessageInvocation) {
unsupported
}
protected[akka] def supervisor_=(sup: Option[ActorRef]) {
unsupported
}
protected[akka] def actorInstance: AtomicReference[Actor] = unsupported
private def unsupported = throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName))
}

View file

@ -4,16 +4,19 @@
package akka.routing
import akka.actor.{ UntypedActor, Actor, ActorRef }
import akka.actor.Actor._
import akka.actor.ActorRef
import scala.collection.JavaConversions._
import scala.collection.immutable.Seq
import java.util.concurrent.atomic.AtomicReference
import annotation.tailrec
import akka.AkkaException
import akka.dispatch.Future
import java.net.InetSocketAddress
import akka.actor._
import akka.event.EventHandler
import akka.actor.UntypedChannel._
import akka.routing.RouterType.RoundRobin
class RoutingException(message: String) extends AkkaException(message)
@ -23,191 +26,304 @@ sealed trait RouterType
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RouterType {
object Direct extends RouterType
/**
* A RouterType that randomly selects a connection to send a message to.
*/
object Random extends RouterType
/**
* A RouterType that selects the connection by using round robin.
*/
object RoundRobin extends RouterType
/**
* A RouterType that selects the connection based on the least amount of cpu usage
*/
object LeastCPU extends RouterType
/**
* A RouterType that select the connection based on the least amount of ram used.
*
* todo: this is extremely vague currently since there are so many ways to define least amount of ram.
*/
object LeastRAM extends RouterType
/**
* A RouterType that select the connection where the actor has the least amount of messages in its mailbox.
*/
object LeastMessages extends RouterType
}
/**
* A Router is a trait whose purpose is to route incoming messages to actors.
*/
trait Router { this: Actor
protected def transform(msg: Any): Any = msg
protected def routes: PartialFunction[Any, ActorRef]
protected def broadcast(message: Any) {}
protected def dispatch: Receive = {
case Routing.Broadcast(message)
broadcast(message)
case a if routes.isDefinedAt(a)
if (isSenderDefined) routes(a).forward(transform(a))(someSelf)
else routes(a).!(transform(a))(None)
}
def receive = dispatch
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
}
/**
* An UntypedRouter is an abstract class whose purpose is to route incoming messages to actors.
*/
abstract class UntypedRouter extends UntypedActor {
protected def transform(msg: Any): Any = msg
protected def route(msg: Any): ActorRef
protected def broadcast(message: Any) {}
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
@throws(classOf[Exception])
def onReceive(msg: Any): Unit = msg match {
case m: Routing.Broadcast broadcast(m.message)
case _
val r = route(msg)
if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!")
if (isSenderDefined) r.forward(transform(msg))(someSelf)
else r.!(transform(msg))(None)
}
}
/**
* A LoadBalancer is a specialized kind of Router, that is supplied an InfiniteIterator of targets
* to dispatch incoming messages to.
*/
trait LoadBalancer extends Router { self: Actor
protected def seq: InfiniteIterator[ActorRef]
protected def routes = {
case x if seq.hasNext seq.next
}
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
}
/**
* A UntypedLoadBalancer is a specialized kind of UntypedRouter, that is supplied an InfiniteIterator of targets
* to dispatch incoming messages to.
*/
abstract class UntypedLoadBalancer extends UntypedRouter {
protected def seq: InfiniteIterator[ActorRef]
protected def route(msg: Any) =
if (seq.hasNext) seq.next
else null
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
}
object Routing {
sealed trait RoutingMessage
case class Broadcast(message: Any) extends RoutingMessage
type PF[A, B] = PartialFunction[A, B]
/**
* Creates a new PartialFunction whose isDefinedAt is a combination
* of the two parameters, and whose apply is first to call filter.apply
* and then filtered.apply.
* Creates a new started RoutedActorRef that uses routing to deliver a message to one of its connected actors.
*
* @param actorAddress the address of the ActorRef.
* @param connections an Iterable pointing to all connected actor references.
* @param routerType the type of routing that should be used.
* @throws IllegalArgumentException if the number of connections is zero, or if it depends on the actual router implementation
* how many connections it can handle.
*/
def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = {
case a: A if filtered.isDefinedAt(a) && filter.isDefinedAt(a)
filter(a)
filtered(a)
def newRoutedActorRef(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): RoutedActorRef = {
if (connections.size == 0)
throw new IllegalArgumentException("To create a routed actor ref, at least one connection is required")
val ref = routerType match {
case RouterType.Direct
if (connections.size > 1) throw new IllegalArgumentException("A direct router can't have more than 1 connection")
new RoutedActorRef(actorAddress, connections) with Direct
case RouterType.Random
new RoutedActorRef(actorAddress, connections) with Random
case RouterType.RoundRobin
new RoutedActorRef(actorAddress, connections) with RoundRobin
case _ throw new IllegalArgumentException("Unsupported routerType " + routerType)
}
ref.start()
}
def newRoundRobinActorRef(actorAddress: String, connections: Iterable[ActorRef]): RoutedActorRef = {
newRoutedActorRef(actorAddress, connections, RoundRobin)
}
}
/**
* Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true.
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
* on (or more) of these actors.
*/
def intercept[A: Manifest, B](interceptor: (A) Unit, interceptee: PF[A, B]): PF[A, B] =
filter({ case a interceptor(a) }, interceptee)
class RoutedActorRef(val address: String, val cons: Iterable[ActorRef]) extends UnsupportedActorRef with ScalaActorRef {
this: Router
/**
* Creates a LoadBalancer from the thunk-supplied InfiniteIterator.
*/
def loadBalancerActor(actors: InfiniteIterator[ActorRef]): ActorRef =
localActorOf(new Actor with LoadBalancer {
val seq = actors
}).start()
def connections: Iterable[ActorRef] = cons
/**
* Creates a Router given a routing and a message-transforming function.
*/
def routerActor(routing: PF[Any, ActorRef], msgTransformer: (Any) Any): ActorRef =
localActorOf(new Actor with Router {
override def transform(msg: Any) = msgTransformer(msg)
def routes = routing
}).start()
override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = {
val sender = channel match {
case ref: ActorRef Some(ref)
case _ None
}
route(message)(sender)
}
/**
* Creates a Router given a routing.
*/
def routerActor(routing: PF[Any, ActorRef]): ActorRef = localActorOf(new Actor with Router {
def routes = routing
}).start()
override def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any,
timeout: Timeout,
channel: UntypedChannel): Future[Any] = {
val sender = channel match {
case ref: ActorRef Some(ref)
case _ None
}
route[Any](message, timeout)(sender)
}
/**
* Creates an actor that pipes all incoming messages to
* both another actor and through the supplied function
*/
def loggerActor(actorToLog: ActorRef, logger: (Any) Unit): ActorRef =
routerActor({ case _ actorToLog }, logger)
private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress): Unit = {
throw new UnsupportedOperationException
}
def signalDeadActor(ref: ActorRef): Unit = {
throw new UnsupportedOperationException
}
def start(): this.type = synchronized[this.type] {
_status = ActorRefInternals.RUNNING
this
}
def stop() {
synchronized {
if (_status == ActorRefInternals.RUNNING) {
_status = ActorRefInternals.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
// FIXME here we need to fire off Actor.cluster.remove(address) (which needs to be properly implemented first, see ticket)
//inetSocketAddressToActorRefMap.get.values foreach (_.stop()) // shut down all remote connections
}
}
}
}
/**
* An Iterator that is either always empty or yields an infinite number of Ts.
* The Router is responsible for sending a message to one (or more) of its connections.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait InfiniteIterator[T] extends Iterator[T] {
val items: Seq[T]
trait Router {
/**
* Returns an Iterable containing all connected ActorRefs this Router uses to send messages to.
*/
def connections: Iterable[ActorRef]
/**
* A callback this Router uses to indicate that some actorRef was not usable.
*
* Implementations should make sure that this method can be called without the actorRef being part of the
* current set of connections. The most logical way to deal with this situation, is just to ignore it.
*
* @param ref the dead
*/
def signalDeadActor(ref: ActorRef): Unit
/**
* Routes the message to one of the connections.
*/
def route(message: Any)(implicit sender: Option[ActorRef]): Unit
/**
* Routes the message using a timeout to one of the connections and returns a Future to synchronize on the
* completion of the processing of the message.
*/
def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T]
}
/**
* CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List.
* An Abstract Router implementation that already provides the basic infrastructure so that a concrete
* Router only needs to implement the next method.
*
* todo:
* This also is the location where a failover is done in the future if an ActorRef fails and a different
* one needs to be selected.
* todo:
* this is also the location where message buffering should be done in case of failure.
*/
case class CyclicIterator[T](val items: Seq[T]) extends InfiniteIterator[T] {
def this(items: java.util.List[T]) = this(items.toList)
trait BasicRouter extends Router {
private[this] val current: AtomicReference[Seq[T]] = new AtomicReference(items)
def route(message: Any)(implicit sender: Option[ActorRef]): Unit = message match {
case Routing.Broadcast(message)
//it is a broadcast message, we are going to send to message to all connections.
connections.foreach(actor
try {
actor.!(message)(sender)
} catch {
case e: Exception
signalDeadActor(actor)
throw e
})
case _
//it no broadcast message, we are going to select an actor from the connections and send the message to him.
next match {
case Some(actor)
try {
actor.!(message)(sender)
} catch {
case e: Exception
signalDeadActor(actor)
throw e
def hasNext = items != Nil
}
case None
throwNoConnectionsError()
}
}
def next: T = {
def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {
case Routing.Broadcast(message)
throw new RoutingException("Broadcasting using a future for the time being is not supported")
case _
//it no broadcast message, we are going to select an actor from the connections and send the message to him.
next match {
case Some(actor)
try {
actor.?(message, timeout)(sender).asInstanceOf[Future[T]]
} catch {
case e: Exception
signalDeadActor(actor)
throw e
}
case None
throwNoConnectionsError()
}
}
protected def next: Option[ActorRef]
private def throwNoConnectionsError() = {
val error = new RoutingException("No replica connections for router")
EventHandler.error(error, this, error.toString)
throw error
}
}
/**
* A Router that is used when a durable actor is used. All requests are send to the node containing the actor.
* As soon as that instance fails, a different instance is created and since the mailbox is durable, the internal
* state can be restored using event sourcing, and once this instance is up and running, all request will be send
* to this instance.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Direct extends BasicRouter {
lazy val next: Option[ActorRef] = {
val connection = connections.headOption
if (connection.isEmpty) EventHandler.warning(this, "Router has no replica connection")
connection
}
}
/**
* A Router that randomly selects one of the target connections to send a message to.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Random extends BasicRouter {
//todo: threadlocal random?
private val random = new java.util.Random(System.currentTimeMillis)
def next: Option[ActorRef] =
if (connections.isEmpty) {
EventHandler.warning(this, "Router has no replica connections")
None
} else {
val randomIndex = random.nextInt(connections.size)
//todo: possible index ouf of bounds problems since the number of connection could already have been changed.
Some(connections.iterator.drop(randomIndex).next())
}
}
/**
* A Router that uses round-robin to select a connection.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait RoundRobin extends BasicRouter {
private def items: List[ActorRef] = connections.toList
//todo: this is broken since the list is not updated.
private val current = new AtomicReference[List[ActorRef]](items)
private def hasNext = connections.nonEmpty
def next: Option[ActorRef] = {
@tailrec
def findNext: T = {
def findNext: Option[ActorRef] = {
val currentItems = current.get
val newItems = currentItems match {
case Nil items
case xs xs
}
if (current.compareAndSet(currentItems, newItems.tail)) newItems.head
if (newItems.isEmpty) {
EventHandler.warning(this, "Router has no replica connections")
None
} else {
if (current.compareAndSet(currentItems, newItems.tail)) newItems.headOption
else findNext
}
}
findNext
}
override def exists(f: T Boolean): Boolean = items exists f
}
/**
* This InfiniteIterator always returns the Actor that has the currently smallest mailbox
* useful for work-stealing.
*/
case class SmallestMailboxFirstIterator(val items: Seq[ActorRef]) extends InfiniteIterator[ActorRef] {
def this(items: java.util.List[ActorRef]) = this(items.toList)
def hasNext = items != Nil
def next = items.reduceLeft((a1, a2) if (a1.dispatcher.mailboxSize(a1) < a2.dispatcher.mailboxSize(a2)) a1 else a2)
override def exists(f: ActorRef Boolean): Boolean = items.exists(f)
}

View file

@ -977,7 +977,7 @@ class DefaultClusterNode private[akka] (
"Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]"
.format(actorAddress, router, remoteServerAddress, addresses.map(_._2).mkString("\n\t")))
val actorRef = Router newRouter (router, addresses, actorAddress, Actor.TIMEOUT)
val actorRef = Routing newRouter (router, addresses, actorAddress, Actor.TIMEOUT)
addresses foreach {
case (_, address) clusterActorRefs.put(address, actorRef)
}

View file

@ -4,18 +4,17 @@
package akka.cluster
import akka.actor._
import akka.dispatch._
import akka.util._
import ReflectiveAccess._
import akka.dispatch.Future
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import java.util.{ Map JMap }
import com.eaio.uuid.UUID
import collection.immutable.Map
import annotation.tailrec
import akka.routing.Router
/**
* ActorRef representing a one or many instances of a clustered, load-balanced and sometimes replicated actor
@ -26,8 +25,8 @@ import annotation.tailrec
class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
val address: String,
_timeout: Long)
extends ActorRef with ScalaActorRef {
this: Router.Router
extends UnsupportedActorRef with ScalaActorRef {
this: Router
timeout = _timeout
private[akka] val inetSocketAddressToActorRefMap = new AtomicReference[Map[InetSocketAddress, ActorRef]](
@ -37,7 +36,7 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
ClusterModule.ensureEnabled()
def connections: Map[InetSocketAddress, ActorRef] = inetSocketAddressToActorRefMap.get
def connections: Iterable[ActorRef] = inetSocketAddressToActorRefMap.get.values
override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = {
val sender = channel match {
@ -96,11 +95,12 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
def signalDeadActor(ref: ActorRef): Unit = {
//since the number remote actor refs for a clustered actor ref is quite low, we can deal with the O(N) complexity
//of the following removal.
val it = connections.keySet.iterator
val map = inetSocketAddressToActorRefMap.get
val it = map.keySet.iterator
while (it.hasNext) {
val address = it.next()
val foundRef: ActorRef = connections.get(address).get
val foundRef: ActorRef = map.get(address).get
if (foundRef == ref) {
remove(address)
@ -133,57 +133,4 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
}
}
}
// ========================================================================
// ==== NOT SUPPORTED ====
// ========================================================================
// FIXME move these methods and the same ones in RemoteActorRef to a base class - now duplicated
def dispatcher_=(md: MessageDispatcher) {
unsupported
}
def dispatcher: MessageDispatcher = unsupported
def link(actorRef: ActorRef) {
unsupported
}
def unlink(actorRef: ActorRef) {
unsupported
}
def startLink(actorRef: ActorRef): ActorRef = unsupported
def supervisor: Option[ActorRef] = unsupported
def linkedActors: JMap[Uuid, ActorRef] = unsupported
protected[akka] def mailbox: AnyRef = unsupported
protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
unsupported
}
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
unsupported
}
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
unsupported
}
protected[akka] def invoke(messageHandle: MessageInvocation) {
unsupported
}
protected[akka] def supervisor_=(sup: Option[ActorRef]) {
unsupported
}
protected[akka] def actorInstance: AtomicReference[Actor] = unsupported
private def unsupported = throw new UnsupportedOperationException("Not supported for ClusterActorRef")
}

View file

@ -1,25 +1,22 @@
package akka.cluster
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.actor._
import akka.dispatch.Future
import akka.event.EventHandler
import akka.routing.{ RouterType, RoutingException }
import akka.routing.RouterType
import RouterType._
import com.eaio.uuid.UUID
import annotation.tailrec
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import akka.routing.{ Random, Direct, RoundRobin }
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Router {
object Routing {
def newRouter(
routerType: RouterType,
inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
@ -34,122 +31,4 @@ object Router {
case LeastMessages sys.error("Router LeastMessages not supported yet")
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Router {
def connections: Map[InetSocketAddress, ActorRef]
def signalDeadActor(ref: ActorRef): Unit
def route(message: Any)(implicit sender: Option[ActorRef]): Unit
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T]
}
/**
* An Abstract Router implementation that already provides the basic infrastructure so that a concrete
* Router only needs to implement the next method.
*
* This also is the location where a failover is done in the future if an ActorRef fails and a different
* one needs to be selected.
*/
trait BasicRouter extends Router {
def route(message: Any)(implicit sender: Option[ActorRef]): Unit = next match {
case Some(actor) {
try {
actor.!(message)(sender)
} catch {
case e: Exception
signalDeadActor(actor)
throw e
}
}
case _ throwNoConnectionsError()
}
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] = next match {
case Some(actor) {
try {
actor.?(message, timeout)(sender).asInstanceOf[Future[T]]
} catch {
case e: Throwable
signalDeadActor(actor)
throw e
}
}
case _ throwNoConnectionsError()
}
protected def next: Option[ActorRef]
private def throwNoConnectionsError() = {
val error = new RoutingException("No replica connections for router")
EventHandler.error(error, this, error.toString)
throw error
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Direct extends BasicRouter {
lazy val next: Option[ActorRef] = {
val connection = connections.values.headOption
if (connection.isEmpty) EventHandler.warning(this, "Router has no replica connection")
connection
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Random extends BasicRouter {
private val random = new java.util.Random(System.currentTimeMillis)
def next: Option[ActorRef] =
if (connections.isEmpty) {
EventHandler.warning(this, "Router has no replica connections")
None
} else {
Some(connections.valuesIterator.drop(random.nextInt(connections.size)).next())
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait RoundRobin extends BasicRouter {
private def items: List[ActorRef] = connections.values.toList
private val current = new AtomicReference[List[ActorRef]](items)
private def hasNext = connections.nonEmpty
def next: Option[ActorRef] = {
@tailrec
def findNext: Option[ActorRef] = {
val currentItems = current.get
val newItems = currentItems match {
case Nil items
case xs xs
}
if (newItems.isEmpty) {
EventHandler.warning(this, "Router has no replica connections")
None
} else {
if (current.compareAndSet(currentItems, newItems.tail)) newItems.headOption
else findNext
}
}
findNext
}
}
}

View file

@ -11,7 +11,6 @@ import akka.actor.Actor._
import akka.actor.{ ActorRegistry, ActorRef, Actor }
import akka.camel._
import akka.camel.CamelServiceManager._
import akka.routing.CyclicIterator
import akka.routing.Routing._
/**
@ -51,8 +50,7 @@ object HttpConcurrencyTestStress {
startCamelService
val workers = for (i 1 to 8) yield actorOf[HttpServerWorker].start
val balancer = loadBalancerActor(new CyclicIterator(workers.toList))
val balancer = newRoundRobinActorRef("loadbalancer", workers)
//service.get.awaitEndpointActivation(1) {
// actorOf(new HttpServerActor(balancer)).start
//}

View file

@ -11,11 +11,12 @@ import static java.util.Arrays.asList;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.routing.CyclicIterator;
import akka.routing.InfiniteIterator;
import akka.routing.RouterType;
import akka.routing.Routing;
import akka.routing.Routing.Broadcast;
import akka.routing.UntypedLoadBalancer;
import scala.collection.JavaConversions;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
public class Pi {
@ -96,35 +97,18 @@ public class Pi {
private ActorRef router;
static class PiRouter extends UntypedLoadBalancer {
private final InfiniteIterator<ActorRef> workers;
public PiRouter(ActorRef[] workers) {
this.workers = new CyclicIterator<ActorRef>(asList(workers));
}
public InfiniteIterator<ActorRef> seq() {
return workers;
}
}
public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) {
this.nrOfMessages = nrOfMessages;
this.nrOfElements = nrOfElements;
this.latch = latch;
// create the workers
final ActorRef[] workers = new ActorRef[nrOfWorkers];
LinkedList<ActorRef> workers = new LinkedList<ActorRef>();
for (int i = 0; i < nrOfWorkers; i++) {
workers[i] = actorOf(Worker.class, "worker").start();
ActorRef worker = actorOf(Worker.class, "worker").start();
workers.add(worker);
}
// wrap them with a load-balancing router
router = actorOf(new UntypedActorFactory() {
public UntypedActor create() {
return new PiRouter(workers);
}
}, "router").start();
router = Routing.newRoundRobinActorRef("pi", JavaConversions.asIterable(workers));
}
// message handler

View file

@ -6,11 +6,9 @@ package akka.tutorial.first.scala
import akka.actor.{ Actor, PoisonPill }
import Actor._
import akka.routing.{ Routing, CyclicIterator }
import Routing._
import System.{ currentTimeMillis now }
import java.util.concurrent.CountDownLatch
import akka.routing.Routing.Broadcast
import akka.routing.Routing
object Pi extends App {
@ -57,7 +55,7 @@ object Pi extends App {
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start())
// wrap them with a load-balancing router
val router = Routing.loadBalancerActor(CyclicIterator(workers)).start()
val router = Routing.newRoundRobinActorRef("pi", workers)
// message handler
def receive = {

View file

@ -8,6 +8,8 @@ import static akka.actor.Actors.actorOf;
import static akka.actor.Actors.poisonPill;
import static java.lang.System.currentTimeMillis;
import static java.util.Arrays.asList;
import akka.routing.Routing;
import scala.Option;
import akka.actor.ActorRef;
import akka.actor.Channel;
@ -15,10 +17,10 @@ import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.dispatch.Future;
import akka.japi.Procedure;
import akka.routing.CyclicIterator;
import akka.routing.InfiniteIterator;
import akka.routing.Routing.Broadcast;
import akka.routing.UntypedLoadBalancer;
import scala.collection.JavaConversions;
import java.util.LinkedList;
public class Pi {
@ -90,34 +92,17 @@ public class Pi {
private ActorRef router;
static class PiRouter extends UntypedLoadBalancer {
private final InfiniteIterator<ActorRef> workers;
public PiRouter(ActorRef[] workers) {
this.workers = new CyclicIterator<ActorRef>(asList(workers));
}
public InfiniteIterator<ActorRef> seq() {
return workers;
}
}
public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements) {
this.nrOfMessages = nrOfMessages;
this.nrOfElements = nrOfElements;
// create the workers
final ActorRef[] workers = new ActorRef[nrOfWorkers];
LinkedList<ActorRef> workers = new LinkedList<ActorRef>();
for (int i = 0; i < nrOfWorkers; i++) {
workers[i] = actorOf(Worker.class, "worker").start();
ActorRef worker = actorOf(Worker.class, "worker").start();
workers.add(worker);
}
// wrap them with a load-balancing router
router = actorOf(new UntypedActorFactory() {
public UntypedActor create() {
return new PiRouter(workers);
}
}, "router").start();
router = Routing.newRoundRobinActorRef("pi", JavaConversions.asIterable(workers));
}
@Override

View file

@ -5,13 +5,11 @@
package akka.tutorial.second
import akka.actor.Actor._
import akka.routing.{ Routing, CyclicIterator }
import Routing._
import akka.routing.Routing
import akka.event.EventHandler
import akka.actor.{ Channel, Actor, PoisonPill, Timeout }
import akka.dispatch.Future
import System.{ currentTimeMillis now }
import akka.routing.Routing.Broadcast
import akka.actor.{ Timeout, Channel, Actor, PoisonPill }
object Pi extends App {
@ -55,7 +53,7 @@ object Pi extends App {
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start())
// wrap them with a load-balancing router
val router = Routing.loadBalancerActor(CyclicIterator(workers)).start()
val router = Routing.newRoundRobinActorRef("pi", workers)
// phase 1, can accept a Calculate message
def scatter: Receive = {