From 703fb7891b88522709a32b65991ce08af8a0eb0d Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Thu, 26 Feb 2015 23:41:35 +0100 Subject: [PATCH] add Java graph DSL --- .../scala/akka/stream/javadsl/Graph.scala | 46 ++++++++++++++++++- .../scala/akka/stream/scaladsl/Graph.scala | 4 +- .../akka/stream/scaladsl/JavaConverters.scala | 2 +- 3 files changed, 47 insertions(+), 5 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 7486f7efcd..63cd51d487 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -278,9 +278,11 @@ object FlowGraph { * The [[FlowGraphBuilder]] is mutable and not thread-safe, * thus you should construct your Graph and then share the constructed immutable [[FlowGraph]]. */ - def builder(): Builder = new Builder(new scaladsl.FlowGraph.Builder) + def builder(): Builder = new Builder()(new scaladsl.FlowGraph.Builder) + + class Builder()(private implicit val delegate: scaladsl.FlowGraph.Builder) { self ⇒ + import scaladsl.FlowGraph.Implicits._ - class Builder(delegate: scaladsl.FlowGraph.Builder) { def flow[A, B, M](from: Outlet[A], via: Flow[A, B, M], to: Inlet[B]): Unit = delegate.addEdge(from, via.asScala, to) def edge[T](from: Outlet[T], to: Inlet[T]): Unit = delegate.addEdge(from, to) @@ -297,5 +299,45 @@ object FlowGraph { def sink[T](sink: Sink[T, _]): Inlet[T] = delegate.add(sink.asScala) def run(mat: FlowMaterializer): Unit = delegate.buildRunnable().run()(mat) + + def from[T](out: Outlet[T]): ForwardOps[T] = new ForwardOps(out) + def from[T, M](src: Source[T, M]): ForwardOps[T] = new ForwardOps(delegate.add(src.asScala)) + def from[T](src: SourceShape[T]): ForwardOps[T] = new ForwardOps(src.outlet) + def from[I, O](f: FlowShape[I, O]): ForwardOps[O] = new ForwardOps(f.outlet) + def from[I, O](j: UniformFanInShape[I, O]): ForwardOps[O] = new ForwardOps(j.out) + def from[I, O](j: UniformFanOutShape[I, O]): ForwardOps[O] = new ForwardOps(findOut(delegate, j, 0)) + + def to[T](in: Inlet[T]): ReverseOps[T] = new ReverseOps(in) + def to[T, M](dst: Sink[T, M]): ReverseOps[T] = new ReverseOps(delegate.add(dst.asScala)) + def to[T](dst: SinkShape[T]): ReverseOps[T] = new ReverseOps(dst.inlet) + def to[I, O](f: FlowShape[I, O]): ReverseOps[I] = new ReverseOps(f.inlet) + def to[I, O](j: UniformFanInShape[I, O]): ReverseOps[I] = new ReverseOps(findIn(delegate, j, 0)) + def to[I, O](j: UniformFanOutShape[I, O]): ReverseOps[I] = new ReverseOps(j.in) + + class ForwardOps[T](out: Outlet[T]) { + def to(in: Inlet[T]): Builder = { out ~> in; self } + def to[M](dst: Sink[T, M]): Builder = { out ~> dst.asScala; self } + def to(dst: SinkShape[T]): Builder = { out ~> dst; self } + def to[U](f: FlowShape[T, U]): Builder = { out ~> f; self } + def to[U](j: UniformFanInShape[T, U]): Builder = { out ~> j; self } + def to[U](j: UniformFanOutShape[T, U]): Builder = { out ~> j; self } + def via[U, M](f: Flow[T, U, M]): ForwardOps[U] = from((out ~> f.asScala).outlet) + def via[U](f: FlowShape[T, U]): ForwardOps[U] = from((out ~> f).outlet) + def via[U](j: UniformFanInShape[T, U]): ForwardOps[U] = from((out ~> j).outlet) + def via[U](j: UniformFanOutShape[T, U]): ForwardOps[U] = from((out ~> j).outlet) + } + + class ReverseOps[T](out: Inlet[T]) { + def from(dst: Outlet[T]): Builder = { out <~ dst; self } + def from[M](dst: Source[T, M]): Builder = { out <~ dst.asScala; self } + def from(dst: SourceShape[T]): Builder = { out <~ dst; self } + def from[U](f: FlowShape[U, T]): Builder = { out <~ f; self } + def from[U](j: UniformFanInShape[U, T]): Builder = { out <~ j; self } + def from[U](j: UniformFanOutShape[U, T]): Builder = { out <~ j; self } + def via[U, M](f: Flow[U, T, M]): ReverseOps[U] = to((out <~ f.asScala).inlet) + def via[U](f: FlowShape[U, T]): ReverseOps[U] = to((out <~ f).inlet) + def via[U](j: UniformFanInShape[U, T]): ReverseOps[U] = to((out <~ j).inlet) + def via[U](j: UniformFanOutShape[U, T]): ReverseOps[U] = to((out <~ j).inlet) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 408d5b1787..fa456f4d9b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -271,7 +271,7 @@ object FlowGraph extends GraphApply { object Implicits { @tailrec - private def findOut[I, O](b: Builder, junction: UniformFanOutShape[I, O], n: Int): Outlet[O] = { + private[stream] def findOut[I, O](b: Builder, junction: UniformFanOutShape[I, O], n: Int): Outlet[O] = { if (n == junction.outArray.length) throw new IllegalArgumentException(s"no more outlets free on $junction") else if (b.module.downstreams.contains(junction.out(n))) findOut(b, junction, n + 1) @@ -279,7 +279,7 @@ object FlowGraph extends GraphApply { } @tailrec - private def findIn[I, O](b: Builder, junction: UniformFanInShape[I, O], n: Int): Inlet[I] = { + private[stream] def findIn[I, O](b: Builder, junction: UniformFanInShape[I, O], n: Int): Inlet[I] = { if (n == junction.inArray.length) throw new IllegalArgumentException(s"no more inlets free on $junction") else if (b.module.upstreams.contains(junction.in(n))) findIn(b, junction, n + 1) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala index 16c8447cd6..7aab257926 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala @@ -21,7 +21,7 @@ private[akka] object JavaConverters { def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(sink) } implicit final class AsAsJavaFlowGraphBuilder[Out](val builder: scaladsl.FlowGraph.Builder) extends AnyVal { - def asJava: javadsl.FlowGraph.Builder = new javadsl.FlowGraph.Builder(builder) + def asJava: javadsl.FlowGraph.Builder = new javadsl.FlowGraph.Builder()(builder) } implicit final class AddAsScalaSource[Out, Mat](val source: javadsl.Source[Out, Mat]) extends AnyVal {