diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index 094bd4f196..df255f619f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -4,9 +4,10 @@ package akka.actor -import akka.testkit.{ AkkaSpec, ImplicitSender } +import akka.testkit._ import akka.util.Duration import akka.util.duration._ +import akka.event.Logging @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FSMTimingSpec extends AkkaSpec with ImplicitSender { @@ -15,7 +16,7 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender { val fsm = actorOf(new StateMachine(testActor)) fsm ! SubscribeTransitionCallBack(testActor) - expectMsg(200 millis, CurrentState(fsm, Initial)) + expectMsg(1 second, CurrentState(fsm, Initial)) ignoreMsg { case Transition(_, Initial, _) ⇒ true @@ -24,8 +25,20 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender { "A Finite State Machine" must { "receive StateTimeout" in { - within(50 millis, 250 millis) { + within(1 second) { + within(500 millis, 1 second) { + fsm ! TestStateTimeout + expectMsg(Transition(fsm, TestStateTimeout, Initial)) + } + expectNoMsg + } + } + + "cancel a StateTimeout" in { + within(1 second) { fsm ! TestStateTimeout + fsm ! Cancel + expectMsg(Cancel) expectMsg(Transition(fsm, TestStateTimeout, Initial)) expectNoMsg } @@ -36,7 +49,7 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender { fsm ! TestStateTimeoutOverride expectNoMsg } - within(50 millis) { + within(500 millis) { fsm ! Cancel expectMsg(Cancel) expectMsg(Transition(fsm, TestStateTimeout, Initial)) @@ -44,70 +57,70 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender { } "receive single-shot timer" in { - within(50 millis, 250 millis) { - fsm ! TestSingleTimer - expectMsg(Tick) - expectMsg(Transition(fsm, TestSingleTimer, Initial)) + within(1.5 seconds) { + within(500 millis, 1 second) { + fsm ! TestSingleTimer + expectMsg(Tick) + expectMsg(Transition(fsm, TestSingleTimer, Initial)) + } expectNoMsg } } "correctly cancel a named timer" in { fsm ! TestCancelTimer - within(100 millis, 200 millis) { + within(500 millis) { fsm ! Tick expectMsg(Tick) + } + within(300 millis, 1 second) { expectMsg(Tock) } fsm ! Cancel - expectMsg(Transition(fsm, TestCancelTimer, Initial)) + expectMsg(1 second, Transition(fsm, TestCancelTimer, Initial)) } "not get confused between named and state timers" in { fsm ! TestCancelStateTimerInNamedTimerMessage fsm ! Tick - expectMsg(100 millis, Tick) - Thread.sleep(200) + expectMsg(500 millis, Tick) + Thread.sleep(200) // this is ugly: need to wait for StateTimeout to be queued resume(fsm) - expectMsg(100 millis, Transition(fsm, TestCancelStateTimerInNamedTimerMessage, TestCancelStateTimerInNamedTimerMessage2)) + expectMsg(500 millis, Transition(fsm, TestCancelStateTimerInNamedTimerMessage, TestCancelStateTimerInNamedTimerMessage2)) fsm ! Cancel - within(100 millis) { - expectMsg(Cancel) + within(500 millis) { + expectMsg(Cancel) // if this is not received, that means StateTimeout was not properly discarded expectMsg(Transition(fsm, TestCancelStateTimerInNamedTimerMessage2, Initial)) } } "receive and cancel a repeated timer" in { fsm ! TestRepeatedTimer - val seq = receiveWhile(600 millis) { + val seq = receiveWhile(1 second) { case Tick ⇒ Tick } - seq must have length (5) - within(250 millis) { + seq must have length 5 + within(500 millis) { expectMsg(Transition(fsm, TestRepeatedTimer, Initial)) - expectNoMsg } } "notify unhandled messages" in { - fsm ! TestUnhandled - within(200 millis) { - fsm ! Tick - expectNoMsg - } - within(200 millis) { - fsm ! SetHandler - fsm ! Tick - expectMsg(Unhandled(Tick)) - expectNoMsg - } - within(200 millis) { - fsm ! Unhandled("test") - expectNoMsg - } - within(200 millis) { - fsm ! Cancel - expectMsg(Transition(fsm, TestUnhandled, Initial)) + filterEvents(EventFilter.custom { + case Logging.Warning(`fsm`, "unhandled event Tick in state TestUnhandled") ⇒ true + case Logging.Warning(`fsm`, "unhandled event Unhandled(test) in state TestUnhandled") ⇒ true + case _ ⇒ false + }) { + fsm ! TestUnhandled + within(1 second) { + fsm ! Tick + fsm ! SetHandler + fsm ! Tick + expectMsg(Unhandled(Tick)) + fsm ! Unhandled("test") + fsm ! Cancel + expectMsg(Transition(fsm, TestUnhandled, Initial)) + } } } @@ -151,7 +164,7 @@ object FSMTimingSpec { startWith(Initial, 0) when(Initial) { case Ev(TestSingleTimer) ⇒ - setTimer("tester", Tick, 100 millis, false) + setTimer("tester", Tick, 500 millis, false) goto(TestSingleTimer) case Ev(TestRepeatedTimer) ⇒ setTimer("tester", Tick, 100 millis, true) @@ -160,7 +173,7 @@ object FSMTimingSpec { goto(TestStateTimeout) forMax (Duration.Inf) case Ev(x: FSMTimingSpec.State) ⇒ goto(x) } - when(TestStateTimeout, stateTimeout = 100 millis) { + when(TestStateTimeout, stateTimeout = 500 millis) { case Ev(StateTimeout) ⇒ goto(Initial) case Ev(Cancel) ⇒ goto(Initial) replying (Cancel) } @@ -173,9 +186,9 @@ object FSMTimingSpec { case Ev(Tick) ⇒ tester ! Tick setTimer("hallo", Tock, 1 milli, false) - Thread.sleep(10); + TestKit.awaitCond(context.hasMessages, 1 second) cancelTimer("hallo") - setTimer("hallo", Tock, 100 millis, false) + setTimer("hallo", Tock, 500 millis, false) stay case Ev(Tock) ⇒ tester ! Tock @@ -195,11 +208,12 @@ object FSMTimingSpec { } } when(TestCancelStateTimerInNamedTimerMessage) { - // FSM is suspended after processing this message and resumed 200ms later + // FSM is suspended after processing this message and resumed 500ms later case Ev(Tick) ⇒ suspend(self) - setTimer("named", Tock, 10 millis, false) - stay forMax (100 millis) replying Tick + setTimer("named", Tock, 1 millis, false) + TestKit.awaitCond(context.hasMessages, 1 second) + stay forMax (1 millis) replying Tick case Ev(Tock) ⇒ goto(TestCancelStateTimerInNamedTimerMessage2) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 180cbe5df7..b04791a6b7 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -23,6 +23,8 @@ private[akka] trait ActorContext extends ActorRefFactory with TypedActorFactory def self: ActorRef with ScalaActorRef + def hasMessages: Boolean + def receiveTimeout: Option[Long] def receiveTimeout_=(timeout: Option[Long]): Unit @@ -95,6 +97,8 @@ private[akka] class ActorCell( @volatile //This must be volatile since it isn't protected by the mailbox status var mailbox: Mailbox = _ + def hasMessages: Boolean = mailbox.hasMessages + def start(): Unit = { mailbox = dispatcher.createMailbox(this) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 1f4f0d680e..cb788a66d1 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -555,6 +555,36 @@ class TestKit(_app: AkkaApplication) { object TestKit { private[testkit] val testActorId = new AtomicInteger(0) + + /** + * Block until the given condition evaluates to `true` or the timeout + * expires, whichever comes first. + * + * If no timeout is given, take it from the innermost enclosing `within` + * block. + * + * Note that the timeout is scaled using Duration.timeFactor. + */ + def awaitCond(p: ⇒ Boolean, max: Duration, interval: Duration = 100.millis) { + val stop = now + max + + @tailrec + def poll(t: Duration) { + if (!p) { + assert(now < stop, "timeout " + max + " expired") + Thread.sleep(t.toMillis) + poll((stop - now) min interval) + } + } + + poll(max min interval) + } + + /** + * Obtain current timestamp as Duration for relative measurements (using System.nanoTime). + */ + def now: Duration = System.nanoTime().nanos + } /**