From 29a327aba41f7fcbe1058072f95eeb7deea220cd Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 26 Sep 2011 11:39:07 +0200 Subject: [PATCH] Changing Mailbox.Status to be an Int --- .../main/scala/akka/dispatch/Dispatcher.scala | 2 +- .../scala/akka/dispatch/MailboxHandling.scala | 27 +++++++++---------- .../scala/akka/dispatch/MessageHandling.scala | 26 +++++++++--------- 3 files changed, 27 insertions(+), 28 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index ce569b435b..fc0282c91b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -122,7 +122,7 @@ class Dispatcher( protected[akka] def shutdown { val old = executorService.getAndSet(new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) if (old ne null) - old.shutdownNow() + old.shutdown() } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index 64426a8e61..c2af68d995 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -15,11 +15,10 @@ import atomic.AtomicReferenceFieldUpdater class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) object Mailbox { - sealed trait Status - case object OPEN extends Status - case object SUSPENDED extends Status - case object CLOSED extends Status - + type Status = Int + val Open = 0 + val Suspended = 1 + val Closed = 2 //private[Mailbox] val mailboxStatusUpdater = AtomicReferenceFieldUpdater.newUpdater[Mailbox, Status](classOf[Mailbox], classOf[Status], "_status") } @@ -33,20 +32,20 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl */ final val dispatcherLock = new SimpleLock(startLocked = false) @volatile - var _status: Status = OPEN //Must be named _status because of the updater + var _status: Status = Open //Must be named _status because of the updater final def status: Mailbox.Status = _status //mailboxStatusUpdater.get(this) - final def isSuspended: Boolean = status == SUSPENDED - final def isClosed: Boolean = status == CLOSED - final def isOpen: Boolean = status == OPEN + final def isSuspended: Boolean = status == Suspended + final def isClosed: Boolean = status == Closed + final def isOpen: Boolean = status == Open def become(newStatus: Status) = _status = newStatus //mailboxStatusUpdater.set(this, newStatus) def shouldBeRegisteredForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { - case CLOSED ⇒ false - case OPEN ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages - case SUSPENDED ⇒ hasSystemMessageHint || hasSystemMessages + case `Open` ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages + case `Closed` ⇒ false + case `Suspended` ⇒ hasSystemMessageHint || hasSystemMessages } final def run = { @@ -66,7 +65,7 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl final def processMailbox() { processAllSystemMessages() - if (status == OPEN) { + if (isOpen) { var nextMessage = dequeue() if (nextMessage ne null) { //If we have a message if (dispatcher.throughput <= 1) { //If we only run one message per process { @@ -82,7 +81,7 @@ abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnabl processAllSystemMessages() - nextMessage = if (status == OPEN) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries + nextMessage = if (isOpen) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries processedMessages += 1 if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out null //We reached our boundaries, abort diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 2628847caf..b1a0ada5af 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -86,8 +86,8 @@ abstract class MessageDispatcher extends Serializable { object DeadLetterMailbox extends Mailbox { dispatcherLock.tryLock() - become(Mailbox.CLOSED) - override def become(newStatus: Mailbox.Status) { super.become(Mailbox.CLOSED) } //Always transcend to CLOSED to preserve the volatile write + become(Mailbox.Closed) + override def become(newStatus: Mailbox.Status) { super.become(Mailbox.Closed) } //Always transcend to CLOSED to preserve the volatile write override def dispatcher = null //MessageDispatcher.this override def enqueue(envelope: Envelope) { envelope.channel sendException new ActorKilledException("Actor has been stopped") } override def dequeue() = null @@ -184,7 +184,7 @@ abstract class MessageDispatcher extends Serializable { protected[akka] def unregister(actor: ActorCell): Unit = { if (uuids remove actor.uuid) { val mailBox = actor.mailbox - mailBox.become(Mailbox.CLOSED) + mailBox.become(Mailbox.Closed) actor.mailbox = deadLetterMailbox //FIXME getAndSet would be preferrable here cleanUpMailboxFor(actor, mailBox) if (uuids.isEmpty && _tasks.get == 0) { @@ -205,21 +205,20 @@ abstract class MessageDispatcher extends Serializable { * called when an actor is unregistered. */ protected def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox): Unit = { - val m = mailBox - if (m.hasSystemMessages) { - var envelope = m.systemDequeue() + if (mailBox.hasSystemMessages) { + var envelope = mailBox.systemDequeue() while (envelope ne null) { deadLetterMailbox.systemEnqueue(envelope) - envelope = m.systemDequeue() + envelope = mailBox.systemDequeue() } } - if (m.hasMessages) { - var envelope = m.dequeue + if (mailBox.hasMessages) { + var envelope = mailBox.dequeue while (envelope ne null) { deadLetterMailbox.enqueue(envelope) - envelope = m.dequeue + envelope = mailBox.dequeue } } } @@ -248,7 +247,8 @@ abstract class MessageDispatcher extends Serializable { case SCHEDULED ⇒ if (uuids.isEmpty && _tasks.get == 0) { active switchOff { - shutdown() // shut down in the dispatcher's references is zero + if (uuids.isEmpty && _tasks.get == 0) + shutdown() // shut down in the dispatcher's references is zero } } shutdownSchedule = UNSCHEDULED @@ -268,14 +268,14 @@ abstract class MessageDispatcher extends Serializable { * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference */ def suspend(actor: ActorCell): Unit = - if (uuids.contains(actor.uuid)) actor.mailbox.become(Mailbox.SUSPENDED) + if (uuids.contains(actor.uuid)) actor.mailbox.become(Mailbox.Suspended) /* * After the call to this method, the dispatcher must begin any new message processing for the specified reference */ def resume(actor: ActorCell): Unit = if (uuids.contains(actor.uuid)) { val mbox = actor.mailbox - mbox.become(Mailbox.OPEN) + mbox.become(Mailbox.Open) registerForExecution(mbox, false, false) }