From 288287e1820784882e586fed28d62299b2768de1 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 23 Sep 2011 16:21:57 +0200 Subject: [PATCH] Partial fix for the raciness of BalancingDispatcher --- .../akka/actor/dispatch/ActorModelSpec.scala | 10 ++++- .../src/main/scala/akka/actor/ActorCell.scala | 5 ++- .../akka/dispatch/BalancingDispatcher.scala | 37 ++++++++++--------- .../scala/akka/dispatch/MailboxHandling.scala | 8 ++-- .../scala/akka/dispatch/MessageHandling.scala | 35 +++++++++--------- 5 files changed, 50 insertions(+), 45 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 3423ec505e..25bef4171a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -120,14 +120,20 @@ object ActorModelSpec { protected[akka] abstract override def register(actor: ActorCell) { super.register(actor) + //printMembers("after registering " + actor) getStats(actor.ref).registers.incrementAndGet() } protected[akka] abstract override def unregister(actor: ActorCell) { super.unregister(actor) + //printMembers("after unregistering " + actor) getStats(actor.ref).unregisters.incrementAndGet() } + def printMembers(when: String) { + System.err.println(when + " then " + uuids.toArray.toList.map(_.toString.split("-")(0)).mkString("==> ", ", ", "<==")) + } + protected[akka] abstract override def dispatch(invocation: Envelope) { getStats(invocation.receiver.ref).msgsReceived.incrementAndGet() super.dispatch(invocation) @@ -395,12 +401,12 @@ abstract class ActorModelSpec extends JUnitSuite { assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns") } catch { case e ⇒ - System.err.println("Error: " + e.getMessage + " when count was: " + cachedMessage.latch.getCount()) + System.err.println("Error: " + e.getMessage + " when count was: " + cachedMessage.latch.getCount() + " expected " + num) //EventHandler.error(new Exception with NoStackTrace, null, cachedMessage.latch.getCount()) } } for (run ← 1 to 3) { - flood(10) + flood(10000) assertDispatcher(dispatcher)(starts = run, stops = run) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index c4c6775d7c..bb0503ecd4 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -88,6 +88,7 @@ private[akka] class ActorCell( @volatile var receiveTimeout: Option[Long] = _receiveTimeout // TODO: currently settable from outside for compatibility + @volatile var currentMessage: Envelope = null val actor: AtomicReference[Actor] = new AtomicReference[Actor]() //FIXME We can most probably make this just a regular reference to Actor @@ -182,7 +183,7 @@ private[akka] class ActorCell( def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = if (isRunning) dispatcher dispatchMessage new Envelope(this, message, channel) - else throw new ActorInitializationException("Actor " + self + " is dead") + //else throw new ActorInitializationException("Actor " + self + " is dead") def postMessageToMailboxAndCreateFutureResultWithTimeout( message: Any, @@ -194,7 +195,7 @@ private[akka] class ActorCell( } dispatcher dispatchMessage new Envelope(this, message, future) future - } else throw new ActorInitializationException("Actor " + self + " is dead") + } else new KeptPromise[Any](Left(new ActorKilledException("Stopped"))) // else throw new ActorInitializationException("Actor " + self + " is dead") def sender: Option[ActorRef] = currentMessage match { case null ⇒ None diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index fd6b8e415b..99a5f13f33 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -6,8 +6,9 @@ package akka.dispatch import util.DynamicVariable import akka.actor.{ ActorCell, Actor, IllegalActorStateException } -import java.util.Queue import java.util.concurrent.{ LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } +import java.util.{ Comparator, Queue } +import annotation.tailrec /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -51,7 +52,7 @@ class BalancingDispatcher( def this(_name: String, mailboxType: MailboxType) = this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage - private val buddies = new ConcurrentLinkedQueue[ActorCell]() + private val buddies = new ConcurrentSkipListSet[ActorCell](new Comparator[ActorCell] { def compare(a: ActorCell, b: ActorCell) = a.uuid.compareTo(b.uuid) }) //new ConcurrentLinkedQueue[ActorCell]() protected val messageQueue: MessageQueue = mailboxType match { case u: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { @@ -73,10 +74,8 @@ class BalancingDispatcher( final def dequeue(): Envelope = { val envelope = messageQueue.dequeue() - if (envelope eq null) { - buddies.add(actor) - null - } else if (envelope.receiver eq actor) envelope + if (envelope eq null) null + else if (envelope.receiver eq actor) envelope else envelope.copy(receiver = actor) } @@ -89,29 +88,27 @@ class BalancingDispatcher( protected[akka] override def register(actor: ActorCell) = { super.register(actor) - buddies.add(actor) + registerForExecution(actor.mailbox, false, false) } protected[akka] override def unregister(actor: ActorCell) = { - buddies.remove(actor) super.unregister(actor) + buddies.remove(actor) } protected override def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) { - val m = mailBox - - if (m.hasSystemMessages) { - var envelope = m.systemDequeue() + if (mailBox.hasSystemMessages) { + var envelope = mailBox.systemDequeue() while (envelope ne null) { - deadLetterMailbox.systemEnqueue(envelope) - envelope = m.systemDequeue() + deadLetterMailbox.systemEnqueue(envelope) //Send to dead letter queue + envelope = mailBox.systemDequeue() } } } protected[akka] override def registerForExecution(mbox: Mailbox, hasMessagesHint: Boolean, hasSystemMessagesHint: Boolean): Boolean = { if (!super.registerForExecution(mbox, hasMessagesHint, hasSystemMessagesHint)) { - if (mbox.isInstanceOf[SharingMailbox]) buddies.add(mbox.asInstanceOf[SharingMailbox].actor) + if (!mbox.isClosed && mbox.isInstanceOf[SharingMailbox]) buddies.add(mbox.asInstanceOf[SharingMailbox].actor) false } else true } @@ -120,9 +117,13 @@ class BalancingDispatcher( val receiver = invocation.receiver messageQueue enqueue invocation - buddies.poll() match { - case null | `receiver` ⇒ registerForExecution(receiver.mailbox, true, false) - case buddy ⇒ registerForExecution(buddy.mailbox, true, false) + @tailrec + def getValidBuddy(): ActorCell = buddies.pollFirst() match { + case null | `receiver` ⇒ receiver + case buddy if buddy.mailbox.isOpen ⇒ buddy + case _ ⇒ getValidBuddy } + + registerForExecution(getValidBuddy().mailbox, true, false) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index 26ca534b9c..64426a8e61 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -45,7 +45,7 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl def shouldBeRegisteredForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { case CLOSED ⇒ false - case OPEN ⇒ hasMessageHint || hasSystemMessages || hasMessages + case OPEN ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages case SUSPENDED ⇒ hasSystemMessageHint || hasSystemMessages } @@ -82,14 +82,12 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl processAllSystemMessages() - nextMessage = if (status != OPEN) { - null // If we are suspended, abort - } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries + nextMessage = if (status == OPEN) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries processedMessages += 1 if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out null //We reached our boundaries, abort else dequeue //Dequeue the next message - } + } else null //Abort } while (nextMessage ne null) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 36ca250770..2628847caf 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -20,7 +20,6 @@ final case class Envelope(val receiver: ActorCell, val message: Any, val channel if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") final def invoke() { - System.err.println("Invoking message [" + message + "] for " + receiver + " with channel " + channel) receiver invoke this } } @@ -38,7 +37,6 @@ final case class SystemEnvelope(val receiver: ActorCell, val message: SystemMess * @return whether to proceed with processing other messages */ final def invoke(): Unit = { - System.err.println("Invoking System message [" + message + "] for " + receiver + " with channel " + channel) receiver systemInvoke this } } @@ -84,12 +82,13 @@ abstract class MessageDispatcher extends Serializable { /** * Create a blackhole mailbox for the purpose of replacing the real one upon actor termination */ - protected[akka] val deadLetterMailbox = new Mailbox { + protected[akka] val deadLetterMailbox: Mailbox = DeadLetterMailbox + + object DeadLetterMailbox extends Mailbox { + dispatcherLock.tryLock() become(Mailbox.CLOSED) override def become(newStatus: Mailbox.Status) { super.become(Mailbox.CLOSED) } //Always transcend to CLOSED to preserve the volatile write override def dispatcher = null //MessageDispatcher.this - dispatcherLock.tryLock() - override def enqueue(envelope: Envelope) { envelope.channel sendException new ActorKilledException("Actor has been stopped") } override def dequeue() = null override def systemEnqueue(handle: SystemEnvelope): Unit = () @@ -165,18 +164,17 @@ abstract class MessageDispatcher extends Serializable { * and only call it under the dispatcher-guard, see "attach" for the only invocation */ protected[akka] def register(actor: ActorCell): Unit = { - if (actor.mailbox eq null) { - val mbox = createMailbox(actor) - actor.mailbox = mbox - systemDispatch(SystemEnvelope(actor, Create, NullChannel)) - } - - uuids add actor.uuid - if (active.isOff) { - active.switchOn { - start() + if (uuids add actor.uuid) { + if (actor.mailbox eq null) { + actor.mailbox = createMailbox(actor) + systemDispatch(SystemEnvelope(actor, Create, NullChannel)) } - } + if (active.isOff) { + active.switchOn { + start() + } + } + } else System.err.println("Couldn't register: " + actor) } /** @@ -186,7 +184,8 @@ abstract class MessageDispatcher extends Serializable { protected[akka] def unregister(actor: ActorCell): Unit = { if (uuids remove actor.uuid) { val mailBox = actor.mailbox - actor.mailbox = deadLetterMailbox //FIXME switch to getAndSet semantics + mailBox.become(Mailbox.CLOSED) + actor.mailbox = deadLetterMailbox //FIXME getAndSet would be preferrable here cleanUpMailboxFor(actor, mailBox) if (uuids.isEmpty && _tasks.get == 0) { shutdownSchedule match { @@ -198,7 +197,7 @@ abstract class MessageDispatcher extends Serializable { case RESCHEDULED ⇒ //Already marked for reschedule } } - } + } else System.err.println("Couldn't unregister: " + actor) } /**