From a81a61ba1fa6fa36603c913a8a0e4b66c0d68f29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 18 Aug 2016 15:44:27 +0200 Subject: [PATCH] Balancer should not push to a closed out #20943 --- .../stream/scaladsl/GraphBalanceSpec.scala | 33 ++++++++++++++++++- .../scala/akka/stream/scaladsl/Graph.scala | 14 ++++++-- .../scala/akka/stream/stage/GraphStage.scala | 2 +- 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala index 7a05f0a5ce..0bffe24625 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala @@ -3,10 +3,11 @@ package akka.stream.scaladsl import scala.concurrent.Await import scala.concurrent.duration._ import scala.concurrent.Future -import akka.stream.{ SourceShape, ClosedShape, ActorMaterializer, ActorMaterializerSettings } +import akka.stream._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ import akka.stream.testkit.Utils._ +import akka.util.ByteString class GraphBalanceSpec extends StreamSpec { @@ -256,6 +257,36 @@ class GraphBalanceSpec extends StreamSpec { bsub.expectCancellation() } + // Bug #20943 + "not push output twice" in assertAllStagesStopped { + val p1 = TestPublisher.manualProbe[Int]() + val c1 = TestSubscriber.manualProbe[Int]() + val c2 = TestSubscriber.manualProbe[Int]() + + RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒ + val balance = b.add(Balance[Int](2)) + Source.fromPublisher(p1.getPublisher) ~> balance.in + balance.out(0) ~> Sink.fromSubscriber(c1) + balance.out(1) ~> Sink.fromSubscriber(c2) + ClosedShape + }).run() + + val bsub = p1.expectSubscription() + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + + sub1.request(1) + p1.expectRequest(bsub, 16) + bsub.sendNext(1) + c1.expectNext(1) + + sub2.request(1) + sub2.cancel() + bsub.sendNext(2) + + sub1.cancel() + bsub.expectCancellation() + } } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index c3cefbf40e..a57a6ac437 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -617,10 +617,20 @@ final class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) ext private var needDownstreamPulls: Int = if (waitForAllDownstreams) outputPorts else 0 private var downstreamsRunning: Int = outputPorts + @tailrec private def dequeueAndDispatch(): Unit = { val out = pendingQueue.dequeue() - push(out, grab(in)) - if (!noPending) pull(in) + // out is null if depleted pendingQueue without reaching + // an out that is not closed, in which case we just return + if (out ne null) { + if (!isClosed(out)) { + push(out, grab(in)) + if (!noPending) pull(in) + } else { + // try to find one output that isn't closed + dequeueAndDispatch() + } + } } setHandler(in, new InHandler { diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 2617c4d745..def3197b89 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -455,8 +455,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: // Detailed error information should not add overhead to the hot path ReactiveStreamsCompliance.requireNonNullElement(elem) - require(isAvailable(out), s"Cannot push port ($out) twice") require(!isClosed(out), s"Cannot pull closed port ($out)") + require(isAvailable(out), s"Cannot push port ($out) twice") // No error, just InClosed caused the actual pull to be ignored, but the status flag still needs to be flipped connection.portState = portState ^ PushStartFlip