From 251a7cc7e399778d4e2765742ea7dcb69086e64b Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Feb 2012 12:38:59 +0100 Subject: [PATCH] clean up BalancingDispatcher: - change from messageQueue.numberOfMessages to maintaining an AtomicLong for performance reasons - add comments/scaladoc where missing - remove some assert()s - fix ResiserSpec to employ buddy-wakeup-threshold --- .../akka/actor/dispatch/ActorModelSpec.scala | 4 +- .../test/scala/akka/routing/ResizerSpec.scala | 1 + .../src/main/scala/akka/actor/ActorCell.scala | 5 ++ .../akka/dispatch/AbstractDispatcher.scala | 2 +- .../akka/dispatch/BalancingDispatcher.scala | 55 +++++++++++++++---- .../main/scala/akka/dispatch/Mailbox.scala | 29 +++++++--- 6 files changed, 74 insertions(+), 22 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index a735e7298b..15886973b2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -352,7 +352,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa def flood(num: Int) { val cachedMessage = CountDownNStop(new CountDownLatch(num)) val stopLatch = new CountDownLatch(num) - val waitTime = (20 seconds).dilated.toMillis + val waitTime = (30 seconds).dilated.toMillis val boss = system.actorOf(Props(new Actor { def receive = { case "run" ⇒ for (_ ← 1 to num) (context.watch(context.actorOf(props))) ! cachedMessage @@ -369,7 +369,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa val buddies = dispatcher.buddies val mq = dispatcher.messageQueue - System.err.println("Buddies left: " + buddies.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhab) + System.err.println("Buddies left: " + buddies.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhabitants) buddies.toArray sorted new Ordering[AnyRef] { def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path.toString.compareTo(rr.self.path.toString) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index 2130afe107..26b5021c18 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -26,6 +26,7 @@ object ResizerSpec { } bal-disp { type = BalancingDispatcher + buddy-wakeup-threshold = 1 } """ diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index f4112e5d37..9268406086 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -290,6 +290,11 @@ private[akka] class ActorCell( // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ parent.sendSystemMessage(akka.dispatch.Supervise(self)) + /* + * attach before submitting the mailbox for the first time, because + * otherwise the actor could already be dead before the dispatcher is + * informed of its existence (with reversed attach/detach sequence). + */ dispatcher.attach(this) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index da0e4fdc6e..9d1575c4ec 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -260,7 +260,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext mailBox.cleanUp() } - def inhab = inhabitantsUpdater.get(this) + def inhabitants: Long = inhabitantsUpdater.get(this) private val shutdownAction = new Runnable { @tailrec diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 63fafef7d1..70101578f0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -11,6 +11,8 @@ import annotation.tailrec import java.util.concurrent.atomic.AtomicBoolean import akka.util.{ Duration, Helpers } import java.util.{ Comparator, Iterator } +import akka.util.Unsafe +import java.util.concurrent.atomic.AtomicLong /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -43,18 +45,46 @@ class BalancingDispatcher( })) val messageQueue: MessageQueue = mailboxType match { - case _: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { - final val queue = new ConcurrentLinkedQueue[Envelope] - } - case BoundedMailbox(cap, timeout) ⇒ new QueueBasedMessageQueue with BoundedMessageQueueSemantics { - final val queue = new LinkedBlockingQueue[Envelope](cap) - final val pushTimeOut = timeout - } + case UnboundedMailbox() ⇒ + new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + final val queue = new ConcurrentLinkedQueue[Envelope] + + override def enqueue(receiver: ActorRef, handle: Envelope) = { + super.enqueue(receiver, handle) + _pressure.getAndIncrement() + } + + override def dequeue(): Envelope = + super.dequeue() match { + case null ⇒ null + case x ⇒ _pressure.getAndDecrement(); x + } + } + + case BoundedMailbox(cap, timeout) ⇒ + new QueueBasedMessageQueue with BoundedMessageQueueSemantics { + final val queue = new LinkedBlockingQueue[Envelope](cap) + final val pushTimeOut = timeout + + override def enqueue(receiver: ActorRef, handle: Envelope) = { + super.enqueue(receiver, handle) + _pressure.getAndIncrement() + } + + override def dequeue(): Envelope = + super.dequeue() match { + case null ⇒ null + case x ⇒ _pressure.getAndDecrement(); x + } + } + case other ⇒ throw new IllegalArgumentException("Only handles BoundedMailbox and UnboundedMailbox, but you specified [" + other + "]") } protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor) + private val _pressure = new AtomicLong + class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue { final def enqueue(receiver: ActorRef, handle: Envelope) = messageQueue.enqueue(receiver, handle) @@ -81,6 +111,11 @@ class BalancingDispatcher( } protected[akka] override def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = + /* + * need to filter out Create() messages here because BalancingDispatcher + * already enqueues this within register(), which is called first by the + * ActorCell. + */ invocation match { case Create() ⇒ case x ⇒ super.systemDispatch(receiver, invocation) @@ -91,13 +126,13 @@ class BalancingDispatcher( mbox.systemEnqueue(actor.self, Create()) // must make sure that Create() is the first message enqueued in this mailbox super.register(actor) - assert(buddies.add(actor)) + buddies.add(actor) // must make sure that buddy-add is executed before the actor has had a chance to die registerForExecution(mbox, false, true) } protected[akka] override def unregister(actor: ActorCell) = { - assert(buddies.remove(actor)) + buddies.remove(actor) super.unregister(actor) if (messageQueue.hasMessages) registerOne() } @@ -106,7 +141,7 @@ class BalancingDispatcher( messageQueue.enqueue(receiver.self, invocation) if (!registerForExecution(receiver.mailbox, false, false) && buddyWakeupThreshold >= 0 && - messageQueue.numberOfMessages >= buddyWakeupThreshold) registerOne() + _pressure.get >= buddyWakeupThreshold) registerOne() } @tailrec private def registerOne(i: Iterator[ActorCell] = buddies.iterator): Unit = diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 32fce8564e..4c50cb5c8d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -239,15 +239,26 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue } trait MessageQueue { - /* - * These method need to be implemented in subclasses; they should not rely on the internal stuff above. + /** + * Try to enqueue the message to this queue, or throw an exception. */ - def enqueue(receiver: ActorRef, handle: Envelope) + def enqueue(receiver: ActorRef, handle: Envelope): Unit // NOTE: receiver is used only in two places, but cannot be removed + /** + * Try to dequeue the next message from this queue, return null failing that. + */ def dequeue(): Envelope + /** + * Should return the current number of messages held in this queue; may + * always return 0 if no other value is available efficiently. Do not use + * this for testing for presence of messages, use `hasMessages` instead. + */ def numberOfMessages: Int + /** + * Indicates whether this queue is non-empty. + */ def hasMessages: Boolean } @@ -295,15 +306,15 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ } trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue { - final def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle - final def dequeue(): Envelope = queue.poll() + def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle + def dequeue(): Envelope = queue.poll() } trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { def pushTimeOut: Duration override def queue: BlockingQueue[Envelope] - final def enqueue(receiver: ActorRef, handle: Envelope) { + def enqueue(receiver: ActorRef, handle: Envelope) { if (pushTimeOut.length > 0) { queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || { throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver) @@ -311,13 +322,13 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { } else queue put handle } - final def dequeue(): Envelope = queue.poll() + def dequeue(): Envelope = queue.poll() } trait QueueBasedMessageQueue extends MessageQueue { def queue: Queue[Envelope] - final def numberOfMessages = queue.size - final def hasMessages = !queue.isEmpty + def numberOfMessages = queue.size + def hasMessages = !queue.isEmpty } /**