diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 1203d32fde..4d8b191eb5 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -9,6 +9,7 @@ import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule import java.util.Queue import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} +import se.scalablesolutions.akka.util.Switch /** * Default settings are: @@ -85,7 +86,7 @@ class ExecutorBasedEventDrivenDispatcher( val mailboxType = Some(_mailboxType) - @volatile private[akka] var active = false + private[akka] val active = new Switch(false) val name = "akka:event-driven:dispatcher:" + _name init @@ -99,20 +100,20 @@ class ExecutorBasedEventDrivenDispatcher( /** * @return the mailbox associated with the actor */ - private def getMailbox(receiver: ActorRef) = { - val mb = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] - mb.register(this) - mb - } + private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = mailboxType match { - case UnboundedMailbox(blocking) => - new DefaultUnboundedMessageQueue(blocking) with ExecutableMailbox + case UnboundedMailbox(blocking) => new DefaultUnboundedMessageQueue(blocking) with ExecutableMailbox { + def dispatcher = ExecutorBasedEventDrivenDispatcher.this + } + case BoundedMailbox(blocking, capacity, pushTimeOut) => val cap = if (mailboxCapacity == -1) capacity else mailboxCapacity - new DefaultBoundedMessageQueue(cap, pushTimeOut, blocking) with ExecutableMailbox + new DefaultBoundedMessageQueue(cap, pushTimeOut, blocking) with ExecutableMailbox { + def dispatcher = ExecutorBasedEventDrivenDispatcher.this + } } /** @@ -128,24 +129,23 @@ class ExecutorBasedEventDrivenDispatcher( case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported") } - def start = if (!active) { + def start = active switchOn { log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) - active = true } - def shutdown = if (active) { + def shutdown = active switchOff { log.debug("Shutting down %s", toString) executor.shutdownNow - active = false uuids.clear } - def ensureNotActive(): Unit = if (active) { + def ensureNotActive(): Unit = if (active.isOn) { throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") } - override def toString = "ExecutorBasedEventDrivenDispatcher[" + name + "]" + + override val toString = getClass.getSimpleName + "[" + name + "]" // FIXME: should we have an unbounded queue and not bounded as default ???? private[akka] def init = { @@ -159,12 +159,8 @@ class ExecutorBasedEventDrivenDispatcher( * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox. */ trait ExecutableMailbox extends Runnable { self: MessageQueue => - - private var _dispatcher: Option[ExecutorBasedEventDrivenDispatcher] = None - def register(md: ExecutorBasedEventDrivenDispatcher) = _dispatcher = Some(md) - def dispatcher: ExecutorBasedEventDrivenDispatcher = _dispatcher.getOrElse( - throw new IllegalActorStateException("mailbox.register(dispatcher) has not been invoked")) + def dispatcher: ExecutorBasedEventDrivenDispatcher final def run = { val reschedule = try { @@ -205,8 +201,7 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue => false } - - def registerForExecution: Unit = if (dispatcher.active) { + def registerForExecution: Unit = if (dispatcher.active.isOn) { if (dispatcherLock.tryLock()) { try { dispatcher.execute(this) diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala index f3f8494219..a32d2a2957 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -34,8 +34,6 @@ class ThreadBasedDispatcher(private val actor: ActorRef, _mailboxType: MailboxTy if (actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor) super.register(actorRef) } - - override def toString = "ThreadBasedDispatcher[" + name + "]" } object ThreadBasedDispatcher {