From fde71193602820ff00b4affaedb3907d61df86a0 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Wed, 28 Jan 2015 11:49:07 +0100 Subject: [PATCH] =str #16687 rename internal GraphFlow to GraphBackedFlow Same rename made for GraphSource and GraphSink for consistency --- .../scaladsl/FlowGraphCompileSpec.scala | 2 +- ...owSpec.scala => GraphBackedFlowSpec.scala} | 12 ++--- .../scala/akka/stream/scaladsl/Flow.scala | 2 +- .../akka/stream/scaladsl/FlowGraph.scala | 34 +++++++------- ...{GraphFlow.scala => GraphBackedFlow.scala} | 46 +++++++++---------- .../scala/akka/stream/scaladsl/Pipe.scala | 22 ++++----- 6 files changed, 59 insertions(+), 59 deletions(-) rename akka-stream-tests/src/test/scala/akka/stream/scaladsl/{GraphFlowSpec.scala => GraphBackedFlowSpec.scala} (97%) rename akka-stream/src/main/scala/akka/stream/scaladsl/{GraphFlow.scala => GraphBackedFlow.scala} (77%) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala index 8ee54a103d..65104273ce 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala @@ -440,7 +440,7 @@ class FlowGraphCompileSpec extends AkkaSpec { } } - "Junction is connected through GraphFlow" in { + "Junction is connected through GraphBackedFlow" in { val gflow = Flow[Int, String]() { implicit builder ⇒ import FlowGraphImplicits._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala similarity index 97% rename from akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlowSpec.scala rename to akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala index be62c9ac47..052510904b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala @@ -9,7 +9,7 @@ import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit.SubscriberProbe import akka.stream.testkit.StreamTestKit -object GraphFlowSpec { +object GraphBackedFlowSpec { val source1 = Source(0 to 3) val inMerge = Merge[Int] val outMerge = Merge[String] @@ -31,9 +31,9 @@ object GraphFlowSpec { val stdResult = Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) } -class GraphFlowSpec extends AkkaSpec { +class GraphBackedFlowSpec extends AkkaSpec { - import GraphFlowSpec._ + import GraphBackedFlowSpec._ val settings = ActorFlowMaterializerSettings(system) .withInputBuffer(initialSize = 2, maxSize = 16) @@ -90,7 +90,7 @@ class GraphFlowSpec extends AkkaSpec { validateProbe(probe, stdRequests, stdResult) } - "work with another GraphFlow" in { + "work with another GraphBackedFlow" in { val in1 = UndefinedSource[Int] val out1 = UndefinedSink[String] @@ -188,7 +188,7 @@ class GraphFlowSpec extends AkkaSpec { validateProbe(probe, stdRequests, stdResult) } - "work with an GraphFlow" in { + "work with an GraphBackedFlow" in { val out1 = UndefinedSink[String] val in2 = UndefinedSource[String] @@ -292,7 +292,7 @@ class GraphFlowSpec extends AkkaSpec { validateProbe(probe, stdRequests, stdResult) } - "work with a GraphFlow" in { + "work with a GraphBackedFlow" in { val in1 = UndefinedSource[Int] val out1 = UndefinedSink[String] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 30dbae3ce6..7810dcea89 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -115,7 +115,7 @@ object Flow { /** * Create a [[Flow]] from a seemingly disconnected [[Source]] and [[Sink]] pair. */ - def apply[I, O](sink: Sink[I], source: Source[O]): Flow[I, O] = GraphFlow(sink, source) + def apply[I, O](sink: Sink[I], source: Source[O]): Flow[I, O] = GraphBackedFlow(sink, source) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala index f951c6b7f8..3e38421378 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala @@ -619,7 +619,7 @@ class FlowGraphBuilder private[akka] ( flow match { case pipe: Pipe[In, Out] ⇒ addGraphEdge(source, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = UnlabeledPort) - case gflow: GraphFlow[In, _, _, Out] ⇒ + case gflow: GraphBackedFlow[In, _, _, Out] ⇒ val tOut = UndefinedSink[In] val tIn = UndefinedSource[Out] addEdge(source, tOut) @@ -638,7 +638,7 @@ class FlowGraphBuilder private[akka] ( flow match { case pipe: Pipe[In, Out] ⇒ addGraphEdge(junctionOut.vertex, sink, pipe, inputPort = UnlabeledPort, outputPort = junctionOut.port) - case gflow: GraphFlow[In, _, _, Out] ⇒ + case gflow: GraphBackedFlow[In, _, _, Out] ⇒ val tOut = UndefinedSink[In] val tIn = UndefinedSource[Out] addEdge(junctionOut, tOut) @@ -658,7 +658,7 @@ class FlowGraphBuilder private[akka] ( flow match { case pipe: Pipe[In, Out] ⇒ addGraphEdge(junctionOut.vertex, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = junctionOut.port) - case gflow: GraphFlow[In, _, _, Out] ⇒ + case gflow: GraphBackedFlow[In, _, _, Out] ⇒ val tOut = UndefinedSink[In] val tIn = UndefinedSource[Out] addEdge(junctionOut, tOut) @@ -675,7 +675,7 @@ class FlowGraphBuilder private[akka] ( (source, flow) match { case (spipe: SourcePipe[In], pipe: Pipe[In, Out]) ⇒ addSourceToPipeEdge(spipe.input, Pipe(spipe).appendPipe(pipe), junctionIn) - case (gsource: GraphSource[_, In], _) ⇒ + case (gsource: GraphBackedSource[_, In], _) ⇒ val tOut = UndefinedSink[In] val tIn = UndefinedSource[Out] addEdge(gsource, tOut) @@ -694,7 +694,7 @@ class FlowGraphBuilder private[akka] ( (flow, sink) match { case (pipe: Pipe[In, Out], spipe: SinkPipe[Out]) ⇒ addPipeToSinkEdge(junctionOut, pipe.appendPipe(Pipe(spipe)), spipe.output) - case (_, gsink: GraphSink[Out, _]) ⇒ + case (_, gsink: GraphBackedSink[Out, _]) ⇒ val tOut = UndefinedSink[In] val tIn = UndefinedSource[Out] addEdge(tIn, gsink) @@ -702,7 +702,7 @@ class FlowGraphBuilder private[akka] ( connect(tOut, flow, tIn) case (pipe: Pipe[In, Out], sink: Sink[Out]) ⇒ addPipeToSinkEdge(junctionOut, pipe, sink) - case (gf: GraphFlow[_, Out, _, _], sink: Sink[Out]) ⇒ + case (gf: GraphBackedFlow[_, Out, _, _], sink: Sink[Out]) ⇒ addPipeToSinkEdge(junctionOut, gf.inPipe, sink) case x ⇒ throwUnsupportedValue(x) } @@ -726,7 +726,7 @@ class FlowGraphBuilder private[akka] ( val newPipe = pipe.via(Pipe(sinkPipe)) val snk = sinkPipe.output addEdge(source, newPipe, snk) // recursive, but now it is a Source-Pipe-Sink - case (_, gflow: GraphFlow[In, _, _, Out], _) ⇒ + case (_, gflow: GraphBackedFlow[In, _, _, Out], _) ⇒ val tOut = UndefinedSink[In] val tIn = UndefinedSource[Out] addEdge(source, tOut) @@ -746,7 +746,7 @@ class FlowGraphBuilder private[akka] ( flow match { case pipe: Pipe[In, Out] ⇒ addGraphEdge(source, sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) - case gflow: GraphFlow[In, _, _, Out] ⇒ + case gflow: GraphBackedFlow[In, _, _, Out] ⇒ val tOut = UndefinedSink[In] val tIn = UndefinedSource[Out] addEdge(source, tOut) @@ -763,13 +763,13 @@ class FlowGraphBuilder private[akka] ( (flow, sink) match { case (pipe: Pipe[In, Out], spipe: SinkPipe[Out]) ⇒ addGraphEdge(source, SinkVertex(spipe.output), pipe.appendPipe(Pipe(spipe)), inputPort = UnlabeledPort, outputPort = UnlabeledPort) - case (gflow: GraphFlow[In, _, _, Out], _) ⇒ + case (gflow: GraphBackedFlow[In, _, _, Out], _) ⇒ val tOut = UndefinedSink[In] val tIn = UndefinedSource[Out] addEdge(source, tOut) addEdge(tIn, sink) connect(tOut, gflow, tIn) - case (_, gSink: GraphSink[Out, _]) ⇒ + case (_, gSink: GraphBackedSink[Out, _]) ⇒ val oOut = UndefinedSink[Out] addEdge(source, flow, oOut) gSink.importAndConnect(this, oOut) @@ -786,7 +786,7 @@ class FlowGraphBuilder private[akka] ( (flow, source) match { case (pipe: Pipe[In, Out], spipe: SourcePipe[Out]) ⇒ addGraphEdge(SourceVertex(spipe.input), sink, Pipe(spipe).appendPipe(pipe), inputPort = UnlabeledPort, outputPort = UnlabeledPort) - case (_, gsource: GraphSource[_, In]) ⇒ + case (_, gsource: GraphBackedSource[_, In]) ⇒ val tOut1 = UndefinedSource[In] val tOut2 = UndefinedSink[In] val tIn = UndefinedSource[Out] @@ -841,7 +841,7 @@ class FlowGraphBuilder private[akka] ( case spipe: SinkPipe[Out] ⇒ val pipe = edge.label.pipe.appendPipe(Pipe(spipe)) addOrReplaceSinkEdge(edge.from.label, SinkVertex(spipe.output), pipe, edge.label.inputPort, edge.label.outputPort) - case gsink: GraphSink[Out, _] ⇒ + case gsink: GraphBackedSink[Out, _] ⇒ gsink.importAndConnect(this, token) case sink: Sink[Out] ⇒ addOrReplaceSinkEdge(edge.from.label, SinkVertex(sink), edge.label.pipe, edge.label.inputPort, edge.label.outputPort) @@ -861,7 +861,7 @@ class FlowGraphBuilder private[akka] ( case spipe: SourcePipe[In] ⇒ val pipe = Pipe(spipe).appendPipe(edge.label.pipe) addOrReplaceSourceEdge(SourceVertex(spipe.input), edge.to.label, pipe, edge.label.inputPort, edge.label.outputPort) - case gsource: GraphSource[_, In] ⇒ + case gsource: GraphBackedSource[_, In] ⇒ gsource.importAndConnect(this, token) case source: Source[In] ⇒ addOrReplaceSourceEdge(SourceVertex(source), edge.to.label, edge.label.pipe, edge.label.inputPort, edge.label.outputPort) @@ -924,7 +924,7 @@ class FlowGraphBuilder private[akka] ( val newPipe = outEdge.label.pipe.appendPipe(pipe.asInstanceOf[Pipe[Any, Nothing]]).appendPipe(inEdge.label.pipe) addOrReplaceGraphEdge(outEdge.from.label, inEdge.to.label, newPipe, inEdge.label.inputPort, outEdge.label.outputPort) } - case gflow: GraphFlow[A, _, _, B] ⇒ + case gflow: GraphBackedFlow[A, _, _, B] ⇒ require(joining == false, "Graph flows should have been split up to pipes while joining") gflow.importAndConnect(this, out, in) case x ⇒ throwUnsupportedValue(x) @@ -1336,7 +1336,7 @@ class PartialFlowGraph private[akka] (private[akka] val graph: DirectedGraphBuil */ def toSource[O](out: UndefinedSink[O]): Source[O] = { checkUndefinedSinksAndSources(sources = Nil, sinks = List(out), description = "Source") - GraphSource(this, out, Pipe.empty[O]) + GraphBackedSource(this, out, Pipe.empty[O]) } /** @@ -1345,7 +1345,7 @@ class PartialFlowGraph private[akka] (private[akka] val graph: DirectedGraphBuil */ def toFlow[I, O](in: UndefinedSource[I], out: UndefinedSink[O]): Flow[I, O] = { checkUndefinedSinksAndSources(sources = List(in), sinks = List(out), description = "Flow") - GraphFlow(Pipe.empty[I], in, this, out, Pipe.empty[O]) + GraphBackedFlow(Pipe.empty[I], in, this, out, Pipe.empty[O]) } /** @@ -1354,7 +1354,7 @@ class PartialFlowGraph private[akka] (private[akka] val graph: DirectedGraphBuil */ def toSink[I](in: UndefinedSource[I]): Sink[I] = { checkUndefinedSinksAndSources(sources = List(in), sinks = Nil, description = "Sink") - GraphSink(Pipe.empty[I], in, this) + GraphBackedSink(Pipe.empty[I], in, this) } private def checkUndefinedSinksAndSources(sources: List[UndefinedSource[_]], sinks: List[UndefinedSink[_]], description: String): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/GraphBackedFlow.scala similarity index 77% rename from akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala rename to akka-stream/src/main/scala/akka/stream/scaladsl/GraphBackedFlow.scala index a40d1781c9..3140b04b55 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/GraphBackedFlow.scala @@ -11,13 +11,13 @@ import scala.collection.immutable /** * INTERNAL API */ -private[scaladsl] object GraphFlow { +private[scaladsl] object GraphBackedFlow { /** - * Create a [[GraphFlow]] from this [[Flow]] + * Create a [[GraphBackedFlow]] from this [[Flow]] */ def apply[In, Out](flow: Flow[In, Out]) = flow match { - case gFlow: GraphFlow[In, _, _, Out] ⇒ gFlow + case gFlow: GraphBackedFlow[In, _, _, Out] ⇒ gFlow case _ ⇒ Flow() { implicit b ⇒ import FlowGraphImplicits._ val in = UndefinedSource[In] @@ -28,7 +28,7 @@ private[scaladsl] object GraphFlow { } /** - * Create a [[GraphFlow]] from a seemingly disconnected [[Source]] and [[Sink]] pair. + * Create a [[GraphBackedFlow]] from a seemingly disconnected [[Source]] and [[Sink]] pair. */ def apply[I, O](sink: Sink[I], source: Source[O]) = Flow() { implicit b ⇒ import FlowGraphImplicits._ @@ -40,23 +40,23 @@ private[scaladsl] object GraphFlow { } } -private[scaladsl] final case class GraphFlow[-In, CIn, COut, +Out]( +private[scaladsl] final case class GraphBackedFlow[-In, CIn, COut, +Out]( inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Flow[In, Out] { - override type Repr[+O] = GraphFlow[In @uncheckedVariance, CIn, COut, O] + override type Repr[+O] = GraphBackedFlow[In @uncheckedVariance, CIn, COut, O] - private[scaladsl] def prepend[T](pipe: Pipe[T, In]): GraphFlow[T, CIn, COut, Out] = copy(inPipe = pipe.appendPipe(inPipe)) + private[scaladsl] def prepend[T](pipe: Pipe[T, In]): GraphBackedFlow[T, CIn, COut, Out] = copy(inPipe = pipe.appendPipe(inPipe)) - private[scaladsl] def prepend(pipe: SourcePipe[In]): GraphSource[COut, Out] = { + private[scaladsl] def prepend(pipe: SourcePipe[In]): GraphBackedSource[COut, Out] = { val b = new FlowGraphBuilder() b.allowCycles() // FIXME: remove after #16571 is cleared val (nIn, nOut) = remap(b) b.attachSource(nIn, pipe.appendPipe(inPipe)) - GraphSource(b.partialBuild(), nOut, outPipe) + GraphBackedSource(b.partialBuild(), nOut, outPipe) } private[scaladsl] def remap(builder: FlowGraphBuilder): (UndefinedSource[CIn], UndefinedSink[COut]) = { @@ -74,14 +74,14 @@ private[scaladsl] final case class GraphFlow[-In, CIn, COut, +Out]( def via[T](flow: Flow[Out, T]): Flow[In, T] = flow match { case pipe: Pipe[Out, T] ⇒ copy(outPipe = outPipe.appendPipe(pipe)) - case gFlow: GraphFlow[Out, _, _, T] ⇒ + case gFlow: GraphBackedFlow[Out, _, _, T] ⇒ val (newGraph, nOut) = FlowGraphBuilder(graph) { b ⇒ b.allowCycles() // FIXME: remove after #16571 is cleared val (oIn, oOut) = gFlow.remap(b) b.connect(out, outPipe.via(gFlow.inPipe), oIn) (b.partialBuild(), oOut) } - GraphFlow(inPipe, in, newGraph, nOut, gFlow.outPipe) + GraphBackedFlow(inPipe, in, newGraph, nOut, gFlow.outPipe) case x ⇒ FlowGraphInternal.throwUnsupportedValue(x) } @@ -90,13 +90,13 @@ private[scaladsl] final case class GraphFlow[-In, CIn, COut, +Out]( val newGraph = PartialFlowGraph(this.graph) { builder ⇒ builder.attachSink(out, outPipe.to(sinkPipe)) } - GraphSink(inPipe, in, newGraph) - case gSink: GraphSink[Out, Out] ⇒ + GraphBackedSink(inPipe, in, newGraph) + case gSink: GraphBackedSink[Out, Out] ⇒ val newGraph = PartialFlowGraph(graph) { b ⇒ val oIn = gSink.remap(b) b.connect(out, outPipe.via(gSink.inPipe), oIn) } - GraphSink(inPipe, in, newGraph) + GraphBackedSink(inPipe, in, newGraph) case sink: Sink[Out] ⇒ to(Pipe.empty.withSink(sink)) // recursive, but now it is a SinkPipe } @@ -106,7 +106,7 @@ private[scaladsl] final case class GraphFlow[-In, CIn, COut, +Out]( b.allowCycles() b.allowDisconnected() } - case gFlow: GraphFlow[Out, _, _, In] ⇒ + case gFlow: GraphBackedFlow[Out, _, _, In] ⇒ FlowGraph(graph) { b ⇒ val (oIn, oOut) = gFlow.remap(b) b.connect(out, outPipe.via(gFlow.inPipe), oIn, joining = true) @@ -125,8 +125,8 @@ private[scaladsl] final case class GraphFlow[-In, CIn, COut, +Out]( def withAttributes(attr: OperationAttributes): Repr[Out] = copy(outPipe = outPipe.withAttributes(attr)) } -private[scaladsl] final case class GraphSource[COut, +Out](graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Source[Out] { - override type Repr[+O] = GraphSource[COut, O] +private[scaladsl] final case class GraphBackedSource[COut, +Out](graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Source[Out] { + override type Repr[+O] = GraphBackedSource[COut, O] private[scaladsl] def remap(builder: FlowGraphBuilder): UndefinedSink[COut] = { val nOut = UndefinedSink[COut] @@ -141,14 +141,14 @@ private[scaladsl] final case class GraphSource[COut, +Out](graph: PartialFlowGra override def via[T](flow: Flow[Out, T]): Source[T] = flow match { case pipe: Pipe[Out, T] ⇒ copy(outPipe = outPipe.appendPipe(pipe)) - case gFlow: GraphFlow[Out, _, _, T] ⇒ + case gFlow: GraphBackedFlow[Out, _, _, T] ⇒ val (newGraph, nOut) = FlowGraphBuilder(graph) { b ⇒ b.allowCycles() // FIXME: remove after #16571 is cleared val (oIn, oOut) = gFlow.remap(b) b.connect(out, outPipe.via(gFlow.inPipe), oIn) (b.partialBuild(), oOut) } - GraphSource(newGraph, nOut, gFlow.outPipe) + GraphBackedSource(newGraph, nOut, gFlow.outPipe) } override def to(sink: Sink[Out]): RunnableFlow = sink match { @@ -156,7 +156,7 @@ private[scaladsl] final case class GraphSource[COut, +Out](graph: PartialFlowGra FlowGraph(this.graph) { implicit builder ⇒ builder.attachSink(out, outPipe.to(sinkPipe)) } - case gSink: GraphSink[Out, _] ⇒ + case gSink: GraphBackedSink[Out, _] ⇒ FlowGraph(graph) { b ⇒ val oIn = gSink.remap(b) b.connect(out, outPipe.via(gSink.inPipe), oIn) @@ -173,7 +173,7 @@ private[scaladsl] final case class GraphSource[COut, +Out](graph: PartialFlowGra def withAttributes(attr: OperationAttributes): Repr[Out] = copy(outPipe = outPipe.withAttributes(attr)) } -private[scaladsl] final case class GraphSink[-In, CIn](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph) extends Sink[In] { +private[scaladsl] final case class GraphBackedSink[-In, CIn](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph) extends Sink[In] { private[scaladsl] def remap(builder: FlowGraphBuilder): UndefinedSource[CIn] = { val nIn = UndefinedSource[CIn] @@ -187,8 +187,8 @@ private[scaladsl] final case class GraphSink[-In, CIn](inPipe: Pipe[In, CIn], in } } - private[scaladsl] def prepend[T](pipe: Pipe[T, In]): GraphSink[T, CIn] = { - GraphSink(pipe.appendPipe(inPipe), in, graph) + private[scaladsl] def prepend[T](pipe: Pipe[T, In]): GraphBackedSink[T, CIn] = { + GraphBackedSink(pipe.appendPipe(inPipe), in, graph) } private[scaladsl] def importAndConnect(builder: FlowGraphBuilder, oOut: UndefinedSink[In @uncheckedVariance]): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala index de0a2c2f98..ecd5fb97ca 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala @@ -45,21 +45,21 @@ private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Ke private[stream] def withSource(in: Source[In]): SourcePipe[Out] = SourcePipe(in, ops, keys) override def via[T](flow: Flow[Out, T]): Flow[In, T] = flow match { - case p: Pipe[Out, T] ⇒ this.appendPipe(p) - case gf: GraphFlow[Out, _, _, T] ⇒ gf.prepend(this) - case x ⇒ FlowGraphInternal.throwUnsupportedValue(x) + case p: Pipe[Out, T] ⇒ this.appendPipe(p) + case gf: GraphBackedFlow[Out, _, _, T] ⇒ gf.prepend(this) + case x ⇒ FlowGraphInternal.throwUnsupportedValue(x) } override def to(sink: Sink[Out]): Sink[In] = sink match { case sp: SinkPipe[Out] ⇒ sp.prependPipe(this) - case gs: GraphSink[Out, _] ⇒ gs.prepend(this) + case gs: GraphBackedSink[Out, _] ⇒ gs.prepend(this) case d: Sink[Out] ⇒ this.withSink(d) } override def join(flow: Flow[Out, In]): RunnableFlow = flow match { - case p: Pipe[Out, In] ⇒ GraphFlow(this).join(p) - case gf: GraphFlow[Out, _, _, In] ⇒ gf.join(this) - case x ⇒ FlowGraphInternal.throwUnsupportedValue(x) + case p: Pipe[Out, In] ⇒ GraphBackedFlow(this).join(p) + case gf: GraphBackedFlow[Out, _, _, In] ⇒ gf.join(this) + case x ⇒ FlowGraphInternal.throwUnsupportedValue(x) } override def withKey(key: Key[_]): Pipe[In, Out] = Pipe(ops, keys :+ key) @@ -93,14 +93,14 @@ private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[As private[stream] def appendPipe[T](pipe: Pipe[Out, T]): SourcePipe[T] = SourcePipe(input, pipe.ops ::: ops, keys ::: pipe.keys) // FIXME raw addition of AstNodes override def via[T](flow: Flow[Out, T]): Source[T] = flow match { - case p: Pipe[Out, T] ⇒ this.appendPipe(p) - case g: GraphFlow[Out, _, _, T] ⇒ g.prepend(this) - case x ⇒ FlowGraphInternal.throwUnsupportedValue(x) + case p: Pipe[Out, T] ⇒ this.appendPipe(p) + case g: GraphBackedFlow[Out, _, _, T] ⇒ g.prepend(this) + case x ⇒ FlowGraphInternal.throwUnsupportedValue(x) } override def to(sink: Sink[Out]): RunnableFlow = sink match { case sp: SinkPipe[Out] ⇒ RunnablePipe(input, sp.output, sp.ops ::: ops, keys ::: sp.keys) // FIXME raw addition of AstNodes - case g: GraphSink[Out, _] ⇒ g.prepend(this) + case g: GraphBackedSink[Out, _] ⇒ g.prepend(this) case d: Sink[Out] ⇒ this.withSink(d) }