diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/DirectedGraphBuilderSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/DirectedGraphBuilderSpec.scala new file mode 100644 index 0000000000..6de50b9b47 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/DirectedGraphBuilderSpec.scala @@ -0,0 +1,382 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import akka.stream.testkit.AkkaSpec + +class DirectedGraphBuilderSpec extends AkkaSpec { + + "DirectedGraphBuilder" must { + + "add and remove vertices" in { + val g = new DirectedGraphBuilder[String, Int] + g.contains(1) should be(false) + g.nodes.isEmpty should be(true) + g.edges.isEmpty should be(true) + g.find(1) should be(None) + + g.addVertex(1) + + g.contains(1) should be(true) + g.nodes.size should be(1) + g.edges.isEmpty should be(true) + g.find(1) should be(Some(Vertex(1))) + + g.addVertex(1) + + g.contains(1) should be(true) + g.nodes.size should be(1) + g.edges.isEmpty should be(true) + g.find(1) should be(Some(Vertex(1))) + + g.addVertex(2) + + g.contains(1) should be(true) + g.contains(2) should be(true) + g.nodes.size should be(2) + g.find(1) should be(Some(Vertex(1))) + g.find(2) should be(Some(Vertex(2))) + + g.remove(2) + g.contains(1) should be(true) + g.contains(2) should be(false) + g.nodes.size should be(1) + g.edges.isEmpty should be(true) + g.find(1) should be(Some(Vertex(1))) + g.find(2) should be(None) + } + + "add and remove edges" in { + val g = new DirectedGraphBuilder[String, Int] + + g.nodes.size should be(0) + g.edges.size should be(0) + g.contains(1) should be(false) + g.contains(2) should be(false) + g.contains(3) should be(false) + g.containsEdge("1 -> 2") should be(false) + g.containsEdge("2 -> 3") should be(false) + + g.addEdge(1, 2, "1 -> 2") + + g.nodes.size should be(2) + g.edges.size should be(1) + g.contains(1) should be(true) + g.contains(2) should be(true) + g.containsEdge("1 -> 2") should be(true) + g.get(1).incoming.isEmpty should be(true) + g.get(1).outgoing.head.label should be("1 -> 2") + g.get(2).outgoing.isEmpty should be(true) + g.get(2).incoming.head.label should be("1 -> 2") + g.get(1).outgoing.head.from.label should be(1) + g.get(1).outgoing.head.to.label should be(2) + + g.addEdge(2, 3, "2 -> 3") + + g.nodes.size should be(3) + g.edges.size should be(2) + g.contains(1) should be(true) + g.contains(2) should be(true) + g.contains(3) should be(true) + g.containsEdge("1 -> 2") should be(true) + g.containsEdge("2 -> 3") should be(true) + g.get(1).incoming.isEmpty should be(true) + g.get(1).outgoing.head.label should be("1 -> 2") + g.get(2).outgoing.head.label should be("2 -> 3") + g.get(2).incoming.head.label should be("1 -> 2") + g.get(3).incoming.head.label should be("2 -> 3") + g.get(3).outgoing.isEmpty should be(true) + g.get(1).outgoing.head.from.label should be(1) + g.get(1).outgoing.head.to.label should be(2) + g.get(2).outgoing.head.from.label should be(2) + g.get(2).outgoing.head.to.label should be(3) + + // Will reposition edge + g.addEdge(2, 4, "2 -> 3") + + g.nodes.size should be(4) + g.edges.size should be(2) + g.contains(1) should be(true) + g.contains(2) should be(true) + g.contains(3) should be(true) + g.contains(4) should be(true) + g.containsEdge("1 -> 2") should be(true) + g.containsEdge("2 -> 3") should be(true) + g.get(1).incoming.isEmpty should be(true) + g.get(1).outgoing.head.label should be("1 -> 2") + g.get(2).outgoing.head.label should be("2 -> 3") + g.get(2).incoming.head.label should be("1 -> 2") + g.get(3).incoming.isEmpty should be(true) + g.get(3).outgoing.isEmpty should be(true) + g.get(4).incoming.head.label should be("2 -> 3") + g.get(4).outgoing.isEmpty should be(true) + g.get(1).outgoing.head.from.label should be(1) + g.get(1).outgoing.head.to.label should be(2) + g.get(2).outgoing.head.from.label should be(2) + g.get(2).outgoing.head.to.label should be(4) + + // Will remove dangling edge + g.remove(4) + + g.nodes.size should be(3) + g.edges.size should be(1) + g.contains(1) should be(true) + g.contains(2) should be(true) + g.contains(3) should be(true) + g.contains(4) should be(false) + g.containsEdge("1 -> 2") should be(true) + g.containsEdge("2 -> 3") should be(false) + g.get(1).incoming.isEmpty should be(true) + g.get(1).outgoing.head.label should be("1 -> 2") + g.get(2).outgoing.isEmpty should be(true) + g.get(2).incoming.head.label should be("1 -> 2") + g.get(3).incoming.isEmpty should be(true) + g.get(3).outgoing.isEmpty should be(true) + g.get(1).outgoing.head.from.label should be(1) + g.get(1).outgoing.head.to.label should be(2) + + // Remove remaining edge + g.removeEdge("1 -> 2") + + g.nodes.size should be(3) + g.edges.isEmpty should be(true) + g.contains(1) should be(true) + g.contains(2) should be(true) + g.contains(3) should be(true) + g.contains(4) should be(false) + g.containsEdge("1 -> 2") should be(false) + g.containsEdge("2 -> 3") should be(false) + g.get(1).incoming.isEmpty should be(true) + g.get(1).outgoing.isEmpty should be(true) + g.get(2).outgoing.isEmpty should be(true) + g.get(2).incoming.isEmpty should be(true) + g.get(3).incoming.isEmpty should be(true) + g.get(3).outgoing.isEmpty should be(true) + } + } + + "work correctly with isolated nodes" in { + val g = new DirectedGraphBuilder[String, Int] + (1 to 99) foreach { i ⇒ + g.addVertex(i) + g.nodes.size should be(i) + g.find(i) should be(Some(Vertex(i))) + } + + g.isWeaklyConnected should be(false) + g.findCycle.isEmpty should be(true) + g.edgePredecessorBFSfoldLeft(g.get(99))(true) { (_, _) ⇒ false } should be(true) + } + + "work correctly with simple chains" in { + val g = new DirectedGraphBuilder[String, Int] + + (1 to 99) foreach { i ⇒ + g.addEdge(i, i + 1, s"$i -> ${i + 1}") + g.nodes.size should be(i + 1) + g.edges.size should be(i) + g.find(i) should be(Some(Vertex(i))) + g.find(i + 1) should be(Some(Vertex(i + 1))) + g.edges.contains(s"$i -> ${i + 1}") + } + + g.isWeaklyConnected should be(true) + g.findCycle.isEmpty should be(true) + g.edgePredecessorBFSfoldLeft(g.get(100))(100) { (sum, e) ⇒ sum + e.from.label } should be(5050) + + (1 to 100) foreach (g.remove(_)) + g.nodes.isEmpty should be(true) + g.edges.isEmpty should be(true) + } + + "work correctly with weakly connected chains" in { + val g = new DirectedGraphBuilder[String, Int] + + (1 to 49) foreach { i ⇒ + g.addEdge(i, i + 1, s"$i -> ${i + 1}") + g.nodes.size should be(i + 1) + g.edges.size should be(i) + g.find(i) should be(Some(Vertex(i))) + g.find(i + 1) should be(Some(Vertex(i + 1))) + g.edges.contains(s"$i -> ${i + 1}") + } + + (100 to 51 by -1) foreach { i ⇒ + g.addEdge(i, i - 1, s"$i -> ${i - 1}") + g.find(i) should be(Some(Vertex(i))) + g.find(i - 1) should be(Some(Vertex(i - 1))) + g.edges.contains(s"$i -> ${i - 1}") + } + + g.nodes.size should be(100) + g.edges.size should be(99) + + g.isWeaklyConnected should be(true) + g.findCycle.isEmpty should be(true) + g.edgePredecessorBFSfoldLeft(g.get(50))(50) { (sum, e) ⇒ sum + e.from.label } should be(5050) + + (1 to 100) foreach (g.remove(_)) + g.nodes.isEmpty should be(true) + g.edges.isEmpty should be(true) + } + + "work correctly with directed cycles" in { + val g = new DirectedGraphBuilder[String, Int] + + (1 to 99) foreach { i ⇒ + g.addEdge(i, i + 1, s"$i -> ${i + 1}") + g.nodes.size should be(i + 1) + g.edges.size should be(i) + g.find(i) should be(Some(Vertex(i))) + g.find(i + 1) should be(Some(Vertex(i + 1))) + g.edges.contains(s"$i -> ${i + 1}") + } + g.addEdge(100, 1, "100 -> 1") + g.nodes.size should be(100) + g.edges.size should be(100) + + g.isWeaklyConnected should be(true) + g.findCycle.toSet.size should be(100) + g.findCycle.toSet should be((1 to 100).map(Vertex(_)).toSet) + g.edgePredecessorBFSfoldLeft(g.get(100))(0) { (sum, e) ⇒ sum + e.from.label } should be(5050) + + (1 to 100) foreach (g.remove(_)) + g.nodes.isEmpty should be(true) + g.edges.isEmpty should be(true) + } + + "work correctly with undirected cycles" in { + val g = new DirectedGraphBuilder[String, Int] + + (1 to 49) foreach { i ⇒ + g.addEdge(i, i + 1, s"$i -> ${i + 1}") + g.nodes.size should be(i + 1) + g.edges.size should be(i) + g.find(i) should be(Some(Vertex(i))) + g.find(i + 1) should be(Some(Vertex(i + 1))) + g.edges.contains(s"$i -> ${i + 1}") + } + + (100 to 51 by -1) foreach { i ⇒ + g.addEdge(i, i - 1, s"$i -> ${i - 1}") + g.find(i) should be(Some(Vertex(i))) + g.find(i - 1) should be(Some(Vertex(i - 1))) + g.edges.contains(s"$i -> ${i - 1}") + } + + g.addEdge(100, 1, "100 -> 1") + g.nodes.size should be(100) + g.edges.size should be(100) + + g.isWeaklyConnected should be(true) + g.findCycle.isEmpty should be(true) + g.edgePredecessorBFSfoldLeft(g.get(50))(50) { (sum, e) ⇒ sum + e.from.label } should be(5150) + + (1 to 100) foreach (g.remove(_)) + g.nodes.isEmpty should be(true) + g.edges.isEmpty should be(true) + } + + "work correctly with two linked cycles, both directed" in { + val g = new DirectedGraphBuilder[String, Int] + g.addEdge(0, 1, "0 -> 1") + g.addEdge(1, 2, "1 -> 2") + g.addEdge(2, 0, "2 -> 0") + + g.addEdge(1, 3, "1 -> 3") + g.addEdge(3, 0, "3 -> 0") + + g.nodes.size should be(4) + g.isWeaklyConnected should be(true) + g.findCycle.nonEmpty should be(true) + g.findCycle.size should be(3) + + g.removeEdge("1 -> 2") + g.isWeaklyConnected should be(true) + g.findCycle.nonEmpty should be(true) + g.findCycle.size should be(3) + g.findCycle.map(_.label).toSet should be(Set(0, 1, 3)) + + g.removeEdge("1 -> 3") + g.addEdge(1, 2, "1 -> 2") + g.nodes.size should be(4) + g.isWeaklyConnected should be(true) + g.findCycle.nonEmpty should be(true) + g.findCycle.size should be(3) + g.findCycle.map(_.label).toSet should be(Set(0, 1, 2)) + + g.removeEdge("1 -> 2") + g.isWeaklyConnected should be(true) + g.findCycle.isEmpty should be(true) + } + + "work correctly with two linked cycles, one undirected" in { + val g = new DirectedGraphBuilder[String, Int] + g.addEdge(0, 1, "0 -> 1") + g.addEdge(1, 2, "1 -> 2") + g.addEdge(2, 0, "2 -> 0") + + g.addEdge(1, 3, "1 -> 3") + g.addEdge(0, 3, "3 <- 0") + + g.nodes.size should be(4) + g.isWeaklyConnected should be(true) + g.findCycle.nonEmpty should be(true) + g.findCycle.size should be(3) + g.findCycle.map(_.label).toSet should be(Set(0, 1, 2)) + + g.removeEdge("1 -> 2") + g.isWeaklyConnected should be(true) + g.findCycle.isEmpty should be(true) + + g.removeEdge("1 -> 3") + g.isWeaklyConnected should be(true) + g.findCycle.isEmpty should be(true) + + g.remove(0) + g.isWeaklyConnected should be(false) + g.findCycle.isEmpty should be(true) + } + + "copy correctly" in { + val g1 = new DirectedGraphBuilder[String, Int] + + (1 to 49) foreach { i ⇒ + g1.addEdge(i, i + 1, s"$i -> ${i + 1}") + } + + (100 to 51 by -1) foreach { i ⇒ + g1.addEdge(i, i - 1, s"$i -> ${i - 1}") + } + + g1.addEdge(0, 1, "0 -> 1") + g1.addEdge(2, 0, "2 -> 0") + + g1.addEdge(1, 3, "1 -> 3") + g1.addEdge(3, 0, "3 -> 0") + + g1.addVertex(200) + + val g2 = g1.copy() + + g2.nodes.size should be(102) + g2.nodes.toSet should be(g1.nodes.toSet) + g2.edges.toSet should be(g1.edges.toSet) + + g2.nodes foreach { v2 ⇒ + val v1 = g1.find(v2.label).get + + v1.label should be(v2.label) + v1.incoming should be(v2.incoming) + v1.outgoing should be(v2.outgoing) + + v1.incoming.map(_.to) should be(v2.incoming.map(_.to)) + v1.incoming.map(_.from) should be(v2.incoming.map(_.from)) + + v1.outgoing.map(_.to) should be(v2.outgoing.map(_.to)) + v1.outgoing.map(_.from) should be(v2.outgoing.map(_.from)) + } + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala index e868b74f45..bfbba0b880 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala @@ -303,6 +303,7 @@ class FlowGraphCompileSpec extends AkkaSpec { val merge = Merge[String] import FlowGraphImplicits._ in1 ~> merge ~> out1 + in2 ~> merge merge ~> out2 // wrong } }.getMessage should include("at most 1 outgoing") diff --git a/akka-stream/src/main/scala/akka/stream/impl/DirectedGraphBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/DirectedGraphBuilder.scala new file mode 100644 index 0000000000..73c7222e9f --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/DirectedGraphBuilder.scala @@ -0,0 +1,247 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import scala.annotation.tailrec +import scala.collection.immutable + +/** + * INTERNAL API + */ +private[akka] final case class Vertex[E, V](label: V) { + private var inEdgeSet = Set.empty[Edge[E, V]] + private var outEdgeSet = Set.empty[Edge[E, V]] + + def addOutEdge(e: Edge[E, V]): Unit = outEdgeSet += e + def addInEdge(e: Edge[E, V]): Unit = inEdgeSet += e + + def removeOutEdge(e: Edge[E, V]): Unit = outEdgeSet -= e + def removeInEdge(e: Edge[E, V]): Unit = inEdgeSet -= e + + def inDegree: Int = inEdgeSet.size + def outDegree: Int = outEdgeSet.size + + def isolated: Boolean = inDegree == 0 && outDegree == 0 + + def isSink: Boolean = outEdgeSet.isEmpty + + def successors: Set[Vertex[E, V]] = outEdgeSet.map(_.to) + def predecessors: Set[Vertex[E, V]] = inEdgeSet.map(_.from) + + def neighbors: Set[Vertex[E, V]] = successors ++ predecessors + + def incoming: Set[Edge[E, V]] = inEdgeSet + def outgoing: Set[Edge[E, V]] = outEdgeSet + + override def equals(obj: Any): Boolean = obj match { + case v: Vertex[_, _] ⇒ label.equals(v.label) + case _ ⇒ false + } + + override def hashCode(): Int = label.hashCode() +} + +/** + * INTERNAL API + */ +private[akka] final case class Edge[E, V](label: E, from: Vertex[E, V], to: Vertex[E, V]) { + + override def equals(obj: Any): Boolean = obj match { + case e: Edge[_, _] ⇒ label.equals(e.label) + case _ ⇒ false + } + + override def hashCode(): Int = label.hashCode() +} + +/** + * INTERNAL API + */ +private[akka] class DirectedGraphBuilder[E, V] { + private var vertexMap = Map.empty[V, Vertex[E, V]] + private var edgeMap = Map.empty[E, Edge[E, V]] + + def edges: immutable.Seq[Edge[E, V]] = edgeMap.values.toVector + + def nodes: immutable.Seq[Vertex[E, V]] = vertexMap.values.toVector + + def nonEmpty: Boolean = vertexMap.nonEmpty + + def addVertex(v: V): Vertex[E, V] = vertexMap.get(v) match { + case None ⇒ + val vx = Vertex[E, V](v) + vertexMap += v -> vx + vx + + case Some(vx) ⇒ vx + } + + def addEdge(from: V, to: V, label: E): Unit = { + val vfrom = addVertex(from) + val vto = addVertex(to) + + removeEdge(label) // Need to remap existing labels + val edge = Edge[E, V](label, vfrom, vto) + edgeMap += label -> edge + + vfrom.addOutEdge(edge) + vto.addInEdge(edge) + + } + + def find(v: V): Option[Vertex[E, V]] = vertexMap.get(v) + + def get(v: V): Vertex[E, V] = vertexMap(v) + + def contains(v: V): Boolean = vertexMap.contains(v) + + def containsEdge(e: E): Boolean = edgeMap.contains(e) + + def exists(p: Vertex[E, V] ⇒ Boolean) = vertexMap.values.exists(p) + + def removeEdge(label: E): Unit = edgeMap.get(label) match { + case Some(e) ⇒ + edgeMap -= label + e.from.removeOutEdge(e) + e.to.removeInEdge(e) + case None ⇒ + } + + def remove(v: V): Unit = vertexMap.get(v) match { + case Some(vx) ⇒ + vertexMap -= v + + vx.incoming foreach { edge ⇒ removeEdge(edge.label) } + vx.outgoing foreach { edge ⇒ removeEdge(edge.label) } + + case None ⇒ + } + + /** + * Performs a deep copy of the builder. Since the builder is mutable it is not safe to share instances of it + * without making a defensive copy first. + */ + def copy(): DirectedGraphBuilder[E, V] = { + val result = new DirectedGraphBuilder[E, V]() + + edgeMap.foreach { + case (label, e) ⇒ + result.addEdge(e.from.label, e.to.label, e.label) + } + + vertexMap.filter(_._2.isolated) foreach { + case (_, n) ⇒ + result.addVertex(n.label) + } + + result + } + + /** + * Returns true if for every vertex pair there is an undirected path connecting them + */ + def isWeaklyConnected: Boolean = { + if (vertexMap.isEmpty) true + else { + var unvisited = vertexMap.values.toSet + var toVisit = Set(unvisited.head) + + while (toVisit.nonEmpty) { + val v = toVisit.head + unvisited -= v + toVisit = toVisit.tail + toVisit ++= v.neighbors.iterator.filter(unvisited.contains) // visit all unvisited neighbors of v (neighbors are undirected) + } + + unvisited.isEmpty // if we ended up with unvisited nodes starting from one node we are unconnected + } + } + + /** + * Finds a directed cycle in the graph + */ + def findCycle: immutable.Seq[Vertex[E, V]] = { + if (vertexMap.size < 2 || edgeMap.size < 2) Nil + else { + // Vertices we have not visited at all yet + var unvisited = vertexMap.values.toSet + + // Attempts to find a cycle in a connected component + def findCycleInComponent( + componentEntryVertex: Vertex[E, V], + toVisit: Vertex[E, V], + cycleCandidate: List[Vertex[E, V]]): List[Vertex[E, V]] = { + + if (!unvisited(toVisit)) Nil + else { + unvisited -= toVisit + + val successors = toVisit.successors + if (successors.contains(componentEntryVertex)) toVisit :: cycleCandidate + else { + val newCycleCandidate = toVisit :: cycleCandidate + + // search in all successors + @tailrec def traverse(toTraverse: Set[Vertex[E, V]]): List[Vertex[E, V]] = { + if (toTraverse.isEmpty) Nil + else { + val c = findCycleInComponent(componentEntryVertex, toVisit = toTraverse.head, newCycleCandidate) + if (c.nonEmpty) c + else traverse(toTraverse = toTraverse.tail) + } + } + + traverse(toTraverse = successors) + } + } + + } + + // Traverse all weakly connected components and try to find cycles in each of them + @tailrec def findNextCycle(): List[Vertex[E, V]] = { + if (unvisited.size < 2) Nil + else { + // Pick a node to recursively start visiting its successors + val componentEntry = unvisited.head + + if (componentEntry.inDegree < 1 || componentEntry.outDegree < 1) { + unvisited -= componentEntry + findNextCycle() + } else { + val cycleCandidate = + findCycleInComponent(componentEntry, toVisit = componentEntry, cycleCandidate = Nil) + + if (cycleCandidate.nonEmpty) cycleCandidate + else findNextCycle() + } + + } + + } + + findNextCycle() + } + } + + def edgePredecessorBFSfoldLeft[T](start: Vertex[E, V])(zero: T)(f: (T, Edge[E, V]) ⇒ T): T = { + var aggr: T = zero + var unvisited = edgeMap.values.toSet + // Queue to maintain BFS state + var toVisit = immutable.Queue() ++ start.incoming + + while (toVisit.nonEmpty) { + val (e, nextToVisit) = toVisit.dequeue + toVisit = nextToVisit + + unvisited -= e + aggr = f(aggr, e) + val unvisitedPredecessors = e.from.incoming.filter(unvisited.contains) + unvisited --= unvisitedPredecessors + toVisit = toVisit ++ unvisitedPredecessors + } + + aggr + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala index 941b576de3..0a9dbd93cc 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala @@ -11,13 +11,10 @@ import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import scala.language.existentials -import scalax.collection.edge.LkBase -import scalax.collection.edge.LkDiEdge -import scalax.collection.immutable.{ Graph ⇒ ImmutableGraph } import org.reactivestreams.Subscriber import org.reactivestreams.Publisher import akka.stream.FlowMaterializer -import scalax.collection.mutable.Graph +import akka.stream.impl.{ DirectedGraphBuilder, Edge, Vertex ⇒ GVertex } /** * Fan-in and fan-out vertices in the [[FlowGraph]] implements @@ -433,22 +430,6 @@ final class UndefinedSource[+T](override val name: Option[String]) extends FlowG */ private[akka] object FlowGraphInternal { - /** - * INTERNAL API - * Workaround for issue #16109. Reflection, used by scalax.graph, is not thread - * safe. Initialize one graph before it is used for real. - */ - private[akka] val reflectionIssueWorkaround = { - val graph = ImmutableGraph.empty[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType] - val builder = new FlowGraphBuilder(graph) - val merge = Merge[String] - builder. - addEdge(Source.empty[String], merge). - addEdge(Source.empty[String], merge). - addEdge(merge, Sink.ignore) - builder.build() - } - def throwUnsupportedValue(x: Any): Nothing = throw new IllegalArgumentException(s"Unsupported value [$x] of type [${x.getClass.getName}]. Only Pipes and Graphs are supported!") @@ -541,11 +522,6 @@ private[akka] object FlowGraphInternal { } - type EdgeType[T] = LkDiEdge[T] { type L1 = EdgeLabel } - - def edges(graph: scalax.collection.Graph[Vertex, EdgeType]): Iterable[EdgeType[Vertex]] = - graph.edges.map(e ⇒ LkDiEdge(e.from.value, e.to.value)(e.label)) - } object FlowGraphBuilder { @@ -559,17 +535,14 @@ object FlowGraphBuilder { * Builder of [[FlowGraph]] and [[PartialFlowGraph]]. * Syntactic sugar is provided by [[FlowGraphImplicits]]. */ -class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) { +class FlowGraphBuilder private[akka] (_graph: DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex]) { import FlowGraphInternal._ + private val graph = new DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex]() - private[akka] def this() = this(Graph.empty[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) + private[akka] def this() = this(new DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex]()) - private[akka] def this(immutableGraph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) = - this(Graph.from(edges = FlowGraphInternal.edges(immutableGraph))) - - private implicit val edgeFactory = scalax.collection.edge.LkDiEdge.asInstanceOf[LkBase.LkEdgeCompanion[EdgeType]] - - var edgeQualifier = graph.edges.size + private var edgeQualifier = 0 + importGraph(_graph) private var cyclesAllowed = false @@ -777,7 +750,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph private def uncheckedAddGraphEdge[In, Out](from: Vertex, to: Vertex, pipe: Pipe[In, Out], inputPort: Int, outputPort: Int): Unit = { if (edgeQualifier == Int.MaxValue) throw new IllegalArgumentException(s"Too many edges") val label = EdgeLabel(edgeQualifier)(pipe.asInstanceOf[Pipe[Any, Nothing]], inputPort, outputPort) - graph.addLEdge(from, to)(label) + graph.addEdge(from, to, label) edgeQualifier += 1 } @@ -797,15 +770,15 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph graph.find(token) match { case Some(existing) ⇒ val edge = existing.incoming.head - graph.remove(existing) + graph.remove(existing.label) sink match { case spipe: SinkPipe[Out] ⇒ val pipe = edge.label.pipe.appendPipe(Pipe(spipe.ops)) - addGraphEdge(edge.from.value, SinkVertex(spipe.output), pipe, edge.label.inputPort, edge.label.outputPort) + addGraphEdge(edge.from.label, SinkVertex(spipe.output), pipe, edge.label.inputPort, edge.label.outputPort) case gsink: GraphSink[Out, _] ⇒ gsink.importAndConnect(this, token) case sink: Sink[Out] ⇒ - addGraphEdge(edge.from.value, SinkVertex(sink), edge.label.pipe, edge.label.inputPort, edge.label.outputPort) + addGraphEdge(edge.from.label, SinkVertex(sink), edge.label.pipe, edge.label.inputPort, edge.label.outputPort) } case None ⇒ throw new IllegalArgumentException(s"No matching UndefinedSink [${token}]") @@ -817,15 +790,15 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph graph.find(token) match { case Some(existing) ⇒ val edge = existing.outgoing.head - graph.remove(existing) + graph.remove(existing.label) source match { case spipe: SourcePipe[In] ⇒ val pipe = Pipe(spipe.ops).appendPipe(edge.label.pipe) - addGraphEdge(SourceVertex(spipe.input), edge.to.value, pipe, edge.label.inputPort, edge.label.outputPort) + addGraphEdge(SourceVertex(spipe.input), edge.to.label, pipe, edge.label.inputPort, edge.label.outputPort) case gsource: GraphSource[_, In] ⇒ gsource.importAndConnect(this, token) case source: Source[In] ⇒ - addGraphEdge(SourceVertex(source), edge.to.value, edge.label.pipe, edge.label.inputPort, edge.label.outputPort) + addGraphEdge(SourceVertex(source), edge.to.label, edge.label.pipe, edge.label.inputPort, edge.label.outputPort) case x ⇒ throwUnsupportedValue(x) } @@ -851,7 +824,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph val newPipe = outEdge.label.pipe.appendPipe(pipe.asInstanceOf[Pipe[Any, Nothing]]).appendPipe(inEdge.label.pipe) graph.remove(out) graph.remove(in) - addOrReplaceGraphEdge(outEdge.from.value, inEdge.to.value, newPipe, inEdge.label.inputPort, outEdge.label.outputPort) + addOrReplaceGraphEdge(outEdge.from.label, inEdge.to.label, newPipe, inEdge.label.inputPort, outEdge.label.outputPort) case gflow: GraphFlow[A, _, _, B] ⇒ gflow.importAndConnect(this, out, in) case x ⇒ throwUnsupportedValue(x) @@ -878,9 +851,9 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph this } - private def importGraph(immutableGraph: ImmutableGraph[Vertex, EdgeType]): Unit = - immutableGraph.edges foreach { edge ⇒ - addGraphEdge(edge.from.value, edge.to.value, edge.label.pipe, edge.label.inputPort, edge.label.outputPort) + private def importGraph(builder: DirectedGraphBuilder[EdgeLabel, Vertex]): Unit = + builder.edges foreach { edge ⇒ + addGraphEdge(edge.from.label, edge.to.label, edge.label.pipe, edge.label.inputPort, edge.label.outputPort) } private[scaladsl] def remapPartialFlowGraph(partialFlowGraph: PartialFlowGraph, vertexMapping: Map[Vertex, Vertex]): this.type = { @@ -888,7 +861,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph def get(vertex: Vertex): Vertex = mapping.getOrElseUpdate(vertex, vertex.newInstance()) partialFlowGraph.graph.edges.foreach { edge ⇒ - addGraphEdge(get(edge.from.value), get(edge.to.value), edge.label.pipe, edge.label.inputPort, edge.label.outputPort) + addGraphEdge(get(edge.from.label), get(edge.to.label), edge.label.pipe, edge.label.inputPort, edge.label.outputPort) } this @@ -941,8 +914,8 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph graph.find(iv) match { case Some(node) ⇒ require( - (node.inDegree + 1) <= iv.maximumInputCount, - s"${node.value} must have at most ${iv.maximumInputCount} incoming edges, has ${node.inDegree}\n${graph.edges}") + node.inDegree <= iv.maximumInputCount, + s"${node.label} must have at most ${iv.maximumInputCount} incoming edges, has ${node.inDegree}\n${graph.edges}") case _ ⇒ // ok } case _ ⇒ // ok, no checks here @@ -955,8 +928,8 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph graph.find(iv) match { case Some(node) ⇒ require( - (node.outDegree + 1) <= iv.maximumOutputCount, - s"${node.value} must have at most ${iv.maximumOutputCount} outgoing edges, has ${node.outDegree}\n${graph.edges}") + node.outDegree <= iv.maximumOutputCount, + s"${node.label} must have at most ${iv.maximumOutputCount} outgoing edges, has ${node.outDegree}\n${graph.edges}") case _ ⇒ // ok } case _ ⇒ // ok, no checks here @@ -969,7 +942,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph private[akka] def build(): FlowGraph = { checkPartialBuildPreconditions() checkBuildPreconditions() - new FlowGraph(immutableGraph()) + new FlowGraph(graph.copy()) } /** @@ -977,30 +950,26 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph */ private[akka] def partialBuild(): PartialFlowGraph = { checkPartialBuildPreconditions() - new PartialFlowGraph(immutableGraph()) - } - - //convert it to an immutable.Graph - private def immutableGraph(): ImmutableGraph[Vertex, FlowGraphInternal.EdgeType] = { - ImmutableGraph.from(edges = FlowGraphInternal.edges(graph)) + new PartialFlowGraph(graph.copy()) } private def checkPartialBuildPreconditions(): Unit = { - if (!cyclesAllowed) graph.findCycle match { - case None ⇒ - case Some(cycle) ⇒ throw new IllegalArgumentException("Cycle detected, not supported yet. " + cycle) + if (!cyclesAllowed) { + val cycle = graph.findCycle + if (cycle.nonEmpty) + throw new IllegalArgumentException("Cycle detected, but cycle support was not enabled. Cycle is " + cycle.map(_.label).mkString(" -> ")) } } private def checkBuildPreconditions(): Unit = { val undefinedSourcesSinks = graph.nodes.filter { - _.value match { + _.label match { case _: UndefinedSource[_] | _: UndefinedSink[_] ⇒ true case x ⇒ false } } if (undefinedSourcesSinks.nonEmpty) { - val formatted = undefinedSourcesSinks.map(n ⇒ n.value match { + val formatted = undefinedSourcesSinks.map(n ⇒ n.label match { case u: UndefinedSource[_] ⇒ s"$u -> ${n.outgoing.head.label} -> ${n.outgoing.head.to}" case u: UndefinedSink[_] ⇒ s"${n.incoming.head.from} -> ${n.incoming.head.label} -> $u" }) @@ -1008,7 +977,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph } graph.nodes.foreach { node ⇒ - node.value match { + node.label match { case v: InternalVertex ⇒ require( node.inDegree >= v.minimumInputCount, @@ -1034,12 +1003,12 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph } require(graph.nonEmpty, "Graph must not be empty") - require(graph.exists(graph having ((node = { n ⇒ n.isLeaf && n.diSuccessors.isEmpty }))), + require(graph.exists(_.outDegree == 0), "Graph must have at least one sink") - require(graph.exists(graph having ((node = { n ⇒ n.isLeaf && n.diPredecessors.isEmpty }))), + require(graph.exists(_.inDegree == 0), "Graph must have at least one source") - require(graph.isConnected, "Graph must be connected") + require(graph.isWeaklyConnected, "Graph must be connected") } } @@ -1055,7 +1024,7 @@ object FlowGraph { * Build a [[FlowGraph]] from scratch. */ def apply(block: FlowGraphBuilder ⇒ Unit): FlowGraph = - apply(ImmutableGraph.empty[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType])(block) + apply(new DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex])(block) /** * Continue building a [[FlowGraph]] from an existing `PartialFlowGraph`. @@ -1072,7 +1041,7 @@ object FlowGraph { def apply(flowGraph: FlowGraph)(block: FlowGraphBuilder ⇒ Unit): FlowGraph = apply(flowGraph.graph)(block) - private def apply(graph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType])(block: FlowGraphBuilder ⇒ Unit): FlowGraph = { + private def apply(graph: DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex])(block: FlowGraphBuilder ⇒ Unit): FlowGraph = { val builder = new FlowGraphBuilder(graph) block(builder) builder.build() @@ -1085,7 +1054,7 @@ object FlowGraph { * Build a `FlowGraph` by starting with one of the `apply` methods in * in [[FlowGraph$ companion object]]. Syntactic sugar is provided by [[FlowGraphImplicits]]. */ -class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) extends RunnableFlow { +class FlowGraph private[akka] (private[akka] val graph: DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex]) extends RunnableFlow { import FlowGraphInternal._ /** @@ -1095,7 +1064,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph val edges = graph.edges if (edges.size == 1) { val edge = edges.head - (edge.from.value, edge.to.value) match { + (edge.from.label, edge.to.label) match { case (sourceVertex: SourceVertex, sinkVertex: SinkVertex) ⇒ val pipe = edge.label.pipe runSimple(sourceVertex, sinkVertex, pipe) @@ -1123,23 +1092,22 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph } private def runGraph()(implicit materializer: FlowMaterializer): MaterializedMap = { - import scalax.collection.GraphTraversal._ // start with sinks - val startingNodes = graph.nodes.filter(n ⇒ n.isLeaf && n.diSuccessors.isEmpty) + val startingNodes = graph.nodes.filter(_.isSink) - case class Memo(visited: Set[graph.EdgeT] = Set.empty, - downstreamSubscriber: Map[graph.EdgeT, Subscriber[Any]] = Map.empty, - upstreamPublishers: Map[graph.EdgeT, Publisher[Any]] = Map.empty, + type E = Edge[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex] + + case class Memo(visited: Set[E] = Set.empty, + downstreamSubscriber: Map[E, Subscriber[Any]] = Map.empty, + upstreamPublishers: Map[E, Publisher[Any]] = Map.empty, sources: Map[SourceVertex, SinkPipe[Any]] = Map.empty, materializedSinks: Map[KeyedSink[_], Any] = Map.empty) val result = startingNodes.foldLeft(Memo()) { case (memo, start) ⇒ - val traverser = graph.innerEdgeTraverser(start, parameters = Parameters(direction = Predecessors, kind = BreadthFirst), - ordering = graph.defaultEdgeOrdering) - traverser.foldLeft(memo) { + graph.edgePredecessorBFSfoldLeft(start)(memo) { case (memo, edge) ⇒ if (memo.visited(edge)) { memo @@ -1149,7 +1117,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph // returns the materialized sink, if any def connectToDownstream(publisher: Publisher[Any]): Option[(KeyedSink[_], Any)] = { val f = pipe.withSource(PublisherSource(publisher)) - edge.to.value match { + edge.to.label match { case SinkVertex(sink: KeyedSink[_]) ⇒ val mf = f.withSink(sink).run() Some(sink -> mf.get(sink)) @@ -1162,7 +1130,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph } } - edge.from.value match { + edge.from.label match { case source: SourceVertex ⇒ val f = pipe.withSink(SubscriberSink(memo.downstreamSubscriber(edge))) // connect the source with the pipe later @@ -1228,7 +1196,7 @@ object PartialFlowGraph { * Build a [[PartialFlowGraph]] from scratch. */ def apply(block: FlowGraphBuilder ⇒ Unit): PartialFlowGraph = - apply(ImmutableGraph.empty[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType])(block) + apply(new DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex])(block) /** * Continue building a [[PartialFlowGraph]] from an existing `PartialFlowGraph`. @@ -1242,7 +1210,8 @@ object PartialFlowGraph { def apply(flowGraph: FlowGraph)(block: FlowGraphBuilder ⇒ Unit): PartialFlowGraph = apply(flowGraph.graph)(block) - private def apply(graph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType])(block: FlowGraphBuilder ⇒ Unit): PartialFlowGraph = { + private def apply(graph: DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex])(block: FlowGraphBuilder ⇒ Unit): PartialFlowGraph = { + // FlowGraphBuilder does a full import on the passed graph, so no defensive copy needed val builder = new FlowGraphBuilder(graph) block(builder) builder.partialBuild() @@ -1256,16 +1225,16 @@ object PartialFlowGraph { * Build a `PartialFlowGraph` by starting with one of the `apply` methods in * in [[FlowGraph$ companion object]]. Syntactic sugar is provided by [[FlowGraphImplicits]]. */ -class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) { +class PartialFlowGraph private[akka] (private[akka] val graph: DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex]) { import FlowGraphInternal._ def undefinedSources: Set[UndefinedSource[_]] = - graph.nodes.iterator.map(_.value).collect { + graph.nodes.map(_.label).collect { case n: UndefinedSource[_] ⇒ n }.toSet def undefinedSinks: Set[UndefinedSink[_]] = - graph.nodes.iterator.map(_.value).collect { + graph.nodes.map(_.label).collect { case n: UndefinedSink[_] ⇒ n }.toSet