diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index 2cf2c043a3..4afede1fb2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -363,29 +363,31 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LkDiEdge] require(graph.find(node) == None, s"[$node] instance is already used in this flow graph") private def checkJunctionInPortPrecondition(junction: JunctionInPort[_]): Unit = { - // FIXME: Reenable checks - // junction match { - // case _: FanOutOperation[_] if in ⇒ - // graph.find(junction.vertex) match { - // case Some(existing) if existing.incoming.nonEmpty ⇒ - // throw new IllegalArgumentException(s"Fan-out [$junction] is already attached to input [${existing.incoming.head}]") - // case _ ⇒ // ok - // } - // case _ ⇒ // ok - // } + junction.vertex match { + case iv: InternalVertex ⇒ + graph.find(iv) match { + case Some(node) ⇒ + require( + (node.inDegree + 1) <= iv.maximumInputCount, + s"${node.value}) must have at most ${iv.maximumInputCount} incoming edges") + case _ ⇒ // ok + } + case _ ⇒ // ok, no checks here + } } private def checkJunctionOutPortPrecondition(junction: JunctionOutPort[_]): Unit = { - // FIXME: Reenable checks - // junction match { - // case _: FanInOperation[_] if !in ⇒ - // graph.find(junction.vertex) match { - // case Some(existing) if existing.outgoing.nonEmpty ⇒ - // throw new IllegalArgumentException(s"Fan-in [$junction] is already attached to output [${existing.outgoing.head}]") - // case _ ⇒ // ok - // } - // case _ ⇒ // ok - // } + junction.vertex match { + case iv: InternalVertex ⇒ + graph.find(iv) match { + case Some(node) ⇒ + require( + (node.outDegree + 1) <= iv.maximumOutputCount, + s"${node.value}) must have at most ${iv.maximumOutputCount} outgoing edges") + case _ ⇒ // ok + } + case _ ⇒ // ok, no checks here + } } /** @@ -431,22 +433,21 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LkDiEdge] throw new IllegalArgumentException("Undefined sources or sinks: " + formatted.mkString(", ")) } - // we will be able to relax these checks graph.nodes.foreach { node ⇒ node.value match { case v: InternalVertex ⇒ require( node.inDegree >= v.minimumInputCount, - s"${node.incoming}) must have at least ${v.minimumInputCount} incoming edges") + s"$v must have at least ${v.minimumInputCount} incoming edges") require( node.inDegree <= v.maximumInputCount, - s"${node.incoming}) must have at most ${v.maximumInputCount} incoming edges") + s"$v must have at most ${v.maximumInputCount} incoming edges") require( node.outDegree >= v.minimumOutputCount, - s"${node.outgoing.size}) must have at least ${v.minimumOutputCount} outgoing edges") + s"$v must have at least ${v.minimumOutputCount} outgoing edges") require( node.outDegree <= v.maximumOutputCount, - s"${node.incoming}) must have at most ${v.maximumOutputCount} outgoing edges") + s"$v must have at most ${v.maximumOutputCount} outgoing edges") case _ ⇒ // no check for other node types } } diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala index 9dce1e0124..8185f47e6d 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala @@ -251,5 +251,27 @@ class FlowGraphCompileSpec extends AkkaSpec { }.getMessage should include("empty") } + "check maximumInputCount" in { + intercept[IllegalArgumentException] { + FlowGraph { implicit b ⇒ + val bcast = Broadcast[String] + import FlowGraphImplicits._ + in1 ~> bcast ~> out1 + in2 ~> bcast // wrong + } + }.getMessage should include("at most 1 incoming") + } + + "check maximumOutputCount" in { + intercept[IllegalArgumentException] { + FlowGraph { implicit b ⇒ + val merge = Merge[String] + import FlowGraphImplicits._ + in1 ~> merge ~> out1 + merge ~> out2 // wrong + } + }.getMessage should include("at most 1 outgoing") + } + } }