fix CallingThreadDispatcherModelSpec, see #2821
The problem was that the “waves of actors” test leaves behind lots of garbage for the CTD to clean up, and its own CallingThreadDispatcherQueues.gc() then happens to run when the guardian creates the top-level actor in the following test case, which takes longer than 3 seconds to run. Fix it by making the GC interval 100ms instead of 1sec so that the amount of garbage is limited. Also, replacing .mapValues(...).filter(...) with .foldLeft(.newBuilder)(...) makes it twice as fast. And even more so: unregistering mailboxes upon actor termination removes the cost nearly completely for the “waves of actors” case.
This commit is contained in:
parent
b6ad46e88c
commit
c0e85c71ce
1 changed files with 26 additions and 6 deletions
|
|
@ -50,7 +50,11 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension {
|
|||
|
||||
// we have to forget about long-gone threads sometime
|
||||
private def gc {
|
||||
queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty)
|
||||
queues = (Map.newBuilder[CallingThreadMailbox, Set[WeakReference[NestingQueue]]] /: queues) {
|
||||
case (m, (k, v)) ⇒
|
||||
val nv = v filter (_.get ne null)
|
||||
if (nv.isEmpty) m else m += (k -> nv)
|
||||
}.result
|
||||
}
|
||||
|
||||
protected[akka] def registerQueue(mbox: CallingThreadMailbox, q: NestingQueue): Unit = synchronized {
|
||||
|
|
@ -67,6 +71,10 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension {
|
|||
}
|
||||
}
|
||||
|
||||
protected[akka] def unregisterQueues(mbox: CallingThreadMailbox): Unit = synchronized {
|
||||
queues -= mbox
|
||||
}
|
||||
|
||||
/*
|
||||
* This method must be called with "own" being this thread's queue for the
|
||||
* given mailbox. When this method returns, the queue will be entered
|
||||
|
|
@ -151,10 +159,21 @@ class CallingThreadDispatcher(
|
|||
}
|
||||
}
|
||||
|
||||
protected[akka] override def unregister(actor: ActorCell): Unit = {
|
||||
val mbox = actor.mailbox match {
|
||||
case m: CallingThreadMailbox ⇒ Some(m)
|
||||
case _ ⇒ None
|
||||
}
|
||||
super.unregister(actor)
|
||||
mbox foreach CallingThreadDispatcherQueues(actor.system).unregisterQueues
|
||||
}
|
||||
|
||||
override def suspend(actor: ActorCell) {
|
||||
actor.mailbox match {
|
||||
case m: CallingThreadMailbox ⇒ m.suspendSwitch.switchOn; m.suspend()
|
||||
case m ⇒ m.systemEnqueue(actor.self, Suspend())
|
||||
case m: CallingThreadMailbox ⇒
|
||||
m.suspendSwitch.switchOn; m.suspend()
|
||||
case m ⇒
|
||||
m.systemEnqueue(actor.self, Suspend())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -319,10 +338,11 @@ class CallingThreadMailbox(_receiver: akka.actor.Cell, val mailboxType: MailboxT
|
|||
* the gather operation, tough luck: no guaranteed delivery to deadLetters.
|
||||
*/
|
||||
suspendSwitch.locked {
|
||||
val q = queue
|
||||
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, q)
|
||||
val qq = queue
|
||||
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, qq)
|
||||
super.cleanUp()
|
||||
q.q.cleanUp(actor.self, actor.systemImpl.deadLetterQueue)
|
||||
qq.q.cleanUp(actor.self, actor.systemImpl.deadLetterQueue)
|
||||
q.remove()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue