=str #17407: MapAsync should pull when failed futures free up space

This commit is contained in:
Endre Sándor Varga 2015-05-07 15:05:29 +02:00
parent 9125a719fe
commit 7f6a67a5c0
3 changed files with 38 additions and 6 deletions

View file

@ -171,6 +171,21 @@ class FlowMapAsyncSpec extends AkkaSpec {
c.expectComplete()
}
"resume after multiple failures" in assertAllStagesStopped {
val futures: List[Future[String]] = List(
Future.failed(Utils.TE("failure1")),
Future.failed(Utils.TE("failure2")),
Future.failed(Utils.TE("failure3")),
Future.failed(Utils.TE("failure4")),
Future.failed(Utils.TE("failure5")),
Future.successful("happy!"))
Await.result(
Source(futures)
.mapAsync(2)(identity).withAttributes(supervisionStrategy(resumingDecider))
.runWith(Sink.head), 3.seconds) should ===("happy!")
}
"finish after future failure" in assertAllStagesStopped {
import system.dispatcher
Await.result(Source(1 to 3).mapAsync(1)(n Future {

View file

@ -122,6 +122,21 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
.expectComplete()
}
"resume after multiple failures" in assertAllStagesStopped {
val futures: List[Future[String]] = List(
Future.failed(Utils.TE("failure1")),
Future.failed(Utils.TE("failure2")),
Future.failed(Utils.TE("failure3")),
Future.failed(Utils.TE("failure4")),
Future.failed(Utils.TE("failure5")),
Future.successful("happy!"))
Await.result(
Source(futures)
.mapAsyncUnordered(2)(identity).withAttributes(supervisionStrategy(resumingDecider))
.runWith(Sink.head), 3.seconds) should ===("happy!")
}
"finish after future failure" in assertAllStagesStopped {
import system.dispatcher
Await.result(Source(1 to 3).mapAsyncUnordered(1)(n Future {

View file

@ -396,25 +396,27 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
}
override def onPull(ctx: AsyncContext[Out, (Int, Try[Out])]) = {
@tailrec def rec(hasFreedUpSpace: Boolean): DownstreamDirective =
@tailrec def rec(): DownstreamDirective =
if (elemsInFlight.isEmpty && ctx.isFinishing) ctx.finish()
else if (elemsInFlight.isEmpty || elemsInFlight.peek == NotYetThere) {
if (hasFreedUpSpace && ctx.isHoldingUpstream) ctx.holdDownstreamAndPull()
if (!elemsInFlight.isFull && ctx.isHoldingUpstream) ctx.holdDownstreamAndPull()
else ctx.holdDownstream()
} else elemsInFlight.dequeue() match {
case Failure(ex) rec(true)
case Failure(ex) rec()
case Success(elem)
if (ctx.isHoldingUpstream) ctx.pushAndPull(elem)
else ctx.push(elem)
}
rec(false)
rec()
}
override def onAsyncInput(input: (Int, Try[Out]), ctx: AsyncContext[Out, Notification]) = {
@tailrec def rec(): Directive =
if (elemsInFlight.isEmpty && ctx.isFinishing) ctx.finish()
else if (elemsInFlight.isEmpty || elemsInFlight.peek == NotYetThere) ctx.ignore()
else elemsInFlight.dequeue() match {
else if (elemsInFlight.isEmpty || (elemsInFlight.peek eq NotYetThere)) {
if (!elemsInFlight.isFull && ctx.isHoldingUpstream) ctx.pull()
else ctx.ignore()
} else elemsInFlight.dequeue() match {
case Failure(ex) rec()
case Success(elem)
if (ctx.isHoldingUpstream) ctx.pushAndPull(elem)