diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala index 5f0d2add03..bdc57e9b0a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala @@ -3,12 +3,13 @@ */ package akka.stream.scaladsl -import scala.concurrent.Promise +import scala.concurrent.{ Await, Promise } import akka.stream._ import akka.stream.scaladsl._ import akka.stream.testkit.StreamTestKit import akka.stream.testkit.TwoStreamsSetup +import scala.concurrent.duration._ class GraphConcatSpec extends TwoStreamsSetup { @@ -154,6 +155,33 @@ class GraphConcatSpec extends TwoStreamsSetup { promise.failure(TestException) subscriber.expectError(TestException) } + + "work with Source DSL" in { + val testSource = Source(1 to 5).concat(Source(6 to 10)).grouped(1000) + Await.result(testSource.runWith(Sink.head), 3.seconds) should ===(1 to 10) + + val runnable = testSource.toMat(Sink.ignore)(Keep.left) + val (m1, m2) = runnable.run() + m1.isInstanceOf[Unit] should be(true) + m2.isInstanceOf[Unit] should be(true) + + runnable.mapMaterialized((_) ⇒ "boo").run() should be("boo") + + } + + "work with Flow DSL" in { + val testFlow = Flow[Int].concat(Source(6 to 10)).grouped(1000) + Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10) + + val runnable = Source(1 to 5).viaMat(testFlow)(Keep.both).to(Sink.ignore) + val (m1, (m2, m3)) = runnable.run() + m1.isInstanceOf[Unit] should be(true) + m2.isInstanceOf[Unit] should be(true) + m3.isInstanceOf[Unit] should be(true) + + runnable.mapMaterialized((_) ⇒ "boo").run() should be("boo") + + } } } diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/BidiFlowApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/BidiFlowApply.scala.template index 8a395be009..f34cecdfdc 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/BidiFlowApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/BidiFlowApply.scala.template @@ -22,7 +22,7 @@ trait BidiFlowApply { */ def apply[I1, O1, I2, O2, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) ⇒ BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = { val builder = new FlowGraph.Builder - val p = builder.add(g1, Keep.right) + val p = builder.add(g1) val shape = buildBlock(builder)(p) builder.buildBidiFlow(shape) } @@ -35,7 +35,7 @@ trait BidiFlowApply { buildBlock: FlowGraph.Builder[Mat] => ([#g1.Shape#]) ⇒ BidiShape[I##1, O##1, I##2, O##2]): BidiFlow[I##1, O##1, I##2, O##2, Mat] = { val builder = new FlowGraph.Builder val curried = combineMat.curried - val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) + val p##1 = builder.add(g##1, (m##1: M##1) ⇒ curried(m##1)) [2..#val p1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))# ] val shape = buildBlock(builder)([#p1#]) diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/FlowApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/FlowApply.scala.template index 670a7db88d..236f2ca5ff 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/FlowApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/FlowApply.scala.template @@ -26,7 +26,7 @@ trait FlowApply { */ def apply[I, O, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Mat] = { val builder = new FlowGraph.Builder - val p = builder.add(g1, Keep.right) + val p = builder.add(g1) val (inlet, outlet) = buildBlock(builder)(p) builder.buildFlow(inlet, outlet) } @@ -41,7 +41,7 @@ trait FlowApply { buildBlock: FlowGraph.Builder[Mat] => ([#g1.Shape#]) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Mat] = { val builder = new FlowGraph.Builder val curried = combineMat.curried - val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) + val p##1 = builder.add(g##1, (m##1: M##1) ⇒ curried(m##1)) [2..#val p1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))# ] val (inlet, outlet) = buildBlock(builder)([#p1#]) diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template index 04849804fa..3542e789bf 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template @@ -71,7 +71,7 @@ trait GraphApply { def closed[Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Unit): RunnableFlow[Mat] = { val builder = new FlowGraph.Builder val curried = combineMat.curried - val s##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) + val s##1 = builder.add(g##1, (m##1: M##1) ⇒ curried(m##1)) [2..#val s1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))# ] buildBlock(builder)([#s1#]) @@ -87,7 +87,7 @@ trait GraphApply { def partial[S <: Shape, Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ S): Graph[S, Mat] = { val builder = new FlowGraph.Builder val curried = combineMat.curried - val s##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) + val s##1 = builder.add(g##1, (m##1: M##1) ⇒ curried(m##1)) [2..#val s1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))# ] val s = buildBlock(builder)([#s1#]) diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SinkApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SinkApply.scala.template index d4a289bad9..889c6488a7 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SinkApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SinkApply.scala.template @@ -24,7 +24,7 @@ trait SinkApply { */ def apply[In, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) => Inlet[In]): Sink[In, Mat] = { val builder = new FlowGraph.Builder - val s = builder.add(g1, Keep.right) + val s = builder.add(g1) val inlet = buildBlock(builder)(s) builder.buildSink(inlet) } @@ -38,7 +38,7 @@ trait SinkApply { buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Inlet[In]): Sink[In, Mat] = { val builder = new FlowGraph.Builder val curried = combineMat.curried - val s##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) + val s##1 = builder.add(g##1, (m##1: M##1) ⇒ curried(m##1)) [2..#val s1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))# ] val inlet = buildBlock(builder)([#s1#]) diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SourceApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SourceApply.scala.template index 93190c0582..b4af84cfca 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SourceApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SourceApply.scala.template @@ -25,7 +25,7 @@ trait SourceApply { */ def apply[Out, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) => Outlet[Out]): Source[Out, Mat] = { val builder = new FlowGraph.Builder - val p = builder.add(g1, Keep.right) + val p = builder.add(g1) val port = buildBlock(builder)(p) builder.buildSource(port) } @@ -40,7 +40,7 @@ trait SourceApply { buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Outlet[Out]): Source[Out, Mat] = { val builder = new FlowGraph.Builder val curried = combineMat.curried - val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) + val p##1 = builder.add(g##1, (m##1: M##1) ⇒ curried(m##1)) [2..#val p1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))# ] val port = buildBlock(builder)([#p1#]) diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index e8eb61a230..d84232d5b8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -9,9 +9,9 @@ import akka.stream.impl.StreamLayout.Module import akka.stream.scaladsl.Keep import akka.stream._ import org.reactivestreams.{ Subscription, Publisher, Subscriber } -import akka.event.Logging.simpleName import scala.collection.mutable import scala.util.control.NonFatal +import akka.event.Logging.simpleName /** * INTERNAL API @@ -39,22 +39,24 @@ private[akka] object StreamLayout { */ def replaceShape(s: Shape): Module - lazy val inPorts: Set[InPort] = shape.inlets.toSet - lazy val outPorts: Set[OutPort] = shape.outlets.toSet + final lazy val inPorts: Set[InPort] = shape.inlets.toSet + final lazy val outPorts: Set[OutPort] = shape.outlets.toSet def isRunnable: Boolean = inPorts.isEmpty && outPorts.isEmpty - def isSink: Boolean = (inPorts.size == 1) && outPorts.isEmpty - def isSource: Boolean = (outPorts.size == 1) && inPorts.isEmpty - def isFlow: Boolean = (inPorts.size == 1) && (outPorts.size == 1) - def isBidiFlow: Boolean = (inPorts.size == 2) && (outPorts.size == 2) + final def isSink: Boolean = (inPorts.size == 1) && outPorts.isEmpty + final def isSource: Boolean = (outPorts.size == 1) && inPorts.isEmpty + final def isFlow: Boolean = (inPorts.size == 1) && (outPorts.size == 1) + final def isBidiFlow: Boolean = (inPorts.size == 2) && (outPorts.size == 2) + def isAtomic: Boolean = subModules.isEmpty + def isCopied: Boolean = false - def growConnect(that: Module, from: OutPort, to: InPort): Module = + final def growConnect(that: Module, from: OutPort, to: InPort): Module = growConnect(that, from, to, Keep.left) - def growConnect[A, B, C](that: Module, from: OutPort, to: InPort, f: (A, B) ⇒ C): Module = + final def growConnect[A, B, C](that: Module, from: OutPort, to: InPort, f: (A, B) ⇒ C): Module = this.grow(that, f).connect(from, to) - def connect[A, B](from: OutPort, to: InPort): Module = { + final def connect[A, B](from: OutPort, to: InPort): Module = { if (Debug) validate() require(outPorts(from), @@ -67,19 +69,21 @@ private[akka] object StreamLayout { CompositeModule( subModules, AmorphousShape(shape.inlets.filterNot(_ == to), shape.outlets.filterNot(_ == from)), - (from, to) :: connections, + downstreams.updated(from, to), + upstreams.updated(to, from), materializedValueComputation, attributes) } - def transformMaterializedValue(f: Any ⇒ Any): Module = { + final def transformMaterializedValue(f: Any ⇒ Any): Module = { if (Debug) validate() CompositeModule( - subModules = if (this.isAtomic) Set(this) else this.subModules, + subModules = if (this.isSealed) Set(this) else this.subModules, shape, - connections, - Transform(f, this.materializedValueComputation), + downstreams, + upstreams, + Transform(f, if (this.isSealed) Atomic(this) else this.materializedValueComputation), attributes) } @@ -91,16 +95,20 @@ private[akka] object StreamLayout { require(that ne this, "A module cannot be added to itself. You should pass a separate instance to grow().") require(!subModules(that), "An existing submodule cannot be added again. All contained modules must be unique.") - val modules1 = if (this.isAtomic) Set(this) else this.subModules - val modules2 = if (that.isAtomic) Set(that) else that.subModules + val modules1 = if (this.isSealed) Set(this) else this.subModules + val modules2 = if (that.isSealed) Set(that) else that.subModules + + val matComputation1 = if (this.isSealed) Atomic(this) else this.materializedValueComputation + val matComputation2 = if (that.isSealed) Atomic(that) else that.materializedValueComputation CompositeModule( modules1 ++ modules2, AmorphousShape(shape.inlets ++ that.shape.inlets, shape.outlets ++ that.shape.outlets), - connections reverse_::: that.connections, - if (f eq Keep.left) materializedValueComputation - else if (f eq Keep.right) that.materializedValueComputation - else Combine(f.asInstanceOf[(Any, Any) ⇒ Any], this.materializedValueComputation, that.materializedValueComputation), + downstreams ++ that.downstreams, + upstreams ++ that.upstreams, + if (f eq Keep.left) matComputation1 + else if (f eq Keep.right) matComputation2 + else Combine(f.asInstanceOf[(Any, Any) ⇒ Any], matComputation1, matComputation2), attributes) } @@ -110,26 +118,27 @@ private[akka] object StreamLayout { CompositeModule( subModules = Set(this), shape, - connections, + /* + * Composite modules always maintain the flattened upstreams/downstreams map (i.e. they contain all the + * layout information of all the nested modules). Copied modules break the nesting, scoping them to the + * copied module. The MaterializerSession will take care of propagating the necessary Publishers and Subscribers + * from the enclosed scope to the outer scope. + */ + downstreams, + upstreams, /* * Wrapping like this shields the outer module from the details of the - * materialized value computation of its submodules, which is important - * to keep the re-binding of identities to computation nodes manageable - * in carbonCopy. + * materialized value computation of its submodules. */ Atomic(this), OperationAttributes.none) } def subModules: Set[Module] - def isAtomic: Boolean = subModules.isEmpty + final def isSealed: Boolean = isAtomic || isCopied - /** - * A list of connections whose port-wise ordering is STABLE across carbonCopy. - */ - def connections: List[(OutPort, InPort)] = Nil - final lazy val downstreams: Map[OutPort, InPort] = connections.toMap - final lazy val upstreams: Map[InPort, OutPort] = connections.map(_.swap).toMap + def downstreams: Map[OutPort, InPort] = Map.empty + def upstreams: Map[InPort, OutPort] = Map.empty def materializedValueComputation: MaterializedValueNode = Atomic(this) def carbonCopy: Module @@ -140,7 +149,7 @@ private[akka] object StreamLayout { final override def hashCode(): Int = super.hashCode() final override def equals(obj: scala.Any): Boolean = super.equals(obj) - def validate(level: Int = 0, doPrint: Boolean = false, idMap: mutable.Map[AnyRef, Int] = mutable.Map.empty): Unit = { + final def validate(level: Int = 0, doPrint: Boolean = false, idMap: mutable.Map[AnyRef, Int] = mutable.Map.empty): Unit = { val ids = Iterator from 1 def id(obj: AnyRef) = idMap get obj match { case Some(x) ⇒ x @@ -177,12 +186,13 @@ private[akka] object StreamLayout { } if (dupIn.nonEmpty) problems ::= s"duplicate ports in submodules ${ins(dupIn)}" if (dupOut.nonEmpty) problems ::= s"duplicate ports in submodules ${outs(dupOut)}" - if (!isAtomic && (inset -- allIn).nonEmpty) problems ::= s"foreign inlets ${ins(inset -- allIn)}" - if (!isAtomic && (outset -- allOut).nonEmpty) problems ::= s"foreign outlets ${outs(outset -- allOut)}" + if (!isSealed && (inset -- allIn).nonEmpty) problems ::= s"foreign inlets ${ins(inset -- allIn)}" + if (!isSealed && (outset -- allOut).nonEmpty) problems ::= s"foreign outlets ${outs(outset -- allOut)}" val unIn = allIn -- inset -- upstreams.keySet - if (unIn.nonEmpty) problems ::= s"unconnected inlets ${ins(unIn)}" + if (unIn.nonEmpty && !isCopied) problems ::= s"unconnected inlets ${ins(unIn)}" val unOut = allOut -- outset -- downstreams.keySet - if (unOut.nonEmpty) problems ::= s"unconnected outlets ${outs(unOut)}" + if (unOut.nonEmpty && !isCopied) problems ::= s"unconnected outlets ${outs(unOut)}" + def atomics(n: MaterializedValueNode): Set[Module] = n match { case Ignore ⇒ Set.empty @@ -215,6 +225,10 @@ private[akka] object StreamLayout { else throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule") override def grow(that: Module): Module = that + + override def grow[A, B, C](that: Module, f: (A, B) ⇒ C): Module = + throw new UnsupportedOperationException("It is invalid to combine materialized value with EmptyModule") + override def wrap(): Module = this override def subModules: Set[Module] = Set.empty @@ -230,10 +244,30 @@ private[akka] object StreamLayout { override def materializedValueComputation: MaterializedValueNode = Ignore } + final case class CopiedModule(shape: Shape, attributes: OperationAttributes, copyOf: Module) extends Module { + override val subModules: Set[Module] = Set(copyOf) + + override def withAttributes(attr: OperationAttributes): Module = this.copy(attributes = attr) + + override def carbonCopy: Module = this.copy(shape = shape.deepCopy()) + + override def replaceShape(s: Shape): Module = { + shape.requireSamePortsAs(s) + copy(shape = s) + } + + override val materializedValueComputation: MaterializedValueNode = Atomic(copyOf) + + override def isCopied: Boolean = true + + override def toString: String = "copy of " + copyOf.toString + } + final case class CompositeModule( subModules: Set[Module], shape: Shape, - override val connections: List[(OutPort, InPort)], + override val downstreams: Map[OutPort, InPort], + override val upstreams: Map[InPort, OutPort], override val materializedValueComputation: MaterializedValueNode, attributes: OperationAttributes) extends Module { @@ -242,51 +276,14 @@ private[akka] object StreamLayout { copy(shape = s) } - override def carbonCopy: Module = { - val out = mutable.Map[OutPort, OutPort]() - val in = mutable.Map[InPort, InPort]() - val subMap = mutable.Map[Module, Module]() - - val subs = subModules map { s ⇒ - val n = s.carbonCopy - out ++= s.shape.outlets.zip(n.shape.outlets) - in ++= s.shape.inlets.zip(n.shape.inlets) - s.connections.zip(n.connections) foreach { - case ((oldOut, oldIn), (newOut, newIn)) ⇒ - out(oldOut) = newOut - in(oldIn) = newIn - } - subMap(s) = n - n - } - - val newShape = shape.copyFromPorts(shape.inlets.map(in.asInstanceOf[Inlet[_] ⇒ Inlet[_]]), - shape.outlets.map(out.asInstanceOf[Outlet[_] ⇒ Outlet[_]])) - - val conn = connections.map(p ⇒ (out(p._1), in(p._2))) - - def mapComp(n: MaterializedValueNode): MaterializedValueNode = - n match { - case Ignore ⇒ Ignore - case Transform(f, dep) ⇒ Transform(f, mapComp(dep)) - case Atomic(mod) ⇒ Atomic(subMap(mod)) - case Combine(f, left, right) ⇒ Combine(f, mapComp(left), mapComp(right)) - } - val comp = - try mapComp(materializedValueComputation) - catch { - case so: StackOverflowError ⇒ - throw new UnsupportedOperationException("materialized value computation is too complex, please group into sub-graphs") - } - - copy(subModules = subs, shape = newShape, connections = conn, materializedValueComputation = comp) - } + override def carbonCopy: Module = CopiedModule(shape.deepCopy(), attributes, copyOf = this) override def withAttributes(attributes: OperationAttributes): Module = copy(attributes = attributes) override def toString = s""" - | Modules: ${subModules.toSeq.map(m ⇒ " " + m.getClass.getName).mkString("\n")} + | Module: ${this.attributes.nameOption.getOrElse("unnamed")} + | Modules: ${subModules.toSeq.map(m ⇒ " " + m.attributes.nameOption.getOrElse(m.getClass.getName)).mkString("\n")} | Downstreams: | ${downstreams.map { case (in, out) ⇒ s" $in -> $out" }.mkString("\n")} | Upstreams: @@ -413,7 +410,7 @@ private[stream] class MaterializedValuePublisher extends Publisher[Any] { * B-GET - pushAndClose fires here * A-GET - pushAndClose fires here * - * The proof that there are no cases: + * The proof that there are no other cases: * * - all permutations of 4 operations are 4! = 24 * - the operations of A and B are cannot be reordered, so there are 24 / (2 * 2) = 6 actual orderings @@ -468,8 +465,58 @@ private[stream] class MaterializedValuePublisher extends Publisher[Any] { private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Module) { import StreamLayout._ - private val subscribers = collection.mutable.HashMap[InPort, Subscriber[Any]]().withDefaultValue(null) - private val publishers = collection.mutable.HashMap[OutPort, Publisher[Any]]().withDefaultValue(null) + private var subscribersStack: List[mutable.Map[InPort, Subscriber[Any]]] = + mutable.Map.empty[InPort, Subscriber[Any]].withDefaultValue(null) :: Nil + private var publishersStack: List[mutable.Map[OutPort, Publisher[Any]]] = + mutable.Map.empty[OutPort, Publisher[Any]].withDefaultValue(null) :: Nil + + /* + * Please note that this stack keeps track of the scoped modules wrapped in CopiedModule but not the CopiedModule + * itself. The reason is that the CopiedModule itself is only needed for the enterScope and exitScope methods but + * not elsewhere. For this reason they are just simply passed as parameters to those methods. + * + * The reason why the encapsulated (copied) modules are stored as mutable state to save subclasses of this class + * from passing the current scope around or even knowing about it. + */ + private var moduleStack: List[Module] = topLevel :: Nil + + private def subscribers: mutable.Map[InPort, Subscriber[Any]] = subscribersStack.head + private def publishers: mutable.Map[OutPort, Publisher[Any]] = publishersStack.head + private def currentLayout: Module = moduleStack.head + + // Enters a copied module and establishes a scope that prevents internals to leak out and interfere with copies + // of the same module. + // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter + private def enterScope(enclosing: CopiedModule): Unit = { + subscribersStack ::= mutable.Map.empty.withDefaultValue(null) + publishersStack ::= mutable.Map.empty.withDefaultValue(null) + moduleStack ::= enclosing.copyOf + } + + // Exits the scope of the copied module and propagates Publishers/Subscribers to the enclosing scope assigning + // them to the copied ports instead of the original ones (since there might be multiple copies of the same module + // leading to port identity collisions) + // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter + private def exitScope(enclosing: CopiedModule): Unit = { + val scopeSubscribers = subscribers + val scopePublishers = publishers + subscribersStack = subscribersStack.tail + publishersStack = publishersStack.tail + moduleStack = moduleStack.tail + + // When we exit the scope of a copied module, pick up the Subscribers/Publishers belonging to exposed ports of + // the original module and assign them to the copy ports in the outer scope that we will return to + enclosing.copyOf.shape.inlets.iterator.zip(enclosing.shape.inlets.iterator).foreach { + case (original, exposed) ⇒ + assignPort(exposed, scopeSubscribers(original)) + } + + enclosing.copyOf.shape.outlets.iterator.zip(enclosing.shape.outlets.iterator).foreach { + case (original, exposed) ⇒ + assignPort(exposed, scopePublishers(original)) + } + + } final def materialize(): Any = { require(topLevel ne EmptyModule, "An empty module cannot be materialized (EmptyModule was given)") @@ -495,6 +542,10 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo assignPort(mv.shape.outlet, pub) case atomic if atomic.isAtomic ⇒ materializedValues.put(atomic, materializeAtomic(atomic, subEffectiveAttributes)) + case copied: CopiedModule ⇒ + enterScope(copied) + materializedValues.put(copied, materializeModule(copied, subEffectiveAttributes)) + exitScope(copied) case composite ⇒ materializedValues.put(composite, materializeComposite(composite, subEffectiveAttributes)) } @@ -524,15 +575,21 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo } final protected def assignPort(in: InPort, subscriber: Subscriber[Any]): Unit = { - subscribers.put(in, subscriber) - val publisher = publishers(topLevel.upstreams(in)) - if (publisher ne null) attach(publisher, subscriber) + subscribers(in) = subscriber + // Interface (unconnected) ports of the current scope will be wired when exiting the scope + if (!currentLayout.inPorts(in)) { + val publisher = publishers(currentLayout.upstreams(in)) + if (publisher ne null) attach(publisher, subscriber) + } } final protected def assignPort(out: OutPort, publisher: Publisher[Any]): Unit = { - publishers.put(out, publisher) - val subscriber = subscribers(topLevel.downstreams(out)) - if (subscriber ne null) attach(publisher, subscriber) + publishers(out) = publisher + // Interface (unconnected) ports of the current scope will be wired when exiting the scope + if (!currentLayout.outPorts(out)) { + val subscriber = subscribers(currentLayout.downstreams(out)) + if (subscriber ne null) attach(publisher, subscriber) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 072f0cc965..db678b0653 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -65,7 +65,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * flow into the materialized value of the resulting Flow. */ def viaMat[T, Mat2, Mat3](flow: Flow[Out, T, Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Flow[In, T, Mat3] = { - if (this.isIdentity) flow.asInstanceOf[Flow[In, T, Mat3]] + if (this.isIdentity) flow.asInstanceOf[Flow[In, T, Mat2]].mapMaterialized(combine(().asInstanceOf[Mat], _)) else { val flowCopy = flow.module.carbonCopy new Flow( diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index f03c8ace5d..afd8415837 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -259,6 +259,19 @@ object FlowGraph extends GraphApply { graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S] } + /** + * INTERNAL API. + * + * This is only used by the materialization-importing apply methods of Source, + * Flow, Sink and Graph. + */ + private[stream] def add[S <: Shape, A](graph: Graph[S, _], transform: (A) ⇒ Any): S = { + if (StreamLayout.Debug) graph.module.validate() + val copy = graph.module.carbonCopy + moduleInProgress = moduleInProgress.grow(copy.transformMaterializedValue(transform.asInstanceOf[Any ⇒ Any])) + graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S] + } + /** * INTERNAL API. *