groupBy pulls upstream when a substream materialization is waiting #24353

This commit is contained in:
Anil Gursel 2018-01-18 16:51:19 -06:00
parent e81f350b2f
commit 7b09a3e5e9
2 changed files with 55 additions and 1 deletions

View file

@ -407,6 +407,59 @@ class FlowGroupBySpec extends StreamSpec {
upstream.sendComplete()
}
"work if pull is exercised from multiple substreams while downstream is backpressuring (#24353)" in assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]()
val downstreamMaster = TestSubscriber.probe[Source[Int, NotUsed]]()
Source
.fromPublisher(upstream)
.via(new GroupBy[Int, Int](10, elem elem))
.runWith(Sink.fromSubscriber(downstreamMaster))
val substream1 = TestSubscriber.probe[Int]()
downstreamMaster.request(1)
upstream.sendNext(1)
downstreamMaster.expectNext().runWith(Sink.fromSubscriber(substream1))
val substream2 = TestSubscriber.probe[Int]()
downstreamMaster.request(1)
upstream.sendNext(2)
downstreamMaster.expectNext().runWith(Sink.fromSubscriber(substream2))
substream1.request(1)
substream1.expectNext(1)
substream2.request(1)
substream2.expectNext(2)
// Both substreams pull
substream1.request(1)
substream2.request(1)
// Upstream sends new groups
upstream.sendNext(3)
upstream.sendNext(4)
val substream3 = TestSubscriber.probe[Int]()
val substream4 = TestSubscriber.probe[Int]()
downstreamMaster.request(1)
downstreamMaster.expectNext().runWith(Sink.fromSubscriber(substream3))
downstreamMaster.request(1)
downstreamMaster.expectNext().runWith(Sink.fromSubscriber(substream4))
substream3.request(1)
substream3.expectNext(3)
substream4.request(1)
substream4.expectNext(4)
// Cleanup, not part of the actual test
substream1.cancel()
substream2.cancel()
substream3.cancel()
substream4.cancel()
downstreamMaster.cancel()
upstream.sendComplete()
}
"work with random demand" in assertAllStagesStopped {
val mat = ActorMaterializer(ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 1, maxSize = 1))

View file

@ -255,7 +255,8 @@ import scala.collection.JavaConverters._
failStage(ex)
}
private def needToPull: Boolean = !(hasBeenPulled(in) || isClosed(in) || hasNextElement)
private def needToPull: Boolean =
!(hasBeenPulled(in) || isClosed(in) || hasNextElement || substreamWaitingToBePushed.nonEmpty)
override def preStart(): Unit =
timeout = ActorMaterializerHelper.downcast(interpreter.materializer).settings.subscriptionTimeoutSettings.timeout