diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index c2af68d995..e96f4e451f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -10,7 +10,8 @@ import akka.util._ import java.util.Queue import akka.actor.ActorContext import java.util.concurrent._ -import atomic.AtomicReferenceFieldUpdater +import atomic.{ AtomicInteger, AtomicReferenceFieldUpdater } +import annotation.tailrec class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -31,16 +32,24 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl * Internal implementation of MessageDispatcher uses these, don't touch or rely on */ final val dispatcherLock = new SimpleLock(startLocked = false) - @volatile - var _status: Status = Open //Must be named _status because of the updater + val _status: AtomicInteger = new AtomicInteger(Open) //Must be named _status because of the updater - final def status: Mailbox.Status = _status //mailboxStatusUpdater.get(this) + final def status: Mailbox.Status = _status.get() //mailboxStatusUpdater.get(this) final def isSuspended: Boolean = status == Suspended final def isClosed: Boolean = status == Closed final def isOpen: Boolean = status == Open - def become(newStatus: Status) = _status = newStatus //mailboxStatusUpdater.set(this, newStatus) + def become(newStatus: Status): Boolean = { + @tailrec + def transcend(): Boolean = { + val current = _status.get() + if (current == Closed) { _status.set(Closed); false } //Ensure that the write is always performed + else if (_status.compareAndSet(current, newStatus)) true + else transcend() + } + transcend() + } //mailboxStatusUpdater.set(this, newStatus) def shouldBeRegisteredForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { case `Open` ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index b1a0ada5af..cff7f82066 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -87,7 +87,6 @@ abstract class MessageDispatcher extends Serializable { object DeadLetterMailbox extends Mailbox { dispatcherLock.tryLock() become(Mailbox.Closed) - override def become(newStatus: Mailbox.Status) { super.become(Mailbox.Closed) } //Always transcend to CLOSED to preserve the volatile write override def dispatcher = null //MessageDispatcher.this override def enqueue(envelope: Envelope) { envelope.channel sendException new ActorKilledException("Actor has been stopped") } override def dequeue() = null