Fix bug in Balance stage when some downstreams pull and close immediately and an another try to pull but wait a response indefinitely #25387

This commit is contained in:
Carlier Guillaume 2018-07-24 17:58:42 +02:00
parent d8a47b6700
commit 70aa175d6c
2 changed files with 40 additions and 2 deletions

View file

@ -321,6 +321,44 @@ class GraphBalanceSpec extends StreamSpec {
sub1.cancel()
bsub.expectCancellation()
}
// Bug #25387
"not dequeue from empty outlet buffer" in assertAllStagesStopped {
val p1 = TestPublisher.manualProbe[Int]()
val c1 = TestSubscriber.manualProbe[Int]()
val c2 = TestSubscriber.manualProbe[Int]()
val c3 = TestSubscriber.manualProbe[Int]()
RunnableGraph.fromGraph(GraphDSL.create() { implicit b
val balance = b.add(Balance[Int](3))
Source.fromPublisher(p1.getPublisher) ~> balance.in
balance.out(0) ~> Sink.fromSubscriber(c1)
balance.out(1) ~> Sink.fromSubscriber(c2)
balance.out(2) ~> Sink.fromSubscriber(c3)
ClosedShape
}).run()
val bsub = p1.expectSubscription()
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
val sub3 = c3.expectSubscription()
sub1.request(1)
sub1.cancel()
sub2.request(1)
sub2.cancel()
p1.expectRequest(bsub, 16)
bsub.sendNext(1)
sub3.request(1)
c3.expectNext(1)
sub3.cancel()
bsub.expectCancellation()
}
}
}

View file

@ -883,8 +883,8 @@ final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean,
if (!isClosed(out)) {
push(out, grab(in))
if (!noPending) pull(in)
} else {
// try to find one output that isn't closed
} else if (!noPending) {
// if they are pending outlets, try to find one output that isn't closed
dequeueAndDispatch()
}
}