From 6b71804de33ce913817a39cfe450fb7b7081a0c2 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Wed, 18 Aug 2021 10:01:26 +0200 Subject: [PATCH] Fix actor ref backpressure source spec instability (#30538) * Check exception type To get a better error message on an unexpected exception * scalafmt * Log the cause as well * Make sure 'sender' is populated Looks like the sender is required, and the test is a race condition between the two possible failures. * Nicer pattern match --- .../stream/scaladsl/ActorRefBackpressureSourceSpec.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala index 03fb9d6a06..204a8cce79 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSourceSpec.scala @@ -49,13 +49,14 @@ class ActorRefBackpressureSourceSpec extends StreamSpec { } "fail when consumer does not await ack" in assertAllStagesStopped { + val probe = TestProbe() val (ref, s) = Source .actorRefWithBackpressure[Int](AckMsg, PartialFunction.empty, PartialFunction.empty) .toMat(TestSink.probe[Int])(Keep.both) .run() val sub = s.expectSubscription() - for (n <- 1 to 20) ref ! n + for (n <- 1 to 20) probe.send(ref, n) sub.request(1) @scala.annotation.tailrec @@ -66,7 +67,10 @@ class ActorRefBackpressureSourceSpec extends StreamSpec { s.expectNextOrError() match { case Right(`n`) => verifyNext(n + 1) case Right(x) => fail(s"expected $n, got $x") - case Left(t) => t.getMessage shouldBe "Received new element before ack was signaled back" + case Left(e: IllegalStateException) => + e.getMessage shouldBe "Received new element before ack was signaled back" + case Left(e) => + fail(s"Expected IllegalStateException, got ${e.getClass}", e) } } verifyNext(1)