diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 93c7720b86..71d79c580b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -419,7 +419,7 @@ abstract class ActorModelSpec extends JUnitSuite { case actor: LocalActorRef ⇒ val cell = actor.underlying val mbox = cell.mailbox - System.err.println("Left in the registry: " + actor.address + " => " + cell + " => " + mbox.hasMessages + " " + mbox.hasSystemMessages + " " + mbox.numberOfMessages + " " + mbox.dispatcherLock.locked) + System.err.println("Left in the registry: " + actor.address + " => " + cell + " => " + mbox.hasMessages + " " + mbox.hasSystemMessages + " " + mbox.numberOfMessages + " " + mbox.isScheduled) var message = mbox.dequeue() while (message ne null) { System.err.println("Lingering message for " + cell + " " + message) diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala similarity index 99% rename from akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala rename to akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 41dd64847b..39e421cdc9 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -88,7 +88,6 @@ abstract class MessageDispatcher extends Serializable { protected[akka] val deadLetterMailbox: Mailbox = DeadLetterMailbox object DeadLetterMailbox extends Mailbox { - dispatcherLock.tryLock() become(Mailbox.Closed) override def dispatcher = null //MessageDispatcher.this override def enqueue(envelope: Envelope) { envelope.channel sendException new ActorKilledException("Actor has been stopped") } @@ -252,7 +251,7 @@ abstract class MessageDispatcher extends Serializable { */ def resume(actor: ActorCell): Unit = if (uuids.contains(actor.uuid)) { val mbox = actor.mailbox - mbox.become(Mailbox.Open) + mbox.become(Mailbox.Idle) registerForExecution(mbox, false, false) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 40303b0015..04ee11b1b6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -127,14 +127,14 @@ class Dispatcher( */ protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = { if (mbox.shouldBeRegisteredForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races - if (mbox.dispatcherLock.tryLock()) { + if (mbox.setAsScheduled()) { try { executorService.get() execute mbox true } catch { case e: RejectedExecutionException ⇒ EventHandler.warning(this, e.toString) - mbox.dispatcherLock.unlock() + mbox.setAsIdle() throw e } } else false diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala similarity index 67% rename from akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala rename to akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 0b572bb177..1fd3f6e80e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -15,11 +15,18 @@ import annotation.tailrec class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) -object Mailbox { +private[dispatch] object Mailbox { + type Status = Int - val Open = 0 - val Suspended = 1 - val Closed = 2 + + // primary status: only first three + final val Idle = 0 + final val Suspended = 1 + final val Closed = 2 + // secondary status: Idle or Suspended plus Scheduled + final val Scheduled = 3 + final val ScheduledSuspended = 4 // may only happen for system message processing + //private[Mailbox] val mailboxStatusUpdater = AtomicReferenceFieldUpdater.newUpdater[Mailbox, Status](classOf[Mailbox], classOf[Status], "_status") } @@ -31,14 +38,27 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl /* * Internal implementation of MessageDispatcher uses these, don't touch or rely on */ - final val dispatcherLock = new SimpleLock(startLocked = false) - val _status: AtomicInteger = new AtomicInteger(Open) //Must be named _status because of the updater + private[dispatch] val _status: AtomicInteger = new AtomicInteger(Idle) //Must be named _status because of the updater final def status: Mailbox.Status = _status.get() //mailboxStatusUpdater.get(this) - final def isSuspended: Boolean = status == Suspended + final def isActive: Boolean = status match { + case Idle | Scheduled ⇒ true + case _ ⇒ false + } + + // FIXME: this does not seem to be used anywhere, remove? + final def isSuspended: Boolean = status match { + case Suspended | ScheduledSuspended ⇒ true + case _ ⇒ false + } + final def isClosed: Boolean = status == Closed - final def isOpen: Boolean = status == Open + + final def isScheduled: Boolean = status match { + case Scheduled | ScheduledSuspended ⇒ true + case _ ⇒ false + } /** * Internal method to enforce a volatile write of the status @@ -50,28 +70,61 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl else acknowledgeStatus() } - def become(newStatus: Status): Boolean = { - @tailrec - def transcend(): Boolean = { - val current = _status.get() - if (current == Closed) { _status.set(Closed); false } //Ensure that the write is always performed - else if (_status.compareAndSet(current, newStatus)) true - else transcend() + /** + * set new primary status: Idle, Suspended or Closed. Caller does not need to + * worry about whether status was Scheduled or not. + */ + @tailrec + final def become(newStatus: Status): Boolean = { + newStatus match { + case Idle ⇒ + status match { + case s @ (Idle | Scheduled) ⇒ if (_status.compareAndSet(s, s)) true else become(newStatus) + case s @ Suspended ⇒ if (_status.compareAndSet(s, Idle)) true else become(newStatus) + case s @ ScheduledSuspended ⇒ if (_status.compareAndSet(s, Scheduled)) true else become(newStatus) + case s @ Closed ⇒ _status.set(Closed); false + } + case Suspended ⇒ + status match { + case s @ (Suspended | ScheduledSuspended) ⇒ if (_status.compareAndSet(s, s)) true else become(newStatus) + case s @ Idle ⇒ if (_status.compareAndSet(s, Suspended)) true else become(newStatus) + case s @ Scheduled ⇒ if (_status.compareAndSet(s, ScheduledSuspended)) true else become(newStatus) + case s @ Closed ⇒ _status.set(Closed); false + } + case Closed ⇒ + status match { + case s @ Closed ⇒ _status.set(Closed); false + case s ⇒ if (_status.compareAndSet(s, Closed)) true else become(newStatus) + } } - transcend() - } //mailboxStatusUpdater.set(this, newStatus) + } + + @tailrec + final def setAsScheduled(): Boolean = status match { + case s @ Idle ⇒ if (_status.compareAndSet(s, Scheduled)) true else setAsScheduled() + case s @ Suspended ⇒ if (_status.compareAndSet(s, ScheduledSuspended)) true else setAsScheduled() + case _ ⇒ false + } + + @tailrec + final def setAsIdle(): Boolean = status match { + case s @ Scheduled ⇒ if (_status.compareAndSet(s, Idle)) true else setAsIdle() + case s @ ScheduledSuspended ⇒ if (_status.compareAndSet(s, Suspended)) true else setAsIdle() + case s ⇒ acknowledgeStatus(); false + // TODO: I think this write is needed to make memory consistent after processMailbox(), but someone else should check + } def shouldBeRegisteredForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { - case `Open` ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages - case `Closed` ⇒ false - case `Suspended` ⇒ hasSystemMessageHint || hasSystemMessages + case Idle | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages + case Closed ⇒ false + case Suspended | ScheduledSuspended ⇒ hasSystemMessageHint || hasSystemMessages } final def run = { try { processMailbox() } catch { case ie: InterruptedException ⇒ Thread.currentThread().interrupt() //Restore interrupt } finally { - dispatcherLock.unlock() + setAsIdle() dispatcher.registerForExecution(this, false, false) } } @@ -84,7 +137,7 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl final def processMailbox() { processAllSystemMessages() - if (isOpen) { + if (isActive) { var nextMessage = dequeue() if (nextMessage ne null) { //If we have a message if (dispatcher.throughput <= 1) { //If we only run one message per process { @@ -100,7 +153,7 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl processAllSystemMessages() - nextMessage = if (isOpen) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries + nextMessage = if (isActive) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries processedMessages += 1 if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out null //We reached our boundaries, abort