Fixing (yes, I know, I've said this a biiiiiillion times) the BalancingDispatcher, removing some wasteful volatile reads in the hot path

This commit is contained in:
Viktor Klang 2011-11-18 17:03:35 +01:00
parent 069c68f6d4
commit 9ae3d7feba
7 changed files with 106 additions and 115 deletions

View file

@ -232,9 +232,6 @@ abstract class ActorModelSpec extends AkkaSpec {
protected def newInterceptedDispatcher: MessageDispatcherInterceptor
protected def dispatcherType: String
// BalancingDispatcher of course does not work when another actor is in the pool, so overridden below
protected def wavesSupervisorDispatcher(dispatcher: MessageDispatcher) = dispatcher
"A " + dispatcherType must {
"must dynamically handle its own life cycle" in {
@ -347,9 +344,25 @@ abstract class ActorModelSpec extends AkkaSpec {
val boss = actorOf(Props(context {
case "run" for (_ 1 to num) (context.self startsWatching context.actorOf(props)) ! cachedMessage
case Terminated(child) stopLatch.countDown()
}).withDispatcher(wavesSupervisorDispatcher(dispatcher)))
}).withDispatcher(system.dispatcherFactory.newPinnedDispatcher("boss")))
boss ! "run"
assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num)
try {
assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num)
} catch {
case e
val buddies = dispatcher.asInstanceOf[BalancingDispatcher].buddies
val mq = dispatcher.asInstanceOf[BalancingDispatcher].messageQueue
System.err.println("Buddies left: ")
buddies.toArray foreach {
case cell: ActorCell
System.err.println(" - " + cell.self.path + " " + cell.isShutdown + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain()))
}
System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages + " ")
throw e
}
assertCountDown(stopLatch, waitTime, "Expected all children to stop")
boss.stop()
}
@ -451,8 +464,6 @@ class BalancingDispatcherModelSpec extends ActorModelSpec {
def dispatcherType = "Balancing Dispatcher"
override def wavesSupervisorDispatcher(dispatcher: MessageDispatcher) = system.dispatcher
"A " + dispatcherType must {
"process messages in parallel" in {
implicit val dispatcher = newInterceptedDispatcher

View file

@ -167,6 +167,7 @@ private[akka] class ActorCell(
}
}
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
final def systemInvoke(message: SystemMessage) {
def create(): Unit = try {
@ -244,26 +245,23 @@ private[akka] class ActorCell(
}
try {
val isClosed = mailbox.isClosed //Fence plus volatile read
if (!isClosed) {
if (stopping) message match {
case Terminate() terminate() // to allow retry
case _
}
else message match {
case Create() create()
case Recreate(cause) recreate(cause)
case Link(subject)
system.deathWatch.subscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "now monitoring " + subject))
case Unlink(subject)
system.deathWatch.unsubscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopped monitoring " + subject))
case Suspend() suspend()
case Resume() resume()
case Terminate() terminate()
case Supervise(child) supervise(child)
}
if (stopping) message match {
case Terminate() terminate() // to allow retry
case _
}
else message match {
case Create() create()
case Recreate(cause) recreate(cause)
case Link(subject)
system.deathWatch.subscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "now monitoring " + subject))
case Unlink(subject)
system.deathWatch.unsubscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopped monitoring " + subject))
case Suspend() suspend()
case Resume() resume()
case Terminate() terminate()
case Supervise(child) supervise(child)
}
} catch {
case e //Should we really catch everything here?
@ -273,50 +271,48 @@ private[akka] class ActorCell(
}
}
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
final def invoke(messageHandle: Envelope) {
try {
val isClosed = mailbox.isClosed //Fence plus volatile read
if (!isClosed) {
currentMessage = messageHandle
currentMessage = messageHandle
try {
try {
try {
cancelReceiveTimeout() // FIXME: leave this here?
messageHandle.message match {
case msg: AutoReceivedMessage autoReceiveMessage(messageHandle)
case msg
if (stopping) {
// receiving Terminated in response to stopping children is too common to generate noise
if (!msg.isInstanceOf[Terminated]) system.deadLetterMailbox.enqueue(self, messageHandle)
} else {
actor(msg)
}
}
currentMessage = null // reset current message after successful invocation
} catch {
case e
system.eventStream.publish(Error(e, self.toString, e.getMessage))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
// make sure that InterruptedException does not leave this thread
if (e.isInstanceOf[InterruptedException]) {
val ex = ActorInterruptedException(e)
props.faultHandler.handleSupervisorFailing(self, children)
parent.tell(Failed(ex), self)
throw e //Re-throw InterruptedExceptions as expected
cancelReceiveTimeout() // FIXME: leave this here?
messageHandle.message match {
case msg: AutoReceivedMessage autoReceiveMessage(messageHandle)
case msg
if (stopping) {
// receiving Terminated in response to stopping children is too common to generate noise
if (!msg.isInstanceOf[Terminated]) system.deadLetterMailbox.enqueue(self, messageHandle)
} else {
props.faultHandler.handleSupervisorFailing(self, children)
parent.tell(Failed(e), self)
actor(msg)
}
} finally {
checkReceiveTimeout // Reschedule receive timeout
}
currentMessage = null // reset current message after successful invocation
} catch {
case e
system.eventStream.publish(Error(e, self.toString, e.getMessage))
throw e
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
// make sure that InterruptedException does not leave this thread
if (e.isInstanceOf[InterruptedException]) {
val ex = ActorInterruptedException(e)
props.faultHandler.handleSupervisorFailing(self, children)
parent.tell(Failed(ex), self)
throw e //Re-throw InterruptedExceptions as expected
} else {
props.faultHandler.handleSupervisorFailing(self, children)
parent.tell(Failed(e), self)
}
} finally {
checkReceiveTimeout // Reschedule receive timeout
}
} catch {
case e
system.eventStream.publish(Error(e, self.toString, e.getMessage))
throw e
}
}
}

View file

@ -236,10 +236,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
*/
def resume(actor: ActorCell): Unit = {
val mbox = actor.mailbox
if (mbox.dispatcher eq this) {
mbox.becomeOpen()
if ((mbox.dispatcher eq this) && mbox.becomeOpen())
registerForExecution(mbox, false, false)
}
}
/**

View file

@ -12,6 +12,7 @@ import annotation.tailrec
import akka.actor.ActorSystem
import akka.event.EventStream
import akka.actor.Scheduler
import java.util.concurrent.atomic.AtomicBoolean
/**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -39,9 +40,10 @@ class BalancingDispatcher(
_timeoutMs: Long)
extends Dispatcher(_prerequisites, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) {
private val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator)
val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator)
val rebalance = new AtomicBoolean(false)
protected val messageQueue: MessageQueue = mailboxType match {
val messageQueue: MessageQueue = mailboxType match {
case u: UnboundedMailbox new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]
}
@ -66,13 +68,13 @@ class BalancingDispatcher(
protected[akka] override def register(actor: ActorCell) = {
super.register(actor)
registerForExecution(actor.mailbox, false, false) //Allow newcomers to be productive from the first moment
buddies.add(actor)
}
protected[akka] override def unregister(actor: ActorCell) = {
super.unregister(actor)
buddies.remove(actor)
intoTheFray(except = actor)
super.unregister(actor)
intoTheFray(except = actor) //When someone leaves, he tosses a friend into the fray
}
protected override def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) {
@ -88,29 +90,27 @@ class BalancingDispatcher(
}
}
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessagesHint: Boolean, hasSystemMessagesHint: Boolean): Boolean = {
if (!super.registerForExecution(mbox, hasMessagesHint, hasSystemMessagesHint)) {
mbox match {
case share: SharingMailbox if !share.isClosed buddies.add(share.actor); false
case _ false
}
} else true
}
def intoTheFray(except: ActorCell): Unit =
if (rebalance.compareAndSet(false, true)) {
try {
val i = buddies.iterator()
def intoTheFray(except: ActorCell): Unit = {
var buddy = buddies.pollFirst()
while (buddy ne null) {
val mbox = buddy.mailbox
buddy = if ((buddy eq except) || (!registerForExecution(mbox, false, false) && mbox.isClosed)) buddies.pollFirst() else null
@tailrec
def throwIn(): Unit = {
val n = if (i.hasNext) i.next() else null
if (n eq null) ()
else if ((n ne except) && registerForExecution(n.mailbox, false, false)) ()
else throwIn()
}
throwIn()
} finally {
rebalance.set(false)
}
}
}
override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
messageQueue.enqueue(receiver.self, invocation)
registerForExecution(receiver.mailbox, false, false)
intoTheFray(except = receiver)
if (!registerForExecution(receiver.mailbox, false, false))
intoTheFray(except = receiver)
}
}

View file

@ -107,20 +107,16 @@ class Dispatcher(
protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(actor)
protected[akka] def shutdown {
executorService.getAndSet(new ExecutorServiceDelegate {
protected[akka] def shutdown: Unit =
Option(executorService.getAndSet(new ExecutorServiceDelegate {
lazy val executor = executorServiceFactory.createExecutorService
}) match {
case null
case some some.shutdown()
}
}
})) foreach { _.shutdown() }
/**
* Returns if it was registered
*/
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
if (mbox.shouldBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
if (mbox.setAsScheduled()) {
try {
executorService.get() execute mbox

View file

@ -128,15 +128,20 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
protected final def systemQueueGet: SystemMessage = AbstractMailbox.systemQueueUpdater.get(this)
protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = AbstractMailbox.systemQueueUpdater.compareAndSet(this, _old, _new)
def shouldBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
case Open | Scheduled hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
case Closed false
case _ hasSystemMessageHint || hasSystemMessages
}
final def run = {
try processMailbox() finally {
setAsIdle()
try {
if (!isClosed) { //Volatile read, needed here
processAllSystemMessages() //First, deal with any system messages
processMailbox() //Then deal with messages
}
} finally {
setAsIdle() //Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
}
}
@ -146,9 +151,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
*
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox() {
processAllSystemMessages() //First, process all system messages
private final def processMailbox() {
if (shouldProcessMessage) {
var nextMessage = dequeue()
if (nextMessage ne null) { //If we have a message
@ -175,7 +178,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
}
}
def processAllSystemMessages() {
final def processAllSystemMessages() {
var nextMessage = systemDrain()
try {
while (nextMessage ne null) {

View file

@ -26,19 +26,6 @@ object Helpers {
def compare(a: AnyRef, b: AnyRef): Int = compareIdentityHash(a, b)
}
def intToBytes(value: Int): Array[Byte] = {
val bytes = new Array[Byte](4)
bytes(0) = (value >>> 24).asInstanceOf[Byte]
bytes(1) = (value >>> 16).asInstanceOf[Byte]
bytes(2) = (value >>> 8).asInstanceOf[Byte]
bytes(3) = value.asInstanceOf[Byte]
bytes
}
def bytesToInt(bytes: Array[Byte], offset: Int): Int = {
(0 until 4).foldLeft(0)((value, index) value + ((bytes(index + offset) & 0x000000FF) << ((4 - 1 - index) * 8)))
}
final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789*?"
@tailrec