=str 16940: Implement shallow copy of layouts

This commit is contained in:
Endre Sándor Varga 2015-04-23 13:32:07 +02:00
parent 33919f683c
commit 82a7f13a02
9 changed files with 197 additions and 99 deletions

View file

@ -3,12 +3,13 @@
*/
package akka.stream.scaladsl
import scala.concurrent.Promise
import scala.concurrent.{ Await, Promise }
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.TwoStreamsSetup
import scala.concurrent.duration._
class GraphConcatSpec extends TwoStreamsSetup {
@ -154,6 +155,33 @@ class GraphConcatSpec extends TwoStreamsSetup {
promise.failure(TestException)
subscriber.expectError(TestException)
}
"work with Source DSL" in {
val testSource = Source(1 to 5).concat(Source(6 to 10)).grouped(1000)
Await.result(testSource.runWith(Sink.head), 3.seconds) should ===(1 to 10)
val runnable = testSource.toMat(Sink.ignore)(Keep.left)
val (m1, m2) = runnable.run()
m1.isInstanceOf[Unit] should be(true)
m2.isInstanceOf[Unit] should be(true)
runnable.mapMaterialized((_) "boo").run() should be("boo")
}
"work with Flow DSL" in {
val testFlow = Flow[Int].concat(Source(6 to 10)).grouped(1000)
Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10)
val runnable = Source(1 to 5).viaMat(testFlow)(Keep.both).to(Sink.ignore)
val (m1, (m2, m3)) = runnable.run()
m1.isInstanceOf[Unit] should be(true)
m2.isInstanceOf[Unit] should be(true)
m3.isInstanceOf[Unit] should be(true)
runnable.mapMaterialized((_) "boo").run() should be("boo")
}
}
}

View file

