diff --git a/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java b/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java new file mode 100644 index 0000000000..541f64da95 --- /dev/null +++ b/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java @@ -0,0 +1,12 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.dispatch; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +abstract class AbstractMailbox { + private volatile int _status = Mailbox.Idle(); + protected static AtomicIntegerFieldUpdater updater = AtomicIntegerFieldUpdater.newUpdater(AbstractMailbox.class, "_status"); +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 72ab6c1ccd..2788d58e11 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -26,21 +26,15 @@ private[dispatch] object Mailbox { // secondary status: Idle or Suspended plus Scheduled final val Scheduled = 3 final val ScheduledSuspended = 4 // may only happen for system message processing - - //private[Mailbox] val mailboxStatusUpdater = AtomicReferenceFieldUpdater.newUpdater[Mailbox, Status](classOf[Mailbox], classOf[Status], "_status") } /** * @author Jonas Bonér */ -abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnable { +abstract class Mailbox extends AbstractMailbox with MessageQueue with SystemMessageQueue with Runnable { import Mailbox._ - /* - * Internal implementation of MessageDispatcher uses these, don't touch or rely on - */ - private[dispatch] val _status: AtomicInteger = new AtomicInteger(Idle) //Must be named _status because of the updater - final def status: Mailbox.Status = _status.get() //mailboxStatusUpdater.get(this) + final def status: Mailbox.Status = AbstractMailbox.updater.get(this) final def isActive: Boolean = status match { case Idle | Scheduled ⇒ true @@ -60,13 +54,21 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl case _ ⇒ false } + @inline + protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean = + AbstractMailbox.updater.compareAndSet(this, oldStatus, newStatus) + + @inline + protected final def setStatus(newStatus: Status): Unit = + AbstractMailbox.updater.set(this, newStatus) + /** * Internal method to enforce a volatile write of the status */ @tailrec final def acknowledgeStatus() { - val s = _status.get() - if (_status.compareAndSet(s, s)) () + val s = status + if (updateStatus(s, s)) () else acknowledgeStatus() } @@ -76,41 +78,40 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl */ @tailrec final def become(newStatus: Status): Boolean = { - import _status.{ compareAndSet ⇒ CAS, set ⇒ SET } newStatus match { case Idle ⇒ status match { - case s @ (Idle | Scheduled) ⇒ if (CAS(s, s)) true else become(newStatus) - case s @ Suspended ⇒ if (CAS(s, Idle)) true else become(newStatus) - case s @ ScheduledSuspended ⇒ if (CAS(s, Scheduled)) true else become(newStatus) - case Closed ⇒ SET(Closed); false + case s @ (Idle | Scheduled) ⇒ if (updateStatus(s, s)) true else become(newStatus) + case s @ Suspended ⇒ if (updateStatus(s, Idle)) true else become(newStatus) + case s @ ScheduledSuspended ⇒ if (updateStatus(s, Scheduled)) true else become(newStatus) + case Closed ⇒ setStatus(Closed); false } case Suspended ⇒ status match { - case s @ (Suspended | ScheduledSuspended) ⇒ if (CAS(s, s)) true else become(newStatus) - case s @ Idle ⇒ if (CAS(s, Suspended)) true else become(newStatus) - case s @ Scheduled ⇒ if (CAS(s, ScheduledSuspended)) true else become(newStatus) - case Closed ⇒ SET(Closed); false + case s @ (Suspended | ScheduledSuspended) ⇒ if (updateStatus(s, s)) true else become(newStatus) + case s @ Idle ⇒ if (updateStatus(s, Suspended)) true else become(newStatus) + case s @ Scheduled ⇒ if (updateStatus(s, ScheduledSuspended)) true else become(newStatus) + case Closed ⇒ setStatus(Closed); false } case Closed ⇒ status match { - case Closed ⇒ SET(Closed); false - case s ⇒ if (CAS(s, Closed)) true else become(newStatus) + case Closed ⇒ setStatus(Closed); false + case s ⇒ if (updateStatus(s, Closed)) true else become(newStatus) } } } @tailrec final def setAsScheduled(): Boolean = status match { - case s @ Idle ⇒ if (_status.compareAndSet(s, Scheduled)) true else setAsScheduled() - case s @ Suspended ⇒ if (_status.compareAndSet(s, ScheduledSuspended)) true else setAsScheduled() + case s @ Idle ⇒ if (updateStatus(s, Scheduled)) true else setAsScheduled() + case s @ Suspended ⇒ if (updateStatus(s, ScheduledSuspended)) true else setAsScheduled() case _ ⇒ false } @tailrec final def setAsIdle(): Boolean = status match { - case s @ Scheduled ⇒ if (_status.compareAndSet(s, Idle)) true else setAsIdle() - case s @ ScheduledSuspended ⇒ if (_status.compareAndSet(s, Suspended)) true else setAsIdle() + case s @ Scheduled ⇒ if (updateStatus(s, Idle)) true else setAsIdle() + case s @ ScheduledSuspended ⇒ if (updateStatus(s, Suspended)) true else setAsIdle() case s ⇒ acknowledgeStatus(); false }