Fixed bug in shutdown management of global event-based dispatcher

This commit is contained in:
Jonas Bonér 2010-01-02 08:40:09 +01:00
parent d2e67f0c2b
commit fd2af286e5
6 changed files with 22 additions and 6 deletions

View file

@ -415,6 +415,10 @@ trait Actor extends TransactionManagement {
def start: Actor = synchronized { def start: Actor = synchronized {
if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'") if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'")
if (!_isRunning) { if (!_isRunning) {
if (messageDispatcher.isShutdown &&
messageDispatcher.isInstanceOf[Dispatchers.globalExecutorBasedEventDrivenDispatcher.type]) {
messageDispatcher.asInstanceOf[ExecutorBasedEventDrivenDispatcher].init
}
messageDispatcher.register(this) messageDispatcher.register(this)
messageDispatcher.start messageDispatcher.start
_isRunning = true _isRunning = true

View file

@ -57,9 +57,8 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
@volatile private var active: Boolean = false @volatile private var active: Boolean = false
val name: String = "event-driven:executor:dispatcher:" + _name val name: String = "event-driven:executor:dispatcher:" + _name
init
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
def dispatch(invocation: MessageInvocation) = if (active) { def dispatch(invocation: MessageInvocation) = if (active) {
executor.execute(new Runnable() { executor.execute(new Runnable() {
def run = { def run = {
@ -79,10 +78,14 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
} }
def shutdown = if (active) { def shutdown = if (active) {
log.debug("Shutting down ThreadBasedDispatcher [%s]", name)
executor.shutdownNow executor.shutdownNow
active = false active = false
references.clear
} }
def ensureNotActive: Unit = if (active) throw new IllegalStateException( def ensureNotActive: Unit = if (active) throw new IllegalStateException(
"Can't build a new thread pool for a dispatcher that is already up and running") "Can't build a new thread pool for a dispatcher that is already up and running")
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
} }

View file

@ -6,7 +6,7 @@ package se.scalablesolutions.akka.dispatch
import java.util.List import java.util.List
import se.scalablesolutions.akka.util.HashCode import se.scalablesolutions.akka.util.{HashCode, Logging}
import se.scalablesolutions.akka.stm.Transaction import se.scalablesolutions.akka.stm.Transaction
import se.scalablesolutions.akka.actor.Actor import se.scalablesolutions.akka.actor.Actor
@ -56,7 +56,7 @@ trait MessageInvoker {
def invoke(message: MessageInvocation) def invoke(message: MessageInvocation)
} }
trait MessageDispatcher { trait MessageDispatcher extends Logging {
protected val references = new ConcurrentHashMap[String, Actor] protected val references = new ConcurrentHashMap[String, Actor]
def dispatch(invocation: MessageInvocation) def dispatch(invocation: MessageInvocation)
def start def start
@ -64,6 +64,7 @@ trait MessageDispatcher {
def register(actor: Actor) = references.put(actor.uuid, actor) def register(actor: Actor) = references.put(actor.uuid, actor)
def unregister(actor: Actor) = references.remove(actor.uuid) def unregister(actor: Actor) = references.remove(actor.uuid)
def canBeShutDown: Boolean = references.isEmpty def canBeShutDown: Boolean = references.isEmpty
def isShutdown: Boolean
} }
trait MessageDemultiplexer { trait MessageDemultiplexer {

View file

@ -36,6 +36,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(name: String) extends Abstra
selectorThread.start selectorThread.start
} }
def isShutdown = !active
class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer { class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
private val selectedQueue: List[MessageInvocation] = new LinkedList[MessageInvocation] private val selectedQueue: List[MessageInvocation] = new LinkedList[MessageInvocation]

View file

@ -39,9 +39,13 @@ class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler:
selectorThread.start selectorThread.start
} }
def isShutdown = !active
def shutdown = if (active) { def shutdown = if (active) {
log.debug("Shutting down ExecutorBasedEventDrivenDispatcher [%s]", name)
active = false active = false
selectorThread.interrupt selectorThread.interrupt
references.clear
} }
} }

View file

@ -28,6 +28,8 @@ trait ThreadPoolBuilder {
protected var executor: ExecutorService = _ protected var executor: ExecutorService = _
def isShutdown = executor.isShutdown
def buildThreadPool = synchronized { def buildThreadPool = synchronized {
ensureNotActive ensureNotActive
inProcessOfBuilding = false inProcessOfBuilding = false