From 0008f1f400bafcecfdddd942ddeeaadf9452144a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 10 Dec 2019 17:33:11 +0100 Subject: [PATCH] Partition must not pull from closed in, #27189 --- .../stream/scaladsl/GraphPartitionSpec.scala | 54 +++++++++++++++++++ .../scala/akka/stream/scaladsl/Graph.scala | 20 ++++--- 2 files changed, 63 insertions(+), 11 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartitionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartitionSpec.scala index 6dbfb3afa4..3695838097 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartitionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartitionSpec.scala @@ -159,6 +159,60 @@ class GraphPartitionSpec extends StreamSpec(""" p1Sub.expectCancellation() } + "handle upstream completes and downstream cancel" in assertAllStagesStopped { + val c1 = TestSubscriber.probe[String]() + val c2 = TestSubscriber.probe[String]() + + RunnableGraph + .fromGraph(GraphDSL.create() { implicit b => + val partition = b.add(Partition[String](2, { + case s if s == "a" || s == "b" => 0 + case _ => 1 + })) + Source(List("a", "b", "c", "d")) ~> partition.in + partition.out(0) ~> Sink.fromSubscriber(c1) + partition.out(1) ~> Sink.fromSubscriber(c2) + ClosedShape + }) + .run() + + c1.request(10) + c2.request(1) + c1.expectNext("a") + c1.expectNext("b") + c2.expectNext("c") + c2.cancel() + c1.expectComplete() + } + + "handle upstream completes and downstream pulls" in assertAllStagesStopped { + val c1 = TestSubscriber.probe[String]() + val c2 = TestSubscriber.probe[String]() + + RunnableGraph + .fromGraph(GraphDSL.create() { implicit b => + val partition = b.add(Partition[String](2, { + case s if s == "a" || s == "b" => 0 + case _ => 1 + })) + Source(List("a", "b", "c")) ~> partition.in + partition.out(0) ~> Sink.fromSubscriber(c1) + partition.out(1) ~> Sink.fromSubscriber(c2) + ClosedShape + }) + .run() + + c1.request(10) + // no demand from c2 yet + c1.expectNext("a") + c1.expectNext("b") + c2.request(1) + c2.expectNext("c") + + c1.expectComplete() + c2.expectComplete() + } + "work with merge" in assertAllStagesStopped { val s = Sink.seq[Int] val input = Set(5, 2, 9, 1, 1, 1, 10) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index ee4b175494..0cc5a47e43 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -858,12 +858,10 @@ final class Partition[T](val outputPorts: Int, val partitioner: T => Int, val ea if (idx == outPendingIdx) { push(o, elem) outPendingElem = null - if (!isClosed(in)) { - if (!hasBeenPulled(in)) { - pull(in) - } - } else + if (isClosed(in)) completeStage() + else if (!hasBeenPulled(in)) + pull(in) } } else if (!hasBeenPulled(in)) pull(in) @@ -875,12 +873,12 @@ final class Partition[T](val outputPorts: Int, val partitioner: T => Int, val ea downstreamRunning -= 1 if (downstreamRunning == 0) cancelStage(cause) - else if (outPendingElem != null) { - if (idx == outPendingIdx) { - outPendingElem = null - if (!hasBeenPulled(in)) - pull(in) - } + else if (outPendingElem != null && idx == outPendingIdx) { + outPendingElem = null + if (isClosed(in)) + cancelStage(cause) + else if (!hasBeenPulled(in)) + pull(in) } } })