MapAsync and already failed futures #24117
This commit is contained in:
parent
2fd9bb736b
commit
af8a81f45f
2 changed files with 62 additions and 3 deletions
|
|
@ -80,6 +80,23 @@ class FlowMapAsyncSpec extends StreamSpec {
|
||||||
c.expectNoMsg(200.millis)
|
c.expectNoMsg(200.millis)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"signal future already failed" in assertAllStagesStopped {
|
||||||
|
val latch = TestLatch(1)
|
||||||
|
val c = TestSubscriber.manualProbe[Int]()
|
||||||
|
implicit val ec = system.dispatcher
|
||||||
|
val p = Source(1 to 5).mapAsync(4)(n ⇒
|
||||||
|
if (n == 3) Future.failed[Int](new TE("err1"))
|
||||||
|
else Future {
|
||||||
|
Await.ready(latch, 10.seconds)
|
||||||
|
n
|
||||||
|
}
|
||||||
|
).to(Sink.fromSubscriber(c)).run()
|
||||||
|
val sub = c.expectSubscription()
|
||||||
|
sub.request(10)
|
||||||
|
c.expectError().getMessage should be("err1")
|
||||||
|
latch.countDown()
|
||||||
|
}
|
||||||
|
|
||||||
"signal future failure" in assertAllStagesStopped {
|
"signal future failure" in assertAllStagesStopped {
|
||||||
val latch = TestLatch(1)
|
val latch = TestLatch(1)
|
||||||
val c = TestSubscriber.manualProbe[Int]()
|
val c = TestSubscriber.manualProbe[Int]()
|
||||||
|
|
@ -229,6 +246,22 @@ class FlowMapAsyncSpec extends StreamSpec {
|
||||||
c.expectComplete()
|
c.expectComplete()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"resume after already failed future" in assertAllStagesStopped {
|
||||||
|
val c = TestSubscriber.manualProbe[Int]()
|
||||||
|
implicit val ec = system.dispatcher
|
||||||
|
val p = Source(1 to 5)
|
||||||
|
.mapAsync(4)(n ⇒
|
||||||
|
if (n == 3) Future.failed(new TE("err3"))
|
||||||
|
else Future.successful(n)
|
||||||
|
)
|
||||||
|
.withAttributes(supervisionStrategy(resumingDecider))
|
||||||
|
.to(Sink.fromSubscriber(c)).run()
|
||||||
|
val sub = c.expectSubscription()
|
||||||
|
sub.request(10)
|
||||||
|
for (n ← List(1, 2, 4, 5)) c.expectNext(n)
|
||||||
|
c.expectComplete()
|
||||||
|
}
|
||||||
|
|
||||||
"resume after multiple failures" in assertAllStagesStopped {
|
"resume after multiple failures" in assertAllStagesStopped {
|
||||||
val futures: List[Future[String]] = List(
|
val futures: List[Future[String]] = List(
|
||||||
Future.failed(Utils.TE("failure1")),
|
Future.failed(Utils.TE("failure1")),
|
||||||
|
|
@ -371,6 +404,25 @@ class FlowMapAsyncSpec extends StreamSpec {
|
||||||
failCount.get() should ===(1)
|
failCount.get() should ===(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"not invoke the decider twice for the same already failed future" in {
|
||||||
|
import system.dispatcher
|
||||||
|
val failCount = new AtomicInteger(0)
|
||||||
|
val result = Source(List(true, false))
|
||||||
|
.mapAsync(1)(elem ⇒
|
||||||
|
if (elem) Future.failed(TE("this has gone too far"))
|
||||||
|
else Future.successful(elem)
|
||||||
|
).addAttributes(supervisionStrategy {
|
||||||
|
case TE("this has gone too far") ⇒
|
||||||
|
failCount.incrementAndGet()
|
||||||
|
Supervision.resume
|
||||||
|
case _ ⇒ Supervision.stop
|
||||||
|
})
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
|
||||||
|
result.futureValue should ===(Seq(false))
|
||||||
|
failCount.get() should ===(1)
|
||||||
|
}
|
||||||
|
|
||||||
"not invoke the decider twice for the same failure to produce a future" in {
|
"not invoke the decider twice for the same failure to produce a future" in {
|
||||||
import system.dispatcher
|
import system.dispatcher
|
||||||
val failCount = new AtomicInteger(0)
|
val failCount = new AtomicInteger(0)
|
||||||
|
|
|
||||||
|
|
@ -1172,7 +1172,7 @@ private[stream] object Collect {
|
||||||
private val futureCB = getAsyncCallback[Holder[Out]](holder ⇒
|
private val futureCB = getAsyncCallback[Holder[Out]](holder ⇒
|
||||||
holder.elem match {
|
holder.elem match {
|
||||||
case Success(_) ⇒ pushNextIfPossible()
|
case Success(_) ⇒ pushNextIfPossible()
|
||||||
case Failure(NonFatal(ex)) ⇒
|
case Failure(ex) ⇒
|
||||||
holder.supervisionDirectiveFor(decider, ex) match {
|
holder.supervisionDirectiveFor(decider, ex) match {
|
||||||
// fail fast as if supervision says so
|
// fail fast as if supervision says so
|
||||||
case Supervision.Stop ⇒ failStage(ex)
|
case Supervision.Stop ⇒ failStage(ex)
|
||||||
|
|
@ -1195,9 +1195,14 @@ private[stream] object Collect {
|
||||||
future.value match {
|
future.value match {
|
||||||
case None ⇒ future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
case None ⇒ future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||||
case Some(v) ⇒
|
case Some(v) ⇒
|
||||||
// #20217 the future is already here, avoid scheduling it on the dispatcher
|
// #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and
|
||||||
|
// run the logic directly on this thread
|
||||||
holder.setElem(v)
|
holder.setElem(v)
|
||||||
pushNextIfPossible()
|
v match {
|
||||||
|
// this optimization also requires us to stop the stage to fail fast if the decider says so:
|
||||||
|
case Failure(ex) if holder.supervisionDirectiveFor(decider, ex) == Supervision.Stop ⇒ failStage(ex)
|
||||||
|
case _ ⇒ pushNextIfPossible()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch {
|
} catch {
|
||||||
|
|
@ -1225,6 +1230,8 @@ private[stream] object Collect {
|
||||||
|
|
||||||
case Failure(NonFatal(ex)) ⇒
|
case Failure(NonFatal(ex)) ⇒
|
||||||
holder.supervisionDirectiveFor(decider, ex) match {
|
holder.supervisionDirectiveFor(decider, ex) match {
|
||||||
|
// this could happen if we are looping in pushNextIfPossible and end up on a failed future before the
|
||||||
|
// onComplete callback has run
|
||||||
case Supervision.Stop ⇒ failStage(ex)
|
case Supervision.Stop ⇒ failStage(ex)
|
||||||
case _ ⇒
|
case _ ⇒
|
||||||
// try next element
|
// try next element
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue