From b3249e0adb424ab036fef9e32d059eb2632f70d4 Mon Sep 17 00:00:00 2001 From: Roland Date: Wed, 9 Nov 2011 11:04:39 +0100 Subject: [PATCH] finish EventFilter work and apply in three places - fix memory visibility issue with occurrences counter - add non-throwing awaitCond and use that for better error reporting - move occurrence book-keeping (side-effect) out of PF guard, was evaluated multiple times (of course!) - apply in cases where EventFilter.custom was used (one legitimate use case remains) --- .../test/scala/akka/actor/FSMActorSpec.scala | 67 +++++++++---------- .../test/scala/akka/actor/FSMTimingSpec.scala | 27 ++++---- .../akka/testkit/TestEventListener.scala | 22 +++--- .../src/main/scala/akka/testkit/TestKit.scala | 19 ++++-- 4 files changed, 68 insertions(+), 67 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index b6e972513f..05716ea04b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -65,7 +65,7 @@ object FSMActorSpec { whenUnhandled { case Ev(msg) ⇒ { - log.info("unhandled event " + msg + " in state " + stateName + " with data " + stateData) + log.warning("unhandled event " + msg + " in state " + stateName + " with data " + stateData) unhandledLatch.open stay } @@ -138,10 +138,7 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true transitionCallBackLatch.await lockedLatch.await - filterEvents(EventFilter.custom { - case Logging.Info(_: Lock, _) ⇒ true - case _ ⇒ false - }) { + EventFilter.warning(start = "unhandled event", occurrences = 1) intercept { lock ! "not_handled" unhandledLatch.await } @@ -200,40 +197,38 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true new TestKit(AkkaApplication("fsm event", AkkaApplication.defaultConfig ++ Configuration("akka.loglevel" -> "DEBUG", "akka.actor.debug.fsm" -> true))) { - app.mainbus.publish(TestEvent.Mute(EventFilter.custom { - case _: Logging.Debug ⇒ true - case _ ⇒ false - })) - val fsm = TestActorRef(new Actor with LoggingFSM[Int, Null] { - startWith(1, null) - when(1) { - case Ev("go") ⇒ - setTimer("t", Shutdown, 1.5 seconds, false) - goto(2) + EventFilter.debug() intercept { + val fsm = TestActorRef(new Actor with LoggingFSM[Int, Null] { + startWith(1, null) + when(1) { + case Ev("go") ⇒ + setTimer("t", Shutdown, 1.5 seconds, false) + goto(2) + } + when(2) { + case Ev("stop") ⇒ + cancelTimer("t") + stop + } + onTermination { + case StopEvent(r, _, _) ⇒ testActor ! r + } + }) + app.mainbus.subscribe(testActor, classOf[Logging.Debug]) + fsm ! "go" + expectMsgPF(1 second, hint = "processing Event(go,null)") { + case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[testActor") ⇒ true } - when(2) { - case Ev("stop") ⇒ - cancelTimer("t") - stop + expectMsg(1 second, Logging.Debug(fsm, "setting timer 't'/1500 milliseconds: Shutdown")) + expectMsg(1 second, Logging.Debug(fsm, "transition 1 -> 2")) + fsm ! "stop" + expectMsgPF(1 second, hint = "processing Event(stop,null)") { + case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(stop,null) from Actor[testActor") ⇒ true } - onTermination { - case StopEvent(r, _, _) ⇒ testActor ! r - } - }) - app.mainbus.subscribe(testActor, classOf[Logging.Debug]) - fsm ! "go" - expectMsgPF(1 second, hint = "processing Event(go,null)") { - case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[testActor") ⇒ true + expectMsgAllOf(1 second, Logging.Debug(fsm, "canceling timer 't'"), Normal) + expectNoMsg(1 second) + app.mainbus.unsubscribe(testActor) } - expectMsg(1 second, Logging.Debug(fsm, "setting timer 't'/1500 milliseconds: Shutdown")) - expectMsg(1 second, Logging.Debug(fsm, "transition 1 -> 2")) - fsm ! "stop" - expectMsgPF(1 second, hint = "processing Event(stop,null)") { - case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(stop,null) from Actor[testActor") ⇒ true - } - expectMsgAllOf(1 second, Logging.Debug(fsm, "canceling timer 't'"), Normal) - expectNoMsg(1 second) - app.mainbus.unsubscribe(testActor) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index df255f619f..bfda48fba9 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -106,22 +106,19 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender { } "notify unhandled messages" in { - filterEvents(EventFilter.custom { - case Logging.Warning(`fsm`, "unhandled event Tick in state TestUnhandled") ⇒ true - case Logging.Warning(`fsm`, "unhandled event Unhandled(test) in state TestUnhandled") ⇒ true - case _ ⇒ false - }) { - fsm ! TestUnhandled - within(1 second) { - fsm ! Tick - fsm ! SetHandler - fsm ! Tick - expectMsg(Unhandled(Tick)) - fsm ! Unhandled("test") - fsm ! Cancel - expectMsg(Transition(fsm, TestUnhandled, Initial)) + filterEvents(EventFilter.warning("unhandled event Tick in state TestUnhandled", source = fsm, occurrences = 1), + EventFilter.warning("unhandled event Unhandled(test) in state TestUnhandled", source = fsm, occurrences = 1)) { + fsm ! TestUnhandled + within(1 second) { + fsm ! Tick + fsm ! SetHandler + fsm ! Tick + expectMsg(Unhandled(Tick)) + fsm ! Unhandled("test") + fsm ! Cancel + expectMsg(Transition(fsm, TestUnhandled, Initial)) + } } - } } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 9c4e421fa9..1180fc52f8 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -48,11 +48,12 @@ object TestEvent { * error messages. * * See the companion object for convenient factory methods. - * + * * If the `occurrences` is set to Int.MaxValue, no tracking is done. */ abstract class EventFilter(occurrences: Int) { + @volatile // JMM does not guarantee visibility for non-final fields private var todo = occurrences /** @@ -69,7 +70,7 @@ abstract class EventFilter(occurrences: Int) { } def awaitDone(max: Duration): Boolean = { - if (todo != Int.MaxValue && todo > 0) TestKit.awaitCond(todo == 0, max) + if (todo != Int.MaxValue && todo > 0) TestKit.awaitCond(todo == 0, max, noThrow = true) todo == Int.MaxValue || todo == 0 } @@ -82,7 +83,10 @@ abstract class EventFilter(occurrences: Int) { try { val result = code if (!awaitDone(app.AkkaConfig.TestEventFilterLeeway)) - throw new AssertionError("Timeout waiting for " + todo + " messages on " + this) + if (todo > 0) + throw new AssertionError("Timeout waiting for " + todo + " messages on " + this) + else + throw new AssertionError("Received " + (-todo) + " messages too many on " + this) result } finally app.mainbus publish TestEvent.UnMute(this) } @@ -438,12 +442,12 @@ class TestEventListener extends Logging.DefaultLogger { var filters: List[EventFilter] = Nil - override def receive: Actor.Receive = ({ - case InitializeLogger(bus) ⇒ Seq(classOf[Mute], classOf[UnMute]) foreach (bus.subscribe(context.self, _)) - case Mute(filters) ⇒ filters foreach addFilter - case UnMute(filters) ⇒ filters foreach removeFilter - case event: LogEvent if filter(event) ⇒ - }: Actor.Receive) orElse super.receive + override def receive = { + case InitializeLogger(bus) ⇒ Seq(classOf[Mute], classOf[UnMute]) foreach (bus.subscribe(context.self, _)) + case Mute(filters) ⇒ filters foreach addFilter + case UnMute(filters) ⇒ filters foreach removeFilter + case event: LogEvent ⇒ if (!filter(event)) print(event) + } def filter(event: LogEvent): Boolean = filters exists (f ⇒ try { f(event) } catch { case e: Exception ⇒ false }) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index d0cee3c740..e8d82a4650 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -559,19 +559,24 @@ object TestKit { * * Note that the timeout is scaled using Duration.timeFactor. */ - def awaitCond(p: ⇒ Boolean, max: Duration, interval: Duration = 100.millis) { + def awaitCond(p: ⇒ Boolean, max: Duration, interval: Duration = 100.millis, noThrow: Boolean = false): Boolean = { val stop = now + max @tailrec - def poll(t: Duration) { + def poll(): Boolean = { if (!p) { - assert(now < stop, "timeout " + max + " expired") - Thread.sleep(t.toMillis) - poll((stop - now) min interval) - } + val toSleep = stop - now + if (toSleep <= Duration.Zero) { + if (noThrow) false + else throw new AssertionError("timeout " + max + " expired") + } else { + Thread.sleep((toSleep min interval).toMillis) + poll() + } + } else true } - poll(max min interval) + poll() } /**