From c0e85c71ce92c1eafa24f873913a984b416d928a Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 24 Jan 2013 16:36:26 +0100 Subject: [PATCH] fix CallingThreadDispatcherModelSpec, see #2821 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The problem was that the “waves of actors” test leaves behind lots of garbage for the CTD to clean up, and its own CallingThreadDispatcherQueues.gc() then happens to run when the guardian creates the top-level actor in the following test case, which takes longer than 3 seconds to run. Fix it by making the GC interval 100ms instead of 1sec so that the amount of garbage is limited. Also, replacing .mapValues(...).filter(...) with .foldLeft(.newBuilder)(...) makes it twice as fast. And even more so: unregistering mailboxes upon actor termination removes the cost nearly completely for the “waves of actors” case. --- .../testkit/CallingThreadDispatcher.scala | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 6b8ea3b5df..5ecdcf3948 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -50,7 +50,11 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension { // we have to forget about long-gone threads sometime private def gc { - queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty) + queues = (Map.newBuilder[CallingThreadMailbox, Set[WeakReference[NestingQueue]]] /: queues) { + case (m, (k, v)) ⇒ + val nv = v filter (_.get ne null) + if (nv.isEmpty) m else m += (k -> nv) + }.result } protected[akka] def registerQueue(mbox: CallingThreadMailbox, q: NestingQueue): Unit = synchronized { @@ -67,6 +71,10 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension { } } + protected[akka] def unregisterQueues(mbox: CallingThreadMailbox): Unit = synchronized { + queues -= mbox + } + /* * This method must be called with "own" being this thread's queue for the * given mailbox. When this method returns, the queue will be entered @@ -151,10 +159,21 @@ class CallingThreadDispatcher( } } + protected[akka] override def unregister(actor: ActorCell): Unit = { + val mbox = actor.mailbox match { + case m: CallingThreadMailbox ⇒ Some(m) + case _ ⇒ None + } + super.unregister(actor) + mbox foreach CallingThreadDispatcherQueues(actor.system).unregisterQueues + } + override def suspend(actor: ActorCell) { actor.mailbox match { - case m: CallingThreadMailbox ⇒ m.suspendSwitch.switchOn; m.suspend() - case m ⇒ m.systemEnqueue(actor.self, Suspend()) + case m: CallingThreadMailbox ⇒ + m.suspendSwitch.switchOn; m.suspend() + case m ⇒ + m.systemEnqueue(actor.self, Suspend()) } } @@ -319,10 +338,11 @@ class CallingThreadMailbox(_receiver: akka.actor.Cell, val mailboxType: MailboxT * the gather operation, tough luck: no guaranteed delivery to deadLetters. */ suspendSwitch.locked { - val q = queue - CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, q) + val qq = queue + CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, qq) super.cleanUp() - q.q.cleanUp(actor.self, actor.systemImpl.deadLetterQueue) + qq.q.cleanUp(actor.self, actor.systemImpl.deadLetterQueue) + q.remove() } } }