Making sure that you cannot go from Mailbox.Closed to anything else
This commit is contained in:
parent
29a327aba4
commit
e4947de718
2 changed files with 14 additions and 6 deletions
|
|
@ -10,7 +10,8 @@ import akka.util._
|
|||
import java.util.Queue
|
||||
import akka.actor.ActorContext
|
||||
import java.util.concurrent._
|
||||
import atomic.AtomicReferenceFieldUpdater
|
||||
import atomic.{ AtomicInteger, AtomicReferenceFieldUpdater }
|
||||
import annotation.tailrec
|
||||
|
||||
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
|
||||
|
||||
|
|
@ -31,16 +32,24 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl
|
|||
* Internal implementation of MessageDispatcher uses these, don't touch or rely on
|
||||
*/
|
||||
final val dispatcherLock = new SimpleLock(startLocked = false)
|
||||
@volatile
|
||||
var _status: Status = Open //Must be named _status because of the updater
|
||||
val _status: AtomicInteger = new AtomicInteger(Open) //Must be named _status because of the updater
|
||||
|
||||
final def status: Mailbox.Status = _status //mailboxStatusUpdater.get(this)
|
||||
final def status: Mailbox.Status = _status.get() //mailboxStatusUpdater.get(this)
|
||||
|
||||
final def isSuspended: Boolean = status == Suspended
|
||||
final def isClosed: Boolean = status == Closed
|
||||
final def isOpen: Boolean = status == Open
|
||||
|
||||
def become(newStatus: Status) = _status = newStatus //mailboxStatusUpdater.set(this, newStatus)
|
||||
def become(newStatus: Status): Boolean = {
|
||||
@tailrec
|
||||
def transcend(): Boolean = {
|
||||
val current = _status.get()
|
||||
if (current == Closed) { _status.set(Closed); false } //Ensure that the write is always performed
|
||||
else if (_status.compareAndSet(current, newStatus)) true
|
||||
else transcend()
|
||||
}
|
||||
transcend()
|
||||
} //mailboxStatusUpdater.set(this, newStatus)
|
||||
|
||||
def shouldBeRegisteredForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
|
||||
case `Open` ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
|
||||
|
|
|
|||
|
|
@ -87,7 +87,6 @@ abstract class MessageDispatcher extends Serializable {
|
|||
object DeadLetterMailbox extends Mailbox {
|
||||
dispatcherLock.tryLock()
|
||||
become(Mailbox.Closed)
|
||||
override def become(newStatus: Mailbox.Status) { super.become(Mailbox.Closed) } //Always transcend to CLOSED to preserve the volatile write
|
||||
override def dispatcher = null //MessageDispatcher.this
|
||||
override def enqueue(envelope: Envelope) { envelope.channel sendException new ActorKilledException("Actor has been stopped") }
|
||||
override def dequeue() = null
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue