Merge pull request #15878 from akka/wip-15869-preconditions-patriknw
=str #15869 Re-enable flow graph checks
This commit is contained in:
commit
67cf00221f
2 changed files with 101 additions and 67 deletions
|
|
@ -14,24 +14,35 @@ import akka.stream.impl2.Ast
|
|||
|
||||
/**
|
||||
* Fan-in and fan-out vertices in the [[FlowGraph]] implements
|
||||
* this marker interface.
|
||||
* this marker interface. Edges may end at a `JunctionInPort`.
|
||||
*/
|
||||
sealed trait Junction[T] extends JunctionInPort[T] with JunctionOutPort[T] {
|
||||
override def port: Int = FlowGraphInternal.UnlabeledPort
|
||||
override def vertex: FlowGraphInternal.Vertex
|
||||
type NextT = T
|
||||
override def next = this
|
||||
sealed trait JunctionInPort[T] {
|
||||
private[akka] def port: Int = FlowGraphInternal.UnlabeledPort
|
||||
private[akka] def vertex: FlowGraphInternal.Vertex
|
||||
type NextT
|
||||
private[akka] def next: JunctionOutPort[NextT]
|
||||
}
|
||||
|
||||
sealed trait JunctionInPort[T] {
|
||||
def port: Int = FlowGraphInternal.UnlabeledPort
|
||||
def vertex: FlowGraphInternal.Vertex
|
||||
type NextT
|
||||
def next: JunctionOutPort[NextT]
|
||||
}
|
||||
/**
|
||||
* Fan-in and fan-out vertices in the [[FlowGraph]] implements
|
||||
* this marker interface. Edges may start at a `JunctionOutPort`.
|
||||
*/
|
||||
sealed trait JunctionOutPort[T] {
|
||||
def port: Int = FlowGraphInternal.UnlabeledPort
|
||||
def vertex: FlowGraphInternal.Vertex
|
||||
private[akka] def port: Int = FlowGraphInternal.UnlabeledPort
|
||||
private[akka] def vertex: FlowGraphInternal.Vertex
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Fan-in and fan-out vertices in the [[FlowGraph]] implements
|
||||
* this marker interface.
|
||||
*/
|
||||
private[akka] sealed trait Junction[T] extends JunctionInPort[T] with JunctionOutPort[T] {
|
||||
override private[akka] def port: Int = FlowGraphInternal.UnlabeledPort
|
||||
override private[akka] def vertex: FlowGraphInternal.Vertex
|
||||
override type NextT = T
|
||||
override private[akka] def next = this
|
||||
}
|
||||
|
||||
object Merge {
|
||||
|
|
@ -58,7 +69,7 @@ object Merge {
|
|||
* and one output flow/sink to the `Merge` vertex.
|
||||
*/
|
||||
final class Merge[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] {
|
||||
override val vertex = this
|
||||
override private[akka] val vertex = this
|
||||
override val minimumInputCount: Int = 2
|
||||
override val maximumInputCount: Int = Int.MaxValue
|
||||
override val minimumOutputCount: Int = 1
|
||||
|
|
@ -87,11 +98,11 @@ object Broadcast {
|
|||
* two downstream subscribers have been established.
|
||||
*/
|
||||
final class Broadcast[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] {
|
||||
override val vertex = this
|
||||
override val minimumInputCount: Int = 1
|
||||
override val maximumInputCount: Int = 1
|
||||
override val minimumOutputCount: Int = 2
|
||||
override val maximumOutputCount: Int = Int.MaxValue
|
||||
override private[akka] def vertex = this
|
||||
override def minimumInputCount: Int = 1
|
||||
override def maximumInputCount: Int = 1
|
||||
override def minimumOutputCount: Int = 2
|
||||
override def maximumOutputCount: Int = Int.MaxValue
|
||||
}
|
||||
|
||||
object Zip {
|
||||
|
|
@ -111,17 +122,17 @@ object Zip {
|
|||
*/
|
||||
def apply[A, B](name: String): Zip[A, B] = new Zip[A, B](Some(name))
|
||||
|
||||
class Left[A, B] private[akka] (val vertex: Zip[A, B]) extends JunctionInPort[A] {
|
||||
override val port = 0
|
||||
class Left[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionInPort[A] {
|
||||
override private[akka] def port = 0
|
||||
type NextT = (A, B)
|
||||
override def next = vertex.out
|
||||
override private[akka] def next = vertex.out
|
||||
}
|
||||
class Right[A, B] private[akka] (val vertex: Zip[A, B]) extends JunctionInPort[B] {
|
||||
override val port = 1
|
||||
class Right[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionInPort[B] {
|
||||
override private[akka] def port = 1
|
||||
type NextT = (A, B)
|
||||
override def next = vertex.out
|
||||
override private[akka] def next = vertex.out
|
||||
}
|
||||
class Out[A, B] private[akka] (val vertex: Zip[A, B]) extends JunctionOutPort[(A, B)]
|
||||
class Out[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionOutPort[(A, B)]
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -134,10 +145,10 @@ final class Zip[A, B](override val name: Option[String]) extends FlowGraphIntern
|
|||
val right = new Zip.Right(this)
|
||||
val out = new Zip.Out(this)
|
||||
|
||||
override val minimumInputCount: Int = 2
|
||||
override val maximumInputCount: Int = 2
|
||||
override val minimumOutputCount: Int = 1
|
||||
override val maximumOutputCount: Int = 1
|
||||
override def minimumInputCount: Int = 2
|
||||
override def maximumInputCount: Int = 2
|
||||
override def minimumOutputCount: Int = 1
|
||||
override def maximumOutputCount: Int = 1
|
||||
}
|
||||
|
||||
object UndefinedSink {
|
||||
|
|
@ -162,10 +173,10 @@ object UndefinedSink {
|
|||
* be replaced with [[FlowGraphBuilder#attachSink]].
|
||||
*/
|
||||
final class UndefinedSink[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
|
||||
override val minimumInputCount: Int = 1
|
||||
override val maximumInputCount: Int = 1
|
||||
override val minimumOutputCount: Int = 0
|
||||
override val maximumOutputCount: Int = 0
|
||||
override def minimumInputCount: Int = 1
|
||||
override def maximumInputCount: Int = 1
|
||||
override def minimumOutputCount: Int = 0
|
||||
override def maximumOutputCount: Int = 0
|
||||
}
|
||||
|
||||
object UndefinedSource {
|
||||
|
|
@ -190,10 +201,10 @@ object UndefinedSource {
|
|||
* be replaced with [[FlowGraphBuilder#attachSource]].
|
||||
*/
|
||||
final class UndefinedSource[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
|
||||
override val minimumInputCount: Int = 0
|
||||
override val maximumInputCount: Int = 0
|
||||
override val minimumOutputCount: Int = 1
|
||||
override val maximumOutputCount: Int = 1
|
||||
override def minimumInputCount: Int = 0
|
||||
override def maximumInputCount: Int = 0
|
||||
override def minimumOutputCount: Int = 1
|
||||
override def maximumOutputCount: Int = 1
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -201,7 +212,7 @@ final class UndefinedSource[T](override val name: Option[String]) extends FlowGr
|
|||
*/
|
||||
private[akka] object FlowGraphInternal {
|
||||
|
||||
val UnlabeledPort = -1
|
||||
def UnlabeledPort = -1
|
||||
|
||||
sealed trait Vertex
|
||||
case class SourceVertex(source: Source[_]) extends Vertex {
|
||||
|
|
@ -363,29 +374,31 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LkDiEdge]
|
|||
require(graph.find(node) == None, s"[$node] instance is already used in this flow graph")
|
||||
|
||||
private def checkJunctionInPortPrecondition(junction: JunctionInPort[_]): Unit = {
|
||||
// FIXME: Reenable checks
|
||||
// junction match {
|
||||
// case _: FanOutOperation[_] if in ⇒
|
||||
// graph.find(junction.vertex) match {
|
||||
// case Some(existing) if existing.incoming.nonEmpty ⇒
|
||||
// throw new IllegalArgumentException(s"Fan-out [$junction] is already attached to input [${existing.incoming.head}]")
|
||||
// case _ ⇒ // ok
|
||||
// }
|
||||
// case _ ⇒ // ok
|
||||
// }
|
||||
junction.vertex match {
|
||||
case iv: InternalVertex ⇒
|
||||
graph.find(iv) match {
|
||||
case Some(node) ⇒
|
||||
require(
|
||||
(node.inDegree + 1) <= iv.maximumInputCount,
|
||||
s"${node.value} must have at most ${iv.maximumInputCount} incoming edges")
|
||||
case _ ⇒ // ok
|
||||
}
|
||||
case _ ⇒ // ok, no checks here
|
||||
}
|
||||
}
|
||||
|
||||
private def checkJunctionOutPortPrecondition(junction: JunctionOutPort[_]): Unit = {
|
||||
// FIXME: Reenable checks
|
||||
// junction match {
|
||||
// case _: FanInOperation[_] if !in ⇒
|
||||
// graph.find(junction.vertex) match {
|
||||
// case Some(existing) if existing.outgoing.nonEmpty ⇒
|
||||
// throw new IllegalArgumentException(s"Fan-in [$junction] is already attached to output [${existing.outgoing.head}]")
|
||||
// case _ ⇒ // ok
|
||||
// }
|
||||
// case _ ⇒ // ok
|
||||
// }
|
||||
junction.vertex match {
|
||||
case iv: InternalVertex ⇒
|
||||
graph.find(iv) match {
|
||||
case Some(node) ⇒
|
||||
require(
|
||||
(node.outDegree + 1) <= iv.maximumOutputCount,
|
||||
s"${node.value} must have at most ${iv.maximumOutputCount} outgoing edges")
|
||||
case _ ⇒ // ok
|
||||
}
|
||||
case _ ⇒ // ok, no checks here
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -431,22 +444,21 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LkDiEdge]
|
|||
throw new IllegalArgumentException("Undefined sources or sinks: " + formatted.mkString(", "))
|
||||
}
|
||||
|
||||
// we will be able to relax these checks
|
||||
graph.nodes.foreach { node ⇒
|
||||
node.value match {
|
||||
case v: InternalVertex ⇒
|
||||
require(
|
||||
node.inDegree >= v.minimumInputCount,
|
||||
s"${node.incoming}) must have at least ${v.minimumInputCount} incoming edges")
|
||||
s"$v must have at least ${v.minimumInputCount} incoming edges")
|
||||
require(
|
||||
node.inDegree <= v.maximumInputCount,
|
||||
s"${node.incoming}) must have at most ${v.maximumInputCount} incoming edges")
|
||||
s"$v must have at most ${v.maximumInputCount} incoming edges")
|
||||
require(
|
||||
node.outDegree >= v.minimumOutputCount,
|
||||
s"${node.outgoing.size}) must have at least ${v.minimumOutputCount} outgoing edges")
|
||||
s"$v must have at least ${v.minimumOutputCount} outgoing edges")
|
||||
require(
|
||||
node.outDegree <= v.maximumOutputCount,
|
||||
s"${node.incoming}) must have at most ${v.maximumOutputCount} outgoing edges")
|
||||
s"$v must have at most ${v.maximumOutputCount} outgoing edges")
|
||||
case _ ⇒ // no check for other node types
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -89,7 +89,7 @@ class FlowGraphCompileSpec extends AkkaSpec {
|
|||
val merge = Merge[String]
|
||||
val bcast1 = Broadcast[String]
|
||||
val bcast2 = Broadcast[String]
|
||||
val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.DropBuffer)
|
||||
val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.dropBuffer)
|
||||
b.
|
||||
addEdge(in1, f1, merge).
|
||||
addEdge(merge, f2, bcast1).
|
||||
|
|
@ -108,7 +108,7 @@ class FlowGraphCompileSpec extends AkkaSpec {
|
|||
val merge = Merge[String]
|
||||
val bcast1 = Broadcast[String]
|
||||
val bcast2 = Broadcast[String]
|
||||
val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.DropBuffer)
|
||||
val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.dropBuffer)
|
||||
import FlowGraphImplicits._
|
||||
in1 ~> f1 ~> merge ~> f2 ~> bcast1 ~> f3 ~> out1
|
||||
bcast1 ~> feedbackLoopBuffer ~> bcast2 ~> f5 ~> merge
|
||||
|
|
@ -251,5 +251,27 @@ class FlowGraphCompileSpec extends AkkaSpec {
|
|||
}.getMessage should include("empty")
|
||||
}
|
||||
|
||||
"check maximumInputCount" in {
|
||||
intercept[IllegalArgumentException] {
|
||||
FlowGraph { implicit b ⇒
|
||||
val bcast = Broadcast[String]
|
||||
import FlowGraphImplicits._
|
||||
in1 ~> bcast ~> out1
|
||||
in2 ~> bcast // wrong
|
||||
}
|
||||
}.getMessage should include("at most 1 incoming")
|
||||
}
|
||||
|
||||
"check maximumOutputCount" in {
|
||||
intercept[IllegalArgumentException] {
|
||||
FlowGraph { implicit b ⇒
|
||||
val merge = Merge[String]
|
||||
import FlowGraphImplicits._
|
||||
in1 ~> merge ~> out1
|
||||
merge ~> out2 // wrong
|
||||
}
|
||||
}.getMessage should include("at most 1 outgoing")
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue