decoupled the mailbox implementation from the actor. The implementation is now controled by dispatcher associated with the actor.

This commit is contained in:
Hiram Chirino 2010-07-21 09:44:18 -04:00
parent bc29b0ef2e
commit cc7da99bea
9 changed files with 61 additions and 46 deletions

View file

@ -247,10 +247,10 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
def spawnLink[T <: Actor: Manifest]: ActorRef = unsupported
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = unsupported
def shutdownLinkedActors: Unit = unsupported
def mailboxSize: Int = unsupported
def supervisor: Option[ActorRef] = unsupported
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](message: Any, timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]) = unsupported
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
protected[akka] def mailbox: AnyRef = unsupported
protected[akka] def mailbox_=(msg: AnyRef):AnyRef = unsupported
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported

View file

@ -514,7 +514,7 @@ trait ActorRef extends TransactionManagement {
/**
* Returns the mailbox size.
*/
def mailboxSize: Int
def mailboxSize = dispatcher.mailboxSize(this)
/**
* Returns the supervisor, if there is one.
@ -542,8 +542,9 @@ trait ActorRef extends TransactionManagement {
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit
protected[akka] def mailbox: Deque[MessageInvocation]
protected[akka] def mailbox: AnyRef
protected[akka] def mailbox_=(value: AnyRef): AnyRef
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit
@ -599,8 +600,8 @@ sealed class LocalActorRef private[akka](
@volatile private var loader: Option[ClassLoader] = None
@volatile private var maxNrOfRetriesCount: Int = 0
@volatile private var restartsWithinTimeRangeTimestamp: Long = 0L
protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
@volatile private var _mailbox: AnyRef = _
protected[this] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) }
// Needed to be able to null out the 'val self: ActorRef' member variables to make the Actor
@ -890,17 +891,9 @@ sealed class LocalActorRef private[akka](
/**
* Returns the mailbox.
*/
def mailbox: Deque[MessageInvocation] = _mailbox
def mailbox: AnyRef = _mailbox
/**
* Returns the mailbox size.
*/
def mailboxSize: Int = _mailbox.size
/**
* Returns a copy of all the messages, put into a List[MessageInvocation].
*/
def messagesInMailbox: List[MessageInvocation] = _mailbox.toArray.toList.asInstanceOf[List[MessageInvocation]]
protected[akka] def mailbox_=(value: AnyRef):AnyRef = { _mailbox = value; value }
/**
* Shuts down and removes all linked actors.
@ -927,10 +920,7 @@ sealed class LocalActorRef private[akka](
createRemoteRequestProtocolBuilder(this, message, true, senderOption).build, None)
} else {
val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get)
if (dispatcher.usesActorMailbox) {
_mailbox.add(invocation)
invocation.send
} else invocation.send
invocation.send
}
}
@ -951,7 +941,6 @@ sealed class LocalActorRef private[akka](
else new DefaultCompletableFuture[T](timeout)
val invocation = new MessageInvocation(
this, message, senderOption, Some(future.asInstanceOf[CompletableFuture[Any]]), transactionSet.get)
if (dispatcher.usesActorMailbox) _mailbox.add(invocation)
invocation.send
future
}
@ -1338,10 +1327,10 @@ private[akka] case class RemoteActorRef private[akka] (
def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = unsupported
def spawnLink[T <: Actor: Manifest]: ActorRef = unsupported
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = unsupported
def mailboxSize: Int = unsupported
def supervisor: Option[ActorRef] = unsupported
def shutdownLinkedActors: Unit = unsupported
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
protected[akka] def mailbox: AnyRef = unsupported
protected[akka] def mailbox_=(value: AnyRef):AnyRef = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported

View file

@ -5,6 +5,7 @@
package se.scalablesolutions.akka.dispatch
import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
import jsr166x.ConcurrentLinkedDeque
/**
* Default settings are:
@ -67,15 +68,34 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
val name = "akka:event-driven:dispatcher:" + _name
init
def dispatch(invocation: MessageInvocation) = dispatch(invocation.receiver)
def dispatch(invocation: MessageInvocation) = {
getMailbox(invocation.receiver).add(invocation)
dispatch(invocation.receiver)
}
/**
* @return the mailbox associated with the actor
*/
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[ConcurrentLinkedDeque[MessageInvocation]]
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
override def register(actorRef: ActorRef) = {
// The actor will need a ConcurrentLinkedDeque based mailbox
if( actorRef.mailbox == null ) {
actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]()
}
super.register(actorRef)
}
def dispatch(receiver: ActorRef): Unit = if (active) {
executor.execute(new Runnable() {
def run = {
var lockAcquiredOnce = false
var finishedBeforeMailboxEmpty = false
val lock = receiver.dispatcherLock
val mailbox = receiver.mailbox
val mailbox = getMailbox(receiver)
// this do-while loop is required to prevent missing new messages between the end of the inner while
// loop and releasing the lock
do {
@ -102,15 +122,16 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
*/
def processMailbox(receiver: ActorRef): Boolean = {
var processedMessages = 0
var messageInvocation = receiver.mailbox.poll
val mailbox = getMailbox(receiver)
var messageInvocation = mailbox.poll
while (messageInvocation != null) {
messageInvocation.invoke
processedMessages += 1
// check if we simply continue with other messages, or reached the throughput limit
if (throughput <= 0 || processedMessages < throughput) messageInvocation = receiver.mailbox.poll
if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.poll
else {
messageInvocation = null
return !receiver.mailbox.isEmpty
return !mailbox.isEmpty
}
}
false
@ -128,8 +149,6 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
references.clear
}
def usesActorMailbox = true
def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")

View file

@ -7,6 +7,7 @@ package se.scalablesolutions.akka.dispatch
import java.util.concurrent.CopyOnWriteArrayList
import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateException}
import jsr166x.ConcurrentLinkedDeque
/**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -44,7 +45,16 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
val name = "akka:event-driven-work-stealing:dispatcher:" + _name
init
/**
* @return the mailbox associated with the actor
*/
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[ConcurrentLinkedDeque[MessageInvocation]]
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
def dispatch(invocation: MessageInvocation) = if (active) {
getMailbox(invocation.receiver).add(invocation)
executor.execute(new Runnable() {
def run = {
if (!tryProcessMailbox(invocation.receiver)) {
@ -76,7 +86,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
lock.unlock
}
}
} while ((lockAcquiredOnce && !receiver.mailbox.isEmpty))
} while ((lockAcquiredOnce && !getMailbox(receiver).isEmpty))
return lockAcquiredOnce
}
@ -85,10 +95,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
* Process the messages in the mailbox of the given actor.
*/
private def processMailbox(receiver: ActorRef) = {
var messageInvocation = receiver.mailbox.poll
val mailbox = getMailbox(receiver)
var messageInvocation = mailbox.poll
while (messageInvocation != null) {
messageInvocation.invoke
messageInvocation = receiver.mailbox.poll
messageInvocation = mailbox.poll
}
}
@ -116,7 +127,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
for (i <- 0 to actors.length) {
val index = (i + startIndex) % actors.length
val actor = actors(index)
if (actor != receiver && actor.mailbox.isEmpty) return (Some(actor), index)
if (actor != receiver && getMailbox(actor).isEmpty) return (Some(actor), index)
}
(None, startIndex) // nothing found, reuse same start index next time
}
@ -139,7 +150,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
* Steal a message from the receiver and give it to the thief.
*/
private def donateMessage(receiver: ActorRef, thief: ActorRef): Boolean = {
val donated = receiver.mailbox.pollLast
val donated = getMailbox(receiver).pollLast
if (donated ne null) {
if (donated.senderFuture.isDefined) thief.self.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
donated.message, receiver.timeout, donated.sender, donated.senderFuture)
@ -169,6 +180,10 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
override def register(actorRef: ActorRef) = {
verifyActorsAreOfSameType(actorRef)
// The actor will need a ConcurrentLinkedDeque based mailbox
if( actorRef.mailbox == null ) {
actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]()
}
pooledActors.add(actorRef)
super.register(actorRef)
}
@ -178,8 +193,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
super.unregister(actorRef)
}
def usesActorMailbox = true
private def verifyActorsAreOfSameType(actorOfId: ActorRef) = {
actorType match {
case None => actorType = Some(actorOfId.actor.getClass)

View file

@ -79,7 +79,7 @@ trait MessageDispatcher extends Logging {
}
def canBeShutDown: Boolean = references.isEmpty
def isShutdown: Boolean
def usesActorMailbox : Boolean
def mailboxSize(actorRef: ActorRef):Int = 0
}
/**

View file

@ -41,8 +41,6 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String)
def isShutdown = !active
def usesActorMailbox = false
override def toString = "ReactorBasedSingleThreadEventDrivenDispatcher[" + name + "]"
class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {

View file

@ -139,8 +139,6 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
else nrOfBusyMessages < 100
}
def usesActorMailbox = false
def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")

View file

@ -40,8 +40,6 @@ class ThreadBasedDispatcher(private val actor: ActorRef) extends MessageDispatch
def isShutdown = !active
def usesActorMailbox = false
def shutdown = if (active) {
log.debug("Shutting down %s", toString)
active = false

View file

@ -166,7 +166,7 @@ class MyStatelessActor extends Actor {
class MyStatelessActorWithMessagesInMailbox extends Actor {
def receive = {
case "hello" =>
println("# messages in mailbox " + self.mailbox.size)
println("# messages in mailbox " + self.mailboxSize)
Thread.sleep(500)
case "hello-reply" => self.reply("world")
}