From 419daa7a86181de824ce833579944d78f084427d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 12 Sep 2014 12:12:32 +0200 Subject: [PATCH 1/2] =str #15869 Re-enable flow graph checks --- .../akka/stream/scaladsl2/FlowGraph.scala | 51 ++++++++++--------- .../scaladsl2/FlowGraphCompileSpec.scala | 22 ++++++++ 2 files changed, 48 insertions(+), 25 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index 2cf2c043a3..4afede1fb2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -363,29 +363,31 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LkDiEdge] require(graph.find(node) == None, s"[$node] instance is already used in this flow graph") private def checkJunctionInPortPrecondition(junction: JunctionInPort[_]): Unit = { - // FIXME: Reenable checks - // junction match { - // case _: FanOutOperation[_] if in ⇒ - // graph.find(junction.vertex) match { - // case Some(existing) if existing.incoming.nonEmpty ⇒ - // throw new IllegalArgumentException(s"Fan-out [$junction] is already attached to input [${existing.incoming.head}]") - // case _ ⇒ // ok - // } - // case _ ⇒ // ok - // } + junction.vertex match { + case iv: InternalVertex ⇒ + graph.find(iv) match { + case Some(node) ⇒ + require( + (node.inDegree + 1) <= iv.maximumInputCount, + s"${node.value}) must have at most ${iv.maximumInputCount} incoming edges") + case _ ⇒ // ok + } + case _ ⇒ // ok, no checks here + } } private def checkJunctionOutPortPrecondition(junction: JunctionOutPort[_]): Unit = { - // FIXME: Reenable checks - // junction match { - // case _: FanInOperation[_] if !in ⇒ - // graph.find(junction.vertex) match { - // case Some(existing) if existing.outgoing.nonEmpty ⇒ - // throw new IllegalArgumentException(s"Fan-in [$junction] is already attached to output [${existing.outgoing.head}]") - // case _ ⇒ // ok - // } - // case _ ⇒ // ok - // } + junction.vertex match { + case iv: InternalVertex ⇒ + graph.find(iv) match { + case Some(node) ⇒ + require( + (node.outDegree + 1) <= iv.maximumOutputCount, + s"${node.value}) must have at most ${iv.maximumOutputCount} outgoing edges") + case _ ⇒ // ok + } + case _ ⇒ // ok, no checks here + } } /** @@ -431,22 +433,21 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LkDiEdge] throw new IllegalArgumentException("Undefined sources or sinks: " + formatted.mkString(", ")) } - // we will be able to relax these checks graph.nodes.foreach { node ⇒ node.value match { case v: InternalVertex ⇒ require( node.inDegree >= v.minimumInputCount, - s"${node.incoming}) must have at least ${v.minimumInputCount} incoming edges") + s"$v must have at least ${v.minimumInputCount} incoming edges") require( node.inDegree <= v.maximumInputCount, - s"${node.incoming}) must have at most ${v.maximumInputCount} incoming edges") + s"$v must have at most ${v.maximumInputCount} incoming edges") require( node.outDegree >= v.minimumOutputCount, - s"${node.outgoing.size}) must have at least ${v.minimumOutputCount} outgoing edges") + s"$v must have at least ${v.minimumOutputCount} outgoing edges") require( node.outDegree <= v.maximumOutputCount, - s"${node.incoming}) must have at most ${v.maximumOutputCount} outgoing edges") + s"$v must have at most ${v.maximumOutputCount} outgoing edges") case _ ⇒ // no check for other node types } } diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala index 9dce1e0124..8185f47e6d 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala @@ -251,5 +251,27 @@ class FlowGraphCompileSpec extends AkkaSpec { }.getMessage should include("empty") } + "check maximumInputCount" in { + intercept[IllegalArgumentException] { + FlowGraph { implicit b ⇒ + val bcast = Broadcast[String] + import FlowGraphImplicits._ + in1 ~> bcast ~> out1 + in2 ~> bcast // wrong + } + }.getMessage should include("at most 1 incoming") + } + + "check maximumOutputCount" in { + intercept[IllegalArgumentException] { + FlowGraph { implicit b ⇒ + val merge = Merge[String] + import FlowGraphImplicits._ + in1 ~> merge ~> out1 + merge ~> out2 // wrong + } + }.getMessage should include("at most 1 outgoing") + } + } } From c2d8891cd8eaad6d14a0f6e2906be73239e31b24 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 12 Sep 2014 12:35:10 +0200 Subject: [PATCH 2/2] =str make some FlowGraph things akka internal --- .../akka/stream/scaladsl2/FlowGraph.scala | 95 +++++++++++-------- .../scaladsl2/FlowGraphCompileSpec.scala | 4 +- 2 files changed, 55 insertions(+), 44 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index 4afede1fb2..eb7aaee529 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -14,24 +14,35 @@ import akka.stream.impl2.Ast /** * Fan-in and fan-out vertices in the [[FlowGraph]] implements - * this marker interface. + * this marker interface. Edges may end at a `JunctionInPort`. */ -sealed trait Junction[T] extends JunctionInPort[T] with JunctionOutPort[T] { - override def port: Int = FlowGraphInternal.UnlabeledPort - override def vertex: FlowGraphInternal.Vertex - type NextT = T - override def next = this +sealed trait JunctionInPort[T] { + private[akka] def port: Int = FlowGraphInternal.UnlabeledPort + private[akka] def vertex: FlowGraphInternal.Vertex + type NextT + private[akka] def next: JunctionOutPort[NextT] } -sealed trait JunctionInPort[T] { - def port: Int = FlowGraphInternal.UnlabeledPort - def vertex: FlowGraphInternal.Vertex - type NextT - def next: JunctionOutPort[NextT] -} +/** + * Fan-in and fan-out vertices in the [[FlowGraph]] implements + * this marker interface. Edges may start at a `JunctionOutPort`. + */ sealed trait JunctionOutPort[T] { - def port: Int = FlowGraphInternal.UnlabeledPort - def vertex: FlowGraphInternal.Vertex + private[akka] def port: Int = FlowGraphInternal.UnlabeledPort + private[akka] def vertex: FlowGraphInternal.Vertex +} + +/** + * INTERNAL API + * + * Fan-in and fan-out vertices in the [[FlowGraph]] implements + * this marker interface. + */ +private[akka] sealed trait Junction[T] extends JunctionInPort[T] with JunctionOutPort[T] { + override private[akka] def port: Int = FlowGraphInternal.UnlabeledPort + override private[akka] def vertex: FlowGraphInternal.Vertex + override type NextT = T + override private[akka] def next = this } object Merge { @@ -58,7 +69,7 @@ object Merge { * and one output flow/sink to the `Merge` vertex. */ final class Merge[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] { - override val vertex = this + override private[akka] val vertex = this override val minimumInputCount: Int = 2 override val maximumInputCount: Int = Int.MaxValue override val minimumOutputCount: Int = 1 @@ -87,11 +98,11 @@ object Broadcast { * two downstream subscribers have been established. */ final class Broadcast[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] { - override val vertex = this - override val minimumInputCount: Int = 1 - override val maximumInputCount: Int = 1 - override val minimumOutputCount: Int = 2 - override val maximumOutputCount: Int = Int.MaxValue + override private[akka] def vertex = this + override def minimumInputCount: Int = 1 + override def maximumInputCount: Int = 1 + override def minimumOutputCount: Int = 2 + override def maximumOutputCount: Int = Int.MaxValue } object Zip { @@ -111,17 +122,17 @@ object Zip { */ def apply[A, B](name: String): Zip[A, B] = new Zip[A, B](Some(name)) - class Left[A, B] private[akka] (val vertex: Zip[A, B]) extends JunctionInPort[A] { - override val port = 0 + class Left[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionInPort[A] { + override private[akka] def port = 0 type NextT = (A, B) - override def next = vertex.out + override private[akka] def next = vertex.out } - class Right[A, B] private[akka] (val vertex: Zip[A, B]) extends JunctionInPort[B] { - override val port = 1 + class Right[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionInPort[B] { + override private[akka] def port = 1 type NextT = (A, B) - override def next = vertex.out + override private[akka] def next = vertex.out } - class Out[A, B] private[akka] (val vertex: Zip[A, B]) extends JunctionOutPort[(A, B)] + class Out[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionOutPort[(A, B)] } /** @@ -134,10 +145,10 @@ final class Zip[A, B](override val name: Option[String]) extends FlowGraphIntern val right = new Zip.Right(this) val out = new Zip.Out(this) - override val minimumInputCount: Int = 2 - override val maximumInputCount: Int = 2 - override val minimumOutputCount: Int = 1 - override val maximumOutputCount: Int = 1 + override def minimumInputCount: Int = 2 + override def maximumInputCount: Int = 2 + override def minimumOutputCount: Int = 1 + override def maximumOutputCount: Int = 1 } object UndefinedSink { @@ -162,10 +173,10 @@ object UndefinedSink { * be replaced with [[FlowGraphBuilder#attachSink]]. */ final class UndefinedSink[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { - override val minimumInputCount: Int = 1 - override val maximumInputCount: Int = 1 - override val minimumOutputCount: Int = 0 - override val maximumOutputCount: Int = 0 + override def minimumInputCount: Int = 1 + override def maximumInputCount: Int = 1 + override def minimumOutputCount: Int = 0 + override def maximumOutputCount: Int = 0 } object UndefinedSource { @@ -190,10 +201,10 @@ object UndefinedSource { * be replaced with [[FlowGraphBuilder#attachSource]]. */ final class UndefinedSource[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { - override val minimumInputCount: Int = 0 - override val maximumInputCount: Int = 0 - override val minimumOutputCount: Int = 1 - override val maximumOutputCount: Int = 1 + override def minimumInputCount: Int = 0 + override def maximumInputCount: Int = 0 + override def minimumOutputCount: Int = 1 + override def maximumOutputCount: Int = 1 } /** @@ -201,7 +212,7 @@ final class UndefinedSource[T](override val name: Option[String]) extends FlowGr */ private[akka] object FlowGraphInternal { - val UnlabeledPort = -1 + def UnlabeledPort = -1 sealed trait Vertex case class SourceVertex(source: Source[_]) extends Vertex { @@ -369,7 +380,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LkDiEdge] case Some(node) ⇒ require( (node.inDegree + 1) <= iv.maximumInputCount, - s"${node.value}) must have at most ${iv.maximumInputCount} incoming edges") + s"${node.value} must have at most ${iv.maximumInputCount} incoming edges") case _ ⇒ // ok } case _ ⇒ // ok, no checks here @@ -383,7 +394,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LkDiEdge] case Some(node) ⇒ require( (node.outDegree + 1) <= iv.maximumOutputCount, - s"${node.value}) must have at most ${iv.maximumOutputCount} outgoing edges") + s"${node.value} must have at most ${iv.maximumOutputCount} outgoing edges") case _ ⇒ // ok } case _ ⇒ // ok, no checks here diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala index 8185f47e6d..6c4e8e1cc4 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala @@ -89,7 +89,7 @@ class FlowGraphCompileSpec extends AkkaSpec { val merge = Merge[String] val bcast1 = Broadcast[String] val bcast2 = Broadcast[String] - val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.DropBuffer) + val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.dropBuffer) b. addEdge(in1, f1, merge). addEdge(merge, f2, bcast1). @@ -108,7 +108,7 @@ class FlowGraphCompileSpec extends AkkaSpec { val merge = Merge[String] val bcast1 = Broadcast[String] val bcast2 = Broadcast[String] - val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.DropBuffer) + val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.dropBuffer) import FlowGraphImplicits._ in1 ~> f1 ~> merge ~> f2 ~> bcast1 ~> f3 ~> out1 bcast1 ~> feedbackLoopBuffer ~> bcast2 ~> f5 ~> merge