Race condition in prefixAndTail around double materialization fixed #29965 (#29976)

This commit is contained in:
Johan Andrén 2021-01-28 10:34:56 +01:00 committed by GitHub
parent 7d25067568
commit 6c79c2b60b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -92,21 +92,25 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
"throw if tail is attempted to be materialized twice" in assertAllStagesStopped {
val futureSink = newHeadSink
val fut = Source(1 to 2).prefixAndTail(1).runWith(futureSink)
val (takes, tail) = Await.result(fut, 3.seconds)
takes should be(Seq(1))
val fut = Source(1 to 3).prefixAndTail(1).runWith(futureSink)
val (prefix, tail) = Await.result(fut, 3.seconds)
prefix should be(Seq(1))
val subscriber1 = TestSubscriber.probe[Int]()
tail.to(Sink.fromSubscriber(subscriber1)).run()
// make sure it was materialized once before ...
subscriber1.ensureSubscription()
subscriber1.request(1)
subscriber1.expectNext(2)
// ... verifying what happens on a second materialization
val subscriber2 = TestSubscriber.probe[Int]()
tail.to(Sink.fromSubscriber(subscriber2)).run()
val ex = subscriber2.expectSubscriptionAndError()
ex.getMessage should ===("Substream Source(TailSource) cannot be materialized more than once")
ex.getStackTrace.exists(_.getClassName contains "FlowPrefixAndTailSpec") shouldBe true
subscriber1.requestNext(2).expectComplete()
subscriber1.requestNext(3).expectComplete()
}
"signal error if substream has been not subscribed in time" in assertAllStagesStopped {