From ca3deb4007aacb3ff0d2c027d91a7c102fc623bf Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 9 Feb 2012 16:59:16 +0100 Subject: [PATCH 1/3] now that was a nice journey (related to #1804) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - first, fix quite some data races in RoutedActorRef wrt. the contained ActorCell’s childrenRef field (which is not even @volatile) - then notice that there still are double-deregistrations happening in the dispatcher - coming finally to the conclusion that the Mailbox should not really process all system messages in processAllSystemMessages(): we should really really stop after having closed the mailbox ;-) - added simple test case which stops self twice to keep this fixed --- .../akka/actor/dispatch/ActorModelSpec.scala | 25 +++- .../main/scala/akka/dispatch/Mailbox.scala | 2 +- .../src/main/scala/akka/routing/Routing.scala | 124 +++++++++++++----- 3 files changed, 110 insertions(+), 41 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 8c949f8776..46bf609c7a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -50,6 +50,8 @@ object ActorModelSpec { case object Restart extends ActorModelMessage + case object DoubleStop extends ActorModelMessage + case class ThrowException(e: Throwable) extends ActorModelMessage val Ping = "Ping" @@ -86,6 +88,7 @@ object ActorModelSpec { case Restart ⇒ ack; busy.switchOff(); throw new Exception("Restart requested") case Interrupt ⇒ ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!") case ThrowException(e: Throwable) ⇒ ack; busy.switchOff(); throw e + case DoubleStop ⇒ ack; context.stop(self); context.stop(self); busy.switchOff } } @@ -190,13 +193,13 @@ object ActorModelSpec { } def assertRef(actorRef: ActorRef, dispatcher: MessageDispatcher = null)( - suspensions: Long = statsFor(actorRef).suspensions.get(), - resumes: Long = statsFor(actorRef).resumes.get(), - registers: Long = statsFor(actorRef).registers.get(), - unregisters: Long = statsFor(actorRef).unregisters.get(), - msgsReceived: Long = statsFor(actorRef).msgsReceived.get(), - msgsProcessed: Long = statsFor(actorRef).msgsProcessed.get(), - restarts: Long = statsFor(actorRef).restarts.get())(implicit system: ActorSystem) { + suspensions: Long = statsFor(actorRef, dispatcher).suspensions.get(), + resumes: Long = statsFor(actorRef, dispatcher).resumes.get(), + registers: Long = statsFor(actorRef, dispatcher).registers.get(), + unregisters: Long = statsFor(actorRef, dispatcher).unregisters.get(), + msgsReceived: Long = statsFor(actorRef, dispatcher).msgsReceived.get(), + msgsProcessed: Long = statsFor(actorRef, dispatcher).msgsProcessed.get(), + restarts: Long = statsFor(actorRef, dispatcher).restarts.get())(implicit system: ActorSystem) { val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[LocalActorRef].underlying.dispatcher)) val deadline = System.currentTimeMillis + 1000 try { @@ -426,6 +429,14 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa assert(f5.value.isEmpty) } } + + "not double-deregister" in { + implicit val dispatcher = interceptedDispatcher() + val a = newTestActor(dispatcher.id) + a ! DoubleStop + awaitCond(statsFor(a, dispatcher).registers.get == 1) + awaitCond(statsFor(a, dispatcher).unregisters.get == 1) + } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 27853b49db..cc15ae2173 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -189,7 +189,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue final def processAllSystemMessages() { var nextMessage = systemDrain() try { - while (nextMessage ne null) { + while ((nextMessage ne null) && !isClosed) { if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs) actor systemInvoke nextMessage nextMessage = nextMessage.next diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index c200be0871..c0bd0df251 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -16,6 +16,7 @@ import scala.collection.JavaConversions.iterableAsScalaIterable import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock import akka.jsr166y.ThreadLocalRandom @@ -30,14 +31,77 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup _supervisor, _path) { - private val routeeProps = _props.copy(routerConfig = NoRouter) - private val resizeProgress = new AtomicBoolean + /* + * CAUTION: RoutedActorRef is PROBLEMATIC + * ====================================== + * + * We are constructing/assembling the children outside of the scope of the + * Router actor, inserting them in its childrenRef list, which is not at all + * synchronized. This is done exactly once at start-up, all other accesses + * are done from the Router actor. This means that the only thing which is + * really hairy is making sure that the Router does not touch its childrenRefs + * before we are done with them: create a locked latch really early (hence the + * override of newActorCell) and use that to block the Router constructor for + * as long as it takes to setup the RoutedActorRef itself. + */ + private[akka] var routeReady: ReentrantLock = _ + override def newActorCell( + system: ActorSystemImpl, + ref: InternalActorRef, + props: Props, + supervisor: InternalActorRef, + receiveTimeout: Option[Duration]): ActorCell = { + /* + * TODO RK: check that this really sticks, since this is executed before + * the constructor of RoutedActorRef is executed (invoked from + * LocalActorRef); works on HotSpot and JRockit. + */ + routeReady = new ReentrantLock + routeReady.lock() + super.newActorCell(system, ref, props, supervisor, receiveTimeout) + } + + private[akka] val routerConfig = _props.routerConfig + private[akka] val routeeProps = _props.copy(routerConfig = NoRouter) + private[akka] val resizeProgress = new AtomicBoolean private val resizeCounter = new AtomicLong @volatile private var _routees: IndexedSeq[ActorRef] = IndexedSeq.empty[ActorRef] // this MUST be initialized during createRoute def routees = _routees + private[akka] var routeeProvider: RouteeProvider = _ + val route = + try { + routeeProvider = routerConfig.createRouteeProvider(actorContext) + val r = routerConfig.createRoute(routeeProps, routeeProvider) + // initial resize, before message send + resize() + r + } finally routeReady.unlock() // unblock Router’s constructor + + if (routerConfig.resizer.isEmpty && _routees.isEmpty) + throw new ActorInitializationException("router " + routerConfig + " did not register routees!") + + _routees match { + case x ⇒ _routees = x // volatile write to publish the route before sending messages + } + + /* + * end of construction + */ + + def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match { + case _: AutoReceivedMessage ⇒ Nil + case Terminated(_) ⇒ Nil + case CurrentRoutees ⇒ + sender ! RouterRoutees(_routees) + Nil + case _ ⇒ + if (route.isDefinedAt(sender, message)) route(sender, message) + else Nil + } + /** * Adds the routees to existing routees. * Adds death watch of the routees so that they are removed when terminated. @@ -61,29 +125,6 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup abandonedRoutees foreach underlying.unwatch } - private val routeeProvider = _props.routerConfig.createRouteeProvider(actorContext) - val route = _props.routerConfig.createRoute(routeeProps, routeeProvider) - // initial resize, before message send - resize() - - def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match { - case _: AutoReceivedMessage ⇒ Nil - case Terminated(_) ⇒ Nil - case CurrentRoutees ⇒ - sender ! RouterRoutees(_routees) - Nil - case _ ⇒ - if (route.isDefinedAt(sender, message)) route(sender, message) - else Nil - } - - if (_props.routerConfig.resizer.isEmpty && _routees.isEmpty) - throw new ActorInitializationException("router " + _props.routerConfig + " did not register routees!") - - _routees match { - case x ⇒ _routees = x // volatile write to publish the route before sending messages - } - override def !(message: Any)(implicit sender: ActorRef = null): Unit = { resize() @@ -101,14 +142,9 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup } def resize() { - for (r ← _props.routerConfig.resizer) { - if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeProgress.compareAndSet(false, true)) { - try { - r.resize(routeeProps, routeeProvider) - } finally { - resizeProgress.set(false) - } - } + for (r ← routerConfig.resizer) { + if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeProgress.compareAndSet(false, true)) + super.!(Router.Resize) } } } @@ -251,8 +287,18 @@ trait Router extends Actor { case _ ⇒ throw new ActorInitializationException("Router actor can only be used in RoutedActorRef") } + // make sure that we synchronize properly to get the childrenRefs into our CPU cache + ref.routeReady.lock() + try if (context.children.isEmpty) + throw new ActorInitializationException("RouterConfig did not create any children") + finally ref.routeReady.unlock() + final def receive = ({ + case Router.Resize ⇒ + try ref.routerConfig.resizer foreach (_.resize(ref.routeeProps, ref.routeeProvider)) + finally ref.resizeProgress.set(false) + case Terminated(child) ⇒ ref.removeRoutees(IndexedSeq(child)) if (ref.routees.isEmpty) context.stop(self) @@ -264,6 +310,10 @@ trait Router extends Actor { } } +object Router { + case object Resize +} + /** * Used to broadcast a message to all connections in a router; only the * contained message will be forwarded, i.e. the `Broadcast(...)` @@ -795,14 +845,22 @@ trait Resizer { * for the initial resize and continues with 1 for the first message. Make sure to perform * initial resize before first message (messageCounter == 0), because there is no guarantee * that resize will be done when concurrent messages are in play. + * + * CAUTION: this method is invoked from the thread which tries to send a + * message to the pool, i.e. the ActorRef.!() method, hence it may be called + * concurrently. */ def isTimeForResize(messageCounter: Long): Boolean + /** * Decide if the capacity of the router need to be changed. Will be invoked when `isTimeForResize` * returns true and no other resize is in progress. * Create and register more routees with `routeeProvider.registerRoutees(newRoutees) * or remove routees with `routeeProvider.unregisterRoutees(abandonedRoutees)` and * sending [[akka.actor.PoisonPill]] to them. + * + * This method is invoked only in the context of the Router actor in order to safely + * create/stop children. */ def resize(props: Props, routeeProvider: RouteeProvider) } From bbe221e8128dde6ddd981569f13c15f9e1f66ca5 Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 9 Feb 2012 22:58:28 +0100 Subject: [PATCH 2/3] =?UTF-8?q?incorporate=20Viktor=E2=80=99s=20feedback?= =?UTF-8?q?=20and=20fix=20some=20stuff?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - initial resize should be done directly - must not require children unconditionally in Router constructor - ResizerSpec changed timing due to asynchronous resizing, one test disabled - removed pointless volatile write in RouterActorRef --- .../test/scala/akka/routing/ResizerSpec.scala | 18 +++-- .../src/main/scala/akka/dispatch/Future.scala | 6 +- .../src/main/scala/akka/routing/Routing.scala | 79 ++++++++----------- 3 files changed, 52 insertions(+), 51 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index 863922491b..1d271a0959 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -123,7 +123,15 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with current.routees.size must be(2) } - "resize when busy" in { + /* + * TODO RK This test seems invalid to me, because it relies on that no resize() event is lost; + * this currently fails because I made resize() asynchronous (by sending a message to the + * Router), but it could also fail for concurrent send operations, i.e. when one of thread + * fails the resizeInProgress.compareAndSet(false, true) check. + * + * Either the test must be fixed/removed or resize() must be changed to be blocking. + */ + "resize when busy" ignore { val busy = new TestLatch(1) @@ -179,10 +187,10 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2) def loop(loops: Int, t: Int, latch: TestLatch, count: AtomicInteger) = { - (10 millis).dilated.sleep + (100 millis).dilated.sleep for (m ← 0 until loops) { router.!((t, latch, count)) - (10 millis).dilated.sleep + (100 millis).dilated.sleep } } @@ -198,7 +206,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with // a whole bunch should max it out val count2 = new AtomicInteger val latch2 = TestLatch(10) - loop(10, 200, latch2, count2) + loop(10, 500, latch2, count2) Await.ready(latch2, TestLatch.DefaultTimeout) count2.get must be(10) @@ -238,7 +246,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with // let it cool down for (m ← 0 to 3) { router ! 1 - (200 millis).dilated.sleep + (500 millis).dilated.sleep } Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be < (z) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 97ff17c075..f995642acd 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -20,6 +20,7 @@ import akka.event.Logging.Debug import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.concurrent.{ ExecutionException, Callable, TimeoutException } import java.util.concurrent.atomic.{ AtomicInteger, AtomicReferenceFieldUpdater } +import akka.pattern.AskTimeoutException object Await { @@ -795,8 +796,9 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac def result(atMost: Duration)(implicit permit: CanAwait): T = ready(atMost).value.get match { - case Left(e) ⇒ throw e - case Right(r) ⇒ r + case Left(e: AskTimeoutException) ⇒ throw new AskTimeoutException(e.getMessage, e) + case Left(e) ⇒ throw e + case Right(r) ⇒ r } def value: Option[Either[Throwable, T]] = getState match { diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index c0bd0df251..badfe9bfcc 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -9,16 +9,13 @@ import akka.util.duration._ import akka.config.ConfigurationException import akka.pattern.pipe import akka.pattern.AskSupport - import com.typesafe.config.Config - import scala.collection.JavaConversions.iterableAsScalaIterable - import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock - import akka.jsr166y.ThreadLocalRandom +import akka.util.Unsafe /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to @@ -40,60 +37,57 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup * synchronized. This is done exactly once at start-up, all other accesses * are done from the Router actor. This means that the only thing which is * really hairy is making sure that the Router does not touch its childrenRefs - * before we are done with them: create a locked latch really early (hence the + * before we are done with them: lock the monitor of the actor cell (hence the * override of newActorCell) and use that to block the Router constructor for * as long as it takes to setup the RoutedActorRef itself. */ - private[akka] var routeReady: ReentrantLock = _ override def newActorCell( system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef, - receiveTimeout: Option[Duration]): ActorCell = { - /* - * TODO RK: check that this really sticks, since this is executed before - * the constructor of RoutedActorRef is executed (invoked from - * LocalActorRef); works on HotSpot and JRockit. - */ - routeReady = new ReentrantLock - routeReady.lock() - super.newActorCell(system, ref, props, supervisor, receiveTimeout) - } + receiveTimeout: Option[Duration]): ActorCell = + { + val cell = super.newActorCell(system, ref, props, supervisor, receiveTimeout) + Unsafe.instance.monitorEnter(cell) + cell + } private[akka] val routerConfig = _props.routerConfig private[akka] val routeeProps = _props.copy(routerConfig = NoRouter) - private[akka] val resizeProgress = new AtomicBoolean + private[akka] val resizeInProgress = new AtomicBoolean private val resizeCounter = new AtomicLong @volatile private var _routees: IndexedSeq[ActorRef] = IndexedSeq.empty[ActorRef] // this MUST be initialized during createRoute def routees = _routees - private[akka] var routeeProvider: RouteeProvider = _ + @volatile + private var _routeeProvider: RouteeProvider = _ + def routeeProvider = _routeeProvider + val route = try { - routeeProvider = routerConfig.createRouteeProvider(actorContext) + _routeeProvider = routerConfig.createRouteeProvider(actorContext) val r = routerConfig.createRoute(routeeProps, routeeProvider) // initial resize, before message send - resize() + routerConfig.resizer foreach { r ⇒ + if (r.isTimeForResize(resizeCounter.getAndIncrement())) + r.resize(routeeProps, routeeProvider) + } r - } finally routeReady.unlock() // unblock Router’s constructor + } finally Unsafe.instance.monitorExit(actorContext) // unblock Router’s constructor if (routerConfig.resizer.isEmpty && _routees.isEmpty) throw new ActorInitializationException("router " + routerConfig + " did not register routees!") - _routees match { - case x ⇒ _routees = x // volatile write to publish the route before sending messages - } - /* * end of construction */ def applyRoute(sender: ActorRef, message: Any): Iterable[Destination] = message match { - case _: AutoReceivedMessage ⇒ Nil - case Terminated(_) ⇒ Nil + case _: AutoReceivedMessage ⇒ Destination(this, this) :: Nil + case Terminated(_) ⇒ Destination(this, this) :: Nil case CurrentRoutees ⇒ sender ! RouterRoutees(_routees) Nil @@ -108,7 +102,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup * Not thread safe, but intended to be called from protected points, such as * `RouterConfig.createRoute` and `Resizer.resize` */ - private[akka] def addRoutees(newRoutees: IndexedSeq[ActorRef]) { + private[akka] def addRoutees(newRoutees: IndexedSeq[ActorRef]): Unit = { _routees = _routees ++ newRoutees // subscribe to Terminated messages for all route destinations, to be handled by Router actor newRoutees foreach underlying.watch @@ -120,7 +114,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup * Not thread safe, but intended to be called from protected points, such as * `Resizer.resize` */ - private[akka] def removeRoutees(abandonedRoutees: IndexedSeq[ActorRef]) { + private[akka] def removeRoutees(abandonedRoutees: IndexedSeq[ActorRef]): Unit = { _routees = _routees diff abandonedRoutees abandonedRoutees foreach underlying.unwatch } @@ -136,14 +130,14 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup } applyRoute(s, message) match { - case Nil ⇒ super.!(message)(s) - case refs ⇒ refs foreach (p ⇒ p.recipient.!(msg)(p.sender)) + case Destination(_, x) :: Nil if x eq this ⇒ super.!(message)(s) + case refs ⇒ refs foreach (p ⇒ p.recipient.!(msg)(p.sender)) } } - def resize() { + def resize(): Unit = { for (r ← routerConfig.resizer) { - if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeProgress.compareAndSet(false, true)) + if (r.isTimeForResize(resizeCounter.getAndIncrement()) && resizeInProgress.compareAndSet(false, true)) super.!(Router.Resize) } } @@ -282,22 +276,19 @@ trait CustomRoute { */ trait Router extends Actor { - val ref = self match { - case x: RoutedActorRef ⇒ x - case _ ⇒ throw new ActorInitializationException("Router actor can only be used in RoutedActorRef") - } - // make sure that we synchronize properly to get the childrenRefs into our CPU cache - ref.routeReady.lock() - try if (context.children.isEmpty) - throw new ActorInitializationException("RouterConfig did not create any children") - finally ref.routeReady.unlock() + val ref = context.synchronized { + self match { + case x: RoutedActorRef ⇒ x + case _ ⇒ throw new ActorInitializationException("Router actor can only be used in RoutedActorRef") + } + } final def receive = ({ case Router.Resize ⇒ try ref.routerConfig.resizer foreach (_.resize(ref.routeeProps, ref.routeeProvider)) - finally ref.resizeProgress.set(false) + finally assert(ref.resizeInProgress.getAndSet(false)) case Terminated(child) ⇒ ref.removeRoutees(IndexedSeq(child)) @@ -310,7 +301,7 @@ trait Router extends Actor { } } -object Router { +private object Router { case object Resize } From 36247b10fe79c8aa6ec878eaf4946c2cd1b65084 Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 10 Feb 2012 14:13:40 +0100 Subject: [PATCH 3/3] fix some more comments and make Router dispatcher configurable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - assert locking balance when using Unsafe.instance.monitorExit - add RouterConfig.routerDispatcher - re-enable “busy” resizer test after switching to BalancingDispatcher - document resizer asynchronicity and how to configure dispatchers --- .../test/scala/akka/routing/ResizerSpec.scala | 15 ++--- .../test/scala/akka/routing/RoutingSpec.scala | 4 ++ .../src/main/scala/akka/dispatch/Future.scala | 2 +- .../src/main/scala/akka/routing/Routing.scala | 59 ++++++++++++++++--- .../jrouting/CustomRouterDocTestBase.java | 18 ++++++ akka-docs/java/routing.rst | 35 +++++++++-- .../akka/docs/routing/RouterDocSpec.scala | 29 +++++++++ akka-docs/scala/routing.rst | 35 +++++++++-- .../akka/routing/RemoteRouterConfig.scala | 2 + 9 files changed, 170 insertions(+), 29 deletions(-) create mode 100644 akka-docs/scala/code/akka/docs/routing/RouterDocSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index 1d271a0959..2130afe107 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -24,6 +24,9 @@ object ResizerSpec { } } } + bal-disp { + type = BalancingDispatcher + } """ class TestActor extends Actor { @@ -123,15 +126,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with current.routees.size must be(2) } - /* - * TODO RK This test seems invalid to me, because it relies on that no resize() event is lost; - * this currently fails because I made resize() asynchronous (by sending a message to the - * Router), but it could also fail for concurrent send operations, i.e. when one of thread - * fails the resizeInProgress.compareAndSet(false, true) check. - * - * Either the test must be fixed/removed or resize() must be changed to be blocking. - */ - "resize when busy" ignore { + "resize when busy" in { val busy = new TestLatch(1) @@ -141,7 +136,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with pressureThreshold = 0, messagesPerResize = 1) - val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer)))) + val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer))).withDispatcher("bal-disp")) val latch1 = new TestLatch(1) router ! (latch1, busy) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 9de51bdabf..ad3702d556 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -15,6 +15,7 @@ import com.typesafe.config.ConfigFactory import akka.pattern.ask import java.util.concurrent.ConcurrentHashMap import com.typesafe.config.Config +import akka.dispatch.Dispatchers object RoutingSpec { @@ -51,6 +52,7 @@ object RoutingSpec { case (sender, message) ⇒ Nil } } + def routerDispatcher: String = Dispatchers.DefaultDispatcherId } } @@ -539,6 +541,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with //#crRouter case class VoteCountRouter() extends RouterConfig { + def routerDispatcher: String = Dispatchers.DefaultDispatcherId + //#crRoute def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = { val democratActor = routeeProvider.context.actorOf(Props(new DemocratActor()), "d") diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index f995642acd..8be901fb03 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -796,7 +796,7 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac def result(atMost: Duration)(implicit permit: CanAwait): T = ready(atMost).value.get match { - case Left(e: AskTimeoutException) ⇒ throw new AskTimeoutException(e.getMessage, e) + case Left(e: AskTimeoutException) ⇒ throw new AskTimeoutException(e.getMessage, e) // to get meaningful stack trace case Left(e) ⇒ throw e case Right(r) ⇒ r } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index badfe9bfcc..f83bca2db9 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import akka.jsr166y.ThreadLocalRandom import akka.util.Unsafe +import akka.dispatch.Dispatchers /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to @@ -24,7 +25,7 @@ import akka.util.Unsafe private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath) extends LocalActorRef( _system, - _props.copy(creator = () ⇒ _props.routerConfig.createActor()), + _props.copy(creator = () ⇒ _props.routerConfig.createActor(), dispatcher = _props.routerConfig.routerDispatcher), _supervisor, _path) { @@ -76,7 +77,10 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup r.resize(routeeProps, routeeProvider) } r - } finally Unsafe.instance.monitorExit(actorContext) // unblock Router’s constructor + } finally { + assert(Thread.holdsLock(actorContext)) + Unsafe.instance.monitorExit(actorContext) // unblock Router’s constructor + } if (routerConfig.resizer.isEmpty && _routees.isEmpty) throw new ActorInitializationException("router " + routerConfig + " did not register routees!") @@ -169,6 +173,11 @@ trait RouterConfig { def createActor(): Router = new Router {} + /** + * Dispatcher ID to use for running the “head” actor, i.e. the [[akka.routing.Router]]. + */ + def routerDispatcher: String + /** * Overridable merge strategy, by default completely prefers “this” (i.e. no merge). */ @@ -343,6 +352,7 @@ case class Destination(sender: ActorRef, recipient: ActorRef) //TODO add @SerialVersionUID(1L) when SI-4804 is fixed case object NoRouter extends RouterConfig { def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null + def routerDispatcher: String = "" override def withFallback(other: RouterConfig): RouterConfig = other } @@ -352,13 +362,17 @@ case object NoRouter extends RouterConfig { case object FromConfig extends RouterConfig { def createRoute(props: Props, routeeProvider: RouteeProvider): Route = throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)") + def routerDispatcher: String = Dispatchers.DefaultDispatcherId } /** * Java API: Router configuration which has no default, i.e. external configuration is required. + * + * This can be used when the dispatcher to be used for the head Router needs to be configured + * (defaults to default-dispatcher). */ //TODO add @SerialVersionUID(1L) when SI-4804 is fixed -case class FromConfig() extends RouterConfig { +case class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends RouterConfig { def createRoute(props: Props, routeeProvider: RouteeProvider): Route = throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)") } @@ -389,7 +403,8 @@ object RoundRobinRouter { * using `actorFor` in [[akka.actor.ActorRefProvider]] */ //TODO add @SerialVersionUID(1L) when SI-4804 is fixed -case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) +case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, + val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends RouterConfig with RoundRobinLike { /** @@ -415,6 +430,11 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = * Java API */ def this(resizer: Resizer) = this(resizer = Some(resizer)) + + /** + * Java API for setting routerDispatcher + */ + def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) } trait RoundRobinLike { this: RouterConfig ⇒ @@ -469,7 +489,8 @@ object RandomRouter { * using `actorFor` in [[akka.actor.ActorRefProvider]] */ //TODO add @SerialVersionUID(1L) when SI-4804 is fixed -case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) +case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, + val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends RouterConfig with RandomLike { /** @@ -495,6 +516,11 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, * Java API */ def this(resizer: Resizer) = this(resizer = Some(resizer)) + + /** + * Java API for setting routerDispatcher + */ + def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) } trait RandomLike { this: RouterConfig ⇒ @@ -555,7 +581,8 @@ object SmallestMailboxRouter { * using `actorFor` in [[akka.actor.ActorRefProvider]] */ //TODO add @SerialVersionUID(1L) when SI-4804 is fixed -case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) +case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, + val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends RouterConfig with SmallestMailboxLike { /** @@ -581,6 +608,11 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin * Java API */ def this(resizer: Resizer) = this(resizer = Some(resizer)) + + /** + * Java API for setting routerDispatcher + */ + def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) } trait SmallestMailboxLike { this: RouterConfig ⇒ @@ -700,7 +732,8 @@ object BroadcastRouter { * using `actorFor` in [[akka.actor.ActorRefProvider]] */ //TODO add @SerialVersionUID(1L) when SI-4804 is fixed -case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) +case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, + val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends RouterConfig with BroadcastLike { /** @@ -727,6 +760,10 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N */ def this(resizer: Resizer) = this(resizer = Some(resizer)) + /** + * Java API for setting routerDispatcher + */ + def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) } trait BroadcastLike { this: RouterConfig ⇒ @@ -773,7 +810,8 @@ object ScatterGatherFirstCompletedRouter { */ //TODO add @SerialVersionUID(1L) when SI-4804 is fixed case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration, - override val resizer: Option[Resizer] = None) + override val resizer: Option[Resizer] = None, + val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends RouterConfig with ScatterGatherFirstCompletedLike { if (within <= Duration.Zero) throw new IllegalArgumentException( @@ -802,6 +840,11 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It * Java API */ def this(resizer: Resizer, w: Duration) = this(resizer = Some(resizer), within = w) + + /** + * Java API for setting routerDispatcher + */ + def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) } trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ diff --git a/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java b/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java index 2a2e5c7f22..a20a351f06 100644 --- a/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java +++ b/akka-docs/java/code/akka/docs/jrouting/CustomRouterDocTestBase.java @@ -17,6 +17,7 @@ import akka.util.Duration; import akka.util.Timeout; import akka.dispatch.Await; import akka.dispatch.Future; +import akka.dispatch.Dispatchers; import akka.testkit.AkkaSpec; import com.typesafe.config.ConfigFactory; import static akka.pattern.Patterns.ask; @@ -38,6 +39,19 @@ public class CustomRouterDocTestBase { public void tearDown() { system.shutdown(); } + + public static class MyActor extends UntypedActor { + @Override public void onReceive(Object o) {} + } + + @Test + public void demonstrateDispatchers() { + //#dispatchers + final ActorRef router = system.actorOf(new Props(MyActor.class) + .withRouter(new RoundRobinRouter(5).withDispatcher("head")) // “head” router runs on "head" dispatcher + .withDispatcher("workers")); // MyActor “workers” run on "workers" dispatcher + //#dispatchers + } //#crTest @Test @@ -105,6 +119,10 @@ public class CustomRouterDocTestBase { //#crRouter public static class VoteCountRouter extends CustomRouterConfig { + + @Override public String routerDispatcher() { + return Dispatchers.DefaultDispatcherId(); + } //#crRoute @Override diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index a422900440..265e31a984 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -8,11 +8,6 @@ Routing (Java) .. contents:: :local: -Akka-core includes some building blocks to build more complex message flow handlers, they are listed and explained below: - -Router ------- - A Router is an actor that routes incoming messages to outbound actors. The router routes the messages sent to it to its underlying actors called 'routees'. @@ -249,6 +244,16 @@ This is an example of how to programatically create a resizable router: *It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used instead of any programmatically sent parameters.* +.. note:: + + Resizing is triggered by sending messages to the actor pool, but it is not + completed synchronously; instead a message is sent to the “head” + :class:`Router` to perform the size change. Thus you cannot rely on resizing + to instantaneously create new workers when all others are busy, because the + message just sent will be queued to the mailbox of a busy actor. To remedy + this, configure the pool to use a balancing dispatcher, see `Configuring + Dispatchers`_ for more information. + Custom Router ^^^^^^^^^^^^^ @@ -312,3 +317,23 @@ A router with dynamically resizable number of routees is implemented by providin in ``resizer`` method of the ``RouterConfig``. See ``akka.routing.DefaultResizer`` for inspiration of how to write your own resize strategy. +Configuring Dispatchers +^^^^^^^^^^^^^^^^^^^^^^^ + +The dispatcher for created children of the router will be taken from +:class:`Props` as described in :ref:`dispatchers-java`. For a dynamic pool it +makes sense to configure the :class:`BalancingDispatcher` if the precise +routing is not so important (i.e. no consistent hashing or round-robin is +required); this enables newly created routees to pick up work immediately by +stealing it from their siblings. + +The “head” router, of couse, cannot run on the same balancing dispatcher, +because it does not process the same messages, hence this special actor does +not use the dispatcher configured in :class:`Props`, but takes the +``routerDispatcher`` from the :class:`RouterConfig` instead, which defaults to +the actor system’s default dispatcher. All standard routers allow setting this +property in their constructor or factory method, custom routers have to +implement the method in a suitable way. + +.. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java#dispatchers + diff --git a/akka-docs/scala/code/akka/docs/routing/RouterDocSpec.scala b/akka-docs/scala/code/akka/docs/routing/RouterDocSpec.scala new file mode 100644 index 0000000000..229c66f13e --- /dev/null +++ b/akka-docs/scala/code/akka/docs/routing/RouterDocSpec.scala @@ -0,0 +1,29 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.docs.routing + +import RouterDocSpec.MyActor +import akka.actor.{ Props, Actor } +import akka.testkit.AkkaSpec +import akka.routing.RoundRobinRouter + +object RouterDocSpec { + class MyActor extends Actor { + def receive = { + case _ ⇒ + } + } +} + +class RouterDocSpec extends AkkaSpec { + + import RouterDocSpec._ + + //#dispatchers + val router = system.actorOf(Props[MyActor] + .withRouter(RoundRobinRouter(5, routerDispatcher = "router")) // “head” will run on "router" dispatcher + .withDispatcher("workers")) // MyActor workers will run on "workers" dispatcher + //#dispatchers + +} \ No newline at end of file diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index f67841df2c..161ab88db9 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -8,11 +8,6 @@ Routing (Scala) .. contents:: :local: -Akka-core includes some building blocks to build more complex message flow handlers, they are listed and explained below: - -Router ------- - A Router is an actor that routes incoming messages to outbound actors. The router routes the messages sent to it to its underlying actors called 'routees'. @@ -250,6 +245,16 @@ This is an example of how to programatically create a resizable router: *It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used instead of any programmatically sent parameters.* +.. note:: + + Resizing is triggered by sending messages to the actor pool, but it is not + completed synchronously; instead a message is sent to the “head” + :class:`Router` to perform the size change. Thus you cannot rely on resizing + to instantaneously create new workers when all others are busy, because the + message just sent will be queued to the mailbox of a busy actor. To remedy + this, configure the pool to use a balancing dispatcher, see `Configuring + Dispatchers`_ for more information. + Custom Router ^^^^^^^^^^^^^ @@ -311,3 +316,23 @@ A router with dynamically resizable number of routees is implemented by providin in ``resizer`` method of the ``RouterConfig``. See ``akka.routing.DefaultResizer`` for inspiration of how to write your own resize strategy. +Configuring Dispatchers +^^^^^^^^^^^^^^^^^^^^^^^ + +The dispatcher for created children of the router will be taken from +:class:`Props` as described in :ref:`dispatchers-scala`. For a dynamic pool it +makes sense to configure the :class:`BalancingDispatcher` if the precise +routing is not so important (i.e. no consistent hashing or round-robin is +required); this enables newly created routees to pick up work immediately by +stealing it from their siblings. + +The “head” router, of couse, cannot run on the same balancing dispatcher, +because it does not process the same messages, hence this special actor does +not use the dispatcher configured in :class:`Props`, but takes the +``routerDispatcher`` from the :class:`RouterConfig` instead, which defaults to +the actor system’s default dispatcher. All standard routers allow setting this +property in their constructor or factory method, custom routers have to +implement the method in a suitable way. + +.. includecode:: code/akka/docs/routing/RouterDocSpec.scala#dispatchers + diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala index 21f5c400b0..3b1791db8e 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala @@ -30,6 +30,8 @@ case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[String]) exte override def createActor(): Router = local.createActor() + override def routerDispatcher: String = local.routerDispatcher + override def resizer: Option[Resizer] = local.resizer override def withFallback(other: RouterConfig): RouterConfig = other match {