From 6c79c2b60b47cbab53be88249589aa51742a3619 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 28 Jan 2021 10:34:56 +0100 Subject: [PATCH] Race condition in prefixAndTail around double materialization fixed #29965 (#29976) --- .../stream/scaladsl/FlowPrefixAndTailSpec.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala index 066b1eba75..a42ac62eb4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -92,21 +92,25 @@ class FlowPrefixAndTailSpec extends StreamSpec(""" "throw if tail is attempted to be materialized twice" in assertAllStagesStopped { val futureSink = newHeadSink - val fut = Source(1 to 2).prefixAndTail(1).runWith(futureSink) - val (takes, tail) = Await.result(fut, 3.seconds) - takes should be(Seq(1)) + val fut = Source(1 to 3).prefixAndTail(1).runWith(futureSink) + val (prefix, tail) = Await.result(fut, 3.seconds) + prefix should be(Seq(1)) val subscriber1 = TestSubscriber.probe[Int]() tail.to(Sink.fromSubscriber(subscriber1)).run() + // make sure it was materialized once before ... + subscriber1.ensureSubscription() + subscriber1.request(1) + subscriber1.expectNext(2) + // ... verifying what happens on a second materialization val subscriber2 = TestSubscriber.probe[Int]() tail.to(Sink.fromSubscriber(subscriber2)).run() val ex = subscriber2.expectSubscriptionAndError() ex.getMessage should ===("Substream Source(TailSource) cannot be materialized more than once") ex.getStackTrace.exists(_.getClassName contains "FlowPrefixAndTailSpec") shouldBe true - subscriber1.requestNext(2).expectComplete() - + subscriber1.requestNext(3).expectComplete() } "signal error if substream has been not subscribed in time" in assertAllStagesStopped {