Too many Tocks in typed TimerSpec #24360

This commit is contained in:
Johan Andrén 2018-02-13 18:08:14 +01:00 committed by GitHub
parent 1cd370be60
commit 3bd3f00c41
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 206 additions and 10 deletions

View file

@ -11,6 +11,7 @@ import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.TimerScheduler
import akka.testkit.TimingTest
import akka.testkit.typed.TestKitSettings
import akka.testkit.typed.TestKit
import akka.testkit.typed.scaladsl._
@ -32,6 +33,7 @@ class TimerSpec extends TestKit("TimerSpec")
case class Tock(n: Int) extends Event
case class GotPostStop(timerActive: Boolean) extends Event
case class GotPreRestart(timerActive: Boolean) extends Event
case object Cancelled extends Event
class Exc extends RuntimeException("simulated exc") with NoStackTrace
@ -60,6 +62,7 @@ class TimerSpec extends TestKit("TimerSpec")
Behaviors.stopped
case Cancel
timer.cancel("T")
monitor ! Cancelled
Behaviors.same
case Throw(e)
throw e
@ -78,7 +81,7 @@ class TimerSpec extends TestKit("TimerSpec")
}
"A timer" must {
"schedule non-repeated ticks" in {
"schedule non-repeated ticks" taggedAs TimingTest in {
val probe = TestProbe[Event]("evt")
val behv = Behaviors.withTimers[Command] { timer
timer.startSingleTimer("T", Tick(1), 10.millis)
@ -93,7 +96,7 @@ class TimerSpec extends TestKit("TimerSpec")
probe.expectMessage(GotPostStop(false))
}
"schedule repeated ticks" in {
"schedule repeated ticks" taggedAs TimingTest in {
val probe = TestProbe[Event]("evt")
val behv = Behaviors.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval)
@ -111,7 +114,7 @@ class TimerSpec extends TestKit("TimerSpec")
probe.expectMessage(GotPostStop(false))
}
"replace timer" in {
"replace timer" taggedAs TimingTest in {
val probe = TestProbe[Event]("evt")
val behv = Behaviors.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval)
@ -131,7 +134,7 @@ class TimerSpec extends TestKit("TimerSpec")
probe.expectMessage(GotPostStop(false))
}
"cancel timer" in {
"cancel timer" taggedAs TimingTest in {
val probe = TestProbe[Event]("evt")
val behv = Behaviors.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval)
@ -141,13 +144,20 @@ class TimerSpec extends TestKit("TimerSpec")
val ref = spawn(behv)
probe.expectMessage(Tock(1))
ref ! Cancel
probe.fishForMessage(3.seconds) {
// we don't know that we will see exactly one tock
case _: Tock FishingOutcomes.Continue
// but we know that after we saw Cancelled we won't see any more
case Cancelled FishingOutcomes.Complete
case msg FishingOutcomes.Fail(s"unexpected msg: $msg")
}
probe.expectNoMessage(interval + 100.millis.dilated)
ref ! End
probe.expectMessage(GotPostStop(false))
}
"discard timers from old incarnation after restart, alt 1" in {
"discard timers from old incarnation after restart, alt 1" taggedAs TimingTest in {
val probe = TestProbe[Event]("evt")
val startCounter = new AtomicInteger(0)
val behv = Behaviors.supervise(Behaviors.withTimers[Command] { timer
@ -171,7 +181,7 @@ class TimerSpec extends TestKit("TimerSpec")
probe.expectMessage(GotPostStop(false))
}
"discard timers from old incarnation after restart, alt 2" in {
"discard timers from old incarnation after restart, alt 2" taggedAs TimingTest in {
val probe = TestProbe[Event]("evt")
val behv = Behaviors.supervise(Behaviors.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval)
@ -197,7 +207,7 @@ class TimerSpec extends TestKit("TimerSpec")
probe.expectMessage(GotPostStop(false))
}
"cancel timers when stopped from exception" in {
"cancel timers when stopped from exception" taggedAs TimingTest in {
val probe = TestProbe[Event]()
val behv = Behaviors.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval)
@ -208,7 +218,7 @@ class TimerSpec extends TestKit("TimerSpec")
probe.expectMessage(GotPostStop(false))
}
"cancel timers when stopped voluntarily" in {
"cancel timers when stopped voluntarily" taggedAs TimingTest in {
val probe = TestProbe[Event]()
val behv = Behaviors.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval)

View file

@ -3,7 +3,7 @@ package akka.testkit.typed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
import akka.annotation.ApiMayChange
import akka.annotation.{ ApiMayChange, DoNotInherit }
import akka.testkit.typed.TestKit._
import akka.util.Timeout
import com.typesafe.config.Config
@ -17,6 +17,14 @@ import scala.util.control.NoStackTrace
*/
case class TE(message: String) extends RuntimeException(message) with NoStackTrace
/**
* Not for user extension.
*
* Instances are available from `FishingOutcomes` in the respective dsls: [[akka.testkit.typed.scaladsl.FishingOutcomes]]
* and [[akka.testkit.typed.javadsl.FishingOutcomes]]
*/
@DoNotInherit abstract class FishingOutcome private[akka] ()
object TestKit {
private[akka] sealed trait TestKitCommand

View file

@ -4,6 +4,28 @@
package akka.testkit.typed.javadsl
import akka.actor.typed.ActorSystem
import akka.testkit.typed.FishingOutcome
import scala.concurrent.duration.FiniteDuration
import scala.collection.JavaConverters._
object FishingOutcomes {
/**
* Consume this message and continue with the next
*/
def continue(): FishingOutcome = akka.testkit.typed.scaladsl.FishingOutcomes.Continue
/**
* Complete fishing and return this message
*/
def complete(): FishingOutcome = akka.testkit.typed.scaladsl.FishingOutcomes.Complete
/**
* Fail fishing with a custom error message
*/
def fail(error: String): FishingOutcome = akka.testkit.typed.scaladsl.FishingOutcomes.Fail(error)
}
/**
* Java API:
@ -18,4 +40,31 @@ class TestProbe[M](name: String, system: ActorSystem[_]) extends akka.testkit.ty
def expectMessageType[T <: M](t: Class[T]): T =
expectMessageClass_internal(remainingOrDefault, t)
/**
* Java API: Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming
* message, and returns one of the following effects to decide on what happens next:
*
* * [[FishingOutcomes.continue()]] - continue with the next message given that the timeout has not been reached
* * [[FishingOutcomes.complete()]] - successfully complete and return the message
* * [[FishingOutcomes.fail(errorMsg)]] - fail the test with a custom message
*
* Additionally failures includes the list of messages consumed. If a message of type `M` but not of type `T` is
* received this will also fail the test, additionally if the `fisher` function throws a match error the error
* is decorated with some fishing details and the test is failed (making it convenient to use this method with a
* partial function).
*
* @param max Max total time without the fisher function returning `CompleteFishing` before failing
* The timeout is dilated.
* @return The messages accepted in the order they arrived
*/
// FIXME same name would cause ambiguity but I'm out of ideas how to fix, separate Scala/Java TestProbe APIs?
def fishForMessageJava(max: FiniteDuration, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] =
fishForMessage(max)(fisher.apply).asJava
/**
* Same as the other `fishForMessageJava` but includes the provided hint in all error messages
*/
def fishForMessageJava(max: FiniteDuration, hint: String, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] =
fishForMessage(max, hint)(fisher.apply).asJava
}

View file

@ -11,11 +11,12 @@ import akka.actor.typed.scaladsl.Behaviors
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicInteger
import akka.annotation.DoNotInherit
import akka.util.Timeout
import akka.util.PrettyDuration.PrettyPrintableDuration
import scala.concurrent.Await
import akka.testkit.typed.TestKitSettings
import akka.testkit.typed.{ FishingOutcome, TestKitSettings }
import akka.util.BoxedType
import scala.annotation.tailrec
@ -45,6 +46,29 @@ object TestProbe {
}
}
object FishingOutcomes {
/**
* Consume this message, collect it into the result, and continue with the next message
*/
case object Continue extends FishingOutcome
/**
* Consume this message, but do not collect it into the result, and continue with the next message
*/
case object ContinueAndIgnore extends FishingOutcome
/**
* Complete fishing and return this message
*/
case object Complete extends FishingOutcome
/**
* Fail fishing with a custom error message
*/
case class Fail(error: String) extends FishingOutcome
}
class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
import TestProbe._
@ -247,6 +271,56 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
o.asInstanceOf[C]
}
/**
* Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming
* message, and returns one of the following effects to decide on what happens next:
*
* * [[FishingOutcomes.Continue]] - continue with the next message given that the timeout has not been reached
* * [[FishingOutcomes.Complete]] - successfully complete and return the message
* * [[FishingOutcomes.Fail]] - fail the test with a custom message
*
* Additionally failures includes the list of messages consumed. If a message of type `M` but not of type `T` is
* received this will also fail the test, additionally if the `fisher` function throws a match error the error
* is decorated with some fishing details and the test is failed (making it convenient to use this method with a
* partial function).
*
* @param max Max total time without the fisher function returning `CompleteFishing` before failing
* The timeout is dilated.
* @return The messages accepted in the order they arrived
*/
def fishForMessage(max: FiniteDuration, hint: String = "")(fisher: M FishingOutcome): List[M] = {
// not tailrec but that should be ok
def loop(timeout: FiniteDuration, seen: List[M]): List[M] = {
val start = System.nanoTime()
val msg = receiveOne(timeout)
try {
fisher(msg) match {
case FishingOutcomes.Complete (msg :: seen).reverse
case FishingOutcomes.Fail(error) throw new AssertionError(s"$error, hint: $hint")
case continue
val newTimeout =
if (timeout.isFinite()) timeout - (System.nanoTime() - start).nanos
else timeout
if (newTimeout.toMillis <= 0) {
throw new AssertionError(s"timeout ($max) during fishForMessage, seen messages ${seen.reverse}, hint: $hint")
} else {
continue match {
case FishingOutcomes.Continue loop(newTimeout, msg :: seen)
case FishingOutcomes.ContinueAndIgnore loop(newTimeout, seen)
}
}
}
} catch {
case ex: MatchError throw new AssertionError(
s"Unexpected message $msg while fishing for messages, " +
s"seen messages ${seen.reverse}, hint: $hint", ex)
}
}
loop(max.dilated, Nil)
}
/**
* Expect the given actor to be stopped or stop withing the given timeout or
* throw an [[AssertionError]].

View file

@ -36,6 +36,61 @@ class TestProbeSpec extends TestKit with WordSpecLike with Matchers with BeforeA
probe.expectTerminated(ref, 500.millis)
}
"allow fishing for message" in {
val probe = TestProbe[String]()
probe.ref ! "one"
probe.ref ! "two"
val result = probe.fishForMessage(300.millis) {
case "one" FishingOutcomes.Continue
case "two" FishingOutcomes.Complete
}
result should ===(List("one", "two"))
}
"allow failing when fishing for message" in {
val probe = TestProbe[String]()
probe.ref ! "one"
probe.ref ! "two"
intercept[AssertionError] {
probe.fishForMessage(300.millis) {
case "one" FishingOutcomes.Continue
case "two" FishingOutcomes.Fail("not the fish I'm looking for")
}
}
}
"fail for unknown message when fishing for messages" in {
val probe = TestProbe[String]()
probe.ref ! "one"
probe.ref ! "two"
intercept[AssertionError] {
probe.fishForMessage(300.millis) {
case "one" FishingOutcomes.Continue
}
}
}
"time out when fishing for messages" in {
val probe = TestProbe[String]()
probe.ref ! "one"
intercept[AssertionError] {
probe.fishForMessage(300.millis) {
case "one" FishingOutcomes.Continue
}
}
}
}
override protected def afterAll(): Unit = {