Merge pull request #1062 from akka/wip-2821-CTDMS-∂π

fix CallingThreadDispatcherModelSpec, see #2821
This commit is contained in:
Roland Kuhn 2013-01-25 03:08:11 -08:00
commit aea3ff69f8

View file

@ -50,7 +50,11 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension {
// we have to forget about long-gone threads sometime
private def gc {
queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty)
queues = (Map.newBuilder[CallingThreadMailbox, Set[WeakReference[NestingQueue]]] /: queues) {
case (m, (k, v))
val nv = v filter (_.get ne null)
if (nv.isEmpty) m else m += (k -> nv)
}.result
}
protected[akka] def registerQueue(mbox: CallingThreadMailbox, q: NestingQueue): Unit = synchronized {
@ -67,6 +71,10 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension {
}
}
protected[akka] def unregisterQueues(mbox: CallingThreadMailbox): Unit = synchronized {
queues -= mbox
}
/*
* This method must be called with "own" being this thread's queue for the
* given mailbox. When this method returns, the queue will be entered
@ -151,10 +159,21 @@ class CallingThreadDispatcher(
}
}
protected[akka] override def unregister(actor: ActorCell): Unit = {
val mbox = actor.mailbox match {
case m: CallingThreadMailbox Some(m)
case _ None
}
super.unregister(actor)
mbox foreach CallingThreadDispatcherQueues(actor.system).unregisterQueues
}
override def suspend(actor: ActorCell) {
actor.mailbox match {
case m: CallingThreadMailbox m.suspendSwitch.switchOn; m.suspend()
case m m.systemEnqueue(actor.self, Suspend())
case m: CallingThreadMailbox
m.suspendSwitch.switchOn; m.suspend()
case m
m.systemEnqueue(actor.self, Suspend())
}
}
@ -319,10 +338,11 @@ class CallingThreadMailbox(_receiver: akka.actor.Cell, val mailboxType: MailboxT
* the gather operation, tough luck: no guaranteed delivery to deadLetters.
*/
suspendSwitch.locked {
val q = queue
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, q)
val qq = queue
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, qq)
super.cleanUp()
q.q.cleanUp(actor.self, actor.systemImpl.deadLetterQueue)
qq.q.cleanUp(actor.self, actor.systemImpl.deadLetterQueue)
q.remove()
}
}
}