+str #17254: Accept graphs instead of Flow/Source/Sink etc.

This commit is contained in:
Endre Sándor Varga 2015-04-24 12:14:04 +02:00
parent 1a5d114290
commit d73f78dcbf
9 changed files with 100 additions and 90 deletions

View file

@ -141,7 +141,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
.withAttributes(supervisionStrategy(resumingDecider))
.runWith(TestSink.probe[Int])
.request(10)
.expectNext(1, 2, 4, 5)
.expectNextUnordered(1, 2, 4, 5)
.expectComplete()
}

View file

@ -38,6 +38,14 @@ object Flow {
* it so also in type.
*/
def wrap[I, O, M](g: Graph[FlowShape[I, O], M]): Flow[I, O, M] = new Flow(scaladsl.Flow.wrap(g))
/**
* Helper to create `Flow` from a pair of sink and source.
*/
def wrap[I, O, M1, M2, M](
sink: Graph[SinkShape[I], M1],
source: Graph[SourceShape[O], M2],
combine: function.Function2[M1, M2, M]): Flow[I, O, M] = new Flow(scaladsl.Flow.wrap(sink, source)(combine.apply _))
}
/** Create a `Flow` which can process elements of type `T`. */
@ -59,38 +67,38 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
/**
* Transform this [[Flow]] by appending the given processing steps.
*/
def via[T, M](flow: javadsl.Flow[Out, T, M]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.via(flow.asScala))
def via[T, M](flow: Graph[FlowShape[Out, T], M]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.via(flow))
/**
* Transform this [[Flow]] by appending the given processing steps.
*/
def via[T, M, M2](flow: javadsl.Flow[Out, T, M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.viaMat(flow.asScala)(combinerToScala(combine)))
def via[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.viaMat(flow)(combinerToScala(combine)))
/**
* Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both.
*/
def to(sink: javadsl.Sink[Out, _]): javadsl.Sink[In, Mat] =
new Sink(delegate.to(sink.asScala))
def to(sink: Graph[SinkShape[Out], _]): javadsl.Sink[In, Mat] =
new Sink(delegate.to(sink))
/**
* Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both.
*/
def to[M, M2](sink: javadsl.Sink[Out, M], combine: function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] =
new Sink(delegate.toMat(sink.asScala)(combinerToScala(combine)))
def to[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] =
new Sink(delegate.toMat(sink)(combinerToScala(combine)))
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]]
*/
def join[M](flow: javadsl.Flow[Out, In, M]): javadsl.RunnableFlow[Mat] =
new RunnableFlowAdapter(delegate.join(flow.asScala))
def join[M](flow: Graph[FlowShape[Out, In], M]): javadsl.RunnableFlow[Mat] =
new RunnableFlowAdapter(delegate.join(flow))
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]]
*/
def join[M, M2](flow: javadsl.Flow[Out, In, M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] =
new RunnableFlowAdapter(delegate.joinMat(flow.asScala)(combinerToScala(combine)))
def join[M, M2](flow: Graph[FlowShape[Out, In], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] =
new RunnableFlowAdapter(delegate.joinMat(flow)(combinerToScala(combine)))
/**
* Join this [[Flow]] to a [[BidiFlow]] to close off the top of the protocol stack:
@ -109,8 +117,8 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* value of the current flow (ignoring the [[BidiFlow]]s value), use
* [[Flow#joinMat[I2* joinMat]] if a different strategy is needed.
*/
def join[I2, O2, Mat2](bidi: BidiFlow[Out, O2, I2, In, Mat2]): Flow[I2, O2, Mat] =
new Flow(delegate.join(bidi.asScala))
def join[I2, O2, Mat2](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2]): Flow[I2, O2, Mat] =
new Flow(delegate.join(bidi))
/**
* Join this [[Flow]] to a [[BidiFlow]] to close off the top of the protocol stack:
@ -128,8 +136,8 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* The `combine` function is used to compose the materialized values of this flow and that
* [[BidiFlow]] into the materialized value of the resulting [[Flow]].
*/
def join[I2, O2, Mat2, M](bidi: BidiFlow[Out, O2, I2, In, Mat2], combine: function.Function2[Mat, Mat2, M]): Flow[I2, O2, M] =
new Flow(delegate.joinMat(bidi.asScala)(combinerToScala(combine)))
def join[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2], combine: function.Function2[Mat, Mat2, M]): Flow[I2, O2, M] =
new Flow(delegate.joinMat(bidi)(combinerToScala(combine)))
/**
* Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSink` and run it.
@ -140,8 +148,8 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* @tparam T materialized type of given KeyedSource
* @tparam U materialized type of given KeyedSink
*/
def runWith[T, U](source: javadsl.Source[In, T], sink: javadsl.Sink[Out, U], materializer: FlowMaterializer): akka.japi.Pair[T, U] = {
val p = delegate.runWith(source.asScala, sink.asScala)(materializer)
def runWith[T, U](source: Graph[SourceShape[In], T], sink: Graph[SinkShape[Out], U], materializer: FlowMaterializer): akka.japi.Pair[T, U] = {
val p = delegate.runWith(source, sink)(materializer)
akka.japi.Pair(p._1.asInstanceOf[T], p._2.asInstanceOf[U])
}
@ -585,8 +593,8 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* Returns a new `Flow` that concatenates a secondary `Source` to this flow so that,
* the first element emitted by the given ("second") source is emitted after the last element of this Flow.
*/
def concat[M](second: javadsl.Source[Out @uncheckedVariance, M]): javadsl.Flow[In, Out, Mat @uncheckedVariance Pair M] =
new Flow(delegate.concat(second.asScala).mapMaterialized(p Pair(p._1, p._2)))
def concat[M](second: Graph[SourceShape[Out @uncheckedVariance], M]): javadsl.Flow[In, Out, Mat @uncheckedVariance Pair M] =
new Flow(delegate.concat(second).mapMaterialized(p Pair(p._1, p._2)))
override def withAttributes(attr: OperationAttributes): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.withAttributes(attr))

