#19267 add .viaAsync combinator

This commit is contained in:
Roland Kuhn 2015-12-22 21:15:57 +01:00
parent 8a5a420108
commit a733096564
6 changed files with 188 additions and 3 deletions

View file

@ -105,7 +105,7 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple
.sorted should ===(0 to 198 by 2)
}
"use multiple actors when there are asynchronous boundaries in the subflows" in {
"use multiple actors when there are asynchronous boundaries in the subflows (manual)" in {
def ref = {
val bus = GraphInterpreter.currentInterpreter.log.asInstanceOf[BusLogging]
bus.logSource
@ -120,7 +120,26 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple
.sorted should ===(0 to 9)
val refs = receiveN(20)
withClue(s"refs=\n${refs.mkString("\n")}") {
refs.toSet.size should ===(11)
refs.toSet.size should ===(11) // main flow + 10 subflows
}
}
"use multiple actors when there are asynchronous boundaries in the subflows (combinator)" in {
def ref = {
val bus = GraphInterpreter.currentInterpreter.log.asInstanceOf[BusLogging]
bus.logSource
}
val flow = Flow[Int].map(x { testActor ! ref; x })
Source(0 to 9)
.map(x { testActor ! ref; x })
.flatMapMerge(5, i Source.single(i).viaAsync(flow))
.grouped(1000)
.runWith(Sink.head)
.futureValue
.sorted should ===(0 to 9)
val refs = receiveN(20)
withClue(s"refs=\n${refs.mkString("\n")}") {
refs.toSet.size should ===(11) // main flow + 10 subflows
}
}