From cc7da99bea97bb9b5308fdac7df21bb691c4a7ba Mon Sep 17 00:00:00 2001 From: Hiram Chirino Date: Wed, 21 Jul 2010 09:44:18 -0400 Subject: [PATCH] decoupled the mailbox implementation from the actor. The implementation is now controled by dispatcher associated with the actor. --- .../main/scala/component/ActorComponent.scala | 4 +-- akka-core/src/main/scala/actor/ActorRef.scala | 33 +++++++------------ .../ExecutorBasedEventDrivenDispatcher.scala | 33 +++++++++++++++---- ...sedEventDrivenWorkStealingDispatcher.scala | 27 +++++++++++---- .../main/scala/dispatch/MessageHandling.scala | 2 +- ...sedSingleThreadEventDrivenDispatcher.scala | 2 -- ...BasedThreadPoolEventDrivenDispatcher.scala | 2 -- .../dispatch/ThreadBasedDispatcher.scala | 2 -- .../SerializableTypeClassActorSpec.scala | 2 +- 9 files changed, 61 insertions(+), 46 deletions(-) diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala index e267fcd077..01e96b6e20 100644 --- a/akka-camel/src/main/scala/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/component/ActorComponent.scala @@ -247,10 +247,10 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall def spawnLink[T <: Actor: Manifest]: ActorRef = unsupported def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = unsupported def shutdownLinkedActors: Unit = unsupported - def mailboxSize: Int = unsupported def supervisor: Option[ActorRef] = unsupported protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](message: Any, timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]) = unsupported - protected[akka] def mailbox: Deque[MessageInvocation] = unsupported + protected[akka] def mailbox: AnyRef = unsupported + protected[akka] def mailbox_=(msg: AnyRef):AnyRef = unsupported protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 82f035f311..90c09892ce 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -514,7 +514,7 @@ trait ActorRef extends TransactionManagement { /** * Returns the mailbox size. */ - def mailboxSize: Int + def mailboxSize = dispatcher.mailboxSize(this) /** * Returns the supervisor, if there is one. @@ -542,8 +542,9 @@ trait ActorRef extends TransactionManagement { protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit - protected[akka] def mailbox: Deque[MessageInvocation] - + protected[akka] def mailbox: AnyRef + protected[akka] def mailbox_=(value: AnyRef): AnyRef + protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit @@ -599,8 +600,8 @@ sealed class LocalActorRef private[akka]( @volatile private var loader: Option[ClassLoader] = None @volatile private var maxNrOfRetriesCount: Int = 0 @volatile private var restartsWithinTimeRangeTimestamp: Long = 0L - - protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] + @volatile private var _mailbox: AnyRef = _ + protected[this] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) } // Needed to be able to null out the 'val self: ActorRef' member variables to make the Actor @@ -890,17 +891,9 @@ sealed class LocalActorRef private[akka]( /** * Returns the mailbox. */ - def mailbox: Deque[MessageInvocation] = _mailbox + def mailbox: AnyRef = _mailbox - /** - * Returns the mailbox size. - */ - def mailboxSize: Int = _mailbox.size - - /** - * Returns a copy of all the messages, put into a List[MessageInvocation]. - */ - def messagesInMailbox: List[MessageInvocation] = _mailbox.toArray.toList.asInstanceOf[List[MessageInvocation]] + protected[akka] def mailbox_=(value: AnyRef):AnyRef = { _mailbox = value; value } /** * Shuts down and removes all linked actors. @@ -927,10 +920,7 @@ sealed class LocalActorRef private[akka]( createRemoteRequestProtocolBuilder(this, message, true, senderOption).build, None) } else { val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get) - if (dispatcher.usesActorMailbox) { - _mailbox.add(invocation) - invocation.send - } else invocation.send + invocation.send } } @@ -951,7 +941,6 @@ sealed class LocalActorRef private[akka]( else new DefaultCompletableFuture[T](timeout) val invocation = new MessageInvocation( this, message, senderOption, Some(future.asInstanceOf[CompletableFuture[Any]]), transactionSet.get) - if (dispatcher.usesActorMailbox) _mailbox.add(invocation) invocation.send future } @@ -1338,10 +1327,10 @@ private[akka] case class RemoteActorRef private[akka] ( def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = unsupported def spawnLink[T <: Actor: Manifest]: ActorRef = unsupported def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = unsupported - def mailboxSize: Int = unsupported def supervisor: Option[ActorRef] = unsupported def shutdownLinkedActors: Unit = unsupported - protected[akka] def mailbox: Deque[MessageInvocation] = unsupported + protected[akka] def mailbox: AnyRef = unsupported + protected[akka] def mailbox_=(value: AnyRef):AnyRef = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 1d34083e0a..89fb90d16e 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -5,6 +5,7 @@ package se.scalablesolutions.akka.dispatch import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException} +import jsr166x.ConcurrentLinkedDeque /** * Default settings are: @@ -67,15 +68,34 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat val name = "akka:event-driven:dispatcher:" + _name init - def dispatch(invocation: MessageInvocation) = dispatch(invocation.receiver) + def dispatch(invocation: MessageInvocation) = { + getMailbox(invocation.receiver).add(invocation) + dispatch(invocation.receiver) + } + + /** + * @return the mailbox associated with the actor + */ + private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[ConcurrentLinkedDeque[MessageInvocation]] + + override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size + + override def register(actorRef: ActorRef) = { + // The actor will need a ConcurrentLinkedDeque based mailbox + if( actorRef.mailbox == null ) { + actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]() + } + super.register(actorRef) + } def dispatch(receiver: ActorRef): Unit = if (active) { + executor.execute(new Runnable() { def run = { var lockAcquiredOnce = false var finishedBeforeMailboxEmpty = false val lock = receiver.dispatcherLock - val mailbox = receiver.mailbox + val mailbox = getMailbox(receiver) // this do-while loop is required to prevent missing new messages between the end of the inner while // loop and releasing the lock do { @@ -102,15 +122,16 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat */ def processMailbox(receiver: ActorRef): Boolean = { var processedMessages = 0 - var messageInvocation = receiver.mailbox.poll + val mailbox = getMailbox(receiver) + var messageInvocation = mailbox.poll while (messageInvocation != null) { messageInvocation.invoke processedMessages += 1 // check if we simply continue with other messages, or reached the throughput limit - if (throughput <= 0 || processedMessages < throughput) messageInvocation = receiver.mailbox.poll + if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.poll else { messageInvocation = null - return !receiver.mailbox.isEmpty + return !mailbox.isEmpty } } false @@ -128,8 +149,6 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat references.clear } - def usesActorMailbox = true - def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 76138dce35..6f4ec934eb 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -7,6 +7,7 @@ package se.scalablesolutions.akka.dispatch import java.util.concurrent.CopyOnWriteArrayList import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateException} +import jsr166x.ConcurrentLinkedDeque /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -44,7 +45,16 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess val name = "akka:event-driven-work-stealing:dispatcher:" + _name init + + /** + * @return the mailbox associated with the actor + */ + private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[ConcurrentLinkedDeque[MessageInvocation]] + + override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size + def dispatch(invocation: MessageInvocation) = if (active) { + getMailbox(invocation.receiver).add(invocation) executor.execute(new Runnable() { def run = { if (!tryProcessMailbox(invocation.receiver)) { @@ -76,7 +86,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess lock.unlock } } - } while ((lockAcquiredOnce && !receiver.mailbox.isEmpty)) + } while ((lockAcquiredOnce && !getMailbox(receiver).isEmpty)) return lockAcquiredOnce } @@ -85,10 +95,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess * Process the messages in the mailbox of the given actor. */ private def processMailbox(receiver: ActorRef) = { - var messageInvocation = receiver.mailbox.poll + val mailbox = getMailbox(receiver) + var messageInvocation = mailbox.poll while (messageInvocation != null) { messageInvocation.invoke - messageInvocation = receiver.mailbox.poll + messageInvocation = mailbox.poll } } @@ -116,7 +127,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess for (i <- 0 to actors.length) { val index = (i + startIndex) % actors.length val actor = actors(index) - if (actor != receiver && actor.mailbox.isEmpty) return (Some(actor), index) + if (actor != receiver && getMailbox(actor).isEmpty) return (Some(actor), index) } (None, startIndex) // nothing found, reuse same start index next time } @@ -139,7 +150,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess * Steal a message from the receiver and give it to the thief. */ private def donateMessage(receiver: ActorRef, thief: ActorRef): Boolean = { - val donated = receiver.mailbox.pollLast + val donated = getMailbox(receiver).pollLast if (donated ne null) { if (donated.senderFuture.isDefined) thief.self.postMessageToMailboxAndCreateFutureResultWithTimeout[Any]( donated.message, receiver.timeout, donated.sender, donated.senderFuture) @@ -169,6 +180,10 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess override def register(actorRef: ActorRef) = { verifyActorsAreOfSameType(actorRef) + // The actor will need a ConcurrentLinkedDeque based mailbox + if( actorRef.mailbox == null ) { + actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]() + } pooledActors.add(actorRef) super.register(actorRef) } @@ -178,8 +193,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess super.unregister(actorRef) } - def usesActorMailbox = true - private def verifyActorsAreOfSameType(actorOfId: ActorRef) = { actorType match { case None => actorType = Some(actorOfId.actor.getClass) diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala index a73f2b691b..92926bb253 100644 --- a/akka-core/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala @@ -79,7 +79,7 @@ trait MessageDispatcher extends Logging { } def canBeShutDown: Boolean = references.isEmpty def isShutdown: Boolean - def usesActorMailbox : Boolean + def mailboxSize(actorRef: ActorRef):Int = 0 } /** diff --git a/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala index 8a951a4e72..e6896ef706 100644 --- a/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala @@ -41,8 +41,6 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String) def isShutdown = !active - def usesActorMailbox = false - override def toString = "ReactorBasedSingleThreadEventDrivenDispatcher[" + name + "]" class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer { diff --git a/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala index 370426b2fd..530184d4b2 100644 --- a/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala @@ -139,8 +139,6 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) else nrOfBusyMessages < 100 } - def usesActorMailbox = false - def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 7355012b1f..012c4899d8 100644 --- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -40,8 +40,6 @@ class ThreadBasedDispatcher(private val actor: ActorRef) extends MessageDispatch def isShutdown = !active - def usesActorMailbox = false - def shutdown = if (active) { log.debug("Shutting down %s", toString) active = false diff --git a/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala b/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala index 2b26c9ad81..c5084c5e30 100644 --- a/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala +++ b/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala @@ -166,7 +166,7 @@ class MyStatelessActor extends Actor { class MyStatelessActorWithMessagesInMailbox extends Actor { def receive = { case "hello" => - println("# messages in mailbox " + self.mailbox.size) + println("# messages in mailbox " + self.mailboxSize) Thread.sleep(500) case "hello-reply" => self.reply("world") }