pekko/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala

91 lines
2.9 KiB
Scala
Raw Normal View History

2015-12-14 17:02:00 +01:00
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import org.scalactic.ConversionCheckedTripleEquals
2015-12-15 16:44:48 +01:00
import akka.stream.Attributes._
import akka.stream.Fusing.FusedGraph
import scala.annotation.tailrec
import akka.stream.impl.StreamLayout.Module
2015-12-14 17:02:00 +01:00
/**
 * Structural checks on the graphs returned by `Fusing.aggressive`.
 *
 * Each test fuses a small graph and then inspects the resulting module: how many
 * sub-modules it has, how many connections are recorded, and whether the named
 * stages are still linked to the `unfoldInf` stage by a single chain of modules.
 */
class FusingSpec extends AkkaSpec with ConversionCheckedTripleEquals {

  /** Set to `true` to print the name of every module visited by `singlePath`. */
  final val Debug = false

  // Implicit definitions carry an explicit type so that resolution does not depend on inference.
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /**
   * Builds the graph under test: an `unfoldInf` source filtered to odd values, with a
   * side branch (`alsoTo`) folding into a sink named "otherSink" and a main branch
   * folding through a flow named "mainSink".
   *
   * @param async when `true` the side branch is tagged with `Attributes.asyncBoundary`,
   *              otherwise with `Attributes.none`
   */
  def graph(async: Boolean) =
    Source.unfoldInf(1)(x => (x, x)).filter(_ % 2 == 1)
      .alsoTo(
        Flow[Int].fold(0)(_ + _)
          .to(Sink.head.named("otherSink"))
          .addAttributes(if (async) Attributes.asyncBoundary else Attributes.none))
      .via(Flow[Int].fold(1)(_ + _).named("mainSink"))

  /**
   * Asserts that exactly one module carries the attribute `from` and that, starting
   * there and repeatedly stepping to the owner of the out port connected to the
   * current module's in ports, a module carrying the attribute `to` is reached.
   *
   * At every step the current module's in ports must map to exactly one out port;
   * otherwise the ScalaTest assertion fails.
   *
   * @param fg   the fused graph whose structural info is inspected
   * @param from attribute identifying the module where the walk starts
   * @param to   attribute identifying the module where the walk ends
   */
  def singlePath[S <: Shape, M](fg: FusedGraph[S, M], from: Attribute, to: Attribute): Unit = {
    val info = fg.module.info
    val starts = info.allModules.filter(_.attributes.contains(from))
    starts.size should ===(1)
    val ups = info.upstreams
    val owner = info.outOwners

    @tailrec def rec(curr: Module): Unit = {
      if (Debug) println(extractName(curr, "unknown"))
      if (!curr.attributes.contains(to)) {
        // Resolve each in port to the out port feeding it; there must be exactly one.
        val outs = curr.inPorts.map(ups)
        outs.size should ===(1)
        rec(owner(outs.head))
      }
    }

    rec(starts.head)
  }

  "Fusing" must {

    /**
     * Shared structural assertions for a fused graph.
     *
     * @param fused       the graph returned by `Fusing.aggressive`
     * @param modules     expected number of direct sub-modules; the number of top-level
     *                    downstream connections is expected to be `modules - 1`
     * @param downstreams lower bound for the number of entries in the structural info's
     *                    `downstreams` and `upstreams` maps
     */
    def verify[S <: Shape, M](fused: FusedGraph[S, M], modules: Int, downstreams: Int): Unit = {
      val module = fused.module
      module.subModules.size should ===(modules)
      module.downstreams.size should ===(modules - 1)
      module.info.downstreams.size should be >= downstreams
      module.info.upstreams.size should be >= downstreams
      // Both named stages must still be connected back to the source stage.
      // NOTE(review): "unfoldInf" is presumably the default name of the unfoldInf stage — confirm.
      singlePath(fused, Attributes.Name("mainSink"), Attributes.Name("unfoldInf"))
      singlePath(fused, Attributes.Name("otherSink"), Attributes.Name("unfoldInf"))
    }

    "fuse a moderately complex graph" in {
      val g = graph(false)
      val fused = Fusing.aggressive(g)
      verify(fused, modules = 1, downstreams = 5)
    }

    "not fuse across AsyncBoundary" in {
      val g = graph(true)
      val fused = Fusing.aggressive(g)
      verify(fused, modules = 2, downstreams = 5)
    }

    "not fuse a FusedGraph again" in {
      val g = Fusing.aggressive(graph(false))
      Fusing.aggressive(g) should be theSameInstanceAs g
    }

    "properly fuse a FusedGraph that has been extended (no AsyncBoundary)" in {
      val src = Fusing.aggressive(graph(false))
      val fused = Fusing.aggressive(Source.fromGraph(src).to(Sink.head))
      verify(fused, modules = 1, downstreams = 6)
    }

    "properly fuse a FusedGraph that has been extended (with AsyncBoundary)" in {
      val src = Fusing.aggressive(graph(true))
      val fused = Fusing.aggressive(Source.fromGraph(src).to(Sink.head))
      verify(fused, modules = 2, downstreams = 6)
    }

  }

}