Eager close tests for InterleaveSpec made deterministic #23724
This commit is contained in:
parent
96ffced4dc
commit
5dc376bdd7
1 changed files with 80 additions and 5 deletions
|
|
@ -44,22 +44,97 @@ class FlowInterleaveSpec extends BaseTwoStreamsSetup {
|
|||
probe.expectComplete()
|
||||
}
|
||||
|
||||
"complete upstream if any of its upstream completes with eagerClose = true" in assertAllStagesStopped {
|
||||
"eagerClose = true, first stream closed" in assertAllStagesStopped {
|
||||
val probe = TestSubscriber.manualProbe[Int]()
|
||||
|
||||
Source(0 to 2).interleave(Source(3 to 5), 2, eagerClose = true).runWith(Sink.fromSubscriber(probe))
|
||||
val source1 = TestPublisher.probe[Int]()
|
||||
val source2 = TestPublisher.probe[Int]()
|
||||
|
||||
Source.fromPublisher(source1).interleave(Source.fromPublisher(source2), 2, eagerClose = true).runWith(Sink.fromSubscriber(probe))
|
||||
probe.expectSubscription().request(10)
|
||||
probe.expectNext(0, 1, 3, 4, 2)
|
||||
|
||||
// just to make it extra clear that it eagerly pulls all inputs
|
||||
source1.expectRequest()
|
||||
source2.expectRequest()
|
||||
|
||||
source1.sendNext(0)
|
||||
source2.sendNext(10)
|
||||
|
||||
source1.expectRequest()
|
||||
source1.sendNext(1)
|
||||
|
||||
source2.expectRequest()
|
||||
source2.sendNext(11)
|
||||
|
||||
source1.expectRequest()
|
||||
source1.sendNext(2)
|
||||
source1.sendComplete()
|
||||
|
||||
probe.expectNext(0, 1, 10, 11, 2)
|
||||
probe.expectComplete()
|
||||
source2.expectCancellation()
|
||||
}
|
||||
|
||||
"eagerClose = true, non-current-upstream closed" in assertAllStagesStopped {
|
||||
val probe = TestSubscriber.manualProbe[Int]()
|
||||
|
||||
val source1 = TestPublisher.probe[Int]()
|
||||
val source2 = TestPublisher.probe[Int]()
|
||||
|
||||
Source.fromPublisher(source1).interleave(Source.fromPublisher(source2), 2, eagerClose = true).runWith(Sink.fromSubscriber(probe))
|
||||
probe.expectSubscription().request(10)
|
||||
|
||||
// just to make it extra clear that it eagerly pulls all inputs
|
||||
source1.expectRequest()
|
||||
source2.expectRequest()
|
||||
|
||||
source1.sendNext(0)
|
||||
source2.sendNext(10)
|
||||
|
||||
source1.expectRequest()
|
||||
source1.sendNext(1)
|
||||
|
||||
source2.expectRequest()
|
||||
// don't emit but cancel the other source
|
||||
source1.sendComplete()
|
||||
|
||||
probe.expectNext(0, 1, 10)
|
||||
probe.expectComplete()
|
||||
source2.expectCancellation()
|
||||
}
|
||||
|
||||
"eagerClose = true, other stream closed" in assertAllStagesStopped {
|
||||
val probe = TestSubscriber.manualProbe[Int]()
|
||||
|
||||
Source(0 to 2).interleave(Source(3 to 4), 2, eagerClose = true).runWith(Sink.fromSubscriber(probe))
|
||||
val source1 = TestPublisher.probe[Int]()
|
||||
val source2 = TestPublisher.probe[Int]()
|
||||
|
||||
Source.fromPublisher(source1)
|
||||
.interleave(
|
||||
Source.fromPublisher(source2),
|
||||
2,
|
||||
eagerClose = true
|
||||
).runWith(Sink.fromSubscriber(probe))
|
||||
|
||||
probe.expectSubscription().request(10)
|
||||
probe.expectNext(0, 1, 3, 4)
|
||||
|
||||
// just to make it extra clear that it eagerly pulls all inputs
|
||||
source1.expectRequest()
|
||||
source2.expectRequest()
|
||||
|
||||
source1.sendNext(0)
|
||||
source2.sendNext(10)
|
||||
|
||||
source1.expectRequest()
|
||||
source1.sendNext(1)
|
||||
|
||||
source2.expectRequest()
|
||||
source2.sendNext(11)
|
||||
source2.sendComplete()
|
||||
|
||||
probe.expectNext(0, 1, 10, 11)
|
||||
probe.expectComplete()
|
||||
source1.expectCancellation()
|
||||
}
|
||||
|
||||
"work with segmentSize = 1" in assertAllStagesStopped {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue