Cleanups in typed TestProbe fishForMessage impl (#25863)

* Tighten MatchError handling in fishForMessage

Avoid a whole bunch of other code being accidentally handled.

Also it allows for the inner 'loop' method to be @tailrec (next commit).

* Guarantee match exhaustion in fishForMessage

... by introducing FishingOutcome.ContinueOutcome to the type hierarchy.

* Simplify some timeout calculation in fishForMessage

'timeout' is always finite, as it's a `FiniteDuration`, and, even if it
weren't, the calculation is safe for non-finite durations.

* Make fishForMessage's inner loop @tailrec!

Apparently @tailrec and try/catch (with a re-throw in the catch) don't
mix well.

* Avoid double handing of timeouts in fishForMessage

If 'newTimeout' is sub-zero, just loop again which will trigger the
other throw.

* Make receiveOne return Option[M] rather than nullable M

Avoid future users from forgetting to consider the null case, which
happened in fishForMessage_internal.

* Switch to pattern matching Options

* Cast properly

Thanks, type inferencer, for inferring Nothing there! o_O

* Make fish's PartialFunction convenience explicit

* Update the scaladsl fishForMessage Scaladocs

* Restore MatchError catching logic & docs

TIL partial functions aren't of class PartialFunction anymore, they're
lambda classes.

* Tweak assertFail so it can be used more

* Avoid name shadowing

... on request. (I <3 name shadowing)

* Fix formatting
This commit is contained in:
Dale Wijnand 2018-11-29 10:25:21 +00:00 committed by Patrik Nordwall
parent 52af98f1da
commit 68b5fbb2ff
3 changed files with 41 additions and 41 deletions

View file

@ -16,8 +16,9 @@ import akka.annotation.DoNotInherit
object FishingOutcome {
case object Continue extends FishingOutcome
case object ContinueAndIgnore extends FishingOutcome
sealed trait ContinueOutcome extends FishingOutcome
case object Continue extends ContinueOutcome
case object ContinueAndIgnore extends ContinueOutcome
case object Complete extends FishingOutcome
final case class Fail(error: String) extends FishingOutcome
}

View file

@ -75,7 +75,7 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
override def remaining: FiniteDuration = end match {
case f: FiniteDuration f - now
case _ throw new AssertionError("`remaining` may not be called outside of `within`")
case _ assertFail("`remaining` may not be called outside of `within`")
}
override def getRemaining: java.time.Duration = remaining.asJava
@ -134,9 +134,11 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
private def expectMessage_internal[T <: M](max: Duration, obj: T, hint: Option[String] = None): T = {
val o = receiveOne(max)
val hintOrEmptyString = hint.map(": " + _).getOrElse("")
assert(o != null, s"timeout ($max) during expectMessage while waiting for $obj" + hintOrEmptyString)
assert(obj == o, s"expected $obj, found $o" + hintOrEmptyString)
o.asInstanceOf[T]
o match {
case Some(m) if obj == m m.asInstanceOf[T]
case Some(m) assertFail(s"expected $obj, found $m$hintOrEmptyString")
case None assertFail(s"timeout ($max) during expectMessage while waiting for $obj$hintOrEmptyString")
}
}
/**
@ -145,8 +147,8 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
*
* This method does NOT automatically scale its Duration parameter!
*/
private def receiveOne(max: Duration): M = {
val message =
private def receiveOne(max: Duration): Option[M] = {
val message = Option(
if (max == Duration.Zero) {
queue.pollFirst
} else if (max.isFinite) {
@ -154,8 +156,9 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
} else {
queue.takeFirst
}
)
lastWasNoMessage = false
lastMessage = if (message == null) None else Some(message)
lastMessage = message
message
}
@ -170,15 +173,20 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
private def expectNoMessage_internal(max: FiniteDuration): Unit = {
val o = receiveOne(max)
assert(o == null, s"received unexpected message $o")
lastWasNoMessage = true
o match {
case None lastWasNoMessage = true
case Some(m) assertFail(s"received unexpected message $m")
}
}
override protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C = {
val o = receiveOne(max)
assert(o != null, s"timeout ($max) during expectMessageClass waiting for $c")
assert(BoxedType(c) isInstance o, s"expected $c, found ${o.getClass} ($o)")
o.asInstanceOf[C]
val bt = BoxedType(c)
o match {
case Some(m) if bt isInstance m m.asInstanceOf[C]
case Some(m) assertFail(s"expected $c, found ${m.getClass} ($m)")
case None assertFail(s"timeout ($max) during expectMessageClass waiting for $c")
}
}
override protected def receiveN_internal(n: Int, max: FiniteDuration): immutable.Seq[M] = {
@ -192,40 +200,29 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
}
override protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M FishingOutcome): List[M] = {
// not tailrec but that should be ok
def loop(timeout: FiniteDuration, seen: List[M]): List[M] = {
@tailrec def loop(timeout: FiniteDuration, seen: List[M]): List[M] = {
val start = System.nanoTime()
val maybeMsg = Option(receiveOne(timeout))
val maybeMsg = receiveOne(timeout)
maybeMsg match {
case Some(message)
try {
fisher(message) match {
case FishingOutcome.Complete (message :: seen).reverse
case FishingOutcome.Fail(error) throw new AssertionError(s"$error, hint: $hint")
case continue
val newTimeout =
if (timeout.isFinite()) timeout - (System.nanoTime() - start).nanos
else timeout
if (newTimeout.toMillis <= 0) {
throw new AssertionError(s"timeout ($max) during fishForMessage, seen messages ${seen.reverse}, hint: $hint")
} else {
continue match {
case FishingOutcome.Continue loop(newTimeout, message :: seen)
case FishingOutcome.ContinueAndIgnore loop(newTimeout, seen)
case _ ??? // cannot happen
}
}
}
} catch {
val outcome = try fisher(message) catch {
case ex: MatchError throw new AssertionError(
s"Unexpected message $message while fishing for messages, " +
s"seen messages ${seen.reverse}, hint: $hint", ex)
}
outcome match {
case FishingOutcome.Complete (message :: seen).reverse
case FishingOutcome.Fail(error) assertFail(s"$error, hint: $hint")
case continue: FishingOutcome.ContinueOutcome
val newTimeout = timeout - (System.nanoTime() - start).nanos
continue match {
case FishingOutcome.Continue loop(newTimeout, message :: seen)
case FishingOutcome.ContinueAndIgnore loop(newTimeout, seen)
}
}
case None
throw new AssertionError(s"timeout ($max) during fishForMessage, seen messages ${seen.reverse}, hint: $hint")
assertFail(s"timeout ($max) during fishForMessage, seen messages ${seen.reverse}, hint: $hint")
}
}
@ -282,4 +279,6 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
*/
private def now: FiniteDuration = System.nanoTime.nanos
private def assertFail(msg: String): Nothing = throw new AssertionError(msg)
}

View file

@ -175,8 +175,8 @@ object TestProbe {
* * [[FishingOutcomes.complete]] - successfully complete and return the message
* * [[FishingOutcomes.fail]] - fail the test with a custom message
*
* Additionally failures includes the list of messages consumed. If a message of type `M` but not of type `T` is
* received this will also fail the test, additionally if the `fisher` function throws a match error the error
* Additionally failures includes the list of messages consumed.
* If the `fisher` function throws a match error the error
* is decorated with some fishing details and the test is failed (making it convenient to use this method with a
* partial function).
*