Removing receiver from Envelope and switch to use the Mailbox.actor instead, this should speed up the BalancingDispatcher by some since it doesn't entail any allocations in adopting a message

This commit is contained in:
Viktor Klang 2011-10-19 13:19:44 +02:00
parent bde3969f65
commit 0dc3c5ad3d
9 changed files with 21 additions and 38 deletions

View file

@ -127,10 +127,10 @@ object ActorModelSpec {
super.unregister(actor) super.unregister(actor)
} }
protected[akka] abstract override def dispatch(invocation: Envelope) { protected[akka] abstract override def dispatch(receiver: ActorCell, invocation: Envelope) {
val stats = getStats(invocation.receiver.self) val stats = getStats(receiver.self)
stats.msgsReceived.incrementAndGet() stats.msgsReceived.incrementAndGet()
super.dispatch(invocation) super.dispatch(receiver, invocation)
} }
protected[akka] abstract override def start() { protected[akka] abstract override def start() {

View file

@ -80,12 +80,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
result result
} }
def createMessageInvocation(msg: Any): Envelope = { def createMessageInvocation(msg: Any): Envelope = Envelope(msg, NullChannel)
new Envelope(
actorOf(new Actor { //Dummy actor
def receive = { case _ }
}).asInstanceOf[LocalActorRef].underlying, msg, NullChannel)
}
def ensureInitialMailboxState(config: MailboxType, q: Mailbox) { def ensureInitialMailboxState(config: MailboxType, q: Mailbox) {
q must not be null q must not be null

View file

@ -278,7 +278,7 @@ private[akka] class ActorCell(
//TODO FIXME remove this method //TODO FIXME remove this method
def supervisor: Option[ActorRef] = props.supervisor def supervisor: Option[ActorRef] = props.supervisor
def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = dispatcher dispatch Envelope(this, message, channel) def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = dispatcher.dispatch(this, Envelope(message, channel))
def postMessageToMailboxAndCreateFutureResultWithTimeout( def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any, message: Any,
@ -288,7 +288,7 @@ private[akka] class ActorCell(
case f: ActorPromise f case f: ActorPromise f
case _ new ActorPromise(timeout)(dispatcher) case _ new ActorPromise(timeout)(dispatcher)
} }
dispatcher dispatch Envelope(this, message, future) dispatcher.dispatch(this, Envelope(message, future))
future future
} }

View file

@ -17,13 +17,7 @@ import scala.annotation.tailrec
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
final case class Envelope(val receiver: ActorCell, val message: Any, val channel: UntypedChannel) { final case class Envelope(val message: Any, val channel: UntypedChannel)
if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
final def invoke() {
receiver invoke this
}
}
object SystemMessage { object SystemMessage {
@tailrec @tailrec
@ -295,7 +289,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
/** /**
* Will be called when the dispatcher is to queue an invocation for execution * Will be called when the dispatcher is to queue an invocation for execution
*/ */
protected[akka] def dispatch(invocation: Envelope) protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope)
/** /**
* Suggest to register the provided mailbox for execution * Suggest to register the provided mailbox for execution

View file

@ -57,12 +57,7 @@ class BalancingDispatcher(
class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue { class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue {
final def enqueue(handle: Envelope) = messageQueue.enqueue(handle) final def enqueue(handle: Envelope) = messageQueue.enqueue(handle)
final def dequeue(): Envelope = { final def dequeue(): Envelope = messageQueue.dequeue()
val envelope = messageQueue.dequeue()
if (envelope eq null) null
else if (envelope.receiver eq actor) envelope
else envelope.copy(receiver = actor)
}
final def numberOfMessages: Int = messageQueue.numberOfMessages final def numberOfMessages: Int = messageQueue.numberOfMessages
@ -106,8 +101,7 @@ class BalancingDispatcher(
} else true } else true
} }
override protected[akka] def dispatch(invocation: Envelope) = { override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
val receiver = invocation.receiver
messageQueue enqueue invocation messageQueue enqueue invocation
val buddy = buddies.pollFirst() val buddy = buddies.pollFirst()

View file

@ -78,8 +78,8 @@ class Dispatcher(
protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name) protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name)
protected[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) protected[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService))
protected[akka] def dispatch(invocation: Envelope) = { protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
val mbox = invocation.receiver.mailbox val mbox = receiver.mailbox
mbox enqueue invocation mbox enqueue invocation
registerForExecution(mbox, true, false) registerForExecution(mbox, true, false)
} }

View file

@ -170,7 +170,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
var processedMessages = 0 var processedMessages = 0
val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0 val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0
do { do {
nextMessage.invoke actor invoke nextMessage
processAllSystemMessages() //After we're done, process all system messages processAllSystemMessages() //After we're done, process all system messages
@ -182,7 +182,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
} else null //Abort } else null //Abort
} while (nextMessage ne null) } while (nextMessage ne null)
} else { //If we only run one message per process } else { //If we only run one message per process
nextMessage.invoke //Just run it actor invoke nextMessage //Just run it
processAllSystemMessages() //After we're done, process all system messages processAllSystemMessages() //After we're done, process all system messages
} }
} }

View file

@ -101,7 +101,7 @@ class ActorSerialization(val app: AkkaApplication, remote: RemoteSupport) {
l map { m l map { m
remoteActorSerialization.createRemoteMessageProtocolBuilder( remoteActorSerialization.createRemoteMessageProtocolBuilder(
Option(m.receiver.self), localRef,
Left(actorRef.uuid), Left(actorRef.uuid),
actorRef.address, actorRef.address,
app.AkkaConfig.ActorTimeoutMillis, app.AkkaConfig.ActorTimeoutMillis,

View file

@ -150,20 +150,20 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
} }
} }
protected[akka] override def dispatch(handle: Envelope) { protected[akka] override def dispatch(receiver: ActorCell, handle: Envelope) {
val mbox = getMailbox(handle.receiver) val mbox = getMailbox(receiver)
val queue = mbox.queue val queue = mbox.queue
val execute = mbox.suspendSwitch.fold { val execute = mbox.suspendSwitch.fold {
queue.push(handle) queue.push(handle)
if (warnings && handle.channel.isInstanceOf[Promise[_]]) { if (warnings && handle.channel.isInstanceOf[Promise[_]]) {
app.eventHandler.warning(this, "suspendSwitch, creating Future could deadlock; target: %s" format handle.receiver) app.eventHandler.warning(this, "suspendSwitch, creating Future could deadlock; target: %s" format receiver)
} }
false false
} { } {
queue.push(handle) queue.push(handle)
if (queue.isActive) { if (queue.isActive) {
if (warnings && handle.channel.isInstanceOf[Promise[_]]) { if (warnings && handle.channel.isInstanceOf[Promise[_]]) {
app.eventHandler.warning(this, "blocked on this thread, creating Future could deadlock; target: %s" format handle.receiver) app.eventHandler.warning(this, "blocked on this thread, creating Future could deadlock; target: %s" format receiver)
} }
false false
} else { } else {
@ -200,10 +200,10 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
} }
if (handle ne null) { if (handle ne null) {
try { try {
handle.invoke mbox.actor.invoke(handle)
if (warnings) handle.channel match { if (warnings) handle.channel match {
case f: ActorPromise if !f.isCompleted case f: ActorPromise if !f.isCompleted
app.eventHandler.warning(this, "calling %s with message %s did not reply as expected, might deadlock" format (handle.receiver, handle.message)) app.eventHandler.warning(this, "calling %s with message %s did not reply as expected, might deadlock" format (mbox.actor, handle.message))
case _ case _
} }
true true