View file

@ -271,7 +271,7 @@ object FlowGraph {
final class Builder[+Mat]()(private implicit val delegate: scaladsl.FlowGraph.Builder[Mat]) { self
import akka.stream.scaladsl.FlowGraph.Implicits._
def flow[A, B, M](from: Outlet[A], via: Flow[A, B, M], to: Inlet[B]): Unit = delegate.addEdge(from, via.asScala, to)
def flow[A, B, M](from: Outlet[A], via: Graph[FlowShape[A, B], M], to: Inlet[B]): Unit = delegate.addEdge(from, via, to)
def edge[T](from: Outlet[T], to: Inlet[T]): Unit = delegate.addEdge(from, to)
@ -282,9 +282,9 @@ object FlowGraph {
*/
def graph[S <: Shape](graph: Graph[S, _]): S = delegate.add(graph)
def source[T](source: Source[T, _]): Outlet[T] = delegate.add(source.asScala)
def source[T](source: Graph[SourceShape[T], _]): Outlet[T] = delegate.add(source).outlet
def sink[T](sink: Sink[T, _]): Inlet[T] = delegate.add(sink.asScala)
def sink[T](sink: Graph[SinkShape[T], _]): Inlet[T] = delegate.add(sink).inlet
/**
* Returns an [[Outlet]] that gives access to the materialized value of this graph. Once the graph is materialized
@ -305,14 +305,14 @@ object FlowGraph {
def run(mat: FlowMaterializer): Unit = delegate.buildRunnable().run()(mat)
def from[T](out: Outlet[T]): ForwardOps[T] = new ForwardOps(out)
def from[T, M](src: Source[T, M]): ForwardOps[T] = new ForwardOps(delegate.add(src.asScala))
def from[T, M](src: Graph[SourceShape[T], M]): ForwardOps[T] = new ForwardOps(delegate.add(src).outlet)
def from[T](src: SourceShape[T]): ForwardOps[T] = new ForwardOps(src.outlet)
def from[I, O](f: FlowShape[I, O]): ForwardOps[O] = new ForwardOps(f.outlet)
def from[I, O](j: UniformFanInShape[I, O]): ForwardOps[O] = new ForwardOps(j.out)
def from[I, O](j: UniformFanOutShape[I, O]): ForwardOps[O] = new ForwardOps(findOut(delegate, j, 0))
def to[T](in: Inlet[T]): ReverseOps[T] = new ReverseOps(in)
def to[T, M](dst: Sink[T, M]): ReverseOps[T] = new ReverseOps(delegate.add(dst.asScala))
def to[T, M](dst: Graph[SinkShape[T], M]): ReverseOps[T] = new ReverseOps(delegate.add(dst).inlet)
def to[T](dst: SinkShape[T]): ReverseOps[T] = new ReverseOps(dst.inlet)
def to[I, O](f: FlowShape[I, O]): ReverseOps[I] = new ReverseOps(f.inlet)
def to[I, O](j: UniformFanInShape[I, O]): ReverseOps[I] = new ReverseOps(findIn(delegate, j, 0))
@ -320,12 +320,12 @@ object FlowGraph {
final class ForwardOps[T](out: Outlet[T]) {
def to(in: Inlet[T]): Builder[Mat] = { out ~> in; self }
def to[M](dst: Sink[T, M]): Builder[Mat] = { out ~> dst.asScala; self }
def to[M](dst: Graph[SinkShape[T], M]): Builder[Mat] = { out ~> dst; self }
def to(dst: SinkShape[T]): Builder[Mat] = { out ~> dst; self }
def to[U](f: FlowShape[T, U]): Builder[Mat] = { out ~> f; self }
def to[U](j: UniformFanInShape[T, U]): Builder[Mat] = { out ~> j; self }
def to[U](j: UniformFanOutShape[T, U]): Builder[Mat] = { out ~> j; self }
def via[U, M](f: Flow[T, U, M]): ForwardOps[U] = from((out ~> f.asScala).outlet)
def via[U, M](f: Graph[FlowShape[T, U], M]): ForwardOps[U] = from((out ~> f).outlet)
def via[U](f: FlowShape[T, U]): ForwardOps[U] = from((out ~> f).outlet)
def via[U](j: UniformFanInShape[T, U]): ForwardOps[U] = from((out ~> j).outlet)
def via[U](j: UniformFanOutShape[T, U]): ForwardOps[U] = from((out ~> j).outlet)
@ -334,12 +334,12 @@ object FlowGraph {
final class ReverseOps[T](out: Inlet[T]) {
def from(dst: Outlet[T]): Builder[Mat] = { out <~ dst; self }
def from[M](dst: Source[T, M]): Builder[Mat] = { out <~ dst.asScala; self }
def from[M](dst: Graph[SourceShape[T], M]): Builder[Mat] = { out <~ dst; self }
def from(dst: SourceShape[T]): Builder[Mat] = { out <~ dst; self }
def from[U](f: FlowShape[U, T]): Builder[Mat] = { out <~ f; self }
def from[U](j: UniformFanInShape[U, T]): Builder[Mat] = { out <~ j; self }
def from[U](j: UniformFanOutShape[U, T]): Builder[Mat] = { out <~ j; self }
def via[U, M](f: Flow[U, T, M]): ReverseOps[U] = to((out <~ f.asScala).inlet)
def via[U, M](f: Graph[FlowShape[U, T], M]): ReverseOps[U] = to((out <~ f).inlet)
def via[U](f: FlowShape[U, T]): ReverseOps[U] = to((out <~ f).inlet)
def via[U](j: UniformFanInShape[U, T]): ReverseOps[U] = to((out <~ j).inlet)
def via[U](j: UniformFanOutShape[U, T]): ReverseOps[U] = to((out <~ j).inlet)

View file

@ -18,6 +18,7 @@ object Sink {
val factory: SinkCreate = new SinkCreate {}
/** Adapt [[scaladsl.Sink]] for use within Java DSL */
//FIXME: Is this needed now?
def adapt[O, M](sink: scaladsl.Sink[O, M]): javadsl.Sink[O, M] =
new Sink(sink)
@ -137,8 +138,8 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[
/**
* Connect this `Sink` to a `Source` and run it.
*/
def runWith[M](source: javadsl.Source[In, M], materializer: FlowMaterializer): M =
asScala.runWith(source.asScala)(materializer)
def runWith[M](source: Graph[SourceShape[In], M], materializer: FlowMaterializer): M =
asScala.runWith(source)(materializer)
/**
* Transform only the materialized value of this Sink, leaving all other properties as they were.

View file

@ -31,6 +31,7 @@ object Source {
val factory: SourceCreate = new SourceCreate {}
/** Adapt [[scaladsl.Source]] for use within JavaDSL */
// FIXME: is this needed now?
def adapt[O, M](source: scaladsl.Source[O, M]): Source[O, M] =
new Source(source)
@ -206,8 +207,8 @@ object Source {
* emitted by the second source is emitted after the last element of the first
* source.
*/
def concat[T, M1, M2](first: Source[T, M1], second: Source[T, M2]): Source[T, (M1, M2)] =
new Source(scaladsl.Source.concat(first.asScala, second.asScala))
def concat[T, M1, M2](first: Graph[SourceShape[T], M1], second: Graph[SourceShape[T], M2]): Source[T, (M1, M2)] =
new Source(scaladsl.Source.concat(first, second))
/**
* A graph with the shape of a source logically is a source, this method makes
@ -240,33 +241,33 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
/**
* Transform this [[Source]] by appending the given processing stages.
*/
def via[T, M](flow: javadsl.Flow[Out, T, M]): javadsl.Source[T, Mat] =
new Source(delegate.via(flow.asScala))
def via[T, M](flow: Graph[FlowShape[Out, T], M]): javadsl.Source[T, Mat] =
new Source(delegate.via(flow))
/**
* Transform this [[Source]] by appending the given processing stages.
*/
def via[T, M, M2](flow: javadsl.Flow[Out, T, M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
new Source(delegate.viaMat(flow.asScala)(combinerToScala(combine)))
def via[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
new Source(delegate.viaMat(flow)(combinerToScala(combine)))
/**
* Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both.
*/
def to[M](sink: javadsl.Sink[Out, M]): javadsl.RunnableFlow[Mat] =
new RunnableFlowAdapter(delegate.to(sink.asScala))
def to[M](sink: Graph[SinkShape[Out], M]): javadsl.RunnableFlow[Mat] =
new RunnableFlowAdapter(delegate.to(sink))
/**
* Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both.
*/
def to[M, M2](sink: javadsl.Sink[Out, M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] =
new RunnableFlowAdapter(delegate.toMat(sink.asScala)(combinerToScala(combine)))
def to[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] =
new RunnableFlowAdapter(delegate.toMat(sink)(combinerToScala(combine)))
/**
* Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
* of the `Sink`, e.g. the `Publisher` of a `Sink.publisher`.
*/
def runWith[M](sink: Sink[Out, M], materializer: FlowMaterializer): M =
delegate.runWith(sink.asScala)(materializer)
def runWith[M](sink: Graph[SinkShape[Out], M], materializer: FlowMaterializer): M =
delegate.runWith(sink)(materializer)
/**
* Shortcut for running this `Source` with a fold function.
@ -284,7 +285,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
* emitted by that source is emitted after the last element of this
* source.
*/
def concat[Out2 >: Out, M2](second: Source[Out2, M2]): Source[Out2, (Mat, M2)] =
def concat[Out2 >: Out, M2](second: Graph[SourceShape[Out2], M2]): Source[Out2, (Mat, M2)] =
Source.concat(this, second)
/**

View file

@ -46,7 +46,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* value of the current flow (ignoring the other Flows value), use
* [[Flow#viaMat viaMat]] if a different strategy is needed.
*/
def via[T, Mat2](flow: Flow[Out, T, Mat2]): Flow[In, T, Mat] = viaMat(flow)(Keep.left)
def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Flow[In, T, Mat] = viaMat(flow)(Keep.left)
/**
* Transform this [[Flow]] by appending the given processing steps.
@ -64,7 +64,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* The `combine` function is used to compose the materialized values of this flow and that
* flow into the materialized value of the resulting Flow.
*/
def viaMat[T, Mat2, Mat3](flow: Flow[Out, T, Mat2])(combine: (Mat, Mat2) Mat3): Flow[In, T, Mat3] = {
def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): Flow[In, T, Mat3] = {
if (this.isIdentity) flow.asInstanceOf[Flow[In, T, Mat2]].mapMaterialized(combine(().asInstanceOf[Mat], _))
else {
val flowCopy = flow.module.carbonCopy
@ -92,7 +92,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* value of the current flow (ignoring the given Sinks value), use
* [[Flow#toMat[Mat2* toMat]] if a different strategy is needed.
*/
def to[Mat2](sink: Sink[Out, Mat2]): Sink[In, Mat] = {
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Sink[In, Mat] = {
toMat(sink)(Keep.left)
}
@ -112,7 +112,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* The `combine` function is used to compose the materialized values of this flow and that
* Sink into the materialized value of the resulting Sink.
*/
def toMat[Mat2, Mat3](sink: Sink[Out, Mat2])(combine: (Mat, Mat2) Mat3): Sink[In, Mat3] = {
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) Mat3): Sink[In, Mat3] = {
if (isIdentity) sink.asInstanceOf[Sink[In, Mat3]]
else {
val sinkCopy = sink.module.carbonCopy
@ -142,7 +142,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* value of the current flow (ignoring the other Flows value), use
* [[Flow#joinMat[Mat2* joinMat]] if a different strategy is needed.
*/
def join[Mat2](flow: Flow[Out, In, Mat2]): RunnableFlow[Mat] = joinMat(flow)(Keep.left)
def join[Mat2](flow: Graph[FlowShape[Out, In], Mat2]): RunnableFlow[Mat] = joinMat(flow)(Keep.left)
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]]
@ -156,7 +156,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* The `combine` function is used to compose the materialized values of this flow and that
* Flow into the materialized value of the resulting Flow.
*/
def joinMat[Mat2, Mat3](flow: Flow[Out, In, Mat2])(combine: (Mat, Mat2) Mat3): RunnableFlow[Mat3] = {
def joinMat[Mat2, Mat3](flow: Graph[FlowShape[Out, In], Mat2])(combine: (Mat, Mat2) Mat3): RunnableFlow[Mat3] = {
val flowCopy = flow.module.carbonCopy
RunnableFlow(
module
@ -182,7 +182,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* value of the current flow (ignoring the [[BidiFlow]]s value), use
* [[Flow#joinMat[I2* joinMat]] if a different strategy is needed.
*/
def join[I2, O2, Mat2](bidi: BidiFlow[Out, O2, I2, In, Mat2]): Flow[I2, O2, Mat] = joinMat(bidi)(Keep.left)
def join[I2, O2, Mat2](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2]): Flow[I2, O2, Mat] = joinMat(bidi)(Keep.left)
/**
* Join this [[Flow]] to a [[BidiFlow]] to close off the top of the protocol stack:
@ -200,7 +200,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* The `combine` function is used to compose the materialized values of this flow and that
* [[BidiFlow]] into the materialized value of the resulting [[Flow]].
*/
def joinMat[I2, O2, Mat2, M](bidi: BidiFlow[Out, O2, I2, In, Mat2])(combine: (Mat, Mat2) M): Flow[I2, O2, M] = {
def joinMat[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2])(combine: (Mat, Mat2) M): Flow[I2, O2, M] = {
val copy = bidi.module.carbonCopy
val ins = copy.shape.inlets
val outs = copy.shape.outlets
@ -221,8 +221,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* The resulting Flows materialized value is a Tuple2 containing both materialized
* values (of this Flow and that Source).
*/
def concat[Out2 >: Out, Mat2](source: Source[Out2, Mat2]): Flow[In, Out2, (Mat, Mat2)] =
concatMat[Out2, Mat2, (Mat, Mat2)](source, Keep.both)
def concat[Out2 >: Out, Mat2](source: Graph[SourceShape[Out2], Mat2]): Flow[In, Out2, (Mat, Mat2)] =
concatMat[Out2, Mat2, (Mat, Mat2)](source)(Keep.both)
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
@ -231,7 +231,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* together with this Flow and just kept from producing elements by asserting
* back-pressure until its time comes.
*/
def concatMat[Out2 >: Out, Mat2, Mat3](source: Source[Out2, Mat2], combine: (Mat, Mat2) Mat3): Flow[In, Out2, Mat3] =
def concatMat[Out2 >: Out, Mat2, Mat3](source: Graph[SourceShape[Out2], Mat2])(combine: (Mat, Mat2) Mat3): Flow[In, Out2, Mat3] =
this.viaMat(Flow(source) { implicit builder
s
import FlowGraph.Implicits._
@ -275,8 +275,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* the materialized values of the `Source` and `Sink`, e.g. the `Subscriber` of a of a [[Source#subscriber]] and
* and `Publisher` of a [[Sink#publisher]].
*/
def runWith[Mat1, Mat2](source: Source[In, Mat1], sink: Sink[Out, Mat2])(implicit materializer: FlowMaterializer): (Mat1, Mat2) = {
source.via(this).toMat(sink)(Keep.both).run()
def runWith[Mat1, Mat2](source: Graph[SourceShape[In], Mat1], sink: Graph[SinkShape[Out], Mat2])(implicit materializer: FlowMaterializer): (Mat1, Mat2) = {
Source.wrap(source).via(this).toMat(sink)(Keep.both).run()
}
/** Converts this Scala DSL element to it's Java DSL counterpart. */
@ -303,7 +303,7 @@ object Flow extends FlowApply {
/**
* Helper to create `Flow` from a pair of sink and source.
*/
def wrap[I, O, M1, M2, M](sink: Sink[I, M1], source: Source[O, M2])(f: (M1, M2) M): Flow[I, O, M] =
def wrap[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(f: (M1, M2) M): Flow[I, O, M] =
Flow(sink, source)(f) { implicit b (in, out) (in.inlet, out.outlet) }
}

View file

@ -299,7 +299,7 @@ object FlowGraph extends GraphApply {
class Builder[+M] private[stream] () {
private var moduleInProgress: Module = EmptyModule
def addEdge[A, B, M2](from: Outlet[A], via: Flow[A, B, M2], to: Inlet[B]): Unit = {
def addEdge[A, B, M2](from: Outlet[A], via: Graph[FlowShape[A, B], M2], to: Inlet[B]): Unit = {
val flowCopy = via.module.carbonCopy
moduleInProgress =
moduleInProgress
@ -465,7 +465,7 @@ object FlowGraph extends GraphApply {
b.addEdge(importAndGetPort(b), to)
}
def ~>[Out](via: Flow[T, Out, Any])(implicit b: Builder[_]): PortOps[Out, Unit] = {
def ~>[Out](via: Graph[FlowShape[T, Out], Any])(implicit b: Builder[_]): PortOps[Out, Unit] = {
val s = b.add(via)
b.addEdge(importAndGetPort(b), s.inlet)
s.outlet
@ -495,8 +495,8 @@ object FlowGraph extends GraphApply {
flow.outlet
}
def ~>(to: Sink[T, _])(implicit b: Builder[_]): Unit = {
b.addEdge(importAndGetPort(b), b.add(to))
def ~>(to: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit = {
b.addEdge(importAndGetPort(b), b.add(to).inlet)
}
def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit = {
@ -511,7 +511,7 @@ object FlowGraph extends GraphApply {
b.addEdge(from, importAndGetPortReverse(b))
}
def <~[In](via: Flow[In, T, _])(implicit b: Builder[_]): ReversePortOps[In] = {
def <~[In](via: Graph[FlowShape[In, T], _])(implicit b: Builder[_]): ReversePortOps[In] = {
val s = b.add(via)
b.addEdge(s.outlet, importAndGetPortReverse(b))
s.inlet
@ -541,8 +541,8 @@ object FlowGraph extends GraphApply {
flow.inlet
}
def <~(from: Source[T, _])(implicit b: Builder[_]): Unit = {
b.addEdge(b.add(from), importAndGetPortReverse(b))
def <~(from: Graph[SourceShape[T], _])(implicit b: Builder[_]): Unit = {
b.addEdge(b.add(from).outlet, importAndGetPortReverse(b))
}
def <~(from: SourceShape[T])(implicit b: Builder[_]): Unit = {
@ -591,8 +591,8 @@ object FlowGraph extends GraphApply {
override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = j.in
}
implicit class SinkArrow[T](val s: Sink[T, _]) extends AnyVal with ReverseCombinerBase[T] {
override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = b.add(s)
implicit class SinkArrow[T](val s: Graph[SinkShape[T], _]) extends AnyVal with ReverseCombinerBase[T] {
override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = b.add(s).inlet
}
implicit class SinkShapeArrow[T](val s: SinkShape[T]) extends AnyVal with ReverseCombinerBase[T] {
@ -602,7 +602,7 @@ object FlowGraph extends GraphApply {
implicit class FlowShapeArrow[I, O](val f: FlowShape[I, O]) extends AnyVal with ReverseCombinerBase[I] {
override def importAndGetPortReverse(b: Builder[_]): Inlet[I] = f.inlet
def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
def <~>[I2, O2, Mat](bidi: Graph[BidiShape[O, O2, I2, I], Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
val shape = b.add(bidi)
b.addEdge(f.outlet, shape.in1)
b.addEdge(shape.out2, f.inlet)
@ -615,15 +615,15 @@ object FlowGraph extends GraphApply {
bidi
}
def <~>[M](flow: Flow[O, I, M])(implicit b: Builder[_]): Unit = {
def <~>[M](flow: Graph[FlowShape[O, I], M])(implicit b: Builder[_]): Unit = {
val shape = b.add(flow)
b.addEdge(shape.outlet, f.inlet)
b.addEdge(f.outlet, shape.inlet)
}
}
implicit class FlowArrow[I, O, M](val f: Flow[I, O, M]) extends AnyVal {
def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
implicit class FlowArrow[I, O, M](val f: Graph[FlowShape[I, O], M]) extends AnyVal {
def <~>[I2, O2, Mat](bidi: Graph[BidiShape[O, O2, I2, I], Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
val shape = b.add(bidi)
val flow = b.add(f)
b.addEdge(flow.outlet, shape.in1)
@ -638,7 +638,7 @@ object FlowGraph extends GraphApply {
bidi
}
def <~>[M2](flow: Flow[O, I, M2])(implicit b: Builder[_]): Unit = {
def <~>[M2](flow: Graph[FlowShape[O, I], M2])(implicit b: Builder[_]): Unit = {
val shape = b.add(flow)
val ff = b.add(f)
b.addEdge(shape.outlet, ff.inlet)
@ -653,7 +653,7 @@ object FlowGraph extends GraphApply {
other
}
def <~>[I3, O3, M](otherFlow: BidiFlow[O1, O3, I3, I2, M])(implicit b: Builder[_]): BidiShape[O1, O3, I3, I2] = {
def <~>[I3, O3, M](otherFlow: Graph[BidiShape[O1, O3, I3, I2], M])(implicit b: Builder[_]): BidiShape[O1, O3, I3, I2] = {
val other = b.add(otherFlow)
b.addEdge(bidi.out1, other.in1)
b.addEdge(other.out2, bidi.in2)
@ -665,7 +665,7 @@ object FlowGraph extends GraphApply {
b.addEdge(flow.outlet, bidi.in2)
}
def <~>[M](f: Flow[O1, I2, M])(implicit b: Builder[_]): Unit = {
def <~>[M](f: Graph[FlowShape[O1, I2], M])(implicit b: Builder[_]): Unit = {
val flow = b.add(f)
b.addEdge(bidi.out1, flow.inlet)
b.addEdge(flow.outlet, bidi.in2)
@ -683,8 +683,8 @@ object FlowGraph extends GraphApply {
implicit def flow2flow[I, O](f: FlowShape[I, O])(implicit b: Builder[_]): PortOps[O, Unit] =
new PortOps(f.outlet, b)
implicit class SourceArrow[T](val s: Source[T, _]) extends AnyVal with CombinerBase[T] {
override def importAndGetPort(b: Builder[_]): Outlet[T] = b.add(s)
implicit class SourceArrow[T](val s: Graph[SourceShape[T], _]) extends AnyVal with CombinerBase[T] {
override def importAndGetPort(b: Builder[_]): Outlet[T] = b.add(s).outlet
}
implicit class SourceShapeArrow[T](val s: SourceShape[T]) extends AnyVal with CombinerBase[T] {

View file

@ -34,8 +34,8 @@ final class Sink[-In, +Mat](private[stream] override val module: Module)
* Connect this `Sink` to a `Source` and run it. The returned value is the materialized value
* of the `Source`, e.g. the `Subscriber` of a [[Source#subscriber]].
*/
def runWith[Mat2](source: Source[In, Mat2])(implicit materializer: FlowMaterializer): Mat2 =
source.to(this).run()
def runWith[Mat2](source: Graph[SourceShape[In], Mat2])(implicit materializer: FlowMaterializer): Mat2 =
Source.wrap(source).to(this).run()
def mapMaterialized[Mat2](f: Mat Mat2): Sink[In, Mat2] =
new Sink(module.transformMaterializedValue(f.asInstanceOf[Any Any]))

View file

@ -50,13 +50,13 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
/**
* Transform this [[akka.stream.scaladsl.Source]] by appending the given processing stages.
*/
def via[T, Mat2](flow: Flow[Out, T, Mat2]): Source[T, Mat] = viaMat(flow)(Keep.left)
def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Source[T, Mat] = viaMat(flow)(Keep.left)
/**
* Transform this [[akka.stream.scaladsl.Source]] by appending the given processing stages.
*/
def viaMat[T, Mat2, Mat3](flow: Flow[Out, T, Mat2])(combine: (Mat, Mat2) Mat3): Source[T, Mat3] = {
if (flow.isIdentity) this.asInstanceOf[Source[T, Mat3]]
def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): Source[T, Mat3] = {
if (flow.module.isInstanceOf[Stages.Identity]) this.asInstanceOf[Source[T, Mat3]]
else {
val flowCopy = flow.module.carbonCopy
new Source(
@ -70,13 +70,13 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
* Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]],
* concatenating the processing steps of both.
*/
def to[Mat2](sink: Sink[Out, Mat2]): RunnableFlow[Mat] = toMat(sink)(Keep.left)
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableFlow[Mat] = toMat(sink)(Keep.left)
/**
* Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]],
* concatenating the processing steps of both.
*/
def toMat[Mat2, Mat3](sink: Sink[Out, Mat2])(combine: (Mat, Mat2) Mat3): RunnableFlow[Mat3] = {
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) Mat3): RunnableFlow[Mat3] = {
val sinkCopy = sink.module.carbonCopy
RunnableFlow(module.growConnect(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine))
}
@ -107,7 +107,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
* Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
* of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]].
*/
def runWith[Mat2](sink: Sink[Out, Mat2])(implicit materializer: FlowMaterializer): Mat2 = toMat(sink)(Keep.right).run()
def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2])(implicit materializer: FlowMaterializer): Mat2 = toMat(sink)(Keep.right).run()
/**
* Shortcut for running this `Source` with a fold function.
@ -134,14 +134,14 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
* emitted by that source is emitted after the last element of this
* source.
*/
def concat[Out2 >: Out, M](second: Source[Out2, M]): Source[Out2, (Mat, M)] = concatMat(second)(Keep.both)
def concat[Out2 >: Out, M](second: Graph[SourceShape[Out2], M]): Source[Out2, (Mat, M)] = concatMat(second)(Keep.both)
/**
* Concatenates a second source so that the first element
* emitted by that source is emitted after the last element of this
* source.
*/
def concatMat[Out2 >: Out, Mat2, Mat3](second: Source[Out2, Mat2])(
def concatMat[Out2 >: Out, Mat2, Mat3](second: Graph[SourceShape[Out2], Mat2])(
combine: (Mat, Mat2) Mat3): Source[Out2, Mat3] = Source.concatMat(this, second)(combine)
/**
@ -151,7 +151,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
*
* This is a shorthand for [[concat]]
*/
def ++[Out2 >: Out, M](second: Source[Out2, M]): Source[Out2, (Mat, M)] = concat(second)
def ++[Out2 >: Out, M](second: Graph[SourceShape[Out2], M]): Source[Out2, (Mat, M)] = concat(second)
override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] =
new Source(module.withAttributes(attr).wrap())
@ -310,7 +310,7 @@ object Source extends SourceApply {
* emitted by the second source is emitted after the last element of the first
* source.
*/
def concat[T, Mat1, Mat2](source1: Source[T, Mat1], source2: Source[T, Mat2]): Source[T, (Mat1, Mat2)] =
def concat[T, Mat1, Mat2](source1: Graph[SourceShape[T], Mat1], source2: Graph[SourceShape[T], Mat2]): Source[T, (Mat1, Mat2)] =
concatMat(source1, source2)(Keep.both).withAttributes(DefaultAttributes.concatSource)
/**
@ -318,7 +318,7 @@ object Source extends SourceApply {
* emitted by the second source is emitted after the last element of the first
* source.
*/
def concatMat[T, Mat1, Mat2, Mat3](source1: Source[T, Mat1], source2: Source[T, Mat2])(
def concatMat[T, Mat1, Mat2, Mat3](source1: Graph[SourceShape[T], Mat1], source2: Graph[SourceShape[T], Mat2])(
combine: (Mat1, Mat2) Mat3): Source[T, Mat3] =
wrap(FlowGraph.partial(source1, source2)(combine) { implicit b
(s1, s2)