=str make some FlowGraph things akka internal

This commit is contained in:
Patrik Nordwall 2014-09-12 12:35:10 +02:00
parent 419daa7a86
commit c2d8891cd8
2 changed files with 55 additions and 44 deletions

View file

@ -14,24 +14,35 @@ import akka.stream.impl2.Ast
/** /**
* Fan-in and fan-out vertices in the [[FlowGraph]] implements * Fan-in and fan-out vertices in the [[FlowGraph]] implements
* this marker interface. * this marker interface. Edges may end at a `JunctionInPort`.
*/ */
sealed trait Junction[T] extends JunctionInPort[T] with JunctionOutPort[T] { sealed trait JunctionInPort[T] {
override def port: Int = FlowGraphInternal.UnlabeledPort private[akka] def port: Int = FlowGraphInternal.UnlabeledPort
override def vertex: FlowGraphInternal.Vertex private[akka] def vertex: FlowGraphInternal.Vertex
type NextT = T type NextT
override def next = this private[akka] def next: JunctionOutPort[NextT]
} }
sealed trait JunctionInPort[T] { /**
def port: Int = FlowGraphInternal.UnlabeledPort * Fan-in and fan-out vertices in the [[FlowGraph]] implements
def vertex: FlowGraphInternal.Vertex * this marker interface. Edges may start at a `JunctionOutPort`.
type NextT */
def next: JunctionOutPort[NextT]
}
sealed trait JunctionOutPort[T] { sealed trait JunctionOutPort[T] {
def port: Int = FlowGraphInternal.UnlabeledPort private[akka] def port: Int = FlowGraphInternal.UnlabeledPort
def vertex: FlowGraphInternal.Vertex private[akka] def vertex: FlowGraphInternal.Vertex
}
/**
* INTERNAL API
*
* Fan-in and fan-out vertices in the [[FlowGraph]] implements
* this marker interface.
*/
private[akka] sealed trait Junction[T] extends JunctionInPort[T] with JunctionOutPort[T] {
override private[akka] def port: Int = FlowGraphInternal.UnlabeledPort
override private[akka] def vertex: FlowGraphInternal.Vertex
override type NextT = T
override private[akka] def next = this
} }
object Merge { object Merge {
@ -58,7 +69,7 @@ object Merge {
* and one output flow/sink to the `Merge` vertex. * and one output flow/sink to the `Merge` vertex.
*/ */
final class Merge[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] { final class Merge[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] {
override val vertex = this override private[akka] val vertex = this
override val minimumInputCount: Int = 2 override val minimumInputCount: Int = 2
override val maximumInputCount: Int = Int.MaxValue override val maximumInputCount: Int = Int.MaxValue
override val minimumOutputCount: Int = 1 override val minimumOutputCount: Int = 1
@ -87,11 +98,11 @@ object Broadcast {
* two downstream subscribers have been established. * two downstream subscribers have been established.
*/ */
final class Broadcast[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] { final class Broadcast[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] {
override val vertex = this override private[akka] def vertex = this
override val minimumInputCount: Int = 1 override def minimumInputCount: Int = 1
override val maximumInputCount: Int = 1 override def maximumInputCount: Int = 1
override val minimumOutputCount: Int = 2 override def minimumOutputCount: Int = 2
override val maximumOutputCount: Int = Int.MaxValue override def maximumOutputCount: Int = Int.MaxValue
} }
object Zip { object Zip {
@ -111,17 +122,17 @@ object Zip {
*/ */
def apply[A, B](name: String): Zip[A, B] = new Zip[A, B](Some(name)) def apply[A, B](name: String): Zip[A, B] = new Zip[A, B](Some(name))
class Left[A, B] private[akka] (val vertex: Zip[A, B]) extends JunctionInPort[A] { class Left[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionInPort[A] {
override val port = 0 override private[akka] def port = 0
type NextT = (A, B) type NextT = (A, B)
override def next = vertex.out override private[akka] def next = vertex.out
} }
class Right[A, B] private[akka] (val vertex: Zip[A, B]) extends JunctionInPort[B] { class Right[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionInPort[B] {
override val port = 1 override private[akka] def port = 1
type NextT = (A, B) type NextT = (A, B)
override def next = vertex.out override private[akka] def next = vertex.out
} }
class Out[A, B] private[akka] (val vertex: Zip[A, B]) extends JunctionOutPort[(A, B)] class Out[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionOutPort[(A, B)]
} }
/** /**
@ -134,10 +145,10 @@ final class Zip[A, B](override val name: Option[String]) extends FlowGraphIntern
val right = new Zip.Right(this) val right = new Zip.Right(this)
val out = new Zip.Out(this) val out = new Zip.Out(this)
override val minimumInputCount: Int = 2 override def minimumInputCount: Int = 2
override val maximumInputCount: Int = 2 override def maximumInputCount: Int = 2
override val minimumOutputCount: Int = 1 override def minimumOutputCount: Int = 1
override val maximumOutputCount: Int = 1 override def maximumOutputCount: Int = 1
} }
object UndefinedSink { object UndefinedSink {
@ -162,10 +173,10 @@ object UndefinedSink {
* be replaced with [[FlowGraphBuilder#attachSink]]. * be replaced with [[FlowGraphBuilder#attachSink]].
*/ */
final class UndefinedSink[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { final class UndefinedSink[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
override val minimumInputCount: Int = 1 override def minimumInputCount: Int = 1
override val maximumInputCount: Int = 1 override def maximumInputCount: Int = 1
override val minimumOutputCount: Int = 0 override def minimumOutputCount: Int = 0
override val maximumOutputCount: Int = 0 override def maximumOutputCount: Int = 0
} }
object UndefinedSource { object UndefinedSource {
@ -190,10 +201,10 @@ object UndefinedSource {
* be replaced with [[FlowGraphBuilder#attachSource]]. * be replaced with [[FlowGraphBuilder#attachSource]].
*/ */
final class UndefinedSource[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { final class UndefinedSource[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
override val minimumInputCount: Int = 0 override def minimumInputCount: Int = 0
override val maximumInputCount: Int = 0 override def maximumInputCount: Int = 0
override val minimumOutputCount: Int = 1 override def minimumOutputCount: Int = 1
override val maximumOutputCount: Int = 1 override def maximumOutputCount: Int = 1
} }
/** /**
@ -201,7 +212,7 @@ final class UndefinedSource[T](override val name: Option[String]) extends FlowGr
*/ */
private[akka] object FlowGraphInternal { private[akka] object FlowGraphInternal {
val UnlabeledPort = -1 def UnlabeledPort = -1
sealed trait Vertex sealed trait Vertex
case class SourceVertex(source: Source[_]) extends Vertex { case class SourceVertex(source: Source[_]) extends Vertex {
@ -369,7 +380,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LkDiEdge]
case Some(node) case Some(node)
require( require(
(node.inDegree + 1) <= iv.maximumInputCount, (node.inDegree + 1) <= iv.maximumInputCount,
s"${node.value}) must have at most ${iv.maximumInputCount} incoming edges") s"${node.value} must have at most ${iv.maximumInputCount} incoming edges")
case _ // ok case _ // ok
} }
case _ // ok, no checks here case _ // ok, no checks here
@ -383,7 +394,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LkDiEdge]
case Some(node) case Some(node)
require( require(
(node.outDegree + 1) <= iv.maximumOutputCount, (node.outDegree + 1) <= iv.maximumOutputCount,
s"${node.value}) must have at most ${iv.maximumOutputCount} outgoing edges") s"${node.value} must have at most ${iv.maximumOutputCount} outgoing edges")
case _ // ok case _ // ok
} }
case _ // ok, no checks here case _ // ok, no checks here

View file

@ -89,7 +89,7 @@ class FlowGraphCompileSpec extends AkkaSpec {
val merge = Merge[String] val merge = Merge[String]
val bcast1 = Broadcast[String] val bcast1 = Broadcast[String]
val bcast2 = Broadcast[String] val bcast2 = Broadcast[String]
val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.DropBuffer) val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.dropBuffer)
b. b.
addEdge(in1, f1, merge). addEdge(in1, f1, merge).
addEdge(merge, f2, bcast1). addEdge(merge, f2, bcast1).
@ -108,7 +108,7 @@ class FlowGraphCompileSpec extends AkkaSpec {
val merge = Merge[String] val merge = Merge[String]
val bcast1 = Broadcast[String] val bcast1 = Broadcast[String]
val bcast2 = Broadcast[String] val bcast2 = Broadcast[String]
val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.DropBuffer) val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.dropBuffer)
import FlowGraphImplicits._ import FlowGraphImplicits._
in1 ~> f1 ~> merge ~> f2 ~> bcast1 ~> f3 ~> out1 in1 ~> f1 ~> merge ~> f2 ~> bcast1 ~> f3 ~> out1
bcast1 ~> feedbackLoopBuffer ~> bcast2 ~> f5 ~> merge bcast1 ~> feedbackLoopBuffer ~> bcast2 ~> f5 ~> merge