diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala index 45bdd6faf3..10a1d79cc5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala @@ -44,22 +44,97 @@ class FlowInterleaveSpec extends BaseTwoStreamsSetup { probe.expectComplete() } - "complete upstream if any of its upstream completes with eagerClose = true" in assertAllStagesStopped { + "eagerClose = true, first stream closed" in assertAllStagesStopped { val probe = TestSubscriber.manualProbe[Int]() - Source(0 to 2).interleave(Source(3 to 5), 2, eagerClose = true).runWith(Sink.fromSubscriber(probe)) + val source1 = TestPublisher.probe[Int]() + val source2 = TestPublisher.probe[Int]() + + Source.fromPublisher(source1).interleave(Source.fromPublisher(source2), 2, eagerClose = true).runWith(Sink.fromSubscriber(probe)) probe.expectSubscription().request(10) - probe.expectNext(0, 1, 3, 4, 2) + + // just to make it extra clear that it eagerly pulls all inputs + source1.expectRequest() + source2.expectRequest() + + source1.sendNext(0) + source2.sendNext(10) + + source1.expectRequest() + source1.sendNext(1) + + source2.expectRequest() + source2.sendNext(11) + + source1.expectRequest() + source1.sendNext(2) + source1.sendComplete() + + probe.expectNext(0, 1, 10, 11, 2) probe.expectComplete() + source2.expectCancellation() + } + + "eagerClose = true, non-current-upstream closed" in assertAllStagesStopped { + val probe = TestSubscriber.manualProbe[Int]() + + val source1 = TestPublisher.probe[Int]() + val source2 = TestPublisher.probe[Int]() + + Source.fromPublisher(source1).interleave(Source.fromPublisher(source2), 2, eagerClose = true).runWith(Sink.fromSubscriber(probe)) + probe.expectSubscription().request(10) + + // just to make it extra clear that it eagerly pulls all inputs + source1.expectRequest() + source2.expectRequest() + + source1.sendNext(0) + source2.sendNext(10) + + source1.expectRequest() + source1.sendNext(1) + + source2.expectRequest() + // don't emit but cancel the other source + source1.sendComplete() + + probe.expectNext(0, 1, 10) + probe.expectComplete() + source2.expectCancellation() } "eagerClose = true, other stream closed" in assertAllStagesStopped { val probe = TestSubscriber.manualProbe[Int]() - Source(0 to 2).interleave(Source(3 to 4), 2, eagerClose = true).runWith(Sink.fromSubscriber(probe)) + val source1 = TestPublisher.probe[Int]() + val source2 = TestPublisher.probe[Int]() + + Source.fromPublisher(source1) + .interleave( + Source.fromPublisher(source2), + 2, + eagerClose = true + ).runWith(Sink.fromSubscriber(probe)) + probe.expectSubscription().request(10) - probe.expectNext(0, 1, 3, 4) + + // just to make it extra clear that it eagerly pulls all inputs + source1.expectRequest() + source2.expectRequest() + + source1.sendNext(0) + source2.sendNext(10) + + source1.expectRequest() + source1.sendNext(1) + + source2.expectRequest() + source2.sendNext(11) + source2.sendComplete() + + probe.expectNext(0, 1, 10, 11) probe.expectComplete() + source1.expectCancellation() } "work with segmentSize = 1" in assertAllStagesStopped {