Fix actor ref backpressure source spec instability (#30538)

* Check exception type

To get a better error message on an unexpected exception

* scalafmt

* Log the cause as well

* Make sure 'sender' is populated

Looks like the sender is required, and the test is a race
condition between the two possible failures.

* Nicer pattern match
This commit is contained in:
Arnout Engelen 2021-08-18 10:01:26 +02:00 committed by GitHub
parent 4c732e9865
commit 6b71804de3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -49,13 +49,14 @@ class ActorRefBackpressureSourceSpec extends StreamSpec {
} }
"fail when consumer does not await ack" in assertAllStagesStopped { "fail when consumer does not await ack" in assertAllStagesStopped {
val probe = TestProbe()
val (ref, s) = Source val (ref, s) = Source
.actorRefWithBackpressure[Int](AckMsg, PartialFunction.empty, PartialFunction.empty) .actorRefWithBackpressure[Int](AckMsg, PartialFunction.empty, PartialFunction.empty)
.toMat(TestSink.probe[Int])(Keep.both) .toMat(TestSink.probe[Int])(Keep.both)
.run() .run()
val sub = s.expectSubscription() val sub = s.expectSubscription()
for (n <- 1 to 20) ref ! n for (n <- 1 to 20) probe.send(ref, n)
sub.request(1) sub.request(1)
@scala.annotation.tailrec @scala.annotation.tailrec
@ -66,7 +67,10 @@ class ActorRefBackpressureSourceSpec extends StreamSpec {
s.expectNextOrError() match { s.expectNextOrError() match {
case Right(`n`) => verifyNext(n + 1) case Right(`n`) => verifyNext(n + 1)
case Right(x) => fail(s"expected $n, got $x") case Right(x) => fail(s"expected $n, got $x")
case Left(t) => t.getMessage shouldBe "Received new element before ack was signaled back" case Left(e: IllegalStateException) =>
e.getMessage shouldBe "Received new element before ack was signaled back"
case Left(e) =>
fail(s"Expected IllegalStateException, got ${e.getClass}", e)
} }
} }
verifyNext(1) verifyNext(1)