Merge pull request #16193 from drewhk/wip-16109-new-graph-lib-drewhk

=str  #16109: Replace ScalaGraph with custom graph impl
This commit is contained in:
drewhk 2014-11-05 14:07:34 +01:00
commit a5efcb5ef6
4 changed files with 683 additions and 84 deletions

View file

@ -0,0 +1,382 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.stream.testkit.AkkaSpec
class DirectedGraphBuilderSpec extends AkkaSpec {
"DirectedGraphBuilder" must {
"add and remove vertices" in {
val g = new DirectedGraphBuilder[String, Int]
g.contains(1) should be(false)
g.nodes.isEmpty should be(true)
g.edges.isEmpty should be(true)
g.find(1) should be(None)
g.addVertex(1)
g.contains(1) should be(true)
g.nodes.size should be(1)
g.edges.isEmpty should be(true)
g.find(1) should be(Some(Vertex(1)))
g.addVertex(1)
g.contains(1) should be(true)
g.nodes.size should be(1)
g.edges.isEmpty should be(true)
g.find(1) should be(Some(Vertex(1)))
g.addVertex(2)
g.contains(1) should be(true)
g.contains(2) should be(true)
g.nodes.size should be(2)
g.find(1) should be(Some(Vertex(1)))
g.find(2) should be(Some(Vertex(2)))
g.remove(2)
g.contains(1) should be(true)
g.contains(2) should be(false)
g.nodes.size should be(1)
g.edges.isEmpty should be(true)
g.find(1) should be(Some(Vertex(1)))
g.find(2) should be(None)
}
"add and remove edges" in {
val g = new DirectedGraphBuilder[String, Int]
g.nodes.size should be(0)
g.edges.size should be(0)
g.contains(1) should be(false)
g.contains(2) should be(false)
g.contains(3) should be(false)
g.containsEdge("1 -> 2") should be(false)
g.containsEdge("2 -> 3") should be(false)
g.addEdge(1, 2, "1 -> 2")
g.nodes.size should be(2)
g.edges.size should be(1)
g.contains(1) should be(true)
g.contains(2) should be(true)
g.containsEdge("1 -> 2") should be(true)
g.get(1).incoming.isEmpty should be(true)
g.get(1).outgoing.head.label should be("1 -> 2")
g.get(2).outgoing.isEmpty should be(true)
g.get(2).incoming.head.label should be("1 -> 2")
g.get(1).outgoing.head.from.label should be(1)
g.get(1).outgoing.head.to.label should be(2)
g.addEdge(2, 3, "2 -> 3")
g.nodes.size should be(3)
g.edges.size should be(2)
g.contains(1) should be(true)
g.contains(2) should be(true)
g.contains(3) should be(true)
g.containsEdge("1 -> 2") should be(true)
g.containsEdge("2 -> 3") should be(true)
g.get(1).incoming.isEmpty should be(true)
g.get(1).outgoing.head.label should be("1 -> 2")
g.get(2).outgoing.head.label should be("2 -> 3")
g.get(2).incoming.head.label should be("1 -> 2")
g.get(3).incoming.head.label should be("2 -> 3")
g.get(3).outgoing.isEmpty should be(true)
g.get(1).outgoing.head.from.label should be(1)
g.get(1).outgoing.head.to.label should be(2)
g.get(2).outgoing.head.from.label should be(2)
g.get(2).outgoing.head.to.label should be(3)
// Will reposition edge
g.addEdge(2, 4, "2 -> 3")
g.nodes.size should be(4)
g.edges.size should be(2)
g.contains(1) should be(true)
g.contains(2) should be(true)
g.contains(3) should be(true)
g.contains(4) should be(true)
g.containsEdge("1 -> 2") should be(true)
g.containsEdge("2 -> 3") should be(true)
g.get(1).incoming.isEmpty should be(true)
g.get(1).outgoing.head.label should be("1 -> 2")
g.get(2).outgoing.head.label should be("2 -> 3")
g.get(2).incoming.head.label should be("1 -> 2")
g.get(3).incoming.isEmpty should be(true)
g.get(3).outgoing.isEmpty should be(true)
g.get(4).incoming.head.label should be("2 -> 3")
g.get(4).outgoing.isEmpty should be(true)
g.get(1).outgoing.head.from.label should be(1)
g.get(1).outgoing.head.to.label should be(2)
g.get(2).outgoing.head.from.label should be(2)
g.get(2).outgoing.head.to.label should be(4)
// Will remove dangling edge
g.remove(4)
g.nodes.size should be(3)
g.edges.size should be(1)
g.contains(1) should be(true)
g.contains(2) should be(true)
g.contains(3) should be(true)
g.contains(4) should be(false)
g.containsEdge("1 -> 2") should be(true)
g.containsEdge("2 -> 3") should be(false)
g.get(1).incoming.isEmpty should be(true)
g.get(1).outgoing.head.label should be("1 -> 2")
g.get(2).outgoing.isEmpty should be(true)
g.get(2).incoming.head.label should be("1 -> 2")
g.get(3).incoming.isEmpty should be(true)
g.get(3).outgoing.isEmpty should be(true)
g.get(1).outgoing.head.from.label should be(1)
g.get(1).outgoing.head.to.label should be(2)
// Remove remaining edge
g.removeEdge("1 -> 2")
g.nodes.size should be(3)
g.edges.isEmpty should be(true)
g.contains(1) should be(true)
g.contains(2) should be(true)
g.contains(3) should be(true)
g.contains(4) should be(false)
g.containsEdge("1 -> 2") should be(false)
g.containsEdge("2 -> 3") should be(false)
g.get(1).incoming.isEmpty should be(true)
g.get(1).outgoing.isEmpty should be(true)
g.get(2).outgoing.isEmpty should be(true)
g.get(2).incoming.isEmpty should be(true)
g.get(3).incoming.isEmpty should be(true)
g.get(3).outgoing.isEmpty should be(true)
}
}
"work correctly with isolated nodes" in {
val g = new DirectedGraphBuilder[String, Int]
(1 to 99) foreach { i
g.addVertex(i)
g.nodes.size should be(i)
g.find(i) should be(Some(Vertex(i)))
}
g.isWeaklyConnected should be(false)
g.findCycle.isEmpty should be(true)
g.edgePredecessorBFSfoldLeft(g.get(99))(true) { (_, _) false } should be(true)
}
"work correctly with simple chains" in {
val g = new DirectedGraphBuilder[String, Int]
(1 to 99) foreach { i
g.addEdge(i, i + 1, s"$i -> ${i + 1}")
g.nodes.size should be(i + 1)
g.edges.size should be(i)
g.find(i) should be(Some(Vertex(i)))
g.find(i + 1) should be(Some(Vertex(i + 1)))
g.edges.contains(s"$i -> ${i + 1}")
}
g.isWeaklyConnected should be(true)
g.findCycle.isEmpty should be(true)
g.edgePredecessorBFSfoldLeft(g.get(100))(100) { (sum, e) sum + e.from.label } should be(5050)
(1 to 100) foreach (g.remove(_))
g.nodes.isEmpty should be(true)
g.edges.isEmpty should be(true)
}
"work correctly with weakly connected chains" in {
val g = new DirectedGraphBuilder[String, Int]
(1 to 49) foreach { i
g.addEdge(i, i + 1, s"$i -> ${i + 1}")
g.nodes.size should be(i + 1)
g.edges.size should be(i)
g.find(i) should be(Some(Vertex(i)))
g.find(i + 1) should be(Some(Vertex(i + 1)))
g.edges.contains(s"$i -> ${i + 1}")
}
(100 to 51 by -1) foreach { i
g.addEdge(i, i - 1, s"$i -> ${i - 1}")
g.find(i) should be(Some(Vertex(i)))
g.find(i - 1) should be(Some(Vertex(i - 1)))
g.edges.contains(s"$i -> ${i - 1}")
}
g.nodes.size should be(100)
g.edges.size should be(99)
g.isWeaklyConnected should be(true)
g.findCycle.isEmpty should be(true)
g.edgePredecessorBFSfoldLeft(g.get(50))(50) { (sum, e) sum + e.from.label } should be(5050)
(1 to 100) foreach (g.remove(_))
g.nodes.isEmpty should be(true)
g.edges.isEmpty should be(true)
}
"work correctly with directed cycles" in {
val g = new DirectedGraphBuilder[String, Int]
(1 to 99) foreach { i
g.addEdge(i, i + 1, s"$i -> ${i + 1}")
g.nodes.size should be(i + 1)
g.edges.size should be(i)
g.find(i) should be(Some(Vertex(i)))
g.find(i + 1) should be(Some(Vertex(i + 1)))
g.edges.contains(s"$i -> ${i + 1}")
}
g.addEdge(100, 1, "100 -> 1")
g.nodes.size should be(100)
g.edges.size should be(100)
g.isWeaklyConnected should be(true)
g.findCycle.toSet.size should be(100)
g.findCycle.toSet should be((1 to 100).map(Vertex(_)).toSet)
g.edgePredecessorBFSfoldLeft(g.get(100))(0) { (sum, e) sum + e.from.label } should be(5050)
(1 to 100) foreach (g.remove(_))
g.nodes.isEmpty should be(true)
g.edges.isEmpty should be(true)
}
"work correctly with undirected cycles" in {
val g = new DirectedGraphBuilder[String, Int]
(1 to 49) foreach { i
g.addEdge(i, i + 1, s"$i -> ${i + 1}")
g.nodes.size should be(i + 1)
g.edges.size should be(i)
g.find(i) should be(Some(Vertex(i)))
g.find(i + 1) should be(Some(Vertex(i + 1)))
g.edges.contains(s"$i -> ${i + 1}")
}
(100 to 51 by -1) foreach { i
g.addEdge(i, i - 1, s"$i -> ${i - 1}")
g.find(i) should be(Some(Vertex(i)))
g.find(i - 1) should be(Some(Vertex(i - 1)))
g.edges.contains(s"$i -> ${i - 1}")
}
g.addEdge(100, 1, "100 -> 1")
g.nodes.size should be(100)
g.edges.size should be(100)
g.isWeaklyConnected should be(true)
g.findCycle.isEmpty should be(true)
g.edgePredecessorBFSfoldLeft(g.get(50))(50) { (sum, e) sum + e.from.label } should be(5150)
(1 to 100) foreach (g.remove(_))
g.nodes.isEmpty should be(true)
g.edges.isEmpty should be(true)
}
"work correctly with two linked cycles, both directed" in {
val g = new DirectedGraphBuilder[String, Int]
g.addEdge(0, 1, "0 -> 1")
g.addEdge(1, 2, "1 -> 2")
g.addEdge(2, 0, "2 -> 0")
g.addEdge(1, 3, "1 -> 3")
g.addEdge(3, 0, "3 -> 0")
g.nodes.size should be(4)
g.isWeaklyConnected should be(true)
g.findCycle.nonEmpty should be(true)
g.findCycle.size should be(3)
g.removeEdge("1 -> 2")
g.isWeaklyConnected should be(true)
g.findCycle.nonEmpty should be(true)
g.findCycle.size should be(3)
g.findCycle.map(_.label).toSet should be(Set(0, 1, 3))
g.removeEdge("1 -> 3")
g.addEdge(1, 2, "1 -> 2")
g.nodes.size should be(4)
g.isWeaklyConnected should be(true)
g.findCycle.nonEmpty should be(true)
g.findCycle.size should be(3)
g.findCycle.map(_.label).toSet should be(Set(0, 1, 2))
g.removeEdge("1 -> 2")
g.isWeaklyConnected should be(true)
g.findCycle.isEmpty should be(true)
}
"work correctly with two linked cycles, one undirected" in {
val g = new DirectedGraphBuilder[String, Int]
g.addEdge(0, 1, "0 -> 1")
g.addEdge(1, 2, "1 -> 2")
g.addEdge(2, 0, "2 -> 0")
g.addEdge(1, 3, "1 -> 3")
g.addEdge(0, 3, "3 <- 0")
g.nodes.size should be(4)
g.isWeaklyConnected should be(true)
g.findCycle.nonEmpty should be(true)
g.findCycle.size should be(3)
g.findCycle.map(_.label).toSet should be(Set(0, 1, 2))
g.removeEdge("1 -> 2")
g.isWeaklyConnected should be(true)
g.findCycle.isEmpty should be(true)
g.removeEdge("1 -> 3")
g.isWeaklyConnected should be(true)
g.findCycle.isEmpty should be(true)
g.remove(0)
g.isWeaklyConnected should be(false)
g.findCycle.isEmpty should be(true)
}
"copy correctly" in {
val g1 = new DirectedGraphBuilder[String, Int]
(1 to 49) foreach { i
g1.addEdge(i, i + 1, s"$i -> ${i + 1}")
}
(100 to 51 by -1) foreach { i
g1.addEdge(i, i - 1, s"$i -> ${i - 1}")
}
g1.addEdge(0, 1, "0 -> 1")
g1.addEdge(2, 0, "2 -> 0")
g1.addEdge(1, 3, "1 -> 3")
g1.addEdge(3, 0, "3 -> 0")
g1.addVertex(200)
val g2 = g1.copy()
g2.nodes.size should be(102)
g2.nodes.toSet should be(g1.nodes.toSet)
g2.edges.toSet should be(g1.edges.toSet)
g2.nodes foreach { v2
val v1 = g1.find(v2.label).get
v1.label should be(v2.label)
v1.incoming should be(v2.incoming)
v1.outgoing should be(v2.outgoing)
v1.incoming.map(_.to) should be(v2.incoming.map(_.to))
v1.incoming.map(_.from) should be(v2.incoming.map(_.from))
v1.outgoing.map(_.to) should be(v2.outgoing.map(_.to))
v1.outgoing.map(_.from) should be(v2.outgoing.map(_.from))
}
}
}

