=str #15844 Use key labeled edges to support multiple edges between same vertices

This commit is contained in:
Patrik Nordwall 2014-09-11 07:43:05 +02:00
parent c385d69573
commit 353640a6c1
2 changed files with 35 additions and 26 deletions

View file

@ -4,7 +4,7 @@
package akka.stream.scaladsl2 package akka.stream.scaladsl2
import scala.language.existentials import scala.language.existentials
import scalax.collection.edge.LDiEdge import scalax.collection.edge.LkDiEdge
import scalax.collection.mutable.Graph import scalax.collection.mutable.Graph
import scalax.collection.immutable.{ Graph ImmutableGraph } import scalax.collection.immutable.{ Graph ImmutableGraph }
import org.reactivestreams.Subscriber import org.reactivestreams.Subscriber
@ -160,34 +160,41 @@ private[akka] object FlowGraphInternal {
} }
} }
// flow not part of equals/hashCode
case class EdgeLabel(qualifier: Int)(val flow: ProcessorFlow[Any, Any]) {
override def toString: String = flow.toString
}
} }
/** /**
* Builder of [[FlowGraph]] and [[PartialFlowGraph]]. * Builder of [[FlowGraph]] and [[PartialFlowGraph]].
* Syntactic sugar is provided by [[FlowGraphImplicits]]. * Syntactic sugar is provided by [[FlowGraphImplicits]].
*/ */
class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge]) { class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LkDiEdge]) {
import FlowGraphInternal._ import FlowGraphInternal._
private[akka] def this() = this(Graph.empty[FlowGraphInternal.Vertex, LDiEdge]) private[akka] def this() = this(Graph.empty[FlowGraphInternal.Vertex, LkDiEdge])
private[akka] def this(immutableGraph: ImmutableGraph[FlowGraphInternal.Vertex, LDiEdge]) = private[akka] def this(immutableGraph: ImmutableGraph[FlowGraphInternal.Vertex, LkDiEdge]) =
this(Graph.from(edges = immutableGraph.edges.map(e LDiEdge(e.from.value, e.to.value)(e.label)).toIterable)) this(Graph.from(edges = immutableGraph.edges.map(e LkDiEdge(e.from.value, e.to.value)(e.label)).toIterable))
private implicit val edgeFactory = scalax.collection.edge.LDiEdge private implicit val edgeFactory = scalax.collection.edge.LkDiEdge
var edgeQualifier = graph.edges.size
def addEdge[In, Out](source: Source[In], flow: ProcessorFlow[In, Out], sink: Junction[Out]): this.type = { def addEdge[In, Out](source: Source[In], flow: ProcessorFlow[In, Out], sink: Junction[Out]): this.type = {
val sourceVertex = SourceVertex(source) val sourceVertex = SourceVertex(source)
checkAddSourceSinkPrecondition(sourceVertex) checkAddSourceSinkPrecondition(sourceVertex)
checkAddFanPrecondition(sink, in = true) checkAddFanPrecondition(sink, in = true)
graph.addLEdge(sourceVertex, sink)(flow) addGraphEdge(sourceVertex, sink, flow)
this this
} }
def addEdge[In, Out](source: UndefinedSource[In], flow: ProcessorFlow[In, Out], sink: Junction[Out]): this.type = { def addEdge[In, Out](source: UndefinedSource[In], flow: ProcessorFlow[In, Out], sink: Junction[Out]): this.type = {
checkAddSourceSinkPrecondition(source) checkAddSourceSinkPrecondition(source)
checkAddFanPrecondition(sink, in = true) checkAddFanPrecondition(sink, in = true)
graph.addLEdge(source, sink)(flow) addGraphEdge(source, sink, flow)
this this
} }
@ -195,21 +202,21 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge])
val sinkVertex = SinkVertex(sink) val sinkVertex = SinkVertex(sink)
checkAddSourceSinkPrecondition(sinkVertex) checkAddSourceSinkPrecondition(sinkVertex)
checkAddFanPrecondition(source, in = false) checkAddFanPrecondition(source, in = false)
graph.addLEdge(source, sinkVertex)(flow) addGraphEdge(source, sinkVertex, flow)
this this
} }
def addEdge[In, Out](source: Junction[In], flow: ProcessorFlow[In, Out], sink: UndefinedSink[Out]): this.type = { def addEdge[In, Out](source: Junction[In], flow: ProcessorFlow[In, Out], sink: UndefinedSink[Out]): this.type = {
checkAddSourceSinkPrecondition(sink) checkAddSourceSinkPrecondition(sink)
checkAddFanPrecondition(source, in = false) checkAddFanPrecondition(source, in = false)
graph.addLEdge(source, sink)(flow) addGraphEdge(source, sink, flow)
this this
} }
def addEdge[In, Out](source: Junction[In], flow: ProcessorFlow[In, Out], sink: Junction[Out]): this.type = { def addEdge[In, Out](source: Junction[In], flow: ProcessorFlow[In, Out], sink: Junction[Out]): this.type = {
checkAddFanPrecondition(source, in = false) checkAddFanPrecondition(source, in = false)
checkAddFanPrecondition(sink, in = true) checkAddFanPrecondition(sink, in = true)
graph.addLEdge(source, sink)(flow) addGraphEdge(source, sink, flow)
this this
} }
@ -223,6 +230,13 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge])
this this
} }
private def addGraphEdge[In, Out](from: Vertex, to: Vertex, flow: ProcessorFlow[In, Out]): Unit = {
if (edgeQualifier == Int.MaxValue) throw new IllegalArgumentException(s"Too many edges")
val label = EdgeLabel(edgeQualifier)(flow.asInstanceOf[ProcessorFlow[Any, Any]])
graph.addLEdge(from, to)(label)
edgeQualifier += 1
}
def attachSink[Out](token: UndefinedSink[Out], sink: Sink[Out]): this.type = { def attachSink[Out](token: UndefinedSink[Out], sink: Sink[Out]): this.type = {
graph.find(token) match { graph.find(token) match {
case Some(existing) case Some(existing)
@ -286,8 +300,8 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge])
} }
//convert it to an immutable.Graph //convert it to an immutable.Graph
private def immutableGraph(): ImmutableGraph[Vertex, LDiEdge] = private def immutableGraph(): ImmutableGraph[Vertex, LkDiEdge] =
ImmutableGraph.from(edges = graph.edges.map(e LDiEdge(e.from.value, e.to.value)(e.label)).toIterable) ImmutableGraph.from(edges = graph.edges.map(e LkDiEdge(e.from.value, e.to.value)(e.label)).toIterable)
private def checkPartialBuildPreconditions(): Unit = { private def checkPartialBuildPreconditions(): Unit = {
graph.findCycle match { graph.findCycle match {
@ -346,7 +360,7 @@ object FlowGraph {
* Build a [[FlowGraph]] from scratch. * Build a [[FlowGraph]] from scratch.
*/ */
def apply(block: FlowGraphBuilder Unit): FlowGraph = def apply(block: FlowGraphBuilder Unit): FlowGraph =
apply(ImmutableGraph.empty[FlowGraphInternal.Vertex, LDiEdge])(block) apply(ImmutableGraph.empty[FlowGraphInternal.Vertex, LkDiEdge])(block)
/** /**
* Continue building a [[FlowGraph]] from an existing `PartialFlowGraph`. * Continue building a [[FlowGraph]] from an existing `PartialFlowGraph`.
@ -363,7 +377,7 @@ object FlowGraph {
def apply(flowGraph: FlowGraph)(block: FlowGraphBuilder Unit): FlowGraph = def apply(flowGraph: FlowGraph)(block: FlowGraphBuilder Unit): FlowGraph =
apply(flowGraph.graph)(block) apply(flowGraph.graph)(block)
private def apply(graph: ImmutableGraph[FlowGraphInternal.Vertex, LDiEdge])(block: FlowGraphBuilder Unit): FlowGraph = { private def apply(graph: ImmutableGraph[FlowGraphInternal.Vertex, LkDiEdge])(block: FlowGraphBuilder Unit): FlowGraph = {
val builder = new FlowGraphBuilder(graph) val builder = new FlowGraphBuilder(graph)
block(builder) block(builder)
builder.build() builder.build()
@ -373,7 +387,7 @@ object FlowGraph {
/** /**
* Concrete flow graph that can be materialized with [[#run]]. * Concrete flow graph that can be materialized with [[#run]].
*/ */
class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, LDiEdge]) { class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, LkDiEdge]) {
import FlowGraphInternal._ import FlowGraphInternal._
/** /**
@ -401,7 +415,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
if (memo.visited(edge)) { if (memo.visited(edge)) {
memo memo
} else { } else {
val flow = edge.label.asInstanceOf[ProcessorFlow[Any, Any]] val flow = edge.label.asInstanceOf[EdgeLabel].flow
// returns the materialized sink, if any // returns the materialized sink, if any
def connectToDownstream(publisher: Publisher[Any]): Option[(SinkWithKey[_, _], Any)] = { def connectToDownstream(publisher: Publisher[Any]): Option[(SinkWithKey[_, _], Any)] = {
@ -496,7 +510,7 @@ object PartialFlowGraph {
* Build a [[PartialFlowGraph]] from scratch. * Build a [[PartialFlowGraph]] from scratch.
*/ */
def apply(block: FlowGraphBuilder Unit): PartialFlowGraph = def apply(block: FlowGraphBuilder Unit): PartialFlowGraph =
apply(ImmutableGraph.empty[FlowGraphInternal.Vertex, LDiEdge])(block) apply(ImmutableGraph.empty[FlowGraphInternal.Vertex, LkDiEdge])(block)
/** /**
* Continue building a [[PartialFlowGraph]] from an existing `PartialFlowGraph`. * Continue building a [[PartialFlowGraph]] from an existing `PartialFlowGraph`.
@ -510,7 +524,7 @@ object PartialFlowGraph {
def apply(flowGraph: FlowGraph)(block: FlowGraphBuilder Unit): PartialFlowGraph = def apply(flowGraph: FlowGraph)(block: FlowGraphBuilder Unit): PartialFlowGraph =
apply(flowGraph.graph)(block) apply(flowGraph.graph)(block)
private def apply(graph: ImmutableGraph[FlowGraphInternal.Vertex, LDiEdge])(block: FlowGraphBuilder Unit): PartialFlowGraph = { private def apply(graph: ImmutableGraph[FlowGraphInternal.Vertex, LkDiEdge])(block: FlowGraphBuilder Unit): PartialFlowGraph = {
val builder = new FlowGraphBuilder(graph) val builder = new FlowGraphBuilder(graph)
block(builder) block(builder)
builder.partialBuild() builder.partialBuild()
@ -522,7 +536,7 @@ object PartialFlowGraph {
* `PartialFlowGraph` may have sources and sinks that are not attach, and it can therefore not * `PartialFlowGraph` may have sources and sinks that are not attach, and it can therefore not
* be `run`. * be `run`.
*/ */
class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, LDiEdge]) { class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, LkDiEdge]) {
import FlowGraphInternal._ import FlowGraphInternal._
def undefinedSources: Set[UndefinedSource[_]] = def undefinedSources: Set[UndefinedSource[_]] =

View file

@ -121,12 +121,7 @@ class FlowGraphCompileSpec extends AkkaSpec {
val merge = Merge[String] val merge = Merge[String]
import FlowGraphImplicits._ import FlowGraphImplicits._
in1 ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out1 in1 ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out1
bcast ~> f4 ~> bcast2 ~> f5 ~> merge bcast ~> f4 ~> merge
bcast2 ~> f6 ~> out2
// FIXME the following is doesn't work because of edge equality
// in1 ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out1
// bcast ~> f4 ~> merge
}.run() }.run()
} }