Merge pull request #17410 from drewhk/wip-17407-mapAsync-resume-bug-drewhk

=str #17407: MapAsync should pull when failed futures free up space
This commit is contained in:
drewhk 2015-05-07 18:12:57 +02:00
commit 27e37cd2fc
3 changed files with 38 additions and 6 deletions

View file

@ -171,6 +171,21 @@ class FlowMapAsyncSpec extends AkkaSpec {
c.expectComplete()
}
"resume after multiple failures" in assertAllStagesStopped {
val futures: List[Future[String]] = List(
Future.failed(Utils.TE("failure1")),
Future.failed(Utils.TE("failure2")),
Future.failed(Utils.TE("failure3")),
Future.failed(Utils.TE("failure4")),
Future.failed(Utils.TE("failure5")),
Future.successful("happy!"))
Await.result(
Source(futures)
.mapAsync(2)(identity).withAttributes(supervisionStrategy(resumingDecider))
.runWith(Sink.head), 3.seconds) should ===("happy!")
}
"finish after future failure" in assertAllStagesStopped {
import system.dispatcher
Await.result(Source(1 to 3).mapAsync(1)(n Future {

View file

@ -122,6 +122,21 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
.expectComplete()
}
"resume after multiple failures" in assertAllStagesStopped {
val futures: List[Future[String]] = List(
Future.failed(Utils.TE("failure1")),
Future.failed(Utils.TE("failure2")),
Future.failed(Utils.TE("failure3")),
Future.failed(Utils.TE("failure4")),
Future.failed(Utils.TE("failure5")),
Future.successful("happy!"))
Await.result(
Source(futures)
.mapAsyncUnordered(2)(identity).withAttributes(supervisionStrategy(resumingDecider))
.runWith(Sink.head), 3.seconds) should ===("happy!")
}
"finish after future failure" in assertAllStagesStopped {
import system.dispatcher
Await.result(Source(1 to 3).mapAsyncUnordered(1)(n Future {