diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphInitSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphInitSpec.scala index db0d91fd72..d1811c736a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphInitSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphInitSpec.scala @@ -3,13 +3,13 @@ */ package akka.stream.scaladsl +import akka.stream.FlowMaterializer +import akka.stream.testkit.AkkaSpec + import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ -import akka.stream.FlowMaterializer -import akka.stream.testkit.AkkaSpec - class FlowGraphInitSpec extends AkkaSpec { import system.dispatcher @@ -40,5 +40,86 @@ class FlowGraphInitSpec extends AkkaSpec { val graphs = Vector.fill(5)(Future(create())) val result = Await.result(Future.sequence(graphs), 5.seconds).flatten.size should be(5) } + + "fail when the same `KeyedSink` is used in it multiple times" in { + val s = Source(1 to 5) + val b = Broadcast[Int] + + val sink: KeyedSink[Int] = Sink.foreach[Int](i ⇒ i) + val otherSink: KeyedSink[Int] = Sink.foreach[Int](i ⇒ 2 * i) + + FlowGraph { implicit builder ⇒ + import FlowGraphImplicits._ + // format: OFF + s ~> b ~> sink + b ~> otherSink // this is fine + // format: ON + } + + val ex1 = intercept[IllegalArgumentException] { + FlowGraph { implicit builder ⇒ + import FlowGraphImplicits._ + // format: OFF + s ~> b ~> sink + b ~> sink // this is not fine + // format: ON + } + } + ex1.getMessage should include(sink.getClass.getSimpleName) + + val ex2 = intercept[IllegalArgumentException] { + FlowGraph { implicit builder ⇒ + import FlowGraphImplicits._ + // format: OFF + s ~> b ~> sink + b ~> otherSink // this is fine + b ~> sink // this is not fine + // format: ON + } + } + ex2.getMessage should include(sink.getClass.getSimpleName) + } + + "fail when the same `KeyedSource` is used in it multiple times" in { + val s = Sink.ignore + val m = Merge[Int] + + val source1: KeyedSource[Int] = Source.subscriber + val source2: KeyedSource[Int] = Source.subscriber + + FlowGraph { implicit builder ⇒ + import FlowGraphImplicits._ + // KeyedSources of same type should be fine to be mixed + // format: OFF + source1 ~> m + m ~> s + source2 ~> m + // format: ON + } + + val ex1 = intercept[IllegalArgumentException] { + FlowGraph { implicit builder ⇒ + import FlowGraphImplicits._ + // format: OFF + source1 ~> m + m ~> s + source1 ~> m // whoops + // format: ON + } + } + ex1.getMessage should include(source1.getClass.getSimpleName) + + val ex2 = intercept[IllegalArgumentException] { + FlowGraph { implicit builder ⇒ + import FlowGraphImplicits._ + // format: OFF + source1 ~> m + source2 ~> m ~> s + source1 ~> m // this is not fine + // format: ON + } + } + ex2.getMessage should include(source1.getClass.getSimpleName) + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlowSpec.scala index f216b141f3..7320201139 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlowSpec.scala @@ -202,7 +202,12 @@ class GraphFlowSpec extends AkkaSpec { val out = UndefinedSink[Int] val probe = StreamTestKit.SubscriberProbe[Int]() - val source = Source[Int]() { implicit b ⇒ + val source1 = Source[Int]() { implicit b ⇒ + import FlowGraphImplicits._ + Source(1 to 5) ~> Flow[Int].map(_ * 2) ~> out + out + } + val source2 = Source[Int]() { implicit b ⇒ import FlowGraphImplicits._ Source(1 to 5) ~> Flow[Int].map(_ * 2) ~> out out @@ -211,8 +216,8 @@ class GraphFlowSpec extends AkkaSpec { FlowGraph { implicit b ⇒ import FlowGraphImplicits._ val merge = Merge[Int]("merge") - source ~> merge ~> Sink(probe) - source ~> Flow[Int].map(_ * 10) ~> merge + source1 ~> merge ~> Sink(probe) + source2 ~> Flow[Int].map(_ * 10) ~> merge }.run() validateProbe(probe, 10, Set(2, 4, 6, 8, 10, 20, 40, 60, 80, 100)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala index e34d4d34d0..ecc257160e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala @@ -87,6 +87,7 @@ private[scaladsl] final case class SubscriberSource[Out]() extends KeyedActorFlo override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Out] = flowSubscriber + } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala index d09f62ada7..ebd5c7ccec 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala @@ -5,13 +5,19 @@ package akka.stream.scaladsl import akka.stream.impl.Ast.FanInAstNode import akka.stream.impl.Ast +import java.util + +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber + import scala.language.existentials -import scalax.collection.edge.{ LkBase, LkDiEdge } -import scalax.collection.mutable.Graph +import scalax.collection.edge.LkBase +import scalax.collection.edge.LkDiEdge import scalax.collection.immutable.{ Graph ⇒ ImmutableGraph } import org.reactivestreams.Subscriber import org.reactivestreams.Publisher import akka.stream.FlowMaterializer +import scalax.collection.mutable.Graph /** * Fan-in and fan-out vertices in the [[FlowGraph]] implements @@ -455,18 +461,44 @@ private[akka] object FlowGraphInternal { case class SourceVertex(source: Source[_]) extends Vertex { override def toString = source.toString - // these are unique keys, case class equality would break them - final override def equals(other: Any): Boolean = super.equals(other) - final override def hashCode: Int = super.hashCode + + /** + * These are unique keys, case class equality would break them. + * In the case of KeyedSources we MUST compare by object equality, in order to avoid ambigiousities in materialization. + */ + final override def equals(other: Any): Boolean = other match { + case v: SourceVertex ⇒ (source, v.source) match { + case (k1: KeyedSource[_], k2: KeyedSource[_]) ⇒ k1 == k2 + case _ ⇒ super.equals(other) + } + case _ ⇒ false + } + final override def hashCode: Int = source match { + case k: KeyedSource[_] ⇒ k.hashCode + case _ ⇒ super.hashCode + } final override private[scaladsl] def newInstance() = this.copy() } case class SinkVertex(sink: Sink[_]) extends Vertex { override def toString = sink.toString - // these are unique keys, case class equality would break them - final override def equals(other: Any): Boolean = super.equals(other) - final override def hashCode: Int = super.hashCode + + /** + * These are unique keys, case class equality would break them. + * In the case of KeyedSources we MUST compare by object equality, in order to avoid ambigiousities in materialization. + */ + final override def equals(other: Any): Boolean = other match { + case v: SinkVertex ⇒ (sink, v.sink) match { + case (k1: KeyedSink[_], k2: KeyedSink[_]) ⇒ k1 == k2 + case _ ⇒ super.equals(other) + } + case _ ⇒ false + } + final override def hashCode: Int = sink match { + case k: KeyedSink[_] ⇒ k.hashCode + case _ ⇒ super.hashCode + } final override private[scaladsl] def newInstance() = this.copy() } @@ -873,6 +905,8 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph } private def checkAddSourceSinkPrecondition(vertex: Vertex): Unit = { + checkAmbigiousKeyedElement(vertex) + vertex match { case node @ (_: UndefinedSource[_] | _: UndefinedSink[_]) ⇒ require(!graph.contains(node), s"[$node] instance is already used in this flow graph") @@ -880,6 +914,18 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph } } + private def checkAmbigiousKeyedElement(vertex: Vertex): Unit = { + def warningMessage(el: Any): String = + s"An `${el}` instance MUST NOT be used more than once in a `FlowGraph` to avoid ambiguity. " + + s"Use individual instances instead the same one multiple times instead. Nodes are: ${graph.nodes}" + + vertex match { + case v: SourceVertex if v.source.isInstanceOf[KeyedSource[_]] ⇒ require(!graph.contains(v), warningMessage(v.source)) + case v: SinkVertex if v.sink.isInstanceOf[KeyedSink[_]] ⇒ require(!graph.contains(v), warningMessage(v.sink)) + case _ ⇒ // ok + } + } + private def checkAddOrReplaceSourceSinkPrecondition(vertex: Vertex): Unit = { vertex match { // it is ok to add or replace edges with new or existing undefined sources or sinks @@ -896,7 +942,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph case Some(node) ⇒ require( (node.inDegree + 1) <= iv.maximumInputCount, - s"${node.value} must have at most ${iv.maximumInputCount} incoming edges, , has ${node.inDegree}\n${graph.edges}") + s"${node.value} must have at most ${iv.maximumInputCount} incoming edges, has ${node.inDegree}\n${graph.edges}") case _ ⇒ // ok } case _ ⇒ // ok, no checks here