@ -22,7 +22,7 @@ trait BidiFlowApply {
*/
def apply[I1, O1, I2, O2, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) ⇒ BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = {
val builder = new FlowGraph.Builder
val p = builder.add(g1, Keep.right)
val p = builder.add(g1)
val shape = buildBlock(builder)(p)
builder.buildBidiFlow(shape)
}
@ -35,7 +35,7 @@ trait BidiFlowApply {
buildBlock: FlowGraph.Builder[Mat] => ([#g1.Shape#]) ⇒ BidiShape[I##1, O##1, I##2, O##2]): BidiFlow[I##1, O##1, I##2, O##2, Mat] = {
val builder = new FlowGraph.Builder
val curried = combineMat.curried
val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1))
val p##1 = builder.add(g##1, (m##1: M##1) ⇒ curried(m##1))
[2..#val p1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))#
]
val shape = buildBlock(builder)([#p1#])

View file

@ -26,7 +26,7 @@ trait FlowApply {
*/
def apply[I, O, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Mat] = {
val builder = new FlowGraph.Builder
val p = builder.add(g1, Keep.right)
val p = builder.add(g1)
val (inlet, outlet) = buildBlock(builder)(p)
builder.buildFlow(inlet, outlet)
}
@ -41,7 +41,7 @@ trait FlowApply {
buildBlock: FlowGraph.Builder[Mat] => ([#g1.Shape#]) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Mat] = {
val builder = new FlowGraph.Builder
val curried = combineMat.curried
val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1))
val p##1 = builder.add(g##1, (m##1: M##1) ⇒ curried(m##1))
[2..#val p1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))#
]
val (inlet, outlet) = buildBlock(builder)([#p1#])

View file

@ -71,7 +71,7 @@ trait GraphApply {
def closed[Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Unit): RunnableFlow[Mat] = {
val builder = new FlowGraph.Builder
val curried = combineMat.curried
val s##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1))
val s##1 = builder.add(g##1, (m##1: M##1) ⇒ curried(m##1))
[2..#val s1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))#
]
buildBlock(builder)([#s1#])
@ -87,7 +87,7 @@ trait GraphApply {
def partial[S <: Shape, Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ S): Graph[S, Mat] = {
val builder = new FlowGraph.Builder
val curried = combineMat.curried
val s##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1))
val s##1 = builder.add(g##1, (m##1: M##1) ⇒ curried(m##1))
[2..#val s1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))#
]
val s = buildBlock(builder)([#s1#])

View file

@ -24,7 +24,7 @@ trait SinkApply {
*/
def apply[In, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) => Inlet[In]): Sink[In, Mat] = {
val builder = new FlowGraph.Builder
val s = builder.add(g1, Keep.right)
val s = builder.add(g1)
val inlet = buildBlock(builder)(s)
builder.buildSink(inlet)
}
@ -38,7 +38,7 @@ trait SinkApply {
buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Inlet[In]): Sink[In, Mat] = {
val builder = new FlowGraph.Builder
val curried = combineMat.curried
val s##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1))
val s##1 = builder.add(g##1, (m##1: M##1) ⇒ curried(m##1))
[2..#val s1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))#
]
val inlet = buildBlock(builder)([#s1#])

View file

@ -25,7 +25,7 @@ trait SourceApply {
*/
def apply[Out, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) => Outlet[Out]): Source[Out, Mat] = {
val builder = new FlowGraph.Builder
val p = builder.add(g1, Keep.right)
val p = builder.add(g1)
val port = buildBlock(builder)(p)
builder.buildSource(port)
}
@ -40,7 +40,7 @@ trait SourceApply {
buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Outlet[Out]): Source[Out, Mat] = {
val builder = new FlowGraph.Builder
val curried = combineMat.curried
val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1))
val p##1 = builder.add(g##1, (m##1: M##1) ⇒ curried(m##1))
[2..#val p1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))#
]
val port = buildBlock(builder)([#p1#])

View file

@ -9,9 +9,9 @@ import akka.stream.impl.StreamLayout.Module
import akka.stream.scaladsl.Keep
import akka.stream._
import org.reactivestreams.{ Subscription, Publisher, Subscriber }
import akka.event.Logging.simpleName
import scala.collection.mutable
import scala.util.control.NonFatal
import akka.event.Logging.simpleName
/**
* INTERNAL API
@ -39,22 +39,24 @@ private[akka] object StreamLayout {
*/
def replaceShape(s: Shape): Module
lazy val inPorts: Set[InPort] = shape.inlets.toSet
lazy val outPorts: Set[OutPort] = shape.outlets.toSet
final lazy val inPorts: Set[InPort] = shape.inlets.toSet
final lazy val outPorts: Set[OutPort] = shape.outlets.toSet
def isRunnable: Boolean = inPorts.isEmpty && outPorts.isEmpty
def isSink: Boolean = (inPorts.size == 1) && outPorts.isEmpty
def isSource: Boolean = (outPorts.size == 1) && inPorts.isEmpty
def isFlow: Boolean = (inPorts.size == 1) && (outPorts.size == 1)
def isBidiFlow: Boolean = (inPorts.size == 2) && (outPorts.size == 2)
final def isSink: Boolean = (inPorts.size == 1) && outPorts.isEmpty
final def isSource: Boolean = (outPorts.size == 1) && inPorts.isEmpty
final def isFlow: Boolean = (inPorts.size == 1) && (outPorts.size == 1)
final def isBidiFlow: Boolean = (inPorts.size == 2) && (outPorts.size == 2)
def isAtomic: Boolean = subModules.isEmpty
def isCopied: Boolean = false
def growConnect(that: Module, from: OutPort, to: InPort): Module =
final def growConnect(that: Module, from: OutPort, to: InPort): Module =
growConnect(that, from, to, Keep.left)
def growConnect[A, B, C](that: Module, from: OutPort, to: InPort, f: (A, B) C): Module =
final def growConnect[A, B, C](that: Module, from: OutPort, to: InPort, f: (A, B) C): Module =
this.grow(that, f).connect(from, to)
def connect[A, B](from: OutPort, to: InPort): Module = {
final def connect[A, B](from: OutPort, to: InPort): Module = {
if (Debug) validate()
require(outPorts(from),
@ -67,19 +69,21 @@ private[akka] object StreamLayout {
CompositeModule(
subModules,
AmorphousShape(shape.inlets.filterNot(_ == to), shape.outlets.filterNot(_ == from)),
(from, to) :: connections,
downstreams.updated(from, to),
upstreams.updated(to, from),
materializedValueComputation,
attributes)
}
def transformMaterializedValue(f: Any Any): Module = {
final def transformMaterializedValue(f: Any Any): Module = {
if (Debug) validate()
CompositeModule(
subModules = if (this.isAtomic) Set(this) else this.subModules,
subModules = if (this.isSealed) Set(this) else this.subModules,
shape,
connections,
Transform(f, this.materializedValueComputation),
downstreams,
upstreams,
Transform(f, if (this.isSealed) Atomic(this) else this.materializedValueComputation),
attributes)
}
@ -91,16 +95,20 @@ private[akka] object StreamLayout {
require(that ne this, "A module cannot be added to itself. You should pass a separate instance to grow().")
require(!subModules(that), "An existing submodule cannot be added again. All contained modules must be unique.")
val modules1 = if (this.isAtomic) Set(this) else this.subModules
val modules2 = if (that.isAtomic) Set(that) else that.subModules
val modules1 = if (this.isSealed) Set(this) else this.subModules
val modules2 = if (that.isSealed) Set(that) else that.subModules
val matComputation1 = if (this.isSealed) Atomic(this) else this.materializedValueComputation
val matComputation2 = if (that.isSealed) Atomic(that) else that.materializedValueComputation
CompositeModule(
modules1 ++ modules2,
AmorphousShape(shape.inlets ++ that.shape.inlets, shape.outlets ++ that.shape.outlets),
connections reverse_::: that.connections,
if (f eq Keep.left) materializedValueComputation
else if (f eq Keep.right) that.materializedValueComputation
else Combine(f.asInstanceOf[(Any, Any) Any], this.materializedValueComputation, that.materializedValueComputation),
downstreams ++ that.downstreams,
upstreams ++ that.upstreams,
if (f eq Keep.left) matComputation1
else if (f eq Keep.right) matComputation2
else Combine(f.asInstanceOf[(Any, Any) Any], matComputation1, matComputation2),
attributes)
}
@ -110,26 +118,27 @@ private[akka] object StreamLayout {
CompositeModule(
subModules = Set(this),
shape,
connections,
/*
* Composite modules always maintain the flattened upstreams/downstreams map (i.e. they contain all the
* layout information of all the nested modules). Copied modules break the nesting, scoping them to the
* copied module. The MaterializerSession will take care of propagating the necessary Publishers and Subscribers
* from the enclosed scope to the outer scope.
*/
downstreams,
upstreams,
/*
* Wrapping like this shields the outer module from the details of the
* materialized value computation of its submodules, which is important
* to keep the re-binding of identities to computation nodes manageable
* in carbonCopy.
* materialized value computation of its submodules.
*/
Atomic(this),
OperationAttributes.none)
}
def subModules: Set[Module]
def isAtomic: Boolean = subModules.isEmpty
final def isSealed: Boolean = isAtomic || isCopied
/**
* A list of connections whose port-wise ordering is STABLE across carbonCopy.
*/
def connections: List[(OutPort, InPort)] = Nil
final lazy val downstreams: Map[OutPort, InPort] = connections.toMap
final lazy val upstreams: Map[InPort, OutPort] = connections.map(_.swap).toMap
def downstreams: Map[OutPort, InPort] = Map.empty
def upstreams: Map[InPort, OutPort] = Map.empty
def materializedValueComputation: MaterializedValueNode = Atomic(this)
def carbonCopy: Module
@ -140,7 +149,7 @@ private[akka] object StreamLayout {
final override def hashCode(): Int = super.hashCode()
final override def equals(obj: scala.Any): Boolean = super.equals(obj)
def validate(level: Int = 0, doPrint: Boolean = false, idMap: mutable.Map[AnyRef, Int] = mutable.Map.empty): Unit = {
final def validate(level: Int = 0, doPrint: Boolean = false, idMap: mutable.Map[AnyRef, Int] = mutable.Map.empty): Unit = {
val ids = Iterator from 1
def id(obj: AnyRef) = idMap get obj match {
case Some(x) x
@ -177,12 +186,13 @@ private[akka] object StreamLayout {
}
if (dupIn.nonEmpty) problems ::= s"duplicate ports in submodules ${ins(dupIn)}"
if (dupOut.nonEmpty) problems ::= s"duplicate ports in submodules ${outs(dupOut)}"
if (!isAtomic && (inset -- allIn).nonEmpty) problems ::= s"foreign inlets ${ins(inset -- allIn)}"
if (!isAtomic && (outset -- allOut).nonEmpty) problems ::= s"foreign outlets ${outs(outset -- allOut)}"
if (!isSealed && (inset -- allIn).nonEmpty) problems ::= s"foreign inlets ${ins(inset -- allIn)}"
if (!isSealed && (outset -- allOut).nonEmpty) problems ::= s"foreign outlets ${outs(outset -- allOut)}"
val unIn = allIn -- inset -- upstreams.keySet
if (unIn.nonEmpty) problems ::= s"unconnected inlets ${ins(unIn)}"
if (unIn.nonEmpty && !isCopied) problems ::= s"unconnected inlets ${ins(unIn)}"
val unOut = allOut -- outset -- downstreams.keySet
if (unOut.nonEmpty) problems ::= s"unconnected outlets ${outs(unOut)}"
if (unOut.nonEmpty && !isCopied) problems ::= s"unconnected outlets ${outs(unOut)}"
def atomics(n: MaterializedValueNode): Set[Module] =
n match {
case Ignore Set.empty
@ -215,6 +225,10 @@ private[akka] object StreamLayout {
else throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule")
override def grow(that: Module): Module = that
override def grow[A, B, C](that: Module, f: (A, B) C): Module =
throw new UnsupportedOperationException("It is invalid to combine materialized value with EmptyModule")
override def wrap(): Module = this
override def subModules: Set[Module] = Set.empty
@ -230,10 +244,30 @@ private[akka] object StreamLayout {
override def materializedValueComputation: MaterializedValueNode = Ignore
}
final case class CopiedModule(shape: Shape, attributes: OperationAttributes, copyOf: Module) extends Module {
override val subModules: Set[Module] = Set(copyOf)
override def withAttributes(attr: OperationAttributes): Module = this.copy(attributes = attr)
override def carbonCopy: Module = this.copy(shape = shape.deepCopy())
override def replaceShape(s: Shape): Module = {
shape.requireSamePortsAs(s)
copy(shape = s)
}
override val materializedValueComputation: MaterializedValueNode = Atomic(copyOf)
override def isCopied: Boolean = true
override def toString: String = "copy of " + copyOf.toString
}
final case class CompositeModule(
subModules: Set[Module],
shape: Shape,
override val connections: List[(OutPort, InPort)],
override val downstreams: Map[OutPort, InPort],
override val upstreams: Map[InPort, OutPort],
override val materializedValueComputation: MaterializedValueNode,
attributes: OperationAttributes) extends Module {
@ -242,51 +276,14 @@ private[akka] object StreamLayout {
copy(shape = s)
}
override def carbonCopy: Module = {
val out = mutable.Map[OutPort, OutPort]()
val in = mutable.Map[InPort, InPort]()
val subMap = mutable.Map[Module, Module]()
val subs = subModules map { s
val n = s.carbonCopy
out ++= s.shape.outlets.zip(n.shape.outlets)
in ++= s.shape.inlets.zip(n.shape.inlets)
s.connections.zip(n.connections) foreach {
case ((oldOut, oldIn), (newOut, newIn))
out(oldOut) = newOut
in(oldIn) = newIn
}
subMap(s) = n
n
}
val newShape = shape.copyFromPorts(shape.inlets.map(in.asInstanceOf[Inlet[_] Inlet[_]]),
shape.outlets.map(out.asInstanceOf[Outlet[_] Outlet[_]]))
val conn = connections.map(p (out(p._1), in(p._2)))
def mapComp(n: MaterializedValueNode): MaterializedValueNode =
n match {
case Ignore Ignore
case Transform(f, dep) Transform(f, mapComp(dep))
case Atomic(mod) Atomic(subMap(mod))
case Combine(f, left, right) Combine(f, mapComp(left), mapComp(right))
}
val comp =
try mapComp(materializedValueComputation)
catch {
case so: StackOverflowError
throw new UnsupportedOperationException("materialized value computation is too complex, please group into sub-graphs")
}
copy(subModules = subs, shape = newShape, connections = conn, materializedValueComputation = comp)
}
override def carbonCopy: Module = CopiedModule(shape.deepCopy(), attributes, copyOf = this)
override def withAttributes(attributes: OperationAttributes): Module = copy(attributes = attributes)
override def toString =
s"""
| Modules: ${subModules.toSeq.map(m " " + m.getClass.getName).mkString("\n")}
| Module: ${this.attributes.nameOption.getOrElse("unnamed")}
| Modules: ${subModules.toSeq.map(m " " + m.attributes.nameOption.getOrElse(m.getClass.getName)).mkString("\n")}
| Downstreams:
| ${downstreams.map { case (in, out) s" $in -> $out" }.mkString("\n")}
| Upstreams:
@ -413,7 +410,7 @@ private[stream] class MaterializedValuePublisher extends Publisher[Any] {
* B-GET - pushAndClose fires here
* A-GET - pushAndClose fires here
*
* The proof that there are no cases:
* The proof that there are no other cases:
*
* - all permutations of 4 operations are 4! = 24
* - the operations of A and B are cannot be reordered, so there are 24 / (2 * 2) = 6 actual orderings
@ -468,8 +465,58 @@ private[stream] class MaterializedValuePublisher extends Publisher[Any] {
private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Module) {
import StreamLayout._
private val subscribers = collection.mutable.HashMap[InPort, Subscriber[Any]]().withDefaultValue(null)
private val publishers = collection.mutable.HashMap[OutPort, Publisher[Any]]().withDefaultValue(null)
private var subscribersStack: List[mutable.Map[InPort, Subscriber[Any]]] =
mutable.Map.empty[InPort, Subscriber[Any]].withDefaultValue(null) :: Nil
private var publishersStack: List[mutable.Map[OutPort, Publisher[Any]]] =
mutable.Map.empty[OutPort, Publisher[Any]].withDefaultValue(null) :: Nil
/*
* Please note that this stack keeps track of the scoped modules wrapped in CopiedModule but not the CopiedModule
* itself. The reason is that the CopiedModule itself is only needed for the enterScope and exitScope methods but
* not elsewhere. For this reason they are just simply passed as parameters to those methods.
*
* The reason why the encapsulated (copied) modules are stored as mutable state to save subclasses of this class
* from passing the current scope around or even knowing about it.
*/
private var moduleStack: List[Module] = topLevel :: Nil
private def subscribers: mutable.Map[InPort, Subscriber[Any]] = subscribersStack.head
private def publishers: mutable.Map[OutPort, Publisher[Any]] = publishersStack.head
private def currentLayout: Module = moduleStack.head
// Enters a copied module and establishes a scope that prevents internals to leak out and interfere with copies
// of the same module.
// We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter
private def enterScope(enclosing: CopiedModule): Unit = {
subscribersStack ::= mutable.Map.empty.withDefaultValue(null)
publishersStack ::= mutable.Map.empty.withDefaultValue(null)
moduleStack ::= enclosing.copyOf
}
// Exits the scope of the copied module and propagates Publishers/Subscribers to the enclosing scope assigning
// them to the copied ports instead of the original ones (since there might be multiple copies of the same module
// leading to port identity collisions)
// We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter
private def exitScope(enclosing: CopiedModule): Unit = {
val scopeSubscribers = subscribers
val scopePublishers = publishers
subscribersStack = subscribersStack.tail
publishersStack = publishersStack.tail
moduleStack = moduleStack.tail
// When we exit the scope of a copied module, pick up the Subscribers/Publishers belonging to exposed ports of
// the original module and assign them to the copy ports in the outer scope that we will return to
enclosing.copyOf.shape.inlets.iterator.zip(enclosing.shape.inlets.iterator).foreach {
case (original, exposed)
assignPort(exposed, scopeSubscribers(original))
}
enclosing.copyOf.shape.outlets.iterator.zip(enclosing.shape.outlets.iterator).foreach {
case (original, exposed)
assignPort(exposed, scopePublishers(original))
}
}
final def materialize(): Any = {
require(topLevel ne EmptyModule, "An empty module cannot be materialized (EmptyModule was given)")
@ -495,6 +542,10 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
assignPort(mv.shape.outlet, pub)
case atomic if atomic.isAtomic
materializedValues.put(atomic, materializeAtomic(atomic, subEffectiveAttributes))
case copied: CopiedModule
enterScope(copied)
materializedValues.put(copied, materializeModule(copied, subEffectiveAttributes))
exitScope(copied)
case composite
materializedValues.put(composite, materializeComposite(composite, subEffectiveAttributes))
}
@ -524,15 +575,21 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
}
final protected def assignPort(in: InPort, subscriber: Subscriber[Any]): Unit = {
subscribers.put(in, subscriber)
val publisher = publishers(topLevel.upstreams(in))
if (publisher ne null) attach(publisher, subscriber)
subscribers(in) = subscriber
// Interface (unconnected) ports of the current scope will be wired when exiting the scope
if (!currentLayout.inPorts(in)) {
val publisher = publishers(currentLayout.upstreams(in))
if (publisher ne null) attach(publisher, subscriber)
}
}
final protected def assignPort(out: OutPort, publisher: Publisher[Any]): Unit = {
publishers.put(out, publisher)
val subscriber = subscribers(topLevel.downstreams(out))
if (subscriber ne null) attach(publisher, subscriber)
publishers(out) = publisher
// Interface (unconnected) ports of the current scope will be wired when exiting the scope
if (!currentLayout.outPorts(out)) {
val subscriber = subscribers(currentLayout.downstreams(out))
if (subscriber ne null) attach(publisher, subscriber)
}
}
}

View file

@ -65,7 +65,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* flow into the materialized value of the resulting Flow.
*/
def viaMat[T, Mat2, Mat3](flow: Flow[Out, T, Mat2])(combine: (Mat, Mat2) Mat3): Flow[In, T, Mat3] = {
if (this.isIdentity) flow.asInstanceOf[Flow[In, T, Mat3]]
if (this.isIdentity) flow.asInstanceOf[Flow[In, T, Mat2]].mapMaterialized(combine(().asInstanceOf[Mat], _))
else {
val flowCopy = flow.module.carbonCopy
new Flow(

View file

@ -259,6 +259,19 @@ object FlowGraph extends GraphApply {
graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S]
}
/**
* INTERNAL API.
*
* This is only used by the materialization-importing apply methods of Source,
* Flow, Sink and Graph.
*/
private[stream] def add[S <: Shape, A](graph: Graph[S, _], transform: (A) Any): S = {
if (StreamLayout.Debug) graph.module.validate()
val copy = graph.module.carbonCopy
moduleInProgress = moduleInProgress.grow(copy.transformMaterializedValue(transform.asInstanceOf[Any Any]))
graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S]
}
/**
* INTERNAL API.
*