fold Mailbox.dispatcherLock into _status
- gets rid of one field plus object plus allocation per mailbox - retained same semantics for both roles for now (someone may try to take atomic advantage of this unification later)
This commit is contained in:
parent
2b4868fbbb
commit
ca22e04a87
4 changed files with 80 additions and 28 deletions
|
|
@ -419,7 +419,7 @@ abstract class ActorModelSpec extends JUnitSuite {
|
|||
case actor: LocalActorRef ⇒
|
||||
val cell = actor.underlying
|
||||
val mbox = cell.mailbox
|
||||
System.err.println("Left in the registry: " + actor.address + " => " + cell + " => " + mbox.hasMessages + " " + mbox.hasSystemMessages + " " + mbox.numberOfMessages + " " + mbox.dispatcherLock.locked)
|
||||
System.err.println("Left in the registry: " + actor.address + " => " + cell + " => " + mbox.hasMessages + " " + mbox.hasSystemMessages + " " + mbox.numberOfMessages + " " + mbox.isScheduled)
|
||||
var message = mbox.dequeue()
|
||||
while (message ne null) {
|
||||
System.err.println("Lingering message for " + cell + " " + message)
|
||||
|
|
|
|||
|
|
@ -127,14 +127,14 @@ class Dispatcher(
|
|||
*/
|
||||
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
|
||||
if (mbox.shouldBeRegisteredForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
|
||||
if (mbox.dispatcherLock.tryLock()) {
|
||||
if (mbox.setAsScheduled()) {
|
||||
try {
|
||||
executorService.get() execute mbox
|
||||
true
|
||||
} catch {
|
||||
case e: RejectedExecutionException ⇒
|
||||
EventHandler.warning(this, e.toString)
|
||||
mbox.dispatcherLock.unlock()
|
||||
mbox.setAsIdle()
|
||||
throw e
|
||||
}
|
||||
} else false
|
||||
|
|
|
|||
|
|
@ -15,11 +15,18 @@ import annotation.tailrec
|
|||
|
||||
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
|
||||
|
||||
object Mailbox {
|
||||
private[dispatch] object Mailbox {
|
||||
|
||||
type Status = Int
|
||||
val Open = 0
|
||||
val Suspended = 1
|
||||
val Closed = 2
|
||||
|
||||
// primary status: only first three
|
||||
final val Idle = 0
|
||||
final val Suspended = 1
|
||||
final val Closed = 2
|
||||
// secondary status: Idle or Suspended plus Scheduled
|
||||
final val Scheduled = 3
|
||||
final val ScheduledSuspended = 4 // may only happen for system message processing
|
||||
|
||||
//private[Mailbox] val mailboxStatusUpdater = AtomicReferenceFieldUpdater.newUpdater[Mailbox, Status](classOf[Mailbox], classOf[Status], "_status")
|
||||
}
|
||||
|
||||
|
|
@ -31,14 +38,27 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl
|
|||
/*
|
||||
* Internal implementation of MessageDispatcher uses these, don't touch or rely on
|
||||
*/
|
||||
final val dispatcherLock = new SimpleLock(startLocked = false)
|
||||
val _status: AtomicInteger = new AtomicInteger(Open) //Must be named _status because of the updater
|
||||
private[dispatch] val _status: AtomicInteger = new AtomicInteger(Idle) //Must be named _status because of the updater
|
||||
|
||||
final def status: Mailbox.Status = _status.get() //mailboxStatusUpdater.get(this)
|
||||
|
||||
final def isSuspended: Boolean = status == Suspended
|
||||
final def isActive: Boolean = status match {
|
||||
case Idle | Scheduled ⇒ true
|
||||
case _ ⇒ false
|
||||
}
|
||||
|
||||
// FIXME: this does not seem to be used anywhere, remove?
|
||||
final def isSuspended: Boolean = status match {
|
||||
case Suspended | ScheduledSuspended ⇒ true
|
||||
case _ ⇒ false
|
||||
}
|
||||
|
||||
final def isClosed: Boolean = status == Closed
|
||||
final def isOpen: Boolean = status == Open
|
||||
|
||||
final def isScheduled: Boolean = status match {
|
||||
case Scheduled | ScheduledSuspended ⇒ true
|
||||
case _ ⇒ false
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal method to enforce a volatile write of the status
|
||||
|
|
@ -50,28 +70,61 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl
|
|||
else acknowledgeStatus()
|
||||
}
|
||||
|
||||
def become(newStatus: Status): Boolean = {
|
||||
@tailrec
|
||||
def transcend(): Boolean = {
|
||||
val current = _status.get()
|
||||
if (current == Closed) { _status.set(Closed); false } //Ensure that the write is always performed
|
||||
else if (_status.compareAndSet(current, newStatus)) true
|
||||
else transcend()
|
||||
/**
|
||||
* set new primary status: Idle, Suspended or Closed. Caller does not need to
|
||||
* worry about whether status was Scheduled or not.
|
||||
*/
|
||||
@tailrec
|
||||
final def become(newStatus: Status): Boolean = {
|
||||
newStatus match {
|
||||
case Idle ⇒
|
||||
status match {
|
||||
case s @ (Idle | Scheduled) ⇒ if (_status.compareAndSet(s, s)) true else become(newStatus)
|
||||
case s @ Suspended ⇒ if (_status.compareAndSet(s, Idle)) true else become(newStatus)
|
||||
case s @ ScheduledSuspended ⇒ if (_status.compareAndSet(s, Scheduled)) true else become(newStatus)
|
||||
case s @ Closed ⇒ _status.set(Closed); false
|
||||
}
|
||||
case Suspended ⇒
|
||||
status match {
|
||||
case s @ (Suspended | ScheduledSuspended) ⇒ if (_status.compareAndSet(s, s)) true else become(newStatus)
|
||||
case s @ Idle ⇒ if (_status.compareAndSet(s, Suspended)) true else become(newStatus)
|
||||
case s @ Scheduled ⇒ if (_status.compareAndSet(s, ScheduledSuspended)) true else become(newStatus)
|
||||
case s @ Closed ⇒ _status.set(Closed); false
|
||||
}
|
||||
case Closed ⇒
|
||||
status match {
|
||||
case s @ Closed ⇒ _status.set(Closed); false
|
||||
case s ⇒ if (_status.compareAndSet(s, Closed)) true else become(newStatus)
|
||||
}
|
||||
}
|
||||
transcend()
|
||||
} //mailboxStatusUpdater.set(this, newStatus)
|
||||
}
|
||||
|
||||
@tailrec
|
||||
final def setAsScheduled(): Boolean = status match {
|
||||
case s @ Idle ⇒ if (_status.compareAndSet(s, Scheduled)) true else setAsScheduled()
|
||||
case s @ Suspended ⇒ if (_status.compareAndSet(s, ScheduledSuspended)) true else setAsScheduled()
|
||||
case _ ⇒ false
|
||||
}
|
||||
|
||||
@tailrec
|
||||
final def setAsIdle(): Boolean = status match {
|
||||
case s @ Scheduled ⇒ if (_status.compareAndSet(s, Idle)) true else setAsIdle()
|
||||
case s @ ScheduledSuspended ⇒ if (_status.compareAndSet(s, Suspended)) true else setAsIdle()
|
||||
case s ⇒ acknowledgeStatus(); false
|
||||
// TODO: I think this write is needed to make memory consistent after processMailbox(), but someone else should check
|
||||
}
|
||||
|
||||
def shouldBeRegisteredForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
|
||||
case `Open` ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
|
||||
case `Closed` ⇒ false
|
||||
case `Suspended` ⇒ hasSystemMessageHint || hasSystemMessages
|
||||
case Idle | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
|
||||
case Closed ⇒ false
|
||||
case Suspended | ScheduledSuspended ⇒ hasSystemMessageHint || hasSystemMessages
|
||||
}
|
||||
|
||||
final def run = {
|
||||
try { processMailbox() } catch {
|
||||
case ie: InterruptedException ⇒ Thread.currentThread().interrupt() //Restore interrupt
|
||||
} finally {
|
||||
dispatcherLock.unlock()
|
||||
setAsIdle()
|
||||
dispatcher.registerForExecution(this, false, false)
|
||||
}
|
||||
}
|
||||
|
|
@ -84,7 +137,7 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl
|
|||
final def processMailbox() {
|
||||
processAllSystemMessages()
|
||||
|
||||
if (isOpen) {
|
||||
if (isActive) {
|
||||
var nextMessage = dequeue()
|
||||
if (nextMessage ne null) { //If we have a message
|
||||
if (dispatcher.throughput <= 1) { //If we only run one message per process {
|
||||
|
|
@ -100,7 +153,7 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl
|
|||
|
||||
processAllSystemMessages()
|
||||
|
||||
nextMessage = if (isOpen) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
|
||||
nextMessage = if (isActive) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
|
||||
processedMessages += 1
|
||||
if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out
|
||||
null //We reached our boundaries, abort
|
||||
|
|
|
|||
|
|
@ -85,7 +85,6 @@ abstract class MessageDispatcher extends Serializable {
|
|||
protected[akka] val deadLetterMailbox: Mailbox = DeadLetterMailbox
|
||||
|
||||
object DeadLetterMailbox extends Mailbox {
|
||||
dispatcherLock.tryLock()
|
||||
become(Mailbox.Closed)
|
||||
override def dispatcher = null //MessageDispatcher.this
|
||||
override def enqueue(envelope: Envelope) { envelope.channel sendException new ActorKilledException("Actor has been stopped") }
|
||||
|
|
@ -249,7 +248,7 @@ abstract class MessageDispatcher extends Serializable {
|
|||
*/
|
||||
def resume(actor: ActorCell): Unit = if (uuids.contains(actor.uuid)) {
|
||||
val mbox = actor.mailbox
|
||||
mbox.become(Mailbox.Open)
|
||||
mbox.become(Mailbox.Idle)
|
||||
registerForExecution(mbox, false, false)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue