fix: Race condition in LazySourceSpec #28689

This commit is contained in:
Johan Andrén 2021-06-14 10:01:50 +02:00 committed by Arnout Engelen
parent 51924f5026
commit fda6ce3c50
No known key found for this signature in database
GPG key ID: 061107B0F74A6DAA

View file

@ -75,13 +75,16 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
"never construct the source when there was no demand" in assertAllStagesStopped {
val constructed = new AtomicBoolean(false)
Source
.lazySingle { () =>
val termination = Source
.lazyFuture { () =>
constructed.set(true)
1
Future.successful(1)
}
.runWith(Sink.cancelled)
.watchTermination()(Keep.right)
.toMat(Sink.cancelled)(Keep.left)
.run()
termination.futureValue // stream should terminate
constructed.get() should ===(false)
}
@ -124,14 +127,17 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
"never construct the source when there was no demand" in assertAllStagesStopped {
val constructed = new AtomicBoolean(false)
Source
val (lazySourceMatVal, termination) = Source
.lazySource { () =>
constructed.set(true); Source(List(1, 2, 3))
}
.watchTermination()(Keep.both)
.toMat(Sink.cancelled)(Keep.left)
.run()
termination.futureValue // stream should terminate
constructed.get() should ===(false)
lazySourceMatVal.failed.futureValue shouldBe a[NeverMaterializedException]
}
"fail the materialized value when downstream cancels without ever consuming any element" in assertAllStagesStopped {
@ -270,17 +276,20 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
"never construct the source when there was no demand" in assertAllStagesStopped {
val constructed = new AtomicBoolean(false)
Source
val (lazyFutureSourceMatval, termination) = Source
.lazyFutureSource { () =>
Future {
constructed.set(true)
Source(List(1, 2, 3))
};
}
.watchTermination()(Keep.both)
.toMat(Sink.cancelled)(Keep.left)
.run()
termination.futureValue // stream should terminate
constructed.get() should ===(false)
lazyFutureSourceMatval.failed.futureValue shouldBe a[NeverMaterializedException]
}
"fail the materialized value when downstream cancels without ever consuming any element" in assertAllStagesStopped {