fix CallingThreadDispatcher and re-enable its test

This commit is contained in:
Roland 2011-10-21 18:47:44 +02:00
parent bb942750aa
commit fc8ab7dad8
5 changed files with 88 additions and 83 deletions

View file

@ -243,6 +243,9 @@ abstract class ActorModelSpec extends AkkaSpec {
protected def newInterceptedDispatcher: MessageDispatcherInterceptor protected def newInterceptedDispatcher: MessageDispatcherInterceptor
protected def dispatcherType: String protected def dispatcherType: String
// BalancingDispatcher of course does not work when another actor is in the pool, so overridden below
protected def wavesSupervisorDispatcher(dispatcher: MessageDispatcher) = dispatcher
"A " + dispatcherType must { "A " + dispatcherType must {
"must dynamically handle its own life cycle" in { "must dynamically handle its own life cycle" in {
@ -325,28 +328,6 @@ abstract class ActorModelSpec extends AkkaSpec {
thread.start() thread.start()
} }
"process messages in parallel" in {
implicit val dispatcher = newInterceptedDispatcher
val aStart, aStop, bParallel = new CountDownLatch(1)
val a, b = newTestActor(dispatcher)
a ! Meet(aStart, aStop)
assertCountDown(aStart, Testing.testTime(3000), "Should process first message within 3 seconds")
b ! CountDown(bParallel)
assertCountDown(bParallel, Testing.testTime(3000), "Should process other actors in parallel")
aStop.countDown()
a.stop
b.stop
while (!a.isShutdown && !b.isShutdown) {} //Busy wait for termination
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
}
"not process messages for a suspended actor" in { "not process messages for a suspended actor" in {
implicit val dispatcher = newInterceptedDispatcher implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor(dispatcher).asInstanceOf[LocalActorRef] val a = newTestActor(dispatcher).asInstanceOf[LocalActorRef]
@ -375,13 +356,15 @@ abstract class ActorModelSpec extends AkkaSpec {
val boss = actorOf(Props(context { val boss = actorOf(Props(context {
case "run" case "run"
for (_ 1 to num) context.actorOf(props) ! cachedMessage for (_ 1 to num) context.actorOf(props) ! cachedMessage
})) ! "run" }).withDispatcher(wavesSupervisorDispatcher(dispatcher)))
boss ! "run"
try { try {
assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns") assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns")
} catch { } catch {
case e case e
System.err.println("Error: " + e.getMessage + " missing count downs == " + cachedMessage.latch.getCount() + " out of " + num) System.err.println("Error: " + e.getMessage + " missing count downs == " + cachedMessage.latch.getCount() + " out of " + num)
} }
boss.stop()
} }
for (run 1 to 3) { for (run 1 to 3) {
flood(40000) flood(40000)
@ -467,6 +450,28 @@ class DispatcherModelSpec extends ActorModelSpec {
assert(each.await.exception.get.isInstanceOf[ActorKilledException]) assert(each.await.exception.get.isInstanceOf[ActorKilledException])
a.stop() a.stop()
} }
"process messages in parallel" in {
implicit val dispatcher = newInterceptedDispatcher
val aStart, aStop, bParallel = new CountDownLatch(1)
val a, b = newTestActor(dispatcher)
a ! Meet(aStart, aStop)
assertCountDown(aStart, Testing.testTime(3000), "Should process first message within 3 seconds")
b ! CountDown(bParallel)
assertCountDown(bParallel, Testing.testTime(3000), "Should process other actors in parallel")
aStop.countDown()
a.stop
b.stop
while (!a.isShutdown && !b.isShutdown) {} //Busy wait for termination
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
}
} }
} }
@ -481,4 +486,30 @@ class BalancingDispatcherModelSpec extends ActorModelSpec {
ThreadPoolConfig(app)).build.asInstanceOf[MessageDispatcherInterceptor] ThreadPoolConfig(app)).build.asInstanceOf[MessageDispatcherInterceptor]
def dispatcherType = "Balancing Dispatcher" def dispatcherType = "Balancing Dispatcher"
override def wavesSupervisorDispatcher(dispatcher: MessageDispatcher) = app.dispatcher
"A " + dispatcherType must {
"process messages in parallel" in {
implicit val dispatcher = newInterceptedDispatcher
val aStart, aStop, bParallel = new CountDownLatch(1)
val a, b = newTestActor(dispatcher)
a ! Meet(aStart, aStop)
assertCountDown(aStart, Testing.testTime(3000), "Should process first message within 3 seconds")
b ! CountDown(bParallel)
assertCountDown(bParallel, Testing.testTime(3000), "Should process other actors in parallel")
aStop.countDown()
a.stop
b.stop
while (!a.isShutdown && !b.isShutdown) {} //Busy wait for termination
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
}
}
} }

