fix InterruptedException handling in CallingThreadDispatcher

instead of just re-setting the flag upon swallowed exception, clear the
flag when fully done and re-throw last swallowed IE.
This commit is contained in:
Roland 2011-10-25 15:07:20 +02:00
parent c059d1bc11
commit 6bcdba40c0
2 changed files with 20 additions and 4 deletions

View file

@ -379,9 +379,18 @@ abstract class ActorModelSpec extends AkkaSpec {
val a = newTestActor(dispatcher) val a = newTestActor(dispatcher)
val f1 = a ? Reply("foo") val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar") val f2 = a ? Reply("bar")
val f3 = a ? Interrupt val f3 = try {
a ? Interrupt
} catch {
// CallingThreadDispatcher throws IE directly
case ie: InterruptedException new KeptPromise(Left(ActorInterruptedException(ie)))
}
val f4 = a ? Reply("foo2") val f4 = a ? Reply("foo2")
val f5 = a ? Interrupt val f5 = try {
a ? Interrupt
} catch {
case ie: InterruptedException new KeptPromise(Left(ActorInterruptedException(ie)))
}
val f6 = a ? Reply("bar2") val f6 = a ? Reply("bar2")
assert(f1.get === "foo") assert(f1.get === "foo")

View file

@ -187,7 +187,8 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
* it is suspendSwitch and resumed. * it is suspendSwitch and resumed.
*/ */
@tailrec @tailrec
private def runQueue(mbox: CallingThreadMailbox, queue: NestingQueue) { private def runQueue(mbox: CallingThreadMailbox, queue: NestingQueue, interruptedex: InterruptedException = null) {
var intex = interruptedex;
assert(queue.isActive) assert(queue.isActive)
mbox.lock.lock mbox.lock.lock
val recurse = try { val recurse = try {
@ -214,6 +215,7 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
case ie: InterruptedException case ie: InterruptedException
app.eventHandler.error(this, ie) app.eventHandler.error(this, ie)
Thread.currentThread().interrupt() Thread.currentThread().interrupt()
intex = ie
true true
case e case e
app.eventHandler.error(this, e) app.eventHandler.error(this, e)
@ -230,7 +232,12 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
mbox.lock.unlock mbox.lock.unlock
} }
if (recurse) { if (recurse) {
runQueue(mbox, queue) runQueue(mbox, queue, intex)
} else {
if (intex ne null) {
Thread.interrupted // clear flag
throw intex
}
} }
} }
} }