diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 607f54204b..8c3e478644 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -23,7 +23,10 @@ import akka.japi. {Creator, Procedure} */ @serializable sealed trait LifeCycleMessage -case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) extends LifeCycleMessage { +/* Marker trait to show which Messages are automatically handled by Akka */ +sealed trait AutoReceivedMessage { self: LifeCycleMessage => } + +case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) extends AutoReceivedMessage with LifeCycleMessage { /** * Java API */ @@ -40,22 +43,22 @@ case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) def this(code: akka.japi.Function[ActorRef,Procedure[Any]]) = this(code, true) } -case object RevertHotSwap extends LifeCycleMessage +case object RevertHotSwap extends AutoReceivedMessage with LifeCycleMessage -case class Restart(reason: Throwable) extends LifeCycleMessage +case class Restart(reason: Throwable) extends AutoReceivedMessage with LifeCycleMessage -case class Exit(dead: ActorRef, killer: Throwable) extends LifeCycleMessage +case class Exit(dead: ActorRef, killer: Throwable) extends AutoReceivedMessage with LifeCycleMessage -case class Link(child: ActorRef) extends LifeCycleMessage +case class Link(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage -case class Unlink(child: ActorRef) extends LifeCycleMessage +case class Unlink(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage -case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage +case class UnlinkAndStop(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage + +case object PoisonPill extends AutoReceivedMessage with LifeCycleMessage case object ReceiveTimeout extends LifeCycleMessage -case object PoisonPill extends LifeCycleMessage - case class MaximumNumberOfRestartsWithinTimeRangeReached( @BeanProperty val victim: ActorRef, @BeanProperty val maxNrOfRetries: Option[Int], @@ -303,8 +306,7 @@ trait Actor extends Logging { "\n\t\t'val actor = Actor.actorOf[MyActor]', or" + "\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" + "\n\t\t'val actor = Actor.actor { case msg => .. } }'") - val ref = optRef.asInstanceOf[Some[ActorRef]].get - ref.id = getClass.getName //FIXME: Is this needed? + optRef.asInstanceOf[Some[ActorRef]].get.id = getClass.getName //FIXME: Is this needed? optRef.asInstanceOf[Some[ActorRef]] } @@ -426,56 +428,57 @@ trait Actor extends Logging { /** * Reverts the Actor behavior to the previous one in the hotswap stack. */ - def unbecome: Unit = if (!self.hotswap.isEmpty) self.hotswap = self.hotswap.pop + def unbecome: Unit = { + val h = self.hotswap + if (h.nonEmpty) + self.hotswap = h.pop + } // ========================================= // ==== INTERNAL IMPLEMENTATION DETAILS ==== // ========================================= - private[akka] def apply(msg: Any) = fullBehavior(msg) + private[akka] final def apply(msg: Any) = fullBehavior(msg) //TODO: Scala 2.9.0 => processingBehavior.applyOrElse(msg, unhandledMsgFun) + + private final def autoReceiveMessage(msg: AutoReceivedMessage) { + msg match { + case HotSwap(code,discardOld) => become(code(self),discardOld) + case RevertHotSwap => unbecome + case Exit(dead, reason) => self.handleTrapExit(dead, reason) + case Link(child) => self.link(child) + case Unlink(child) => self.unlink(child) + case UnlinkAndStop(child) => self.unlink(child); child.stop + case Restart(reason) => throw reason + case PoisonPill => { + val f = self.senderFuture + if(f.isDefined) { + f.get.completeWithException(new ActorKilledException("PoisonPill")) + } + self.stop + } + + } + } /*Processingbehavior and fullBehavior are duplicates so make sure changes are done to both */ private lazy val processingBehavior: Receive = { - lazy val defaultBehavior = receive + val defaultBehavior = receive val actorBehavior: Receive = { - case HotSwap(code,discardOld) => become(code(self),discardOld) - case RevertHotSwap => unbecome - case Exit(dead, reason) => self.handleTrapExit(dead, reason) - case Link(child) => self.link(child) - case Unlink(child) => self.unlink(child) - case UnlinkAndStop(child) => self.unlink(child); child.stop - case Restart(reason) => throw reason - case PoisonPill => if(self.senderFuture.isDefined) { - self.senderFuture.get.completeWithException( - new ActorKilledException("PoisonPill") - ) - } - self.stop - case msg if !self.hotswap.isEmpty && + case l: AutoReceivedMessage => autoReceiveMessage(l) + case msg if self.hotswap.nonEmpty && self.hotswap.head.isDefinedAt(msg) => self.hotswap.head.apply(msg) case msg if self.hotswap.isEmpty && defaultBehavior.isDefinedAt(msg) => defaultBehavior.apply(msg) } actorBehavior } - + + //TODO: Scala2.9.0 replace with: val unhandledMsgFun: Any => Unit = unhandled _ private lazy val fullBehavior: Receive = { - lazy val defaultBehavior = receive + val defaultBehavior = receive val actorBehavior: Receive = { - case HotSwap(code, discardOld) => become(code(self), discardOld) - case RevertHotSwap => unbecome - case Exit(dead, reason) => self.handleTrapExit(dead, reason) - case Link(child) => self.link(child) - case Unlink(child) => self.unlink(child) - case UnlinkAndStop(child) => self.unlink(child); child.stop - case Restart(reason) => throw reason - case PoisonPill => if(self.senderFuture.isDefined) { - self.senderFuture.get.completeWithException( - new ActorKilledException("PoisonPill") - ) - } - self.stop - case msg if !self.hotswap.isEmpty && + case l: AutoReceivedMessage => autoReceiveMessage(l) + case msg if self.hotswap.nonEmpty && self.hotswap.head.isDefinedAt(msg) => self.hotswap.head.apply(msg) case msg if self.hotswap.isEmpty && defaultBehavior.isDefinedAt(msg) => defaultBehavior.apply(msg) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 3e6342bf36..1980af2377 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -829,7 +829,16 @@ class LocalActorRef private[akka] ( else { currentMessage = messageHandle try { - dispatch(messageHandle) + Actor.log.slf4j.trace("Invoking actor with message: {}\n", messageHandle) + try { + cancelReceiveTimeout // FIXME: leave this here? + actor(messageHandle.message) + } catch { + case e: InterruptedException => {} // received message while actor is shutting down, ignore + case e => handleExceptionInDispatch(e, messageHandle.message) + } finally { + checkReceiveTimeout // Reschedule receive timeout + } } catch { case e => Actor.log.slf4j.error("Could not invoke actor [{}]", this) @@ -1003,22 +1012,6 @@ class LocalActorRef private[akka] ( a } - private def dispatch[T](messageHandle: MessageInvocation) = { - Actor.log.slf4j.trace("Invoking actor with message: {}\n", messageHandle) - val message = messageHandle.message //serializeMessage(messageHandle.message) - - try { - cancelReceiveTimeout // FIXME: leave this here? - actor(message) - } catch { - case e: InterruptedException => {} // received message while actor is shutting down, ignore - case e => handleExceptionInDispatch(e, message) - } - finally { - checkReceiveTimeout // Reschedule receive timeout - } - } - private def shutDownTemporaryActor(temporaryActor: ActorRef) { Actor.log.slf4j.info("Actor [{}] configured as TEMPORARY and will not be restarted.", temporaryActor.id) temporaryActor.stop diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 4e48806a8c..2fa16eca71 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -9,7 +9,7 @@ import akka.util.{ReflectiveAccess, Switch} import java.util.Queue import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} +import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} /** * Default settings are: @@ -128,7 +128,7 @@ class ExecutorBasedEventDrivenDispatcher( private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) { - if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) { + if (!mbox.suspended.locked && mbox.dispatcherLock.tryLock()) { try { executorService.get() execute mbox } catch { @@ -143,13 +143,13 @@ class ExecutorBasedEventDrivenDispatcher( def suspend(actorRef: ActorRef) { log.slf4j.debug("Suspending {}",actorRef.uuid) - getMailbox(actorRef).suspended.switchOn + getMailbox(actorRef).suspended.tryLock } def resume(actorRef: ActorRef) { log.slf4j.debug("Resuming {}",actorRef.uuid) val mbox = getMailbox(actorRef) - mbox.suspended.switchOff + mbox.suspended.tryUnlock registerForExecution(mbox) } } @@ -162,12 +162,14 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue => def dispatcher: ExecutorBasedEventDrivenDispatcher final def run = { - val reschedule = try { - try { processMailbox() } catch { case ie: InterruptedException => true } + try { + processMailbox() + } catch { + case ie: InterruptedException => } finally { dispatcherLock.unlock() } - if (reschedule || !self.isEmpty) + if (!self.isEmpty) dispatcher.registerForExecution(this) } @@ -176,33 +178,33 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue => * * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ - final def processMailbox(): Boolean = { - if (self.suspended.isOn) - true - else { + final def processMailbox() { + if (!self.suspended.locked) { var nextMessage = self.dequeue - if (nextMessage ne null) { - val throttle = dispatcher.throughput > 0 - var processedMessages = 0 - val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0 - val started = if (isDeadlineEnabled) System.currentTimeMillis else 0 - do { - nextMessage.invoke + if (nextMessage ne null) { //If we have a message + if (dispatcher.throughput <= 1) //If we only run one message per process + nextMessage.invoke //Just run it + else { //But otherwise, if we are throttled, we need to do some book-keeping + var processedMessages = 0 + val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0 + val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0 + do { + nextMessage.invoke - if (throttle) { // Will be elided when false - processedMessages += 1 - if ((processedMessages >= dispatcher.throughput) || - (isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out - return !self.isEmpty - } - - if (self.suspended.isOn) - return true - - nextMessage = self.dequeue - } while (nextMessage ne null) + nextMessage = + if (self.suspended.locked) { + null //If we are suspended, abort + } + else { //If we aren't suspended, we need to make sure we're not overstepping our boundaries + processedMessages += 1 + if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out + null //We reached our boundaries, abort + else + self.dequeue //Dequeue the next message + } + } while (nextMessage ne null) + } } - false } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 3d4a6c439b..54aec2607d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -95,13 +95,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( * @return */ private def processMailbox(mailbox: MessageQueue): Boolean = try { - if (mailbox.suspended.isOn) + if (mailbox.suspended.locked) return false var messageInvocation = mailbox.dequeue while (messageInvocation ne null) { messageInvocation.invoke - if (mailbox.suspended.isOn) + if (mailbox.suspended.locked) return false messageInvocation = mailbox.dequeue } @@ -180,12 +180,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( def suspend(actorRef: ActorRef) { - getMailbox(actorRef).suspended.switchOn + getMailbox(actorRef).suspended.tryLock } def resume(actorRef: ActorRef) { val mbox = getMailbox(actorRef) - mbox.suspended.switchOff + mbox.suspended.tryUnlock executorService.get() execute mbox } diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index 9b364b3af1..68e8cf68ce 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -19,7 +19,7 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m */ trait MessageQueue { val dispatcherLock = new SimpleLock - val suspended = new Switch(false) + val suspended = new SimpleLock def enqueue(handle: MessageInvocation) def dequeue(): MessageInvocation def size: Int