diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index d07fa3ee76..755e122ec1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -243,6 +243,9 @@ abstract class ActorModelSpec extends AkkaSpec { protected def newInterceptedDispatcher: MessageDispatcherInterceptor protected def dispatcherType: String + // BalancingDispatcher of course does not work when another actor is in the pool, so overridden below + protected def wavesSupervisorDispatcher(dispatcher: MessageDispatcher) = dispatcher + "A " + dispatcherType must { "must dynamically handle its own life cycle" in { @@ -325,28 +328,6 @@ abstract class ActorModelSpec extends AkkaSpec { thread.start() } - "process messages in parallel" in { - implicit val dispatcher = newInterceptedDispatcher - val aStart, aStop, bParallel = new CountDownLatch(1) - val a, b = newTestActor(dispatcher) - - a ! Meet(aStart, aStop) - assertCountDown(aStart, Testing.testTime(3000), "Should process first message within 3 seconds") - - b ! CountDown(bParallel) - assertCountDown(bParallel, Testing.testTime(3000), "Should process other actors in parallel") - - aStop.countDown() - - a.stop - b.stop - - while (!a.isShutdown && !b.isShutdown) {} //Busy wait for termination - - assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) - assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) - } - "not process messages for a suspended actor" in { implicit val dispatcher = newInterceptedDispatcher val a = newTestActor(dispatcher).asInstanceOf[LocalActorRef] @@ -375,13 +356,15 @@ abstract class ActorModelSpec extends AkkaSpec { val boss = actorOf(Props(context ⇒ { case "run" ⇒ for (_ ← 1 to num) context.actorOf(props) ! cachedMessage - })) ! "run" + }).withDispatcher(wavesSupervisorDispatcher(dispatcher))) + boss ! "run" try { assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns") } catch { case e ⇒ System.err.println("Error: " + e.getMessage + " missing count downs == " + cachedMessage.latch.getCount() + " out of " + num) } + boss.stop() } for (run ← 1 to 3) { flood(40000) @@ -467,6 +450,28 @@ class DispatcherModelSpec extends ActorModelSpec { assert(each.await.exception.get.isInstanceOf[ActorKilledException]) a.stop() } + + "process messages in parallel" in { + implicit val dispatcher = newInterceptedDispatcher + val aStart, aStop, bParallel = new CountDownLatch(1) + val a, b = newTestActor(dispatcher) + + a ! Meet(aStart, aStop) + assertCountDown(aStart, Testing.testTime(3000), "Should process first message within 3 seconds") + + b ! CountDown(bParallel) + assertCountDown(bParallel, Testing.testTime(3000), "Should process other actors in parallel") + + aStop.countDown() + + a.stop + b.stop + + while (!a.isShutdown && !b.isShutdown) {} //Busy wait for termination + + assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) + assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) + } } } @@ -481,4 +486,30 @@ class BalancingDispatcherModelSpec extends ActorModelSpec { ThreadPoolConfig(app)).build.asInstanceOf[MessageDispatcherInterceptor] def dispatcherType = "Balancing Dispatcher" + + override def wavesSupervisorDispatcher(dispatcher: MessageDispatcher) = app.dispatcher + + "A " + dispatcherType must { + "process messages in parallel" in { + implicit val dispatcher = newInterceptedDispatcher + val aStart, aStop, bParallel = new CountDownLatch(1) + val a, b = newTestActor(dispatcher) + + a ! Meet(aStart, aStop) + assertCountDown(aStart, Testing.testTime(3000), "Should process first message within 3 seconds") + + b ! CountDown(bParallel) + assertCountDown(bParallel, Testing.testTime(3000), "Should process other actors in parallel") + + aStop.countDown() + + a.stop + b.stop + + while (!a.isShutdown && !b.isShutdown) {} //Busy wait for termination + + assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) + assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) + } + } } diff --git a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index da7c8d2a2b..c977709cbc 100644 --- a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -7,50 +7,11 @@ import akka.actor.dispatch.ActorModelSpec import java.util.concurrent.CountDownLatch import org.junit.{ After, Test } -// TODO fix this test when the CallingThreadDispatcher is fixed -/* +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class CallingThreadDispatcherModelSpec extends ActorModelSpec { import ActorModelSpec._ - def newInterceptedDispatcher = new CallingThreadDispatcher with MessageDispatcherInterceptor + def newInterceptedDispatcher = new CallingThreadDispatcher(app, "test", true) with MessageDispatcherInterceptor def dispatcherType = "Calling Thread Dispatcher" - // A CallingThreadDispatcher can by design not process messages in parallel, - // so disable this test - //override def dispatcherShouldProcessMessagesInParallel {} - - // This test needs to be adapted: CTD runs the flood completely sequentially - // with start, invocation, stop, schedule shutdown, abort shutdown, repeat; - // add "keeper" actor to lock down the dispatcher instance, since the - // frequent attempted shutdown seems rather costly (random timing failures - // without this fix) - // override def dispatcherShouldHandleWavesOfActors { - // implicit val dispatcher = newInterceptedDispatcher - // - // def flood(num: Int) { - // val cachedMessage = CountDownNStop(new CountDownLatch(num)) - // val keeper = newTestActor - // (1 to num) foreach { _ ⇒ - // newTestActor ! cachedMessage - // } - // keeper.stop() - // assertCountDown(cachedMessage.latch, 10000, "Should process " + num + " countdowns") - // } - // for (run ← 1 to 3) { - // flood(10000) - // assertDispatcher(dispatcher)(starts = run, stops = run) - // } - // } - - //override def dispatcherShouldCompleteAllUncompletedSenderFuturesOnDeregister { - //Can't handle this... - //} - - @After - def after { - //remove the interrupted status since we are messing with interrupted exceptions. - Thread.interrupted() - } - } -*/ diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 7beb0ffe95..40dcbfd9df 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -15,7 +15,7 @@ import annotation.tailrec class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) -private[dispatch] object Mailbox { +object Mailbox { type Status = Int diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index b6cb07cb02..dc27e036a9 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -31,11 +31,12 @@ import akka.AkkaApplication * within one of its methods taking a closure argument. */ -object CallingThreadDispatcher { +private[testkit] object CallingThreadDispatcher { // PRIVATE DATA private var queues = Map[CallingThreadMailbox, Set[WeakReference[NestingQueue]]]() + private var lastGC = 0l // we have to forget about long-gone threads sometime private def gc { @@ -49,7 +50,11 @@ object CallingThreadDispatcher { } else { queues += mbox -> Set(new WeakReference(q)) } - gc + val now = System.nanoTime + if (now - lastGC > 1000000000l) { + lastGC = now + gc + } } /* @@ -57,20 +62,16 @@ object CallingThreadDispatcher { * given mailbox. When this method returns, the queue will be entered * (active). */ - protected[akka] def gatherFromAllInactiveQueues(mbox: CallingThreadMailbox, own: NestingQueue): Unit = synchronized { + protected[akka] def gatherFromAllOtherQueues(mbox: CallingThreadMailbox, own: NestingQueue): Unit = synchronized { if (!own.isActive) own.enter if (queues contains mbox) { for { ref ← queues(mbox) - q = ref.get - if (q ne null) && !q.isActive - /* - * if q.isActive was false, then it cannot change to true while we are - * holding the mbox.suspende.switch's lock under which we are currently - * executing - */ + val q = ref.get + if (q ne null) && (q ne own) } { while (q.peek ne null) { + // this is safe because this method is only ever called while holding the suspendSwitch monitor own.push(q.pop) } } @@ -129,7 +130,7 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling val queue = mbox.queue val wasActive = queue.isActive val switched = mbox.suspendSwitch.switchOff { - gatherFromAllInactiveQueues(mbox, queue) + gatherFromAllOtherQueues(mbox, queue) } if (switched && !wasActive) { runQueue(mbox, queue) @@ -142,11 +143,11 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) { val mbox = getMailbox(receiver) - mbox.lock.lock - try { - receiver systemInvoke message - } finally { - mbox.lock.unlock + mbox.systemEnqueue(message) + val queue = mbox.queue + if (!queue.isActive) { + queue.enter + runQueue(mbox, queue) } } @@ -190,6 +191,7 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling assert(queue.isActive) mbox.lock.lock val recurse = try { + mbox.processAllSystemMessages() val handle = mbox.suspendSwitch.fold[Envelope] { queue.leave null @@ -200,6 +202,7 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling } if (handle ne null) { try { + if (Mailbox.debug) println(mbox.actor + " processing message " + handle) mbox.actor.invoke(handle) if (warnings) handle.channel match { case f: ActorPromise if !f.isCompleted ⇒ @@ -208,6 +211,10 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling } true } catch { + case ie: InterruptedException ⇒ + app.eventHandler.error(this, ie) + Thread.currentThread().interrupt() + true case e ⇒ app.eventHandler.error(this, e) queue.leave @@ -217,6 +224,8 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling queue.leave false } else false + } catch { + case e ⇒ queue.leave; throw e } finally { mbox.lock.unlock } @@ -244,7 +253,11 @@ class NestingQueue { class CallingThreadMailbox(val dispatcher: MessageDispatcher, _receiver: ActorCell) extends Mailbox(_receiver) with DefaultSystemMessageQueue { private val q = new ThreadLocal[NestingQueue]() { - override def initialValue = new NestingQueue + override def initialValue = { + val queue = new NestingQueue + CallingThreadDispatcher.registerQueue(CallingThreadMailbox.this, queue) + queue + } } def queue = q.get diff --git a/config/akka.test.conf b/config/akka.test.conf index 25161e06c4..7ff55c0fcd 100644 --- a/config/akka.test.conf +++ b/config/akka.test.conf @@ -6,7 +6,7 @@ include "akka-reference.conf" akka { event-handlers = ["akka.testkit.TestEventListener"] - event-handler-level = "ERROR" + event-handler-level = "WARNING" actor { default-dispatcher { core-pool-size-factor = 1