Merge pull request #28348 from akka/wip-27189-partition-patriknw

Partition must not pull from closed in, #27189
This commit is contained in:
Patrik Nordwall 2019-12-12 14:11:23 +01:00 committed by GitHub
commit 7d51a8d5f9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 63 additions and 11 deletions

View file

@ -159,6 +159,60 @@ class GraphPartitionSpec extends StreamSpec("""
p1Sub.expectCancellation()
}
"handle upstream completes and downstream cancel" in assertAllStagesStopped {
val c1 = TestSubscriber.probe[String]()
val c2 = TestSubscriber.probe[String]()
RunnableGraph
.fromGraph(GraphDSL.create() { implicit b =>
val partition = b.add(Partition[String](2, {
case s if s == "a" || s == "b" => 0
case _ => 1
}))
Source(List("a", "b", "c", "d")) ~> partition.in
partition.out(0) ~> Sink.fromSubscriber(c1)
partition.out(1) ~> Sink.fromSubscriber(c2)
ClosedShape
})
.run()
c1.request(10)
c2.request(1)
c1.expectNext("a")
c1.expectNext("b")
c2.expectNext("c")
c2.cancel()
c1.expectComplete()
}
"handle upstream completes and downstream pulls" in assertAllStagesStopped {
val c1 = TestSubscriber.probe[String]()
val c2 = TestSubscriber.probe[String]()
RunnableGraph
.fromGraph(GraphDSL.create() { implicit b =>
val partition = b.add(Partition[String](2, {
case s if s == "a" || s == "b" => 0
case _ => 1
}))
Source(List("a", "b", "c")) ~> partition.in
partition.out(0) ~> Sink.fromSubscriber(c1)
partition.out(1) ~> Sink.fromSubscriber(c2)
ClosedShape
})
.run()
c1.request(10)
// no demand from c2 yet
c1.expectNext("a")
c1.expectNext("b")
c2.request(1)
c2.expectNext("c")
c1.expectComplete()
c2.expectComplete()
}
"work with merge" in assertAllStagesStopped {
val s = Sink.seq[Int]
val input = Set(5, 2, 9, 1, 1, 1, 10)

View file

@ -858,12 +858,10 @@ final class Partition[T](val outputPorts: Int, val partitioner: T => Int, val ea
if (idx == outPendingIdx) {
push(o, elem)
outPendingElem = null
if (!isClosed(in)) {
if (!hasBeenPulled(in)) {
pull(in)
}
} else
if (isClosed(in))
completeStage()
else if (!hasBeenPulled(in))
pull(in)
}
} else if (!hasBeenPulled(in))
pull(in)
@ -875,12 +873,12 @@ final class Partition[T](val outputPorts: Int, val partitioner: T => Int, val ea
downstreamRunning -= 1
if (downstreamRunning == 0)
cancelStage(cause)
else if (outPendingElem != null) {
if (idx == outPendingIdx) {
outPendingElem = null
if (!hasBeenPulled(in))
pull(in)
}
else if (outPendingElem != null && idx == outPendingIdx) {
outPendingElem = null
if (isClosed(in))
cancelStage(cause)
else if (!hasBeenPulled(in))
pull(in)
}
}
})