+str #15905 Add capability to merge and connect flow graphs

* import edges of another flow graph to the builder
* connect undefined sink with undefined source, which may originate
  from different flow graphs
* illustrate how this low level api can be used in a high level api
  (lego bricks)
This commit is contained in:
Patrik Nordwall 2014-10-03 15:13:26 +02:00
parent ba40bfa399
commit 989995e118
3 changed files with 161 additions and 50 deletions

View file

@ -404,5 +404,32 @@ class FlowGraphCompileSpec extends AkkaSpec {
} }
} }
"support interconnect between two partial flow graphs" in {
val output1 = UndefinedSink[String]
val output2 = UndefinedSink[String]
val partial1 = PartialFlowGraph { implicit b
import FlowGraphImplicits._
val bcast = Broadcast[String]
in1 ~> bcast ~> output1
bcast ~> output2
}
val input1 = UndefinedSource[String]
val input2 = UndefinedSource[String]
val partial2 = PartialFlowGraph { implicit b
import FlowGraphImplicits._
val merge = Merge[String]
input1 ~> merge ~> out1
input2 ~> merge
}
FlowGraph { b
b.importPartialFlowGraph(partial1)
b.importPartialFlowGraph(partial2)
b.connect(output1, f1, input1)
b.connect(output2, f2, input2)
}.run()
}
} }
} }

View file

@ -6,9 +6,54 @@ import akka.stream.testkit.{ StreamTestKit, AkkaSpec }
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.stream.scaladsl2.FlowGraphImplicits._ import akka.stream.scaladsl2.FlowGraphImplicits._
import akka.util.ByteString
import akka.stream.testkit.StreamTestKit.SubscriberProbe import akka.stream.testkit.StreamTestKit.SubscriberProbe
import akka.stream.testkit.StreamTestKit.OnNext
object GraphOpsIntegrationSpec {
object Lego {
def apply(pipeline: Flow[String, String]): Lego = {
val in = UndefinedSource[String]
val out = UndefinedSink[ByteString]
val graph = PartialFlowGraph { implicit builder
val balance = Balance[String]
val merge = Merge[String]
in ~> Flow[String].map(_.trim) ~> balance
balance ~> pipeline ~> merge
balance ~> pipeline ~> merge
balance ~> pipeline ~> merge
merge ~> Flow[String].map(_.trim).map(ByteString.fromString) ~> out
}
new Lego(in, out, graph)
}
}
class Lego private (
private val in: UndefinedSource[String],
private val out: UndefinedSink[ByteString],
private val graph: PartialFlowGraph) {
def connect(that: Lego, adapter: Flow[ByteString, String]): Lego = {
val newGraph = PartialFlowGraph { builder
builder.importPartialFlowGraph(this.graph)
builder.importPartialFlowGraph(that.graph)
builder.connect(this.out, adapter, that.in)
}
new Lego(this.in, that.out, newGraph)
}
def run(source: Source[String], sink: Sink[ByteString])(implicit materializer: FlowMaterializer): Unit =
FlowGraph(graph) { builder
builder.attachSource(in, source)
builder.attachSink(out, sink)
}.run()
}
}
class GraphOpsIntegrationSpec extends AkkaSpec { class GraphOpsIntegrationSpec extends AkkaSpec {
import GraphOpsIntegrationSpec._
val settings = MaterializerSettings(system) val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16) .withInputBuffer(initialSize = 2, maxSize = 16)
@ -166,6 +211,23 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
s2.expectComplete() s2.expectComplete()
} }
"be possible to use as lego bricks" in {
val lego1 = Lego(Flow[String].filter(_.length > 3).map(s s" $s "))
val lego2 = Lego(Flow[String].map(_.toUpperCase))
val lego3 = lego1.connect(lego2, Flow[ByteString].map(_.utf8String))
val source = PublisherTap(Source(List("green ", "blue", "red", "yellow", "black")).toPublisher)
val s = SubscriberProbe[ByteString]
val sink = SubscriberDrain(s)
lego3.run(source, sink)
val sub = s.expectSubscription()
sub.request(100)
val result = (s.probe.receiveN(4) collect {
case OnNext(b: ByteString) b.utf8String
}).sorted
result should be(Vector("BLACK", "BLUE", "GREEN", "YELLOW"))
s.expectComplete()
}
} }
} }