View file

@ -303,6 +303,7 @@ class FlowGraphCompileSpec extends AkkaSpec {
val merge = Merge[String]
import FlowGraphImplicits._
in1 ~> merge ~> out1
in2 ~> merge
merge ~> out2 // wrong
}
}.getMessage should include("at most 1 outgoing")

View file

@ -0,0 +1,247 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import scala.annotation.tailrec
import scala.collection.immutable
/**
* INTERNAL API
*/
private[akka] final case class Vertex[E, V](label: V) {
private var inEdgeSet = Set.empty[Edge[E, V]]
private var outEdgeSet = Set.empty[Edge[E, V]]
def addOutEdge(e: Edge[E, V]): Unit = outEdgeSet += e
def addInEdge(e: Edge[E, V]): Unit = inEdgeSet += e
def removeOutEdge(e: Edge[E, V]): Unit = outEdgeSet -= e
def removeInEdge(e: Edge[E, V]): Unit = inEdgeSet -= e
def inDegree: Int = inEdgeSet.size
def outDegree: Int = outEdgeSet.size
def isolated: Boolean = inDegree == 0 && outDegree == 0
def isSink: Boolean = outEdgeSet.isEmpty
def successors: Set[Vertex[E, V]] = outEdgeSet.map(_.to)
def predecessors: Set[Vertex[E, V]] = inEdgeSet.map(_.from)
def neighbors: Set[Vertex[E, V]] = successors ++ predecessors
def incoming: Set[Edge[E, V]] = inEdgeSet
def outgoing: Set[Edge[E, V]] = outEdgeSet
override def equals(obj: Any): Boolean = obj match {
case v: Vertex[_, _] label.equals(v.label)
case _ false
}
override def hashCode(): Int = label.hashCode()
}
/**
* INTERNAL API
*/
private[akka] final case class Edge[E, V](label: E, from: Vertex[E, V], to: Vertex[E, V]) {
override def equals(obj: Any): Boolean = obj match {
case e: Edge[_, _] label.equals(e.label)
case _ false
}
override def hashCode(): Int = label.hashCode()
}
/**
* INTERNAL API
*/
private[akka] class DirectedGraphBuilder[E, V] {
private var vertexMap = Map.empty[V, Vertex[E, V]]
private var edgeMap = Map.empty[E, Edge[E, V]]
def edges: immutable.Seq[Edge[E, V]] = edgeMap.values.toVector
def nodes: immutable.Seq[Vertex[E, V]] = vertexMap.values.toVector
def nonEmpty: Boolean = vertexMap.nonEmpty
def addVertex(v: V): Vertex[E, V] = vertexMap.get(v) match {
case None
val vx = Vertex[E, V](v)
vertexMap += v -> vx
vx
case Some(vx) vx
}
def addEdge(from: V, to: V, label: E): Unit = {
val vfrom = addVertex(from)
val vto = addVertex(to)
removeEdge(label) // Need to remap existing labels
val edge = Edge[E, V](label, vfrom, vto)
edgeMap += label -> edge
vfrom.addOutEdge(edge)
vto.addInEdge(edge)
}
def find(v: V): Option[Vertex[E, V]] = vertexMap.get(v)
def get(v: V): Vertex[E, V] = vertexMap(v)
def contains(v: V): Boolean = vertexMap.contains(v)
def containsEdge(e: E): Boolean = edgeMap.contains(e)
def exists(p: Vertex[E, V] Boolean) = vertexMap.values.exists(p)
def removeEdge(label: E): Unit = edgeMap.get(label) match {
case Some(e)
edgeMap -= label
e.from.removeOutEdge(e)
e.to.removeInEdge(e)
case None
}
def remove(v: V): Unit = vertexMap.get(v) match {
case Some(vx)
vertexMap -= v
vx.incoming foreach { edge removeEdge(edge.label) }
vx.outgoing foreach { edge removeEdge(edge.label) }
case None
}
/**
* Performs a deep copy of the builder. Since the builder is mutable it is not safe to share instances of it
* without making a defensive copy first.
*/
def copy(): DirectedGraphBuilder[E, V] = {
val result = new DirectedGraphBuilder[E, V]()
edgeMap.foreach {
case (label, e)
result.addEdge(e.from.label, e.to.label, e.label)
}
vertexMap.filter(_._2.isolated) foreach {
case (_, n)
result.addVertex(n.label)
}
result
}
/**
* Returns true if for every vertex pair there is an undirected path connecting them
*/
def isWeaklyConnected: Boolean = {
if (vertexMap.isEmpty) true
else {
var unvisited = vertexMap.values.toSet
var toVisit = Set(unvisited.head)
while (toVisit.nonEmpty) {
val v = toVisit.head
unvisited -= v
toVisit = toVisit.tail
toVisit ++= v.neighbors.iterator.filter(unvisited.contains) // visit all unvisited neighbors of v (neighbors are undirected)
}
unvisited.isEmpty // if we ended up with unvisited nodes starting from one node we are unconnected
}
}
/**
* Finds a directed cycle in the graph
*/
def findCycle: immutable.Seq[Vertex[E, V]] = {
if (vertexMap.size < 2 || edgeMap.size < 2) Nil
else {
// Vertices we have not visited at all yet
var unvisited = vertexMap.values.toSet
// Attempts to find a cycle in a connected component
def findCycleInComponent(
componentEntryVertex: Vertex[E, V],
toVisit: Vertex[E, V],
cycleCandidate: List[Vertex[E, V]]): List[Vertex[E, V]] = {
if (!unvisited(toVisit)) Nil
else {
unvisited -= toVisit
val successors = toVisit.successors
if (successors.contains(componentEntryVertex)) toVisit :: cycleCandidate
else {
val newCycleCandidate = toVisit :: cycleCandidate
// search in all successors
@tailrec def traverse(toTraverse: Set[Vertex[E, V]]): List[Vertex[E, V]] = {
if (toTraverse.isEmpty) Nil
else {
val c = findCycleInComponent(componentEntryVertex, toVisit = toTraverse.head, newCycleCandidate)
if (c.nonEmpty) c
else traverse(toTraverse = toTraverse.tail)
}
}
traverse(toTraverse = successors)
}
}
}
// Traverse all weakly connected components and try to find cycles in each of them
@tailrec def findNextCycle(): List[Vertex[E, V]] = {
if (unvisited.size < 2) Nil
else {
// Pick a node to recursively start visiting its successors
val componentEntry = unvisited.head
if (componentEntry.inDegree < 1 || componentEntry.outDegree < 1) {
unvisited -= componentEntry
findNextCycle()
} else {
val cycleCandidate =
findCycleInComponent(componentEntry, toVisit = componentEntry, cycleCandidate = Nil)
if (cycleCandidate.nonEmpty) cycleCandidate
else findNextCycle()
}
}
}
findNextCycle()
}
}
def edgePredecessorBFSfoldLeft[T](start: Vertex[E, V])(zero: T)(f: (T, Edge[E, V]) T): T = {
var aggr: T = zero
var unvisited = edgeMap.values.toSet
// Queue to maintain BFS state
var toVisit = immutable.Queue() ++ start.incoming
while (toVisit.nonEmpty) {
val (e, nextToVisit) = toVisit.dequeue
toVisit = nextToVisit
unvisited -= e
aggr = f(aggr, e)
val unvisitedPredecessors = e.from.incoming.filter(unvisited.contains)
unvisited --= unvisitedPredecessors
toVisit = toVisit ++ unvisitedPredecessors
}
aggr
}
}

