diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 5646ce1d7c..0e1a377ba5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -407,6 +407,59 @@ class FlowGroupBySpec extends StreamSpec { upstream.sendComplete() } + "work if pull is exercised from multiple substreams while downstream is backpressuring (#24353)" in assertAllStagesStopped { + val upstream = TestPublisher.probe[Int]() + val downstreamMaster = TestSubscriber.probe[Source[Int, NotUsed]]() + + Source + .fromPublisher(upstream) + .via(new GroupBy[Int, Int](10, elem ⇒ elem)) + .runWith(Sink.fromSubscriber(downstreamMaster)) + + val substream1 = TestSubscriber.probe[Int]() + downstreamMaster.request(1) + upstream.sendNext(1) + downstreamMaster.expectNext().runWith(Sink.fromSubscriber(substream1)) + + val substream2 = TestSubscriber.probe[Int]() + downstreamMaster.request(1) + upstream.sendNext(2) + downstreamMaster.expectNext().runWith(Sink.fromSubscriber(substream2)) + + substream1.request(1) + substream1.expectNext(1) + substream2.request(1) + substream2.expectNext(2) + + // Both substreams pull + substream1.request(1) + substream2.request(1) + + // Upstream sends new groups + upstream.sendNext(3) + upstream.sendNext(4) + + val substream3 = TestSubscriber.probe[Int]() + val substream4 = TestSubscriber.probe[Int]() + downstreamMaster.request(1) + downstreamMaster.expectNext().runWith(Sink.fromSubscriber(substream3)) + downstreamMaster.request(1) + downstreamMaster.expectNext().runWith(Sink.fromSubscriber(substream4)) + + substream3.request(1) + substream3.expectNext(3) + substream4.request(1) + substream4.expectNext(4) + + // Cleanup, not part of the actual test + substream1.cancel() + substream2.cancel() + substream3.cancel() + substream4.cancel() + downstreamMaster.cancel() + upstream.sendComplete() + } + "work with random demand" in assertAllStagesStopped { val mat = ActorMaterializer(ActorMaterializerSettings(system) .withInputBuffer(initialSize = 1, maxSize = 1)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 23e7a5d47e..103ed52cde 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -255,7 +255,8 @@ import scala.collection.JavaConverters._ failStage(ex) } - private def needToPull: Boolean = !(hasBeenPulled(in) || isClosed(in) || hasNextElement) + private def needToPull: Boolean = + !(hasBeenPulled(in) || isClosed(in) || hasNextElement || substreamWaitingToBePushed.nonEmpty) override def preStart(): Unit = timeout = ActorMaterializerHelper.downcast(interpreter.materializer).settings.subscriptionTimeoutSettings.timeout