Fixing performance regression

This commit is contained in:
Viktor Klang 2010-10-11 12:42:02 +02:00
parent 685c6df702
commit 66fcd626de
2 changed files with 17 additions and 24 deletions

View file

@ -9,6 +9,7 @@ import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule
import java.util.Queue import java.util.Queue
import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.util.Switch
/** /**
* Default settings are: * Default settings are:
@ -85,7 +86,7 @@ class ExecutorBasedEventDrivenDispatcher(
val mailboxType = Some(_mailboxType) val mailboxType = Some(_mailboxType)
@volatile private[akka] var active = false private[akka] val active = new Switch(false)
val name = "akka:event-driven:dispatcher:" + _name val name = "akka:event-driven:dispatcher:" + _name
init init
@ -99,20 +100,20 @@ class ExecutorBasedEventDrivenDispatcher(
/** /**
* @return the mailbox associated with the actor * @return the mailbox associated with the actor
*/ */
private def getMailbox(receiver: ActorRef) = { private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
val mb = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
mb.register(this)
mb
}
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = mailboxType match { def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = mailboxType match {
case UnboundedMailbox(blocking) => case UnboundedMailbox(blocking) => new DefaultUnboundedMessageQueue(blocking) with ExecutableMailbox {
new DefaultUnboundedMessageQueue(blocking) with ExecutableMailbox def dispatcher = ExecutorBasedEventDrivenDispatcher.this
}
case BoundedMailbox(blocking, capacity, pushTimeOut) => case BoundedMailbox(blocking, capacity, pushTimeOut) =>
val cap = if (mailboxCapacity == -1) capacity else mailboxCapacity val cap = if (mailboxCapacity == -1) capacity else mailboxCapacity
new DefaultBoundedMessageQueue(cap, pushTimeOut, blocking) with ExecutableMailbox new DefaultBoundedMessageQueue(cap, pushTimeOut, blocking) with ExecutableMailbox {
def dispatcher = ExecutorBasedEventDrivenDispatcher.this
}
} }
/** /**
@ -128,24 +129,23 @@ class ExecutorBasedEventDrivenDispatcher(
case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported") case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported")
} }
def start = if (!active) { def start = active switchOn {
log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
active = true
} }
def shutdown = if (active) { def shutdown = active switchOff {
log.debug("Shutting down %s", toString) log.debug("Shutting down %s", toString)
executor.shutdownNow executor.shutdownNow
active = false
uuids.clear uuids.clear
} }
def ensureNotActive(): Unit = if (active) { def ensureNotActive(): Unit = if (active.isOn) {
throw new IllegalActorStateException( throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running") "Can't build a new thread pool for a dispatcher that is already up and running")
} }
override def toString = "ExecutorBasedEventDrivenDispatcher[" + name + "]"
override val toString = getClass.getSimpleName + "[" + name + "]"
// FIXME: should we have an unbounded queue and not bounded as default ???? // FIXME: should we have an unbounded queue and not bounded as default ????
private[akka] def init = { private[akka] def init = {
@ -160,11 +160,7 @@ class ExecutorBasedEventDrivenDispatcher(
*/ */
trait ExecutableMailbox extends Runnable { self: MessageQueue => trait ExecutableMailbox extends Runnable { self: MessageQueue =>
private var _dispatcher: Option[ExecutorBasedEventDrivenDispatcher] = None def dispatcher: ExecutorBasedEventDrivenDispatcher
def register(md: ExecutorBasedEventDrivenDispatcher) = _dispatcher = Some(md)
def dispatcher: ExecutorBasedEventDrivenDispatcher = _dispatcher.getOrElse(
throw new IllegalActorStateException("mailbox.register(dispatcher) has not been invoked"))
final def run = { final def run = {
val reschedule = try { val reschedule = try {
@ -205,8 +201,7 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
false false
} }
def registerForExecution: Unit = if (dispatcher.active.isOn) {
def registerForExecution: Unit = if (dispatcher.active) {
if (dispatcherLock.tryLock()) { if (dispatcherLock.tryLock()) {
try { try {
dispatcher.execute(this) dispatcher.execute(this)

View file

@ -34,8 +34,6 @@ class ThreadBasedDispatcher(private val actor: ActorRef, _mailboxType: MailboxTy
if (actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor) if (actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor)
super.register(actorRef) super.register(actorRef)
} }
override def toString = "ThreadBasedDispatcher[" + name + "]"
} }
object ThreadBasedDispatcher { object ThreadBasedDispatcher {