diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala index acf44573c1..68044a3c1e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala @@ -134,6 +134,7 @@ class LoggingReceiveSpec }) actor ! PoisonPill expectMsg(300 millis, EventHandler.Debug(actor.underlyingActor, "received AutoReceiveMessage PoisonPill")) + awaitCond(actor.isShutdown, 100 millis) } "log LifeCycle changes if requested" in { diff --git a/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java b/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java index bfe6dad361..21d41ac921 100644 --- a/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java +++ b/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java @@ -7,6 +7,6 @@ package akka.dispatch; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; abstract class AbstractMailbox { - private volatile int _status = Mailbox.Idle(); + private volatile int _status; // not initialized because this is faster: 0 == Open protected final static AtomicIntegerFieldUpdater updater = AtomicIntegerFieldUpdater.newUpdater(AbstractMailbox.class, "_status"); -} \ No newline at end of file +} diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 39e421cdc9..054697e10a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -88,7 +88,7 @@ abstract class MessageDispatcher extends Serializable { protected[akka] val deadLetterMailbox: Mailbox = DeadLetterMailbox object DeadLetterMailbox extends Mailbox { - become(Mailbox.Closed) + becomeClosed() override def dispatcher = null //MessageDispatcher.this override def enqueue(envelope: Envelope) { envelope.channel sendException new ActorKilledException("Actor has been stopped") } override def dequeue() = null @@ -185,7 +185,7 @@ abstract class MessageDispatcher extends Serializable { protected[akka] def unregister(actor: ActorCell) { if (uuids remove actor.uuid) { val mailBox = actor.mailbox - mailBox.become(Mailbox.Closed) + mailBox.becomeClosed() actor.mailbox = deadLetterMailbox //FIXME getAndSet would be preferrable here cleanUpMailboxFor(actor, mailBox) } else System.err.println("Couldn't unregister: " + actor) @@ -244,14 +244,14 @@ abstract class MessageDispatcher extends Serializable { * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference */ def suspend(actor: ActorCell): Unit = - if (uuids.contains(actor.uuid)) actor.mailbox.become(Mailbox.Suspended) + if (uuids.contains(actor.uuid)) actor.mailbox.becomeSuspended() /* * After the call to this method, the dispatcher must begin any new message processing for the specified reference */ def resume(actor: ActorCell): Unit = if (uuids.contains(actor.uuid)) { val mbox = actor.mailbox - mbox.become(Mailbox.Idle) + mbox.becomeOpen() registerForExecution(mbox, false, false) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 2788d58e11..86d9cbfba7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -19,13 +19,16 @@ private[dispatch] object Mailbox { type Status = Int + /* + * the following assigned numbers CANNOT be changed without looking at the code which uses them! + */ + // primary status: only first three - final val Idle = 0 + final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero! final val Suspended = 1 final val Closed = 2 - // secondary status: Idle or Suspended plus Scheduled - final val Scheduled = 3 - final val ScheduledSuspended = 4 // may only happen for system message processing + // secondary status: Scheduled bit may be added to Open/Suspended + final val Scheduled = 4 } /** @@ -36,23 +39,13 @@ abstract class Mailbox extends AbstractMailbox with MessageQueue with SystemMess final def status: Mailbox.Status = AbstractMailbox.updater.get(this) - final def isActive: Boolean = status match { - case Idle | Scheduled ⇒ true - case _ ⇒ false - } + final def isActive: Boolean = (status & 3) == Open - // FIXME: this does not seem to be used anywhere, remove? - final def isSuspended: Boolean = status match { - case Suspended | ScheduledSuspended ⇒ true - case _ ⇒ false - } + final def isSuspended: Boolean = (status & 3) == Suspended final def isClosed: Boolean = status == Closed - final def isScheduled: Boolean = status match { - case Scheduled | ScheduledSuspended ⇒ true - case _ ⇒ false - } + final def isScheduled: Boolean = (status & Scheduled) != 0 @inline protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean = @@ -73,52 +66,73 @@ abstract class Mailbox extends AbstractMailbox with MessageQueue with SystemMess } /** - * set new primary status: Idle, Suspended or Closed. Caller does not need to - * worry about whether status was Scheduled or not. + * set new primary status Open. Caller does not need to worry about whether + * status was Scheduled or not. */ @tailrec - final def become(newStatus: Status): Boolean = { - newStatus match { - case Idle ⇒ - status match { - case s @ (Idle | Scheduled) ⇒ if (updateStatus(s, s)) true else become(newStatus) - case s @ Suspended ⇒ if (updateStatus(s, Idle)) true else become(newStatus) - case s @ ScheduledSuspended ⇒ if (updateStatus(s, Scheduled)) true else become(newStatus) - case Closed ⇒ setStatus(Closed); false - } - case Suspended ⇒ - status match { - case s @ (Suspended | ScheduledSuspended) ⇒ if (updateStatus(s, s)) true else become(newStatus) - case s @ Idle ⇒ if (updateStatus(s, Suspended)) true else become(newStatus) - case s @ Scheduled ⇒ if (updateStatus(s, ScheduledSuspended)) true else become(newStatus) - case Closed ⇒ setStatus(Closed); false - } - case Closed ⇒ - status match { - case Closed ⇒ setStatus(Closed); false - case s ⇒ if (updateStatus(s, Closed)) true else become(newStatus) - } + final def becomeOpen(): Boolean = status match { + case Closed ⇒ setStatus(Closed); false + case s ⇒ updateStatus(s, Open | s & Scheduled) || becomeOpen() + } + + /** + * set new primary status Suspended. Caller does not need to worry about whether + * status was Scheduled or not. + */ + @tailrec + final def becomeSuspended(): Boolean = status match { + case Closed ⇒ setStatus(Closed); false + case s ⇒ updateStatus(s, Suspended | s & Scheduled) || becomeSuspended() + } + + /** + * set new primary status Closed. Caller does not need to worry about whether + * status was Scheduled or not. + */ + @tailrec + final def becomeClosed(): Boolean = status match { + case Closed ⇒ setStatus(Closed); false + case s ⇒ updateStatus(s, Closed) || becomeClosed() + } + + /** + * Set Scheduled status, keeping primary status as is. + */ + @tailrec + final def setAsScheduled(): Boolean = { + val s = status + /* + * only try to add Scheduled bit if pure Open/Suspended, not Closed or with + * Scheduled bit already set (this is one of the reasons why the numbers + * cannot be changed in object Mailbox above) + */ + if (s <= Suspended) updateStatus(s, s | Scheduled) || setAsScheduled() + else false + } + + /** + * Reset Scheduled status, keeping primary status as is. + */ + @tailrec + final def setAsIdle(): Boolean = { + val s = status + /* + * only try to remove Scheduled bit if currently Scheduled, not Closed or + * without Scheduled bit set (this is one of the reasons why the numbers + * cannot be changed in object Mailbox above) + */ + if (s >= Scheduled) { + updateStatus(s, s & ~Scheduled) || setAsIdle() + } else { + acknowledgeStatus() // this write is needed to make memory consistent after processMailbox() + false } } - @tailrec - final def setAsScheduled(): Boolean = status match { - case s @ Idle ⇒ if (updateStatus(s, Scheduled)) true else setAsScheduled() - case s @ Suspended ⇒ if (updateStatus(s, ScheduledSuspended)) true else setAsScheduled() - case _ ⇒ false - } - - @tailrec - final def setAsIdle(): Boolean = status match { - case s @ Scheduled ⇒ if (updateStatus(s, Idle)) true else setAsIdle() - case s @ ScheduledSuspended ⇒ if (updateStatus(s, Suspended)) true else setAsIdle() - case s ⇒ acknowledgeStatus(); false - } - def shouldBeRegisteredForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { - case Idle | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages - case Closed ⇒ false - case Suspended | ScheduledSuspended ⇒ hasSystemMessageHint || hasSystemMessages + case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages + case Closed ⇒ false + case _ ⇒ hasSystemMessageHint || hasSystemMessages } final def run = {