2015-12-14 17:02:00 +01:00
|
|
|
/**
|
2018-01-04 17:26:29 +00:00
|
|
|
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
|
2015-12-14 17:02:00 +01:00
|
|
|
*/
|
2018-03-13 23:45:55 +09:00
|
|
|
|
2015-12-14 17:02:00 +01:00
|
|
|
package akka.stream
|
|
|
|
|
|
2018-05-03 13:24:51 +02:00
|
|
|
import akka.stream.impl.fusing.GraphInterpreter
|
2015-12-14 17:02:00 +01:00
|
|
|
import akka.stream.scaladsl._
|
2016-07-28 16:43:08 +08:00
|
|
|
import akka.stream.testkit.StreamSpec
|
2015-12-14 17:02:00 +01:00
|
|
|
|
2016-07-28 16:43:08 +08:00
|
|
|
class FusingSpec extends StreamSpec {
|
2015-12-14 17:02:00 +01:00
|
|
|
|
|
|
|
|
implicit val materializer = ActorMaterializer()
|
|
|
|
|
|
2018-05-03 13:24:51 +02:00
|
|
|
def actorRunningStage = {
|
|
|
|
|
GraphInterpreter.currentInterpreter.context
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val snitchFlow = Flow[Int].map(x ⇒ { testActor ! actorRunningStage; x }).async
|
2015-12-15 16:44:48 +01:00
|
|
|
|
2015-12-17 13:35:37 +01:00
|
|
|
"SubFusingActorMaterializer" must {
|
|
|
|
|
|
|
|
|
|
"work with asynchronous boundaries in the subflows" in {
|
2016-02-10 13:56:38 +01:00
|
|
|
val async = Flow[Int].map(_ * 2).async
|
2015-12-17 13:35:37 +01:00
|
|
|
Source(0 to 9)
|
|
|
|
|
.map(_ * 10)
|
|
|
|
|
.flatMapMerge(5, i ⇒ Source(i to (i + 9)).via(async))
|
|
|
|
|
.grouped(1000)
|
|
|
|
|
.runWith(Sink.head)
|
|
|
|
|
.futureValue
|
|
|
|
|
.sorted should ===(0 to 198 by 2)
|
|
|
|
|
}
|
|
|
|
|
|
2015-12-22 21:15:57 +01:00
|
|
|
"use multiple actors when there are asynchronous boundaries in the subflows (manual)" in {
|
2018-05-03 13:24:51 +02:00
|
|
|
val async = Flow[Int].map(x ⇒ { testActor ! actorRunningStage; x }).async
|
2015-12-17 13:35:37 +01:00
|
|
|
Source(0 to 9)
|
2018-05-03 13:24:51 +02:00
|
|
|
.via(snitchFlow.async)
|
2015-12-17 13:35:37 +01:00
|
|
|
.flatMapMerge(5, i ⇒ Source.single(i).via(async))
|
|
|
|
|
.grouped(1000)
|
|
|
|
|
.runWith(Sink.head)
|
|
|
|
|
.futureValue
|
|
|
|
|
.sorted should ===(0 to 9)
|
|
|
|
|
val refs = receiveN(20)
|
2018-05-03 13:24:51 +02:00
|
|
|
refs.toSet should have size (11) // main flow + 10 subflows
|
2015-12-22 21:15:57 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"use multiple actors when there are asynchronous boundaries in the subflows (combinator)" in {
|
|
|
|
|
Source(0 to 9)
|
2018-05-03 13:24:51 +02:00
|
|
|
.via(snitchFlow)
|
|
|
|
|
.flatMapMerge(5, i ⇒ Source.single(i).via(snitchFlow.async))
|
2015-12-22 21:15:57 +01:00
|
|
|
.grouped(1000)
|
|
|
|
|
.runWith(Sink.head)
|
|
|
|
|
.futureValue
|
|
|
|
|
.sorted should ===(0 to 9)
|
|
|
|
|
val refs = receiveN(20)
|
2018-05-03 13:24:51 +02:00
|
|
|
refs.toSet should have size (11) // main flow + 10 subflows
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"use one actor per grouped substream when there is an async boundary around the flow (manual)" in {
|
|
|
|
|
val in = 0 to 9
|
|
|
|
|
Source(in)
|
|
|
|
|
.via(snitchFlow)
|
|
|
|
|
.groupBy(in.size, identity)
|
|
|
|
|
.via(snitchFlow.async)
|
|
|
|
|
.mergeSubstreams
|
|
|
|
|
.runWith(Sink.seq)
|
|
|
|
|
.futureValue.sorted should ===(in)
|
|
|
|
|
val refs = receiveN(in.size + in.size) // each element through the first map, then the second map
|
|
|
|
|
|
|
|
|
|
refs.toSet should have size (in.size + 1) // outer/main actor + 1 actor per subflow
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"use one actor per grouped substream when there is an async boundary around the flow (combinator)" in {
|
|
|
|
|
val in = 0 to 9
|
|
|
|
|
Source(in)
|
|
|
|
|
.via(snitchFlow)
|
|
|
|
|
.groupBy(in.size, identity)
|
|
|
|
|
.via(snitchFlow)
|
|
|
|
|
.async
|
|
|
|
|
.mergeSubstreams
|
|
|
|
|
.runWith(Sink.seq)
|
|
|
|
|
.futureValue.sorted should ===(in)
|
|
|
|
|
val refs = receiveN(in.size + in.size) // each element through the first map, then the second map
|
|
|
|
|
refs.toSet should have size (in.size + 1) // outer/main actor + 1 actor per subflow
|
2015-12-17 13:35:37 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
2015-12-14 17:02:00 +01:00
|
|
|
}
|