From 6109a17af410a7a23dfdde66db37106c95284467 Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 22 Sep 2011 16:23:07 +0200 Subject: [PATCH] fix ActorModelSpec - transform Dispatcher and Actor asserts into await() - add more detailed logging of what goes wrong --- .../akka/actor/dispatch/ActorModelSpec.scala | 55 +++++++++++-------- .../CallingThreadDispatcherModelSpec.scala | 1 - 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index a39475ba8d..0284245974 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -92,6 +92,9 @@ object ActorModelSpec { val msgsReceived = new AtomicLong(0) val msgsProcessed = new AtomicLong(0) val restarts = new AtomicLong(0) + override def toString = "InterceptorStats(susp=" + suspensions + + ",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters + + ",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts } trait MessageDispatcherInterceptor extends MessageDispatcher { @@ -143,8 +146,16 @@ object ActorModelSpec { def assertDispatcher(dispatcher: MessageDispatcherInterceptor)( starts: Long = dispatcher.starts.get(), stops: Long = dispatcher.stops.get()) { - assert(starts === dispatcher.starts.get(), "Dispatcher starts") - assert(stops === dispatcher.stops.get(), "Dispatcher stops") + val deadline = System.currentTimeMillis + dispatcher.timeoutMs * 5 + try { + await(deadline)(starts == dispatcher.starts.get) + await(deadline)(stops == dispatcher.stops.get) + } catch { + case e ⇒ + EventHandler.error(e, dispatcher, "actual: starts=" + dispatcher.starts.get + ",stops=" + dispatcher.stops.get + + " required: starts=" + starts + ",stops=" + stops) + throw e + } } def assertCountDown(latch: CountDownLatch, wait: Long, hint: AnyRef) { @@ -185,27 +196,33 @@ object ActorModelSpec { msgsProcessed: Long = statsFor(actorRef).msgsProcessed.get(), restarts: Long = statsFor(actorRef).restarts.get()) { val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[LocalActorRef].underlying.dispatcher)) - assert(stats.suspensions.get() === suspensions, "Suspensions") - assert(stats.resumes.get() === resumes, "Resumes") - assert(stats.registers.get() === registers, "Registers") - assert(stats.unregisters.get() === unregisters, "Unregisters") - assert(stats.msgsReceived.get() === msgsReceived, "Received") - assert(stats.msgsProcessed.get() === msgsProcessed, "Processed") - assert(stats.restarts.get() === restarts, "Restarts") + val deadline = System.currentTimeMillis + 1000 + try { + await(deadline)(stats.suspensions.get() == suspensions) + await(deadline)(stats.resumes.get() == resumes) + await(deadline)(stats.registers.get() == registers) + await(deadline)(stats.unregisters.get() == unregisters) + await(deadline)(stats.msgsReceived.get() == msgsReceived) + await(deadline)(stats.msgsProcessed.get() == msgsProcessed) + await(deadline)(stats.restarts.get() == restarts) + } catch { + case e ⇒ + EventHandler.error(e, dispatcher, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + + ",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters + + ",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts) + throw e + } } - def await(condition: ⇒ Boolean)(withinMs: Long, intervalMs: Long = 25): Unit = try { - val until = System.currentTimeMillis() + withinMs + def await(until: Long)(condition: ⇒ Boolean): Unit = try { while (System.currentTimeMillis() <= until) { try { - if (condition) return - - Thread.sleep(intervalMs) + if (condition) return else Thread.sleep(25) } catch { case e: InterruptedException ⇒ } } - assert(0 === 1, "await failed") + throw new AssertionError("await failed") } def newTestActor(implicit d: MessageDispatcherInterceptor) = actorOf(Props[DispatcherActor].withDispatcher(d)) @@ -224,7 +241,6 @@ abstract class ActorModelSpec extends JUnitSuite { val a = newTestActor assertDispatcher(dispatcher)(starts = 1, stops = 0) a.stop() - await(dispatcher.stops.get == 1)(withinMs = dispatcher.timeoutMs * 5) assertDispatcher(dispatcher)(starts = 1, stops = 1) assertRef(a, dispatcher)( suspensions = 0, @@ -238,17 +254,14 @@ abstract class ActorModelSpec extends JUnitSuite { val futures = for (i ← 1 to 10) yield Future { i } - await(dispatcher.stops.get == 2)(withinMs = dispatcher.timeoutMs * 5) assertDispatcher(dispatcher)(starts = 2, stops = 2) val a2 = newTestActor val futures2 = for (i ← 1 to 10) yield Future { i } - await(dispatcher.starts.get == 3)(withinMs = dispatcher.timeoutMs * 5) assertDispatcher(dispatcher)(starts = 3, stops = 2) a2.stop - await(dispatcher.stops.get == 3)(withinMs = dispatcher.timeoutMs * 5) assertDispatcher(dispatcher)(starts = 3, stops = 3) } @@ -375,7 +388,6 @@ abstract class ActorModelSpec extends JUnitSuite { } for (run ← 1 to 3) { flood(10000) - await(dispatcher.stops.get == run)(withinMs = 10000) assertDispatcher(dispatcher)(starts = run, stops = run) } } @@ -392,7 +404,7 @@ abstract class ActorModelSpec extends JUnitSuite { assert(f1.get === "foo") stopped.await for (each ← shouldBeCompleted) - assert(each.exception.get.isInstanceOf[ActorKilledException]) + assert(each.await.exception.get.isInstanceOf[ActorKilledException]) a.stop() } @@ -456,4 +468,3 @@ class BalancingDispatcherModelTest extends ActorModelSpec { def newInterceptedDispatcher = new BalancingDispatcher("foo") with MessageDispatcherInterceptor } - diff --git a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index 01c9eee4cf..ad8d5975c7 100644 --- a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -34,7 +34,6 @@ class CallingThreadDispatcherModelSpec extends ActorModelSpec { } for (run ← 1 to 3) { flood(10000) - await(dispatcher.stops.get == run)(withinMs = 10000) assertDispatcher(dispatcher)(starts = run, stops = run) } }