View file

@ -410,11 +410,13 @@ private[akka] object FlowGraphInternal {
override def toString: String = pipe.toString override def toString: String = pipe.toString
def withPipe(newFlow: Pipe[Any, Nothing]): EdgeLabel =
EdgeLabel(qualifier)(newFlow, inputPort, outputPort)
} }
type EdgeType[T] = LkDiEdge[T] { type L1 = EdgeLabel } type EdgeType[T] = LkDiEdge[T] { type L1 = EdgeLabel }
def edges(graph: scalax.collection.Graph[Vertex, EdgeType]): Iterable[EdgeType[Vertex]] =
graph.edges.map(e LkDiEdge(e.from.value, e.to.value)(e.label))
} }
/** /**
@ -427,11 +429,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
private[akka] def this() = this(Graph.empty[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) private[akka] def this() = this(Graph.empty[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType])
private[akka] def this(immutableGraph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) = private[akka] def this(immutableGraph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) =
this({ this(Graph.from(edges = FlowGraphInternal.edges(immutableGraph)))
val edges: Iterable[FlowGraphInternal.EdgeType[FlowGraphInternal.Vertex]] =
immutableGraph.edges.map(e LkDiEdge(e.from.value, e.to.value)(e.label))
Graph.from(edges = edges)
})
private implicit val edgeFactory = scalax.collection.edge.LkDiEdge.asInstanceOf[LkBase.LkEdgeCompanion[EdgeType]] private implicit val edgeFactory = scalax.collection.edge.LkDiEdge.asInstanceOf[LkBase.LkEdgeCompanion[EdgeType]]
@ -441,7 +439,6 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
private def addTapPipeEdge[In, Out](tap: Tap[In], pipe: Pipe[In, Out], junctionIn: JunctionInPort[Out]): this.type = { private def addTapPipeEdge[In, Out](tap: Tap[In], pipe: Pipe[In, Out], junctionIn: JunctionInPort[Out]): this.type = {
val tapVertex = TapVertex(tap) val tapVertex = TapVertex(tap)
checkAddTapDrainPrecondition(tapVertex)
checkJunctionInPortPrecondition(junctionIn) checkJunctionInPortPrecondition(junctionIn)
addGraphEdge(tapVertex, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = UnlabeledPort) addGraphEdge(tapVertex, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = UnlabeledPort)
this this
@ -449,14 +446,12 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
private def addPipeDrainEdge[In, Out](junctionOut: JunctionOutPort[In], pipe: Pipe[In, Out], drain: Drain[Out]): this.type = { private def addPipeDrainEdge[In, Out](junctionOut: JunctionOutPort[In], pipe: Pipe[In, Out], drain: Drain[Out]): this.type = {
val drainVertex = DrainVertex(drain) val drainVertex = DrainVertex(drain)
checkAddTapDrainPrecondition(drainVertex)
checkJunctionOutPortPrecondition(junctionOut) checkJunctionOutPortPrecondition(junctionOut)
addGraphEdge(junctionOut.vertex, drainVertex, pipe, inputPort = UnlabeledPort, outputPort = junctionOut.port) addGraphEdge(junctionOut.vertex, drainVertex, pipe, inputPort = UnlabeledPort, outputPort = junctionOut.port)
this this
} }
def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], junctionIn: JunctionInPort[Out]): this.type = { def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], junctionIn: JunctionInPort[Out]): this.type = {
checkAddTapDrainPrecondition(source)
checkJunctionInPortPrecondition(junctionIn) checkJunctionInPortPrecondition(junctionIn)
flow match { flow match {
case pipe: Pipe[In, Out] case pipe: Pipe[In, Out]
@ -468,7 +463,6 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
} }
def addEdge[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = { def addEdge[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = {
checkAddTapDrainPrecondition(sink)
checkJunctionOutPortPrecondition(junctionOut) checkJunctionOutPortPrecondition(junctionOut)
flow match { flow match {
case pipe: Pipe[In, Out] case pipe: Pipe[In, Out]
@ -526,11 +520,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = { def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = {
(source, flow, sink) match { (source, flow, sink) match {
case (tap: Tap[In], pipe: Pipe[In, Out], drain: Drain[Out]) case (tap: Tap[In], pipe: Pipe[In, Out], drain: Drain[Out])
val tapVertex = TapVertex(tap) addGraphEdge(TapVertex(tap), DrainVertex(drain), pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort)
val drainVertex = DrainVertex(drain)
checkAddTapDrainPrecondition(tapVertex)
checkAddTapDrainPrecondition(drainVertex)
addGraphEdge(tapVertex, drainVertex, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case (sourcePipe: SourcePipe[In], pipe: Pipe[In, Out], sinkPipe: SinkPipe[Out]) case (sourcePipe: SourcePipe[In], pipe: Pipe[In, Out], sinkPipe: SinkPipe[Out])
val tap = sourcePipe.input val tap = sourcePipe.input
val newPipe = Pipe(sourcePipe.ops).connect(pipe).connect(Pipe(sinkPipe.ops)) val newPipe = Pipe(sourcePipe.ops).connect(pipe).connect(Pipe(sinkPipe.ops))
@ -552,8 +542,6 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
} }
def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = { def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = {
checkAddTapDrainPrecondition(source)
checkAddTapDrainPrecondition(sink)
flow match { flow match {
case pipe: Pipe[In, Out] case pipe: Pipe[In, Out]
addGraphEdge(source, sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) addGraphEdge(source, sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort)
@ -564,32 +552,22 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
} }
def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = { def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = {
checkAddTapDrainPrecondition(source)
(flow, sink) match { (flow, sink) match {
case (pipe: Pipe[In, Out], drain: Drain[Out]) case (pipe: Pipe[In, Out], drain: Drain[Out])
val drainVertex = DrainVertex(drain) addGraphEdge(source, DrainVertex(drain), pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort)
checkAddTapDrainPrecondition(drainVertex)
addGraphEdge(source, drainVertex, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case (pipe: Pipe[In, Out], spipe: SinkPipe[Out]) case (pipe: Pipe[In, Out], spipe: SinkPipe[Out])
val drainVertex = DrainVertex(spipe.output) addGraphEdge(source, DrainVertex(spipe.output), pipe.appendPipe(Pipe(spipe.ops)), inputPort = UnlabeledPort, outputPort = UnlabeledPort)
checkAddTapDrainPrecondition(drainVertex)
addGraphEdge(source, drainVertex, pipe.appendPipe(Pipe(spipe.ops)), inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case _ throw new IllegalArgumentException(OnlyPipesErrorMessage) case _ throw new IllegalArgumentException(OnlyPipesErrorMessage)
} }
this this
} }
def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = { def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = {
checkAddTapDrainPrecondition(sink)
(flow, source) match { (flow, source) match {
case (pipe: Pipe[In, Out], tap: Tap[In]) case (pipe: Pipe[In, Out], tap: Tap[In])
val tapVertex = TapVertex(tap) addGraphEdge(TapVertex(tap), sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort)
checkAddTapDrainPrecondition(tapVertex)
addGraphEdge(tapVertex, sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case (pipe: Pipe[In, Out], spipe: SourcePipe[Out]) case (pipe: Pipe[In, Out], spipe: SourcePipe[Out])
val tapVertex = TapVertex(spipe.input) addGraphEdge(TapVertex(spipe.input), sink, Pipe(spipe.ops).appendPipe(pipe), inputPort = UnlabeledPort, outputPort = UnlabeledPort)
checkAddTapDrainPrecondition(tapVertex)
addGraphEdge(tapVertex, sink, Pipe(spipe.ops).appendPipe(pipe), inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case _ throw new IllegalArgumentException(OnlyPipesErrorMessage) case _ throw new IllegalArgumentException(OnlyPipesErrorMessage)
} }
this this
@ -597,6 +575,8 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
private def addGraphEdge[In, Out](from: Vertex, to: Vertex, pipe: Pipe[In, Out], inputPort: Int, outputPort: Int): Unit = { private def addGraphEdge[In, Out](from: Vertex, to: Vertex, pipe: Pipe[In, Out], inputPort: Int, outputPort: Int): Unit = {
if (edgeQualifier == Int.MaxValue) throw new IllegalArgumentException(s"Too many edges") if (edgeQualifier == Int.MaxValue) throw new IllegalArgumentException(s"Too many edges")
checkAddTapDrainPrecondition(from)
checkAddTapDrainPrecondition(to)
val label = EdgeLabel(edgeQualifier)(pipe.asInstanceOf[Pipe[Any, Nothing]], inputPort, outputPort) val label = EdgeLabel(edgeQualifier)(pipe.asInstanceOf[Pipe[Any, Nothing]], inputPort, outputPort)
graph.addLEdge(from, to)(label) graph.addLEdge(from, to)(label)
edgeQualifier += 1 edgeQualifier += 1
@ -609,15 +589,10 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
graph.remove(existing) graph.remove(existing)
sink match { sink match {
case drain: Drain[Out] case drain: Drain[Out]
val drainVertex = DrainVertex(drain) addGraphEdge(edge.from.value, DrainVertex(drain), edge.label.pipe, edge.label.inputPort, edge.label.outputPort)
checkAddTapDrainPrecondition(drainVertex)
graph.addLEdge(edge.from.value, drainVertex)(edge.label)
case spipe: SinkPipe[Out] case spipe: SinkPipe[Out]
val pipe = edge.label.pipe.appendPipe(Pipe(spipe.ops)) val pipe = edge.label.pipe.appendPipe(Pipe(spipe.ops))
val label = edge.label.withPipe(pipe) addGraphEdge(edge.from.value, DrainVertex(spipe.output), pipe, edge.label.inputPort, edge.label.outputPort)
val drainVertex = DrainVertex(spipe.output)
checkAddTapDrainPrecondition(drainVertex)
graph.addLEdge(edge.from.value, drainVertex)(label)
case _ throw new IllegalArgumentException(OnlyPipesErrorMessage) case _ throw new IllegalArgumentException(OnlyPipesErrorMessage)
} }
@ -633,15 +608,10 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
graph.remove(existing) graph.remove(existing)
source match { source match {
case tap: Tap[In] case tap: Tap[In]
val tapVertex = TapVertex(tap) addGraphEdge(TapVertex(tap), edge.to.value, edge.label.pipe, edge.label.inputPort, edge.label.outputPort)
checkAddTapDrainPrecondition(tapVertex)
graph.addLEdge(tapVertex, edge.to.value)(edge.label)
case spipe: SourcePipe[In] case spipe: SourcePipe[In]
val pipe = Pipe(spipe.ops).appendPipe(edge.label.pipe) val pipe = Pipe(spipe.ops).appendPipe(edge.label.pipe)
val label = edge.label.withPipe(pipe) addGraphEdge(TapVertex(spipe.input), edge.to.value, pipe, edge.label.inputPort, edge.label.outputPort)
val tapVertex = TapVertex(spipe.input)
checkAddTapDrainPrecondition(tapVertex)
graph.addLEdge(tapVertex, edge.to.value)(label)
case _ case _
throw new IllegalArgumentException(OnlyPipesErrorMessage) throw new IllegalArgumentException(OnlyPipesErrorMessage)
} }
@ -651,6 +621,54 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
this this
} }
/**
* Attach the undefined `out` to the undefined `in` with a flow in-between.
* Note that one [[PartialFlowGraph]] can be connected to another `PartialFlowGraph`
* by first importing the other `PartialFlowGraph` with [[#importPartialFlowGraph]]
* and then connect them with this method.
*/
def connect[A, B](out: UndefinedSink[A], flow: Flow[A, B], in: UndefinedSource[B]): this.type = {
require(graph.contains(out), s"Couldn't connect from [$out], no matching UndefinedSink")
require(graph.contains(in), s"Couldn't connect to [$in], no matching UndefinedSource")
val outEdge = graph.get(out).incoming.head
val inEdge = graph.get(in).outgoing.head
flow match {
case pipe: Pipe[A, B]
val newPipe = outEdge.label.pipe.appendPipe(pipe.asInstanceOf[Pipe[Any, Nothing]]).appendPipe(inEdge.label.pipe)
graph.remove(out)
graph.remove(in)
addGraphEdge(outEdge.from.value, inEdge.to.value, newPipe, inEdge.label.inputPort, outEdge.label.outputPort)
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
}
this
}
/**
* Import all edges from another [[FlowGraph]] to this builder.
*/
def importFlowGraph(flowGraph: FlowGraph): this.type = {
importGraph(flowGraph.graph)
this
}
/**
* Import all edges from another [[PartialFlowGraph]] to this builder.
* After importing you can [[#connect]] undefined sources and sinks in
* two different `PartialFlowGraph` instances.
*/
def importPartialFlowGraph(partialFlowGraph: PartialFlowGraph): this.type = {
importGraph(partialFlowGraph.graph)
this
}
private def importGraph(immutableGraph: ImmutableGraph[Vertex, EdgeType]): Unit =
immutableGraph.edges foreach { edge
addGraphEdge(edge.from.value, edge.to.value, edge.label.pipe, edge.label.inputPort, edge.label.outputPort)
}
/** /**
* Flow graphs with cycles are in general dangerous as it can result in deadlocks. * Flow graphs with cycles are in general dangerous as it can result in deadlocks.
* Therefore, cycles in the graph are by default disallowed. `IllegalArgumentException` will * Therefore, cycles in the graph are by default disallowed. `IllegalArgumentException` will
@ -661,8 +679,13 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
cyclesAllowed = true cyclesAllowed = true
} }
private def checkAddTapDrainPrecondition(node: Vertex): Unit = private def checkAddTapDrainPrecondition(vertex: Vertex): Unit = {
require(graph.find(node) == None, s"[$node] instance is already used in this flow graph") vertex match {
case node @ (_: UndefinedSource[_] | _: UndefinedSink[_])
require(graph.find(node) == None, s"[$node] instance is already used in this flow graph")
case _ // ok
}
}
private def checkJunctionInPortPrecondition(junction: JunctionInPort[_]): Unit = { private def checkJunctionInPortPrecondition(junction: JunctionInPort[_]): Unit = {
junction.vertex match { junction.vertex match {
@ -711,8 +734,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
//convert it to an immutable.Graph //convert it to an immutable.Graph
private def immutableGraph(): ImmutableGraph[Vertex, FlowGraphInternal.EdgeType] = { private def immutableGraph(): ImmutableGraph[Vertex, FlowGraphInternal.EdgeType] = {
val edges = graph.edges.map(e LkDiEdge(e.from.value, e.to.value)(e.label)) ImmutableGraph.from(edges = FlowGraphInternal.edges(graph))
ImmutableGraph.from(edges = edges: Iterable[FlowGraphInternal.EdgeType[FlowGraphInternal.Vertex]])
} }
private def checkPartialBuildPreconditions(): Unit = { private def checkPartialBuildPreconditions(): Unit = {