Merge pull request #16734 from ktoso/wip-graphflow-rename-ktoso

=str #16687 rename internal GraphFlow to GraphBackedFlow
This commit is contained in:
Konrad Malawski 2015-01-29 12:23:05 +01:00
commit 638d00bc79
6 changed files with 59 additions and 59 deletions

View file

@ -440,7 +440,7 @@ class FlowGraphCompileSpec extends AkkaSpec {
}
}
"Junction is connected through GraphFlow" in {
"Junction is connected through GraphBackedFlow" in {
val gflow = Flow[Int, String]() { implicit builder
import FlowGraphImplicits._

View file

@ -9,7 +9,7 @@ import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit.SubscriberProbe
import akka.stream.testkit.StreamTestKit
object GraphFlowSpec {
object GraphBackedFlowSpec {
val source1 = Source(0 to 3)
val inMerge = Merge[Int]
val outMerge = Merge[String]
@ -31,9 +31,9 @@ object GraphFlowSpec {
val stdResult = Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
}
class GraphFlowSpec extends AkkaSpec {
class GraphBackedFlowSpec extends AkkaSpec {
import GraphFlowSpec._
import GraphBackedFlowSpec._
val settings = ActorFlowMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
@ -90,7 +90,7 @@ class GraphFlowSpec extends AkkaSpec {
validateProbe(probe, stdRequests, stdResult)
}
"work with another GraphFlow" in {
"work with another GraphBackedFlow" in {
val in1 = UndefinedSource[Int]
val out1 = UndefinedSink[String]
@ -188,7 +188,7 @@ class GraphFlowSpec extends AkkaSpec {
validateProbe(probe, stdRequests, stdResult)
}
"work with an GraphFlow" in {
"work with an GraphBackedFlow" in {
val out1 = UndefinedSink[String]
val in2 = UndefinedSource[String]
@ -292,7 +292,7 @@ class GraphFlowSpec extends AkkaSpec {
validateProbe(probe, stdRequests, stdResult)
}
"work with a GraphFlow" in {
"work with a GraphBackedFlow" in {
val in1 = UndefinedSource[Int]
val out1 = UndefinedSink[String]

View file

@ -115,7 +115,7 @@ object Flow {
/**
* Create a [[Flow]] from a seemingly disconnected [[Source]] and [[Sink]] pair.
*/
def apply[I, O](sink: Sink[I], source: Source[O]): Flow[I, O] = GraphFlow(sink, source)
def apply[I, O](sink: Sink[I], source: Source[O]): Flow[I, O] = GraphBackedFlow(sink, source)
}
/**

View file

@ -619,7 +619,7 @@ class FlowGraphBuilder private[akka] (
flow match {
case pipe: Pipe[In, Out]
addGraphEdge(source, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = UnlabeledPort)
case gflow: GraphFlow[In, _, _, Out]
case gflow: GraphBackedFlow[In, _, _, Out]
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(source, tOut)
@ -638,7 +638,7 @@ class FlowGraphBuilder private[akka] (
flow match {
case pipe: Pipe[In, Out]
addGraphEdge(junctionOut.vertex, sink, pipe, inputPort = UnlabeledPort, outputPort = junctionOut.port)
case gflow: GraphFlow[In, _, _, Out]
case gflow: GraphBackedFlow[In, _, _, Out]
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(junctionOut, tOut)
@ -658,7 +658,7 @@ class FlowGraphBuilder private[akka] (
flow match {
case pipe: Pipe[In, Out]
addGraphEdge(junctionOut.vertex, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = junctionOut.port)
case gflow: GraphFlow[In, _, _, Out]
case gflow: GraphBackedFlow[In, _, _, Out]
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(junctionOut, tOut)
@ -675,7 +675,7 @@ class FlowGraphBuilder private[akka] (
(source, flow) match {
case (spipe: SourcePipe[In], pipe: Pipe[In, Out])
addSourceToPipeEdge(spipe.input, Pipe(spipe).appendPipe(pipe), junctionIn)
case (gsource: GraphSource[_, In], _)
case (gsource: GraphBackedSource[_, In], _)
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(gsource, tOut)
@ -694,7 +694,7 @@ class FlowGraphBuilder private[akka] (
(flow, sink) match {
case (pipe: Pipe[In, Out], spipe: SinkPipe[Out])
addPipeToSinkEdge(junctionOut, pipe.appendPipe(Pipe(spipe)), spipe.output)
case (_, gsink: GraphSink[Out, _])
case (_, gsink: GraphBackedSink[Out, _])
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(tIn, gsink)
@ -702,7 +702,7 @@ class FlowGraphBuilder private[akka] (
connect(tOut, flow, tIn)
case (pipe: Pipe[In, Out], sink: Sink[Out])
addPipeToSinkEdge(junctionOut, pipe, sink)
case (gf: GraphFlow[_, Out, _, _], sink: Sink[Out])
case (gf: GraphBackedFlow[_, Out, _, _], sink: Sink[Out])
addPipeToSinkEdge(junctionOut, gf.inPipe, sink)
case x throwUnsupportedValue(x)
}
@ -726,7 +726,7 @@ class FlowGraphBuilder private[akka] (
val newPipe = pipe.via(Pipe(sinkPipe))
val snk = sinkPipe.output
addEdge(source, newPipe, snk) // recursive, but now it is a Source-Pipe-Sink
case (_, gflow: GraphFlow[In, _, _, Out], _)
case (_, gflow: GraphBackedFlow[In, _, _, Out], _)
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(source, tOut)
@ -746,7 +746,7 @@ class FlowGraphBuilder private[akka] (
flow match {
case pipe: Pipe[In, Out]
addGraphEdge(source, sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case gflow: GraphFlow[In, _, _, Out]
case gflow: GraphBackedFlow[In, _, _, Out]
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(source, tOut)
@ -763,13 +763,13 @@ class FlowGraphBuilder private[akka] (
(flow, sink) match {
case (pipe: Pipe[In, Out], spipe: SinkPipe[Out])
addGraphEdge(source, SinkVertex(spipe.output), pipe.appendPipe(Pipe(spipe)), inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case (gflow: GraphFlow[In, _, _, Out], _)
case (gflow: GraphBackedFlow[In, _, _, Out], _)
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(source, tOut)
addEdge(tIn, sink)
connect(tOut, gflow, tIn)
case (_, gSink: GraphSink[Out, _])
case (_, gSink: GraphBackedSink[Out, _])
val oOut = UndefinedSink[Out]
addEdge(source, flow, oOut)
gSink.importAndConnect(this, oOut)
@ -786,7 +786,7 @@ class FlowGraphBuilder private[akka] (
(flow, source) match {
case (pipe: Pipe[In, Out], spipe: SourcePipe[Out])
addGraphEdge(SourceVertex(spipe.input), sink, Pipe(spipe).appendPipe(pipe), inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case (_, gsource: GraphSource[_, In])
case (_, gsource: GraphBackedSource[_, In])
val tOut1 = UndefinedSource[In]
val tOut2 = UndefinedSink[In]
val tIn = UndefinedSource[Out]
@ -841,7 +841,7 @@ class FlowGraphBuilder private[akka] (
case spipe: SinkPipe[Out]
val pipe = edge.label.pipe.appendPipe(Pipe(spipe))
addOrReplaceSinkEdge(edge.from.label, SinkVertex(spipe.output), pipe, edge.label.inputPort, edge.label.outputPort)
case gsink: GraphSink[Out, _]
case gsink: GraphBackedSink[Out, _]
gsink.importAndConnect(this, token)
case sink: Sink[Out]
addOrReplaceSinkEdge(edge.from.label, SinkVertex(sink), edge.label.pipe, edge.label.inputPort, edge.label.outputPort)
@ -861,7 +861,7 @@ class FlowGraphBuilder private[akka] (
case spipe: SourcePipe[In]
val pipe = Pipe(spipe).appendPipe(edge.label.pipe)
addOrReplaceSourceEdge(SourceVertex(spipe.input), edge.to.label, pipe, edge.label.inputPort, edge.label.outputPort)
case gsource: GraphSource[_, In]
case gsource: GraphBackedSource[_, In]
gsource.importAndConnect(this, token)
case source: Source[In]
addOrReplaceSourceEdge(SourceVertex(source), edge.to.label, edge.label.pipe, edge.label.inputPort, edge.label.outputPort)
@ -924,7 +924,7 @@ class FlowGraphBuilder private[akka] (
val newPipe = outEdge.label.pipe.appendPipe(pipe.asInstanceOf[Pipe[Any, Nothing]]).appendPipe(inEdge.label.pipe)
addOrReplaceGraphEdge(outEdge.from.label, inEdge.to.label, newPipe, inEdge.label.inputPort, outEdge.label.outputPort)
}
case gflow: GraphFlow[A, _, _, B]
case gflow: GraphBackedFlow[A, _, _, B]
require(joining == false, "Graph flows should have been split up to pipes while joining")
gflow.importAndConnect(this, out, in)
case x throwUnsupportedValue(x)
@ -1336,7 +1336,7 @@ class PartialFlowGraph private[akka] (private[akka] val graph: DirectedGraphBuil
*/
def toSource[O](out: UndefinedSink[O]): Source[O] = {
checkUndefinedSinksAndSources(sources = Nil, sinks = List(out), description = "Source")
GraphSource(this, out, Pipe.empty[O])
GraphBackedSource(this, out, Pipe.empty[O])
}
/**
@ -1345,7 +1345,7 @@ class PartialFlowGraph private[akka] (private[akka] val graph: DirectedGraphBuil
*/
def toFlow[I, O](in: UndefinedSource[I], out: UndefinedSink[O]): Flow[I, O] = {
checkUndefinedSinksAndSources(sources = List(in), sinks = List(out), description = "Flow")
GraphFlow(Pipe.empty[I], in, this, out, Pipe.empty[O])
GraphBackedFlow(Pipe.empty[I], in, this, out, Pipe.empty[O])
}
/**
@ -1354,7 +1354,7 @@ class PartialFlowGraph private[akka] (private[akka] val graph: DirectedGraphBuil
*/
def toSink[I](in: UndefinedSource[I]): Sink[I] = {
checkUndefinedSinksAndSources(sources = List(in), sinks = Nil, description = "Sink")
GraphSink(Pipe.empty[I], in, this)
GraphBackedSink(Pipe.empty[I], in, this)
}
private def checkUndefinedSinksAndSources(sources: List[UndefinedSource[_]], sinks: List[UndefinedSink[_]], description: String): Unit = {

View file

@ -11,13 +11,13 @@ import scala.collection.immutable
/**
* INTERNAL API
*/
private[scaladsl] object GraphFlow {
private[scaladsl] object GraphBackedFlow {
/**
* Create a [[GraphFlow]] from this [[Flow]]
* Create a [[GraphBackedFlow]] from this [[Flow]]
*/
def apply[In, Out](flow: Flow[In, Out]) = flow match {
case gFlow: GraphFlow[In, _, _, Out] gFlow
case gFlow: GraphBackedFlow[In, _, _, Out] gFlow
case _ Flow() { implicit b
import FlowGraphImplicits._
val in = UndefinedSource[In]
@ -28,7 +28,7 @@ private[scaladsl] object GraphFlow {
}
/**
* Create a [[GraphFlow]] from a seemingly disconnected [[Source]] and [[Sink]] pair.
* Create a [[GraphBackedFlow]] from a seemingly disconnected [[Source]] and [[Sink]] pair.
*/
def apply[I, O](sink: Sink[I], source: Source[O]) = Flow() { implicit b
import FlowGraphImplicits._
@ -40,23 +40,23 @@ private[scaladsl] object GraphFlow {
}
}
private[scaladsl] final case class GraphFlow[-In, CIn, COut, +Out](
private[scaladsl] final case class GraphBackedFlow[-In, CIn, COut, +Out](
inPipe: Pipe[In, CIn],
in: UndefinedSource[CIn],
graph: PartialFlowGraph,
out: UndefinedSink[COut],
outPipe: Pipe[COut, Out])
extends Flow[In, Out] {
override type Repr[+O] = GraphFlow[In @uncheckedVariance, CIn, COut, O]
override type Repr[+O] = GraphBackedFlow[In @uncheckedVariance, CIn, COut, O]
private[scaladsl] def prepend[T](pipe: Pipe[T, In]): GraphFlow[T, CIn, COut, Out] = copy(inPipe = pipe.appendPipe(inPipe))
private[scaladsl] def prepend[T](pipe: Pipe[T, In]): GraphBackedFlow[T, CIn, COut, Out] = copy(inPipe = pipe.appendPipe(inPipe))
private[scaladsl] def prepend(pipe: SourcePipe[In]): GraphSource[COut, Out] = {
private[scaladsl] def prepend(pipe: SourcePipe[In]): GraphBackedSource[COut, Out] = {
val b = new FlowGraphBuilder()
b.allowCycles() // FIXME: remove after #16571 is cleared
val (nIn, nOut) = remap(b)
b.attachSource(nIn, pipe.appendPipe(inPipe))
GraphSource(b.partialBuild(), nOut, outPipe)
GraphBackedSource(b.partialBuild(), nOut, outPipe)
}
private[scaladsl] def remap(builder: FlowGraphBuilder): (UndefinedSource[CIn], UndefinedSink[COut]) = {
@ -74,14 +74,14 @@ private[scaladsl] final case class GraphFlow[-In, CIn, COut, +Out](
def via[T](flow: Flow[Out, T]): Flow[In, T] = flow match {
case pipe: Pipe[Out, T] copy(outPipe = outPipe.appendPipe(pipe))
case gFlow: GraphFlow[Out, _, _, T]
case gFlow: GraphBackedFlow[Out, _, _, T]
val (newGraph, nOut) = FlowGraphBuilder(graph) { b
b.allowCycles() // FIXME: remove after #16571 is cleared
val (oIn, oOut) = gFlow.remap(b)
b.connect(out, outPipe.via(gFlow.inPipe), oIn)
(b.partialBuild(), oOut)
}
GraphFlow(inPipe, in, newGraph, nOut, gFlow.outPipe)
GraphBackedFlow(inPipe, in, newGraph, nOut, gFlow.outPipe)
case x FlowGraphInternal.throwUnsupportedValue(x)
}
@ -90,13 +90,13 @@ private[scaladsl] final case class GraphFlow[-In, CIn, COut, +Out](
val newGraph = PartialFlowGraph(this.graph) { builder
builder.attachSink(out, outPipe.to(sinkPipe))
}
GraphSink(inPipe, in, newGraph)
case gSink: GraphSink[Out, Out]
GraphBackedSink(inPipe, in, newGraph)
case gSink: GraphBackedSink[Out, Out]
val newGraph = PartialFlowGraph(graph) { b
val oIn = gSink.remap(b)
b.connect(out, outPipe.via(gSink.inPipe), oIn)
}
GraphSink(inPipe, in, newGraph)
GraphBackedSink(inPipe, in, newGraph)
case sink: Sink[Out] to(Pipe.empty.withSink(sink)) // recursive, but now it is a SinkPipe
}
@ -106,7 +106,7 @@ private[scaladsl] final case class GraphFlow[-In, CIn, COut, +Out](
b.allowCycles()
b.allowDisconnected()
}
case gFlow: GraphFlow[Out, _, _, In]
case gFlow: GraphBackedFlow[Out, _, _, In]
FlowGraph(graph) { b
val (oIn, oOut) = gFlow.remap(b)
b.connect(out, outPipe.via(gFlow.inPipe), oIn, joining = true)
@ -125,8 +125,8 @@ private[scaladsl] final case class GraphFlow[-In, CIn, COut, +Out](
def withAttributes(attr: OperationAttributes): Repr[Out] = copy(outPipe = outPipe.withAttributes(attr))
}
private[scaladsl] final case class GraphSource[COut, +Out](graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Source[Out] {
override type Repr[+O] = GraphSource[COut, O]
private[scaladsl] final case class GraphBackedSource[COut, +Out](graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Source[Out] {
override type Repr[+O] = GraphBackedSource[COut, O]
private[scaladsl] def remap(builder: FlowGraphBuilder): UndefinedSink[COut] = {
val nOut = UndefinedSink[COut]
@ -141,14 +141,14 @@ private[scaladsl] final case class GraphSource[COut, +Out](graph: PartialFlowGra
override def via[T](flow: Flow[Out, T]): Source[T] = flow match {
case pipe: Pipe[Out, T] copy(outPipe = outPipe.appendPipe(pipe))
case gFlow: GraphFlow[Out, _, _, T]
case gFlow: GraphBackedFlow[Out, _, _, T]
val (newGraph, nOut) = FlowGraphBuilder(graph) { b
b.allowCycles() // FIXME: remove after #16571 is cleared
val (oIn, oOut) = gFlow.remap(b)
b.connect(out, outPipe.via(gFlow.inPipe), oIn)
(b.partialBuild(), oOut)
}
GraphSource(newGraph, nOut, gFlow.outPipe)
GraphBackedSource(newGraph, nOut, gFlow.outPipe)
}
override def to(sink: Sink[Out]): RunnableFlow = sink match {
@ -156,7 +156,7 @@ private[scaladsl] final case class GraphSource[COut, +Out](graph: PartialFlowGra
FlowGraph(this.graph) { implicit builder
builder.attachSink(out, outPipe.to(sinkPipe))
}
case gSink: GraphSink[Out, _]
case gSink: GraphBackedSink[Out, _]
FlowGraph(graph) { b
val oIn = gSink.remap(b)
b.connect(out, outPipe.via(gSink.inPipe), oIn)
@ -173,7 +173,7 @@ private[scaladsl] final case class GraphSource[COut, +Out](graph: PartialFlowGra
def withAttributes(attr: OperationAttributes): Repr[Out] = copy(outPipe = outPipe.withAttributes(attr))
}
private[scaladsl] final case class GraphSink[-In, CIn](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph) extends Sink[In] {
private[scaladsl] final case class GraphBackedSink[-In, CIn](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph) extends Sink[In] {
private[scaladsl] def remap(builder: FlowGraphBuilder): UndefinedSource[CIn] = {
val nIn = UndefinedSource[CIn]
@ -187,8 +187,8 @@ private[scaladsl] final case class GraphSink[-In, CIn](inPipe: Pipe[In, CIn], in
}
}
private[scaladsl] def prepend[T](pipe: Pipe[T, In]): GraphSink[T, CIn] = {
GraphSink(pipe.appendPipe(inPipe), in, graph)
private[scaladsl] def prepend[T](pipe: Pipe[T, In]): GraphBackedSink[T, CIn] = {
GraphBackedSink(pipe.appendPipe(inPipe), in, graph)
}
private[scaladsl] def importAndConnect(builder: FlowGraphBuilder, oOut: UndefinedSink[In @uncheckedVariance]): Unit = {

View file

@ -45,21 +45,21 @@ private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Ke
private[stream] def withSource(in: Source[In]): SourcePipe[Out] = SourcePipe(in, ops, keys)
override def via[T](flow: Flow[Out, T]): Flow[In, T] = flow match {
case p: Pipe[Out, T] this.appendPipe(p)
case gf: GraphFlow[Out, _, _, T] gf.prepend(this)
case x FlowGraphInternal.throwUnsupportedValue(x)
case p: Pipe[Out, T] this.appendPipe(p)
case gf: GraphBackedFlow[Out, _, _, T] gf.prepend(this)
case x FlowGraphInternal.throwUnsupportedValue(x)
}
override def to(sink: Sink[Out]): Sink[In] = sink match {
case sp: SinkPipe[Out] sp.prependPipe(this)
case gs: GraphSink[Out, _] gs.prepend(this)
case gs: GraphBackedSink[Out, _] gs.prepend(this)
case d: Sink[Out] this.withSink(d)
}
override def join(flow: Flow[Out, In]): RunnableFlow = flow match {
case p: Pipe[Out, In] GraphFlow(this).join(p)
case gf: GraphFlow[Out, _, _, In] gf.join(this)
case x FlowGraphInternal.throwUnsupportedValue(x)
case p: Pipe[Out, In] GraphBackedFlow(this).join(p)
case gf: GraphBackedFlow[Out, _, _, In] gf.join(this)
case x FlowGraphInternal.throwUnsupportedValue(x)
}
override def withKey(key: Key[_]): Pipe[In, Out] = Pipe(ops, keys :+ key)
@ -93,14 +93,14 @@ private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[As
private[stream] def appendPipe[T](pipe: Pipe[Out, T]): SourcePipe[T] = SourcePipe(input, pipe.ops ::: ops, keys ::: pipe.keys) // FIXME raw addition of AstNodes
override def via[T](flow: Flow[Out, T]): Source[T] = flow match {
case p: Pipe[Out, T] this.appendPipe(p)
case g: GraphFlow[Out, _, _, T] g.prepend(this)
case x FlowGraphInternal.throwUnsupportedValue(x)
case p: Pipe[Out, T] this.appendPipe(p)
case g: GraphBackedFlow[Out, _, _, T] g.prepend(this)
case x FlowGraphInternal.throwUnsupportedValue(x)
}
override def to(sink: Sink[Out]): RunnableFlow = sink match {
case sp: SinkPipe[Out] RunnablePipe(input, sp.output, sp.ops ::: ops, keys ::: sp.keys) // FIXME raw addition of AstNodes
case g: GraphSink[Out, _] g.prepend(this)
case g: GraphBackedSink[Out, _] g.prepend(this)
case d: Sink[Out] this.withSink(d)
}