diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 7a6e493f43..f88f40fecf 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -781,7 +781,6 @@ class LocalActorRef private[akka] ( "Can't restart an actor that has been shut down with 'stop' or 'exit'") if (!isRunning) { dispatcher.register(this) - dispatcher.start if (isTransactor) transactorConfig = transactorConfig.copy(factory = Some(TransactionFactory(transactorConfig.config, id))) diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 6ec5f6963e..49e94afc6f 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -101,8 +101,6 @@ class ExecutorBasedEventDrivenDispatcher( registerForExecution(mbox) } - def isShutdown = active.isOff - /** * @return the mailbox associated with the actor */ diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index a9ea028210..53296cce71 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -94,8 +94,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( mailboxWasProcessed } - def isShutdown = active.isOff - /** * Process the messages in the mailbox of the given actor. * @return diff --git a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala index 4ca63f64f2..a5bc315204 100644 --- a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala @@ -150,8 +150,6 @@ class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = def shutdown = active switchOff { releaseNonDaemon } - def isShutdown = active.isOff - def dispatch(invocation: MessageInvocation) = if (active.isOn) { mailbox(invocation.receiver).dispatch(invocation) } else { diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 88d978b311..c84a56e78d 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -4,15 +4,12 @@ package se.scalablesolutions.akka.dispatch -import se.scalablesolutions.akka.actor.{Actor, ActorRef, Uuid, ActorInitializationException} -import se.scalablesolutions.akka.util.{SimpleLock, Duration, HashCode, Logging} -import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule -import se.scalablesolutions.akka.AkkaException +import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRef, Uuid, ActorInitializationException} import org.multiverse.commitbarriers.CountDownCommitBarrier -import java.util.{Queue, List} import java.util.concurrent._ +import se.scalablesolutions.akka.util. {Logging, HashCode} /** * @author Jonas Bonér @@ -61,12 +58,6 @@ final class MessageInvocation(val receiver: ActorRef, trait MessageDispatcher extends MailboxFactory with Logging { protected val uuids = new ConcurrentSkipListSet[Uuid] - - def dispatch(invocation: MessageInvocation): Unit - - def start: Unit - - def shutdown: Unit def register(actorRef: ActorRef) { start @@ -77,15 +68,29 @@ trait MessageDispatcher extends MailboxFactory with Logging { def unregister(actorRef: ActorRef) = { uuids remove actorRef.uuid actorRef.mailbox = null - if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero + if (uuids.isEmpty) shutdown // shut down in the dispatcher's references is zero + } + + def stopAllLinkedActors { + val i = uuids.iterator + while(i.hasNext()) { + val uuid = i.next() + i.remove() + ActorRegistry.actorFor(uuid) match { + case Some(actor) => actor.stop + case None => log.warn("stopAllLinkedActors couldn't find linked actor: " + uuid) + } + } + if(uuids.isEmpty) shutdown } def suspend(actorRef: ActorRef): Unit def resume(actorRef: ActorRef): Unit - - def canBeShutDown: Boolean = uuids.isEmpty - def isShutdown: Boolean + def dispatch(invocation: MessageInvocation): Unit + + protected def start: Unit + protected def shutdown: Unit /** * Returns the size of the mailbox for the specified actor diff --git a/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala b/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala index 2ad8ac267b..17cfc94f83 100644 --- a/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala +++ b/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala @@ -70,10 +70,10 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers { actor4 ! "kill" countDownLatch.await() - assert(!actor1.dispatcher.isShutdown, "dispatcher1 is shutdown") - assert(!actor2.dispatcher.isShutdown, "dispatcher2 is shutdown") - assert(!actor3.dispatcher.isShutdown, "dispatcher3 is shutdown") - assert(!actor4.dispatcher.isShutdown, "dispatcher4 is shutdown") + assert(!actor1.isShutdown, "actor1 is shutdown") + assert(!actor2.isShutdown, "actor2 is shutdown") + assert(!actor3.isShutdown, "actor3 is shutdown") + assert(!actor4.isShutdown, "actor4 is shutdown") } } } diff --git a/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala b/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala index 1c2670da0d..f2f4787070 100644 --- a/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala +++ b/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala @@ -98,7 +98,6 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite { val result = latch.await(3,TimeUnit.SECONDS) fastOne.stop slowOne.stop - throughputDispatcher.shutdown assert(result === true) }