From 7b09a3e5e9bce015b381e10e13097ee05c2e28cd Mon Sep 17 00:00:00 2001 From: Anil Gursel Date: Thu, 18 Jan 2018 16:51:19 -0600 Subject: [PATCH] groupBy pulls upstream when a substream materialization is waiting #24353 --- .../stream/scaladsl/FlowGroupBySpec.scala | 53 +++++++++++++++++++ .../stream/impl/fusing/StreamOfStreams.scala | 3 +- 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 5646ce1d7c..0e1a377ba5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -407,6 +407,59 @@ class FlowGroupBySpec extends StreamSpec { upstream.sendComplete() } + "work if pull is exercised from multiple substreams while downstream is backpressuring (#24353)" in assertAllStagesStopped { + val upstream = TestPublisher.probe[Int]() + val downstreamMaster = TestSubscriber.probe[Source[Int, NotUsed]]() + + Source + .fromPublisher(upstream) + .via(new GroupBy[Int, Int](10, elem ⇒ elem)) + .runWith(Sink.fromSubscriber(downstreamMaster)) + + val substream1 = TestSubscriber.probe[Int]() + downstreamMaster.request(1) + upstream.sendNext(1) + downstreamMaster.expectNext().runWith(Sink.fromSubscriber(substream1)) + + val substream2 = TestSubscriber.probe[Int]() + downstreamMaster.request(1) + upstream.sendNext(2) + downstreamMaster.expectNext().runWith(Sink.fromSubscriber(substream2)) + + substream1.request(1) + substream1.expectNext(1) + substream2.request(1) + substream2.expectNext(2) + + // Both substreams pull + substream1.request(1) + substream2.request(1) + + // Upstream sends new groups + upstream.sendNext(3) + upstream.sendNext(4) + + val substream3 = TestSubscriber.probe[Int]() + val substream4 = TestSubscriber.probe[Int]() + downstreamMaster.request(1) + downstreamMaster.expectNext().runWith(Sink.fromSubscriber(substream3)) + downstreamMaster.request(1) + downstreamMaster.expectNext().runWith(Sink.fromSubscriber(substream4)) + + substream3.request(1) + substream3.expectNext(3) + substream4.request(1) + substream4.expectNext(4) + + // Cleanup, not part of the actual test + substream1.cancel() + substream2.cancel() + substream3.cancel() + substream4.cancel() + downstreamMaster.cancel() + upstream.sendComplete() + } + "work with random demand" in assertAllStagesStopped { val mat = ActorMaterializer(ActorMaterializerSettings(system) .withInputBuffer(initialSize = 1, maxSize = 1)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 23e7a5d47e..103ed52cde 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -255,7 +255,8 @@ import scala.collection.JavaConverters._ failStage(ex) } - private def needToPull: Boolean = !(hasBeenPulled(in) || isClosed(in) || hasNextElement) + private def needToPull: Boolean = + !(hasBeenPulled(in) || isClosed(in) || hasNextElement || substreamWaitingToBePushed.nonEmpty) override def preStart(): Unit = timeout = ActorMaterializerHelper.downcast(interpreter.materializer).settings.subscriptionTimeoutSettings.timeout