From 21fee0fa3413f6af3f6050689684f0ab44de55b4 Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Tue, 26 Jul 2011 11:16:39 +0300 Subject: [PATCH 1/3] ticket 889, initial checkin --- .../src/main/scala/akka/routing/Routing.scala | 6 +++- .../src/main/scala/akka/cluster/Routing.scala | 30 +++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index b285d6783a..15ec9653a9 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -4,7 +4,9 @@ package akka.routing -import akka.actor.{ UntypedActor, Actor, ActorRef } +//TODO: This will package is going to be removed. + +import akka.actor.{ UntypedActor, Actor} import akka.actor.Actor._ import akka.actor.ActorRef @@ -203,7 +205,9 @@ case class CyclicIterator[T](val items: Seq[T]) extends InfiniteIterator[T] { * useful for work-stealing. */ case class SmallestMailboxFirstIterator(val items: Seq[ActorRef]) extends InfiniteIterator[ActorRef] { + def this(items: java.util.List[ActorRef]) = this(items.toList) + def hasNext = items != Nil def next = items.reduceLeft((a1, a2) ⇒ if (a1.dispatcher.mailboxSize(a1) < a2.dispatcher.mailboxSize(a2)) a1 else a2) diff --git a/akka-cluster/src/main/scala/akka/cluster/Routing.scala b/akka-cluster/src/main/scala/akka/cluster/Routing.scala index c9db1e7208..36a2393be6 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Routing.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Routing.scala @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicReference * @author Jonas Bonér */ object Router { + def newRouter( routerType: RouterType, inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]], @@ -36,16 +37,35 @@ object Router { } /** + * The Router is responsible for sending a message to one (or more) of its connections. + * * @author Jonas Bonér */ trait Router { + /** + * Returns a Map containing all ActorRefs this Router uses send messages to. + */ def connections: Map[InetSocketAddress, ActorRef] + /** + * A callback this Router uses to indicate that some actorRef was not usable. + * + * Implementations should make sure that this method can be called without the actorRef being part of the + * current set of connections. The most logical way to deal with this situation, is just to ignore it. + * + * @param ref the dead + */ def signalDeadActor(ref: ActorRef): Unit + /** + * + */ def route(message: Any)(implicit sender: Option[ActorRef]): Unit + /** + * + */ def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] } @@ -94,6 +114,11 @@ object Router { } /** + * A Router that is used when a durable actor is used. All requests are send to the node containing the actor. + * As soon as that instance fails, a different instance is created and since the mailbox is durable, the internal + * state can be restored using event sourcing, and once this instance is up and running, all request will be send + * to this instance. + * * @author Jonas Bonér */ trait Direct extends BasicRouter { @@ -106,9 +131,12 @@ object Router { } /** + * A Router that randomly selects one of the target connections to send a message to. + * * @author Jonas Bonér */ trait Random extends BasicRouter { + private val random = new java.util.Random(System.currentTimeMillis) def next: Option[ActorRef] = @@ -121,6 +149,8 @@ object Router { } /** + * A Router that uses round-robin to select a connection. + * * @author Jonas Bonér */ trait RoundRobin extends BasicRouter { From f1733aacc6b8dd756ccd7312b4d8deec0d338462 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Tue, 26 Jul 2011 22:41:53 -0600 Subject: [PATCH 2/3] Reducing IO load again to try and pass test on CI build --- .../src/test/scala/akka/actor/actor/IOActor.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala index 4c2d069f24..82e1734b45 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala @@ -189,9 +189,9 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach { val ioManager = Actor.actorOf(new IOManager()).start val server = Actor.actorOf(new SimpleEchoServer("localhost", 8065, ioManager)).start val client = Actor.actorOf(new SimpleEchoClient("localhost", 8065, ioManager)).start - val list = List.range(0, 100000) + val list = List.range(0, 1000) val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) - assert(f.get.size === 100000) + assert(f.get.size === 1000) client.stop server.stop ioManager.stop @@ -201,9 +201,9 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach { val ioManager = Actor.actorOf(new IOManager(2)).start val server = Actor.actorOf(new SimpleEchoServer("localhost", 8066, ioManager)).start val client = Actor.actorOf(new SimpleEchoClient("localhost", 8066, ioManager)).start - val list = List.range(0, 100000) + val list = List.range(0, 1000) val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) - assert(f.get.size === 100000) + assert(f.get.size === 1000) client.stop server.stop ioManager.stop From 0fcc35d4b0c4b6a16dd7ca9107c4c3905787e369 Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Thu, 28 Jul 2011 15:48:03 +0300 Subject: [PATCH 3/3] ticket #889 --- .../akka/actor/actor/TypedActorSpec.scala | 48 +- .../scala/akka/routing/ActorPoolSpec.scala | 566 +++++++++++ .../scala/akka/routing/ListenerSpec.scala | 58 ++ .../test/scala/akka/routing/RoutingSpec.scala | 926 +++++------------- .../src/main/scala/akka/actor/ActorRef.scala | 74 +- .../src/main/scala/akka/routing/Routing.scala | 419 +++++--- .../scala/akka/cluster/ClusterActorRef.scala | 65 +- .../src/main/scala/akka/cluster/Routing.scala | 165 +--- .../camel/HttpConcurrencyTestStress.scala | 3 +- .../java/akka/tutorial/first/java/Pi.java | 34 +- .../src/main/scala/Pi.scala | 8 +- .../java/akka/tutorial/java/second/Pi.java | 33 +- .../src/main/scala/Pi.scala | 7 +- 13 files changed, 1303 insertions(+), 1103 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala create mode 100644 akka-actor-tests/src/test/scala/akka/routing/ListenerSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala index 7c09de930e..a72ec16d11 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala @@ -7,37 +7,71 @@ package akka.actor import org.scalatest.matchers.MustMatchers import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import org.scalatest.{ BeforeAndAfterAll, WordSpec, BeforeAndAfterEach } +import org.scalatest.{BeforeAndAfterAll, WordSpec, BeforeAndAfterEach} import akka.actor.TypedActor._ -import akka.japi.{ Option ⇒ JOption } +import akka.japi.{Option ⇒ JOption} import akka.util.Duration -import akka.dispatch.{ Dispatchers, Future, KeptPromise } -import akka.routing.CyclicIterator -import java.io.{ ObjectInputStream, ObjectOutputStream, ByteArrayOutputStream } -import akka.actor.TypedActorSpec.Foo +import akka.dispatch.{Dispatchers, Future, KeptPromise} +import java.util.concurrent.atomic.AtomicReference +import annotation.tailrec object TypedActorSpec { + + class CyclicIterator[T](val items: Seq[T]) extends Iterator[T] { + + private[this] val current: AtomicReference[Seq[T]] = new AtomicReference(items) + + def hasNext = items != Nil + + def next: T = { + @tailrec + def findNext: T = { + val currentItems = current.get + val newItems = currentItems match { + case Nil ⇒ items + case xs ⇒ xs + } + + if (current.compareAndSet(currentItems, newItems.tail)) newItems.head + else findNext + } + + findNext + } + + override def exists(f: T ⇒ Boolean): Boolean = items exists f + } + + trait Foo { def pigdog(): String def self = TypedActor.self[Foo] def futurePigdog(): Future[String] + def futurePigdog(delay: Long): Future[String] + def futurePigdog(delay: Long, numbered: Int): Future[String] + def futureComposePigdogFrom(foo: Foo): Future[String] def failingFuturePigdog(): Future[String] = throw new IllegalStateException("expected") + def failingOptionPigdog(): Option[String] = throw new IllegalStateException("expected") + def failingJOptionPigdog(): JOption[String] = throw new IllegalStateException("expected") def failingPigdog(): Unit = throw new IllegalStateException("expected") def optionPigdog(): Option[String] + def optionPigdog(delay: Long): Option[String] + def joptionPigdog(delay: Long): JOption[String] def incr() + def read(): Int def testMethodCallSerialization(foo: Foo, s: String, i: Int): Unit = throw new IllegalStateException("expected") @@ -48,6 +82,7 @@ object TypedActorSpec { def pigdog = "Pigdog" def futurePigdog(): Future[String] = new KeptPromise(Right(pigdog)) + def futurePigdog(delay: Long): Future[String] = { Thread.sleep(delay) futurePigdog @@ -92,6 +127,7 @@ object TypedActorSpec { trait Stacked extends Stackable1 with Stackable2 { def stacked: String = stackable1 + stackable2 + def notOverriddenStacked: String = stackable1 + stackable2 } diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala new file mode 100644 index 0000000000..499ae8e81e --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -0,0 +1,566 @@ +package akka.routing + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import akka.dispatch.{KeptPromise, Future} +import java.util.concurrent.atomic.AtomicInteger +import akka.actor.Actor._ +import akka.testkit.Testing._ +import akka.actor.{TypedActor, Actor} +import akka.actor.Actor.Timeout._ +import akka.testkit.TestLatch +import akka.util.duration._ + +object ActorPoolSpec { + + trait Foo { + def sq(x: Int, sleep: Long): Future[Int] + } + + class FooImpl extends Foo { + def sq(x: Int, sleep: Long): Future[Int] = { + if (sleep > 0) Thread.sleep(sleep) + new KeptPromise(Right(x * x)) + } + } +} + +class ActorPoolSpec extends WordSpec with MustMatchers { + "Actor Pool" must { + + "have expected capacity" in { + val latch = TestLatch(2) + val count = new AtomicInteger(0) + + val pool = actorOf( + new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with SmallestMailboxSelector { + def factory = actorOf(new Actor { + def receive = { + case _ ⇒ + count.incrementAndGet + latch.countDown() + self tryReply "success" + } + }).start() + + def limit = 2 + def selectionCount = 1 + def partialFill = true + def instance = factory + def receive = _route + }).start() + + val successes = TestLatch(2) + val successCounter = actorOf(new Actor { + def receive = { + case "success" ⇒ successes.countDown() + } + }).start() + + implicit val replyTo = successCounter + pool ! "a" + pool ! "b" + + latch.await + successes.await + + count.get must be(2) + + (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + + pool.stop() + } + + "pass ticket #705" in { + val pool = actorOf( + new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicFilter { + def lowerBound = 2 + def upperBound = 20 + def rampupRate = 0.1 + def backoffRate = 0.1 + def backoffThreshold = 0.5 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + def pressureThreshold = 1 + def factory = actorOf(new Actor { + def receive = { + case req: String ⇒ { + sleepFor(10 millis) + self.tryReply("Response") + } + } + }) + }).start() + + try { + (for (count ← 1 to 500) yield pool.?("Test", 20000)) foreach { + _.await.resultOrException.get must be("Response") + } + } finally { + pool.stop() + } + } + + "grow as needed under pressure" in { + // make sure the pool starts at the expected lower limit and grows to the upper as needed + // as influenced by the backlog of blocking pooled actors + + var latch = TestLatch(3) + val count = new AtomicInteger(0) + + val pool = actorOf( + new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter { + def factory = actorOf(new Actor { + def receive = { + case n: Int ⇒ + sleepFor(n millis) + count.incrementAndGet + latch.countDown() + } + }) + + def lowerBound = 2 + def upperBound = 4 + def rampupRate = 0.1 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + }).start() + + // first message should create the minimum number of delgates + + pool ! 1 + + (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + + var loops = 0 + def loop(t: Int) = { + latch = TestLatch(loops) + count.set(0) + for (m ← 0 until loops) { + pool ? t + sleepFor(50 millis) + } + } + + // 2 more should go thru without triggering more + + loops = 2 + + loop(500) + latch.await + count.get must be(loops) + + (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + + // a whole bunch should max it out + + loops = 10 + loop(500) + latch.await + count.get must be(loops) + + (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(4) + + pool.stop() + } + + "grow as needed under mailbox pressure" in { + // make sure the pool starts at the expected lower limit and grows to the upper as needed + // as influenced by the backlog of messages in the delegate mailboxes + + var latch = TestLatch(3) + val count = new AtomicInteger(0) + + val pool = actorOf( + new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter { + def factory = actorOf(new Actor { + def receive = { + case n: Int ⇒ + sleepFor(n millis) + count.incrementAndGet + latch.countDown() + } + }) + + def lowerBound = 2 + def upperBound = 4 + def pressureThreshold = 3 + def rampupRate = 0.1 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + }).start() + + var loops = 0 + def loop(t: Int) = { + latch = TestLatch(loops) + count.set(0) + for (m ← 0 until loops) { + pool ! t + } + } + + // send a few messages and observe pool at its lower bound + loops = 3 + loop(500) + latch.await + count.get must be(loops) + + (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + + // send a bunch over the theshold and observe an increment + loops = 15 + loop(500) + + latch.await(10 seconds) + count.get must be(loops) + + (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be >= (3) + + pool.stop() + } + + "round robin" in { + val latch1 = TestLatch(2) + val delegates = new java.util.concurrent.ConcurrentHashMap[String, String] + + val pool1 = actorOf( + new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter { + def factory = actorOf(new Actor { + def receive = { + case _ ⇒ + delegates put (self.uuid.toString, "") + latch1.countDown() + } + }) + + def limit = 1 + def selectionCount = 1 + def rampupRate = 0.1 + def partialFill = true + def instance = factory + def receive = _route + }).start() + + pool1 ! "a" + pool1 ! "b" + + latch1.await + delegates.size must be(1) + + pool1.stop() + + val latch2 = TestLatch(2) + delegates.clear() + + val pool2 = actorOf( + new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter { + def factory = actorOf(new Actor { + def receive = { + case _ ⇒ + delegates put (self.uuid.toString, "") + latch2.countDown() + } + }) + + def limit = 2 + def selectionCount = 1 + def rampupRate = 0.1 + def partialFill = false + def instance = factory + def receive = _route + }).start() + + pool2 ! "a" + pool2 ! "b" + + latch2.await + delegates.size must be(2) + + pool2.stop() + } + + "backoff" in { + val latch = TestLatch(10) + + val pool = actorOf( + new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup { + def factory = actorOf(new Actor { + def receive = { + case n: Int ⇒ + sleepFor(n millis) + latch.countDown() + } + }) + + def lowerBound = 1 + def upperBound = 5 + def pressureThreshold = 1 + def partialFill = true + def selectionCount = 1 + def rampupRate = 0.1 + def backoffRate = 0.50 + def backoffThreshold = 0.50 + def instance = factory + def receive = _route + }).start() + + // put some pressure on the pool + + for (m ← 0 to 10) pool ! 250 + + sleepFor(5 millis) + + val z = (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size + + z must be >= (2) + + // let it cool down + + for (m ← 0 to 3) { + pool ! 1 + sleepFor(500 millis) + } + + (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be <= (z) + + pool.stop() + } + + "support typed actors" in { + import ActorPoolSpec._ + import TypedActor._ + def createPool = new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup { + def lowerBound = 1 + def upperBound = 5 + def pressureThreshold = 1 + def partialFill = true + def selectionCount = 1 + def rampupRate = 0.1 + def backoffRate = 0.50 + def backoffThreshold = 0.50 + def instance = getActorRefFor(typedActorOf[Foo, FooImpl]()) + def receive = _route + } + + val pool = createProxy[Foo](createPool) + + val results = for (i ← 1 to 100) yield (i, pool.sq(i, 100)) + + for ((i, r) ← results) r.get must equal(i * i) + } + + "provide default supervision of pooled actors" in { + import akka.config.Supervision._ + val pingCount = new AtomicInteger(0) + val deathCount = new AtomicInteger(0) + var keepDying = false + + val pool1 = actorOf( + new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { + def lowerBound = 2 + def upperBound = 5 + def rampupRate = 0.1 + def backoffRate = 0.1 + def backoffThreshold = 0.5 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + def pressureThreshold = 1 + def factory = actorOf(new Actor { + if (deathCount.get > 5) deathCount.set(0) + if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")} + def receive = { + case akka.Die ⇒ + if (keepDying) deathCount.incrementAndGet + throw new RuntimeException + case _ => pingCount.incrementAndGet + } + }).start() + }).start() + + val pool2 = actorOf( + new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { + def lowerBound = 2 + def upperBound = 5 + def rampupRate = 0.1 + def backoffRate = 0.1 + def backoffThreshold = 0.5 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + def pressureThreshold = 1 + def factory = actorOf(new Actor { + self.lifeCycle = Permanent + if (deathCount.get > 5) deathCount.set(0) + if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")} + def receive = { + case akka.Die ⇒ + if (keepDying) deathCount.incrementAndGet + throw new RuntimeException + case _ => pingCount.incrementAndGet + } + }).start() + }).start() + + val pool3 = actorOf( + new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter { + def lowerBound = 2 + def upperBound = 5 + def rampupRate = 0.1 + def backoffRate = 0.1 + def backoffThreshold = 0.5 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + def pressureThreshold = 1 + def factory = actorOf(new Actor { + self.lifeCycle = Temporary + if (deathCount.get > 5) deathCount.set(0) + if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")} + def receive = { + case akka.Die ⇒ + if (keepDying) deathCount.incrementAndGet + throw new RuntimeException + case _ => pingCount.incrementAndGet + } + }).start() + }).start() + + // default lifecycle + // actor comes back right away + pingCount.set(0) + keepDying = false + pool1 ! "ping" + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pool1 ! akka.Die + sleepFor(2 seconds) + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pingCount.get must be (1) + + // default lifecycle + // actor dies completely + pingCount.set(0) + keepDying = true + pool1 ! "ping" + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pool1 ! akka.Die + sleepFor(2 seconds) + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) + pool1 ! "ping" + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pingCount.get must be (2) + + // permanent lifecycle + // actor comes back right away + pingCount.set(0) + keepDying = false + pool2 ! "ping" + (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pool2 ! akka.Die + sleepFor(2 seconds) + (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pingCount.get must be (1) + + // permanent lifecycle + // actor dies completely + pingCount.set(0) + keepDying = true + pool2 ! "ping" + (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pool2 ! akka.Die + sleepFor(2 seconds) + (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) + pool2 ! "ping" + (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pingCount.get must be (2) + + // temporary lifecycle + pingCount.set(0) + keepDying = false + pool3 ! "ping" + (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pool3 ! akka.Die + sleepFor(2 seconds) + (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) + pool3 ! "ping" + pool3 ! "ping" + pool3 ! "ping" + (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pingCount.get must be (4) + } + + "support customizable supervision config of pooled actors" in { + import akka.config.Supervision._ + val pingCount = new AtomicInteger(0) + val deathCount = new AtomicInteger(0) + var keepDying = false + + trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig { + def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000) + } + + object BadState + + val pool1 = actorOf( + new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { + def lowerBound = 2 + def upperBound = 5 + def rampupRate = 0.1 + def backoffRate = 0.1 + def backoffThreshold = 0.5 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + def pressureThreshold = 1 + def factory = actorOf(new Actor { + if (deathCount.get > 5) deathCount.set(0) + if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")} + def receive = { + case BadState ⇒ + if (keepDying) deathCount.incrementAndGet + throw new IllegalStateException + case akka.Die => + throw new RuntimeException + case _ => pingCount.incrementAndGet + } + }).start() + }).start() + + + // actor comes back right away + pingCount.set(0) + keepDying = false + pool1 ! "ping" + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pool1 ! BadState + sleepFor(2 seconds) + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pingCount.get must be (1) + + // actor dies completely + pingCount.set(0) + keepDying = true + pool1 ! "ping" + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pool1 ! BadState + sleepFor(2 seconds) + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) + pool1 ! "ping" + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pingCount.get must be (2) + + // kill it + intercept[RuntimeException](pool1.?(akka.Die).get) + } + } +} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/routing/ListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ListenerSpec.scala new file mode 100644 index 0000000000..0106558f1f --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/routing/ListenerSpec.scala @@ -0,0 +1,58 @@ +package akka.actor.routing + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers + +import akka.testkit._ +import akka.actor._ +import akka.actor.Actor._ +import akka.routing._ +import java.util.concurrent.atomic.AtomicInteger + +class ListenerSpec extends WordSpec with MustMatchers { + + "Listener" must { + + "do something" in { + val fooLatch = TestLatch(2) + val barLatch = TestLatch(2) + val barCount = new AtomicInteger(0) + + val broadcast = actorOf(new Actor with Listeners { + def receive = listenerManagement orElse { + case "foo" ⇒ gossip("bar") + } + }).start() + + def newListener = actorOf(new Actor { + def receive = { + case "bar" ⇒ + barCount.incrementAndGet + barLatch.countDown() + case "foo" ⇒ + fooLatch.countDown() + } + }).start() + + val a1 = newListener + val a2 = newListener + val a3 = newListener + + broadcast ! Listen(a1) + broadcast ! Listen(a2) + broadcast ! Listen(a3) + + broadcast ! Deafen(a3) + + broadcast ! WithListeners(_ ! "foo") + broadcast ! "foo" + + barLatch.await + barCount.get must be(2) + + fooLatch.await + + for (a ← List(broadcast, a1, a2, a3)) a.stop() + } + } +} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index f4bd4f1204..10f80904a6 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -1,721 +1,321 @@ -package akka.actor.routing +package akka.routing import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers - -import akka.testkit._ -import akka.testkit.Testing.sleepFor -import akka.util.duration._ - -import akka.actor._ -import akka.actor.Actor._ import akka.routing._ -import akka.event.EventHandler - +import akka.routing.Router import java.util.concurrent.atomic.AtomicInteger -import akka.dispatch.{ KeptPromise, Future } +import akka.actor.Actor._ +import akka.actor.{ActorRef, Actor} +import collection.mutable.LinkedList +import akka.routing.Routing.Broadcast +import java.util.concurrent.{CountDownLatch, TimeUnit} object RoutingSpec { - trait Foo { - def sq(x: Int, sleep: Long): Future[Int] - } - class FooImpl extends Foo { - def sq(x: Int, sleep: Long): Future[Int] = { - if (sleep > 0) Thread.sleep(sleep) - new KeptPromise(Right(x * x)) + class TestActor extends Actor with Serializable { + def receive = { + case _ => + println("Hello") } } } class RoutingSpec extends WordSpec with MustMatchers { - import Routing._ - "Routing" must { + import akka.routing.RoutingSpec._ - "dispatch" in { - val Test1 = "test1" - val Test2 = "test2" - val Test3 = "test3" + "direct router" must { + "be started when constructed" in { + val actor1 = Actor.actorOf[TestActor].start - val t1 = actorOf(new Actor { - def receive = { - case Test1 ⇒ self.reply(3) - case Test2 ⇒ self.reply(7) - } - }).start() - - val t2 = actorOf(new Actor() { - def receive = { - case Test3 ⇒ self.reply(11) - } - }).start() - - val d = routerActor { - case Test1 | Test2 ⇒ t1 - case Test3 ⇒ t2 - }.start() - - implicit val timeout = Actor.Timeout((5 seconds).dilated) - val result = for { - a ← (d ? (Test1)).as[Int] - b ← (d ? (Test2)).as[Int] - c ← (d ? (Test3)).as[Int] - } yield a + b + c - - result.isDefined must be(true) - result.get must be(21) - - for (a ← List(t1, t2, d)) a.stop() + val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.Direct) + actor.isRunning must be(true) } - "have messages logged" in { - val msgs = new java.util.concurrent.ConcurrentSkipListSet[Any] - val latch = TestLatch(2) - - val actor = actorOf(new Actor { - def receive = { case _ ⇒ } - }).start() - - val logger = loggerActor(actor, x ⇒ { msgs.add(x); latch.countDown() }).start() - - val foo: Any = "foo" - val bar: Any = "bar" - - logger ! foo - logger ! bar - - latch.await - - msgs must (have size (2) and contain(foo) and contain(bar)) - - actor.stop() - logger.stop() - } - - "dispatch to smallest mailbox" in { - val t1Count = new AtomicInteger(0) - val t2Count = new AtomicInteger(0) - val latch1 = TestLatch(2501) - val latch2 = TestLatch(2499) - - val t1 = actorOf(new Actor { - def receive = { - case x ⇒ - t1Count.incrementAndGet - latch1.countDown() - } - }).start() - - t1.dispatcher.suspend(t1) - - for (i <- 1 to 2501) t1 ! i - - val t2 = actorOf(new Actor { - def receive = { - case x ⇒ - t2Count.incrementAndGet - latch2.countDown() - } - }).start() - - val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) //Will pick the last with the smallest mailbox, so make sure t1 is last - - for (i ← 1 to 2499 ) d ! i - - latch2.await(20 seconds) - - t1.dispatcher.resume(t1) - + "throw IllegalArgumentException at construction when no connections" in { try { - latch1.await(20 seconds) - } finally { - // because t1 is much slower and thus has a bigger mailbox all the time - t1Count.get must be === 2501 - t2Count.get must be === 2499 - for (a ← List(t1, t2, d)) a.stop() + Routing.newRoutedActorRef("foo", List(), RouterType.Direct) + fail() + } catch { + case e: IllegalArgumentException => } } - "listen" in { - val fooLatch = TestLatch(2) - val barLatch = TestLatch(2) - val barCount = new AtomicInteger(0) + "send message to connection" in { + val doneLatch = new CountDownLatch(1) - val broadcast = actorOf(new Actor with Listeners { - def receive = listenerManagement orElse { - case "foo" ⇒ gossip("bar") - } - }).start() - - def newListener = actorOf(new Actor { + val counter = new AtomicInteger(0) + val connection1 = actorOf(new Actor { def receive = { - case "bar" ⇒ - barCount.incrementAndGet - barLatch.countDown() - case "foo" ⇒ - fooLatch.countDown() + case "end" => doneLatch.countDown() + case _ => counter.incrementAndGet } }).start() - val a1 = newListener - val a2 = newListener - val a3 = newListener - broadcast ! Listen(a1) - broadcast ! Listen(a2) - broadcast ! Listen(a3) + val routedActor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Direct) + routedActor ! "hello" + routedActor ! "end" - broadcast ! Deafen(a3) + doneLatch.await(5, TimeUnit.SECONDS) must be(true) - broadcast ! WithListeners(_ ! "foo") - broadcast ! "foo" + counter.get must be(1) + } - barLatch.await - barCount.get must be(2) + "deliver a broadcast message" in { + val doneLatch = new CountDownLatch(1) - fooLatch.await + val counter1 = new AtomicInteger + val connection1 = actorOf(new Actor { + def receive = { + case "end" => doneLatch.countDown() + case msg: Int => counter1.addAndGet(msg) + } + }).start() - for (a ← List(broadcast, a1, a2, a3)) a.stop() + val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Direct) + + actor ! Broadcast(1) + actor ! "end" + + doneLatch.await(5, TimeUnit.SECONDS) must be(true) + + counter1.get must be(1) } } - "Actor Pool" must { + "round robin router" must { - "have expected capacity" in { - val latch = TestLatch(2) - val count = new AtomicInteger(0) + "be started when constructed" in { + val actor1 = Actor.actorOf[TestActor].start - val pool = actorOf( - new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with SmallestMailboxSelector { - def factory = actorOf(new Actor { - def receive = { - case _ ⇒ - count.incrementAndGet - latch.countDown() - self tryReply "success" - } - }).start() + val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.RoundRobin) + actor.isRunning must be(true) + } - def limit = 2 - def selectionCount = 1 - def partialFill = true - def instance = factory - def receive = _route + "throw IllegalArgumentException at construction when no connections" in { + try { + Routing.newRoutedActorRef("foo", List(), RouterType.RoundRobin) + fail() + } catch { + case e: IllegalArgumentException => + } + } + + //In this test a bunch of actors are created and each actor has its own counter. + //to test round robin, the routed actor receives the following sequence of messages 1 2 3 .. 1 2 3 .. 1 2 3 which it + //uses to increment his counter. + //So after n iteration, the first actor his counter should be 1*n, the second 2*n etc etc. + "deliver messages in a round robin fashion" in { + val connectionCount = 10 + val iterationCount = 10 + val doneLatch = new CountDownLatch(connectionCount) + + //lets create some connections. + var connections = new LinkedList[ActorRef] + var counters = new LinkedList[AtomicInteger] + for (i <- 0 until connectionCount) { + counters = counters :+ new AtomicInteger() + + val connection = actorOf(new Actor { + def receive = { + case "end"=>doneLatch.countDown() + case msg: Int => counters.get(i).get.addAndGet(msg) + } }).start() + connections = connections :+ connection + } - val successes = TestLatch(2) - val successCounter = actorOf(new Actor { + //create the routed actor. + val actor = Routing.newRoutedActorRef("foo", connections, RouterType.RoundRobin) + + //send messages to the actor. + for (i <- 0 until iterationCount) { + for (k <- 0 until connectionCount) { + actor ! (k + 1) + } + } + + actor ! Broadcast("end") + //now wait some and do validations. + doneLatch.await(5, TimeUnit.SECONDS) must be(true) + + for (i <- 0 until connectionCount) { + val counter = counters.get(i).get + counter.get must be((iterationCount * (i + 1))) + } + } + + "deliver a broadcast message using the !" in { + val doneLatch = new CountDownLatch(2) + + val counter1 = new AtomicInteger + val connection1 = actorOf(new Actor { def receive = { - case "success" ⇒ successes.countDown() + case "end" => doneLatch.countDown() + case msg: Int => counter1.addAndGet(msg) } }).start() - implicit val replyTo = successCounter - pool ! "a" - pool ! "b" + val counter2 = new AtomicInteger + val connection2 = actorOf(new Actor { + def receive = { + case "end" => doneLatch.countDown() + case msg: Int => counter2.addAndGet(msg) + } + }).start() - latch.await - successes.await + val actor = Routing.newRoutedActorRef("foo", List(connection1, connection2), RouterType.RoundRobin) - count.get must be(2) + actor ! Broadcast(1) + actor ! Broadcast("end") - (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + doneLatch.await(5, TimeUnit.SECONDS) must be(true) - pool.stop() + counter1.get must be(1) + counter2.get must be(1) } - "pass ticket #705" in { - val pool = actorOf( - new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicFilter { - def lowerBound = 2 - def upperBound = 20 - def rampupRate = 0.1 - def backoffRate = 0.1 - def backoffThreshold = 0.5 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - def pressureThreshold = 1 - def factory = actorOf(new Actor { - def receive = { - case req: String ⇒ { - sleepFor(10 millis) - self.tryReply("Response") - } - } - }) - }).start() + "fail to deliver a broadcast message using the ?" in { + val doneLatch = new CountDownLatch(1) + + val counter1 = new AtomicInteger + val connection1 = actorOf(new Actor { + def receive = { + case "end" => doneLatch.countDown() + case _ => counter1.incrementAndGet() + } + }).start() + + + val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.RoundRobin) try { - (for (count ← 1 to 500) yield pool.?("Test", 20000)) foreach { - _.await.resultOrException.get must be("Response") - } - } finally { - pool.stop() - } - } - - "grow as needed under pressure" in { - // make sure the pool starts at the expected lower limit and grows to the upper as needed - // as influenced by the backlog of blocking pooled actors - - var latch = TestLatch(3) - val count = new AtomicInteger(0) - - val pool = actorOf( - new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter { - def factory = actorOf(new Actor { - def receive = { - case n: Int ⇒ - sleepFor(n millis) - count.incrementAndGet - latch.countDown() - } - }) - - def lowerBound = 2 - def upperBound = 4 - def rampupRate = 0.1 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - }).start() - - // first message should create the minimum number of delgates - - pool ! 1 - - (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - - var loops = 0 - def loop(t: Int) = { - latch = TestLatch(loops) - count.set(0) - for (m ← 0 until loops) { - pool ? t - sleepFor(50 millis) - } + actor ? Broadcast(1) + fail() + } catch { + case e: RoutingException => } - // 2 more should go thru without triggering more - - loops = 2 - - loop(500) - latch.await - count.get must be(loops) - - (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - - // a whole bunch should max it out - - loops = 10 - loop(500) - latch.await - count.get must be(loops) - - (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(4) - - pool.stop() - } - - "grow as needed under mailbox pressure" in { - // make sure the pool starts at the expected lower limit and grows to the upper as needed - // as influenced by the backlog of messages in the delegate mailboxes - - var latch = TestLatch(3) - val count = new AtomicInteger(0) - - val pool = actorOf( - new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter { - def factory = actorOf(new Actor { - def receive = { - case n: Int ⇒ - sleepFor(n millis) - count.incrementAndGet - latch.countDown() - } - }) - - def lowerBound = 2 - def upperBound = 4 - def pressureThreshold = 3 - def rampupRate = 0.1 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - }).start() - - var loops = 0 - def loop(t: Int) = { - latch = TestLatch(loops) - count.set(0) - for (m ← 0 until loops) { - pool ! t - } - } - - // send a few messages and observe pool at its lower bound - loops = 3 - loop(500) - latch.await - count.get must be(loops) - - (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - - // send a bunch over the theshold and observe an increment - loops = 15 - loop(500) - - latch.await(10 seconds) - count.get must be(loops) - - (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be >= (3) - - pool.stop() - } - - "round robin" in { - val latch1 = TestLatch(2) - val delegates = new java.util.concurrent.ConcurrentHashMap[String, String] - - val pool1 = actorOf( - new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter { - def factory = actorOf(new Actor { - def receive = { - case _ ⇒ - delegates put (self.uuid.toString, "") - latch1.countDown() - } - }) - - def limit = 1 - def selectionCount = 1 - def rampupRate = 0.1 - def partialFill = true - def instance = factory - def receive = _route - }).start() - - pool1 ! "a" - pool1 ! "b" - - latch1.await - delegates.size must be(1) - - pool1.stop() - - val latch2 = TestLatch(2) - delegates.clear() - - val pool2 = actorOf( - new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter { - def factory = actorOf(new Actor { - def receive = { - case _ ⇒ - delegates put (self.uuid.toString, "") - latch2.countDown() - } - }) - - def limit = 2 - def selectionCount = 1 - def rampupRate = 0.1 - def partialFill = false - def instance = factory - def receive = _route - }).start() - - pool2 ! "a" - pool2 ! "b" - - latch2.await - delegates.size must be(2) - - pool2.stop() - } - - "backoff" in { - val latch = TestLatch(10) - - val pool = actorOf( - new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup { - def factory = actorOf(new Actor { - def receive = { - case n: Int ⇒ - sleepFor(n millis) - latch.countDown() - } - }) - - def lowerBound = 1 - def upperBound = 5 - def pressureThreshold = 1 - def partialFill = true - def selectionCount = 1 - def rampupRate = 0.1 - def backoffRate = 0.50 - def backoffThreshold = 0.50 - def instance = factory - def receive = _route - }).start() - - // put some pressure on the pool - - for (m ← 0 to 10) pool ! 250 - - sleepFor(5 millis) - - val z = (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size - - z must be >= (2) - - // let it cool down - - for (m ← 0 to 3) { - pool ! 1 - sleepFor(500 millis) - } - - (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be <= (z) - - pool.stop() - } - - "support typed actors" in { - import RoutingSpec._ - import TypedActor._ - def createPool = new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup { - def lowerBound = 1 - def upperBound = 5 - def pressureThreshold = 1 - def partialFill = true - def selectionCount = 1 - def rampupRate = 0.1 - def backoffRate = 0.50 - def backoffThreshold = 0.50 - def instance = getActorRefFor(typedActorOf[Foo, FooImpl]()) - def receive = _route - } - - val pool = createProxy[Foo](createPool) - - val results = for (i ← 1 to 100) yield (i, pool.sq(i, 100)) - - for ((i, r) ← results) r.get must equal(i * i) - } - - "provide default supervision of pooled actors" in { - import akka.config.Supervision._ - val pingCount = new AtomicInteger(0) - val deathCount = new AtomicInteger(0) - var keepDying = false - - val pool1 = actorOf( - new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { - def lowerBound = 2 - def upperBound = 5 - def rampupRate = 0.1 - def backoffRate = 0.1 - def backoffThreshold = 0.5 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - def pressureThreshold = 1 - def factory = actorOf(new Actor { - if (deathCount.get > 5) deathCount.set(0) - if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")} - def receive = { - case akka.Die ⇒ - if (keepDying) deathCount.incrementAndGet - throw new RuntimeException - case _ => pingCount.incrementAndGet - } - }).start() - }).start() - - val pool2 = actorOf( - new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { - def lowerBound = 2 - def upperBound = 5 - def rampupRate = 0.1 - def backoffRate = 0.1 - def backoffThreshold = 0.5 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - def pressureThreshold = 1 - def factory = actorOf(new Actor { - self.lifeCycle = Permanent - if (deathCount.get > 5) deathCount.set(0) - if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")} - def receive = { - case akka.Die ⇒ - if (keepDying) deathCount.incrementAndGet - throw new RuntimeException - case _ => pingCount.incrementAndGet - } - }).start() - }).start() - - val pool3 = actorOf( - new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter { - def lowerBound = 2 - def upperBound = 5 - def rampupRate = 0.1 - def backoffRate = 0.1 - def backoffThreshold = 0.5 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - def pressureThreshold = 1 - def factory = actorOf(new Actor { - self.lifeCycle = Temporary - if (deathCount.get > 5) deathCount.set(0) - if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")} - def receive = { - case akka.Die ⇒ - if (keepDying) deathCount.incrementAndGet - throw new RuntimeException - case _ => pingCount.incrementAndGet - } - }).start() - }).start() - - // default lifecycle - // actor comes back right away - pingCount.set(0) - keepDying = false - pool1 ! "ping" - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool1 ! akka.Die - sleepFor(2 seconds) - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pingCount.get must be (1) - - // default lifecycle - // actor dies completely - pingCount.set(0) - keepDying = true - pool1 ! "ping" - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool1 ! akka.Die - sleepFor(2 seconds) - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) - pool1 ! "ping" - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pingCount.get must be (2) - - // permanent lifecycle - // actor comes back right away - pingCount.set(0) - keepDying = false - pool2 ! "ping" - (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool2 ! akka.Die - sleepFor(2 seconds) - (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pingCount.get must be (1) - - // permanent lifecycle - // actor dies completely - pingCount.set(0) - keepDying = true - pool2 ! "ping" - (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool2 ! akka.Die - sleepFor(2 seconds) - (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) - pool2 ! "ping" - (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pingCount.get must be (2) - - // temporary lifecycle - pingCount.set(0) - keepDying = false - pool3 ! "ping" - (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool3 ! akka.Die - sleepFor(2 seconds) - (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) - pool3 ! "ping" - pool3 ! "ping" - pool3 ! "ping" - (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pingCount.get must be (4) - } - - "support customizable supervision config of pooled actors" in { - import akka.config.Supervision._ - val pingCount = new AtomicInteger(0) - val deathCount = new AtomicInteger(0) - var keepDying = false - - trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig { - def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000) - } - - object BadState - - val pool1 = actorOf( - new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { - def lowerBound = 2 - def upperBound = 5 - def rampupRate = 0.1 - def backoffRate = 0.1 - def backoffThreshold = 0.5 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - def pressureThreshold = 1 - def factory = actorOf(new Actor { - if (deathCount.get > 5) deathCount.set(0) - if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")} - def receive = { - case BadState ⇒ - if (keepDying) deathCount.incrementAndGet - throw new IllegalStateException - case akka.Die => - throw new RuntimeException - case _ => pingCount.incrementAndGet - } - }).start() - }).start() - - - // actor comes back right away - pingCount.set(0) - keepDying = false - pool1 ! "ping" - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool1 ! BadState - sleepFor(2 seconds) - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pingCount.get must be (1) - - // actor dies completely - pingCount.set(0) - keepDying = true - pool1 ! "ping" - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool1 ! BadState - sleepFor(2 seconds) - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) - pool1 ! "ping" - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pingCount.get must be (2) - - // kill it - intercept[RuntimeException](pool1.?(akka.Die).get) + actor ! "end" + doneLatch.await(5, TimeUnit.SECONDS) must be(true) + counter1.get must be(0) } } -} + "random router" must { + + "be started when constructed" in { + + val actor1 = Actor.actorOf[TestActor].start + + val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.Random) + actor.isRunning must be(true) + } + + "throw IllegalArgumentException at construction when no connections" in { + try { + Routing.newRoutedActorRef("foo", List(), RouterType.Random) + fail() + } catch { + case e: IllegalArgumentException => + } + } + + "deliver messages in a random fashion" in{ + + } + + "deliver a broadcast message" in { + val doneLatch = new CountDownLatch(2) + + val counter1 = new AtomicInteger + val connection1 = actorOf(new Actor { + def receive = { + case "end" => doneLatch.countDown() + case msg: Int => counter1.addAndGet(msg) + } + }).start() + + val counter2 = new AtomicInteger + val connection2 = actorOf(new Actor { + def receive = { + case "end" => doneLatch.countDown() + case msg: Int => counter2.addAndGet(msg) + } + }).start() + + val actor = Routing.newRoutedActorRef("foo", List(connection1, connection2), RouterType.Random) + + actor ! Broadcast(1) + actor ! Broadcast("end") + + doneLatch.await(5, TimeUnit.SECONDS) must be(true) + + counter1.get must be(1) + counter2.get must be(1) + } + + "fail to deliver a broadcast message using the ?" in { + val doneLatch = new CountDownLatch(1) + + val counter1 = new AtomicInteger + val connection1 = actorOf(new Actor { + def receive = { + case "end" => doneLatch.countDown() + case _ => counter1.incrementAndGet() + } + }).start() + + val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Random) + + try { + actor ? Broadcast(1) + fail() + } catch { + case e: RoutingException => + } + + actor ! "end" + doneLatch.await(5, TimeUnit.SECONDS) must be(true) + counter1.get must be(0) + } + } + + "least cpu router" must { + "throw IllegalArgumentException when constructed" in { + val actor1 = Actor.actorOf[TestActor].start + + try { + Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastCPU) + } catch { + case e: IllegalArgumentException => + } + } + } + + "least ram router" must { + "throw IllegalArgumentException when constructed" in { + val actor1 = Actor.actorOf[TestActor].start + + try { + Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastRAM) + } catch { + case e: IllegalArgumentException => + } + } + } + + "smallest mailbox" must { + "throw IllegalArgumentException when constructed" in { + val actor1 = Actor.actorOf[TestActor].start + + try { + Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastMessages) + } catch { + case e: IllegalArgumentException => + } + } + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 58a592a7c0..1988cab2f8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -301,7 +301,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com * * If you would rather have an exception, check the reply(..) version. */ - def tryReply(message: Any): Boolean = channel.safe_!(message)(this) + def tryReply(message: Any): Boolean = channel.safe_!(message)(this) /** * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. @@ -480,12 +480,16 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor, "]") private val serializer: Serializer = - try { Serialization.serializerFor(this.getClass) } catch { case e: Exception => serializerErrorDueTo(e.toString)} + try { + Serialization.serializerFor(this.getClass) + } catch { + case e: Exception => serializerErrorDueTo(e.toString) + } private lazy val replicationStorage: Option[TransactionLog] = { import DeploymentConfig._ val replicationScheme = replicationSchemeFor(Deployer.deploymentFor(address)).getOrElse(Transient) - if(isReplicated(replicationScheme)) { + if (isReplicated(replicationScheme)) { if (isReplicatedWithTransactionLog(replicationScheme)) { EventHandler.debug(this, "Creating a transaction log for Actor [%s] with replication strategy [%s]".format(address, replicationScheme)) @@ -649,7 +653,8 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor, def mailbox: AnyRef = _mailbox protected[akka] def mailbox_=(value: AnyRef): AnyRef = { - _mailbox = value; value + _mailbox = value; + value } /** @@ -842,7 +847,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor, actorRef.lifeCycle match { // either permanent or none where default is permanent case Temporary ⇒ shutDownTemporaryActor(actorRef, reason) - case _ ⇒ actorRef.restart(reason, maxNrOfRetries, withinTimeRange) + case _ ⇒ actorRef.restart(reason, maxNrOfRetries, withinTimeRange) } } } @@ -865,7 +870,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor, } fresh match { case Some(ref) ⇒ ref - case None ⇒ actorFactory() + case None ⇒ actorFactory() } } else { actorFactory() @@ -902,7 +907,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor, else { lifeCycle match { case Temporary ⇒ shutDownTemporaryActor(this, reason) - case _ ⇒ dispatcher.resume(this) //Resume processing for this actor + case _ ⇒ dispatcher.resume(this) //Resume processing for this actor } } } @@ -1259,3 +1264,58 @@ case class SerializedActorRef(uuid: Uuid, "] but it's not found in the local registry and remoting is not enabled.") } } + + +/** + * Trait for ActorRef implementations where most of the methods are not supported. + */ +trait UnsupportedActorRef extends ActorRef with ScalaActorRef{ + + def dispatcher_=(md: MessageDispatcher) { + unsupported + } + + def dispatcher: MessageDispatcher = unsupported + + def link(actorRef: ActorRef) { + unsupported + } + + def unlink(actorRef: ActorRef) { + unsupported + } + + def startLink(actorRef: ActorRef): ActorRef = unsupported + + def supervisor: Option[ActorRef] = unsupported + + def linkedActors: JMap[Uuid, ActorRef] = unsupported + + protected[akka] def mailbox: AnyRef = unsupported + + protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported + + protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) { + unsupported + } + + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { + unsupported + } + + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { + unsupported + } + + protected[akka] def invoke(messageHandle: MessageInvocation) { + unsupported + } + + protected[akka] def supervisor_=(sup: Option[ActorRef]) { + unsupported + } + + protected[akka] def actorInstance: AtomicReference[Actor] = unsupported + + private def unsupported = throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName)) +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 15ec9653a9..9a0cef33d9 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -4,18 +4,19 @@ package akka.routing -//TODO: This will package is going to be removed. - -import akka.actor.{ UntypedActor, Actor} import akka.actor.Actor._ - -import akka.actor.ActorRef import scala.collection.JavaConversions._ import scala.collection.immutable.Seq import java.util.concurrent.atomic.AtomicReference import annotation.tailrec import akka.AkkaException +import akka.dispatch.Future +import java.net.InetSocketAddress +import akka.actor._ +import akka.event.EventHandler +import akka.actor.UntypedChannel._ +import akka.routing.RouterType.RoundRobin class RoutingException(message: String) extends AkkaException(message) @@ -25,193 +26,307 @@ sealed trait RouterType * @author Jonas Bonér */ object RouterType { + object Direct extends RouterType + + /** + * A RouterType that randomly selects a connection to send a message to. + */ object Random extends RouterType + + /** + * A RouterType that selects the connection by using round robin. + */ object RoundRobin extends RouterType + + /** + * A RouterType that selects the connection based on the least amount of cpu usage + */ object LeastCPU extends RouterType + + /** + * A RouterType that select the connection based on the least amount of ram used. + * + * todo: this is extremely vague currently since there are so many ways to define least amount of ram. + */ object LeastRAM extends RouterType + + /** + * A RouterType that select the connection where the actor has the least amount of messages in its mailbox. + */ object LeastMessages extends RouterType -} -/** - * A Router is a trait whose purpose is to route incoming messages to actors. - */ -trait Router { this: Actor ⇒ - - protected def transform(msg: Any): Any = msg - - protected def routes: PartialFunction[Any, ActorRef] - - protected def broadcast(message: Any) {} - - protected def dispatch: Receive = { - case Routing.Broadcast(message) ⇒ - broadcast(message) - case a if routes.isDefinedAt(a) ⇒ - if (isSenderDefined) routes(a).forward(transform(a))(someSelf) - else routes(a).!(transform(a))(None) - } - - def receive = dispatch - - private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined -} - -/** - * An UntypedRouter is an abstract class whose purpose is to route incoming messages to actors. - */ -abstract class UntypedRouter extends UntypedActor { - protected def transform(msg: Any): Any = msg - - protected def route(msg: Any): ActorRef - - protected def broadcast(message: Any) {} - - private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined - - @throws(classOf[Exception]) - def onReceive(msg: Any): Unit = msg match { - case m: Routing.Broadcast ⇒ broadcast(m.message) - case _ ⇒ - val r = route(msg) - if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!") - if (isSenderDefined) r.forward(transform(msg))(someSelf) - else r.!(transform(msg))(None) - } -} - -/** - * A LoadBalancer is a specialized kind of Router, that is supplied an InfiniteIterator of targets - * to dispatch incoming messages to. - */ -trait LoadBalancer extends Router { self: Actor ⇒ - protected def seq: InfiniteIterator[ActorRef] - - protected def routes = { - case x if seq.hasNext ⇒ seq.next - } - - override def broadcast(message: Any) = seq.items.foreach(_ ! message) -} - -/** - * A UntypedLoadBalancer is a specialized kind of UntypedRouter, that is supplied an InfiniteIterator of targets - * to dispatch incoming messages to. - */ -abstract class UntypedLoadBalancer extends UntypedRouter { - protected def seq: InfiniteIterator[ActorRef] - - protected def route(msg: Any) = - if (seq.hasNext) seq.next - else null - - override def broadcast(message: Any) = seq.items.foreach(_ ! message) } object Routing { sealed trait RoutingMessage + case class Broadcast(message: Any) extends RoutingMessage - type PF[A, B] = PartialFunction[A, B] - - /** - * Creates a new PartialFunction whose isDefinedAt is a combination - * of the two parameters, and whose apply is first to call filter.apply - * and then filtered.apply. + /** + * Creates a new started RoutedActorRef that uses routing to deliver a message to one of its connected actors. + * + * @param actorAddress the address of the ActorRef. + * @param connections an Iterable pointing to all connected actor references. + * @param routerType the type of routing that should be used. + * @throws IllegalArgumentException if the number of connections is zero, or if it depends on the actual router implementation + * how many connections it can handle. */ - def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = { - case a: A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) ⇒ - filter(a) - filtered(a) + def newRoutedActorRef(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): RoutedActorRef = { + if (connections.size == 0) + throw new IllegalArgumentException("To create a routed actor ref, at least one connection is required") + + val ref = routerType match { + case RouterType.Direct => + if (connections.size > 1) throw new IllegalArgumentException("A direct router can't have more than 1 connection") + new RoutedActorRef(actorAddress, connections) with Direct + case RouterType.Random => + new RoutedActorRef(actorAddress, connections) with Random + case RouterType.RoundRobin => + new RoutedActorRef(actorAddress, connections) with RoundRobin + case _ => throw new IllegalArgumentException("Unsupported routerType "+routerType) + } + + ref.start() } - /** - * Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true. - */ - def intercept[A: Manifest, B](interceptor: (A) ⇒ Unit, interceptee: PF[A, B]): PF[A, B] = - filter({ case a ⇒ interceptor(a) }, interceptee) + def newRoundRobinActorRef(actorAddress:String, connections: Iterable[ActorRef]):RoutedActorRef = { + newRoutedActorRef(actorAddress,connections, RoundRobin) + } +} - /** - * Creates a LoadBalancer from the thunk-supplied InfiniteIterator. - */ - def loadBalancerActor(actors: ⇒ InfiniteIterator[ActorRef]): ActorRef = - localActorOf(new Actor with LoadBalancer { - val seq = actors - }).start() - /** - * Creates a Router given a routing and a message-transforming function. - */ - def routerActor(routing: PF[Any, ActorRef], msgTransformer: (Any) ⇒ Any): ActorRef = - localActorOf(new Actor with Router { - override def transform(msg: Any) = msgTransformer(msg) - def routes = routing - }).start() +/** + * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to + * on (or more) of these actors. + */ +class RoutedActorRef(val address: String, val cons: Iterable[ActorRef]) extends UnsupportedActorRef with ScalaActorRef { + this: Router ⇒ - /** - * Creates a Router given a routing. - */ - def routerActor(routing: PF[Any, ActorRef]): ActorRef = localActorOf(new Actor with Router { - def routes = routing - }).start() + def connections: Iterable[ActorRef] = cons - /** - * Creates an actor that pipes all incoming messages to - * both another actor and through the supplied function - */ - def loggerActor(actorToLog: ActorRef, logger: (Any) ⇒ Unit): ActorRef = - routerActor({ case _ ⇒ actorToLog }, logger) + override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = { + val sender = channel match { + case ref: ActorRef ⇒ Some(ref) + case _ ⇒ None + } + route(message)(sender) + } + + override def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, + timeout: Long, + channel: UntypedChannel): Future[Any] = { + val sender = channel match { + case ref: ActorRef ⇒ Some(ref) + case _ ⇒ None + } + route[Any](message, timeout)(sender) + } + + private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress): Unit = { + throw new UnsupportedOperationException + } + + def signalDeadActor(ref: ActorRef): Unit = { + throw new UnsupportedOperationException + } + + def start(): this.type = synchronized[this.type] { + _status = ActorRefInternals.RUNNING + this + } + + def stop() { + synchronized { + if (_status == ActorRefInternals.RUNNING) { + _status = ActorRefInternals.SHUTDOWN + postMessageToMailbox(RemoteActorSystemMessage.Stop, None) + + // FIXME here we need to fire off Actor.cluster.remove(address) (which needs to be properly implemented first, see ticket) + + //inetSocketAddressToActorRefMap.get.values foreach (_.stop()) // shut down all remote connections + } + } + } } /** - * An Iterator that is either always empty or yields an infinite number of Ts. + * The Router is responsible for sending a message to one (or more) of its connections. + * + * @author Jonas Bonér */ -trait InfiniteIterator[T] extends Iterator[T] { - val items: Seq[T] +trait Router { + + /** + * Returns an Iterable containing all connected ActorRefs this Router uses to send messages to. + */ + def connections: Iterable[ActorRef] + + /** + * A callback this Router uses to indicate that some actorRef was not usable. + * + * Implementations should make sure that this method can be called without the actorRef being part of the + * current set of connections. The most logical way to deal with this situation, is just to ignore it. + * + * @param ref the dead + */ + def signalDeadActor(ref: ActorRef): Unit + + /** + * Routes the message to one of the connections. + */ + def route(message: Any)(implicit sender: Option[ActorRef]): Unit + + + /** + * Routes the message using a timeout to one of the connections and returns a Future to synchronize on the + * completion of the processing of the message. + */ + def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] } /** - * CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List. + * An Abstract Router implementation that already provides the basic infrastructure so that a concrete + * Router only needs to implement the next method. + * + * todo: + * This also is the location where a failover is done in the future if an ActorRef fails and a different + * one needs to be selected. + * todo: + * this is also the location where message buffering should be done in case of failure. */ -case class CyclicIterator[T](val items: Seq[T]) extends InfiniteIterator[T] { - def this(items: java.util.List[T]) = this(items.toList) +trait BasicRouter extends Router { - private[this] val current: AtomicReference[Seq[T]] = new AtomicReference(items) + def route(message: Any)(implicit sender: Option[ActorRef]): Unit = message match { + case Routing.Broadcast(message) ⇒ + //it is a broadcast message, we are going to send to message to all connections. + connections.foreach(actor => + try { + actor.!(message)(sender) + } catch { + case e: Exception => + signalDeadActor(actor) + throw e + } + ) + case _ ⇒ + //it no broadcast message, we are going to select an actor from the connections and send the message to him. + next match { + case Some(actor) => + try { + actor.!(message)(sender) + } catch { + case e: Exception => + signalDeadActor(actor) + throw e - def hasNext = items != Nil + } + case None => + throwNoConnectionsError() + } + } - def next: T = { + def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] = message match { + case Routing.Broadcast(message) ⇒ + throw new RoutingException("Broadcasting using a future for the time being is not supported") + case _ ⇒ + //it no broadcast message, we are going to select an actor from the connections and send the message to him. + next match { + case Some(actor) => + try { + actor.?(message, timeout)(sender).asInstanceOf[Future[T]] + } catch { + case e: Exception => + signalDeadActor(actor) + throw e + + } + case None => + throwNoConnectionsError() + } + } + + protected def next: Option[ActorRef] + + private def throwNoConnectionsError() = { + val error = new RoutingException("No replica connections for router") + EventHandler.error(error, this, error.toString) + throw error + } +} + +/** + * A Router that is used when a durable actor is used. All requests are send to the node containing the actor. + * As soon as that instance fails, a different instance is created and since the mailbox is durable, the internal + * state can be restored using event sourcing, and once this instance is up and running, all request will be send + * to this instance. + * + * @author Jonas Bonér + */ +trait Direct extends BasicRouter { + + lazy val next: Option[ActorRef] = { + val connection = connections.headOption + if (connection.isEmpty) EventHandler.warning(this, "Router has no replica connection") + connection + } +} + +/** + * A Router that randomly selects one of the target connections to send a message to. + * + * @author Jonas Bonér + */ +trait Random extends BasicRouter { + + //todo: threadlocal random? + private val random = new java.util.Random(System.currentTimeMillis) + + def next: Option[ActorRef] = + if (connections.isEmpty) { + EventHandler.warning(this, "Router has no replica connections") + None + } else { + val randomIndex = random.nextInt(connections.size) + + //todo: possible index ouf of bounds problems since the number of connection could already have been changed. + Some(connections.iterator.drop(randomIndex).next()) + } +} + +/** + * A Router that uses round-robin to select a connection. + * + * @author Jonas Bonér + */ +trait RoundRobin extends BasicRouter { + + private def items: List[ActorRef] = connections.toList + + //todo: this is broken since the list is not updated. + private val current = new AtomicReference[List[ActorRef]](items) + + private def hasNext = connections.nonEmpty + + def next: Option[ActorRef] = { @tailrec - def findNext: T = { + def findNext: Option[ActorRef] = { val currentItems = current.get val newItems = currentItems match { case Nil ⇒ items - case xs ⇒ xs + case xs ⇒ xs } - if (current.compareAndSet(currentItems, newItems.tail)) newItems.head - else findNext + if (newItems.isEmpty) { + EventHandler.warning(this, "Router has no replica connections") + None + } else { + if (current.compareAndSet(currentItems, newItems.tail)) newItems.headOption + else findNext + } } findNext } - - override def exists(f: T ⇒ Boolean): Boolean = items exists f } - -/** - * This InfiniteIterator always returns the Actor that has the currently smallest mailbox - * useful for work-stealing. - */ -case class SmallestMailboxFirstIterator(val items: Seq[ActorRef]) extends InfiniteIterator[ActorRef] { - - def this(items: java.util.List[ActorRef]) = this(items.toList) - - def hasNext = items != Nil - - def next = items.reduceLeft((a1, a2) ⇒ if (a1.dispatcher.mailboxSize(a1) < a2.dispatcher.mailboxSize(a2)) a1 else a2) - - override def exists(f: ActorRef ⇒ Boolean): Boolean = items.exists(f) -} - diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala index 5afe318daa..fa13fc9396 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala @@ -16,6 +16,7 @@ import java.util.{Map ⇒ JMap} import com.eaio.uuid.UUID import collection.immutable.Map import annotation.tailrec +import akka.routing.Router /** * ActorRef representing a one or many instances of a clustered, load-balanced and sometimes replicated actor @@ -26,8 +27,8 @@ import annotation.tailrec class ClusterActorRef private[akka](inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]], val address: String, _timeout: Long) - extends ActorRef with ScalaActorRef { - this: Router.Router ⇒ + extends UnsupportedActorRef with ScalaActorRef { + this: Router ⇒ timeout = _timeout private[akka] val inetSocketAddressToActorRefMap = new AtomicReference[Map[InetSocketAddress, ActorRef]]( @@ -37,7 +38,7 @@ class ClusterActorRef private[akka](inetSocketAddresses: Array[Tuple2[UUID, Inet ClusterModule.ensureEnabled() - def connections: Map[InetSocketAddress, ActorRef] = inetSocketAddressToActorRefMap.get + def connections: Iterable[ActorRef] = inetSocketAddressToActorRefMap.get.values override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = { val sender = channel match { @@ -96,11 +97,12 @@ class ClusterActorRef private[akka](inetSocketAddresses: Array[Tuple2[UUID, Inet def signalDeadActor(ref: ActorRef): Unit = { //since the number remote actor refs for a clustered actor ref is quite low, we can deal with the O(N) complexity //of the following removal. - val it = connections.keySet.iterator + val map = inetSocketAddressToActorRefMap.get + val it = map.keySet.iterator while (it.hasNext) { val address = it.next() - val foundRef: ActorRef = connections.get(address).get + val foundRef: ActorRef = map.get(address).get if (foundRef == ref) { remove(address) @@ -130,57 +132,4 @@ class ClusterActorRef private[akka](inetSocketAddresses: Array[Tuple2[UUID, Inet } } } - - // ======================================================================== - // ==== NOT SUPPORTED ==== - // ======================================================================== - - // FIXME move these methods and the same ones in RemoteActorRef to a base class - now duplicated - def dispatcher_=(md: MessageDispatcher) { - unsupported - } - - def dispatcher: MessageDispatcher = unsupported - - def link(actorRef: ActorRef) { - unsupported - } - - def unlink(actorRef: ActorRef) { - unsupported - } - - def startLink(actorRef: ActorRef): ActorRef = unsupported - - def supervisor: Option[ActorRef] = unsupported - - def linkedActors: JMap[Uuid, ActorRef] = unsupported - - protected[akka] def mailbox: AnyRef = unsupported - - protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported - - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) { - unsupported - } - - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { - unsupported - } - - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { - unsupported - } - - protected[akka] def invoke(messageHandle: MessageInvocation) { - unsupported - } - - protected[akka] def supervisor_=(sup: Option[ActorRef]) { - unsupported - } - - protected[akka] def actorInstance: AtomicReference[Actor] = unsupported - - private def unsupported = throw new UnsupportedOperationException("Not supported for ClusterActorRef") } diff --git a/akka-cluster/src/main/scala/akka/cluster/Routing.scala b/akka-cluster/src/main/scala/akka/cluster/Routing.scala index 36a2393be6..70e3cca64f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Routing.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Routing.scala @@ -1,28 +1,26 @@ +package akka.cluster + /** * Copyright (C) 2009-2011 Typesafe Inc. */ -package akka.cluster -import akka.actor._ -import akka.dispatch.Future -import akka.event.EventHandler -import akka.routing.{RouterType, RoutingException} +import akka.routing.RouterType import RouterType._ import com.eaio.uuid.UUID -import annotation.tailrec - import java.net.InetSocketAddress -import java.util.concurrent.atomic.AtomicReference +import akka.routing.{Random, Direct, RoundRobin} /** * @author Jonas Bonér */ object Router { - def newRouter( - routerType: RouterType, + /** + * Creates a new clustered ActorRef. + */ + def newRouter( routerType: RouterType, inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]], actorAddress: String, timeout: Long): ClusterActorRef = { @@ -35,151 +33,4 @@ object Router { case LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") } } - - /** - * The Router is responsible for sending a message to one (or more) of its connections. - * - * @author Jonas Bonér - */ - trait Router { - - /** - * Returns a Map containing all ActorRefs this Router uses send messages to. - */ - def connections: Map[InetSocketAddress, ActorRef] - - /** - * A callback this Router uses to indicate that some actorRef was not usable. - * - * Implementations should make sure that this method can be called without the actorRef being part of the - * current set of connections. The most logical way to deal with this situation, is just to ignore it. - * - * @param ref the dead - */ - def signalDeadActor(ref: ActorRef): Unit - - /** - * - */ - def route(message: Any)(implicit sender: Option[ActorRef]): Unit - - /** - * - */ - def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] - } - - /** - * An Abstract Router implementation that already provides the basic infrastructure so that a concrete - * Router only needs to implement the next method. - * - * This also is the location where a failover is done in the future if an ActorRef fails and a different - * one needs to be selected. - */ - trait BasicRouter extends Router { - - def route(message: Any)(implicit sender: Option[ActorRef]): Unit = next match { - case Some(actor) ⇒ { - try { - actor.!(message)(sender) - } catch { - case e: Exception => - signalDeadActor(actor) - throw e - } - } - case _ ⇒ throwNoConnectionsError() - } - - def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] = next match { - case Some(actor) ⇒ { - try { - actor.?(message, timeout)(sender).asInstanceOf[Future[T]] - } catch { - case e: Throwable => - signalDeadActor(actor) - throw e - } - } - case _ ⇒ throwNoConnectionsError() - } - - protected def next: Option[ActorRef] - - private def throwNoConnectionsError() = { - val error = new RoutingException("No replica connections for router") - EventHandler.error(error, this, error.toString) - throw error - } - } - - /** - * A Router that is used when a durable actor is used. All requests are send to the node containing the actor. - * As soon as that instance fails, a different instance is created and since the mailbox is durable, the internal - * state can be restored using event sourcing, and once this instance is up and running, all request will be send - * to this instance. - * - * @author Jonas Bonér - */ - trait Direct extends BasicRouter { - - lazy val next: Option[ActorRef] = { - val connection = connections.values.headOption - if (connection.isEmpty) EventHandler.warning(this, "Router has no replica connection") - connection - } - } - - /** - * A Router that randomly selects one of the target connections to send a message to. - * - * @author Jonas Bonér - */ - trait Random extends BasicRouter { - - private val random = new java.util.Random(System.currentTimeMillis) - - def next: Option[ActorRef] = - if (connections.isEmpty) { - EventHandler.warning(this, "Router has no replica connections") - None - } else { - Some(connections.valuesIterator.drop(random.nextInt(connections.size)).next()) - } - } - - /** - * A Router that uses round-robin to select a connection. - * - * @author Jonas Bonér - */ - trait RoundRobin extends BasicRouter { - private def items: List[ActorRef] = connections.values.toList - - private val current = new AtomicReference[List[ActorRef]](items) - - private def hasNext = connections.nonEmpty - - def next: Option[ActorRef] = { - @tailrec - def findNext: Option[ActorRef] = { - val currentItems = current.get - val newItems = currentItems match { - case Nil ⇒ items - case xs ⇒ xs - } - - if (newItems.isEmpty) { - EventHandler.warning(this, "Router has no replica connections") - None - } else { - if (current.compareAndSet(currentItems, newItems.tail)) newItems.headOption - else findNext - } - } - - findNext - } - } - } diff --git a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala index bbe039af22..10e1a9b544 100644 --- a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala +++ b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala @@ -11,7 +11,6 @@ import akka.actor.Actor._ import akka.actor.{ActorRegistry, ActorRef, Actor} import akka.camel._ import akka.camel.CamelServiceManager._ -import akka.routing.CyclicIterator import akka.routing.Routing._ /** @@ -50,7 +49,7 @@ object HttpConcurrencyTestStress { startCamelService val workers = for (i <- 1 to 8) yield actorOf[HttpServerWorker].start - val balancer = loadBalancerActor(new CyclicIterator(workers.toList)) + val balancer = newRoundRobinActorRef("loadbalancer",workers) //service.get.awaitEndpointActivation(1) { // actorOf(new HttpServerActor(balancer)).start diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index 2bf9a3bc73..dd026619c8 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -11,11 +11,12 @@ import static java.util.Arrays.asList; import akka.actor.ActorRef; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; -import akka.routing.CyclicIterator; -import akka.routing.InfiniteIterator; +import akka.routing.RouterType; +import akka.routing.Routing; import akka.routing.Routing.Broadcast; -import akka.routing.UntypedLoadBalancer; +import scala.collection.JavaConversions; +import java.util.LinkedList; import java.util.concurrent.CountDownLatch; public class Pi { @@ -96,35 +97,18 @@ public class Pi { private ActorRef router; - static class PiRouter extends UntypedLoadBalancer { - private final InfiniteIterator workers; - - public PiRouter(ActorRef[] workers) { - this.workers = new CyclicIterator(asList(workers)); - } - - public InfiniteIterator seq() { - return workers; - } - } - - public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) { + public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) { this.nrOfMessages = nrOfMessages; this.nrOfElements = nrOfElements; this.latch = latch; - // create the workers - final ActorRef[] workers = new ActorRef[nrOfWorkers]; + LinkedList workers = new LinkedList(); for (int i = 0; i < nrOfWorkers; i++) { - workers[i] = actorOf(Worker.class, "worker").start(); + ActorRef worker = actorOf(Worker.class, "worker").start(); + workers.add(worker); } - // wrap them with a load-balancing router - router = actorOf(new UntypedActorFactory() { - public UntypedActor create() { - return new PiRouter(workers); - } - }, "router").start(); + router = Routing.newRoundRobinActorRef("pi", JavaConversions.asIterable(workers)); } // message handler diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index ff09cf88ff..fee4691a3e 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -6,11 +6,9 @@ package akka.tutorial.first.scala import akka.actor.{Actor, PoisonPill} import Actor._ -import akka.routing.{Routing, CyclicIterator} -import Routing._ - -import System.{currentTimeMillis => now} import java.util.concurrent.CountDownLatch +import akka.routing.Routing.Broadcast +import akka.routing.Routing object Pi extends App { @@ -57,7 +55,7 @@ object Pi extends App { val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start()) // wrap them with a load-balancing router - val router = Routing.loadBalancerActor(CyclicIterator(workers)).start() + val router = Routing.newRoundRobinActorRef("pi",workers) // message handler def receive = { diff --git a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java index 91c7829278..c5da4f46cb 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java +++ b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java @@ -8,6 +8,8 @@ import static akka.actor.Actors.actorOf; import static akka.actor.Actors.poisonPill; import static java.lang.System.currentTimeMillis; import static java.util.Arrays.asList; + +import akka.routing.Routing; import scala.Option; import akka.actor.ActorRef; import akka.actor.Channel; @@ -15,10 +17,10 @@ import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; import akka.dispatch.Future; import akka.japi.Procedure; -import akka.routing.CyclicIterator; -import akka.routing.InfiniteIterator; import akka.routing.Routing.Broadcast; -import akka.routing.UntypedLoadBalancer; +import scala.collection.JavaConversions; + +import java.util.LinkedList; public class Pi { @@ -90,34 +92,17 @@ public class Pi { private ActorRef router; - static class PiRouter extends UntypedLoadBalancer { - private final InfiniteIterator workers; - - public PiRouter(ActorRef[] workers) { - this.workers = new CyclicIterator(asList(workers)); - } - - public InfiniteIterator seq() { - return workers; - } - } - public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements) { this.nrOfMessages = nrOfMessages; this.nrOfElements = nrOfElements; - // create the workers - final ActorRef[] workers = new ActorRef[nrOfWorkers]; + LinkedList workers = new LinkedList(); for (int i = 0; i < nrOfWorkers; i++) { - workers[i] = actorOf(Worker.class, "worker").start(); + ActorRef worker = actorOf(Worker.class, "worker").start(); + workers.add(worker); } - // wrap them with a load-balancing router - router = actorOf(new UntypedActorFactory() { - public UntypedActor create() { - return new PiRouter(workers); - } - }, "router").start(); + router = Routing.newRoundRobinActorRef("pi", JavaConversions.asIterable(workers)); } @Override diff --git a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala index 46f59cdabf..5f6ee8f0c7 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala @@ -5,13 +5,12 @@ package akka.tutorial.second import akka.actor.Actor._ -import akka.routing.{Routing, CyclicIterator} -import Routing._ +import akka.routing.Routing import akka.event.EventHandler import akka.actor.{Channel, Actor, PoisonPill} -import akka.dispatch.Future import System.{currentTimeMillis => now} +import akka.routing.Routing.Broadcast object Pi extends App { @@ -55,7 +54,7 @@ object Pi extends App { val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start()) // wrap them with a load-balancing router - val router = Routing.loadBalancerActor(CyclicIterator(workers)).start() + val router = Routing.newRoundRobinActorRef("pi",workers) // phase 1, can accept a Calculate message def scatter: Receive = {