From a47bccbec3c291039e23c7dfa0850df4597ec41f Mon Sep 17 00:00:00 2001 From: Kam Kasravi Date: Mon, 17 Oct 2016 13:57:06 -0700 Subject: [PATCH] Fixes #20553 Tree flattening should be separate from Fusing --- .../test/scala/akka/stream/FusingSpec.scala | 61 ++++++++++++++++++- .../scaladsl/UnzipWithApply.scala.template | 4 +- .../src/main/scala/akka/stream/Fusing.scala | 20 ++---- .../scala/akka/stream/impl/StreamLayout.scala | 47 ++++++++++++-- .../akka/stream/impl/fusing/Fusing.scala | 58 ++++++++++++++---- project/MiMa.scala | 7 ++- 6 files changed, 158 insertions(+), 39 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala index f5bb5f1bd1..5907bccfd4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala @@ -3,12 +3,14 @@ */ package akka.stream +import akka.NotUsed import akka.stream.scaladsl._ import akka.stream.testkit.StreamSpec import akka.stream.Attributes._ import akka.stream.Fusing.FusedGraph + import scala.annotation.tailrec -import akka.stream.impl.StreamLayout.{ CopiedModule, Module } +import akka.stream.impl.StreamLayout._ import akka.stream.impl.fusing.GraphInterpreter import akka.event.BusLogging @@ -23,7 +25,7 @@ class FusingSpec extends StreamSpec { .via(Flow[Int].fold(1)(_ + _).named("mainSink")) def singlePath[S <: Shape, M](fg: FusedGraph[S, M], from: Attribute, to: Attribute): Unit = { - val starts = fg.module.info.allModules.filter(_.attributes.contains(from)) + val starts = fg.module.info.subModules.filter(_.attributes.contains(from)) starts.size should ===(1) val start = starts.head val ups = fg.module.info.upstreams @@ -44,6 +46,48 @@ class FusingSpec extends StreamSpec { rec(start) } + case object NoSubModulesModule extends AtomicModule { + override def shape = ClosedShape + override def replaceShape(s: Shape) = + if (s != shape) throw new UnsupportedOperationException("cannot replace shape") + else this + + override def compose(that: Module): Module = compose(that, scaladsl.Keep.left) + + override def compose[A, B, C](that: Module, f: (A, B) ⇒ C): Module = { + if (f eq scaladsl.Keep.right) { + that + } else if (f eq scaladsl.Keep.left) { + val mat = + if (IgnorableMatValComp(that)) { + Ignore + } else { + Transform(_ ⇒ NotUsed, that.materializedValueComputation) + } + CompositeModule( + if (that.isSealed) Set(that) else that.subModules, + that.shape, + that.downstreams, + that.upstreams, + mat, + if (this.isSealed) Attributes.none else attributes) + } else { + throw new UnsupportedOperationException( + "It is invalid to combine materialized value with BogusModule " + + "except with Keep.left or Keep.right") + } + } + + override def withAttributes(attributes: Attributes): Module = + throw new UnsupportedOperationException("BogusModule cannot carry attributes") + + override def attributes = Attributes.none + override def carbonCopy: Module = this + override def isRunnable: Boolean = true + override def isAtomic: Boolean = true + override def materializedValueComputation: MaterializedValueNode = Ignore + } + "Fusing" must { def verify[S <: Shape, M](fused: FusedGraph[S, M], modules: Int, downstreams: Int): Unit = { @@ -85,6 +129,19 @@ class FusingSpec extends StreamSpec { verify(fused, modules = 2, downstreams = 6) } + "fuse a Module with no subModules" in { + val structuralInfoModule = Fusing.structuralInfo(RunnableGraph(NoSubModulesModule), Attributes.none) + structuralInfoModule.matValues.size > 0 + } + + "fuse a Module with empty graph" in { + val g = GraphDSL.create() { implicit b ⇒ + ClosedShape + } + val structuralInfoModule = Fusing.structuralInfo(g, Attributes.none) + structuralInfoModule.matValues.size > 0 + } + } "SubFusingActorMaterializer" must { diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template index d87d3e5d13..170501dd1b 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template @@ -42,7 +42,7 @@ trait UnzipWithApply { } [2..20#/** `UnzipWith` specialized for 1 outputs */ -class UnzipWith1[In, [#A1#]](unzipper: In ⇒ ([#A1#])) extends GraphStage[FanOutShape1[In, [#A1#]]] { +class UnzipWith1[In, [#A1#]](val unzipper: In ⇒ ([#A1#])) extends GraphStage[FanOutShape1[In, [#A1#]]] { override def initialAttributes = Attributes.name("UnzipWith1") override val shape: FanOutShape1[In, [#A1#]] = new FanOutShape1[In, [#A1#]]("UnzipWith1") def in: Inlet[In] = shape.in @@ -92,4 +92,4 @@ class UnzipWith1[In, [#A1#]](unzipper: In ⇒ ([#A1#])) extends GraphStage[FanOu override def toString = "UnzipWith1" } # -] \ No newline at end of file +] diff --git a/akka-stream/src/main/scala/akka/stream/Fusing.scala b/akka-stream/src/main/scala/akka/stream/Fusing.scala index b89641eef5..3ed5881350 100644 --- a/akka-stream/src/main/scala/akka/stream/Fusing.scala +++ b/akka-stream/src/main/scala/akka/stream/Fusing.scala @@ -33,6 +33,12 @@ object Fusing { */ def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = Impl.aggressive(g) + /** + * Return the StructuralInfo for this Graph without any fusing + */ + def structuralInfo[S <: Shape, M](g: Graph[S, M], attributes: Attributes): StructuralInfoModule = + Impl.structuralInfo(g, attributes) + /** * A fused graph of the right shape, containing a [[FusedModule]] which * holds more information on the operation structure of the contained stream @@ -53,18 +59,4 @@ object Fusing { } } - /** - * When fusing a [[Graph]] a part of the internal stage wirings are hidden within - * [[akka.stream.impl.fusing.GraphInterpreter#GraphAssembly]] objects that are - * optimized for high-speed execution. This structural information bundle contains - * the wirings in a more accessible form, allowing traversal from port to upstream - * or downstream port and from there to the owning module (or graph vertex). - */ - final case class StructuralInfo( - upstreams: immutable.Map[InPort, OutPort], - downstreams: immutable.Map[OutPort, InPort], - inOwners: immutable.Map[InPort, Module], - outOwners: immutable.Map[OutPort, Module], - allModules: Set[Module]) - } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 80ba1c2bae..997f3f55a8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -115,10 +115,11 @@ object StreamLayout { } def apply(module: Module): Boolean = module match { - case _: AtomicModule | EmptyModule ⇒ true - case CopiedModule(_, _, module) ⇒ IgnorableMatValComp(module) - case CompositeModule(_, _, _, _, comp, _) ⇒ IgnorableMatValComp(comp) - case FusedModule(_, _, _, _, comp, _, _) ⇒ IgnorableMatValComp(comp) + case _: AtomicModule | EmptyModule ⇒ true + case CopiedModule(_, _, module) ⇒ IgnorableMatValComp(module) + case CompositeModule(_, _, _, _, comp, _) ⇒ IgnorableMatValComp(comp) + case FusedModule(_, _, _, _, comp, _, _) ⇒ IgnorableMatValComp(comp) + case StructuralInfoModule(_, _, _, _, _, _, _, comp, _) ⇒ IgnorableMatValComp(comp) } } @@ -445,6 +446,40 @@ object StreamLayout { def apply(m: Module, s: Shape): CompositeModule = CompositeModule(Set(m), s, Map.empty, Map.empty, Atomic(m), Attributes.none) } + /** + * INTERNAL API + * + * When fusing a [[Graph]] a part of the internal stage wirings are hidden within + * [[akka.stream.impl.fusing.GraphInterpreter#GraphAssembly]] objects that are + * optimized for high-speed execution. This structural information module contains + * the wirings in a more accessible form, allowing traversal from port to upstream + * or downstream port and from there to the owning module (or graph vertex). + */ + final case class StructuralInfoModule( + override val subModules: Set[Module], + override val shape: Shape, + override val downstreams: Map[OutPort, InPort], + override val upstreams: Map[InPort, OutPort], + inOwners: Map[InPort, Module], + outOwners: Map[OutPort, Module], + matValues: List[(Module, MaterializedValueNode)], + override val materializedValueComputation: MaterializedValueNode, + override val attributes: Attributes) extends Module { + + override def isFused: Boolean = false + + override def replaceShape(s: Shape): Module = + if (s != shape) { + shape.requireSamePortsAs(s) + copy(shape = s) + } else this + + override def carbonCopy: Module = CopiedModule(shape.deepCopy(), attributes, copyOf = this) + + override def withAttributes(attributes: Attributes): StructuralInfoModule = copy(attributes = attributes) + + } + final case class FusedModule( override val subModules: Set[Module], override val shape: Shape, @@ -452,7 +487,7 @@ object StreamLayout { override val upstreams: Map[InPort, OutPort], override val materializedValueComputation: MaterializedValueNode, override val attributes: Attributes, - info: Fusing.StructuralInfo) extends Module { + info: StructuralInfoModule) extends Module { override def isFused: Boolean = true @@ -921,7 +956,7 @@ abstract class MaterializerSession(val topLevel: StreamLayout.Module, val initia enterScope(copied) materializedValues.put(copied, materializeModule(copied, subEffectiveAttributes)) exitScope(copied) - case composite @ (_: CompositeModule | _: FusedModule) ⇒ + case composite @ (_: CompositeModule | _: FusedModule | _: StructuralInfoModule) ⇒ materializedValues.put(composite, materializeComposite(composite, subEffectiveAttributes)) case EmptyModule ⇒ // nothing to do or say } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala index 7ace7f49dd..160b5b4cf8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala @@ -3,19 +3,21 @@ */ package akka.stream.impl.fusing -import java.{ util ⇒ ju } import java.util.Arrays -import scala.collection.immutable -import scala.collection.JavaConverters._ -import scala.util.control.NonFatal -import scala.annotation.tailrec -import akka.stream._ +import java.{ util ⇒ ju } + import akka.stream.Attributes.AsyncBoundary -import akka.stream.Fusing.{ FusedGraph, StructuralInfo } -import akka.stream.stage.GraphStageWithMaterializedValue +import akka.stream.Fusing.FusedGraph +import akka.stream._ import akka.stream.impl.StreamLayout import akka.stream.impl.StreamLayout._ import akka.stream.impl.fusing.GraphStages.MaterializedValueSource +import akka.stream.stage.GraphStageWithMaterializedValue + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.immutable +import scala.util.control.NonFatal /** * INTERNAL API @@ -34,6 +36,31 @@ private[stream] object Fusing { case _ ⇒ doAggressive(g) } + def structuralInfo[S <: Shape, M](g: Graph[S, M], attributes: Attributes): StructuralInfoModule = { + val struct = new BuildStructuralInfo + /* + * First perform normalization by descending the module tree and recording + * information in the BuildStructuralInfo instance. + */ + val matValue = + try descend(g.module, Attributes.none, struct, struct.newGroup(0), 0) + catch { + case NonFatal(ex) ⇒ + if (Debug) struct.dump() + throw ex + } + /* + * Then create a copy of the original Shape with the new copied ports. + */ + val shape = g.shape.copyFromPorts( + struct.newInlets(g.shape.inlets), + struct.newOutlets(g.shape.outlets)).asInstanceOf[S] + /* + * Extract the full topological information from the builder + */ + struct.toInfo(shape, matValue, attributes) + } + private def doAggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = { val struct = new BuildStructuralInfo /* @@ -57,7 +84,7 @@ private[stream] object Fusing { * Extract the full topological information from the builder before * removing assembly-internal (fused) wirings in the next step. */ - val info = struct.toInfo + val info = struct.toInfo(shape, matValue) /* * Perform the fusing of `struct.groups` into GraphModules (leaving them * as they are for non-fusable modules). @@ -443,13 +470,18 @@ private[stream] object Fusing { * it and performing normalization. */ final class BuildStructuralInfo { - def toInfo: StructuralInfo = - StructuralInfo( - immutable.Map.empty ++ upstreams.asScala, + def toInfo[S <: Shape](shape: S, matValues: List[(Module, MaterializedValueNode)], + attributes: Attributes = Attributes.none): StructuralInfoModule = + StructuralInfoModule( + Set.empty ++ modules.asScala, + shape, immutable.Map.empty ++ downstreams.asScala, + immutable.Map.empty ++ upstreams.asScala, immutable.Map.empty ++ inOwners.asScala, immutable.Map.empty ++ outOwners.asScala, - Set.empty ++ modules.asScala) + matValues, + matValues.head._2, + attributes) /** * the set of all contained modules diff --git a/project/MiMa.scala b/project/MiMa.scala index de415e3df4..aa873b08c3 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -1015,13 +1015,16 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.VirtualPathContainer.log"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.VirtualPathContainer.this"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.RemoteSystemDaemon.this") - ), "2.4.12" -> Seq( ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Materializer.materialize"), // #21775 - overrode ByteString.stringPrefix and made it final - ProblemFilters.exclude[FinalMethodProblem]("akka.util.ByteString.stringPrefix") + ProblemFilters.exclude[FinalMethodProblem]("akka.util.ByteString.stringPrefix"), + + // #20553 Tree flattening should be separate from Fusing + ProblemFilters.exclude[MissingClassProblem]("akka.stream.Fusing$StructuralInfo"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.Fusing$StructuralInfo$") ) ) }