!str #18692 javadsl.FlowGraph.Builder.add()

* also make factories more consistent by only offering
  FlowGraph.create()
* also remove secondary (edge-based) FlowGraph.Builder DSL
* also improve naming for conversions from Graph to
  Source/Flow/BidiFlow/Sink
This commit is contained in:
Viktor Klang 2015-10-21 22:45:39 +02:00 committed by Roland Kuhn
parent 0f99a42df9
commit f29d7affbd
120 changed files with 1535 additions and 1897 deletions

View file

@ -476,9 +476,6 @@ object ZipWith extends ZipWithApply
* '''Cancels when''' any downstream cancels
*/
object Unzip {
private final val _identity: Any Any = a a
/**
* Create a new `Unzip`.
*/
@ -511,9 +508,7 @@ object Concat {
/**
* Create a new `Concat`.
*/
def apply[T](inputCount: Int = 2): Concat[T] = {
new Concat(inputCount)
}
def apply[T](inputCount: Int = 2): Concat[T] = new Concat(inputCount)
}
/**
@ -567,7 +562,6 @@ class Concat[T](inputCount: Int) extends GraphStage[UniformFanInShape[T, T]] {
setHandler(out, new OutHandler {
override def onPull() = pull(in(activeStream))
})
}
}
@ -576,7 +570,7 @@ object FlowGraph extends GraphApply {
class Builder[+M] private[stream] () {
private var moduleInProgress: Module = EmptyModule
def addEdge[A1, A >: A1, B, B1 >: B, M2](from: Outlet[A1], via: Graph[FlowShape[A, B], M2], to: Inlet[B1]): Unit = {
private[FlowGraph] def addEdge[A1, A >: A1, B, B1 >: B, M2](from: Outlet[A1], via: Graph[FlowShape[A, B], M2], to: Inlet[B1]): Unit = {
val flowCopy = via.module.carbonCopy
moduleInProgress =
moduleInProgress
@ -585,9 +579,8 @@ object FlowGraph extends GraphApply {
.wire(flowCopy.shape.outlets.head, to)
}
def addEdge[T, U >: T](from: Outlet[T], to: Inlet[U]): Unit = {
private[FlowGraph] def addEdge[T, U >: T](from: Outlet[T], to: Inlet[U]): Unit =
moduleInProgress = moduleInProgress.wire(from, to)
}
/**
* Import a graph into this module, performing a deep copy, discarding its
@ -657,64 +650,10 @@ object FlowGraph extends GraphApply {
.wire(port, op.inPort)
}
private[stream] def buildRunnable[Mat](): RunnableGraph[Mat] = {
if (!moduleInProgress.isRunnable) {
throw new IllegalArgumentException(
"Cannot build the RunnableGraph because there are unconnected ports: " +
(moduleInProgress.outPorts ++ moduleInProgress.inPorts).mkString(", "))
}
new RunnableGraph(moduleInProgress.nest())
}
private[stream] def buildSource[T, Mat](outlet: Outlet[T]): Source[T, Mat] = {
if (moduleInProgress.isRunnable)
throw new IllegalArgumentException("Cannot build the Source since no ports remain open")
if (!moduleInProgress.isSource)
throw new IllegalArgumentException(
s"Cannot build Source with open inputs (${moduleInProgress.inPorts.mkString(",")}) and outputs (${moduleInProgress.outPorts.mkString(",")})")
if (moduleInProgress.outPorts.head != outlet)
throw new IllegalArgumentException(s"provided Outlet $outlet does not equal the modules open Outlet ${moduleInProgress.outPorts.head}")
new Source(moduleInProgress.replaceShape(SourceShape(outlet)).nest())
}
private[stream] def buildFlow[In, Out, Mat](inlet: Inlet[In], outlet: Outlet[Out]): Flow[In, Out, Mat] = {
if (!moduleInProgress.isFlow)
throw new IllegalArgumentException(
s"Cannot build Flow with open inputs (${moduleInProgress.inPorts.mkString(",")}) and outputs (${moduleInProgress.outPorts.mkString(",")})")
if (moduleInProgress.outPorts.head != outlet)
throw new IllegalArgumentException(s"provided Outlet $outlet does not equal the modules open Outlet ${moduleInProgress.outPorts.head}")
if (moduleInProgress.inPorts.head != inlet)
throw new IllegalArgumentException(s"provided Inlet $inlet does not equal the modules open Inlet ${moduleInProgress.inPorts.head}")
new Flow(moduleInProgress.replaceShape(FlowShape(inlet, outlet)).nest())
}
private[stream] def buildBidiFlow[I1, O1, I2, O2, Mat](shape: BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = {
if (!moduleInProgress.isBidiFlow)
throw new IllegalArgumentException(
s"Cannot build BidiFlow with open inputs (${moduleInProgress.inPorts.mkString(",")}) and outputs (${moduleInProgress.outPorts.mkString(",")})")
if (moduleInProgress.outPorts.toSet != shape.outlets.toSet)
throw new IllegalArgumentException(s"provided Outlets [${shape.outlets.mkString(",")}] does not equal the modules open Outlets [${moduleInProgress.outPorts.mkString(",")}]")
if (moduleInProgress.inPorts.toSet != shape.inlets.toSet)
throw new IllegalArgumentException(s"provided Inlets [${shape.inlets.mkString(",")}] does not equal the modules open Inlets [${moduleInProgress.inPorts.mkString(",")}]")
new BidiFlow(moduleInProgress.replaceShape(shape).nest())
}
private[stream] def buildSink[T, Mat](inlet: Inlet[T]): Sink[T, Mat] = {
if (moduleInProgress.isRunnable)
throw new IllegalArgumentException("Cannot build the Sink since no ports remain open")
if (!moduleInProgress.isSink)
throw new IllegalArgumentException(
s"Cannot build Sink with open inputs (${moduleInProgress.inPorts.mkString(",")}) and outputs (${moduleInProgress.outPorts.mkString(",")})")
if (moduleInProgress.inPorts.head != inlet)
throw new IllegalArgumentException(s"provided Inlet $inlet does not equal the modules open Inlet ${moduleInProgress.inPorts.head}")
new Sink(moduleInProgress.replaceShape(SinkShape(inlet)).nest())
}
private[stream] def module: Module = moduleInProgress
/** Converts this Scala DSL element to it's Java DSL counterpart. */
def asJava: javadsl.FlowGraph.Builder[M] = new javadsl.FlowGraph.Builder()(this)
}
object Implicits {
@ -735,12 +674,11 @@ object FlowGraph extends GraphApply {
else junction.in(n)
}
trait CombinerBase[T] extends Any {
sealed trait CombinerBase[T] extends Any {
def importAndGetPort(b: Builder[_]): Outlet[T]
def ~>[U >: T](to: Inlet[U])(implicit b: Builder[_]): Unit = {
def ~>[U >: T](to: Inlet[U])(implicit b: Builder[_]): Unit =
b.addEdge(importAndGetPort(b), to)
}
def ~>[Out](via: Graph[FlowShape[T, Out], Any])(implicit b: Builder[_]): PortOps[Out, Unit] = {
val s = b.add(via)
@ -772,21 +710,18 @@ object FlowGraph extends GraphApply {
flow.outlet
}
def ~>(to: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit = {
def ~>(to: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit =
b.addEdge(importAndGetPort(b), b.add(to).inlet)
}
def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit = {
def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit =
b.addEdge(importAndGetPort(b), to.inlet)
}
}
trait ReverseCombinerBase[T] extends Any {
sealed trait ReverseCombinerBase[T] extends Any {
def importAndGetPortReverse(b: Builder[_]): Inlet[T]
def <~[U <: T](from: Outlet[U])(implicit b: Builder[_]): Unit = {
def <~[U <: T](from: Outlet[U])(implicit b: Builder[_]): Unit =
b.addEdge(from, importAndGetPortReverse(b))
}
def <~[In](via: Graph[FlowShape[In, T], _])(implicit b: Builder[_]): ReversePortOps[In] = {
val s = b.add(via)
@ -818,13 +753,11 @@ object FlowGraph extends GraphApply {
flow.inlet
}
def <~(from: Graph[SourceShape[T], _])(implicit b: Builder[_]): Unit = {
def <~(from: Graph[SourceShape[T], _])(implicit b: Builder[_]): Unit =
b.addEdge(b.add(from).outlet, importAndGetPortReverse(b))
}
def <~(from: SourceShape[T])(implicit b: Builder[_]): Unit = {
def <~(from: SourceShape[T])(implicit b: Builder[_]): Unit =
b.addEdge(from.outlet, importAndGetPortReverse(b))
}
}
// Although Mat is always Unit, it cannot be removed as a type parameter, otherwise the "override type"
@ -855,7 +788,7 @@ object FlowGraph extends GraphApply {
throw new UnsupportedOperationException("Cannot use viaMat on a port")
}
class DisabledPortOps[Out, Mat](msg: String) extends PortOps[Out, Mat](null, null) {
final class DisabledPortOps[Out, Mat](msg: String) extends PortOps[Out, Mat](null, null) {
override def importAndGetPort(b: Builder[_]): Outlet[Out] = throw new IllegalArgumentException(msg)
override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3) =
@ -866,28 +799,28 @@ object FlowGraph extends GraphApply {
override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = inlet
}
class DisabledReversePortOps[In](msg: String) extends ReversePortOps[In](null) {
final class DisabledReversePortOps[In](msg: String) extends ReversePortOps[In](null) {
override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = throw new IllegalArgumentException(msg)
}
implicit class FanInOps[In, Out](val j: UniformFanInShape[In, Out]) extends AnyVal with CombinerBase[Out] with ReverseCombinerBase[In] {
implicit final class FanInOps[In, Out](val j: UniformFanInShape[In, Out]) extends AnyVal with CombinerBase[Out] with ReverseCombinerBase[In] {
override def importAndGetPort(b: Builder[_]): Outlet[Out] = j.out
override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = findIn(b, j, 0)
}
implicit class FanOutOps[In, Out](val j: UniformFanOutShape[In, Out]) extends AnyVal with ReverseCombinerBase[In] {
implicit final class FanOutOps[In, Out](val j: UniformFanOutShape[In, Out]) extends AnyVal with ReverseCombinerBase[In] {
override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = j.in
}
implicit class SinkArrow[T](val s: Graph[SinkShape[T], _]) extends AnyVal with ReverseCombinerBase[T] {
implicit final class SinkArrow[T](val s: Graph[SinkShape[T], _]) extends AnyVal with ReverseCombinerBase[T] {
override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = b.add(s).inlet
}
implicit class SinkShapeArrow[T](val s: SinkShape[T]) extends AnyVal with ReverseCombinerBase[T] {
implicit final class SinkShapeArrow[T](val s: SinkShape[T]) extends AnyVal with ReverseCombinerBase[T] {
override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = s.inlet
}
implicit class FlowShapeArrow[I, O](val f: FlowShape[I, O]) extends AnyVal with ReverseCombinerBase[I] {
implicit final class FlowShapeArrow[I, O](val f: FlowShape[I, O]) extends AnyVal with ReverseCombinerBase[I] {
override def importAndGetPortReverse(b: Builder[_]): Inlet[I] = f.inlet
def <~>[I2, O2, Mat](bidi: Graph[BidiShape[O, O2, I2, I], Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
@ -910,7 +843,7 @@ object FlowGraph extends GraphApply {
}
}
implicit class FlowArrow[I, O, M](val f: Graph[FlowShape[I, O], M]) extends AnyVal {
implicit final class FlowArrow[I, O, M](val f: Graph[FlowShape[I, O], M]) extends AnyVal {
def <~>[I2, O2, Mat](bidi: Graph[BidiShape[O, O2, I2, I], Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
val shape = b.add(bidi)
val flow = b.add(f)
@ -934,7 +867,7 @@ object FlowGraph extends GraphApply {
}
}
implicit class BidiFlowShapeArrow[I1, O1, I2, O2](val bidi: BidiShape[I1, O1, I2, O2]) extends AnyVal {
implicit final class BidiFlowShapeArrow[I1, O1, I2, O2](val bidi: BidiShape[I1, O1, I2, O2]) extends AnyVal {
def <~>[I3, O3](other: BidiShape[O1, O3, I3, I2])(implicit b: Builder[_]): BidiShape[O1, O3, I3, I2] = {
b.addEdge(bidi.out1, other.in1)
b.addEdge(other.out2, bidi.in2)
@ -971,11 +904,11 @@ object FlowGraph extends GraphApply {
implicit def flow2flow[I, O](f: FlowShape[I, O])(implicit b: Builder[_]): PortOps[O, Unit] =
new PortOps(f.outlet, b)
implicit class SourceArrow[T](val s: Graph[SourceShape[T], _]) extends AnyVal with CombinerBase[T] {
implicit final class SourceArrow[T](val s: Graph[SourceShape[T], _]) extends AnyVal with CombinerBase[T] {
override def importAndGetPort(b: Builder[_]): Outlet[T] = b.add(s).outlet
}
implicit class SourceShapeArrow[T](val s: SourceShape[T]) extends AnyVal with CombinerBase[T] {
implicit final class SourceShapeArrow[T](val s: SourceShape[T]) extends AnyVal with CombinerBase[T] {
override def importAndGetPort(b: Builder[_]): Outlet[T] = s.outlet
}
}