diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index 4afede1fb2..eb7aaee529 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -14,24 +14,35 @@ import akka.stream.impl2.Ast /** * Fan-in and fan-out vertices in the [[FlowGraph]] implements - * this marker interface. + * this marker interface. Edges may end at a `JunctionInPort`. */ -sealed trait Junction[T] extends JunctionInPort[T] with JunctionOutPort[T] { - override def port: Int = FlowGraphInternal.UnlabeledPort - override def vertex: FlowGraphInternal.Vertex - type NextT = T - override def next = this +sealed trait JunctionInPort[T] { + private[akka] def port: Int = FlowGraphInternal.UnlabeledPort + private[akka] def vertex: FlowGraphInternal.Vertex + type NextT + private[akka] def next: JunctionOutPort[NextT] } -sealed trait JunctionInPort[T] { - def port: Int = FlowGraphInternal.UnlabeledPort - def vertex: FlowGraphInternal.Vertex - type NextT - def next: JunctionOutPort[NextT] -} +/** + * Fan-in and fan-out vertices in the [[FlowGraph]] implements + * this marker interface. Edges may start at a `JunctionOutPort`. + */ sealed trait JunctionOutPort[T] { - def port: Int = FlowGraphInternal.UnlabeledPort - def vertex: FlowGraphInternal.Vertex + private[akka] def port: Int = FlowGraphInternal.UnlabeledPort + private[akka] def vertex: FlowGraphInternal.Vertex +} + +/** + * INTERNAL API + * + * Fan-in and fan-out vertices in the [[FlowGraph]] implements + * this marker interface. + */ +private[akka] sealed trait Junction[T] extends JunctionInPort[T] with JunctionOutPort[T] { + override private[akka] def port: Int = FlowGraphInternal.UnlabeledPort + override private[akka] def vertex: FlowGraphInternal.Vertex + override type NextT = T + override private[akka] def next = this } object Merge { @@ -58,7 +69,7 @@ object Merge { * and one output flow/sink to the `Merge` vertex. */ final class Merge[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] { - override val vertex = this + override private[akka] val vertex = this override val minimumInputCount: Int = 2 override val maximumInputCount: Int = Int.MaxValue override val minimumOutputCount: Int = 1 @@ -87,11 +98,11 @@ object Broadcast { * two downstream subscribers have been established. */ final class Broadcast[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] { - override val vertex = this - override val minimumInputCount: Int = 1 - override val maximumInputCount: Int = 1 - override val minimumOutputCount: Int = 2 - override val maximumOutputCount: Int = Int.MaxValue + override private[akka] def vertex = this + override def minimumInputCount: Int = 1 + override def maximumInputCount: Int = 1 + override def minimumOutputCount: Int = 2 + override def maximumOutputCount: Int = Int.MaxValue } object Zip { @@ -111,17 +122,17 @@ object Zip { */ def apply[A, B](name: String): Zip[A, B] = new Zip[A, B](Some(name)) - class Left[A, B] private[akka] (val vertex: Zip[A, B]) extends JunctionInPort[A] { - override val port = 0 + class Left[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionInPort[A] { + override private[akka] def port = 0 type NextT = (A, B) - override def next = vertex.out + override private[akka] def next = vertex.out } - class Right[A, B] private[akka] (val vertex: Zip[A, B]) extends JunctionInPort[B] { - override val port = 1 + class Right[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionInPort[B] { + override private[akka] def port = 1 type NextT = (A, B) - override def next = vertex.out + override private[akka] def next = vertex.out } - class Out[A, B] private[akka] (val vertex: Zip[A, B]) extends JunctionOutPort[(A, B)] + class Out[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionOutPort[(A, B)] } /** @@ -134,10 +145,10 @@ final class Zip[A, B](override val name: Option[String]) extends FlowGraphIntern val right = new Zip.Right(this) val out = new Zip.Out(this) - override val minimumInputCount: Int = 2 - override val maximumInputCount: Int = 2 - override val minimumOutputCount: Int = 1 - override val maximumOutputCount: Int = 1 + override def minimumInputCount: Int = 2 + override def maximumInputCount: Int = 2 + override def minimumOutputCount: Int = 1 + override def maximumOutputCount: Int = 1 } object UndefinedSink { @@ -162,10 +173,10 @@ object UndefinedSink { * be replaced with [[FlowGraphBuilder#attachSink]]. */ final class UndefinedSink[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { - override val minimumInputCount: Int = 1 - override val maximumInputCount: Int = 1 - override val minimumOutputCount: Int = 0 - override val maximumOutputCount: Int = 0 + override def minimumInputCount: Int = 1 + override def maximumInputCount: Int = 1 + override def minimumOutputCount: Int = 0 + override def maximumOutputCount: Int = 0 } object UndefinedSource { @@ -190,10 +201,10 @@ object UndefinedSource { * be replaced with [[FlowGraphBuilder#attachSource]]. */ final class UndefinedSource[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { - override val minimumInputCount: Int = 0 - override val maximumInputCount: Int = 0 - override val minimumOutputCount: Int = 1 - override val maximumOutputCount: Int = 1 + override def minimumInputCount: Int = 0 + override def maximumInputCount: Int = 0 + override def minimumOutputCount: Int = 1 + override def maximumOutputCount: Int = 1 } /** @@ -201,7 +212,7 @@ final class UndefinedSource[T](override val name: Option[String]) extends FlowGr */ private[akka] object FlowGraphInternal { - val UnlabeledPort = -1 + def UnlabeledPort = -1 sealed trait Vertex case class SourceVertex(source: Source[_]) extends Vertex { @@ -369,7 +380,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LkDiEdge] case Some(node) ⇒ require( (node.inDegree + 1) <= iv.maximumInputCount, - s"${node.value}) must have at most ${iv.maximumInputCount} incoming edges") + s"${node.value} must have at most ${iv.maximumInputCount} incoming edges") case _ ⇒ // ok } case _ ⇒ // ok, no checks here @@ -383,7 +394,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LkDiEdge] case Some(node) ⇒ require( (node.outDegree + 1) <= iv.maximumOutputCount, - s"${node.value}) must have at most ${iv.maximumOutputCount} outgoing edges") + s"${node.value} must have at most ${iv.maximumOutputCount} outgoing edges") case _ ⇒ // ok } case _ ⇒ // ok, no checks here diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala index 8185f47e6d..6c4e8e1cc4 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala @@ -89,7 +89,7 @@ class FlowGraphCompileSpec extends AkkaSpec { val merge = Merge[String] val bcast1 = Broadcast[String] val bcast2 = Broadcast[String] - val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.DropBuffer) + val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.dropBuffer) b. addEdge(in1, f1, merge). addEdge(merge, f2, bcast1). @@ -108,7 +108,7 @@ class FlowGraphCompileSpec extends AkkaSpec { val merge = Merge[String] val bcast1 = Broadcast[String] val bcast2 = Broadcast[String] - val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.DropBuffer) + val feedbackLoopBuffer = FlowFrom[String].buffer(10, OverflowStrategy.dropBuffer) import FlowGraphImplicits._ in1 ~> f1 ~> merge ~> f2 ~> bcast1 ~> f3 ~> out1 bcast1 ~> feedbackLoopBuffer ~> bcast2 ~> f5 ~> merge