Fixes #20553 Tree flattening should be separate from Fusing

This commit is contained in:
Kam Kasravi 2016-10-17 13:57:06 -07:00
parent 3bd53c8bc2
commit a47bccbec3
6 changed files with 158 additions and 39 deletions

View file

@ -3,12 +3,14 @@
*/
package akka.stream
import akka.NotUsed
import akka.stream.scaladsl._
import akka.stream.testkit.StreamSpec
import akka.stream.Attributes._
import akka.stream.Fusing.FusedGraph
import scala.annotation.tailrec
import akka.stream.impl.StreamLayout.{ CopiedModule, Module }
import akka.stream.impl.StreamLayout._
import akka.stream.impl.fusing.GraphInterpreter
import akka.event.BusLogging
@ -23,7 +25,7 @@ class FusingSpec extends StreamSpec {
.via(Flow[Int].fold(1)(_ + _).named("mainSink"))
def singlePath[S <: Shape, M](fg: FusedGraph[S, M], from: Attribute, to: Attribute): Unit = {
val starts = fg.module.info.allModules.filter(_.attributes.contains(from))
val starts = fg.module.info.subModules.filter(_.attributes.contains(from))
starts.size should ===(1)
val start = starts.head
val ups = fg.module.info.upstreams
@ -44,6 +46,48 @@ class FusingSpec extends StreamSpec {
rec(start)
}
case object NoSubModulesModule extends AtomicModule {
override def shape = ClosedShape
override def replaceShape(s: Shape) =
if (s != shape) throw new UnsupportedOperationException("cannot replace shape")
else this
override def compose(that: Module): Module = compose(that, scaladsl.Keep.left)
override def compose[A, B, C](that: Module, f: (A, B) C): Module = {
if (f eq scaladsl.Keep.right) {
that
} else if (f eq scaladsl.Keep.left) {
val mat =
if (IgnorableMatValComp(that)) {
Ignore
} else {
Transform(_ NotUsed, that.materializedValueComputation)
}
CompositeModule(
if (that.isSealed) Set(that) else that.subModules,
that.shape,
that.downstreams,
that.upstreams,
mat,
if (this.isSealed) Attributes.none else attributes)
} else {
throw new UnsupportedOperationException(
"It is invalid to combine materialized value with BogusModule " +
"except with Keep.left or Keep.right")
}
}
override def withAttributes(attributes: Attributes): Module =
throw new UnsupportedOperationException("BogusModule cannot carry attributes")
override def attributes = Attributes.none
override def carbonCopy: Module = this
override def isRunnable: Boolean = true
override def isAtomic: Boolean = true
override def materializedValueComputation: MaterializedValueNode = Ignore
}
"Fusing" must {
def verify[S <: Shape, M](fused: FusedGraph[S, M], modules: Int, downstreams: Int): Unit = {
@ -85,6 +129,19 @@ class FusingSpec extends StreamSpec {
verify(fused, modules = 2, downstreams = 6)
}
"fuse a Module with no subModules" in {
val structuralInfoModule = Fusing.structuralInfo(RunnableGraph(NoSubModulesModule), Attributes.none)
structuralInfoModule.matValues.size > 0
}
"fuse a Module with empty graph" in {
val g = GraphDSL.create() { implicit b
ClosedShape
}
val structuralInfoModule = Fusing.structuralInfo(g, Attributes.none)
structuralInfoModule.matValues.size > 0
}
}
"SubFusingActorMaterializer" must {

View file

@ -42,7 +42,7 @@ trait UnzipWithApply {
}
[2..20#/** `UnzipWith` specialized for 1 outputs */
class UnzipWith1[In, [#A1#]](unzipper: In ⇒ ([#A1#])) extends GraphStage[FanOutShape1[In, [#A1#]]] {
class UnzipWith1[In, [#A1#]](val unzipper: In ⇒ ([#A1#])) extends GraphStage[FanOutShape1[In, [#A1#]]] {
override def initialAttributes = Attributes.name("UnzipWith1")
override val shape: FanOutShape1[In, [#A1#]] = new FanOutShape1[In, [#A1#]]("UnzipWith1")
def in: Inlet[In] = shape.in

View file

@ -33,6 +33,12 @@ object Fusing {
*/
def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = Impl.aggressive(g)
/**
* Return the StructuralInfo for this Graph without any fusing
*/
def structuralInfo[S <: Shape, M](g: Graph[S, M], attributes: Attributes): StructuralInfoModule =
Impl.structuralInfo(g, attributes)
/**
* A fused graph of the right shape, containing a [[FusedModule]] which
* holds more information on the operation structure of the contained stream
@ -53,18 +59,4 @@ object Fusing {
}
}
/**
* When fusing a [[Graph]] a part of the internal stage wirings are hidden within
* [[akka.stream.impl.fusing.GraphInterpreter#GraphAssembly]] objects that are
* optimized for high-speed execution. This structural information bundle contains
* the wirings in a more accessible form, allowing traversal from port to upstream
* or downstream port and from there to the owning module (or graph vertex).
*/
final case class StructuralInfo(
upstreams: immutable.Map[InPort, OutPort],
downstreams: immutable.Map[OutPort, InPort],
inOwners: immutable.Map[InPort, Module],
outOwners: immutable.Map[OutPort, Module],
allModules: Set[Module])
}

View file

@ -115,10 +115,11 @@ object StreamLayout {
}
def apply(module: Module): Boolean =
module match {
case _: AtomicModule | EmptyModule true
case CopiedModule(_, _, module) IgnorableMatValComp(module)
case CompositeModule(_, _, _, _, comp, _) IgnorableMatValComp(comp)
case FusedModule(_, _, _, _, comp, _, _) IgnorableMatValComp(comp)
case _: AtomicModule | EmptyModule true
case CopiedModule(_, _, module) IgnorableMatValComp(module)
case CompositeModule(_, _, _, _, comp, _) IgnorableMatValComp(comp)
case FusedModule(_, _, _, _, comp, _, _) IgnorableMatValComp(comp)
case StructuralInfoModule(_, _, _, _, _, _, _, comp, _) IgnorableMatValComp(comp)
}
}
@ -445,6 +446,40 @@ object StreamLayout {
def apply(m: Module, s: Shape): CompositeModule = CompositeModule(Set(m), s, Map.empty, Map.empty, Atomic(m), Attributes.none)
}
/**
* INTERNAL API
*
* When fusing a [[Graph]] a part of the internal stage wirings are hidden within
* [[akka.stream.impl.fusing.GraphInterpreter#GraphAssembly]] objects that are
* optimized for high-speed execution. This structural information module contains
* the wirings in a more accessible form, allowing traversal from port to upstream
* or downstream port and from there to the owning module (or graph vertex).
*/
final case class StructuralInfoModule(
override val subModules: Set[Module],
override val shape: Shape,
override val downstreams: Map[OutPort, InPort],
override val upstreams: Map[InPort, OutPort],
inOwners: Map[InPort, Module],
outOwners: Map[OutPort, Module],
matValues: List[(Module, MaterializedValueNode)],
override val materializedValueComputation: MaterializedValueNode,
override val attributes: Attributes) extends Module {
override def isFused: Boolean = false
override def replaceShape(s: Shape): Module =
if (s != shape) {
shape.requireSamePortsAs(s)
copy(shape = s)
} else this
override def carbonCopy: Module = CopiedModule(shape.deepCopy(), attributes, copyOf = this)
override def withAttributes(attributes: Attributes): StructuralInfoModule = copy(attributes = attributes)
}
final case class FusedModule(
override val subModules: Set[Module],
override val shape: Shape,
@ -452,7 +487,7 @@ object StreamLayout {
override val upstreams: Map[InPort, OutPort],
override val materializedValueComputation: MaterializedValueNode,
override val attributes: Attributes,
info: Fusing.StructuralInfo) extends Module {
info: StructuralInfoModule) extends Module {
override def isFused: Boolean = true
@ -921,7 +956,7 @@ abstract class MaterializerSession(val topLevel: StreamLayout.Module, val initia
enterScope(copied)
materializedValues.put(copied, materializeModule(copied, subEffectiveAttributes))
exitScope(copied)
case composite @ (_: CompositeModule | _: FusedModule)
case composite @ (_: CompositeModule | _: FusedModule | _: StructuralInfoModule)
materializedValues.put(composite, materializeComposite(composite, subEffectiveAttributes))
case EmptyModule // nothing to do or say
}

View file

@ -3,19 +3,21 @@
*/
package akka.stream.impl.fusing
import java.{ util ju }
import java.util.Arrays
import scala.collection.immutable
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import scala.annotation.tailrec
import akka.stream._
import java.{ util ju }
import akka.stream.Attributes.AsyncBoundary
import akka.stream.Fusing.{ FusedGraph, StructuralInfo }
import akka.stream.stage.GraphStageWithMaterializedValue
import akka.stream.Fusing.FusedGraph
import akka.stream._
import akka.stream.impl.StreamLayout
import akka.stream.impl.StreamLayout._
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
import akka.stream.stage.GraphStageWithMaterializedValue
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.util.control.NonFatal
/**
* INTERNAL API
@ -34,6 +36,31 @@ private[stream] object Fusing {
case _ doAggressive(g)
}
def structuralInfo[S <: Shape, M](g: Graph[S, M], attributes: Attributes): StructuralInfoModule = {
val struct = new BuildStructuralInfo
/*
* First perform normalization by descending the module tree and recording
* information in the BuildStructuralInfo instance.
*/
val matValue =
try descend(g.module, Attributes.none, struct, struct.newGroup(0), 0)
catch {
case NonFatal(ex)
if (Debug) struct.dump()
throw ex
}
/*
* Then create a copy of the original Shape with the new copied ports.
*/
val shape = g.shape.copyFromPorts(
struct.newInlets(g.shape.inlets),
struct.newOutlets(g.shape.outlets)).asInstanceOf[S]
/*
* Extract the full topological information from the builder
*/
struct.toInfo(shape, matValue, attributes)
}
private def doAggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = {
val struct = new BuildStructuralInfo
/*
@ -57,7 +84,7 @@ private[stream] object Fusing {
* Extract the full topological information from the builder before
* removing assembly-internal (fused) wirings in the next step.
*/
val info = struct.toInfo
val info = struct.toInfo(shape, matValue)
/*
* Perform the fusing of `struct.groups` into GraphModules (leaving them
* as they are for non-fusable modules).
@ -443,13 +470,18 @@ private[stream] object Fusing {
* it and performing normalization.
*/
final class BuildStructuralInfo {
def toInfo: StructuralInfo =
StructuralInfo(
immutable.Map.empty ++ upstreams.asScala,
def toInfo[S <: Shape](shape: S, matValues: List[(Module, MaterializedValueNode)],
attributes: Attributes = Attributes.none): StructuralInfoModule =
StructuralInfoModule(
Set.empty ++ modules.asScala,
shape,
immutable.Map.empty ++ downstreams.asScala,
immutable.Map.empty ++ upstreams.asScala,
immutable.Map.empty ++ inOwners.asScala,
immutable.Map.empty ++ outOwners.asScala,
Set.empty ++ modules.asScala)
matValues,
matValues.head._2,
attributes)
/**
* the set of all contained modules

View file

@ -1015,13 +1015,16 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.VirtualPathContainer.log"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.VirtualPathContainer.this"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.RemoteSystemDaemon.this")
),
"2.4.12" -> Seq(
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Materializer.materialize"),
// #21775 - overrode ByteString.stringPrefix and made it final
ProblemFilters.exclude[FinalMethodProblem]("akka.util.ByteString.stringPrefix")
ProblemFilters.exclude[FinalMethodProblem]("akka.util.ByteString.stringPrefix"),
// #20553 Tree flattening should be separate from Fusing
ProblemFilters.exclude[MissingClassProblem]("akka.stream.Fusing$StructuralInfo"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.Fusing$StructuralInfo$")
)
)
}