View file

@ -7,50 +7,11 @@ import akka.actor.dispatch.ActorModelSpec
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import org.junit.{ After, Test } import org.junit.{ After, Test }
// TODO fix this test when the CallingThreadDispatcher is fixed @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
/*
class CallingThreadDispatcherModelSpec extends ActorModelSpec { class CallingThreadDispatcherModelSpec extends ActorModelSpec {
import ActorModelSpec._ import ActorModelSpec._
def newInterceptedDispatcher = new CallingThreadDispatcher with MessageDispatcherInterceptor def newInterceptedDispatcher = new CallingThreadDispatcher(app, "test", true) with MessageDispatcherInterceptor
def dispatcherType = "Calling Thread Dispatcher" def dispatcherType = "Calling Thread Dispatcher"
// A CallingThreadDispatcher can by design not process messages in parallel,
// so disable this test
//override def dispatcherShouldProcessMessagesInParallel {}
// This test needs to be adapted: CTD runs the flood completely sequentially
// with start, invocation, stop, schedule shutdown, abort shutdown, repeat;
// add "keeper" actor to lock down the dispatcher instance, since the
// frequent attempted shutdown seems rather costly (random timing failures
// without this fix)
// override def dispatcherShouldHandleWavesOfActors {
// implicit val dispatcher = newInterceptedDispatcher
//
// def flood(num: Int) {
// val cachedMessage = CountDownNStop(new CountDownLatch(num))
// val keeper = newTestActor
// (1 to num) foreach { _
// newTestActor ! cachedMessage
// }
// keeper.stop()
// assertCountDown(cachedMessage.latch, 10000, "Should process " + num + " countdowns")
// }
// for (run 1 to 3) {
// flood(10000)
// assertDispatcher(dispatcher)(starts = run, stops = run)
// }
// }
//override def dispatcherShouldCompleteAllUncompletedSenderFuturesOnDeregister {
//Can't handle this...
//}
@After
def after {
//remove the interrupted status since we are messing with interrupted exceptions.
Thread.interrupted()
}
} }
*/

View file

