finish EventFilter work and apply in three places

- fix memory visibility issue with occurrences counter
- add non-throwing awaitCond and use that for better error reporting
- move occurrence book-keeping (side-effect) out of PF guard, was
  evaluated multiple times (of course!)
- apply in cases where EventFilter.custom was used (one legitimate use
  case remains)
This commit is contained in:
Roland 2011-11-09 11:04:39 +01:00
parent c1a9475015
commit b3249e0adb
4 changed files with 68 additions and 67 deletions

View file

@ -65,7 +65,7 @@ object FSMActorSpec {
whenUnhandled {
case Ev(msg) {
log.info("unhandled event " + msg + " in state " + stateName + " with data " + stateData)
log.warning("unhandled event " + msg + " in state " + stateName + " with data " + stateData)
unhandledLatch.open
stay
}
@ -138,10 +138,7 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true
transitionCallBackLatch.await
lockedLatch.await
filterEvents(EventFilter.custom {
case Logging.Info(_: Lock, _) true
case _ false
}) {
EventFilter.warning(start = "unhandled event", occurrences = 1) intercept {
lock ! "not_handled"
unhandledLatch.await
}
@ -200,40 +197,38 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true
new TestKit(AkkaApplication("fsm event", AkkaApplication.defaultConfig ++
Configuration("akka.loglevel" -> "DEBUG",
"akka.actor.debug.fsm" -> true))) {
app.mainbus.publish(TestEvent.Mute(EventFilter.custom {
case _: Logging.Debug true
case _ false
}))
val fsm = TestActorRef(new Actor with LoggingFSM[Int, Null] {
startWith(1, null)
when(1) {
case Ev("go")
setTimer("t", Shutdown, 1.5 seconds, false)
goto(2)
EventFilter.debug() intercept {
val fsm = TestActorRef(new Actor with LoggingFSM[Int, Null] {
startWith(1, null)
when(1) {
case Ev("go")
setTimer("t", Shutdown, 1.5 seconds, false)
goto(2)
}
when(2) {
case Ev("stop")
cancelTimer("t")
stop
}
onTermination {
case StopEvent(r, _, _) testActor ! r
}
})
app.mainbus.subscribe(testActor, classOf[Logging.Debug])
fsm ! "go"
expectMsgPF(1 second, hint = "processing Event(go,null)") {
case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[testActor") true
}
when(2) {
case Ev("stop")
cancelTimer("t")
stop
expectMsg(1 second, Logging.Debug(fsm, "setting timer 't'/1500 milliseconds: Shutdown"))
expectMsg(1 second, Logging.Debug(fsm, "transition 1 -> 2"))
fsm ! "stop"
expectMsgPF(1 second, hint = "processing Event(stop,null)") {
case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(stop,null) from Actor[testActor") true
}
onTermination {
case StopEvent(r, _, _) testActor ! r
}
})
app.mainbus.subscribe(testActor, classOf[Logging.Debug])
fsm ! "go"
expectMsgPF(1 second, hint = "processing Event(go,null)") {
case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[testActor") true
expectMsgAllOf(1 second, Logging.Debug(fsm, "canceling timer 't'"), Normal)
expectNoMsg(1 second)
app.mainbus.unsubscribe(testActor)
}
expectMsg(1 second, Logging.Debug(fsm, "setting timer 't'/1500 milliseconds: Shutdown"))
expectMsg(1 second, Logging.Debug(fsm, "transition 1 -> 2"))
fsm ! "stop"
expectMsgPF(1 second, hint = "processing Event(stop,null)") {
case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(stop,null) from Actor[testActor") true
}
expectMsgAllOf(1 second, Logging.Debug(fsm, "canceling timer 't'"), Normal)
expectNoMsg(1 second)
app.mainbus.unsubscribe(testActor)
}
}

View file

@ -106,22 +106,19 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender {
}
"notify unhandled messages" in {
filterEvents(EventFilter.custom {
case Logging.Warning(`fsm`, "unhandled event Tick in state TestUnhandled") true
case Logging.Warning(`fsm`, "unhandled event Unhandled(test) in state TestUnhandled") true
case _ false
}) {
fsm ! TestUnhandled
within(1 second) {
fsm ! Tick
fsm ! SetHandler
fsm ! Tick
expectMsg(Unhandled(Tick))
fsm ! Unhandled("test")
fsm ! Cancel
expectMsg(Transition(fsm, TestUnhandled, Initial))
filterEvents(EventFilter.warning("unhandled event Tick in state TestUnhandled", source = fsm, occurrences = 1),
EventFilter.warning("unhandled event Unhandled(test) in state TestUnhandled", source = fsm, occurrences = 1)) {
fsm ! TestUnhandled
within(1 second) {
fsm ! Tick
fsm ! SetHandler
fsm ! Tick
expectMsg(Unhandled(Tick))
fsm ! Unhandled("test")
fsm ! Cancel
expectMsg(Transition(fsm, TestUnhandled, Initial))
}
}
}
}
}

View file

@ -48,11 +48,12 @@ object TestEvent {
* error messages.
*
* See the companion object for convenient factory methods.
*
*
* If the `occurrences` is set to Int.MaxValue, no tracking is done.
*/
abstract class EventFilter(occurrences: Int) {
@volatile // JMM does not guarantee visibility for non-final fields
private var todo = occurrences
/**
@ -69,7 +70,7 @@ abstract class EventFilter(occurrences: Int) {
}
def awaitDone(max: Duration): Boolean = {
if (todo != Int.MaxValue && todo > 0) TestKit.awaitCond(todo == 0, max)
if (todo != Int.MaxValue && todo > 0) TestKit.awaitCond(todo == 0, max, noThrow = true)
todo == Int.MaxValue || todo == 0
}
@ -82,7 +83,10 @@ abstract class EventFilter(occurrences: Int) {
try {
val result = code
if (!awaitDone(app.AkkaConfig.TestEventFilterLeeway))
throw new AssertionError("Timeout waiting for " + todo + " messages on " + this)
if (todo > 0)
throw new AssertionError("Timeout waiting for " + todo + " messages on " + this)
else
throw new AssertionError("Received " + (-todo) + " messages too many on " + this)
result
} finally app.mainbus publish TestEvent.UnMute(this)
}
@ -438,12 +442,12 @@ class TestEventListener extends Logging.DefaultLogger {
var filters: List[EventFilter] = Nil
override def receive: Actor.Receive = ({
case InitializeLogger(bus) Seq(classOf[Mute], classOf[UnMute]) foreach (bus.subscribe(context.self, _))
case Mute(filters) filters foreach addFilter
case UnMute(filters) filters foreach removeFilter
case event: LogEvent if filter(event)
}: Actor.Receive) orElse super.receive
override def receive = {
case InitializeLogger(bus) Seq(classOf[Mute], classOf[UnMute]) foreach (bus.subscribe(context.self, _))
case Mute(filters) filters foreach addFilter
case UnMute(filters) filters foreach removeFilter
case event: LogEvent if (!filter(event)) print(event)
}
def filter(event: LogEvent): Boolean = filters exists (f try { f(event) } catch { case e: Exception false })

View file

@ -559,19 +559,24 @@ object TestKit {
*
* Note that the timeout is scaled using Duration.timeFactor.
*/
def awaitCond(p: Boolean, max: Duration, interval: Duration = 100.millis) {
def awaitCond(p: Boolean, max: Duration, interval: Duration = 100.millis, noThrow: Boolean = false): Boolean = {
val stop = now + max
@tailrec
def poll(t: Duration) {
def poll(): Boolean = {
if (!p) {
assert(now < stop, "timeout " + max + " expired")
Thread.sleep(t.toMillis)
poll((stop - now) min interval)
}
val toSleep = stop - now
if (toSleep <= Duration.Zero) {
if (noThrow) false
else throw new AssertionError("timeout " + max + " expired")
} else {
Thread.sleep((toSleep min interval).toMillis)
poll()
}
} else true
}
poll(max min interval)
poll()
}
/**