diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index ca2bbed25f..5a0fd6ee0d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -120,17 +120,11 @@ object ActorModelSpec { protected[akka] abstract override def register(actor: ActorCell) { getStats(actor.ref).registers.incrementAndGet() super.register(actor) - //printMembers("after registering " + actor) } protected[akka] abstract override def unregister(actor: ActorCell) { getStats(actor.ref).unregisters.incrementAndGet() super.unregister(actor) - //printMembers("after unregistering " + actor) - } - - def printMembers(when: String) { - System.err.println(when + " then " + uuids.toArray.toList.map(_.toString.split("-")(0)).mkString("==> ", ", ", "<==")) } protected[akka] abstract override def dispatch(invocation: Envelope) { diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala index 9877d4eac6..1aae1091f6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala @@ -81,7 +81,7 @@ class DispatcherActorSpec extends AkkaSpec { (1 to 100) foreach { _ ⇒ slowOne ! "ping" } fastOne ! "sabotage" start.countDown() - val result = latch.await(5, TimeUnit.SECONDS) + val result = latch.await(10, TimeUnit.SECONDS) fastOne.stop() slowOne.stop() assert(result === true) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index c7e7fab35a..4f6cdb73ec 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -70,8 +70,8 @@ object MessageDispatcher { abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable { import MessageDispatcher._ - protected val uuids = new ConcurrentSkipListSet[Uuid] protected val _tasks = new AtomicLong(0L) + protected val _actors = new AtomicLong(0L) protected val guard = new ReentrantGuard protected val active = new Switch(false) @@ -123,7 +123,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable final def detach(actor: ActorCell) { guard withGuard { unregister(actor) - if (uuids.isEmpty && _tasks.get == 0) { + if (_tasks.get == 0 && _actors.get == 0) { shutdownSchedule match { case UNSCHEDULED ⇒ shutdownSchedule = SCHEDULED @@ -155,7 +155,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable private val taskCleanup: () ⇒ Unit = () ⇒ if (_tasks.decrementAndGet() == 0) { guard withGuard { - if (_tasks.get == 0 && uuids.isEmpty) { + if (_tasks.get == 0 && _actors.get == 0) { shutdownSchedule match { case UNSCHEDULED ⇒ shutdownSchedule = SCHEDULED @@ -173,9 +173,8 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable * and only call it under the dispatcher-guard, see "attach" for the only invocation */ protected[akka] def register(actor: ActorCell) { - if (uuids add actor.uuid) { - systemDispatch(SystemEnvelope(actor, Create, NullChannel)) //FIXME should this be here or moved into ActorCell.start perhaps? - } else System.err.println("Couldn't register: " + actor) + _actors.incrementAndGet() + systemDispatch(SystemEnvelope(actor, Create, NullChannel)) //FIXME should this be here or moved into ActorCell.start perhaps? } /** @@ -183,12 +182,11 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable * and only call it under the dispatcher-guard, see "detach" for the only invocation */ protected[akka] def unregister(actor: ActorCell) { - if (uuids remove actor.uuid) { - val mailBox = actor.mailbox - mailBox.becomeClosed() - actor.mailbox = deadLetterMailbox //FIXME getAndSet would be preferrable here - cleanUpMailboxFor(actor, mailBox) - } else System.err.println("Couldn't unregister: " + actor) + _actors.decrementAndGet() + val mailBox = actor.mailbox + mailBox.becomeClosed() + actor.mailbox = deadLetterMailbox //FIXME getAndSet would be preferrable here + cleanUpMailboxFor(actor, mailBox) } /** @@ -222,7 +220,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable shutdownSchedule = SCHEDULED app.scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS) case SCHEDULED ⇒ - if (uuids.isEmpty && _tasks.get == 0) { + if (_tasks.get == 0) { active switchOff { shutdown() // shut down in the dispatcher's references is zero } @@ -243,16 +241,21 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable /** * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference */ - def suspend(actor: ActorCell): Unit = - if (uuids.contains(actor.uuid)) actor.mailbox.becomeSuspended() + def suspend(actor: ActorCell): Unit = { + val mbox = actor.mailbox + if (mbox.dispatcher eq this) + mbox.becomeSuspended() + } /* * After the call to this method, the dispatcher must begin any new message processing for the specified reference */ - def resume(actor: ActorCell): Unit = if (uuids.contains(actor.uuid)) { + def resume(actor: ActorCell): Unit = { val mbox = actor.mailbox - mbox.becomeOpen() - registerForExecution(mbox, false, false) + if (mbox.dispatcher eq this) { + mbox.becomeOpen() + registerForExecution(mbox, false, false) + } } /**