diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index c6181c515a..0118aea1c0 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -415,6 +415,10 @@ trait Actor extends TransactionManagement { def start: Actor = synchronized { if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'") if (!_isRunning) { + if (messageDispatcher.isShutdown && + messageDispatcher.isInstanceOf[Dispatchers.globalExecutorBasedEventDrivenDispatcher.type]) { + messageDispatcher.asInstanceOf[ExecutorBasedEventDrivenDispatcher].init + } messageDispatcher.register(this) messageDispatcher.start _isRunning = true diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 3085ffcb11..7da13a10b3 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -57,9 +57,8 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche @volatile private var active: Boolean = false val name: String = "event-driven:executor:dispatcher:" + _name - - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool - + init + def dispatch(invocation: MessageInvocation) = if (active) { executor.execute(new Runnable() { def run = { @@ -79,10 +78,14 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche } def shutdown = if (active) { + log.debug("Shutting down ThreadBasedDispatcher [%s]", name) executor.shutdownNow active = false + references.clear } - + def ensureNotActive: Unit = if (active) throw new IllegalStateException( "Can't build a new thread pool for a dispatcher that is already up and running") + + private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool } \ No newline at end of file diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/Reactor.scala index 1c5edc0400..f7bfa52215 100644 --- a/akka-core/src/main/scala/dispatch/Reactor.scala +++ b/akka-core/src/main/scala/dispatch/Reactor.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.dispatch import java.util.List -import se.scalablesolutions.akka.util.HashCode +import se.scalablesolutions.akka.util.{HashCode, Logging} import se.scalablesolutions.akka.stm.Transaction import se.scalablesolutions.akka.actor.Actor @@ -56,7 +56,7 @@ trait MessageInvoker { def invoke(message: MessageInvocation) } -trait MessageDispatcher { +trait MessageDispatcher extends Logging { protected val references = new ConcurrentHashMap[String, Actor] def dispatch(invocation: MessageInvocation) def start @@ -64,6 +64,7 @@ trait MessageDispatcher { def register(actor: Actor) = references.put(actor.uuid, actor) def unregister(actor: Actor) = references.remove(actor.uuid) def canBeShutDown: Boolean = references.isEmpty + def isShutdown: Boolean } trait MessageDemultiplexer { diff --git a/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala index 12de4ff0ec..15af513d62 100644 --- a/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala @@ -36,6 +36,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(name: String) extends Abstra selectorThread.start } + def isShutdown = !active + class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer { private val selectedQueue: List[MessageInvocation] = new LinkedList[MessageInvocation] diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 98063f0bcb..27eb3a1ea5 100644 --- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -39,9 +39,13 @@ class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler: selectorThread.start } + def isShutdown = !active + def shutdown = if (active) { + log.debug("Shutting down ExecutorBasedEventDrivenDispatcher [%s]", name) active = false selectorThread.interrupt + references.clear } } diff --git a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala index c73310cf29..cb465907cb 100644 --- a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -28,6 +28,8 @@ trait ThreadPoolBuilder { protected var executor: ExecutorService = _ + def isShutdown = executor.isShutdown + def buildThreadPool = synchronized { ensureNotActive inProcessOfBuilding = false