@ -15,7 +15,7 @@ import annotation.tailrec
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
private[dispatch] object Mailbox { object Mailbox {
type Status = Int type Status = Int

View file

@ -31,11 +31,12 @@ import akka.AkkaApplication
* within one of its methods taking a closure argument. * within one of its methods taking a closure argument.
*/ */
object CallingThreadDispatcher { private[testkit] object CallingThreadDispatcher {
// PRIVATE DATA // PRIVATE DATA
private var queues = Map[CallingThreadMailbox, Set[WeakReference[NestingQueue]]]() private var queues = Map[CallingThreadMailbox, Set[WeakReference[NestingQueue]]]()
private var lastGC = 0l
// we have to forget about long-gone threads sometime // we have to forget about long-gone threads sometime
private def gc { private def gc {
@ -49,28 +50,28 @@ object CallingThreadDispatcher {
} else { } else {
queues += mbox -> Set(new WeakReference(q)) queues += mbox -> Set(new WeakReference(q))
} }
val now = System.nanoTime
if (now - lastGC > 1000000000l) {
lastGC = now
gc gc
} }
}
/* /*
* This method must be called with "own" being this thread's queue for the * This method must be called with "own" being this thread's queue for the
* given mailbox. When this method returns, the queue will be entered * given mailbox. When this method returns, the queue will be entered
* (active). * (active).
*/ */
protected[akka] def gatherFromAllInactiveQueues(mbox: CallingThreadMailbox, own: NestingQueue): Unit = synchronized { protected[akka] def gatherFromAllOtherQueues(mbox: CallingThreadMailbox, own: NestingQueue): Unit = synchronized {
if (!own.isActive) own.enter if (!own.isActive) own.enter
if (queues contains mbox) { if (queues contains mbox) {
for { for {
ref queues(mbox) ref queues(mbox)
q = ref.get val q = ref.get
if (q ne null) && !q.isActive if (q ne null) && (q ne own)
/*
* if q.isActive was false, then it cannot change to true while we are
* holding the mbox.suspende.switch's lock under which we are currently
* executing
*/
} { } {
while (q.peek ne null) { while (q.peek ne null) {
// this is safe because this method is only ever called while holding the suspendSwitch monitor
own.push(q.pop) own.push(q.pop)
} }
} }
@ -129,7 +130,7 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
val queue = mbox.queue val queue = mbox.queue
val wasActive = queue.isActive val wasActive = queue.isActive
val switched = mbox.suspendSwitch.switchOff { val switched = mbox.suspendSwitch.switchOff {
gatherFromAllInactiveQueues(mbox, queue) gatherFromAllOtherQueues(mbox, queue)
} }
if (switched && !wasActive) { if (switched && !wasActive) {
runQueue(mbox, queue) runQueue(mbox, queue)
@ -142,11 +143,11 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) { protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) {
val mbox = getMailbox(receiver) val mbox = getMailbox(receiver)
mbox.lock.lock mbox.systemEnqueue(message)
try { val queue = mbox.queue
receiver systemInvoke message if (!queue.isActive) {
} finally { queue.enter
mbox.lock.unlock runQueue(mbox, queue)
} }
} }
@ -190,6 +191,7 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
assert(queue.isActive) assert(queue.isActive)
mbox.lock.lock mbox.lock.lock
val recurse = try { val recurse = try {
mbox.processAllSystemMessages()
val handle = mbox.suspendSwitch.fold[Envelope] { val handle = mbox.suspendSwitch.fold[Envelope] {
queue.leave queue.leave
null null
@ -200,6 +202,7 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
} }
if (handle ne null) { if (handle ne null) {
try { try {
if (Mailbox.debug) println(mbox.actor + " processing message " + handle)
mbox.actor.invoke(handle) mbox.actor.invoke(handle)
if (warnings) handle.channel match { if (warnings) handle.channel match {
case f: ActorPromise if !f.isCompleted case f: ActorPromise if !f.isCompleted
@ -208,6 +211,10 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
} }
true true
} catch { } catch {
case ie: InterruptedException
app.eventHandler.error(this, ie)
Thread.currentThread().interrupt()
true
case e case e
app.eventHandler.error(this, e) app.eventHandler.error(this, e)
queue.leave queue.leave
@ -217,6 +224,8 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
queue.leave queue.leave
false false
} else false } else false
} catch {
case e queue.leave; throw e
} finally { } finally {
mbox.lock.unlock mbox.lock.unlock
} }
@ -244,7 +253,11 @@ class NestingQueue {
class CallingThreadMailbox(val dispatcher: MessageDispatcher, _receiver: ActorCell) extends Mailbox(_receiver) with DefaultSystemMessageQueue { class CallingThreadMailbox(val dispatcher: MessageDispatcher, _receiver: ActorCell) extends Mailbox(_receiver) with DefaultSystemMessageQueue {
private val q = new ThreadLocal[NestingQueue]() { private val q = new ThreadLocal[NestingQueue]() {
override def initialValue = new NestingQueue override def initialValue = {
val queue = new NestingQueue
CallingThreadDispatcher.registerQueue(CallingThreadMailbox.this, queue)
queue
}
} }
def queue = q.get def queue = q.get

View file

@ -6,7 +6,7 @@ include "akka-reference.conf"
akka { akka {
event-handlers = ["akka.testkit.TestEventListener"] event-handlers = ["akka.testkit.TestEventListener"]
event-handler-level = "ERROR" event-handler-level = "WARNING"
actor { actor {
default-dispatcher { default-dispatcher {
core-pool-size-factor = 1 core-pool-size-factor = 1