diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index 5de26ce57c..ff39c568c8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -4,7 +4,7 @@ package akka.stream.scaladsl2 import scala.language.existentials -import scalax.collection.edge.LDiEdge +import scalax.collection.edge.LkDiEdge import scalax.collection.mutable.Graph import scalax.collection.immutable.{ Graph ⇒ ImmutableGraph } import org.reactivestreams.Subscriber @@ -160,34 +160,41 @@ private[akka] object FlowGraphInternal { } } + // flow not part of equals/hashCode + case class EdgeLabel(qualifier: Int)(val flow: ProcessorFlow[Any, Any]) { + override def toString: String = flow.toString + } + } /** * Builder of [[FlowGraph]] and [[PartialFlowGraph]]. * Syntactic sugar is provided by [[FlowGraphImplicits]]. */ -class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge]) { +class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LkDiEdge]) { import FlowGraphInternal._ - private[akka] def this() = this(Graph.empty[FlowGraphInternal.Vertex, LDiEdge]) + private[akka] def this() = this(Graph.empty[FlowGraphInternal.Vertex, LkDiEdge]) - private[akka] def this(immutableGraph: ImmutableGraph[FlowGraphInternal.Vertex, LDiEdge]) = - this(Graph.from(edges = immutableGraph.edges.map(e ⇒ LDiEdge(e.from.value, e.to.value)(e.label)).toIterable)) + private[akka] def this(immutableGraph: ImmutableGraph[FlowGraphInternal.Vertex, LkDiEdge]) = + this(Graph.from(edges = immutableGraph.edges.map(e ⇒ LkDiEdge(e.from.value, e.to.value)(e.label)).toIterable)) - private implicit val edgeFactory = scalax.collection.edge.LDiEdge + private implicit val edgeFactory = scalax.collection.edge.LkDiEdge + + var edgeQualifier = graph.edges.size def addEdge[In, Out](source: Source[In], flow: ProcessorFlow[In, Out], sink: Junction[Out]): this.type = { val sourceVertex = SourceVertex(source) checkAddSourceSinkPrecondition(sourceVertex) checkAddFanPrecondition(sink, in = true) - graph.addLEdge(sourceVertex, sink)(flow) + addGraphEdge(sourceVertex, sink, flow) this } def addEdge[In, Out](source: UndefinedSource[In], flow: ProcessorFlow[In, Out], sink: Junction[Out]): this.type = { checkAddSourceSinkPrecondition(source) checkAddFanPrecondition(sink, in = true) - graph.addLEdge(source, sink)(flow) + addGraphEdge(source, sink, flow) this } @@ -195,21 +202,21 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge]) val sinkVertex = SinkVertex(sink) checkAddSourceSinkPrecondition(sinkVertex) checkAddFanPrecondition(source, in = false) - graph.addLEdge(source, sinkVertex)(flow) + addGraphEdge(source, sinkVertex, flow) this } def addEdge[In, Out](source: Junction[In], flow: ProcessorFlow[In, Out], sink: UndefinedSink[Out]): this.type = { checkAddSourceSinkPrecondition(sink) checkAddFanPrecondition(source, in = false) - graph.addLEdge(source, sink)(flow) + addGraphEdge(source, sink, flow) this } def addEdge[In, Out](source: Junction[In], flow: ProcessorFlow[In, Out], sink: Junction[Out]): this.type = { checkAddFanPrecondition(source, in = false) checkAddFanPrecondition(sink, in = true) - graph.addLEdge(source, sink)(flow) + addGraphEdge(source, sink, flow) this } @@ -223,6 +230,13 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge]) this } + private def addGraphEdge[In, Out](from: Vertex, to: Vertex, flow: ProcessorFlow[In, Out]): Unit = { + if (edgeQualifier == Int.MaxValue) throw new IllegalArgumentException(s"Too many edges") + val label = EdgeLabel(edgeQualifier)(flow.asInstanceOf[ProcessorFlow[Any, Any]]) + graph.addLEdge(from, to)(label) + edgeQualifier += 1 + } + def attachSink[Out](token: UndefinedSink[Out], sink: Sink[Out]): this.type = { graph.find(token) match { case Some(existing) ⇒ @@ -286,8 +300,8 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge]) } //convert it to an immutable.Graph - private def immutableGraph(): ImmutableGraph[Vertex, LDiEdge] = - ImmutableGraph.from(edges = graph.edges.map(e ⇒ LDiEdge(e.from.value, e.to.value)(e.label)).toIterable) + private def immutableGraph(): ImmutableGraph[Vertex, LkDiEdge] = + ImmutableGraph.from(edges = graph.edges.map(e ⇒ LkDiEdge(e.from.value, e.to.value)(e.label)).toIterable) private def checkPartialBuildPreconditions(): Unit = { graph.findCycle match { @@ -346,7 +360,7 @@ object FlowGraph { * Build a [[FlowGraph]] from scratch. */ def apply(block: FlowGraphBuilder ⇒ Unit): FlowGraph = - apply(ImmutableGraph.empty[FlowGraphInternal.Vertex, LDiEdge])(block) + apply(ImmutableGraph.empty[FlowGraphInternal.Vertex, LkDiEdge])(block) /** * Continue building a [[FlowGraph]] from an existing `PartialFlowGraph`. @@ -363,7 +377,7 @@ object FlowGraph { def apply(flowGraph: FlowGraph)(block: FlowGraphBuilder ⇒ Unit): FlowGraph = apply(flowGraph.graph)(block) - private def apply(graph: ImmutableGraph[FlowGraphInternal.Vertex, LDiEdge])(block: FlowGraphBuilder ⇒ Unit): FlowGraph = { + private def apply(graph: ImmutableGraph[FlowGraphInternal.Vertex, LkDiEdge])(block: FlowGraphBuilder ⇒ Unit): FlowGraph = { val builder = new FlowGraphBuilder(graph) block(builder) builder.build() @@ -373,7 +387,7 @@ object FlowGraph { /** * Concrete flow graph that can be materialized with [[#run]]. */ -class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, LDiEdge]) { +class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, LkDiEdge]) { import FlowGraphInternal._ /** @@ -401,7 +415,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph if (memo.visited(edge)) { memo } else { - val flow = edge.label.asInstanceOf[ProcessorFlow[Any, Any]] + val flow = edge.label.asInstanceOf[EdgeLabel].flow // returns the materialized sink, if any def connectToDownstream(publisher: Publisher[Any]): Option[(SinkWithKey[_, _], Any)] = { @@ -496,7 +510,7 @@ object PartialFlowGraph { * Build a [[PartialFlowGraph]] from scratch. */ def apply(block: FlowGraphBuilder ⇒ Unit): PartialFlowGraph = - apply(ImmutableGraph.empty[FlowGraphInternal.Vertex, LDiEdge])(block) + apply(ImmutableGraph.empty[FlowGraphInternal.Vertex, LkDiEdge])(block) /** * Continue building a [[PartialFlowGraph]] from an existing `PartialFlowGraph`. @@ -510,7 +524,7 @@ object PartialFlowGraph { def apply(flowGraph: FlowGraph)(block: FlowGraphBuilder ⇒ Unit): PartialFlowGraph = apply(flowGraph.graph)(block) - private def apply(graph: ImmutableGraph[FlowGraphInternal.Vertex, LDiEdge])(block: FlowGraphBuilder ⇒ Unit): PartialFlowGraph = { + private def apply(graph: ImmutableGraph[FlowGraphInternal.Vertex, LkDiEdge])(block: FlowGraphBuilder ⇒ Unit): PartialFlowGraph = { val builder = new FlowGraphBuilder(graph) block(builder) builder.partialBuild() @@ -522,7 +536,7 @@ object PartialFlowGraph { * `PartialFlowGraph` may have sources and sinks that are not attach, and it can therefore not * be `run`. */ -class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, LDiEdge]) { +class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, LkDiEdge]) { import FlowGraphInternal._ def undefinedSources: Set[UndefinedSource[_]] = diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala index 59f6371816..100a7d8b47 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala @@ -121,12 +121,7 @@ class FlowGraphCompileSpec extends AkkaSpec { val merge = Merge[String] import FlowGraphImplicits._ in1 ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out1 - bcast ~> f4 ~> bcast2 ~> f5 ~> merge - bcast2 ~> f6 ~> out2 - - // FIXME the following is doesn't work because of edge equality - // in1 ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out1 - // bcast ~> f4 ~> merge + bcast ~> f4 ~> merge }.run() }