From 989995e1188190aa02bc32a9a80e738a280c8382 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 3 Oct 2014 15:13:26 +0200 Subject: [PATCH] +str #15905 Add capability to merge and connect flow graphs * import edges of another flow graph to the builder * connect undefined sink with undefined source, which may originate from different flow graphs * illustrate how this low level api can be used in a high level api (lego bricks) --- .../scaladsl2/FlowGraphCompileSpec.scala | 27 ++++ .../scaladsl2/GraphOpsIntegrationSpec.scala | 62 +++++++++ .../akka/stream/scaladsl2/FlowGraph.scala | 122 +++++++++++------- 3 files changed, 161 insertions(+), 50 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala index e2780d6ef2..b64fcf14b1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala @@ -404,5 +404,32 @@ class FlowGraphCompileSpec extends AkkaSpec { } } + "support interconnect between two partial flow graphs" in { + val output1 = UndefinedSink[String] + val output2 = UndefinedSink[String] + val partial1 = PartialFlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + val bcast = Broadcast[String] + in1 ~> bcast ~> output1 + bcast ~> output2 + } + + val input1 = UndefinedSource[String] + val input2 = UndefinedSource[String] + val partial2 = PartialFlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + val merge = Merge[String] + input1 ~> merge ~> out1 + input2 ~> merge + } + + FlowGraph { b ⇒ + b.importPartialFlowGraph(partial1) + b.importPartialFlowGraph(partial2) + b.connect(output1, f1, input1) + b.connect(output2, f2, input2) + }.run() + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala index 26351f1f4f..4dab3c4783 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala @@ -6,9 +6,54 @@ import akka.stream.testkit.{ StreamTestKit, AkkaSpec } import scala.concurrent.Await import scala.concurrent.duration._ import akka.stream.scaladsl2.FlowGraphImplicits._ +import akka.util.ByteString import akka.stream.testkit.StreamTestKit.SubscriberProbe +import akka.stream.testkit.StreamTestKit.OnNext + +object GraphOpsIntegrationSpec { + + object Lego { + def apply(pipeline: Flow[String, String]): Lego = { + val in = UndefinedSource[String] + val out = UndefinedSink[ByteString] + val graph = PartialFlowGraph { implicit builder ⇒ + val balance = Balance[String] + val merge = Merge[String] + in ~> Flow[String].map(_.trim) ~> balance + balance ~> pipeline ~> merge + balance ~> pipeline ~> merge + balance ~> pipeline ~> merge + merge ~> Flow[String].map(_.trim).map(ByteString.fromString) ~> out + } + new Lego(in, out, graph) + } + } + + class Lego private ( + private val in: UndefinedSource[String], + private val out: UndefinedSink[ByteString], + private val graph: PartialFlowGraph) { + + def connect(that: Lego, adapter: Flow[ByteString, String]): Lego = { + val newGraph = PartialFlowGraph { builder ⇒ + builder.importPartialFlowGraph(this.graph) + builder.importPartialFlowGraph(that.graph) + builder.connect(this.out, adapter, that.in) + } + new Lego(this.in, that.out, newGraph) + } + + def run(source: Source[String], sink: Sink[ByteString])(implicit materializer: FlowMaterializer): Unit = + FlowGraph(graph) { builder ⇒ + builder.attachSource(in, source) + builder.attachSink(out, sink) + }.run() + + } +} class GraphOpsIntegrationSpec extends AkkaSpec { + import GraphOpsIntegrationSpec._ val settings = MaterializerSettings(system) .withInputBuffer(initialSize = 2, maxSize = 16) @@ -166,6 +211,23 @@ class GraphOpsIntegrationSpec extends AkkaSpec { s2.expectComplete() } + "be possible to use as lego bricks" in { + val lego1 = Lego(Flow[String].filter(_.length > 3).map(s ⇒ s" $s ")) + val lego2 = Lego(Flow[String].map(_.toUpperCase)) + val lego3 = lego1.connect(lego2, Flow[ByteString].map(_.utf8String)) + val source = PublisherTap(Source(List("green ", "blue", "red", "yellow", "black")).toPublisher) + val s = SubscriberProbe[ByteString] + val sink = SubscriberDrain(s) + lego3.run(source, sink) + val sub = s.expectSubscription() + sub.request(100) + val result = (s.probe.receiveN(4) collect { + case OnNext(b: ByteString) ⇒ b.utf8String + }).sorted + result should be(Vector("BLACK", "BLUE", "GREEN", "YELLOW")) + s.expectComplete() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index 86b3ad9b5a..aa290b7944 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -410,11 +410,13 @@ private[akka] object FlowGraphInternal { override def toString: String = pipe.toString - def withPipe(newFlow: Pipe[Any, Nothing]): EdgeLabel = - EdgeLabel(qualifier)(newFlow, inputPort, outputPort) } type EdgeType[T] = LkDiEdge[T] { type L1 = EdgeLabel } + + def edges(graph: scalax.collection.Graph[Vertex, EdgeType]): Iterable[EdgeType[Vertex]] = + graph.edges.map(e ⇒ LkDiEdge(e.from.value, e.to.value)(e.label)) + } /** @@ -427,11 +429,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph private[akka] def this() = this(Graph.empty[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) private[akka] def this(immutableGraph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) = - this({ - val edges: Iterable[FlowGraphInternal.EdgeType[FlowGraphInternal.Vertex]] = - immutableGraph.edges.map(e ⇒ LkDiEdge(e.from.value, e.to.value)(e.label)) - Graph.from(edges = edges) - }) + this(Graph.from(edges = FlowGraphInternal.edges(immutableGraph))) private implicit val edgeFactory = scalax.collection.edge.LkDiEdge.asInstanceOf[LkBase.LkEdgeCompanion[EdgeType]] @@ -441,7 +439,6 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph private def addTapPipeEdge[In, Out](tap: Tap[In], pipe: Pipe[In, Out], junctionIn: JunctionInPort[Out]): this.type = { val tapVertex = TapVertex(tap) - checkAddTapDrainPrecondition(tapVertex) checkJunctionInPortPrecondition(junctionIn) addGraphEdge(tapVertex, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = UnlabeledPort) this @@ -449,14 +446,12 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph private def addPipeDrainEdge[In, Out](junctionOut: JunctionOutPort[In], pipe: Pipe[In, Out], drain: Drain[Out]): this.type = { val drainVertex = DrainVertex(drain) - checkAddTapDrainPrecondition(drainVertex) checkJunctionOutPortPrecondition(junctionOut) addGraphEdge(junctionOut.vertex, drainVertex, pipe, inputPort = UnlabeledPort, outputPort = junctionOut.port) this } def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], junctionIn: JunctionInPort[Out]): this.type = { - checkAddTapDrainPrecondition(source) checkJunctionInPortPrecondition(junctionIn) flow match { case pipe: Pipe[In, Out] ⇒ @@ -468,7 +463,6 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph } def addEdge[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = { - checkAddTapDrainPrecondition(sink) checkJunctionOutPortPrecondition(junctionOut) flow match { case pipe: Pipe[In, Out] ⇒ @@ -526,11 +520,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = { (source, flow, sink) match { case (tap: Tap[In], pipe: Pipe[In, Out], drain: Drain[Out]) ⇒ - val tapVertex = TapVertex(tap) - val drainVertex = DrainVertex(drain) - checkAddTapDrainPrecondition(tapVertex) - checkAddTapDrainPrecondition(drainVertex) - addGraphEdge(tapVertex, drainVertex, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) + addGraphEdge(TapVertex(tap), DrainVertex(drain), pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) case (sourcePipe: SourcePipe[In], pipe: Pipe[In, Out], sinkPipe: SinkPipe[Out]) ⇒ val tap = sourcePipe.input val newPipe = Pipe(sourcePipe.ops).connect(pipe).connect(Pipe(sinkPipe.ops)) @@ -552,8 +542,6 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph } def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = { - checkAddTapDrainPrecondition(source) - checkAddTapDrainPrecondition(sink) flow match { case pipe: Pipe[In, Out] ⇒ addGraphEdge(source, sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) @@ -564,32 +552,22 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph } def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = { - checkAddTapDrainPrecondition(source) (flow, sink) match { case (pipe: Pipe[In, Out], drain: Drain[Out]) ⇒ - val drainVertex = DrainVertex(drain) - checkAddTapDrainPrecondition(drainVertex) - addGraphEdge(source, drainVertex, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) + addGraphEdge(source, DrainVertex(drain), pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) case (pipe: Pipe[In, Out], spipe: SinkPipe[Out]) ⇒ - val drainVertex = DrainVertex(spipe.output) - checkAddTapDrainPrecondition(drainVertex) - addGraphEdge(source, drainVertex, pipe.appendPipe(Pipe(spipe.ops)), inputPort = UnlabeledPort, outputPort = UnlabeledPort) + addGraphEdge(source, DrainVertex(spipe.output), pipe.appendPipe(Pipe(spipe.ops)), inputPort = UnlabeledPort, outputPort = UnlabeledPort) case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) } this } def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = { - checkAddTapDrainPrecondition(sink) (flow, source) match { case (pipe: Pipe[In, Out], tap: Tap[In]) ⇒ - val tapVertex = TapVertex(tap) - checkAddTapDrainPrecondition(tapVertex) - addGraphEdge(tapVertex, sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) + addGraphEdge(TapVertex(tap), sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) case (pipe: Pipe[In, Out], spipe: SourcePipe[Out]) ⇒ - val tapVertex = TapVertex(spipe.input) - checkAddTapDrainPrecondition(tapVertex) - addGraphEdge(tapVertex, sink, Pipe(spipe.ops).appendPipe(pipe), inputPort = UnlabeledPort, outputPort = UnlabeledPort) + addGraphEdge(TapVertex(spipe.input), sink, Pipe(spipe.ops).appendPipe(pipe), inputPort = UnlabeledPort, outputPort = UnlabeledPort) case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) } this @@ -597,6 +575,8 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph private def addGraphEdge[In, Out](from: Vertex, to: Vertex, pipe: Pipe[In, Out], inputPort: Int, outputPort: Int): Unit = { if (edgeQualifier == Int.MaxValue) throw new IllegalArgumentException(s"Too many edges") + checkAddTapDrainPrecondition(from) + checkAddTapDrainPrecondition(to) val label = EdgeLabel(edgeQualifier)(pipe.asInstanceOf[Pipe[Any, Nothing]], inputPort, outputPort) graph.addLEdge(from, to)(label) edgeQualifier += 1 @@ -609,15 +589,10 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph graph.remove(existing) sink match { case drain: Drain[Out] ⇒ - val drainVertex = DrainVertex(drain) - checkAddTapDrainPrecondition(drainVertex) - graph.addLEdge(edge.from.value, drainVertex)(edge.label) + addGraphEdge(edge.from.value, DrainVertex(drain), edge.label.pipe, edge.label.inputPort, edge.label.outputPort) case spipe: SinkPipe[Out] ⇒ val pipe = edge.label.pipe.appendPipe(Pipe(spipe.ops)) - val label = edge.label.withPipe(pipe) - val drainVertex = DrainVertex(spipe.output) - checkAddTapDrainPrecondition(drainVertex) - graph.addLEdge(edge.from.value, drainVertex)(label) + addGraphEdge(edge.from.value, DrainVertex(spipe.output), pipe, edge.label.inputPort, edge.label.outputPort) case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) } @@ -633,15 +608,10 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph graph.remove(existing) source match { case tap: Tap[In] ⇒ - val tapVertex = TapVertex(tap) - checkAddTapDrainPrecondition(tapVertex) - graph.addLEdge(tapVertex, edge.to.value)(edge.label) + addGraphEdge(TapVertex(tap), edge.to.value, edge.label.pipe, edge.label.inputPort, edge.label.outputPort) case spipe: SourcePipe[In] ⇒ val pipe = Pipe(spipe.ops).appendPipe(edge.label.pipe) - val label = edge.label.withPipe(pipe) - val tapVertex = TapVertex(spipe.input) - checkAddTapDrainPrecondition(tapVertex) - graph.addLEdge(tapVertex, edge.to.value)(label) + addGraphEdge(TapVertex(spipe.input), edge.to.value, pipe, edge.label.inputPort, edge.label.outputPort) case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) } @@ -651,6 +621,54 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph this } + /** + * Attach the undefined `out` to the undefined `in` with a flow in-between. + * Note that one [[PartialFlowGraph]] can be connected to another `PartialFlowGraph` + * by first importing the other `PartialFlowGraph` with [[#importPartialFlowGraph]] + * and then connect them with this method. + */ + def connect[A, B](out: UndefinedSink[A], flow: Flow[A, B], in: UndefinedSource[B]): this.type = { + require(graph.contains(out), s"Couldn't connect from [$out], no matching UndefinedSink") + require(graph.contains(in), s"Couldn't connect to [$in], no matching UndefinedSource") + + val outEdge = graph.get(out).incoming.head + val inEdge = graph.get(in).outgoing.head + flow match { + case pipe: Pipe[A, B] ⇒ + val newPipe = outEdge.label.pipe.appendPipe(pipe.asInstanceOf[Pipe[Any, Nothing]]).appendPipe(inEdge.label.pipe) + graph.remove(out) + graph.remove(in) + addGraphEdge(outEdge.from.value, inEdge.to.value, newPipe, inEdge.label.inputPort, outEdge.label.outputPort) + case _ ⇒ + throw new IllegalArgumentException(OnlyPipesErrorMessage) + } + + this + } + + /** + * Import all edges from another [[FlowGraph]] to this builder. + */ + def importFlowGraph(flowGraph: FlowGraph): this.type = { + importGraph(flowGraph.graph) + this + } + + /** + * Import all edges from another [[PartialFlowGraph]] to this builder. + * After importing you can [[#connect]] undefined sources and sinks in + * two different `PartialFlowGraph` instances. + */ + def importPartialFlowGraph(partialFlowGraph: PartialFlowGraph): this.type = { + importGraph(partialFlowGraph.graph) + this + } + + private def importGraph(immutableGraph: ImmutableGraph[Vertex, EdgeType]): Unit = + immutableGraph.edges foreach { edge ⇒ + addGraphEdge(edge.from.value, edge.to.value, edge.label.pipe, edge.label.inputPort, edge.label.outputPort) + } + /** * Flow graphs with cycles are in general dangerous as it can result in deadlocks. * Therefore, cycles in the graph are by default disallowed. `IllegalArgumentException` will @@ -661,8 +679,13 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph cyclesAllowed = true } - private def checkAddTapDrainPrecondition(node: Vertex): Unit = - require(graph.find(node) == None, s"[$node] instance is already used in this flow graph") + private def checkAddTapDrainPrecondition(vertex: Vertex): Unit = { + vertex match { + case node @ (_: UndefinedSource[_] | _: UndefinedSink[_]) ⇒ + require(graph.find(node) == None, s"[$node] instance is already used in this flow graph") + case _ ⇒ // ok + } + } private def checkJunctionInPortPrecondition(junction: JunctionInPort[_]): Unit = { junction.vertex match { @@ -711,8 +734,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph //convert it to an immutable.Graph private def immutableGraph(): ImmutableGraph[Vertex, FlowGraphInternal.EdgeType] = { - val edges = graph.edges.map(e ⇒ LkDiEdge(e.from.value, e.to.value)(e.label)) - ImmutableGraph.from(edges = edges: Iterable[FlowGraphInternal.EdgeType[FlowGraphInternal.Vertex]]) + ImmutableGraph.from(edges = FlowGraphInternal.edges(graph)) } private def checkPartialBuildPreconditions(): Unit = {