From f85645e46dbe3be651e1590afce41c77755c0764 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 10 Feb 2012 10:36:35 +0100 Subject: [PATCH 01/11] Removing the old work redistribution --- .../akka/dispatch/BalancingDispatcher.scala | 22 +------------------ 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 8542ac69c8..a6042046ff 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -36,7 +36,6 @@ class BalancingDispatcher( extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) { val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) - val rebalance = new AtomicBoolean(false) val messageQueue: MessageQueue = mailboxType match { case u: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { @@ -84,30 +83,11 @@ class BalancingDispatcher( protected[akka] override def unregister(actor: ActorCell) = { buddies.remove(actor) super.unregister(actor) - intoTheFray(except = actor) //When someone leaves, he tosses a friend into the fray } - def intoTheFray(except: ActorCell): Unit = - if (rebalance.compareAndSet(false, true)) { - try { - val i = buddies.iterator() - - @tailrec - def throwIn(): Unit = { - val n = if (i.hasNext) i.next() else null - if (n eq null) () - else if ((n ne except) && registerForExecution(n.mailbox, false, false)) () - else throwIn() - } - throwIn() - } finally { - rebalance.set(false) - } - } - override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) registerForExecution(receiver.mailbox, false, false) - intoTheFray(except = receiver) + //Somewhere around here we have to make sure that not only the intended actor is kept busy } } From 43913b0490a88235748f631896217b38f11a2ca6 Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 10 Feb 2012 20:47:59 +0100 Subject: [PATCH 02/11] change IdentityHashComparator to fall back to a real one MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit System.identityHashCode is not guaranteed to be consistent with equals() (cannot be, just imagine more than 2^32 objects); fix it by checking equals in case 0 would be returned and fall back to a real Comparator in case that’s needed. --- .../scala/akka/dispatch/BalancingDispatcher.scala | 10 +++++++--- akka-actor/src/main/scala/akka/util/Helpers.scala | 14 ++++++++++++-- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index a6042046ff..d8274d810a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -9,7 +9,8 @@ import akka.actor.{ ActorCell, ActorRef } import java.util.concurrent.{ LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } import annotation.tailrec import java.util.concurrent.atomic.AtomicBoolean -import akka.util.Duration +import akka.util.{ Duration, Helpers } +import java.util.Comparator /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -35,10 +36,13 @@ class BalancingDispatcher( _shutdownTimeout: Duration) extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) { - val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) + val buddies = new ConcurrentSkipListSet[ActorCell]( + Helpers.identityHashComparator(new Comparator[ActorCell] { + def compare(l: ActorCell, r: ActorCell) = l.self.path compareTo r.self.path + })) val messageQueue: MessageQueue = mailboxType match { - case u: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + case _: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope] } case BoundedMailbox(cap, timeout) ⇒ new QueueBasedMessageQueue with BoundedMessageQueueSemantics { diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index 60e6be8b65..25cb279f2e 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -21,8 +21,18 @@ object Helpers { if (diff > 0) 1 else if (diff < 0) -1 else 0 } - val IdentityHashComparator = new Comparator[AnyRef] { - def compare(a: AnyRef, b: AnyRef): Int = compareIdentityHash(a, b) + /** + * Create a comparator which will efficiently use `System.identityHashCode`, + * unless that happens to be the same for two non-equals objects, in which + * case the supplied “real” comparator is used; the comparator must be + * consistent with equals, otherwise it would not be an enhancement over + * the identityHashCode. + */ + def identityHashComparator[T <: AnyRef](comp: Comparator[T]): Comparator[T] = new Comparator[T] { + def compare(a: T, b: T): Int = compareIdentityHash(a, b) match { + case 0 if a != b ⇒ comp.compare(a, b) + case x ⇒ x + } } final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+~" From a2ef3eed7eb0e1764a32c3c018250406c126f8c0 Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 10 Feb 2012 20:56:52 +0100 Subject: [PATCH 03/11] scaffolding: make debug printout more useful, add assertions --- .../akka/actor/dispatch/ActorModelSpec.scala | 22 ++++++++++++------- .../akka/dispatch/AbstractDispatcher.scala | 2 ++ .../akka/dispatch/BalancingDispatcher.scala | 4 ++-- .../main/scala/akka/dispatch/Mailbox.scala | 5 ++++- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 46bf609c7a..4635fc4749 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -110,8 +110,9 @@ object ActorModelSpec { val stops = new AtomicLong(0) def getStats(actorRef: ActorRef) = { - stats.putIfAbsent(actorRef, new InterceptorStats) match { - case null ⇒ stats.get(actorRef) + val is = new InterceptorStats + stats.putIfAbsent(actorRef, is) match { + case null ⇒ is case other ⇒ other } } @@ -127,12 +128,12 @@ object ActorModelSpec { } protected[akka] abstract override def register(actor: ActorCell) { - getStats(actor.self).registers.incrementAndGet() + assert(getStats(actor.self).registers.incrementAndGet() == 1) super.register(actor) } protected[akka] abstract override def unregister(actor: ActorCell) { - getStats(actor.self).unregisters.incrementAndGet() + assert(getStats(actor.self).unregisters.incrementAndGet() == 1) super.unregister(actor) } @@ -351,7 +352,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa def flood(num: Int) { val cachedMessage = CountDownNStop(new CountDownLatch(num)) val stopLatch = new CountDownLatch(num) - val waitTime = (30 seconds).dilated.toMillis + val waitTime = (20 seconds).dilated.toMillis val boss = system.actorOf(Props(new Actor { def receive = { case "run" ⇒ for (_ ← 1 to num) (context.watch(context.actorOf(props))) ! cachedMessage @@ -368,13 +369,18 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa val buddies = dispatcher.buddies val mq = dispatcher.messageQueue - System.err.println("Buddies left: ") - buddies.toArray foreach { + System.err.println("Buddies left: " + buddies.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhab) + buddies.toArray sorted new Ordering[AnyRef] { + def compare(l: AnyRef, r: AnyRef) = (l, r) match { + case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path.toString.compareTo(rr.self.path.toString) + } + } foreach { case cell: ActorCell ⇒ System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) } - System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages + " ") + System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages) + Iterator.continually(mq.dequeue) takeWhile (_ ne null) foreach System.err.println case _ ⇒ } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 1b31be630c..da0e4fdc6e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -260,6 +260,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext mailBox.cleanUp() } + def inhab = inhabitantsUpdater.get(this) + private val shutdownAction = new Runnable { @tailrec final def run() { diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index d8274d810a..4195d0ec61 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -81,11 +81,11 @@ class BalancingDispatcher( protected[akka] override def register(actor: ActorCell) = { super.register(actor) - buddies.add(actor) + assert(buddies.add(actor)) } protected[akka] override def unregister(actor: ActorCell) = { - buddies.remove(actor) + assert(buddies.remove(actor)) super.unregister(actor) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index cc15ae2173..32fce8564e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -190,7 +190,10 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue var nextMessage = systemDrain() try { while ((nextMessage ne null) && !isClosed) { - if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs) + if (debug) println(actor.self + " processing system message " + nextMessage + " with " + + (if (actor.childrenRefs.isEmpty) "no children" + else if (actor.childrenRefs.size > 20) actor.childrenRefs.size + " children" + else actor.childrenRefs.mkString("children:\n ", "\n ", ""))) actor systemInvoke nextMessage nextMessage = nextMessage.next // don’t ever execute normal message when system message present! From 880f09be2215c9e8ecacb6cc5e020af570bd2ac9 Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 10 Feb 2012 21:29:11 +0100 Subject: [PATCH 04/11] special start-up sequence for actors on BalancingDispatcher Normally the ActorCell would register the actor with the dispatcher (yeah, I moved it into the logical order, because the other one was specifically done for BD but does not work out) and then dispatch the Create() message. This does not work for BD, because then the actor could potentiall process a message before Create() is enqueued, so override systemDispatch() to drop Create() and insert that during register() (which is called from attach()), making sure to achieve the following order: - enqueue Create() - register with dispatcher - add to buddies - schedule mailbox --- akka-actor/src/main/scala/akka/actor/ActorCell.scala | 4 ++-- .../scala/akka/dispatch/BalancingDispatcher.scala | 11 +++++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index aa718e12c8..f4112e5d37 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -290,10 +290,10 @@ private[akka] class ActorCell( // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ parent.sendSystemMessage(akka.dispatch.Supervise(self)) + dispatcher.attach(this) + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ dispatcher.systemDispatch(this, Create()) - - dispatcher.attach(this) } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 4195d0ec61..4ef7607016 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -79,9 +79,20 @@ class BalancingDispatcher( } } + protected[akka] override def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = + invocation match { + case Create() ⇒ + case x ⇒ super.systemDispatch(receiver, invocation) + } + protected[akka] override def register(actor: ActorCell) = { + val mbox = actor.mailbox + mbox.systemEnqueue(actor.self, Create()) + // must make sure that Create() is the first message enqueued in this mailbox super.register(actor) assert(buddies.add(actor)) + // must make sure that buddy-add is executed before the actor has had a chance to die + registerForExecution(mbox, false, true) } protected[akka] override def unregister(actor: ActorCell) = { From 5a9ec45d01b4cf4554cad9420d45091542ca7611 Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 10 Feb 2012 22:16:32 +0100 Subject: [PATCH 05/11] first stab at balancing algorithm: it passes the tests - add new config item "buddy-wakeup-threshold" which defaults to 5 - if BWT>=0, then check mailbox.numberOfMessages in case the target actor was not scheduled during dispatch and schedule a buddie if that is found >=BWT (BWT is a getfield) - if during unregister() there are messages in the queue, schedule a buddie This way people can tune which behavior they want, knowing full well that numberOfMessages is O(n). --- .../scala/akka/actor/dispatch/ActorModelSpec.scala | 3 ++- .../src/test/scala/akka/config/ConfigSpec.scala | 1 + akka-actor/src/main/resources/reference.conf | 7 +++++++ .../scala/akka/dispatch/BalancingDispatcher.scala | 14 ++++++++++---- .../src/main/scala/akka/dispatch/Dispatchers.scala | 3 ++- 5 files changed, 22 insertions(+), 6 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 4635fc4749..a735e7298b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -546,7 +546,8 @@ object BalancingDispatcherModelSpec { Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, configureExecutor(), - Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), + config.getInt("buddy-wakeup-threshold")) with MessageDispatcherInterceptor override def dispatcher(): MessageDispatcher = instance } diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index dd5149ad8e..13bb3b4f27 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -53,6 +53,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { c.getMilliseconds("shutdown-timeout") must equal(1 * 1000) c.getInt("throughput") must equal(5) c.getMilliseconds("throughput-deadline-time") must equal(0) + c.getInt("buddy-wakeup-threshold") must equal(5) } //Fork join executor config diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index bc52938a7d..b7e0563339 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -246,6 +246,13 @@ akka { # mailbox is used. The Class of the FQCN must have a constructor with a # com.typesafe.config.Config parameter. mailbox-type = "" + + # For BalancingDispatcher: if during message enqueuing the target actor is + # already busy and at least this number of messages is currently in the queue, + # then wake up another actor from the same dispatcher at random. + # Set to -1 to disable (which will also skip the possibly expensive check; + # obtaining the mailbox size is O(n) for the default mailboxes). + buddy-wakeup-threshold = 5 } debug { diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 4ef7607016..63fafef7d1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -10,7 +10,7 @@ import java.util.concurrent.{ LinkedBlockingQueue, ConcurrentLinkedQueue, Concur import annotation.tailrec import java.util.concurrent.atomic.AtomicBoolean import akka.util.{ Duration, Helpers } -import java.util.Comparator +import java.util.{ Comparator, Iterator } /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -33,7 +33,8 @@ class BalancingDispatcher( throughputDeadlineTime: Duration, mailboxType: MailboxType, _executorServiceFactoryProvider: ExecutorServiceFactoryProvider, - _shutdownTimeout: Duration) + _shutdownTimeout: Duration, + buddyWakeupThreshold: Int) extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) { val buddies = new ConcurrentSkipListSet[ActorCell]( @@ -98,11 +99,16 @@ class BalancingDispatcher( protected[akka] override def unregister(actor: ActorCell) = { assert(buddies.remove(actor)) super.unregister(actor) + if (messageQueue.hasMessages) registerOne() } override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) - registerForExecution(receiver.mailbox, false, false) - //Somewhere around here we have to make sure that not only the intended actor is kept busy + if (!registerForExecution(receiver.mailbox, false, false) && + buddyWakeupThreshold >= 0 && + messageQueue.numberOfMessages >= buddyWakeupThreshold) registerOne() } + + @tailrec private def registerOne(i: Iterator[ActorCell] = buddies.iterator): Unit = + if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) registerOne(i) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 8e99e05b06..b9fd3f784b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -189,7 +189,8 @@ class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherP config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, configureExecutor(), - Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), + config.getInt("buddy-wakeup-threshold")) /** * Returns the same dispatcher instance for each invocation From 251a7cc7e399778d4e2765742ea7dcb69086e64b Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Feb 2012 12:38:59 +0100 Subject: [PATCH 06/11] clean up BalancingDispatcher: - change from messageQueue.numberOfMessages to maintaining an AtomicLong for performance reasons - add comments/scaladoc where missing - remove some assert()s - fix ResiserSpec to employ buddy-wakeup-threshold --- .../akka/actor/dispatch/ActorModelSpec.scala | 4 +- .../test/scala/akka/routing/ResizerSpec.scala | 1 + .../src/main/scala/akka/actor/ActorCell.scala | 5 ++ .../akka/dispatch/AbstractDispatcher.scala | 2 +- .../akka/dispatch/BalancingDispatcher.scala | 55 +++++++++++++++---- .../main/scala/akka/dispatch/Mailbox.scala | 29 +++++++--- 6 files changed, 74 insertions(+), 22 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index a735e7298b..15886973b2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -352,7 +352,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa def flood(num: Int) { val cachedMessage = CountDownNStop(new CountDownLatch(num)) val stopLatch = new CountDownLatch(num) - val waitTime = (20 seconds).dilated.toMillis + val waitTime = (30 seconds).dilated.toMillis val boss = system.actorOf(Props(new Actor { def receive = { case "run" ⇒ for (_ ← 1 to num) (context.watch(context.actorOf(props))) ! cachedMessage @@ -369,7 +369,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa val buddies = dispatcher.buddies val mq = dispatcher.messageQueue - System.err.println("Buddies left: " + buddies.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhab) + System.err.println("Buddies left: " + buddies.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhabitants) buddies.toArray sorted new Ordering[AnyRef] { def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path.toString.compareTo(rr.self.path.toString) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index 2130afe107..26b5021c18 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -26,6 +26,7 @@ object ResizerSpec { } bal-disp { type = BalancingDispatcher + buddy-wakeup-threshold = 1 } """ diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index f4112e5d37..9268406086 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -290,6 +290,11 @@ private[akka] class ActorCell( // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ parent.sendSystemMessage(akka.dispatch.Supervise(self)) + /* + * attach before submitting the mailbox for the first time, because + * otherwise the actor could already be dead before the dispatcher is + * informed of its existence (with reversed attach/detach sequence). + */ dispatcher.attach(this) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index da0e4fdc6e..9d1575c4ec 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -260,7 +260,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext mailBox.cleanUp() } - def inhab = inhabitantsUpdater.get(this) + def inhabitants: Long = inhabitantsUpdater.get(this) private val shutdownAction = new Runnable { @tailrec diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 63fafef7d1..70101578f0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -11,6 +11,8 @@ import annotation.tailrec import java.util.concurrent.atomic.AtomicBoolean import akka.util.{ Duration, Helpers } import java.util.{ Comparator, Iterator } +import akka.util.Unsafe +import java.util.concurrent.atomic.AtomicLong /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -43,18 +45,46 @@ class BalancingDispatcher( })) val messageQueue: MessageQueue = mailboxType match { - case _: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { - final val queue = new ConcurrentLinkedQueue[Envelope] - } - case BoundedMailbox(cap, timeout) ⇒ new QueueBasedMessageQueue with BoundedMessageQueueSemantics { - final val queue = new LinkedBlockingQueue[Envelope](cap) - final val pushTimeOut = timeout - } + case UnboundedMailbox() ⇒ + new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + final val queue = new ConcurrentLinkedQueue[Envelope] + + override def enqueue(receiver: ActorRef, handle: Envelope) = { + super.enqueue(receiver, handle) + _pressure.getAndIncrement() + } + + override def dequeue(): Envelope = + super.dequeue() match { + case null ⇒ null + case x ⇒ _pressure.getAndDecrement(); x + } + } + + case BoundedMailbox(cap, timeout) ⇒ + new QueueBasedMessageQueue with BoundedMessageQueueSemantics { + final val queue = new LinkedBlockingQueue[Envelope](cap) + final val pushTimeOut = timeout + + override def enqueue(receiver: ActorRef, handle: Envelope) = { + super.enqueue(receiver, handle) + _pressure.getAndIncrement() + } + + override def dequeue(): Envelope = + super.dequeue() match { + case null ⇒ null + case x ⇒ _pressure.getAndDecrement(); x + } + } + case other ⇒ throw new IllegalArgumentException("Only handles BoundedMailbox and UnboundedMailbox, but you specified [" + other + "]") } protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor) + private val _pressure = new AtomicLong + class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue { final def enqueue(receiver: ActorRef, handle: Envelope) = messageQueue.enqueue(receiver, handle) @@ -81,6 +111,11 @@ class BalancingDispatcher( } protected[akka] override def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = + /* + * need to filter out Create() messages here because BalancingDispatcher + * already enqueues this within register(), which is called first by the + * ActorCell. + */ invocation match { case Create() ⇒ case x ⇒ super.systemDispatch(receiver, invocation) @@ -91,13 +126,13 @@ class BalancingDispatcher( mbox.systemEnqueue(actor.self, Create()) // must make sure that Create() is the first message enqueued in this mailbox super.register(actor) - assert(buddies.add(actor)) + buddies.add(actor) // must make sure that buddy-add is executed before the actor has had a chance to die registerForExecution(mbox, false, true) } protected[akka] override def unregister(actor: ActorCell) = { - assert(buddies.remove(actor)) + buddies.remove(actor) super.unregister(actor) if (messageQueue.hasMessages) registerOne() } @@ -106,7 +141,7 @@ class BalancingDispatcher( messageQueue.enqueue(receiver.self, invocation) if (!registerForExecution(receiver.mailbox, false, false) && buddyWakeupThreshold >= 0 && - messageQueue.numberOfMessages >= buddyWakeupThreshold) registerOne() + _pressure.get >= buddyWakeupThreshold) registerOne() } @tailrec private def registerOne(i: Iterator[ActorCell] = buddies.iterator): Unit = diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 32fce8564e..4c50cb5c8d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -239,15 +239,26 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue } trait MessageQueue { - /* - * These method need to be implemented in subclasses; they should not rely on the internal stuff above. + /** + * Try to enqueue the message to this queue, or throw an exception. */ - def enqueue(receiver: ActorRef, handle: Envelope) + def enqueue(receiver: ActorRef, handle: Envelope): Unit // NOTE: receiver is used only in two places, but cannot be removed + /** + * Try to dequeue the next message from this queue, return null failing that. + */ def dequeue(): Envelope + /** + * Should return the current number of messages held in this queue; may + * always return 0 if no other value is available efficiently. Do not use + * this for testing for presence of messages, use `hasMessages` instead. + */ def numberOfMessages: Int + /** + * Indicates whether this queue is non-empty. + */ def hasMessages: Boolean } @@ -295,15 +306,15 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ } trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue { - final def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle - final def dequeue(): Envelope = queue.poll() + def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle + def dequeue(): Envelope = queue.poll() } trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { def pushTimeOut: Duration override def queue: BlockingQueue[Envelope] - final def enqueue(receiver: ActorRef, handle: Envelope) { + def enqueue(receiver: ActorRef, handle: Envelope) { if (pushTimeOut.length > 0) { queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || { throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver) @@ -311,13 +322,13 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { } else queue put handle } - final def dequeue(): Envelope = queue.poll() + def dequeue(): Envelope = queue.poll() } trait QueueBasedMessageQueue extends MessageQueue { def queue: Queue[Envelope] - final def numberOfMessages = queue.size - final def hasMessages = !queue.isEmpty + def numberOfMessages = queue.size + def hasMessages = !queue.isEmpty } /** From bb40c1ae307bdb5147b4de03efb7f239f8d2d424 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Feb 2012 14:43:20 +0100 Subject: [PATCH 07/11] tweak ResizerSpec to work better with async Resize(), see #1814 - previously relied on resize() being invoked before enqueueing to the mailbox, which is not at all guaranteed any longer. --- .../test/scala/akka/routing/ResizerSpec.scala | 52 +++++++++---------- .../src/main/scala/akka/routing/Routing.scala | 3 +- 2 files changed, 26 insertions(+), 29 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index 26b5021c18..b9765c8e92 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -11,6 +11,7 @@ import akka.util.duration._ import akka.actor.ActorRef import java.util.concurrent.atomic.AtomicInteger import akka.pattern.ask +import akka.util.Duration object ResizerSpec { @@ -161,53 +162,48 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with // as influenced by the backlog of blocking pooled actors val resizer = DefaultResizer( - lowerBound = 2, - upperBound = 4, + lowerBound = 3, + upperBound = 5, rampupRate = 0.1, + backoffRate = 0.0, pressureThreshold = 1, messagesPerResize = 1, backoffThreshold = 0.0) val router = system.actorOf(Props(new Actor { def receive = { - case (n: Int, latch: TestLatch, count: AtomicInteger) ⇒ - (n millis).dilated.sleep - count.incrementAndGet - latch.countDown() + case d: Duration ⇒ d.dilated.sleep; sender ! "done" + case "echo" ⇒ sender ! "reply" } }).withRouter(RoundRobinRouter(resizer = Some(resizer)))) // first message should create the minimum number of routees - router ! 1 + router ! "echo" + expectMsg("reply") - Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2) + def routees(r: ActorRef): Int = { + r ! CurrentRoutees + expectMsgType[RouterRoutees].routees.size + } - def loop(loops: Int, t: Int, latch: TestLatch, count: AtomicInteger) = { - (100 millis).dilated.sleep - for (m ← 0 until loops) { - router.!((t, latch, count)) - (100 millis).dilated.sleep - } + routees(router) must be(3) + + def loop(loops: Int, d: Duration) = { + for (m ← 0 until loops) router ! d + for (m ← 0 until loops) expectMsg(d * 2, "done") } // 2 more should go thru without triggering more - val count1 = new AtomicInteger - val latch1 = TestLatch(2) - loop(2, 200, latch1, count1) - Await.ready(latch1, TestLatch.DefaultTimeout) - count1.get must be(2) + loop(2, 200 millis) - Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2) + routees(router) must be(3) // a whole bunch should max it out - val count2 = new AtomicInteger - val latch2 = TestLatch(10) - loop(10, 500, latch2, count2) - Await.ready(latch2, TestLatch.DefaultTimeout) - count2.get must be(10) - - Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(4) + loop(4, 500 millis) + awaitCond(routees(router) == 4) + loop(10, 500 millis) + awaitCond(routees(router) == 5) } "backoff" in { @@ -240,7 +236,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with (300 millis).dilated.sleep // let it cool down - for (m ← 0 to 3) { + for (m ← 0 to 5) { router ! 1 (500 millis).dilated.sleep } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index da2c81d1a7..b050a21b53 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -1028,7 +1028,8 @@ case class DefaultResizer( */ def capacity(routees: IndexedSeq[ActorRef]): Int = { val currentSize = routees.size - val delta = filter(pressure(routees), currentSize) + val press = pressure(routees) + val delta = filter(press, currentSize) val proposed = currentSize + delta if (proposed < lowerBound) delta + (lowerBound - proposed) From 7c57a9d60e6f0f9fb014b61d649f3ab802b5fd89 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Feb 2012 15:33:31 +0100 Subject: [PATCH 08/11] final touch to actor start-up sequence split systemDispatch(Create()) into systemEnqueue(Create()) directly after createMailbox and registerForExecution from within Dispatcher.attach() (resp. CallingThreadDispatcher.register() does its own thing) --- .../src/main/scala/akka/actor/ActorCell.scala | 15 ++++++------ .../akka/dispatch/AbstractDispatcher.scala | 11 ++++++--- .../akka/dispatch/BalancingDispatcher.scala | 24 ++++--------------- .../testkit/CallingThreadDispatcher.scala | 13 +++++++++- 4 files changed, 31 insertions(+), 32 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 9268406086..c22529b2c8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -285,20 +285,19 @@ private[akka] class ActorCell( final def isTerminated: Boolean = mailbox.isClosed final def start(): Unit = { + /* + * Create the mailbox and enqueue the Create() message to ensure that + * this is processed before anything else. + */ mailbox = dispatcher.createMailbox(this) + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + mailbox.systemEnqueue(self, Create()) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ parent.sendSystemMessage(akka.dispatch.Supervise(self)) - /* - * attach before submitting the mailbox for the first time, because - * otherwise the actor could already be dead before the dispatcher is - * informed of its existence (with reversed attach/detach sequence). - */ + // This call is expected to start off the actor by scheduling its mailbox. dispatcher.attach(this) - - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - dispatcher.systemDispatch(this, Create()) } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 9d1575c4ec..22eadb55d5 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -185,9 +185,14 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext def id: String /** - * Attaches the specified actor instance to this dispatcher + * Attaches the specified actor instance to this dispatcher, which includes + * scheduling it to run for the first time (Create() is expected to have + * been enqueued by the ActorCell upon mailbox creation). */ - final def attach(actor: ActorCell): Unit = register(actor) + final def attach(actor: ActorCell): Unit = { + register(actor) + registerForExecution(actor.mailbox, false, true) + } /** * Detaches the specified actor instance from this dispatcher @@ -243,7 +248,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext () ⇒ if (inhabitantsUpdater.decrementAndGet(this) == 0) ifSensibleToDoSoThenScheduleShutdown() /** - * If you override it, you must call it. But only ever once. See "attach" for only invocation + * If you override it, you must call it. But only ever once. See "attach" for only invocation. */ protected[akka] def register(actor: ActorCell) { inhabitantsUpdater.incrementAndGet(this) diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 70101578f0..61ac773aa0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -110,40 +110,24 @@ class BalancingDispatcher( } } - protected[akka] override def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = - /* - * need to filter out Create() messages here because BalancingDispatcher - * already enqueues this within register(), which is called first by the - * ActorCell. - */ - invocation match { - case Create() ⇒ - case x ⇒ super.systemDispatch(receiver, invocation) - } - protected[akka] override def register(actor: ActorCell) = { - val mbox = actor.mailbox - mbox.systemEnqueue(actor.self, Create()) - // must make sure that Create() is the first message enqueued in this mailbox super.register(actor) buddies.add(actor) - // must make sure that buddy-add is executed before the actor has had a chance to die - registerForExecution(mbox, false, true) } protected[akka] override def unregister(actor: ActorCell) = { buddies.remove(actor) super.unregister(actor) - if (messageQueue.hasMessages) registerOne() + if (messageQueue.hasMessages) scheduleOne() } override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) if (!registerForExecution(receiver.mailbox, false, false) && buddyWakeupThreshold >= 0 && - _pressure.get >= buddyWakeupThreshold) registerOne() + _pressure.get >= buddyWakeupThreshold) scheduleOne() } - @tailrec private def registerOne(i: Iterator[ActorCell] = buddies.iterator): Unit = - if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) registerOne(i) + @tailrec private def scheduleOne(i: Iterator[ActorCell] = buddies.iterator): Unit = + if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) scheduleOne(i) } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 8282ee58f5..8b2d15a079 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -8,7 +8,7 @@ import java.util.concurrent.locks.ReentrantLock import java.util.LinkedList import scala.annotation.tailrec import com.typesafe.config.Config -import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell } +import akka.actor.{ ActorInitializationException, ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell } import akka.dispatch.{ TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue } import akka.util.duration.intToDurationInt import akka.util.{ Switch, Duration } @@ -132,6 +132,17 @@ class CallingThreadDispatcher( protected[akka] override def shutdownTimeout = 1 second + protected[akka] override def register(actor: ActorCell): Unit = { + super.register(actor) + actor.mailbox match { + case mbox: CallingThreadMailbox ⇒ + val queue = mbox.queue + queue.enter + runQueue(mbox, queue) + case x ⇒ throw new ActorInitializationException("expected CallingThreadMailbox, got " + x.getClass) + } + } + override def suspend(actor: ActorCell) { actor.mailbox match { case m: CallingThreadMailbox ⇒ m.suspendSwitch.switchOn From 607ec4c2cf6b7a4a57665dace3252c0562c35d22 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 13 Feb 2012 18:14:35 +0100 Subject: [PATCH 09/11] Switching approaches to check for max throttle --- .../akka/actor/dispatch/ActorModelSpec.scala | 2 +- akka-actor/src/main/resources/reference.conf | 10 ++--- .../akka/dispatch/AbstractDispatcher.scala | 7 ++- .../akka/dispatch/BalancingDispatcher.scala | 44 +++++-------------- .../main/scala/akka/dispatch/Dispatcher.scala | 7 ++- .../scala/akka/dispatch/Dispatchers.scala | 2 +- .../akka/dispatch/ThreadPoolBuilder.scala | 8 ++-- 7 files changed, 30 insertions(+), 50 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 15886973b2..deafb9cdc1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -547,7 +547,7 @@ object BalancingDispatcherModelSpec { mailboxType, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), - config.getInt("buddy-wakeup-threshold")) with MessageDispatcherInterceptor + config.getBoolean("attempt-teamwork")) with MessageDispatcherInterceptor override def dispatcher(): MessageDispatcher = instance } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index b7e0563339..9e0ee70ff1 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -247,12 +247,10 @@ akka { # com.typesafe.config.Config parameter. mailbox-type = "" - # For BalancingDispatcher: if during message enqueuing the target actor is - # already busy and at least this number of messages is currently in the queue, - # then wake up another actor from the same dispatcher at random. - # Set to -1 to disable (which will also skip the possibly expensive check; - # obtaining the mailbox size is O(n) for the default mailboxes). - buddy-wakeup-threshold = 5 + # For BalancingDispatcher: If the balancing dispatcher should attempt to + # schedule idle actors using the same dispatcher when a message comes in, + # and the dispatchers ExecutorService is not fully busy already. + attempt-teamwork = on } debug { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 22eadb55d5..6046e249af 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -156,7 +156,10 @@ trait ExecutionContext { * log the problem or whatever is appropriate for the implementation. */ def reportFailure(t: Throwable): Unit +} +private[akka] trait LoadMetrics { self: Executor ⇒ + def atFullThrottle(): Boolean } object MessageDispatcher { @@ -447,11 +450,13 @@ object ForkJoinExecutorConfigurator { final class AkkaForkJoinPool(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, unhandledExceptionHandler: Thread.UncaughtExceptionHandler) - extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) { + extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics { override def execute(r: Runnable): Unit = r match { case m: Mailbox ⇒ super.execute(new MailboxExecutionTask(m)) case other ⇒ super.execute(other) } + + def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 61ac773aa0..d2d978341c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -4,15 +4,11 @@ package akka.dispatch -import util.DynamicVariable import akka.actor.{ ActorCell, ActorRef } -import java.util.concurrent.{ LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } import annotation.tailrec -import java.util.concurrent.atomic.AtomicBoolean import akka.util.{ Duration, Helpers } import java.util.{ Comparator, Iterator } -import akka.util.Unsafe -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.{ Executor, LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -36,7 +32,7 @@ class BalancingDispatcher( mailboxType: MailboxType, _executorServiceFactoryProvider: ExecutorServiceFactoryProvider, _shutdownTimeout: Duration, - buddyWakeupThreshold: Int) + attemptTeamWork: Boolean) extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) { val buddies = new ConcurrentSkipListSet[ActorCell]( @@ -48,34 +44,12 @@ class BalancingDispatcher( case UnboundedMailbox() ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope] - - override def enqueue(receiver: ActorRef, handle: Envelope) = { - super.enqueue(receiver, handle) - _pressure.getAndIncrement() - } - - override def dequeue(): Envelope = - super.dequeue() match { - case null ⇒ null - case x ⇒ _pressure.getAndDecrement(); x - } } case BoundedMailbox(cap, timeout) ⇒ new QueueBasedMessageQueue with BoundedMessageQueueSemantics { final val queue = new LinkedBlockingQueue[Envelope](cap) final val pushTimeOut = timeout - - override def enqueue(receiver: ActorRef, handle: Envelope) = { - super.enqueue(receiver, handle) - _pressure.getAndIncrement() - } - - override def dequeue(): Envelope = - super.dequeue() match { - case null ⇒ null - case x ⇒ _pressure.getAndDecrement(); x - } } case other ⇒ throw new IllegalArgumentException("Only handles BoundedMailbox and UnboundedMailbox, but you specified [" + other + "]") @@ -83,8 +57,6 @@ class BalancingDispatcher( protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor) - private val _pressure = new AtomicLong - class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue { final def enqueue(receiver: ActorRef, handle: Envelope) = messageQueue.enqueue(receiver, handle) @@ -123,11 +95,15 @@ class BalancingDispatcher( override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) - if (!registerForExecution(receiver.mailbox, false, false) && - buddyWakeupThreshold >= 0 && - _pressure.get >= buddyWakeupThreshold) scheduleOne() + if (!registerForExecution(receiver.mailbox, false, false) && doTeamWork) scheduleOne() } + protected def doTeamWork(): Boolean = + attemptTeamWork && (executorService.get().executor match { + case lm: LoadMetrics ⇒ lm.atFullThrottle == false + case other ⇒ true + }) + @tailrec private def scheduleOne(i: Iterator[ActorCell] = buddies.iterator): Unit = if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) scheduleOne(i) -} +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index a735ea367e..2046f02286 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -32,12 +32,11 @@ class Dispatcher( val shutdownTimeout: Duration) extends MessageDispatcher(_prerequisites) { - protected[akka] val executorServiceFactory: ExecutorServiceFactory = + protected val executorServiceFactory: ExecutorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory) - protected[akka] val executorService = new AtomicReference[ExecutorService](new ExecutorServiceDelegate { - lazy val executor = executorServiceFactory.createExecutorService - }) + protected val executorService = new AtomicReference[ExecutorServiceDelegate]( + new ExecutorServiceDelegate { lazy val executor = executorServiceFactory.createExecutorService }) protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { val mbox = receiver.mailbox diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index b9fd3f784b..5f4528146d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -190,7 +190,7 @@ class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherP Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), - config.getInt("buddy-wakeup-threshold")) + config.getBoolean("attempt-teamwork")) /** * Returns the same dispatcher instance for each invocation diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 1c63831013..b6fd432296 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -81,14 +81,16 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def extends ExecutorServiceFactoryProvider { class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory { def createExecutorService: ExecutorService = { - val service = new ThreadPoolExecutor( + val service: ThreadPoolExecutor = new ThreadPoolExecutor( corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, - rejectionPolicy) + rejectionPolicy) with LoadMetrics { + def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize + } service.allowCoreThreadTimeOut(allowCorePoolTimeout) service } @@ -182,7 +184,7 @@ case class MonitorableThreadFactory(name: String, protected def wire[T <: Thread](t: T): T = { t.setUncaughtExceptionHandler(exceptionHandler) t.setDaemon(daemonic) - contextClassLoader foreach (t.setContextClassLoader(_)) + contextClassLoader foreach t.setContextClassLoader t } } From 89cf7aa2f003740d0c53e2c30f3b0fcc149a4ae9 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Feb 2012 20:16:54 +0100 Subject: [PATCH 10/11] =?UTF-8?q?unborking=20compile=20in=20akka-cluster?= =?UTF-8?q?=20(sorry=20=E2=80=A6)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../test/scala/akka/cluster/AccrualFailureDetectorSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index f611fc9812..4aab105273 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -9,7 +9,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" """) { "An AccrualFailureDetector" must { - val conn = Address("akka", "", Some("localhost"), Some(2552)) + val conn = Address("akka", "", "localhost", 2552) "mark node as available after a series of successful heartbeats" in { val fd = new AccrualFailureDetector(system) From 11f067abfc8817cad13a6ed407b1b310381bff6d Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Feb 2012 20:42:14 +0100 Subject: [PATCH 11/11] fix DefaultResizer.pressure, make ResizerSpec less flaky MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - also clean up left-over reference to “buddy-wakeup-threshold” --- .../src/test/scala/akka/config/ConfigSpec.scala | 2 +- .../src/test/scala/akka/routing/ResizerSpec.scala | 7 +++---- akka-actor/src/main/scala/akka/routing/Routing.scala | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 13bb3b4f27..127907412e 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -53,7 +53,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { c.getMilliseconds("shutdown-timeout") must equal(1 * 1000) c.getInt("throughput") must equal(5) c.getMilliseconds("throughput-deadline-time") must equal(0) - c.getInt("buddy-wakeup-threshold") must equal(5) + c.getBoolean("attempt-teamwork") must equal(true) } //Fork join executor config diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index b9765c8e92..1f78c64edf 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -27,7 +27,6 @@ object ResizerSpec { } bal-disp { type = BalancingDispatcher - buddy-wakeup-threshold = 1 } """ @@ -190,7 +189,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with def loop(loops: Int, d: Duration) = { for (m ← 0 until loops) router ! d - for (m ← 0 until loops) expectMsg(d * 2, "done") + for (m ← 0 until loops) expectMsg(d * 3, "done") } // 2 more should go thru without triggering more @@ -199,8 +198,8 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with routees(router) must be(3) // a whole bunch should max it out - loop(4, 500 millis) - awaitCond(routees(router) == 4) + loop(10, 500 millis) + awaitCond(routees(router) > 3) loop(10, 500 millis) awaitCond(routees(router) == 5) diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index b050a21b53..4ff6609255 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -1059,7 +1059,7 @@ case class DefaultResizer( case a: LocalActorRef ⇒ val cell = a.underlying pressureThreshold match { - case 1 ⇒ cell.mailbox.isScheduled && cell.currentMessage != null + case 1 ⇒ cell.mailbox.isScheduled && cell.mailbox.hasMessages case i if i < 1 ⇒ cell.mailbox.isScheduled && cell.currentMessage != null case threshold ⇒ cell.mailbox.numberOfMessages >= threshold }