diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala
index 4c2d069f24..82e1734b45 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala
@@ -189,9 +189,9 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
val ioManager = Actor.actorOf(new IOManager()).start
val server = Actor.actorOf(new SimpleEchoServer("localhost", 8065, ioManager)).start
val client = Actor.actorOf(new SimpleEchoClient("localhost", 8065, ioManager)).start
- val list = List.range(0, 100000)
+ val list = List.range(0, 1000)
val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString))
- assert(f.get.size === 100000)
+ assert(f.get.size === 1000)
client.stop
server.stop
ioManager.stop
@@ -201,9 +201,9 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
val ioManager = Actor.actorOf(new IOManager(2)).start
val server = Actor.actorOf(new SimpleEchoServer("localhost", 8066, ioManager)).start
val client = Actor.actorOf(new SimpleEchoClient("localhost", 8066, ioManager)).start
- val list = List.range(0, 100000)
+ val list = List.range(0, 1000)
val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString))
- assert(f.get.size === 100000)
+ assert(f.get.size === 1000)
client.stop
server.stop
ioManager.stop
diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala
index 7c09de930e..48f2785d01 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala
@@ -12,32 +12,65 @@ import akka.actor.TypedActor._
import akka.japi.{ Option ⇒ JOption }
import akka.util.Duration
import akka.dispatch.{ Dispatchers, Future, KeptPromise }
-import akka.routing.CyclicIterator
-import java.io.{ ObjectInputStream, ObjectOutputStream, ByteArrayOutputStream }
-import akka.actor.TypedActorSpec.Foo
+import java.util.concurrent.atomic.AtomicReference
+import annotation.tailrec
object TypedActorSpec {
+
+ class CyclicIterator[T](val items: Seq[T]) extends Iterator[T] {
+
+ private[this] val current: AtomicReference[Seq[T]] = new AtomicReference(items)
+
+ def hasNext = items != Nil
+
+ def next: T = {
+ @tailrec
+ def findNext: T = {
+ val currentItems = current.get
+ val newItems = currentItems match {
+ case Nil ⇒ items
+ case xs ⇒ xs
+ }
+
+ if (current.compareAndSet(currentItems, newItems.tail)) newItems.head
+ else findNext
+ }
+
+ findNext
+ }
+
+ override def exists(f: T ⇒ Boolean): Boolean = items exists f
+ }
+
trait Foo {
def pigdog(): String
def self = TypedActor.self[Foo]
def futurePigdog(): Future[String]
+
def futurePigdog(delay: Long): Future[String]
+
def futurePigdog(delay: Long, numbered: Int): Future[String]
+
def futureComposePigdogFrom(foo: Foo): Future[String]
def failingFuturePigdog(): Future[String] = throw new IllegalStateException("expected")
+
def failingOptionPigdog(): Option[String] = throw new IllegalStateException("expected")
+
def failingJOptionPigdog(): JOption[String] = throw new IllegalStateException("expected")
def failingPigdog(): Unit = throw new IllegalStateException("expected")
def optionPigdog(): Option[String]
+
def optionPigdog(delay: Long): Option[String]
+
def joptionPigdog(delay: Long): JOption[String]
def incr()
+
def read(): Int
def testMethodCallSerialization(foo: Foo, s: String, i: Int): Unit = throw new IllegalStateException("expected")
@@ -48,6 +81,7 @@ object TypedActorSpec {
def pigdog = "Pigdog"
def futurePigdog(): Future[String] = new KeptPromise(Right(pigdog))
+
def futurePigdog(delay: Long): Future[String] = {
Thread.sleep(delay)
futurePigdog
@@ -92,6 +126,7 @@ object TypedActorSpec {
trait Stacked extends Stackable1 with Stackable2 {
def stacked: String = stackable1 + stackable2
+
def notOverriddenStacked: String = stackable1 + stackable2
}
diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala
new file mode 100644
index 0000000000..0ac7680da1
--- /dev/null
+++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala
@@ -0,0 +1,566 @@
+package akka.routing
+
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+import akka.dispatch.{ KeptPromise, Future }
+import java.util.concurrent.atomic.AtomicInteger
+import akka.actor.Actor._
+import akka.testkit.Testing._
+import akka.actor.{ TypedActor, Actor }
+import akka.testkit.TestLatch
+import akka.util.duration._
+
+object ActorPoolSpec {
+
+ trait Foo {
+ def sq(x: Int, sleep: Long): Future[Int]
+ }
+
+ class FooImpl extends Foo {
+ def sq(x: Int, sleep: Long): Future[Int] = {
+ if (sleep > 0) Thread.sleep(sleep)
+ new KeptPromise(Right(x * x))
+ }
+ }
+}
+
+class ActorPoolSpec extends WordSpec with MustMatchers {
+ import akka.routing.ActorPoolSpec._
+
+ "Actor Pool" must {
+
+ "have expected capacity" in {
+ val latch = TestLatch(2)
+ val count = new AtomicInteger(0)
+
+ val pool = actorOf(
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with SmallestMailboxSelector {
+ def factory = actorOf(new Actor {
+ def receive = {
+ case _ ⇒
+ count.incrementAndGet
+ latch.countDown()
+ self tryReply "success"
+ }
+ }).start()
+
+ def limit = 2
+ def selectionCount = 1
+ def partialFill = true
+ def instance = factory
+ def receive = _route
+ }).start()
+
+ val successes = TestLatch(2)
+ val successCounter = actorOf(new Actor {
+ def receive = {
+ case "success" ⇒ successes.countDown()
+ }
+ }).start()
+
+ implicit val replyTo = successCounter
+ pool ! "a"
+ pool ! "b"
+
+ latch.await
+ successes.await
+
+ count.get must be(2)
+
+ (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+
+ pool.stop()
+ }
+
+ "pass ticket #705" in {
+ val pool = actorOf(
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicFilter {
+ def lowerBound = 2
+ def upperBound = 20
+ def rampupRate = 0.1
+ def backoffRate = 0.1
+ def backoffThreshold = 0.5
+ def partialFill = true
+ def selectionCount = 1
+ def instance = factory
+ def receive = _route
+ def pressureThreshold = 1
+ def factory = actorOf(new Actor {
+ def receive = {
+ case req: String ⇒ {
+ sleepFor(10 millis)
+ self.tryReply("Response")
+ }
+ }
+ })
+ }).start()
+
+ try {
+ (for (count ← 1 to 500) yield pool.?("Test", 20000)) foreach {
+ _.await.resultOrException.get must be("Response")
+ }
+ } finally {
+ pool.stop()
+ }
+ }
+
+ "grow as needed under pressure" in {
+ // make sure the pool starts at the expected lower limit and grows to the upper as needed
+ // as influenced by the backlog of blocking pooled actors
+
+ var latch = TestLatch(3)
+ val count = new AtomicInteger(0)
+
+ val pool = actorOf(
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
+ def factory = actorOf(new Actor {
+ def receive = {
+ case n: Int ⇒
+ sleepFor(n millis)
+ count.incrementAndGet
+ latch.countDown()
+ }
+ })
+
+ def lowerBound = 2
+ def upperBound = 4
+ def rampupRate = 0.1
+ def partialFill = true
+ def selectionCount = 1
+ def instance = factory
+ def receive = _route
+ }).start()
+
+ // first message should create the minimum number of delgates
+
+ pool ! 1
+
+ (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+
+ var loops = 0
+ def loop(t: Int) = {
+ latch = TestLatch(loops)
+ count.set(0)
+ for (m ← 0 until loops) {
+ pool ? t
+ sleepFor(50 millis)
+ }
+ }
+
+ // 2 more should go thru without triggering more
+
+ loops = 2
+
+ loop(500)
+ latch.await
+ count.get must be(loops)
+
+ (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+
+ // a whole bunch should max it out
+
+ loops = 10
+ loop(500)
+ latch.await
+ count.get must be(loops)
+
+ (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(4)
+
+ pool.stop()
+ }
+
+ "grow as needed under mailbox pressure" in {
+ // make sure the pool starts at the expected lower limit and grows to the upper as needed
+ // as influenced by the backlog of messages in the delegate mailboxes
+
+ var latch = TestLatch(3)
+ val count = new AtomicInteger(0)
+
+ val pool = actorOf(
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
+ def factory = actorOf(new Actor {
+ def receive = {
+ case n: Int ⇒
+ sleepFor(n millis)
+ count.incrementAndGet
+ latch.countDown()
+ }
+ })
+
+ def lowerBound = 2
+ def upperBound = 4
+ def pressureThreshold = 3
+ def rampupRate = 0.1
+ def partialFill = true
+ def selectionCount = 1
+ def instance = factory
+ def receive = _route
+ }).start()
+
+ var loops = 0
+ def loop(t: Int) = {
+ latch = TestLatch(loops)
+ count.set(0)
+ for (m ← 0 until loops) {
+ pool ! t
+ }
+ }
+
+ // send a few messages and observe pool at its lower bound
+ loops = 3
+ loop(500)
+ latch.await
+ count.get must be(loops)
+
+ (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+
+ // send a bunch over the theshold and observe an increment
+ loops = 15
+ loop(500)
+
+ latch.await(10 seconds)
+ count.get must be(loops)
+
+ (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be >= (3)
+
+ pool.stop()
+ }
+
+ "round robin" in {
+ val latch1 = TestLatch(2)
+ val delegates = new java.util.concurrent.ConcurrentHashMap[String, String]
+
+ val pool1 = actorOf(
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
+ def factory = actorOf(new Actor {
+ def receive = {
+ case _ ⇒
+ delegates put (self.uuid.toString, "")
+ latch1.countDown()
+ }
+ })
+
+ def limit = 1
+ def selectionCount = 1
+ def rampupRate = 0.1
+ def partialFill = true
+ def instance = factory
+ def receive = _route
+ }).start()
+
+ pool1 ! "a"
+ pool1 ! "b"
+
+ latch1.await
+ delegates.size must be(1)
+
+ pool1.stop()
+
+ val latch2 = TestLatch(2)
+ delegates.clear()
+
+ val pool2 = actorOf(
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
+ def factory = actorOf(new Actor {
+ def receive = {
+ case _ ⇒
+ delegates put (self.uuid.toString, "")
+ latch2.countDown()
+ }
+ })
+
+ def limit = 2
+ def selectionCount = 1
+ def rampupRate = 0.1
+ def partialFill = false
+ def instance = factory
+ def receive = _route
+ }).start()
+
+ pool2 ! "a"
+ pool2 ! "b"
+
+ latch2.await
+ delegates.size must be(2)
+
+ pool2.stop()
+ }
+
+ "backoff" in {
+ val latch = TestLatch(10)
+
+ val pool = actorOf(
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
+ def factory = actorOf(new Actor {
+ def receive = {
+ case n: Int ⇒
+ sleepFor(n millis)
+ latch.countDown()
+ }
+ })
+
+ def lowerBound = 1
+ def upperBound = 5
+ def pressureThreshold = 1
+ def partialFill = true
+ def selectionCount = 1
+ def rampupRate = 0.1
+ def backoffRate = 0.50
+ def backoffThreshold = 0.50
+ def instance = factory
+ def receive = _route
+ }).start()
+
+ // put some pressure on the pool
+
+ for (m ← 0 to 10) pool ! 250
+
+ sleepFor(5 millis)
+
+ val z = (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size
+
+ z must be >= (2)
+
+ // let it cool down
+
+ for (m ← 0 to 3) {
+ pool ! 1
+ sleepFor(500 millis)
+ }
+
+ (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be <= (z)
+
+ pool.stop()
+ }
+
+ "support typed actors" in {
+ import RoutingSpec._
+ import TypedActor._
+ def createPool = new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
+ def lowerBound = 1
+ def upperBound = 5
+ def pressureThreshold = 1
+ def partialFill = true
+ def selectionCount = 1
+ def rampupRate = 0.1
+ def backoffRate = 0.50
+ def backoffThreshold = 0.50
+ def instance = getActorRefFor(typedActorOf[Foo, FooImpl]())
+ def receive = _route
+ }
+
+ val pool = createProxy[Foo](createPool)
+
+ val results = for (i ← 1 to 100) yield (i, pool.sq(i, 100))
+
+ for ((i, r) ← results) r.get must equal(i * i)
+ }
+
+ "provide default supervision of pooled actors" in {
+ import akka.config.Supervision._
+ val pingCount = new AtomicInteger(0)
+ val deathCount = new AtomicInteger(0)
+ var keepDying = false
+
+ val pool1 = actorOf(
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
+ def lowerBound = 2
+ def upperBound = 5
+ def rampupRate = 0.1
+ def backoffRate = 0.1
+ def backoffThreshold = 0.5
+ def partialFill = true
+ def selectionCount = 1
+ def instance = factory
+ def receive = _route
+ def pressureThreshold = 1
+ def factory = actorOf(new Actor {
+ if (deathCount.get > 5) deathCount.set(0)
+ if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
+ def receive = {
+ case akka.Die ⇒
+ if (keepDying) deathCount.incrementAndGet
+ throw new RuntimeException
+ case _ ⇒ pingCount.incrementAndGet
+ }
+ }).start()
+ }).start()
+
+ val pool2 = actorOf(
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
+ def lowerBound = 2
+ def upperBound = 5
+ def rampupRate = 0.1
+ def backoffRate = 0.1
+ def backoffThreshold = 0.5
+ def partialFill = true
+ def selectionCount = 1
+ def instance = factory
+ def receive = _route
+ def pressureThreshold = 1
+ def factory = actorOf(new Actor {
+ self.lifeCycle = Permanent
+ if (deathCount.get > 5) deathCount.set(0)
+ if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
+ def receive = {
+ case akka.Die ⇒
+ if (keepDying) deathCount.incrementAndGet
+ throw new RuntimeException
+ case _ ⇒ pingCount.incrementAndGet
+ }
+ }).start()
+ }).start()
+
+ val pool3 = actorOf(
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter {
+ def lowerBound = 2
+ def upperBound = 5
+ def rampupRate = 0.1
+ def backoffRate = 0.1
+ def backoffThreshold = 0.5
+ def partialFill = true
+ def selectionCount = 1
+ def instance = factory
+ def receive = _route
+ def pressureThreshold = 1
+ def factory = actorOf(new Actor {
+ self.lifeCycle = Temporary
+ if (deathCount.get > 5) deathCount.set(0)
+ if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
+ def receive = {
+ case akka.Die ⇒
+ if (keepDying) deathCount.incrementAndGet
+ throw new RuntimeException
+ case _ ⇒ pingCount.incrementAndGet
+ }
+ }).start()
+ }).start()
+
+ // default lifecycle
+ // actor comes back right away
+ pingCount.set(0)
+ keepDying = false
+ pool1 ! "ping"
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pool1 ! akka.Die
+ sleepFor(2 seconds)
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pingCount.get must be(1)
+
+ // default lifecycle
+ // actor dies completely
+ pingCount.set(0)
+ keepDying = true
+ pool1 ! "ping"
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pool1 ! akka.Die
+ sleepFor(2 seconds)
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
+ pool1 ! "ping"
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pingCount.get must be(2)
+
+ // permanent lifecycle
+ // actor comes back right away
+ pingCount.set(0)
+ keepDying = false
+ pool2 ! "ping"
+ (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pool2 ! akka.Die
+ sleepFor(2 seconds)
+ (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pingCount.get must be(1)
+
+ // permanent lifecycle
+ // actor dies completely
+ pingCount.set(0)
+ keepDying = true
+ pool2 ! "ping"
+ (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pool2 ! akka.Die
+ sleepFor(2 seconds)
+ (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
+ pool2 ! "ping"
+ (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pingCount.get must be(2)
+
+ // temporary lifecycle
+ pingCount.set(0)
+ keepDying = false
+ pool3 ! "ping"
+ (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pool3 ! akka.Die
+ sleepFor(2 seconds)
+ (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
+ pool3 ! "ping"
+ pool3 ! "ping"
+ pool3 ! "ping"
+ (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pingCount.get must be(4)
+ }
+
+ "support customizable supervision config of pooled actors" in {
+ import akka.config.Supervision._
+ val pingCount = new AtomicInteger(0)
+ val deathCount = new AtomicInteger(0)
+ var keepDying = false
+
+ trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig {
+ def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000)
+ }
+
+ object BadState
+
+ val pool1 = actorOf(
+ new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
+ def lowerBound = 2
+ def upperBound = 5
+ def rampupRate = 0.1
+ def backoffRate = 0.1
+ def backoffThreshold = 0.5
+ def partialFill = true
+ def selectionCount = 1
+ def instance = factory
+ def receive = _route
+ def pressureThreshold = 1
+ def factory = actorOf(new Actor {
+ if (deathCount.get > 5) deathCount.set(0)
+ if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
+ def receive = {
+ case BadState ⇒
+ if (keepDying) deathCount.incrementAndGet
+ throw new IllegalStateException
+ case akka.Die ⇒
+ throw new RuntimeException
+ case _ ⇒ pingCount.incrementAndGet
+ }
+ }).start()
+ }).start()
+
+ // actor comes back right away
+ pingCount.set(0)
+ keepDying = false
+ pool1 ! "ping"
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pool1 ! BadState
+ sleepFor(2 seconds)
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pingCount.get must be(1)
+
+ // actor dies completely
+ pingCount.set(0)
+ keepDying = true
+ pool1 ! "ping"
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pool1 ! BadState
+ sleepFor(2 seconds)
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
+ pool1 ! "ping"
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pingCount.get must be(2)
+
+ // kill it
+ intercept[RuntimeException](pool1.?(akka.Die).get)
+ }
+ }
+}
\ No newline at end of file
diff --git a/akka-actor-tests/src/test/scala/akka/routing/ListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ListenerSpec.scala
new file mode 100644
index 0000000000..e7e00c1ea2
--- /dev/null
+++ b/akka-actor-tests/src/test/scala/akka/routing/ListenerSpec.scala
@@ -0,0 +1,58 @@
+package akka.actor.routing
+
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+
+import akka.testkit._
+import akka.actor._
+import akka.actor.Actor._
+import akka.routing._
+import java.util.concurrent.atomic.AtomicInteger
+
+class ListenerSpec extends WordSpec with MustMatchers {
+
+ "Listener" must {
+
+ "listen" in {
+ val fooLatch = TestLatch(2)
+ val barLatch = TestLatch(2)
+ val barCount = new AtomicInteger(0)
+
+ val broadcast = actorOf(new Actor with Listeners {
+ def receive = listenerManagement orElse {
+ case "foo" ⇒ gossip("bar")
+ }
+ }).start()
+
+ def newListener = actorOf(new Actor {
+ def receive = {
+ case "bar" ⇒
+ barCount.incrementAndGet
+ barLatch.countDown()
+ case "foo" ⇒
+ fooLatch.countDown()
+ }
+ }).start()
+
+ val a1 = newListener
+ val a2 = newListener
+ val a3 = newListener
+
+ broadcast ! Listen(a1)
+ broadcast ! Listen(a2)
+ broadcast ! Listen(a3)
+
+ broadcast ! Deafen(a3)
+
+ broadcast ! WithListeners(_ ! "foo")
+ broadcast ! "foo"
+
+ barLatch.await
+ barCount.get must be(2)
+
+ fooLatch.await
+
+ for (a ← List(broadcast, a1, a2, a3)) a.stop()
+ }
+ }
+}
\ No newline at end of file
diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
index 8fd5d31bf0..962eba6183 100644
--- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
@@ -1,720 +1,319 @@
-package akka.actor.routing
+package akka.routing
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
-
-import akka.testkit._
-import akka.testkit.Testing.sleepFor
-import akka.util.duration._
-
-import akka.actor._
-import akka.actor.Actor._
import akka.routing._
-import akka.event.EventHandler
-
+import akka.routing.Router
import java.util.concurrent.atomic.AtomicInteger
-import akka.dispatch.{ KeptPromise, Future }
+import akka.actor.Actor._
+import akka.actor.{ ActorRef, Actor }
+import collection.mutable.LinkedList
+import akka.routing.Routing.Broadcast
+import java.util.concurrent.{ CountDownLatch, TimeUnit }
object RoutingSpec {
- trait Foo {
- def sq(x: Int, sleep: Long): Future[Int]
- }
- class FooImpl extends Foo {
- def sq(x: Int, sleep: Long): Future[Int] = {
- if (sleep > 0) Thread.sleep(sleep)
- new KeptPromise(Right(x * x))
+ class TestActor extends Actor with Serializable {
+ def receive = {
+ case _ ⇒
+ println("Hello")
}
}
}
class RoutingSpec extends WordSpec with MustMatchers {
- import Routing._
- "Routing" must {
+ import akka.routing.RoutingSpec._
- "dispatch" in {
- val Test1 = "test1"
- val Test2 = "test2"
- val Test3 = "test3"
+ "direct router" must {
+ "be started when constructed" in {
+ val actor1 = Actor.actorOf[TestActor].start
- val t1 = actorOf(new Actor {
- def receive = {
- case Test1 ⇒ self.reply(3)
- case Test2 ⇒ self.reply(7)
- }
- }).start()
-
- val t2 = actorOf(new Actor() {
- def receive = {
- case Test3 ⇒ self.reply(11)
- }
- }).start()
-
- val d = routerActor {
- case Test1 | Test2 ⇒ t1
- case Test3 ⇒ t2
- }.start()
-
- implicit val timeout = Timeout((5 seconds).dilated)
- val result = for {
- a ← (d ? (Test1)).as[Int]
- b ← (d ? (Test2)).as[Int]
- c ← (d ? (Test3)).as[Int]
- } yield a + b + c
-
- result.isDefined must be(true)
- result.get must be(21)
-
- for (a ← List(t1, t2, d)) a.stop()
+ val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.Direct)
+ actor.isRunning must be(true)
}
- "have messages logged" in {
- val msgs = new java.util.concurrent.ConcurrentSkipListSet[Any]
- val latch = TestLatch(2)
-
- val actor = actorOf(new Actor {
- def receive = { case _ ⇒ }
- }).start()
-
- val logger = loggerActor(actor, x ⇒ { msgs.add(x); latch.countDown() }).start()
-
- val foo: Any = "foo"
- val bar: Any = "bar"
-
- logger ! foo
- logger ! bar
-
- latch.await
-
- msgs must (have size (2) and contain(foo) and contain(bar))
-
- actor.stop()
- logger.stop()
- }
-
- "dispatch to smallest mailbox" in {
- val t1Count = new AtomicInteger(0)
- val t2Count = new AtomicInteger(0)
- val latch1 = TestLatch(2501)
- val latch2 = TestLatch(2499)
-
- val t1 = actorOf(new Actor {
- def receive = {
- case x ⇒
- t1Count.incrementAndGet
- latch1.countDown()
- }
- }).start()
-
- t1.dispatcher.suspend(t1)
-
- for (i ← 1 to 2501) t1 ! i
-
- val t2 = actorOf(new Actor {
- def receive = {
- case x ⇒
- t2Count.incrementAndGet
- latch2.countDown()
- }
- }).start()
-
- val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) //Will pick the last with the smallest mailbox, so make sure t1 is last
-
- for (i ← 1 to 2499) d ! i
-
- latch2.await(20 seconds)
-
- t1.dispatcher.resume(t1)
-
+ "throw IllegalArgumentException at construction when no connections" in {
try {
- latch1.await(20 seconds)
- } finally {
- // because t1 is much slower and thus has a bigger mailbox all the time
- t1Count.get must be === 2501
- t2Count.get must be === 2499
- for (a ← List(t1, t2, d)) a.stop()
+ Routing.newRoutedActorRef("foo", List(), RouterType.Direct)
+ fail()
+ } catch {
+ case e: IllegalArgumentException ⇒
}
}
- "listen" in {
- val fooLatch = TestLatch(2)
- val barLatch = TestLatch(2)
- val barCount = new AtomicInteger(0)
+ "send message to connection" in {
+ val doneLatch = new CountDownLatch(1)
- val broadcast = actorOf(new Actor with Listeners {
- def receive = listenerManagement orElse {
- case "foo" ⇒ gossip("bar")
- }
- }).start()
-
- def newListener = actorOf(new Actor {
+ val counter = new AtomicInteger(0)
+ val connection1 = actorOf(new Actor {
def receive = {
- case "bar" ⇒
- barCount.incrementAndGet
- barLatch.countDown()
- case "foo" ⇒
- fooLatch.countDown()
+ case "end" ⇒ doneLatch.countDown()
+ case _ ⇒ counter.incrementAndGet
}
}).start()
- val a1 = newListener
- val a2 = newListener
- val a3 = newListener
+ val routedActor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Direct)
+ routedActor ! "hello"
+ routedActor ! "end"
- broadcast ! Listen(a1)
- broadcast ! Listen(a2)
- broadcast ! Listen(a3)
+ doneLatch.await(5, TimeUnit.SECONDS) must be(true)
- broadcast ! Deafen(a3)
+ counter.get must be(1)
+ }
- broadcast ! WithListeners(_ ! "foo")
- broadcast ! "foo"
+ "deliver a broadcast message" in {
+ val doneLatch = new CountDownLatch(1)
- barLatch.await
- barCount.get must be(2)
+ val counter1 = new AtomicInteger
+ val connection1 = actorOf(new Actor {
+ def receive = {
+ case "end" ⇒ doneLatch.countDown()
+ case msg: Int ⇒ counter1.addAndGet(msg)
+ }
+ }).start()
- fooLatch.await
+ val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Direct)
- for (a ← List(broadcast, a1, a2, a3)) a.stop()
+ actor ! Broadcast(1)
+ actor ! "end"
+
+ doneLatch.await(5, TimeUnit.SECONDS) must be(true)
+
+ counter1.get must be(1)
}
}
- "Actor Pool" must {
+ "round robin router" must {
- "have expected capacity" in {
- val latch = TestLatch(2)
- val count = new AtomicInteger(0)
+ "be started when constructed" in {
+ val actor1 = Actor.actorOf[TestActor].start
- val pool = actorOf(
- new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with SmallestMailboxSelector {
- def factory = actorOf(new Actor {
- def receive = {
- case _ ⇒
- count.incrementAndGet
- latch.countDown()
- self tryReply "success"
- }
- }).start()
+ val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.RoundRobin)
+ actor.isRunning must be(true)
+ }
- def limit = 2
- def selectionCount = 1
- def partialFill = true
- def instance = factory
- def receive = _route
+ "throw IllegalArgumentException at construction when no connections" in {
+ try {
+ Routing.newRoutedActorRef("foo", List(), RouterType.RoundRobin)
+ fail()
+ } catch {
+ case e: IllegalArgumentException ⇒
+ }
+ }
+
+ //In this test a bunch of actors are created and each actor has its own counter.
+ //to test round robin, the routed actor receives the following sequence of messages 1 2 3 .. 1 2 3 .. 1 2 3 which it
+ //uses to increment his counter.
+ //So after n iteration, the first actor his counter should be 1*n, the second 2*n etc etc.
+ "deliver messages in a round robin fashion" in {
+ val connectionCount = 10
+ val iterationCount = 10
+ val doneLatch = new CountDownLatch(connectionCount)
+
+ //lets create some connections.
+ var connections = new LinkedList[ActorRef]
+ var counters = new LinkedList[AtomicInteger]
+ for (i ← 0 until connectionCount) {
+ counters = counters :+ new AtomicInteger()
+
+ val connection = actorOf(new Actor {
+ def receive = {
+ case "end" ⇒ doneLatch.countDown()
+ case msg: Int ⇒ counters.get(i).get.addAndGet(msg)
+ }
}).start()
+ connections = connections :+ connection
+ }
- val successes = TestLatch(2)
- val successCounter = actorOf(new Actor {
+ //create the routed actor.
+ val actor = Routing.newRoutedActorRef("foo", connections, RouterType.RoundRobin)
+
+ //send messages to the actor.
+ for (i ← 0 until iterationCount) {
+ for (k ← 0 until connectionCount) {
+ actor ! (k + 1)
+ }
+ }
+
+ actor ! Broadcast("end")
+ //now wait some and do validations.
+ doneLatch.await(5, TimeUnit.SECONDS) must be(true)
+
+ for (i ← 0 until connectionCount) {
+ val counter = counters.get(i).get
+ counter.get must be((iterationCount * (i + 1)))
+ }
+ }
+
+ "deliver a broadcast message using the !" in {
+ val doneLatch = new CountDownLatch(2)
+
+ val counter1 = new AtomicInteger
+ val connection1 = actorOf(new Actor {
def receive = {
- case "success" ⇒ successes.countDown()
+ case "end" ⇒ doneLatch.countDown()
+ case msg: Int ⇒ counter1.addAndGet(msg)
}
}).start()
- implicit val replyTo = successCounter
- pool ! "a"
- pool ! "b"
+ val counter2 = new AtomicInteger
+ val connection2 = actorOf(new Actor {
+ def receive = {
+ case "end" ⇒ doneLatch.countDown()
+ case msg: Int ⇒ counter2.addAndGet(msg)
+ }
+ }).start()
- latch.await
- successes.await
+ val actor = Routing.newRoutedActorRef("foo", List(connection1, connection2), RouterType.RoundRobin)
- count.get must be(2)
+ actor ! Broadcast(1)
+ actor ! Broadcast("end")
- (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ doneLatch.await(5, TimeUnit.SECONDS) must be(true)
- pool.stop()
+ counter1.get must be(1)
+ counter2.get must be(1)
}
- "pass ticket #705" in {
- val pool = actorOf(
- new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicFilter {
- def lowerBound = 2
- def upperBound = 20
- def rampupRate = 0.1
- def backoffRate = 0.1
- def backoffThreshold = 0.5
- def partialFill = true
- def selectionCount = 1
- def instance = factory
- def receive = _route
- def pressureThreshold = 1
- def factory = actorOf(new Actor {
- def receive = {
- case req: String ⇒ {
- sleepFor(10 millis)
- self.tryReply("Response")
- }
- }
- })
- }).start()
+ "fail to deliver a broadcast message using the ?" in {
+ val doneLatch = new CountDownLatch(1)
+
+ val counter1 = new AtomicInteger
+ val connection1 = actorOf(new Actor {
+ def receive = {
+ case "end" ⇒ doneLatch.countDown()
+ case _ ⇒ counter1.incrementAndGet()
+ }
+ }).start()
+
+ val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.RoundRobin)
try {
- (for (count ← 1 to 500) yield pool.?("Test", 20000)) foreach {
- _.await.resultOrException.get must be("Response")
+ actor ? Broadcast(1)
+ fail()
+ } catch {
+ case e: RoutingException ⇒
+ }
+
+ actor ! "end"
+ doneLatch.await(5, TimeUnit.SECONDS) must be(true)
+ counter1.get must be(0)
+ }
+ }
+
+ "random router" must {
+
+ "be started when constructed" in {
+
+ val actor1 = Actor.actorOf[TestActor].start
+
+ val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.Random)
+ actor.isRunning must be(true)
+ }
+
+ "throw IllegalArgumentException at construction when no connections" in {
+ try {
+ Routing.newRoutedActorRef("foo", List(), RouterType.Random)
+ fail()
+ } catch {
+ case e: IllegalArgumentException ⇒
+ }
+ }
+
+ "deliver messages in a random fashion" in {
+
+ }
+
+ "deliver a broadcast message" in {
+ val doneLatch = new CountDownLatch(2)
+
+ val counter1 = new AtomicInteger
+ val connection1 = actorOf(new Actor {
+ def receive = {
+ case "end" ⇒ doneLatch.countDown()
+ case msg: Int ⇒ counter1.addAndGet(msg)
}
- } finally {
- pool.stop()
- }
- }
+ }).start()
- "grow as needed under pressure" in {
- // make sure the pool starts at the expected lower limit and grows to the upper as needed
- // as influenced by the backlog of blocking pooled actors
-
- var latch = TestLatch(3)
- val count = new AtomicInteger(0)
-
- val pool = actorOf(
- new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
- def factory = actorOf(new Actor {
- def receive = {
- case n: Int ⇒
- sleepFor(n millis)
- count.incrementAndGet
- latch.countDown()
- }
- })
-
- def lowerBound = 2
- def upperBound = 4
- def rampupRate = 0.1
- def partialFill = true
- def selectionCount = 1
- def instance = factory
- def receive = _route
- }).start()
-
- // first message should create the minimum number of delgates
-
- pool ! 1
-
- (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
-
- var loops = 0
- def loop(t: Int) = {
- latch = TestLatch(loops)
- count.set(0)
- for (m ← 0 until loops) {
- pool ? t
- sleepFor(50 millis)
+ val counter2 = new AtomicInteger
+ val connection2 = actorOf(new Actor {
+ def receive = {
+ case "end" ⇒ doneLatch.countDown()
+ case msg: Int ⇒ counter2.addAndGet(msg)
}
- }
+ }).start()
- // 2 more should go thru without triggering more
+ val actor = Routing.newRoutedActorRef("foo", List(connection1, connection2), RouterType.Random)
- loops = 2
+ actor ! Broadcast(1)
+ actor ! Broadcast("end")
- loop(500)
- latch.await
- count.get must be(loops)
+ doneLatch.await(5, TimeUnit.SECONDS) must be(true)
- (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
-
- // a whole bunch should max it out
-
- loops = 10
- loop(500)
- latch.await
- count.get must be(loops)
-
- (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(4)
-
- pool.stop()
+ counter1.get must be(1)
+ counter2.get must be(1)
}
- "grow as needed under mailbox pressure" in {
- // make sure the pool starts at the expected lower limit and grows to the upper as needed
- // as influenced by the backlog of messages in the delegate mailboxes
+ "fail to deliver a broadcast message using the ?" in {
+ val doneLatch = new CountDownLatch(1)
- var latch = TestLatch(3)
- val count = new AtomicInteger(0)
-
- val pool = actorOf(
- new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
- def factory = actorOf(new Actor {
- def receive = {
- case n: Int ⇒
- sleepFor(n millis)
- count.incrementAndGet
- latch.countDown()
- }
- })
-
- def lowerBound = 2
- def upperBound = 4
- def pressureThreshold = 3
- def rampupRate = 0.1
- def partialFill = true
- def selectionCount = 1
- def instance = factory
- def receive = _route
- }).start()
-
- var loops = 0
- def loop(t: Int) = {
- latch = TestLatch(loops)
- count.set(0)
- for (m ← 0 until loops) {
- pool ! t
+ val counter1 = new AtomicInteger
+ val connection1 = actorOf(new Actor {
+ def receive = {
+ case "end" ⇒ doneLatch.countDown()
+ case _ ⇒ counter1.incrementAndGet()
}
+ }).start()
+
+ val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Random)
+
+ try {
+ actor ? Broadcast(1)
+ fail()
+ } catch {
+ case e: RoutingException ⇒
}
- // send a few messages and observe pool at its lower bound
- loops = 3
- loop(500)
- latch.await
- count.get must be(loops)
-
- (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
-
- // send a bunch over the theshold and observe an increment
- loops = 15
- loop(500)
-
- latch.await(10 seconds)
- count.get must be(loops)
-
- (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be >= (3)
-
- pool.stop()
+ actor ! "end"
+ doneLatch.await(5, TimeUnit.SECONDS) must be(true)
+ counter1.get must be(0)
}
+ }
- "round robin" in {
- val latch1 = TestLatch(2)
- val delegates = new java.util.concurrent.ConcurrentHashMap[String, String]
+ "least cpu router" must {
+ "throw IllegalArgumentException when constructed" in {
+ val actor1 = Actor.actorOf[TestActor].start
- val pool1 = actorOf(
- new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
- def factory = actorOf(new Actor {
- def receive = {
- case _ ⇒
- delegates put (self.uuid.toString, "")
- latch1.countDown()
- }
- })
-
- def limit = 1
- def selectionCount = 1
- def rampupRate = 0.1
- def partialFill = true
- def instance = factory
- def receive = _route
- }).start()
-
- pool1 ! "a"
- pool1 ! "b"
-
- latch1.await
- delegates.size must be(1)
-
- pool1.stop()
-
- val latch2 = TestLatch(2)
- delegates.clear()
-
- val pool2 = actorOf(
- new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
- def factory = actorOf(new Actor {
- def receive = {
- case _ ⇒
- delegates put (self.uuid.toString, "")
- latch2.countDown()
- }
- })
-
- def limit = 2
- def selectionCount = 1
- def rampupRate = 0.1
- def partialFill = false
- def instance = factory
- def receive = _route
- }).start()
-
- pool2 ! "a"
- pool2 ! "b"
-
- latch2.await
- delegates.size must be(2)
-
- pool2.stop()
- }
-
- "backoff" in {
- val latch = TestLatch(10)
-
- val pool = actorOf(
- new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
- def factory = actorOf(new Actor {
- def receive = {
- case n: Int ⇒
- sleepFor(n millis)
- latch.countDown()
- }
- })
-
- def lowerBound = 1
- def upperBound = 5
- def pressureThreshold = 1
- def partialFill = true
- def selectionCount = 1
- def rampupRate = 0.1
- def backoffRate = 0.50
- def backoffThreshold = 0.50
- def instance = factory
- def receive = _route
- }).start()
-
- // put some pressure on the pool
-
- for (m ← 0 to 10) pool ! 250
-
- sleepFor(5 millis)
-
- val z = (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size
-
- z must be >= (2)
-
- // let it cool down
-
- for (m ← 0 to 3) {
- pool ! 1
- sleepFor(500 millis)
+ try {
+ Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastCPU)
+ } catch {
+ case e: IllegalArgumentException ⇒
}
-
- (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be <= (z)
-
- pool.stop()
}
+ }
- "support typed actors" in {
- import RoutingSpec._
- import TypedActor._
- def createPool = new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
- def lowerBound = 1
- def upperBound = 5
- def pressureThreshold = 1
- def partialFill = true
- def selectionCount = 1
- def rampupRate = 0.1
- def backoffRate = 0.50
- def backoffThreshold = 0.50
- def instance = getActorRefFor(typedActorOf[Foo, FooImpl]())
- def receive = _route
+ "least ram router" must {
+ "throw IllegalArgumentException when constructed" in {
+ val actor1 = Actor.actorOf[TestActor].start
+
+ try {
+ Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastRAM)
+ } catch {
+ case e: IllegalArgumentException ⇒
}
-
- val pool = createProxy[Foo](createPool)
-
- val results = for (i ← 1 to 100) yield (i, pool.sq(i, 100))
-
- for ((i, r) ← results) r.get must equal(i * i)
}
+ }
- "provide default supervision of pooled actors" in {
- import akka.config.Supervision._
- val pingCount = new AtomicInteger(0)
- val deathCount = new AtomicInteger(0)
- var keepDying = false
+ "smallest mailbox" must {
+ "throw IllegalArgumentException when constructed" in {
+ val actor1 = Actor.actorOf[TestActor].start
- val pool1 = actorOf(
- new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
- def lowerBound = 2
- def upperBound = 5
- def rampupRate = 0.1
- def backoffRate = 0.1
- def backoffThreshold = 0.5
- def partialFill = true
- def selectionCount = 1
- def instance = factory
- def receive = _route
- def pressureThreshold = 1
- def factory = actorOf(new Actor {
- if (deathCount.get > 5) deathCount.set(0)
- if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
- def receive = {
- case akka.Die ⇒
- if (keepDying) deathCount.incrementAndGet
- throw new RuntimeException
- case _ ⇒ pingCount.incrementAndGet
- }
- }).start()
- }).start()
-
- val pool2 = actorOf(
- new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
- def lowerBound = 2
- def upperBound = 5
- def rampupRate = 0.1
- def backoffRate = 0.1
- def backoffThreshold = 0.5
- def partialFill = true
- def selectionCount = 1
- def instance = factory
- def receive = _route
- def pressureThreshold = 1
- def factory = actorOf(new Actor {
- self.lifeCycle = Permanent
- if (deathCount.get > 5) deathCount.set(0)
- if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
- def receive = {
- case akka.Die ⇒
- if (keepDying) deathCount.incrementAndGet
- throw new RuntimeException
- case _ ⇒ pingCount.incrementAndGet
- }
- }).start()
- }).start()
-
- val pool3 = actorOf(
- new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter {
- def lowerBound = 2
- def upperBound = 5
- def rampupRate = 0.1
- def backoffRate = 0.1
- def backoffThreshold = 0.5
- def partialFill = true
- def selectionCount = 1
- def instance = factory
- def receive = _route
- def pressureThreshold = 1
- def factory = actorOf(new Actor {
- self.lifeCycle = Temporary
- if (deathCount.get > 5) deathCount.set(0)
- if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
- def receive = {
- case akka.Die ⇒
- if (keepDying) deathCount.incrementAndGet
- throw new RuntimeException
- case _ ⇒ pingCount.incrementAndGet
- }
- }).start()
- }).start()
-
- // default lifecycle
- // actor comes back right away
- pingCount.set(0)
- keepDying = false
- pool1 ! "ping"
- (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pool1 ! akka.Die
- sleepFor(2 seconds)
- (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pingCount.get must be(1)
-
- // default lifecycle
- // actor dies completely
- pingCount.set(0)
- keepDying = true
- pool1 ! "ping"
- (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pool1 ! akka.Die
- sleepFor(2 seconds)
- (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
- pool1 ! "ping"
- (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pingCount.get must be(2)
-
- // permanent lifecycle
- // actor comes back right away
- pingCount.set(0)
- keepDying = false
- pool2 ! "ping"
- (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pool2 ! akka.Die
- sleepFor(2 seconds)
- (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pingCount.get must be(1)
-
- // permanent lifecycle
- // actor dies completely
- pingCount.set(0)
- keepDying = true
- pool2 ! "ping"
- (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pool2 ! akka.Die
- sleepFor(2 seconds)
- (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
- pool2 ! "ping"
- (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pingCount.get must be(2)
-
- // temporary lifecycle
- pingCount.set(0)
- keepDying = false
- pool3 ! "ping"
- (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pool3 ! akka.Die
- sleepFor(2 seconds)
- (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
- pool3 ! "ping"
- pool3 ! "ping"
- pool3 ! "ping"
- (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pingCount.get must be(4)
- }
-
- "support customizable supervision config of pooled actors" in {
- import akka.config.Supervision._
- val pingCount = new AtomicInteger(0)
- val deathCount = new AtomicInteger(0)
- var keepDying = false
-
- trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig {
- def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000)
+ try {
+ Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastMessages)
+ } catch {
+ case e: IllegalArgumentException ⇒
}
-
- object BadState
-
- val pool1 = actorOf(
- new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
- def lowerBound = 2
- def upperBound = 5
- def rampupRate = 0.1
- def backoffRate = 0.1
- def backoffThreshold = 0.5
- def partialFill = true
- def selectionCount = 1
- def instance = factory
- def receive = _route
- def pressureThreshold = 1
- def factory = actorOf(new Actor {
- if (deathCount.get > 5) deathCount.set(0)
- if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
- def receive = {
- case BadState ⇒
- if (keepDying) deathCount.incrementAndGet
- throw new IllegalStateException
- case akka.Die ⇒
- throw new RuntimeException
- case _ ⇒ pingCount.incrementAndGet
- }
- }).start()
- }).start()
-
- // actor comes back right away
- pingCount.set(0)
- keepDying = false
- pool1 ! "ping"
- (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pool1 ! BadState
- sleepFor(2 seconds)
- (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pingCount.get must be(1)
-
- // actor dies completely
- pingCount.set(0)
- keepDying = true
- pool1 ! "ping"
- (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pool1 ! BadState
- sleepFor(2 seconds)
- (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
- pool1 ! "ping"
- (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pingCount.get must be(2)
-
- // kill it
- intercept[RuntimeException](pool1.?(akka.Die).get)
}
}
}
-
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 920d2c1041..71b5e7e2d1 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -649,7 +649,8 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
def mailbox: AnyRef = _mailbox
protected[akka] def mailbox_=(value: AnyRef): AnyRef = {
- _mailbox = value; value
+ _mailbox = value;
+ value
}
/**
@@ -1257,3 +1258,57 @@ case class SerializedActorRef(uuid: Uuid,
"] but it's not found in the local registry and remoting is not enabled.")
}
}
+
+/**
+ * Trait for ActorRef implementations where most of the methods are not supported.
+ */
+trait UnsupportedActorRef extends ActorRef with ScalaActorRef {
+
+ def dispatcher_=(md: MessageDispatcher) {
+ unsupported
+ }
+
+ def dispatcher: MessageDispatcher = unsupported
+
+ def link(actorRef: ActorRef) {
+ unsupported
+ }
+
+ def unlink(actorRef: ActorRef) {
+ unsupported
+ }
+
+ def startLink(actorRef: ActorRef): ActorRef = unsupported
+
+ def supervisor: Option[ActorRef] = unsupported
+
+ def linkedActors: JMap[Uuid, ActorRef] = unsupported
+
+ protected[akka] def mailbox: AnyRef = unsupported
+
+ protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported
+
+ protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
+ unsupported
+ }
+
+ protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
+ unsupported
+ }
+
+ protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
+ unsupported
+ }
+
+ protected[akka] def invoke(messageHandle: MessageInvocation) {
+ unsupported
+ }
+
+ protected[akka] def supervisor_=(sup: Option[ActorRef]) {
+ unsupported
+ }
+
+ protected[akka] def actorInstance: AtomicReference[Actor] = unsupported
+
+ private def unsupported = throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName))
+}
diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala
index b285d6783a..4b6597393e 100644
--- a/akka-actor/src/main/scala/akka/routing/Routing.scala
+++ b/akka-actor/src/main/scala/akka/routing/Routing.scala
@@ -4,16 +4,19 @@
package akka.routing
-import akka.actor.{ UntypedActor, Actor, ActorRef }
import akka.actor.Actor._
-
-import akka.actor.ActorRef
import scala.collection.JavaConversions._
import scala.collection.immutable.Seq
import java.util.concurrent.atomic.AtomicReference
import annotation.tailrec
import akka.AkkaException
+import akka.dispatch.Future
+import java.net.InetSocketAddress
+import akka.actor._
+import akka.event.EventHandler
+import akka.actor.UntypedChannel._
+import akka.routing.RouterType.RoundRobin
class RoutingException(message: String) extends AkkaException(message)
@@ -23,191 +26,304 @@ sealed trait RouterType
* @author Jonas Bonér
*/
object RouterType {
+
object Direct extends RouterType
+
+ /**
+ * A RouterType that randomly selects a connection to send a message to.
+ */
object Random extends RouterType
+
+ /**
+ * A RouterType that selects the connection by using round robin.
+ */
object RoundRobin extends RouterType
+
+ /**
+ * A RouterType that selects the connection based on the least amount of cpu usage
+ */
object LeastCPU extends RouterType
+
+ /**
+ * A RouterType that select the connection based on the least amount of ram used.
+ *
+ * todo: this is extremely vague currently since there are so many ways to define least amount of ram.
+ */
object LeastRAM extends RouterType
+
+ /**
+ * A RouterType that select the connection where the actor has the least amount of messages in its mailbox.
+ */
object LeastMessages extends RouterType
-}
-/**
- * A Router is a trait whose purpose is to route incoming messages to actors.
- */
-trait Router { this: Actor ⇒
-
- protected def transform(msg: Any): Any = msg
-
- protected def routes: PartialFunction[Any, ActorRef]
-
- protected def broadcast(message: Any) {}
-
- protected def dispatch: Receive = {
- case Routing.Broadcast(message) ⇒
- broadcast(message)
- case a if routes.isDefinedAt(a) ⇒
- if (isSenderDefined) routes(a).forward(transform(a))(someSelf)
- else routes(a).!(transform(a))(None)
- }
-
- def receive = dispatch
-
- private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
-}
-
-/**
- * An UntypedRouter is an abstract class whose purpose is to route incoming messages to actors.
- */
-abstract class UntypedRouter extends UntypedActor {
- protected def transform(msg: Any): Any = msg
-
- protected def route(msg: Any): ActorRef
-
- protected def broadcast(message: Any) {}
-
- private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
-
- @throws(classOf[Exception])
- def onReceive(msg: Any): Unit = msg match {
- case m: Routing.Broadcast ⇒ broadcast(m.message)
- case _ ⇒
- val r = route(msg)
- if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!")
- if (isSenderDefined) r.forward(transform(msg))(someSelf)
- else r.!(transform(msg))(None)
- }
-}
-
-/**
- * A LoadBalancer is a specialized kind of Router, that is supplied an InfiniteIterator of targets
- * to dispatch incoming messages to.
- */
-trait LoadBalancer extends Router { self: Actor ⇒
- protected def seq: InfiniteIterator[ActorRef]
-
- protected def routes = {
- case x if seq.hasNext ⇒ seq.next
- }
-
- override def broadcast(message: Any) = seq.items.foreach(_ ! message)
-}
-
-/**
- * A UntypedLoadBalancer is a specialized kind of UntypedRouter, that is supplied an InfiniteIterator of targets
- * to dispatch incoming messages to.
- */
-abstract class UntypedLoadBalancer extends UntypedRouter {
- protected def seq: InfiniteIterator[ActorRef]
-
- protected def route(msg: Any) =
- if (seq.hasNext) seq.next
- else null
-
- override def broadcast(message: Any) = seq.items.foreach(_ ! message)
}
object Routing {
sealed trait RoutingMessage
+
case class Broadcast(message: Any) extends RoutingMessage
- type PF[A, B] = PartialFunction[A, B]
-
/**
- * Creates a new PartialFunction whose isDefinedAt is a combination
- * of the two parameters, and whose apply is first to call filter.apply
- * and then filtered.apply.
+ * Creates a new started RoutedActorRef that uses routing to deliver a message to one of its connected actors.
+ *
+ * @param actorAddress the address of the ActorRef.
+ * @param connections an Iterable pointing to all connected actor references.
+ * @param routerType the type of routing that should be used.
+ * @throws IllegalArgumentException if the number of connections is zero, or if it depends on the actual router implementation
+ * how many connections it can handle.
*/
- def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = {
- case a: A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) ⇒
- filter(a)
- filtered(a)
+ def newRoutedActorRef(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): RoutedActorRef = {
+ if (connections.size == 0)
+ throw new IllegalArgumentException("To create a routed actor ref, at least one connection is required")
+
+ val ref = routerType match {
+ case RouterType.Direct ⇒
+ if (connections.size > 1) throw new IllegalArgumentException("A direct router can't have more than 1 connection")
+ new RoutedActorRef(actorAddress, connections) with Direct
+ case RouterType.Random ⇒
+ new RoutedActorRef(actorAddress, connections) with Random
+ case RouterType.RoundRobin ⇒
+ new RoutedActorRef(actorAddress, connections) with RoundRobin
+ case _ ⇒ throw new IllegalArgumentException("Unsupported routerType " + routerType)
+ }
+
+ ref.start()
}
- /**
- * Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true.
- */
- def intercept[A: Manifest, B](interceptor: (A) ⇒ Unit, interceptee: PF[A, B]): PF[A, B] =
- filter({ case a ⇒ interceptor(a) }, interceptee)
-
- /**
- * Creates a LoadBalancer from the thunk-supplied InfiniteIterator.
- */
- def loadBalancerActor(actors: ⇒ InfiniteIterator[ActorRef]): ActorRef =
- localActorOf(new Actor with LoadBalancer {
- val seq = actors
- }).start()
-
- /**
- * Creates a Router given a routing and a message-transforming function.
- */
- def routerActor(routing: PF[Any, ActorRef], msgTransformer: (Any) ⇒ Any): ActorRef =
- localActorOf(new Actor with Router {
- override def transform(msg: Any) = msgTransformer(msg)
- def routes = routing
- }).start()
-
- /**
- * Creates a Router given a routing.
- */
- def routerActor(routing: PF[Any, ActorRef]): ActorRef = localActorOf(new Actor with Router {
- def routes = routing
- }).start()
-
- /**
- * Creates an actor that pipes all incoming messages to
- * both another actor and through the supplied function
- */
- def loggerActor(actorToLog: ActorRef, logger: (Any) ⇒ Unit): ActorRef =
- routerActor({ case _ ⇒ actorToLog }, logger)
+ def newRoundRobinActorRef(actorAddress: String, connections: Iterable[ActorRef]): RoutedActorRef = {
+ newRoutedActorRef(actorAddress, connections, RoundRobin)
+ }
}
/**
- * An Iterator that is either always empty or yields an infinite number of Ts.
+ * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
+ * on (or more) of these actors.
*/
-trait InfiniteIterator[T] extends Iterator[T] {
- val items: Seq[T]
+class RoutedActorRef(val address: String, val cons: Iterable[ActorRef]) extends UnsupportedActorRef with ScalaActorRef {
+ this: Router ⇒
+
+ def connections: Iterable[ActorRef] = cons
+
+ override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = {
+ val sender = channel match {
+ case ref: ActorRef ⇒ Some(ref)
+ case _ ⇒ None
+ }
+ route(message)(sender)
+ }
+
+ override def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any,
+ timeout: Timeout,
+ channel: UntypedChannel): Future[Any] = {
+ val sender = channel match {
+ case ref: ActorRef ⇒ Some(ref)
+ case _ ⇒ None
+ }
+ route[Any](message, timeout)(sender)
+ }
+
+ private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress): Unit = {
+ throw new UnsupportedOperationException
+ }
+
+ def signalDeadActor(ref: ActorRef): Unit = {
+ throw new UnsupportedOperationException
+ }
+
+ def start(): this.type = synchronized[this.type] {
+ _status = ActorRefInternals.RUNNING
+ this
+ }
+
+ def stop() {
+ synchronized {
+ if (_status == ActorRefInternals.RUNNING) {
+ _status = ActorRefInternals.SHUTDOWN
+ postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
+
+ // FIXME here we need to fire off Actor.cluster.remove(address) (which needs to be properly implemented first, see ticket)
+
+ //inetSocketAddressToActorRefMap.get.values foreach (_.stop()) // shut down all remote connections
+ }
+ }
+ }
}
/**
- * CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List.
+ * The Router is responsible for sending a message to one (or more) of its connections.
+ *
+ * @author Jonas Bonér
*/
-case class CyclicIterator[T](val items: Seq[T]) extends InfiniteIterator[T] {
- def this(items: java.util.List[T]) = this(items.toList)
+trait Router {
- private[this] val current: AtomicReference[Seq[T]] = new AtomicReference(items)
+ /**
+ * Returns an Iterable containing all connected ActorRefs this Router uses to send messages to.
+ */
+ def connections: Iterable[ActorRef]
- def hasNext = items != Nil
+ /**
+ * A callback this Router uses to indicate that some actorRef was not usable.
+ *
+ * Implementations should make sure that this method can be called without the actorRef being part of the
+ * current set of connections. The most logical way to deal with this situation, is just to ignore it.
+ *
+ * @param ref the dead
+ */
+ def signalDeadActor(ref: ActorRef): Unit
- def next: T = {
+ /**
+ * Routes the message to one of the connections.
+ */
+ def route(message: Any)(implicit sender: Option[ActorRef]): Unit
+
+ /**
+ * Routes the message using a timeout to one of the connections and returns a Future to synchronize on the
+ * completion of the processing of the message.
+ */
+ def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T]
+}
+
+/**
+ * An Abstract Router implementation that already provides the basic infrastructure so that a concrete
+ * Router only needs to implement the next method.
+ *
+ * todo:
+ * This also is the location where a failover is done in the future if an ActorRef fails and a different
+ * one needs to be selected.
+ * todo:
+ * this is also the location where message buffering should be done in case of failure.
+ */
+trait BasicRouter extends Router {
+
+ def route(message: Any)(implicit sender: Option[ActorRef]): Unit = message match {
+ case Routing.Broadcast(message) ⇒
+ //it is a broadcast message, we are going to send to message to all connections.
+ connections.foreach(actor ⇒
+ try {
+ actor.!(message)(sender)
+ } catch {
+ case e: Exception ⇒
+ signalDeadActor(actor)
+ throw e
+ })
+ case _ ⇒
+ //it no broadcast message, we are going to select an actor from the connections and send the message to him.
+ next match {
+ case Some(actor) ⇒
+ try {
+ actor.!(message)(sender)
+ } catch {
+ case e: Exception ⇒
+ signalDeadActor(actor)
+ throw e
+
+ }
+ case None ⇒
+ throwNoConnectionsError()
+ }
+ }
+
+ def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {
+ case Routing.Broadcast(message) ⇒
+ throw new RoutingException("Broadcasting using a future for the time being is not supported")
+ case _ ⇒
+ //it no broadcast message, we are going to select an actor from the connections and send the message to him.
+ next match {
+ case Some(actor) ⇒
+ try {
+ actor.?(message, timeout)(sender).asInstanceOf[Future[T]]
+ } catch {
+ case e: Exception ⇒
+ signalDeadActor(actor)
+ throw e
+
+ }
+ case None ⇒
+ throwNoConnectionsError()
+ }
+ }
+
+ protected def next: Option[ActorRef]
+
+ private def throwNoConnectionsError() = {
+ val error = new RoutingException("No replica connections for router")
+ EventHandler.error(error, this, error.toString)
+ throw error
+ }
+}
+
+/**
+ * A Router that is used when a durable actor is used. All requests are send to the node containing the actor.
+ * As soon as that instance fails, a different instance is created and since the mailbox is durable, the internal
+ * state can be restored using event sourcing, and once this instance is up and running, all request will be send
+ * to this instance.
+ *
+ * @author Jonas Bonér
+ */
+trait Direct extends BasicRouter {
+
+ lazy val next: Option[ActorRef] = {
+ val connection = connections.headOption
+ if (connection.isEmpty) EventHandler.warning(this, "Router has no replica connection")
+ connection
+ }
+}
+
+/**
+ * A Router that randomly selects one of the target connections to send a message to.
+ *
+ * @author Jonas Bonér
+ */
+trait Random extends BasicRouter {
+
+ //todo: threadlocal random?
+ private val random = new java.util.Random(System.currentTimeMillis)
+
+ def next: Option[ActorRef] =
+ if (connections.isEmpty) {
+ EventHandler.warning(this, "Router has no replica connections")
+ None
+ } else {
+ val randomIndex = random.nextInt(connections.size)
+
+ //todo: possible index ouf of bounds problems since the number of connection could already have been changed.
+ Some(connections.iterator.drop(randomIndex).next())
+ }
+}
+
+/**
+ * A Router that uses round-robin to select a connection.
+ *
+ * @author Jonas Bonér
+ */
+trait RoundRobin extends BasicRouter {
+
+ private def items: List[ActorRef] = connections.toList
+
+ //todo: this is broken since the list is not updated.
+ private val current = new AtomicReference[List[ActorRef]](items)
+
+ private def hasNext = connections.nonEmpty
+
+ def next: Option[ActorRef] = {
@tailrec
- def findNext: T = {
+ def findNext: Option[ActorRef] = {
val currentItems = current.get
val newItems = currentItems match {
case Nil ⇒ items
case xs ⇒ xs
}
- if (current.compareAndSet(currentItems, newItems.tail)) newItems.head
- else findNext
+ if (newItems.isEmpty) {
+ EventHandler.warning(this, "Router has no replica connections")
+ None
+ } else {
+ if (current.compareAndSet(currentItems, newItems.tail)) newItems.headOption
+ else findNext
+ }
}
findNext
}
-
- override def exists(f: T ⇒ Boolean): Boolean = items exists f
}
-
-/**
- * This InfiniteIterator always returns the Actor that has the currently smallest mailbox
- * useful for work-stealing.
- */
-case class SmallestMailboxFirstIterator(val items: Seq[ActorRef]) extends InfiniteIterator[ActorRef] {
- def this(items: java.util.List[ActorRef]) = this(items.toList)
- def hasNext = items != Nil
-
- def next = items.reduceLeft((a1, a2) ⇒ if (a1.dispatcher.mailboxSize(a1) < a2.dispatcher.mailboxSize(a2)) a1 else a2)
-
- override def exists(f: ActorRef ⇒ Boolean): Boolean = items.exists(f)
-}
-
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 0e7157fa5e..348b11195e 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -977,7 +977,7 @@ class DefaultClusterNode private[akka] (
"Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]"
.format(actorAddress, router, remoteServerAddress, addresses.map(_._2).mkString("\n\t")))
- val actorRef = Router newRouter (router, addresses, actorAddress, Actor.TIMEOUT)
+ val actorRef = Routing newRouter (router, addresses, actorAddress, Actor.TIMEOUT)
addresses foreach {
case (_, address) ⇒ clusterActorRefs.put(address, actorRef)
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
index a5727e5381..87b120cf30 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
@@ -4,18 +4,17 @@
package akka.cluster
import akka.actor._
-import akka.dispatch._
import akka.util._
import ReflectiveAccess._
import akka.dispatch.Future
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
-import java.util.{ Map ⇒ JMap }
import com.eaio.uuid.UUID
import collection.immutable.Map
import annotation.tailrec
+import akka.routing.Router
/**
* ActorRef representing a one or many instances of a clustered, load-balanced and sometimes replicated actor
@@ -26,8 +25,8 @@ import annotation.tailrec
class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
val address: String,
_timeout: Long)
- extends ActorRef with ScalaActorRef {
- this: Router.Router ⇒
+ extends UnsupportedActorRef with ScalaActorRef {
+ this: Router ⇒
timeout = _timeout
private[akka] val inetSocketAddressToActorRefMap = new AtomicReference[Map[InetSocketAddress, ActorRef]](
@@ -37,7 +36,7 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
ClusterModule.ensureEnabled()
- def connections: Map[InetSocketAddress, ActorRef] = inetSocketAddressToActorRefMap.get
+ def connections: Iterable[ActorRef] = inetSocketAddressToActorRefMap.get.values
override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = {
val sender = channel match {
@@ -96,11 +95,12 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
def signalDeadActor(ref: ActorRef): Unit = {
//since the number remote actor refs for a clustered actor ref is quite low, we can deal with the O(N) complexity
//of the following removal.
- val it = connections.keySet.iterator
+ val map = inetSocketAddressToActorRefMap.get
+ val it = map.keySet.iterator
while (it.hasNext) {
val address = it.next()
- val foundRef: ActorRef = connections.get(address).get
+ val foundRef: ActorRef = map.get(address).get
if (foundRef == ref) {
remove(address)
@@ -133,57 +133,4 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
}
}
}
-
- // ========================================================================
- // ==== NOT SUPPORTED ====
- // ========================================================================
-
- // FIXME move these methods and the same ones in RemoteActorRef to a base class - now duplicated
- def dispatcher_=(md: MessageDispatcher) {
- unsupported
- }
-
- def dispatcher: MessageDispatcher = unsupported
-
- def link(actorRef: ActorRef) {
- unsupported
- }
-
- def unlink(actorRef: ActorRef) {
- unsupported
- }
-
- def startLink(actorRef: ActorRef): ActorRef = unsupported
-
- def supervisor: Option[ActorRef] = unsupported
-
- def linkedActors: JMap[Uuid, ActorRef] = unsupported
-
- protected[akka] def mailbox: AnyRef = unsupported
-
- protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported
-
- protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
- unsupported
- }
-
- protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
- unsupported
- }
-
- protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
- unsupported
- }
-
- protected[akka] def invoke(messageHandle: MessageInvocation) {
- unsupported
- }
-
- protected[akka] def supervisor_=(sup: Option[ActorRef]) {
- unsupported
- }
-
- protected[akka] def actorInstance: AtomicReference[Actor] = unsupported
-
- private def unsupported = throw new UnsupportedOperationException("Not supported for ClusterActorRef")
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/Routing.scala b/akka-cluster/src/main/scala/akka/cluster/Routing.scala
index 43942b3e8d..aef6a543fd 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Routing.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Routing.scala
@@ -1,25 +1,22 @@
+package akka.cluster
+
/**
* Copyright (C) 2009-2011 Typesafe Inc.
*/
-package akka.cluster
-import akka.actor._
-import akka.dispatch.Future
-import akka.event.EventHandler
-import akka.routing.{ RouterType, RoutingException }
+import akka.routing.RouterType
import RouterType._
import com.eaio.uuid.UUID
-import annotation.tailrec
-
import java.net.InetSocketAddress
-import java.util.concurrent.atomic.AtomicReference
+import akka.routing.{ Random, Direct, RoundRobin }
/**
* @author Jonas Bonér
*/
-object Router {
+object Routing {
+
def newRouter(
routerType: RouterType,
inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
@@ -34,122 +31,4 @@ object Router {
case LeastMessages ⇒ sys.error("Router LeastMessages not supported yet")
}
}
-
- /**
- * @author Jonas Bonér
- */
- trait Router {
-
- def connections: Map[InetSocketAddress, ActorRef]
-
- def signalDeadActor(ref: ActorRef): Unit
-
- def route(message: Any)(implicit sender: Option[ActorRef]): Unit
-
- def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T]
- }
-
- /**
- * An Abstract Router implementation that already provides the basic infrastructure so that a concrete
- * Router only needs to implement the next method.
- *
- * This also is the location where a failover is done in the future if an ActorRef fails and a different
- * one needs to be selected.
- */
- trait BasicRouter extends Router {
-
- def route(message: Any)(implicit sender: Option[ActorRef]): Unit = next match {
- case Some(actor) ⇒ {
- try {
- actor.!(message)(sender)
- } catch {
- case e: Exception ⇒
- signalDeadActor(actor)
- throw e
- }
- }
- case _ ⇒ throwNoConnectionsError()
- }
-
- def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] = next match {
- case Some(actor) ⇒ {
- try {
- actor.?(message, timeout)(sender).asInstanceOf[Future[T]]
- } catch {
- case e: Throwable ⇒
- signalDeadActor(actor)
- throw e
- }
- }
- case _ ⇒ throwNoConnectionsError()
- }
-
- protected def next: Option[ActorRef]
-
- private def throwNoConnectionsError() = {
- val error = new RoutingException("No replica connections for router")
- EventHandler.error(error, this, error.toString)
- throw error
- }
- }
-
- /**
- * @author Jonas Bonér
- */
- trait Direct extends BasicRouter {
-
- lazy val next: Option[ActorRef] = {
- val connection = connections.values.headOption
- if (connection.isEmpty) EventHandler.warning(this, "Router has no replica connection")
- connection
- }
- }
-
- /**
- * @author Jonas Bonér
- */
- trait Random extends BasicRouter {
- private val random = new java.util.Random(System.currentTimeMillis)
-
- def next: Option[ActorRef] =
- if (connections.isEmpty) {
- EventHandler.warning(this, "Router has no replica connections")
- None
- } else {
- Some(connections.valuesIterator.drop(random.nextInt(connections.size)).next())
- }
- }
-
- /**
- * @author Jonas Bonér
- */
- trait RoundRobin extends BasicRouter {
- private def items: List[ActorRef] = connections.values.toList
-
- private val current = new AtomicReference[List[ActorRef]](items)
-
- private def hasNext = connections.nonEmpty
-
- def next: Option[ActorRef] = {
- @tailrec
- def findNext: Option[ActorRef] = {
- val currentItems = current.get
- val newItems = currentItems match {
- case Nil ⇒ items
- case xs ⇒ xs
- }
-
- if (newItems.isEmpty) {
- EventHandler.warning(this, "Router has no replica connections")
- None
- } else {
- if (current.compareAndSet(currentItems, newItems.tail)) newItems.headOption
- else findNext
- }
- }
-
- findNext
- }
- }
-
}
diff --git a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala
index 0254e1e481..19cd7c45b2 100644
--- a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala
+++ b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala
@@ -11,7 +11,6 @@ import akka.actor.Actor._
import akka.actor.{ ActorRegistry, ActorRef, Actor }
import akka.camel._
import akka.camel.CamelServiceManager._
-import akka.routing.CyclicIterator
import akka.routing.Routing._
/**
@@ -51,8 +50,7 @@ object HttpConcurrencyTestStress {
startCamelService
val workers = for (i ← 1 to 8) yield actorOf[HttpServerWorker].start
- val balancer = loadBalancerActor(new CyclicIterator(workers.toList))
-
+ val balancer = newRoundRobinActorRef("loadbalancer", workers)
//service.get.awaitEndpointActivation(1) {
// actorOf(new HttpServerActor(balancer)).start
//}
diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java
index 2bf9a3bc73..dd026619c8 100644
--- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java
+++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java
@@ -11,11 +11,12 @@ import static java.util.Arrays.asList;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
-import akka.routing.CyclicIterator;
-import akka.routing.InfiniteIterator;
+import akka.routing.RouterType;
+import akka.routing.Routing;
import akka.routing.Routing.Broadcast;
-import akka.routing.UntypedLoadBalancer;
+import scala.collection.JavaConversions;
+import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
public class Pi {
@@ -96,35 +97,18 @@ public class Pi {
private ActorRef router;
- static class PiRouter extends UntypedLoadBalancer {
- private final InfiniteIterator workers;
-
- public PiRouter(ActorRef[] workers) {
- this.workers = new CyclicIterator(asList(workers));
- }
-
- public InfiniteIterator seq() {
- return workers;
- }
- }
-
- public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) {
+ public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) {
this.nrOfMessages = nrOfMessages;
this.nrOfElements = nrOfElements;
this.latch = latch;
- // create the workers
- final ActorRef[] workers = new ActorRef[nrOfWorkers];
+ LinkedList workers = new LinkedList();
for (int i = 0; i < nrOfWorkers; i++) {
- workers[i] = actorOf(Worker.class, "worker").start();
+ ActorRef worker = actorOf(Worker.class, "worker").start();
+ workers.add(worker);
}
- // wrap them with a load-balancing router
- router = actorOf(new UntypedActorFactory() {
- public UntypedActor create() {
- return new PiRouter(workers);
- }
- }, "router").start();
+ router = Routing.newRoundRobinActorRef("pi", JavaConversions.asIterable(workers));
}
// message handler
diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala
index c3e1de6b6b..03aa6e41b8 100644
--- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala
+++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala
@@ -6,11 +6,9 @@ package akka.tutorial.first.scala
import akka.actor.{ Actor, PoisonPill }
import Actor._
-import akka.routing.{ Routing, CyclicIterator }
-import Routing._
-
-import System.{ currentTimeMillis ⇒ now }
import java.util.concurrent.CountDownLatch
+import akka.routing.Routing.Broadcast
+import akka.routing.Routing
object Pi extends App {
@@ -57,7 +55,7 @@ object Pi extends App {
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start())
// wrap them with a load-balancing router
- val router = Routing.loadBalancerActor(CyclicIterator(workers)).start()
+ val router = Routing.newRoundRobinActorRef("pi", workers)
// message handler
def receive = {
diff --git a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java
index 91c7829278..c5da4f46cb 100644
--- a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java
+++ b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java
@@ -8,6 +8,8 @@ import static akka.actor.Actors.actorOf;
import static akka.actor.Actors.poisonPill;
import static java.lang.System.currentTimeMillis;
import static java.util.Arrays.asList;
+
+import akka.routing.Routing;
import scala.Option;
import akka.actor.ActorRef;
import akka.actor.Channel;
@@ -15,10 +17,10 @@ import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.dispatch.Future;
import akka.japi.Procedure;
-import akka.routing.CyclicIterator;
-import akka.routing.InfiniteIterator;
import akka.routing.Routing.Broadcast;
-import akka.routing.UntypedLoadBalancer;
+import scala.collection.JavaConversions;
+
+import java.util.LinkedList;
public class Pi {
@@ -90,34 +92,17 @@ public class Pi {
private ActorRef router;
- static class PiRouter extends UntypedLoadBalancer {
- private final InfiniteIterator workers;
-
- public PiRouter(ActorRef[] workers) {
- this.workers = new CyclicIterator(asList(workers));
- }
-
- public InfiniteIterator seq() {
- return workers;
- }
- }
-
public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements) {
this.nrOfMessages = nrOfMessages;
this.nrOfElements = nrOfElements;
- // create the workers
- final ActorRef[] workers = new ActorRef[nrOfWorkers];
+ LinkedList workers = new LinkedList();
for (int i = 0; i < nrOfWorkers; i++) {
- workers[i] = actorOf(Worker.class, "worker").start();
+ ActorRef worker = actorOf(Worker.class, "worker").start();
+ workers.add(worker);
}
- // wrap them with a load-balancing router
- router = actorOf(new UntypedActorFactory() {
- public UntypedActor create() {
- return new PiRouter(workers);
- }
- }, "router").start();
+ router = Routing.newRoundRobinActorRef("pi", JavaConversions.asIterable(workers));
}
@Override
diff --git a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala
index cf9c53f894..d420ebc179 100644
--- a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala
+++ b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala
@@ -5,13 +5,11 @@
package akka.tutorial.second
import akka.actor.Actor._
-import akka.routing.{ Routing, CyclicIterator }
-import Routing._
+import akka.routing.Routing
import akka.event.EventHandler
-import akka.actor.{ Channel, Actor, PoisonPill, Timeout }
-import akka.dispatch.Future
-
import System.{ currentTimeMillis ⇒ now }
+import akka.routing.Routing.Broadcast
+import akka.actor.{ Timeout, Channel, Actor, PoisonPill }
object Pi extends App {
@@ -55,7 +53,7 @@ object Pi extends App {
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start())
// wrap them with a load-balancing router
- val router = Routing.loadBalancerActor(CyclicIterator(workers)).start()
+ val router = Routing.newRoundRobinActorRef("pi", workers)
// phase 1, can accept a Calculate message
def scatter: Receive = {