diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index ea29b43ac3..08d63abc51 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -141,7 +141,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { .withAttributes(supervisionStrategy(resumingDecider)) .runWith(TestSink.probe[Int]) .request(10) - .expectNext(1, 2, 4, 5) + .expectNextUnordered(1, 2, 4, 5) .expectComplete() } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 88dd171ad1..fd64a13203 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -38,6 +38,14 @@ object Flow { * it so also in type. */ def wrap[I, O, M](g: Graph[FlowShape[I, O], M]): Flow[I, O, M] = new Flow(scaladsl.Flow.wrap(g)) + + /** + * Helper to create `Flow` from a pair of sink and source. + */ + def wrap[I, O, M1, M2, M]( + sink: Graph[SinkShape[I], M1], + source: Graph[SourceShape[O], M2], + combine: function.Function2[M1, M2, M]): Flow[I, O, M] = new Flow(scaladsl.Flow.wrap(sink, source)(combine.apply _)) } /** Create a `Flow` which can process elements of type `T`. */ @@ -59,38 +67,38 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph /** * Transform this [[Flow]] by appending the given processing steps. */ - def via[T, M](flow: javadsl.Flow[Out, T, M]): javadsl.Flow[In, T, Mat] = - new Flow(delegate.via(flow.asScala)) + def via[T, M](flow: Graph[FlowShape[Out, T], M]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.via(flow)) /** * Transform this [[Flow]] by appending the given processing steps. */ - def via[T, M, M2](flow: javadsl.Flow[Out, T, M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = - new Flow(delegate.viaMat(flow.asScala)(combinerToScala(combine))) + def via[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + new Flow(delegate.viaMat(flow)(combinerToScala(combine))) /** * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both. */ - def to(sink: javadsl.Sink[Out, _]): javadsl.Sink[In, Mat] = - new Sink(delegate.to(sink.asScala)) + def to(sink: Graph[SinkShape[Out], _]): javadsl.Sink[In, Mat] = + new Sink(delegate.to(sink)) /** * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both. */ - def to[M, M2](sink: javadsl.Sink[Out, M], combine: function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] = - new Sink(delegate.toMat(sink.asScala)(combinerToScala(combine))) + def to[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] = + new Sink(delegate.toMat(sink)(combinerToScala(combine))) /** * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]] */ - def join[M](flow: javadsl.Flow[Out, In, M]): javadsl.RunnableFlow[Mat] = - new RunnableFlowAdapter(delegate.join(flow.asScala)) + def join[M](flow: Graph[FlowShape[Out, In], M]): javadsl.RunnableFlow[Mat] = + new RunnableFlowAdapter(delegate.join(flow)) /** * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]] */ - def join[M, M2](flow: javadsl.Flow[Out, In, M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = - new RunnableFlowAdapter(delegate.joinMat(flow.asScala)(combinerToScala(combine))) + def join[M, M2](flow: Graph[FlowShape[Out, In], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = + new RunnableFlowAdapter(delegate.joinMat(flow)(combinerToScala(combine))) /** * Join this [[Flow]] to a [[BidiFlow]] to close off the “top” of the protocol stack: @@ -109,8 +117,8 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * value of the current flow (ignoring the [[BidiFlow]]’s value), use * [[Flow#joinMat[I2* joinMat]] if a different strategy is needed. */ - def join[I2, O2, Mat2](bidi: BidiFlow[Out, O2, I2, In, Mat2]): Flow[I2, O2, Mat] = - new Flow(delegate.join(bidi.asScala)) + def join[I2, O2, Mat2](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2]): Flow[I2, O2, Mat] = + new Flow(delegate.join(bidi)) /** * Join this [[Flow]] to a [[BidiFlow]] to close off the “top” of the protocol stack: @@ -128,8 +136,8 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * The `combine` function is used to compose the materialized values of this flow and that * [[BidiFlow]] into the materialized value of the resulting [[Flow]]. */ - def join[I2, O2, Mat2, M](bidi: BidiFlow[Out, O2, I2, In, Mat2], combine: function.Function2[Mat, Mat2, M]): Flow[I2, O2, M] = - new Flow(delegate.joinMat(bidi.asScala)(combinerToScala(combine))) + def join[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2], combine: function.Function2[Mat, Mat2, M]): Flow[I2, O2, M] = + new Flow(delegate.joinMat(bidi)(combinerToScala(combine))) /** * Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSink` and run it. @@ -140,8 +148,8 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * @tparam T materialized type of given KeyedSource * @tparam U materialized type of given KeyedSink */ - def runWith[T, U](source: javadsl.Source[In, T], sink: javadsl.Sink[Out, U], materializer: FlowMaterializer): akka.japi.Pair[T, U] = { - val p = delegate.runWith(source.asScala, sink.asScala)(materializer) + def runWith[T, U](source: Graph[SourceShape[In], T], sink: Graph[SinkShape[Out], U], materializer: FlowMaterializer): akka.japi.Pair[T, U] = { + val p = delegate.runWith(source, sink)(materializer) akka.japi.Pair(p._1.asInstanceOf[T], p._2.asInstanceOf[U]) } @@ -585,8 +593,8 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * Returns a new `Flow` that concatenates a secondary `Source` to this flow so that, * the first element emitted by the given ("second") source is emitted after the last element of this Flow. */ - def concat[M](second: javadsl.Source[Out @uncheckedVariance, M]): javadsl.Flow[In, Out, Mat @uncheckedVariance Pair M] = - new Flow(delegate.concat(second.asScala).mapMaterialized(p ⇒ Pair(p._1, p._2))) + def concat[M](second: Graph[SourceShape[Out @uncheckedVariance], M]): javadsl.Flow[In, Out, Mat @uncheckedVariance Pair M] = + new Flow(delegate.concat(second).mapMaterialized(p ⇒ Pair(p._1, p._2))) override def withAttributes(attr: OperationAttributes): javadsl.Flow[In, Out, Mat] = new Flow(delegate.withAttributes(attr)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index bb7ed097dc..b0bd1caca0 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -271,7 +271,7 @@ object FlowGraph { final class Builder[+Mat]()(private implicit val delegate: scaladsl.FlowGraph.Builder[Mat]) { self ⇒ import akka.stream.scaladsl.FlowGraph.Implicits._ - def flow[A, B, M](from: Outlet[A], via: Flow[A, B, M], to: Inlet[B]): Unit = delegate.addEdge(from, via.asScala, to) + def flow[A, B, M](from: Outlet[A], via: Graph[FlowShape[A, B], M], to: Inlet[B]): Unit = delegate.addEdge(from, via, to) def edge[T](from: Outlet[T], to: Inlet[T]): Unit = delegate.addEdge(from, to) @@ -282,9 +282,9 @@ object FlowGraph { */ def graph[S <: Shape](graph: Graph[S, _]): S = delegate.add(graph) - def source[T](source: Source[T, _]): Outlet[T] = delegate.add(source.asScala) + def source[T](source: Graph[SourceShape[T], _]): Outlet[T] = delegate.add(source).outlet - def sink[T](sink: Sink[T, _]): Inlet[T] = delegate.add(sink.asScala) + def sink[T](sink: Graph[SinkShape[T], _]): Inlet[T] = delegate.add(sink).inlet /** * Returns an [[Outlet]] that gives access to the materialized value of this graph. Once the graph is materialized @@ -305,14 +305,14 @@ object FlowGraph { def run(mat: FlowMaterializer): Unit = delegate.buildRunnable().run()(mat) def from[T](out: Outlet[T]): ForwardOps[T] = new ForwardOps(out) - def from[T, M](src: Source[T, M]): ForwardOps[T] = new ForwardOps(delegate.add(src.asScala)) + def from[T, M](src: Graph[SourceShape[T], M]): ForwardOps[T] = new ForwardOps(delegate.add(src).outlet) def from[T](src: SourceShape[T]): ForwardOps[T] = new ForwardOps(src.outlet) def from[I, O](f: FlowShape[I, O]): ForwardOps[O] = new ForwardOps(f.outlet) def from[I, O](j: UniformFanInShape[I, O]): ForwardOps[O] = new ForwardOps(j.out) def from[I, O](j: UniformFanOutShape[I, O]): ForwardOps[O] = new ForwardOps(findOut(delegate, j, 0)) def to[T](in: Inlet[T]): ReverseOps[T] = new ReverseOps(in) - def to[T, M](dst: Sink[T, M]): ReverseOps[T] = new ReverseOps(delegate.add(dst.asScala)) + def to[T, M](dst: Graph[SinkShape[T], M]): ReverseOps[T] = new ReverseOps(delegate.add(dst).inlet) def to[T](dst: SinkShape[T]): ReverseOps[T] = new ReverseOps(dst.inlet) def to[I, O](f: FlowShape[I, O]): ReverseOps[I] = new ReverseOps(f.inlet) def to[I, O](j: UniformFanInShape[I, O]): ReverseOps[I] = new ReverseOps(findIn(delegate, j, 0)) @@ -320,12 +320,12 @@ object FlowGraph { final class ForwardOps[T](out: Outlet[T]) { def to(in: Inlet[T]): Builder[Mat] = { out ~> in; self } - def to[M](dst: Sink[T, M]): Builder[Mat] = { out ~> dst.asScala; self } + def to[M](dst: Graph[SinkShape[T], M]): Builder[Mat] = { out ~> dst; self } def to(dst: SinkShape[T]): Builder[Mat] = { out ~> dst; self } def to[U](f: FlowShape[T, U]): Builder[Mat] = { out ~> f; self } def to[U](j: UniformFanInShape[T, U]): Builder[Mat] = { out ~> j; self } def to[U](j: UniformFanOutShape[T, U]): Builder[Mat] = { out ~> j; self } - def via[U, M](f: Flow[T, U, M]): ForwardOps[U] = from((out ~> f.asScala).outlet) + def via[U, M](f: Graph[FlowShape[T, U], M]): ForwardOps[U] = from((out ~> f).outlet) def via[U](f: FlowShape[T, U]): ForwardOps[U] = from((out ~> f).outlet) def via[U](j: UniformFanInShape[T, U]): ForwardOps[U] = from((out ~> j).outlet) def via[U](j: UniformFanOutShape[T, U]): ForwardOps[U] = from((out ~> j).outlet) @@ -334,12 +334,12 @@ object FlowGraph { final class ReverseOps[T](out: Inlet[T]) { def from(dst: Outlet[T]): Builder[Mat] = { out <~ dst; self } - def from[M](dst: Source[T, M]): Builder[Mat] = { out <~ dst.asScala; self } + def from[M](dst: Graph[SourceShape[T], M]): Builder[Mat] = { out <~ dst; self } def from(dst: SourceShape[T]): Builder[Mat] = { out <~ dst; self } def from[U](f: FlowShape[U, T]): Builder[Mat] = { out <~ f; self } def from[U](j: UniformFanInShape[U, T]): Builder[Mat] = { out <~ j; self } def from[U](j: UniformFanOutShape[U, T]): Builder[Mat] = { out <~ j; self } - def via[U, M](f: Flow[U, T, M]): ReverseOps[U] = to((out <~ f.asScala).inlet) + def via[U, M](f: Graph[FlowShape[U, T], M]): ReverseOps[U] = to((out <~ f).inlet) def via[U](f: FlowShape[U, T]): ReverseOps[U] = to((out <~ f).inlet) def via[U](j: UniformFanInShape[U, T]): ReverseOps[U] = to((out <~ j).inlet) def via[U](j: UniformFanOutShape[U, T]): ReverseOps[U] = to((out <~ j).inlet) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 4abb4e9fef..08b12ee49f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -18,6 +18,7 @@ object Sink { val factory: SinkCreate = new SinkCreate {} /** Adapt [[scaladsl.Sink]] for use within Java DSL */ + //FIXME: Is this needed now? def adapt[O, M](sink: scaladsl.Sink[O, M]): javadsl.Sink[O, M] = new Sink(sink) @@ -137,8 +138,8 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[ /** * Connect this `Sink` to a `Source` and run it. */ - def runWith[M](source: javadsl.Source[In, M], materializer: FlowMaterializer): M = - asScala.runWith(source.asScala)(materializer) + def runWith[M](source: Graph[SourceShape[In], M], materializer: FlowMaterializer): M = + asScala.runWith(source)(materializer) /** * Transform only the materialized value of this Sink, leaving all other properties as they were. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index d706af778b..77e186d693 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -31,6 +31,7 @@ object Source { val factory: SourceCreate = new SourceCreate {} /** Adapt [[scaladsl.Source]] for use within JavaDSL */ + // FIXME: is this needed now? def adapt[O, M](source: scaladsl.Source[O, M]): Source[O, M] = new Source(source) @@ -206,8 +207,8 @@ object Source { * emitted by the second source is emitted after the last element of the first * source. */ - def concat[T, M1, M2](first: Source[T, M1], second: Source[T, M2]): Source[T, (M1, M2)] = - new Source(scaladsl.Source.concat(first.asScala, second.asScala)) + def concat[T, M1, M2](first: Graph[SourceShape[T], M1], second: Graph[SourceShape[T], M2]): Source[T, (M1, M2)] = + new Source(scaladsl.Source.concat(first, second)) /** * A graph with the shape of a source logically is a source, this method makes @@ -240,33 +241,33 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour /** * Transform this [[Source]] by appending the given processing stages. */ - def via[T, M](flow: javadsl.Flow[Out, T, M]): javadsl.Source[T, Mat] = - new Source(delegate.via(flow.asScala)) + def via[T, M](flow: Graph[FlowShape[Out, T], M]): javadsl.Source[T, Mat] = + new Source(delegate.via(flow)) /** * Transform this [[Source]] by appending the given processing stages. */ - def via[T, M, M2](flow: javadsl.Flow[Out, T, M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = - new Source(delegate.viaMat(flow.asScala)(combinerToScala(combine))) + def via[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = + new Source(delegate.viaMat(flow)(combinerToScala(combine))) /** * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both. */ - def to[M](sink: javadsl.Sink[Out, M]): javadsl.RunnableFlow[Mat] = - new RunnableFlowAdapter(delegate.to(sink.asScala)) + def to[M](sink: Graph[SinkShape[Out], M]): javadsl.RunnableFlow[Mat] = + new RunnableFlowAdapter(delegate.to(sink)) /** * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both. */ - def to[M, M2](sink: javadsl.Sink[Out, M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = - new RunnableFlowAdapter(delegate.toMat(sink.asScala)(combinerToScala(combine))) + def to[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = + new RunnableFlowAdapter(delegate.toMat(sink)(combinerToScala(combine))) /** * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value * of the `Sink`, e.g. the `Publisher` of a `Sink.publisher`. */ - def runWith[M](sink: Sink[Out, M], materializer: FlowMaterializer): M = - delegate.runWith(sink.asScala)(materializer) + def runWith[M](sink: Graph[SinkShape[Out], M], materializer: FlowMaterializer): M = + delegate.runWith(sink)(materializer) /** * Shortcut for running this `Source` with a fold function. @@ -284,7 +285,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * emitted by that source is emitted after the last element of this * source. */ - def concat[Out2 >: Out, M2](second: Source[Out2, M2]): Source[Out2, (Mat, M2)] = + def concat[Out2 >: Out, M2](second: Graph[SourceShape[Out2], M2]): Source[Out2, (Mat, M2)] = Source.concat(this, second) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 3ef4f7d3f6..57fec5e7c4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -46,7 +46,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * value of the current flow (ignoring the other Flow’s value), use * [[Flow#viaMat viaMat]] if a different strategy is needed. */ - def via[T, Mat2](flow: Flow[Out, T, Mat2]): Flow[In, T, Mat] = viaMat(flow)(Keep.left) + def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Flow[In, T, Mat] = viaMat(flow)(Keep.left) /** * Transform this [[Flow]] by appending the given processing steps. @@ -64,7 +64,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * The `combine` function is used to compose the materialized values of this flow and that * flow into the materialized value of the resulting Flow. */ - def viaMat[T, Mat2, Mat3](flow: Flow[Out, T, Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Flow[In, T, Mat3] = { + def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Flow[In, T, Mat3] = { if (this.isIdentity) flow.asInstanceOf[Flow[In, T, Mat2]].mapMaterialized(combine(().asInstanceOf[Mat], _)) else { val flowCopy = flow.module.carbonCopy @@ -92,7 +92,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * value of the current flow (ignoring the given Sink’s value), use * [[Flow#toMat[Mat2* toMat]] if a different strategy is needed. */ - def to[Mat2](sink: Sink[Out, Mat2]): Sink[In, Mat] = { + def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Sink[In, Mat] = { toMat(sink)(Keep.left) } @@ -112,7 +112,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * The `combine` function is used to compose the materialized values of this flow and that * Sink into the materialized value of the resulting Sink. */ - def toMat[Mat2, Mat3](sink: Sink[Out, Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Sink[In, Mat3] = { + def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Sink[In, Mat3] = { if (isIdentity) sink.asInstanceOf[Sink[In, Mat3]] else { val sinkCopy = sink.module.carbonCopy @@ -142,7 +142,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * value of the current flow (ignoring the other Flow’s value), use * [[Flow#joinMat[Mat2* joinMat]] if a different strategy is needed. */ - def join[Mat2](flow: Flow[Out, In, Mat2]): RunnableFlow[Mat] = joinMat(flow)(Keep.left) + def join[Mat2](flow: Graph[FlowShape[Out, In], Mat2]): RunnableFlow[Mat] = joinMat(flow)(Keep.left) /** * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]] @@ -156,7 +156,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * The `combine` function is used to compose the materialized values of this flow and that * Flow into the materialized value of the resulting Flow. */ - def joinMat[Mat2, Mat3](flow: Flow[Out, In, Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableFlow[Mat3] = { + def joinMat[Mat2, Mat3](flow: Graph[FlowShape[Out, In], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableFlow[Mat3] = { val flowCopy = flow.module.carbonCopy RunnableFlow( module @@ -182,7 +182,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * value of the current flow (ignoring the [[BidiFlow]]’s value), use * [[Flow#joinMat[I2* joinMat]] if a different strategy is needed. */ - def join[I2, O2, Mat2](bidi: BidiFlow[Out, O2, I2, In, Mat2]): Flow[I2, O2, Mat] = joinMat(bidi)(Keep.left) + def join[I2, O2, Mat2](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2]): Flow[I2, O2, Mat] = joinMat(bidi)(Keep.left) /** * Join this [[Flow]] to a [[BidiFlow]] to close off the “top” of the protocol stack: @@ -200,7 +200,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * The `combine` function is used to compose the materialized values of this flow and that * [[BidiFlow]] into the materialized value of the resulting [[Flow]]. */ - def joinMat[I2, O2, Mat2, M](bidi: BidiFlow[Out, O2, I2, In, Mat2])(combine: (Mat, Mat2) ⇒ M): Flow[I2, O2, M] = { + def joinMat[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2])(combine: (Mat, Mat2) ⇒ M): Flow[I2, O2, M] = { val copy = bidi.module.carbonCopy val ins = copy.shape.inlets val outs = copy.shape.outlets @@ -221,8 +221,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * The resulting Flow’s materialized value is a Tuple2 containing both materialized * values (of this Flow and that Source). */ - def concat[Out2 >: Out, Mat2](source: Source[Out2, Mat2]): Flow[In, Out2, (Mat, Mat2)] = - concatMat[Out2, Mat2, (Mat, Mat2)](source, Keep.both) + def concat[Out2 >: Out, Mat2](source: Graph[SourceShape[Out2], Mat2]): Flow[In, Out2, (Mat, Mat2)] = + concatMat[Out2, Mat2, (Mat, Mat2)](source)(Keep.both) /** * Concatenate the given [[Source]] to this [[Flow]], meaning that once this @@ -231,7 +231,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * together with this Flow and just kept from producing elements by asserting * back-pressure until its time comes. */ - def concatMat[Out2 >: Out, Mat2, Mat3](source: Source[Out2, Mat2], combine: (Mat, Mat2) ⇒ Mat3): Flow[In, Out2, Mat3] = + def concatMat[Out2 >: Out, Mat2, Mat3](source: Graph[SourceShape[Out2], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Flow[In, Out2, Mat3] = this.viaMat(Flow(source) { implicit builder ⇒ s ⇒ import FlowGraph.Implicits._ @@ -275,8 +275,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * the materialized values of the `Source` and `Sink`, e.g. the `Subscriber` of a of a [[Source#subscriber]] and * and `Publisher` of a [[Sink#publisher]]. */ - def runWith[Mat1, Mat2](source: Source[In, Mat1], sink: Sink[Out, Mat2])(implicit materializer: FlowMaterializer): (Mat1, Mat2) = { - source.via(this).toMat(sink)(Keep.both).run() + def runWith[Mat1, Mat2](source: Graph[SourceShape[In], Mat1], sink: Graph[SinkShape[Out], Mat2])(implicit materializer: FlowMaterializer): (Mat1, Mat2) = { + Source.wrap(source).via(this).toMat(sink)(Keep.both).run() } /** Converts this Scala DSL element to it's Java DSL counterpart. */ @@ -303,7 +303,7 @@ object Flow extends FlowApply { /** * Helper to create `Flow` from a pair of sink and source. */ - def wrap[I, O, M1, M2, M](sink: Sink[I, M1], source: Source[O, M2])(f: (M1, M2) ⇒ M): Flow[I, O, M] = + def wrap[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(f: (M1, M2) ⇒ M): Flow[I, O, M] = Flow(sink, source)(f) { implicit b ⇒ (in, out) ⇒ (in.inlet, out.outlet) } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 6d9d2d84a3..19c874694e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -299,7 +299,7 @@ object FlowGraph extends GraphApply { class Builder[+M] private[stream] () { private var moduleInProgress: Module = EmptyModule - def addEdge[A, B, M2](from: Outlet[A], via: Flow[A, B, M2], to: Inlet[B]): Unit = { + def addEdge[A, B, M2](from: Outlet[A], via: Graph[FlowShape[A, B], M2], to: Inlet[B]): Unit = { val flowCopy = via.module.carbonCopy moduleInProgress = moduleInProgress @@ -465,7 +465,7 @@ object FlowGraph extends GraphApply { b.addEdge(importAndGetPort(b), to) } - def ~>[Out](via: Flow[T, Out, Any])(implicit b: Builder[_]): PortOps[Out, Unit] = { + def ~>[Out](via: Graph[FlowShape[T, Out], Any])(implicit b: Builder[_]): PortOps[Out, Unit] = { val s = b.add(via) b.addEdge(importAndGetPort(b), s.inlet) s.outlet @@ -495,8 +495,8 @@ object FlowGraph extends GraphApply { flow.outlet } - def ~>(to: Sink[T, _])(implicit b: Builder[_]): Unit = { - b.addEdge(importAndGetPort(b), b.add(to)) + def ~>(to: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit = { + b.addEdge(importAndGetPort(b), b.add(to).inlet) } def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit = { @@ -511,7 +511,7 @@ object FlowGraph extends GraphApply { b.addEdge(from, importAndGetPortReverse(b)) } - def <~[In](via: Flow[In, T, _])(implicit b: Builder[_]): ReversePortOps[In] = { + def <~[In](via: Graph[FlowShape[In, T], _])(implicit b: Builder[_]): ReversePortOps[In] = { val s = b.add(via) b.addEdge(s.outlet, importAndGetPortReverse(b)) s.inlet @@ -541,8 +541,8 @@ object FlowGraph extends GraphApply { flow.inlet } - def <~(from: Source[T, _])(implicit b: Builder[_]): Unit = { - b.addEdge(b.add(from), importAndGetPortReverse(b)) + def <~(from: Graph[SourceShape[T], _])(implicit b: Builder[_]): Unit = { + b.addEdge(b.add(from).outlet, importAndGetPortReverse(b)) } def <~(from: SourceShape[T])(implicit b: Builder[_]): Unit = { @@ -591,8 +591,8 @@ object FlowGraph extends GraphApply { override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = j.in } - implicit class SinkArrow[T](val s: Sink[T, _]) extends AnyVal with ReverseCombinerBase[T] { - override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = b.add(s) + implicit class SinkArrow[T](val s: Graph[SinkShape[T], _]) extends AnyVal with ReverseCombinerBase[T] { + override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = b.add(s).inlet } implicit class SinkShapeArrow[T](val s: SinkShape[T]) extends AnyVal with ReverseCombinerBase[T] { @@ -602,7 +602,7 @@ object FlowGraph extends GraphApply { implicit class FlowShapeArrow[I, O](val f: FlowShape[I, O]) extends AnyVal with ReverseCombinerBase[I] { override def importAndGetPortReverse(b: Builder[_]): Inlet[I] = f.inlet - def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = { + def <~>[I2, O2, Mat](bidi: Graph[BidiShape[O, O2, I2, I], Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = { val shape = b.add(bidi) b.addEdge(f.outlet, shape.in1) b.addEdge(shape.out2, f.inlet) @@ -615,15 +615,15 @@ object FlowGraph extends GraphApply { bidi } - def <~>[M](flow: Flow[O, I, M])(implicit b: Builder[_]): Unit = { + def <~>[M](flow: Graph[FlowShape[O, I], M])(implicit b: Builder[_]): Unit = { val shape = b.add(flow) b.addEdge(shape.outlet, f.inlet) b.addEdge(f.outlet, shape.inlet) } } - implicit class FlowArrow[I, O, M](val f: Flow[I, O, M]) extends AnyVal { - def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = { + implicit class FlowArrow[I, O, M](val f: Graph[FlowShape[I, O], M]) extends AnyVal { + def <~>[I2, O2, Mat](bidi: Graph[BidiShape[O, O2, I2, I], Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = { val shape = b.add(bidi) val flow = b.add(f) b.addEdge(flow.outlet, shape.in1) @@ -638,7 +638,7 @@ object FlowGraph extends GraphApply { bidi } - def <~>[M2](flow: Flow[O, I, M2])(implicit b: Builder[_]): Unit = { + def <~>[M2](flow: Graph[FlowShape[O, I], M2])(implicit b: Builder[_]): Unit = { val shape = b.add(flow) val ff = b.add(f) b.addEdge(shape.outlet, ff.inlet) @@ -653,7 +653,7 @@ object FlowGraph extends GraphApply { other } - def <~>[I3, O3, M](otherFlow: BidiFlow[O1, O3, I3, I2, M])(implicit b: Builder[_]): BidiShape[O1, O3, I3, I2] = { + def <~>[I3, O3, M](otherFlow: Graph[BidiShape[O1, O3, I3, I2], M])(implicit b: Builder[_]): BidiShape[O1, O3, I3, I2] = { val other = b.add(otherFlow) b.addEdge(bidi.out1, other.in1) b.addEdge(other.out2, bidi.in2) @@ -665,7 +665,7 @@ object FlowGraph extends GraphApply { b.addEdge(flow.outlet, bidi.in2) } - def <~>[M](f: Flow[O1, I2, M])(implicit b: Builder[_]): Unit = { + def <~>[M](f: Graph[FlowShape[O1, I2], M])(implicit b: Builder[_]): Unit = { val flow = b.add(f) b.addEdge(bidi.out1, flow.inlet) b.addEdge(flow.outlet, bidi.in2) @@ -683,8 +683,8 @@ object FlowGraph extends GraphApply { implicit def flow2flow[I, O](f: FlowShape[I, O])(implicit b: Builder[_]): PortOps[O, Unit] = new PortOps(f.outlet, b) - implicit class SourceArrow[T](val s: Source[T, _]) extends AnyVal with CombinerBase[T] { - override def importAndGetPort(b: Builder[_]): Outlet[T] = b.add(s) + implicit class SourceArrow[T](val s: Graph[SourceShape[T], _]) extends AnyVal with CombinerBase[T] { + override def importAndGetPort(b: Builder[_]): Outlet[T] = b.add(s).outlet } implicit class SourceShapeArrow[T](val s: SourceShape[T]) extends AnyVal with CombinerBase[T] { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 7f8012e630..efdd8c3a9f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -34,8 +34,8 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) * Connect this `Sink` to a `Source` and run it. The returned value is the materialized value * of the `Source`, e.g. the `Subscriber` of a [[Source#subscriber]]. */ - def runWith[Mat2](source: Source[In, Mat2])(implicit materializer: FlowMaterializer): Mat2 = - source.to(this).run() + def runWith[Mat2](source: Graph[SourceShape[In], Mat2])(implicit materializer: FlowMaterializer): Mat2 = + Source.wrap(source).to(this).run() def mapMaterialized[Mat2](f: Mat ⇒ Mat2): Sink[In, Mat2] = new Sink(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 89c4fba1b9..5316259b35 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -50,13 +50,13 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) /** * Transform this [[akka.stream.scaladsl.Source]] by appending the given processing stages. */ - def via[T, Mat2](flow: Flow[Out, T, Mat2]): Source[T, Mat] = viaMat(flow)(Keep.left) + def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Source[T, Mat] = viaMat(flow)(Keep.left) /** * Transform this [[akka.stream.scaladsl.Source]] by appending the given processing stages. */ - def viaMat[T, Mat2, Mat3](flow: Flow[Out, T, Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] = { - if (flow.isIdentity) this.asInstanceOf[Source[T, Mat3]] + def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] = { + if (flow.module.isInstanceOf[Stages.Identity]) this.asInstanceOf[Source[T, Mat3]] else { val flowCopy = flow.module.carbonCopy new Source( @@ -70,13 +70,13 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]], * concatenating the processing steps of both. */ - def to[Mat2](sink: Sink[Out, Mat2]): RunnableFlow[Mat] = toMat(sink)(Keep.left) + def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableFlow[Mat] = toMat(sink)(Keep.left) /** * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]], * concatenating the processing steps of both. */ - def toMat[Mat2, Mat3](sink: Sink[Out, Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableFlow[Mat3] = { + def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableFlow[Mat3] = { val sinkCopy = sink.module.carbonCopy RunnableFlow(module.growConnect(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine)) } @@ -107,7 +107,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]]. */ - def runWith[Mat2](sink: Sink[Out, Mat2])(implicit materializer: FlowMaterializer): Mat2 = toMat(sink)(Keep.right).run() + def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2])(implicit materializer: FlowMaterializer): Mat2 = toMat(sink)(Keep.right).run() /** * Shortcut for running this `Source` with a fold function. @@ -134,14 +134,14 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) * emitted by that source is emitted after the last element of this * source. */ - def concat[Out2 >: Out, M](second: Source[Out2, M]): Source[Out2, (Mat, M)] = concatMat(second)(Keep.both) + def concat[Out2 >: Out, M](second: Graph[SourceShape[Out2], M]): Source[Out2, (Mat, M)] = concatMat(second)(Keep.both) /** * Concatenates a second source so that the first element * emitted by that source is emitted after the last element of this * source. */ - def concatMat[Out2 >: Out, Mat2, Mat3](second: Source[Out2, Mat2])( + def concatMat[Out2 >: Out, Mat2, Mat3](second: Graph[SourceShape[Out2], Mat2])( combine: (Mat, Mat2) ⇒ Mat3): Source[Out2, Mat3] = Source.concatMat(this, second)(combine) /** @@ -151,7 +151,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) * * This is a shorthand for [[concat]] */ - def ++[Out2 >: Out, M](second: Source[Out2, M]): Source[Out2, (Mat, M)] = concat(second) + def ++[Out2 >: Out, M](second: Graph[SourceShape[Out2], M]): Source[Out2, (Mat, M)] = concat(second) override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] = new Source(module.withAttributes(attr).wrap()) @@ -310,7 +310,7 @@ object Source extends SourceApply { * emitted by the second source is emitted after the last element of the first * source. */ - def concat[T, Mat1, Mat2](source1: Source[T, Mat1], source2: Source[T, Mat2]): Source[T, (Mat1, Mat2)] = + def concat[T, Mat1, Mat2](source1: Graph[SourceShape[T], Mat1], source2: Graph[SourceShape[T], Mat2]): Source[T, (Mat1, Mat2)] = concatMat(source1, source2)(Keep.both).withAttributes(DefaultAttributes.concatSource) /** @@ -318,7 +318,7 @@ object Source extends SourceApply { * emitted by the second source is emitted after the last element of the first * source. */ - def concatMat[T, Mat1, Mat2, Mat3](source1: Source[T, Mat1], source2: Source[T, Mat2])( + def concatMat[T, Mat1, Mat2, Mat3](source1: Graph[SourceShape[T], Mat1], source2: Graph[SourceShape[T], Mat2])( combine: (Mat1, Mat2) ⇒ Mat3): Source[T, Mat3] = wrap(FlowGraph.partial(source1, source2)(combine) { implicit b ⇒ (s1, s2) ⇒