=str #17191 mapAsyncUnordered shall terminate after resume

This commit is contained in:
Roland Kuhn 2015-04-14 12:39:24 +02:00
parent 386ff80a0e
commit 7681c557f7
3 changed files with 22 additions and 1 deletions

View file

@ -171,6 +171,16 @@ class FlowMapAsyncSpec extends AkkaSpec {
c.expectComplete()
}
"finish after future failure" in {
import system.dispatcher
Await.result(Source(1 to 3).mapAsync(1, n Future {
if (n == 3) throw new RuntimeException("err3b") with NoStackTrace
else n
}).withAttributes(supervisionStrategy(resumingDecider))
.grouped(10)
.runWith(Sink.head), 1.second) should be(Seq(1, 2))
}
"resume when mapAsync throws" in {
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher

View file

@ -122,7 +122,17 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val sub = c.expectSubscription()
sub.request(10)
val expected = (OnComplete :: List(1, 2, 4, 5).map(OnNext.apply)).toSet
c.probe.receiveN(5).toSet should be(expected)
c.probe.receiveWhile(2.seconds, messages = 5) { case x x }.toSet should be(expected)
}
"finish after future failure" in {
import system.dispatcher
Await.result(Source(1 to 3).mapAsyncUnordered(1, n Future {
if (n == 3) throw new RuntimeException("err3b") with NoStackTrace
else n
}).withAttributes(supervisionStrategy(resumingDecider))
.grouped(10)
.runWith(Sink.head), 1.second) should be(Seq(1, 2))
}
"resume when mapAsyncUnordered throws" in {

View file

@ -468,6 +468,7 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I
def ignoreOrFail(ex: Throwable) =
if (decider(ex) == Supervision.Stop) ctx.fail(ex)
else if (ctx.isHoldingUpstream) ctx.pull()
else if (ctx.isFinishing && todo == 0) ctx.finish()
else ctx.ignore()
inFlight -= 1