diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 4d8b191eb5..63a0044368 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -94,7 +94,7 @@ class ExecutorBasedEventDrivenDispatcher( def dispatch(invocation: MessageInvocation) = { val mbox = getMailbox(invocation.receiver) mbox enqueue invocation - mbox.registerForExecution + registerForExecution(mbox) } /** @@ -144,6 +144,17 @@ class ExecutorBasedEventDrivenDispatcher( "Can't build a new thread pool for a dispatcher that is already up and running") } + private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) { + if (mbox.dispatcherLock.tryLock()) { + try { + executor execute mbox + } catch { + case e: RejectedExecutionException => + mbox.dispatcherLock.unlock() + throw e + } + } + } else log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", this, mbox) override val toString = getClass.getSimpleName + "[" + name + "]" @@ -168,7 +179,8 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue => } finally { dispatcherLock.unlock() } - if (reschedule || !self.isEmpty) registerForExecution + if (reschedule || !self.isEmpty) + dispatcher.registerForExecution(this) } /** @@ -200,17 +212,5 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue => } false } - - def registerForExecution: Unit = if (dispatcher.active.isOn) { - if (dispatcherLock.tryLock()) { - try { - dispatcher.execute(this) - } catch { - case e: RejectedExecutionException => - dispatcherLock.unlock() - throw e - } - } - } else dispatcher.log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, this) } diff --git a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala index cb96b39a3b..dabb5a4c2e 100644 --- a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala @@ -13,6 +13,7 @@ import org.fusesource.hawtdispatch.ListEventAggregator import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean} import java.util.concurrent.CountDownLatch +import se.scalablesolutions.akka.util.Switch /** * Holds helper methods for working with actors that are using a HawtDispatcher as it's dispatcher. @@ -141,19 +142,17 @@ object HawtDispatcher { class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = globalQueue) extends MessageDispatcher { import HawtDispatcher._ - private val active = new AtomicBoolean(false) + private val active = new Switch(false) val mailboxType: Option[MailboxType] = None - def start = if (active.compareAndSet(false, true)) retainNonDaemon + def start = active switchOn { retainNonDaemon } - def execute(task: Runnable) {} + def shutdown = active switchOff { releaseNonDaemon } - def shutdown = if (active.compareAndSet(true, false)) releaseNonDaemon + def isShutdown = active.isOff - def isShutdown = !active.get - - def dispatch(invocation: MessageInvocation) = if (active.get()) { + def dispatch(invocation: MessageInvocation) = if (active.isOn) { mailbox(invocation.receiver).dispatch(invocation) } else { log.warning("%s is shut down,\n\tignoring the the messages sent to\n\t%s", toString, invocation.receiver) diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 60c62c56b9..c9df3f3b87 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -64,8 +64,6 @@ trait MessageDispatcher extends MailboxFactory with Logging { def dispatch(invocation: MessageInvocation): Unit - def execute(task: Runnable): Unit - def start: Unit def shutdown: Unit diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala index a32d2a2957..8c2229c6b3 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -37,7 +37,7 @@ class ThreadBasedDispatcher(private val actor: ActorRef, _mailboxType: MailboxTy } object ThreadBasedDispatcher { - def oneThread(b: ThreadPoolBuilder) { + val oneThread: (ThreadPoolBuilder) => Unit = b => { b setCorePoolSize 1 b setMaxPoolSize 1 b setAllowCoreThreadTimeout true diff --git a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala index 7559785dcf..1e5e257349 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -29,8 +29,6 @@ trait ThreadPoolBuilder extends Logging { private lazy val threadFactory = new MonitorableThreadFactory(name) protected var executor: ExecutorService = _ - - def execute(task: Runnable) = executor execute task def isShutdown = executor.isShutdown