diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala index 03fb9d6a06..204a8cce79 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala @@ -49,13 +49,14 @@ class ActorRefBackpressureSourceSpec extends StreamSpec { } "fail when consumer does not await ack" in assertAllStagesStopped { + val probe = TestProbe() val (ref, s) = Source .actorRefWithBackpressure[Int](AckMsg, PartialFunction.empty, PartialFunction.empty) .toMat(TestSink.probe[Int])(Keep.both) .run() val sub = s.expectSubscription() - for (n <- 1 to 20) ref ! n + for (n <- 1 to 20) probe.send(ref, n) sub.request(1) @scala.annotation.tailrec @@ -66,7 +67,10 @@ class ActorRefBackpressureSourceSpec extends StreamSpec { s.expectNextOrError() match { case Right(`n`) => verifyNext(n + 1) case Right(x) => fail(s"expected $n, got $x") - case Left(t) => t.getMessage shouldBe "Received new element before ack was signaled back" + case Left(e: IllegalStateException) => + e.getMessage shouldBe "Received new element before ack was signaled back" + case Left(e) => + fail(s"Expected IllegalStateException, got ${e.getClass}", e) } } verifyNext(1)