add Java graph DSL
This commit is contained in:
parent
e870c7cbd2
commit
703fb7891b
3 changed files with 47 additions and 5 deletions
|
|
@ -278,9 +278,11 @@ object FlowGraph {
|
||||||
* The [[FlowGraphBuilder]] is mutable and not thread-safe,
|
* The [[FlowGraphBuilder]] is mutable and not thread-safe,
|
||||||
* thus you should construct your Graph and then share the constructed immutable [[FlowGraph]].
|
* thus you should construct your Graph and then share the constructed immutable [[FlowGraph]].
|
||||||
*/
|
*/
|
||||||
def builder(): Builder = new Builder(new scaladsl.FlowGraph.Builder)
|
def builder(): Builder = new Builder()(new scaladsl.FlowGraph.Builder)
|
||||||
|
|
||||||
|
class Builder()(private implicit val delegate: scaladsl.FlowGraph.Builder) { self ⇒
|
||||||
|
import scaladsl.FlowGraph.Implicits._
|
||||||
|
|
||||||
class Builder(delegate: scaladsl.FlowGraph.Builder) {
|
|
||||||
def flow[A, B, M](from: Outlet[A], via: Flow[A, B, M], to: Inlet[B]): Unit = delegate.addEdge(from, via.asScala, to)
|
def flow[A, B, M](from: Outlet[A], via: Flow[A, B, M], to: Inlet[B]): Unit = delegate.addEdge(from, via.asScala, to)
|
||||||
|
|
||||||
def edge[T](from: Outlet[T], to: Inlet[T]): Unit = delegate.addEdge(from, to)
|
def edge[T](from: Outlet[T], to: Inlet[T]): Unit = delegate.addEdge(from, to)
|
||||||
|
|
@ -297,5 +299,45 @@ object FlowGraph {
|
||||||
def sink[T](sink: Sink[T, _]): Inlet[T] = delegate.add(sink.asScala)
|
def sink[T](sink: Sink[T, _]): Inlet[T] = delegate.add(sink.asScala)
|
||||||
|
|
||||||
def run(mat: FlowMaterializer): Unit = delegate.buildRunnable().run()(mat)
|
def run(mat: FlowMaterializer): Unit = delegate.buildRunnable().run()(mat)
|
||||||
|
|
||||||
|
def from[T](out: Outlet[T]): ForwardOps[T] = new ForwardOps(out)
|
||||||
|
def from[T, M](src: Source[T, M]): ForwardOps[T] = new ForwardOps(delegate.add(src.asScala))
|
||||||
|
def from[T](src: SourceShape[T]): ForwardOps[T] = new ForwardOps(src.outlet)
|
||||||
|
def from[I, O](f: FlowShape[I, O]): ForwardOps[O] = new ForwardOps(f.outlet)
|
||||||
|
def from[I, O](j: UniformFanInShape[I, O]): ForwardOps[O] = new ForwardOps(j.out)
|
||||||
|
def from[I, O](j: UniformFanOutShape[I, O]): ForwardOps[O] = new ForwardOps(findOut(delegate, j, 0))
|
||||||
|
|
||||||
|
def to[T](in: Inlet[T]): ReverseOps[T] = new ReverseOps(in)
|
||||||
|
def to[T, M](dst: Sink[T, M]): ReverseOps[T] = new ReverseOps(delegate.add(dst.asScala))
|
||||||
|
def to[T](dst: SinkShape[T]): ReverseOps[T] = new ReverseOps(dst.inlet)
|
||||||
|
def to[I, O](f: FlowShape[I, O]): ReverseOps[I] = new ReverseOps(f.inlet)
|
||||||
|
def to[I, O](j: UniformFanInShape[I, O]): ReverseOps[I] = new ReverseOps(findIn(delegate, j, 0))
|
||||||
|
def to[I, O](j: UniformFanOutShape[I, O]): ReverseOps[I] = new ReverseOps(j.in)
|
||||||
|
|
||||||
|
class ForwardOps[T](out: Outlet[T]) {
|
||||||
|
def to(in: Inlet[T]): Builder = { out ~> in; self }
|
||||||
|
def to[M](dst: Sink[T, M]): Builder = { out ~> dst.asScala; self }
|
||||||
|
def to(dst: SinkShape[T]): Builder = { out ~> dst; self }
|
||||||
|
def to[U](f: FlowShape[T, U]): Builder = { out ~> f; self }
|
||||||
|
def to[U](j: UniformFanInShape[T, U]): Builder = { out ~> j; self }
|
||||||
|
def to[U](j: UniformFanOutShape[T, U]): Builder = { out ~> j; self }
|
||||||
|
def via[U, M](f: Flow[T, U, M]): ForwardOps[U] = from((out ~> f.asScala).outlet)
|
||||||
|
def via[U](f: FlowShape[T, U]): ForwardOps[U] = from((out ~> f).outlet)
|
||||||
|
def via[U](j: UniformFanInShape[T, U]): ForwardOps[U] = from((out ~> j).outlet)
|
||||||
|
def via[U](j: UniformFanOutShape[T, U]): ForwardOps[U] = from((out ~> j).outlet)
|
||||||
|
}
|
||||||
|
|
||||||
|
class ReverseOps[T](out: Inlet[T]) {
|
||||||
|
def from(dst: Outlet[T]): Builder = { out <~ dst; self }
|
||||||
|
def from[M](dst: Source[T, M]): Builder = { out <~ dst.asScala; self }
|
||||||
|
def from(dst: SourceShape[T]): Builder = { out <~ dst; self }
|
||||||
|
def from[U](f: FlowShape[U, T]): Builder = { out <~ f; self }
|
||||||
|
def from[U](j: UniformFanInShape[U, T]): Builder = { out <~ j; self }
|
||||||
|
def from[U](j: UniformFanOutShape[U, T]): Builder = { out <~ j; self }
|
||||||
|
def via[U, M](f: Flow[U, T, M]): ReverseOps[U] = to((out <~ f.asScala).inlet)
|
||||||
|
def via[U](f: FlowShape[U, T]): ReverseOps[U] = to((out <~ f).inlet)
|
||||||
|
def via[U](j: UniformFanInShape[U, T]): ReverseOps[U] = to((out <~ j).inlet)
|
||||||
|
def via[U](j: UniformFanOutShape[U, T]): ReverseOps[U] = to((out <~ j).inlet)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -271,7 +271,7 @@ object FlowGraph extends GraphApply {
|
||||||
object Implicits {
|
object Implicits {
|
||||||
|
|
||||||
@tailrec
|
@tailrec
|
||||||
private def findOut[I, O](b: Builder, junction: UniformFanOutShape[I, O], n: Int): Outlet[O] = {
|
private[stream] def findOut[I, O](b: Builder, junction: UniformFanOutShape[I, O], n: Int): Outlet[O] = {
|
||||||
if (n == junction.outArray.length)
|
if (n == junction.outArray.length)
|
||||||
throw new IllegalArgumentException(s"no more outlets free on $junction")
|
throw new IllegalArgumentException(s"no more outlets free on $junction")
|
||||||
else if (b.module.downstreams.contains(junction.out(n))) findOut(b, junction, n + 1)
|
else if (b.module.downstreams.contains(junction.out(n))) findOut(b, junction, n + 1)
|
||||||
|
|
@ -279,7 +279,7 @@ object FlowGraph extends GraphApply {
|
||||||
}
|
}
|
||||||
|
|
||||||
@tailrec
|
@tailrec
|
||||||
private def findIn[I, O](b: Builder, junction: UniformFanInShape[I, O], n: Int): Inlet[I] = {
|
private[stream] def findIn[I, O](b: Builder, junction: UniformFanInShape[I, O], n: Int): Inlet[I] = {
|
||||||
if (n == junction.inArray.length)
|
if (n == junction.inArray.length)
|
||||||
throw new IllegalArgumentException(s"no more inlets free on $junction")
|
throw new IllegalArgumentException(s"no more inlets free on $junction")
|
||||||
else if (b.module.upstreams.contains(junction.in(n))) findIn(b, junction, n + 1)
|
else if (b.module.upstreams.contains(junction.in(n))) findIn(b, junction, n + 1)
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ private[akka] object JavaConverters {
|
||||||
def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(sink)
|
def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(sink)
|
||||||
}
|
}
|
||||||
implicit final class AsAsJavaFlowGraphBuilder[Out](val builder: scaladsl.FlowGraph.Builder) extends AnyVal {
|
implicit final class AsAsJavaFlowGraphBuilder[Out](val builder: scaladsl.FlowGraph.Builder) extends AnyVal {
|
||||||
def asJava: javadsl.FlowGraph.Builder = new javadsl.FlowGraph.Builder(builder)
|
def asJava: javadsl.FlowGraph.Builder = new javadsl.FlowGraph.Builder()(builder)
|
||||||
}
|
}
|
||||||
|
|
||||||
implicit final class AddAsScalaSource[Out, Mat](val source: javadsl.Source[Out, Mat]) extends AnyVal {
|
implicit final class AddAsScalaSource[Out, Mat](val source: javadsl.Source[Out, Mat]) extends AnyVal {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue