diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala index 1d1d9c9538..bcaedde8e0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala @@ -321,6 +321,44 @@ class GraphBalanceSpec extends StreamSpec { sub1.cancel() bsub.expectCancellation() } + + // Bug #25387 + "not dequeue from empty outlet buffer" in assertAllStagesStopped { + val p1 = TestPublisher.manualProbe[Int]() + val c1 = TestSubscriber.manualProbe[Int]() + val c2 = TestSubscriber.manualProbe[Int]() + val c3 = TestSubscriber.manualProbe[Int]() + + RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒ + val balance = b.add(Balance[Int](3)) + Source.fromPublisher(p1.getPublisher) ~> balance.in + balance.out(0) ~> Sink.fromSubscriber(c1) + balance.out(1) ~> Sink.fromSubscriber(c2) + balance.out(2) ~> Sink.fromSubscriber(c3) + + ClosedShape + }).run() + + val bsub = p1.expectSubscription() + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + val sub3 = c3.expectSubscription() + + sub1.request(1) + sub1.cancel() + sub2.request(1) + sub2.cancel() + + p1.expectRequest(bsub, 16) + bsub.sendNext(1) + + sub3.request(1) + c3.expectNext(1) + + sub3.cancel() + + bsub.expectCancellation() + } } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 293321f35d..6ad631b6ea 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -883,8 +883,8 @@ final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean, if (!isClosed(out)) { push(out, grab(in)) if (!noPending) pull(in) - } else { - // try to find one output that isn't closed + } else if (!noPending) { + // if they are pending outlets, try to find one output that isn't closed dequeueAndDispatch() } }