Additional tests for async boundaries and GroupBy #24676
This commit is contained in:
parent
d08f31bcdb
commit
dfd8d8aa81
5 changed files with 83 additions and 28 deletions
|
|
@ -4,20 +4,19 @@
|
||||||
|
|
||||||
package akka.stream
|
package akka.stream
|
||||||
|
|
||||||
|
import akka.stream.impl.fusing.GraphInterpreter
|
||||||
import akka.stream.scaladsl._
|
import akka.stream.scaladsl._
|
||||||
import akka.stream.testkit.StreamSpec
|
import akka.stream.testkit.StreamSpec
|
||||||
import akka.stream.impl.fusing.GraphInterpreter
|
|
||||||
import akka.event.BusLogging
|
|
||||||
|
|
||||||
class FusingSpec extends StreamSpec {
|
class FusingSpec extends StreamSpec {
|
||||||
|
|
||||||
final val Debug = false
|
|
||||||
implicit val materializer = ActorMaterializer()
|
implicit val materializer = ActorMaterializer()
|
||||||
|
|
||||||
def graph(async: Boolean) =
|
def actorRunningStage = {
|
||||||
Source.unfold(1)(x ⇒ Some(x → x)).filter(_ % 2 == 1)
|
GraphInterpreter.currentInterpreter.context
|
||||||
.alsoTo(Flow[Int].fold(0)(_ + _).to(Sink.head.named("otherSink")).addAttributes(if (async) Attributes.asyncBoundary else Attributes.none))
|
}
|
||||||
.via(Flow[Int].fold(1)(_ + _).named("mainSink"))
|
|
||||||
|
val snitchFlow = Flow[Int].map(x ⇒ { testActor ! actorRunningStage; x }).async
|
||||||
|
|
||||||
"SubFusingActorMaterializer" must {
|
"SubFusingActorMaterializer" must {
|
||||||
|
|
||||||
|
|
@ -33,41 +32,56 @@ class FusingSpec extends StreamSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"use multiple actors when there are asynchronous boundaries in the subflows (manual)" in {
|
"use multiple actors when there are asynchronous boundaries in the subflows (manual)" in {
|
||||||
def ref = {
|
val async = Flow[Int].map(x ⇒ { testActor ! actorRunningStage; x }).async
|
||||||
val bus = GraphInterpreter.currentInterpreter.log.asInstanceOf[BusLogging]
|
|
||||||
bus.logSource
|
|
||||||
}
|
|
||||||
val async = Flow[Int].map(x ⇒ { testActor ! ref; x }).async
|
|
||||||
Source(0 to 9)
|
Source(0 to 9)
|
||||||
.map(x ⇒ { testActor ! ref; x })
|
.via(snitchFlow.async)
|
||||||
.flatMapMerge(5, i ⇒ Source.single(i).via(async))
|
.flatMapMerge(5, i ⇒ Source.single(i).via(async))
|
||||||
.grouped(1000)
|
.grouped(1000)
|
||||||
.runWith(Sink.head)
|
.runWith(Sink.head)
|
||||||
.futureValue
|
.futureValue
|
||||||
.sorted should ===(0 to 9)
|
.sorted should ===(0 to 9)
|
||||||
val refs = receiveN(20)
|
val refs = receiveN(20)
|
||||||
withClue(s"refs=\n${refs.mkString("\n")}") {
|
refs.toSet should have size (11) // main flow + 10 subflows
|
||||||
refs.toSet.size should ===(11) // main flow + 10 subflows
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"use multiple actors when there are asynchronous boundaries in the subflows (combinator)" in {
|
"use multiple actors when there are asynchronous boundaries in the subflows (combinator)" in {
|
||||||
def ref = {
|
|
||||||
val bus = GraphInterpreter.currentInterpreter.log.asInstanceOf[BusLogging]
|
|
||||||
bus.logSource
|
|
||||||
}
|
|
||||||
val flow = Flow[Int].map(x ⇒ { testActor ! ref; x })
|
|
||||||
Source(0 to 9)
|
Source(0 to 9)
|
||||||
.map(x ⇒ { testActor ! ref; x })
|
.via(snitchFlow)
|
||||||
.flatMapMerge(5, i ⇒ Source.single(i).via(flow.async))
|
.flatMapMerge(5, i ⇒ Source.single(i).via(snitchFlow.async))
|
||||||
.grouped(1000)
|
.grouped(1000)
|
||||||
.runWith(Sink.head)
|
.runWith(Sink.head)
|
||||||
.futureValue
|
.futureValue
|
||||||
.sorted should ===(0 to 9)
|
.sorted should ===(0 to 9)
|
||||||
val refs = receiveN(20)
|
val refs = receiveN(20)
|
||||||
withClue(s"refs=\n${refs.mkString("\n")}") {
|
refs.toSet should have size (11) // main flow + 10 subflows
|
||||||
refs.toSet.size should ===(11) // main flow + 10 subflows
|
}
|
||||||
}
|
|
||||||
|
"use one actor per grouped substream when there is an async boundary around the flow (manual)" in {
|
||||||
|
val in = 0 to 9
|
||||||
|
Source(in)
|
||||||
|
.via(snitchFlow)
|
||||||
|
.groupBy(in.size, identity)
|
||||||
|
.via(snitchFlow.async)
|
||||||
|
.mergeSubstreams
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
.futureValue.sorted should ===(in)
|
||||||
|
val refs = receiveN(in.size + in.size) // each element through the first map, then the second map
|
||||||
|
|
||||||
|
refs.toSet should have size (in.size + 1) // outer/main actor + 1 actor per subflow
|
||||||
|
}
|
||||||
|
|
||||||
|
"use one actor per grouped substream when there is an async boundary around the flow (combinator)" in {
|
||||||
|
val in = 0 to 9
|
||||||
|
Source(in)
|
||||||
|
.via(snitchFlow)
|
||||||
|
.groupBy(in.size, identity)
|
||||||
|
.via(snitchFlow)
|
||||||
|
.async
|
||||||
|
.mergeSubstreams
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
.futureValue.sorted should ===(in)
|
||||||
|
val refs = receiveN(in.size + in.size) // each element through the first map, then the second map
|
||||||
|
refs.toSet should have size (in.size + 1) // outer/main actor + 1 actor per subflow
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ package akka.stream.scaladsl
|
||||||
|
|
||||||
import java.util
|
import java.util
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.{ Done, NotUsed }
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
import akka.stream.Attributes._
|
import akka.stream.Attributes._
|
||||||
import akka.stream.impl.SinkModule
|
import akka.stream.impl.SinkModule
|
||||||
|
|
@ -26,6 +26,8 @@ import akka.stream.testkit.scaladsl.TestSource
|
||||||
import akka.stream.testkit.scaladsl.TestSink
|
import akka.stream.testkit.scaladsl.TestSink
|
||||||
import java.util.concurrent.ThreadLocalRandom
|
import java.util.concurrent.ThreadLocalRandom
|
||||||
|
|
||||||
|
import akka.testkit.TestLatch
|
||||||
|
|
||||||
object FlowGroupBySpec {
|
object FlowGroupBySpec {
|
||||||
|
|
||||||
implicit class Lift[M](val f: SubFlow[Int, M, Source[Int, M]#Repr, RunnableGraph[M]]) extends AnyVal {
|
implicit class Lift[M](val f: SubFlow[Int, M, Source[Int, M]#Repr, RunnableGraph[M]]) extends AnyVal {
|
||||||
|
|
@ -617,6 +619,35 @@ class FlowGroupBySpec extends StreamSpec {
|
||||||
upstreamSubscription.sendComplete()
|
upstreamSubscription.sendComplete()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"not block all substreams when one is blocked but has a buffer in front" in assertAllStagesStopped {
|
||||||
|
case class Elem(id: Int, substream: Int, f: () ⇒ Any)
|
||||||
|
val queue = Source.queue[Elem](3, OverflowStrategy.backpressure)
|
||||||
|
.groupBy(2, _.substream)
|
||||||
|
.buffer(2, OverflowStrategy.backpressure)
|
||||||
|
.map { _.f() }.async
|
||||||
|
.to(Sink.ignore)
|
||||||
|
.run()
|
||||||
|
|
||||||
|
val threeProcessed = Promise[Done]()
|
||||||
|
val blockSubStream1 = TestLatch()
|
||||||
|
List(
|
||||||
|
Elem(1, 1, () ⇒ {
|
||||||
|
// timeout just to not wait forever if something is wrong, not really relevant for test
|
||||||
|
Await.result(blockSubStream1, 10.seconds)
|
||||||
|
1
|
||||||
|
}),
|
||||||
|
Elem(2, 1, () ⇒ 2),
|
||||||
|
Elem(3, 2, () ⇒ {
|
||||||
|
threeProcessed.success(Done)
|
||||||
|
3
|
||||||
|
})).foreach(queue.offer)
|
||||||
|
// two and three are processed as fast as possible, not blocked by substream 1 being clogged
|
||||||
|
threeProcessed.future.futureValue should ===(Done)
|
||||||
|
// let 1 pass so stream can complete
|
||||||
|
blockSubStream1.open()
|
||||||
|
queue.complete()
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -743,7 +743,11 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
|
||||||
case OptionVal.Some(n) ⇒ n
|
case OptionVal.Some(n) ⇒ n
|
||||||
case OptionVal.None ⇒ islandName
|
case OptionVal.None ⇒ islandName
|
||||||
}
|
}
|
||||||
materializer.actorOf(props, actorName)
|
|
||||||
|
val ref = materializer.actorOf(props, actorName)
|
||||||
|
if (PhasedFusingActorMaterializer.Debug) {
|
||||||
|
println(s"Spawned actor [$ref] with shell: $shell")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -114,6 +114,9 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
|
||||||
* | +------+ +------+ |
|
* | +------+ +------+ |
|
||||||
* +----------------------------+
|
* +----------------------------+
|
||||||
* }}}
|
* }}}
|
||||||
|
*
|
||||||
|
* Note that attributes set on the returned graph, including async boundaries are now for the entire graph and not
|
||||||
|
* the `SubFlow`. for example `async` will not have any effect as the returned graph is the entire, closed graph.
|
||||||
*/
|
*/
|
||||||
def to(sink: Graph[SinkShape[Out], _]): Sink[In, Mat] =
|
def to(sink: Graph[SinkShape[Out], _]): Sink[In, Mat] =
|
||||||
new Sink(delegate.to(sink))
|
new Sink(delegate.to(sink))
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,9 @@ trait SubFlow[+Out, +Mat, +F[+_], C] extends FlowOps[Out, Mat] {
|
||||||
/**
|
/**
|
||||||
* Attach a [[Sink]] to each sub-flow, closing the overall Graph that is being
|
* Attach a [[Sink]] to each sub-flow, closing the overall Graph that is being
|
||||||
* constructed.
|
* constructed.
|
||||||
|
*
|
||||||
|
* Note that attributes set on the returned graph, including async boundaries are now for the entire graph and not
|
||||||
|
* the `SubFlow`. for example `async` will not have any effect as the returned graph is the entire, closed graph.
|
||||||
*/
|
*/
|
||||||
def to[M](sink: Graph[SinkShape[Out], M]): C
|
def to[M](sink: Graph[SinkShape[Out], M]): C
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue