fix one spurious buglet in CallingThreadDispatcher, see #1375
This commit is contained in:
parent
8abcf8ce2a
commit
6fddb87fca
3 changed files with 45 additions and 31 deletions
|
|
@ -133,11 +133,6 @@ class CallingThreadDispatcher(
|
|||
|
||||
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor)
|
||||
|
||||
private def getMailbox(actor: ActorCell): Option[CallingThreadMailbox] = actor.mailbox match {
|
||||
case m: CallingThreadMailbox ⇒ Some(m)
|
||||
case _ ⇒ None
|
||||
}
|
||||
|
||||
protected[akka] override def shutdown() {}
|
||||
|
||||
protected[akka] override def throughput = 0
|
||||
|
|
@ -147,7 +142,10 @@ class CallingThreadDispatcher(
|
|||
protected[akka] override def shutdownTimeout = 1 second
|
||||
|
||||
override def suspend(actor: ActorCell) {
|
||||
getMailbox(actor) foreach (_.suspendSwitch.switchOn)
|
||||
actor.mailbox match {
|
||||
case m: CallingThreadMailbox ⇒ m.suspendSwitch.switchOn
|
||||
case m ⇒ m.systemEnqueue(actor.self, Suspend())
|
||||
}
|
||||
}
|
||||
|
||||
override def resume(actor: ActorCell) {
|
||||
|
|
@ -187,12 +185,10 @@ class CallingThreadDispatcher(
|
|||
false
|
||||
} {
|
||||
queue.push(handle)
|
||||
if (queue.isActive)
|
||||
false
|
||||
else {
|
||||
if (!queue.isActive) {
|
||||
queue.enter
|
||||
true
|
||||
}
|
||||
} else false
|
||||
}
|
||||
if (execute) runQueue(mbox, queue)
|
||||
case m ⇒ m.enqueue(receiver.self, handle)
|
||||
|
|
@ -214,14 +210,14 @@ class CallingThreadDispatcher(
|
|||
private def runQueue(mbox: CallingThreadMailbox, queue: NestingQueue, interruptedex: InterruptedException = null) {
|
||||
var intex = interruptedex;
|
||||
assert(queue.isActive)
|
||||
mbox.lock.lock
|
||||
mbox.ctdLock.lock
|
||||
val recurse = try {
|
||||
mbox.processAllSystemMessages()
|
||||
val handle = mbox.suspendSwitch.fold[Envelope] {
|
||||
queue.leave
|
||||
null
|
||||
} {
|
||||
val ret = queue.pop
|
||||
val ret = if (mbox.isClosed) null else queue.pop
|
||||
if (ret eq null) queue.leave
|
||||
ret
|
||||
}
|
||||
|
|
@ -248,7 +244,7 @@ class CallingThreadDispatcher(
|
|||
} catch {
|
||||
case e ⇒ queue.leave; throw e
|
||||
} finally {
|
||||
mbox.lock.unlock
|
||||
mbox.ctdLock.unlock
|
||||
}
|
||||
if (recurse) {
|
||||
runQueue(mbox, queue, intex)
|
||||
|
|
@ -295,11 +291,23 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with
|
|||
|
||||
def queue = q.get
|
||||
|
||||
val lock = new ReentrantLock
|
||||
val ctdLock = new ReentrantLock
|
||||
val suspendSwitch = new Switch
|
||||
|
||||
override def enqueue(receiver: ActorRef, msg: Envelope) {}
|
||||
override def dequeue() = null
|
||||
override def hasMessages = queue.isEmpty
|
||||
override def numberOfMessages = queue.size
|
||||
|
||||
override def cleanUp(): Unit = {
|
||||
/*
|
||||
* This is called from dispatcher.unregister, i.e. under this.lock. If
|
||||
* another thread obtained a reference to this mailbox and enqueues after
|
||||
* the gather operation, tough luck: no guaranteed delivery to deadLetters.
|
||||
*/
|
||||
suspendSwitch.locked {
|
||||
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, queue)
|
||||
super.cleanUp()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue