diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 6b8ea3b5df..5ecdcf3948 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -50,7 +50,11 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension { // we have to forget about long-gone threads sometime private def gc { - queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty) + queues = (Map.newBuilder[CallingThreadMailbox, Set[WeakReference[NestingQueue]]] /: queues) { + case (m, (k, v)) ⇒ + val nv = v filter (_.get ne null) + if (nv.isEmpty) m else m += (k -> nv) + }.result } protected[akka] def registerQueue(mbox: CallingThreadMailbox, q: NestingQueue): Unit = synchronized { @@ -67,6 +71,10 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension { } } + protected[akka] def unregisterQueues(mbox: CallingThreadMailbox): Unit = synchronized { + queues -= mbox + } + /* * This method must be called with "own" being this thread's queue for the * given mailbox. When this method returns, the queue will be entered @@ -151,10 +159,21 @@ class CallingThreadDispatcher( } } + protected[akka] override def unregister(actor: ActorCell): Unit = { + val mbox = actor.mailbox match { + case m: CallingThreadMailbox ⇒ Some(m) + case _ ⇒ None + } + super.unregister(actor) + mbox foreach CallingThreadDispatcherQueues(actor.system).unregisterQueues + } + override def suspend(actor: ActorCell) { actor.mailbox match { - case m: CallingThreadMailbox ⇒ m.suspendSwitch.switchOn; m.suspend() - case m ⇒ m.systemEnqueue(actor.self, Suspend()) + case m: CallingThreadMailbox ⇒ + m.suspendSwitch.switchOn; m.suspend() + case m ⇒ + m.systemEnqueue(actor.self, Suspend()) } } @@ -319,10 +338,11 @@ class CallingThreadMailbox(_receiver: akka.actor.Cell, val mailboxType: MailboxT * the gather operation, tough luck: no guaranteed delivery to deadLetters. */ suspendSwitch.locked { - val q = queue - CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, q) + val qq = queue + CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, qq) super.cleanUp() - q.q.cleanUp(actor.self, actor.systemImpl.deadLetterQueue) + qq.q.cleanUp(actor.self, actor.systemImpl.deadLetterQueue) + q.remove() } } }