Merge pull request #30493 from raboof/fix-LazySourceSpec-construction-instability

LazySourceSpec: improve 'no demand' test stability
This commit is contained in:
Patrik Nordwall 2021-08-12 09:55:48 +02:00 committed by GitHub
commit 16384e2b7d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -37,16 +37,14 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
}
"never construct the source when there was no demand" in assertAllStagesStopped {
val probe = TestSubscriber.probe[Int]()
val constructed = new AtomicBoolean(false)
Source
.lazySingle { () =>
constructed.set(true)
1
}
.toMat(Sink.fromSubscriber(probe))(Keep.left)
.toMat(Sink.cancelled)(Keep.left)
.run()
probe.cancel()
constructed.get() should ===(false)
}
@ -76,15 +74,13 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
}
"never construct the source when there was no demand" in assertAllStagesStopped {
val probe = TestSubscriber.probe[Int]()
val constructed = new AtomicBoolean(false)
Source
.lazySingle { () =>
constructed.set(true)
1
}
.runWith(Sink.fromSubscriber(probe))
probe.cancel()
.runWith(Sink.cancelled)
constructed.get() should ===(false)
}
@ -127,15 +123,13 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
}
"never construct the source when there was no demand" in assertAllStagesStopped {
val probe = TestSubscriber.probe[Int]()
val constructed = new AtomicBoolean(false)
val result = Source
.lazySource { () =>
constructed.set(true); Source(List(1, 2, 3))
}
.toMat(Sink.fromSubscriber(probe))(Keep.left)
.toMat(Sink.cancelled)(Keep.left)
.run()
probe.cancel()
constructed.get() should ===(false)
result.isCompleted should ===(false)
@ -275,7 +269,6 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
}
"never construct the source when there was no demand" in assertAllStagesStopped {
val probe = TestSubscriber.probe[Int]()
val constructed = new AtomicBoolean(false)
val result = Source
.lazyFutureSource { () =>
@ -284,9 +277,8 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
Source(List(1, 2, 3))
};
}
.toMat(Sink.fromSubscriber(probe))(Keep.left)
.toMat(Sink.cancelled)(Keep.left)
.run()
probe.cancel()
constructed.get() should ===(false)
result.isCompleted should ===(false)