MapAsync wouldn't complete when upstream does in all scenarios #28657

This commit is contained in:
Johan Andrén 2020-03-03 17:17:50 +01:00 committed by GitHub
parent e6ee6ee4e0
commit 3157b0199b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 84 additions and 9 deletions

View file

@ -295,6 +295,43 @@ class FlowMapAsyncSpec extends StreamSpec {
3.seconds) should ===("happy!")
}
"complete without requiring further demand (parallelism = 1)" in assertAllStagesStopped {
import system.dispatcher
Source
.single(1)
.mapAsync(1)(v => Future { Thread.sleep(20); v })
.runWith(TestSink.probe[Int])
.request(1)
.expectNext(1)
.expectComplete()
}
"complete without requiring further demand with already completed future (parallelism = 1)" in assertAllStagesStopped {
Source
.single(1)
.mapAsync(1)(v => Future.successful(v))
.runWith(TestSink.probe[Int])
.request(1)
.expectNext(1)
.expectComplete()
}
"complete without requiring further demand (parallelism = 2)" in assertAllStagesStopped {
import system.dispatcher
val probe =
Source(1 :: 2 :: Nil).mapAsync(2)(v => Future { Thread.sleep(20); v }).runWith(TestSink.probe[Int])
probe.request(2).expectNextN(2)
probe.expectComplete()
}
"complete without requiring further demand with already completed future (parallelism = 2)" in assertAllStagesStopped {
val probe = Source(1 :: 2 :: Nil).mapAsync(2)(v => Future.successful(v)).runWith(TestSink.probe[Int])
probe.request(2).expectNextN(2)
probe.expectComplete()
}
"finish after future failure" in assertAllStagesStopped {
import system.dispatcher
Await.result(

View file

@ -52,6 +52,41 @@ class FlowMapAsyncUnorderedSpec extends StreamSpec {
c.expectComplete()
}
"complete without requiring further demand (parallelism = 1)" in assertAllStagesStopped {
import system.dispatcher
Source
.single(1)
.mapAsyncUnordered(1)(v => Future { Thread.sleep(20); v })
.runWith(TestSink.probe[Int])
.requestNext(1)
.expectComplete()
}
"complete without requiring further demand with already completed future (parallelism = 1)" in assertAllStagesStopped {
Source
.single(1)
.mapAsyncUnordered(1)(v => Future.successful(v))
.runWith(TestSink.probe[Int])
.requestNext(1)
.expectComplete()
}
"complete without requiring further demand (parallelism = 2)" in assertAllStagesStopped {
import system.dispatcher
val probe =
Source(1 :: 2 :: Nil).mapAsyncUnordered(2)(v => Future { Thread.sleep(20); v }).runWith(TestSink.probe[Int])
probe.request(2).expectNextN(2)
probe.expectComplete()
}
"complete without requiring further demand with already completed future (parallelism = 2)" in assertAllStagesStopped {
val probe = Source(1 :: 2 :: Nil).mapAsyncUnordered(2)(v => Future.successful(v)).runWith(TestSink.probe[Int])
probe.request(2).expectNextN(2)
probe.expectComplete()
}
"not run more futures than requested elements" in {
val probe = TestProbe()
val c = TestSubscriber.manualProbe[Int]()

View file

@ -1315,10 +1315,8 @@ private[stream] object Collect {
@tailrec
private def pushNextIfPossible(): Unit =
if (buffer.isEmpty) {
if (isClosed(in)) completeStage()
else pullIfNeeded()
} else if (buffer.peek().elem eq NotYetThere) pullIfNeeded() // ahead of line blocking to keep order
if (buffer.isEmpty) pullIfNeeded()
else if (buffer.peek().elem eq NotYetThere) pullIfNeeded() // ahead of line blocking to keep order
else if (isAvailable(out)) {
val holder = buffer.dequeue()
holder.elem match {
@ -1343,7 +1341,9 @@ private[stream] object Collect {
}
private def pullIfNeeded(): Unit = {
if (buffer.used < parallelism && !hasBeenPulled(in)) tryPull(in)
if (isClosed(in) && buffer.isEmpty) completeStage()
else if (buffer.used < parallelism && !hasBeenPulled(in)) tryPull(in)
// else already pulled and waiting for next element
}
setHandlers(in, out, this)
@ -1378,19 +1378,21 @@ private[stream] object Collect {
override def preStart(): Unit = buffer = BufferImpl(parallelism, inheritedAttributes)
def futureCompleted(result: Try[Out]): Unit = {
def isCompleted = isClosed(in) && todo == 0
inFlight -= 1
result match {
case Success(elem) if elem != null =>
if (isAvailable(out)) {
if (!hasBeenPulled(in)) tryPull(in)
push(out, elem)
if (isCompleted) completeStage()
} else buffer.enqueue(elem)
case Success(null) =>
if (isClosed(in) && todo == 0) completeStage()
if (isCompleted) completeStage()
else if (!hasBeenPulled(in)) tryPull(in)
case Failure(ex) =>
if (decider(ex) == Supervision.Stop) failStage(ex)
else if (isClosed(in) && todo == 0) completeStage()
else if (isCompleted) completeStage()
else if (!hasBeenPulled(in)) tryPull(in)
}
}
@ -1418,9 +1420,10 @@ private[stream] object Collect {
override def onPull(): Unit = {
if (!buffer.isEmpty) push(out, buffer.dequeue())
else if (isClosed(in) && todo == 0) completeStage()
if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
val leftTodo = todo
if (isClosed(in) && leftTodo == 0) completeStage()
else if (leftTodo < parallelism && !hasBeenPulled(in)) tryPull(in)
}
setHandlers(in, out, this)