View file

@ -11,13 +11,10 @@ import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import scala.language.existentials
import scalax.collection.edge.LkBase
import scalax.collection.edge.LkDiEdge
import scalax.collection.immutable.{ Graph ImmutableGraph }
import org.reactivestreams.Subscriber
import org.reactivestreams.Publisher
import akka.stream.FlowMaterializer
import scalax.collection.mutable.Graph
import akka.stream.impl.{ DirectedGraphBuilder, Edge, Vertex GVertex }
/**
* Fan-in and fan-out vertices in the [[FlowGraph]] implements
@ -433,22 +430,6 @@ final class UndefinedSource[+T](override val name: Option[String]) extends FlowG
*/
private[akka] object FlowGraphInternal {
/**
* INTERNAL API
* Workaround for issue #16109. Reflection, used by scalax.graph, is not thread
* safe. Initialize one graph before it is used for real.
*/
private[akka] val reflectionIssueWorkaround = {
val graph = ImmutableGraph.empty[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]
val builder = new FlowGraphBuilder(graph)
val merge = Merge[String]
builder.
addEdge(Source.empty[String], merge).
addEdge(Source.empty[String], merge).
addEdge(merge, Sink.ignore)
builder.build()
}
def throwUnsupportedValue(x: Any): Nothing =
throw new IllegalArgumentException(s"Unsupported value [$x] of type [${x.getClass.getName}]. Only Pipes and Graphs are supported!")
@ -541,11 +522,6 @@ private[akka] object FlowGraphInternal {
}
type EdgeType[T] = LkDiEdge[T] { type L1 = EdgeLabel }
def edges(graph: scalax.collection.Graph[Vertex, EdgeType]): Iterable[EdgeType[Vertex]] =
graph.edges.map(e LkDiEdge(e.from.value, e.to.value)(e.label))
}
object FlowGraphBuilder {
@ -559,17 +535,14 @@ object FlowGraphBuilder {
* Builder of [[FlowGraph]] and [[PartialFlowGraph]].
* Syntactic sugar is provided by [[FlowGraphImplicits]].
*/
class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) {
class FlowGraphBuilder private[akka] (_graph: DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex]) {
import FlowGraphInternal._
private val graph = new DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex]()
private[akka] def this() = this(Graph.empty[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType])
private[akka] def this() = this(new DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex]())
private[akka] def this(immutableGraph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) =
this(Graph.from(edges = FlowGraphInternal.edges(immutableGraph)))
private implicit val edgeFactory = scalax.collection.edge.LkDiEdge.asInstanceOf[LkBase.LkEdgeCompanion[EdgeType]]
var edgeQualifier = graph.edges.size
private var edgeQualifier = 0
importGraph(_graph)
private var cyclesAllowed = false
@ -777,7 +750,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
private def uncheckedAddGraphEdge[In, Out](from: Vertex, to: Vertex, pipe: Pipe[In, Out], inputPort: Int, outputPort: Int): Unit = {
if (edgeQualifier == Int.MaxValue) throw new IllegalArgumentException(s"Too many edges")
val label = EdgeLabel(edgeQualifier)(pipe.asInstanceOf[Pipe[Any, Nothing]], inputPort, outputPort)
graph.addLEdge(from, to)(label)
graph.addEdge(from, to, label)
edgeQualifier += 1
}
@ -797,15 +770,15 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
graph.find(token) match {
case Some(existing)
val edge = existing.incoming.head
graph.remove(existing)
graph.remove(existing.label)
sink match {
case spipe: SinkPipe[Out]
val pipe = edge.label.pipe.appendPipe(Pipe(spipe.ops))
addGraphEdge(edge.from.value, SinkVertex(spipe.output), pipe, edge.label.inputPort, edge.label.outputPort)
addGraphEdge(edge.from.label, SinkVertex(spipe.output), pipe, edge.label.inputPort, edge.label.outputPort)
case gsink: GraphSink[Out, _]
gsink.importAndConnect(this, token)
case sink: Sink[Out]
addGraphEdge(edge.from.value, SinkVertex(sink), edge.label.pipe, edge.label.inputPort, edge.label.outputPort)
addGraphEdge(edge.from.label, SinkVertex(sink), edge.label.pipe, edge.label.inputPort, edge.label.outputPort)
}
case None throw new IllegalArgumentException(s"No matching UndefinedSink [${token}]")
@ -817,15 +790,15 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
graph.find(token) match {
case Some(existing)
val edge = existing.outgoing.head
graph.remove(existing)
graph.remove(existing.label)
source match {
case spipe: SourcePipe[In]
val pipe = Pipe(spipe.ops).appendPipe(edge.label.pipe)
addGraphEdge(SourceVertex(spipe.input), edge.to.value, pipe, edge.label.inputPort, edge.label.outputPort)
addGraphEdge(SourceVertex(spipe.input), edge.to.label, pipe, edge.label.inputPort, edge.label.outputPort)
case gsource: GraphSource[_, In]
gsource.importAndConnect(this, token)
case source: Source[In]
addGraphEdge(SourceVertex(source), edge.to.value, edge.label.pipe, edge.label.inputPort, edge.label.outputPort)
addGraphEdge(SourceVertex(source), edge.to.label, edge.label.pipe, edge.label.inputPort, edge.label.outputPort)
case x throwUnsupportedValue(x)
}
@ -851,7 +824,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
val newPipe = outEdge.label.pipe.appendPipe(pipe.asInstanceOf[Pipe[Any, Nothing]]).appendPipe(inEdge.label.pipe)
graph.remove(out)
graph.remove(in)
addOrReplaceGraphEdge(outEdge.from.value, inEdge.to.value, newPipe, inEdge.label.inputPort, outEdge.label.outputPort)
addOrReplaceGraphEdge(outEdge.from.label, inEdge.to.label, newPipe, inEdge.label.inputPort, outEdge.label.outputPort)
case gflow: GraphFlow[A, _, _, B]
gflow.importAndConnect(this, out, in)
case x throwUnsupportedValue(x)
@ -878,9 +851,9 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
this
}
private def importGraph(immutableGraph: ImmutableGraph[Vertex, EdgeType]): Unit =
immutableGraph.edges foreach { edge
addGraphEdge(edge.from.value, edge.to.value, edge.label.pipe, edge.label.inputPort, edge.label.outputPort)
private def importGraph(builder: DirectedGraphBuilder[EdgeLabel, Vertex]): Unit =
builder.edges foreach { edge
addGraphEdge(edge.from.label, edge.to.label, edge.label.pipe, edge.label.inputPort, edge.label.outputPort)
}
private[scaladsl] def remapPartialFlowGraph(partialFlowGraph: PartialFlowGraph, vertexMapping: Map[Vertex, Vertex]): this.type = {
@ -888,7 +861,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
def get(vertex: Vertex): Vertex = mapping.getOrElseUpdate(vertex, vertex.newInstance())
partialFlowGraph.graph.edges.foreach { edge
addGraphEdge(get(edge.from.value), get(edge.to.value), edge.label.pipe, edge.label.inputPort, edge.label.outputPort)
addGraphEdge(get(edge.from.label), get(edge.to.label), edge.label.pipe, edge.label.inputPort, edge.label.outputPort)
}
this
@ -941,8 +914,8 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
graph.find(iv) match {
case Some(node)
require(
(node.inDegree + 1) <= iv.maximumInputCount,
s"${node.value} must have at most ${iv.maximumInputCount} incoming edges, has ${node.inDegree}\n${graph.edges}")
node.inDegree <= iv.maximumInputCount,
s"${node.label} must have at most ${iv.maximumInputCount} incoming edges, has ${node.inDegree}\n${graph.edges}")
case _ // ok
}
case _ // ok, no checks here
@ -955,8 +928,8 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
graph.find(iv) match {
case Some(node)
require(
(node.outDegree + 1) <= iv.maximumOutputCount,
s"${node.value} must have at most ${iv.maximumOutputCount} outgoing edges, has ${node.outDegree}\n${graph.edges}")
node.outDegree <= iv.maximumOutputCount,
s"${node.label} must have at most ${iv.maximumOutputCount} outgoing edges, has ${node.outDegree}\n${graph.edges}")
case _ // ok
}
case _ // ok, no checks here
@ -969,7 +942,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
private[akka] def build(): FlowGraph = {
checkPartialBuildPreconditions()
checkBuildPreconditions()
new FlowGraph(immutableGraph())
new FlowGraph(graph.copy())
}
/**
@ -977,30 +950,26 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
*/
private[akka] def partialBuild(): PartialFlowGraph = {
checkPartialBuildPreconditions()
new PartialFlowGraph(immutableGraph())
}
//convert it to an immutable.Graph
private def immutableGraph(): ImmutableGraph[Vertex, FlowGraphInternal.EdgeType] = {
ImmutableGraph.from(edges = FlowGraphInternal.edges(graph))
new PartialFlowGraph(graph.copy())
}
private def checkPartialBuildPreconditions(): Unit = {
if (!cyclesAllowed) graph.findCycle match {
case None
case Some(cycle) throw new IllegalArgumentException("Cycle detected, not supported yet. " + cycle)
if (!cyclesAllowed) {
val cycle = graph.findCycle
if (cycle.nonEmpty)
throw new IllegalArgumentException("Cycle detected, but cycle support was not enabled. Cycle is " + cycle.map(_.label).mkString(" -> "))
}
}
private def checkBuildPreconditions(): Unit = {
val undefinedSourcesSinks = graph.nodes.filter {
_.value match {
_.label match {
case _: UndefinedSource[_] | _: UndefinedSink[_] true
case x false
}
}
if (undefinedSourcesSinks.nonEmpty) {
val formatted = undefinedSourcesSinks.map(n n.value match {
val formatted = undefinedSourcesSinks.map(n n.label match {
case u: UndefinedSource[_] s"$u -> ${n.outgoing.head.label} -> ${n.outgoing.head.to}"
case u: UndefinedSink[_] s"${n.incoming.head.from} -> ${n.incoming.head.label} -> $u"
})
@ -1008,7 +977,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
}
graph.nodes.foreach { node
node.value match {
node.label match {
case v: InternalVertex
require(
node.inDegree >= v.minimumInputCount,
@ -1034,12 +1003,12 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
}
require(graph.nonEmpty, "Graph must not be empty")
require(graph.exists(graph having ((node = { n n.isLeaf && n.diSuccessors.isEmpty }))),
require(graph.exists(_.outDegree == 0),
"Graph must have at least one sink")
require(graph.exists(graph having ((node = { n n.isLeaf && n.diPredecessors.isEmpty }))),
require(graph.exists(_.inDegree == 0),
"Graph must have at least one source")
require(graph.isConnected, "Graph must be connected")
require(graph.isWeaklyConnected, "Graph must be connected")
}
}
@ -1055,7 +1024,7 @@ object FlowGraph {
* Build a [[FlowGraph]] from scratch.
*/
def apply(block: FlowGraphBuilder Unit): FlowGraph =
apply(ImmutableGraph.empty[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType])(block)
apply(new DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex])(block)
/**
* Continue building a [[FlowGraph]] from an existing `PartialFlowGraph`.
@ -1072,7 +1041,7 @@ object FlowGraph {
def apply(flowGraph: FlowGraph)(block: FlowGraphBuilder Unit): FlowGraph =
apply(flowGraph.graph)(block)
private def apply(graph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType])(block: FlowGraphBuilder Unit): FlowGraph = {
private def apply(graph: DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex])(block: FlowGraphBuilder Unit): FlowGraph = {
val builder = new FlowGraphBuilder(graph)
block(builder)
builder.build()
@ -1085,7 +1054,7 @@ object FlowGraph {
* Build a `FlowGraph` by starting with one of the `apply` methods in
* in [[FlowGraph$ companion object]]. Syntactic sugar is provided by [[FlowGraphImplicits]].
*/
class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) extends RunnableFlow {
class FlowGraph private[akka] (private[akka] val graph: DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex]) extends RunnableFlow {
import FlowGraphInternal._
/**
@ -1095,7 +1064,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
val edges = graph.edges
if (edges.size == 1) {
val edge = edges.head
(edge.from.value, edge.to.value) match {
(edge.from.label, edge.to.label) match {
case (sourceVertex: SourceVertex, sinkVertex: SinkVertex)
val pipe = edge.label.pipe
runSimple(sourceVertex, sinkVertex, pipe)
@ -1123,23 +1092,22 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
}
private def runGraph()(implicit materializer: FlowMaterializer): MaterializedMap = {
import scalax.collection.GraphTraversal._
// start with sinks
val startingNodes = graph.nodes.filter(n n.isLeaf && n.diSuccessors.isEmpty)
val startingNodes = graph.nodes.filter(_.isSink)
case class Memo(visited: Set[graph.EdgeT] = Set.empty,
downstreamSubscriber: Map[graph.EdgeT, Subscriber[Any]] = Map.empty,
upstreamPublishers: Map[graph.EdgeT, Publisher[Any]] = Map.empty,
type E = Edge[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex]
case class Memo(visited: Set[E] = Set.empty,
downstreamSubscriber: Map[E, Subscriber[Any]] = Map.empty,
upstreamPublishers: Map[E, Publisher[Any]] = Map.empty,
sources: Map[SourceVertex, SinkPipe[Any]] = Map.empty,
materializedSinks: Map[KeyedSink[_], Any] = Map.empty)
val result = startingNodes.foldLeft(Memo()) {
case (memo, start)
val traverser = graph.innerEdgeTraverser(start, parameters = Parameters(direction = Predecessors, kind = BreadthFirst),
ordering = graph.defaultEdgeOrdering)
traverser.foldLeft(memo) {
graph.edgePredecessorBFSfoldLeft(start)(memo) {
case (memo, edge)
if (memo.visited(edge)) {
memo
@ -1149,7 +1117,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
// returns the materialized sink, if any
def connectToDownstream(publisher: Publisher[Any]): Option[(KeyedSink[_], Any)] = {
val f = pipe.withSource(PublisherSource(publisher))
edge.to.value match {
edge.to.label match {
case SinkVertex(sink: KeyedSink[_])
val mf = f.withSink(sink).run()
Some(sink -> mf.get(sink))
@ -1162,7 +1130,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
}
}
edge.from.value match {
edge.from.label match {
case source: SourceVertex
val f = pipe.withSink(SubscriberSink(memo.downstreamSubscriber(edge)))
// connect the source with the pipe later
@ -1228,7 +1196,7 @@ object PartialFlowGraph {
* Build a [[PartialFlowGraph]] from scratch.
*/
def apply(block: FlowGraphBuilder Unit): PartialFlowGraph =
apply(ImmutableGraph.empty[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType])(block)
apply(new DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex])(block)
/**
* Continue building a [[PartialFlowGraph]] from an existing `PartialFlowGraph`.
@ -1242,7 +1210,8 @@ object PartialFlowGraph {
def apply(flowGraph: FlowGraph)(block: FlowGraphBuilder Unit): PartialFlowGraph =
apply(flowGraph.graph)(block)
private def apply(graph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType])(block: FlowGraphBuilder Unit): PartialFlowGraph = {
private def apply(graph: DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex])(block: FlowGraphBuilder Unit): PartialFlowGraph = {
// FlowGraphBuilder does a full import on the passed graph, so no defensive copy needed
val builder = new FlowGraphBuilder(graph)
block(builder)
builder.partialBuild()
@ -1256,16 +1225,16 @@ object PartialFlowGraph {
* Build a `PartialFlowGraph` by starting with one of the `apply` methods in
* in [[FlowGraph$ companion object]]. Syntactic sugar is provided by [[FlowGraphImplicits]].
*/
class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) {
class PartialFlowGraph private[akka] (private[akka] val graph: DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex]) {
import FlowGraphInternal._
def undefinedSources: Set[UndefinedSource[_]] =
graph.nodes.iterator.map(_.value).collect {
graph.nodes.map(_.label).collect {
case n: UndefinedSource[_] n
}.toSet
def undefinedSinks: Set[UndefinedSink[_]] =
graph.nodes.iterator.map(_.value).collect {
graph.nodes.map(_.label).collect {
case n: UndefinedSink[_] n
}.toSet