Merge pull request #30321 from johanandren/wip-28689-lazysourcespec-fail

fix: Race condition in LazySourceSpec
This commit is contained in:
Patrik Nordwall 2021-08-18 12:36:09 +02:00 committed by GitHub
commit 7058c9b522
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -75,13 +75,16 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
"never construct the source when there was no demand" in assertAllStagesStopped { "never construct the source when there was no demand" in assertAllStagesStopped {
val constructed = new AtomicBoolean(false) val constructed = new AtomicBoolean(false)
Source val termination = Source
.lazySingle { () => .lazyFuture { () =>
constructed.set(true) constructed.set(true)
1 Future.successful(1)
} }
.runWith(Sink.cancelled) .watchTermination()(Keep.right)
.toMat(Sink.cancelled)(Keep.left)
.run()
termination.futureValue // stream should terminate
constructed.get() should ===(false) constructed.get() should ===(false)
} }
@ -124,14 +127,17 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
"never construct the source when there was no demand" in assertAllStagesStopped { "never construct the source when there was no demand" in assertAllStagesStopped {
val constructed = new AtomicBoolean(false) val constructed = new AtomicBoolean(false)
Source val (lazySourceMatVal, termination) = Source
.lazySource { () => .lazySource { () =>
constructed.set(true); Source(List(1, 2, 3)) constructed.set(true); Source(List(1, 2, 3))
} }
.watchTermination()(Keep.both)
.toMat(Sink.cancelled)(Keep.left) .toMat(Sink.cancelled)(Keep.left)
.run() .run()
termination.futureValue // stream should terminate
constructed.get() should ===(false) constructed.get() should ===(false)
lazySourceMatVal.failed.futureValue shouldBe a[NeverMaterializedException]
} }
"fail the materialized value when downstream cancels without ever consuming any element" in assertAllStagesStopped { "fail the materialized value when downstream cancels without ever consuming any element" in assertAllStagesStopped {
@ -270,17 +276,20 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
"never construct the source when there was no demand" in assertAllStagesStopped { "never construct the source when there was no demand" in assertAllStagesStopped {
val constructed = new AtomicBoolean(false) val constructed = new AtomicBoolean(false)
Source val (lazyFutureSourceMatval, termination) = Source
.lazyFutureSource { () => .lazyFutureSource { () =>
Future { Future {
constructed.set(true) constructed.set(true)
Source(List(1, 2, 3)) Source(List(1, 2, 3))
}; };
} }
.watchTermination()(Keep.both)
.toMat(Sink.cancelled)(Keep.left) .toMat(Sink.cancelled)(Keep.left)
.run() .run()
termination.futureValue // stream should terminate
constructed.get() should ===(false) constructed.get() should ===(false)
lazyFutureSourceMatval.failed.futureValue shouldBe a[NeverMaterializedException]
} }
"fail the materialized value when downstream cancels without ever consuming any element" in assertAllStagesStopped { "fail the materialized value when downstream cancels without ever consuming any element" in assertAllStagesStopped {