diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala index 802752b841..c531d44cdc 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala @@ -164,53 +164,65 @@ class FlowGraphCompileSpec extends AkkaSpec { }.run() } - "attachTap and attachDrain" in { + "attachSource and attachSink" in { val mg = FlowGraph { b ⇒ val merge = Merge[String] - val undefinedSrc1 = UndefinedTap[String] - val undefinedSrc2 = UndefinedTap[String] - val undefinedDrain1 = UndefinedDrain[String] + val undefinedSource1 = UndefinedSource[String] + val undefinedSource2 = UndefinedSource[String] + val undefinedSink1 = UndefinedSink[String] b. - addEdge(undefinedSrc1, f1, merge). - addEdge(UndefinedTap[String]("src2"), f2, merge). - addEdge(merge, f3, undefinedDrain1) + addEdge(undefinedSource1, f1, merge). + addEdge(UndefinedSource[String]("src2"), f2, merge). + addEdge(merge, f3, undefinedSink1) - b.attachTap(undefinedSrc1, in1) - b.attachTap(UndefinedTap[String]("src2"), in2) - b.attachDrain(undefinedDrain1, out1) + b.attachSource(undefinedSource1, in1) + b.attachSource(UndefinedSource[String]("src2"), in2) + b.attachSink(undefinedSink1, out1) }.run() out1.publisher(mg) should not be (null) } "build partial flow graphs" in { - val undefinedSrc1 = UndefinedTap[String] - val undefinedSrc2 = UndefinedTap[String] - val undefinedDrain1 = UndefinedDrain[String] + val undefinedSource1 = UndefinedSource[String] + val undefinedSource2 = UndefinedSource[String] + val undefinedSink1 = UndefinedSink[String] val bcast = Broadcast[String] val partial1 = PartialFlowGraph { implicit b ⇒ import FlowGraphImplicits._ val merge = Merge[String] - undefinedSrc1 ~> f1 ~> merge ~> f2 ~> bcast ~> f3 ~> undefinedDrain1 - undefinedSrc2 ~> f4 ~> merge - + undefinedSource1 ~> f1 ~> merge ~> f2 ~> bcast ~> f3 ~> undefinedSink1 + undefinedSource2 ~> f4 ~> merge } - partial1.undefinedTaps should be(Set(undefinedSrc1, undefinedSrc2)) - partial1.undefinedDrains should be(Set(undefinedDrain1)) + partial1.undefinedSources should be(Set(undefinedSource1, undefinedSource2)) + partial1.undefinedSinks should be(Set(undefinedSink1)) val partial2 = PartialFlowGraph(partial1) { implicit b ⇒ import FlowGraphImplicits._ - b.attachTap(undefinedSrc1, in1) - b.attachTap(undefinedSrc2, in2) - bcast ~> f5 ~> UndefinedDrain[String]("drain2") + b.attachSource(undefinedSource1, in1) + b.attachSource(undefinedSource2, in2) + bcast ~> f5 ~> UndefinedSink[String]("drain2") } - partial2.undefinedTaps should be(Set.empty) - partial2.undefinedDrains should be(Set(undefinedDrain1, UndefinedDrain[String]("drain2"))) + partial2.undefinedSources should be(Set.empty) + partial2.undefinedSinks should be(Set(undefinedSink1, UndefinedSink[String]("drain2"))) - FlowGraph(partial2) { implicit b ⇒ - b.attachDrain(undefinedDrain1, out1) - b.attachDrain(UndefinedDrain[String]("drain2"), out2) + FlowGraph(partial2) { b ⇒ + b.attachSink(undefinedSink1, out1) + b.attachSink(UndefinedSink[String]("drain2"), out2) + }.run() + + FlowGraph(partial2) { b ⇒ + b.attachSink(undefinedSink1, f1.connect(out1)) + b.attachSink(UndefinedSink[String]("drain2"), f2.connect(out2)) + }.run() + + FlowGraph(partial1) { implicit b ⇒ + import FlowGraphImplicits._ + b.attachSink(undefinedSink1, f1.connect(out1)) + b.attachSource(undefinedSource1, Source(List("a", "b", "c")).connect(f1)) + b.attachSource(undefinedSource2, Source(List("d", "e", "f")).connect(f2)) + bcast ~> f5 ~> out2 }.run() } @@ -293,8 +305,8 @@ class FlowGraphCompileSpec extends AkkaSpec { FlowGraph { b ⇒ val merge = Merge[Fruit] b. - addEdge(Source[Fruit](() ⇒ Some(new Apple)), merge). - addEdge(Source[Apple](() ⇒ Some(new Apple)), merge). + addEdge(Source[Fruit](() ⇒ Some(new Apple)), Flow[Fruit], merge). + addEdge(Source[Apple](() ⇒ Some(new Apple)), Flow[Apple], merge). addEdge(merge, Flow[Fruit].map(identity), out) } } @@ -315,22 +327,22 @@ class FlowGraphCompileSpec extends AkkaSpec { inB ~> merge inA ~> Flow[Fruit].map(identity) ~> merge inB ~> Flow[Apple].map(identity) ~> merge - UndefinedTap[Apple] ~> merge - UndefinedTap[Apple] ~> Flow[Fruit].map(identity) ~> merge - UndefinedTap[Apple] ~> Flow[Apple].map(identity) ~> merge + UndefinedSource[Apple] ~> merge + UndefinedSource[Apple] ~> Flow[Fruit].map(identity) ~> merge + UndefinedSource[Apple] ~> Flow[Apple].map(identity) ~> merge merge ~> Flow[Fruit].map(identity) ~> outA Source[Apple](() ⇒ Some(new Apple)) ~> Broadcast[Apple] ~> merge Source[Apple](() ⇒ Some(new Apple)) ~> Broadcast[Apple] ~> outB - Source[Apple](() ⇒ Some(new Apple)) ~> Broadcast[Apple] ~> UndefinedDrain[Fruit] + Source[Apple](() ⇒ Some(new Apple)) ~> Broadcast[Apple] ~> UndefinedSink[Fruit] inB ~> Broadcast[Apple] ~> merge Source(List(1 -> "a", 2 -> "b", 3 -> "c")) ~> unzip.in unzip.right ~> whatever - unzip.left ~> UndefinedDrain[Any] + unzip.left ~> UndefinedSink[Any] - "UndefinedTap[Fruit] ~> Flow[Apple].map(identity) ~> merge" shouldNot compile - "UndefinedTap[Fruit] ~> Broadcast[Apple]" shouldNot compile + "UndefinedSource[Fruit] ~> Flow[Apple].map(identity) ~> merge" shouldNot compile + "UndefinedSource[Fruit] ~> Broadcast[Apple]" shouldNot compile "merge ~> Broadcast[Apple]" shouldNot compile "merge ~> Flow[Fruit].map(identity) ~> Broadcast[Apple]" shouldNot compile "inB ~> merge ~> Broadcast[Apple]" shouldNot compile @@ -338,5 +350,49 @@ class FlowGraphCompileSpec extends AkkaSpec { } } + "build with plain flow without junctions" in { + FlowGraph { b ⇒ + b.addEdge(in1, f1, out1) + }.run() + FlowGraph { b ⇒ + b.addEdge(in1, f1, f2.connect(out1)) + }.run() + FlowGraph { b ⇒ + b.addEdge(in1.connect(f1), f2, out1) + }.run() + FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + in1 ~> f1 ~> out1 + }.run() + FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + in1 ~> out1 + }.run() + FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + in1 ~> f1.connect(out1) + }.run() + FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + in1.connect(f1) ~> out1 + }.run() + FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + in1.connect(f1) ~> f2.connect(out1) + }.run() + } + + "build partial with only undefined sources and sinks" in { + PartialFlowGraph { b ⇒ + b.addEdge(UndefinedSource[String], f1, UndefinedSink[String]) + } + PartialFlowGraph { b ⇒ + b.addEdge(UndefinedSource[String], f1, out1) + } + PartialFlowGraph { b ⇒ + b.addEdge(in1, f1, UndefinedSink[String]) + } + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala index 712ddbd60f..8eabed8723 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala @@ -1,12 +1,12 @@ package akka.stream.scaladsl2 import akka.stream.testkit.AkkaSpec - import akka.stream.{ OverflowStrategy, MaterializerSettings } import akka.stream.testkit.{ StreamTestKit, AkkaSpec } import scala.concurrent.Await import scala.concurrent.duration._ import akka.stream.scaladsl2.FlowGraphImplicits._ +import akka.stream.testkit.StreamTestKit.SubscriberProbe class GraphOpsIntegrationSpec extends AkkaSpec { @@ -98,6 +98,54 @@ class GraphOpsIntegrationSpec extends AkkaSpec { Await.result(g.getDrainFor(resultFuture), 3.seconds) should contain theSameElementsAs (Seq(2, 4, 6, 5, 7, 9)) } + + "be able to run plain flow" in { + val p = Source(List(1, 2, 3)).toPublisher() + val s = SubscriberProbe[Int] + val flow = Flow[Int].map(_ * 2) + FlowGraph { implicit builder ⇒ + import FlowGraphImplicits._ + PublisherTap(p) ~> flow ~> SubscriberDrain(s) + }.run() + val sub = s.expectSubscription() + sub.request(10) + s.expectNext(1 * 2) + s.expectNext(2 * 2) + s.expectNext(3 * 2) + s.expectComplete() + } + + "support continued transformation from undefined source/sink" in { + val input1 = UndefinedSource[Int] + val output1 = UndefinedSink[Int] + val output2 = UndefinedSink[String] + val partial = PartialFlowGraph { implicit builder ⇒ + val bcast = Broadcast[String]("bcast") + input1 ~> Flow[Int].map(_.toString) ~> bcast ~> Flow[String].map(_.toInt) ~> output1 + bcast ~> Flow[String].map("elem-" + _) ~> output2 + } + + val s1 = SubscriberProbe[Int] + val s2 = SubscriberProbe[String] + FlowGraph(partial) { builder ⇒ + builder.attachSource(input1, Source(List(0, 1, 2).map(_ + 1))) + builder.attachSink(output1, Flow[Int].filter(n ⇒ (n % 2) != 0).connect(SubscriberDrain(s1))) + builder.attachSink(output2, Flow[String].map(_.toUpperCase).connect(SubscriberDrain(s2))) + }.run() + + val sub1 = s1.expectSubscription() + val sub2 = s2.expectSubscription() + sub1.request(10) + sub2.request(10) + s1.expectNext(1) + s1.expectNext(3) + s1.expectComplete() + s2.expectNext("ELEM-1") + s2.expectNext("ELEM-2") + s2.expectNext("ELEM-3") + s2.expectComplete() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index 367e19baa4..48e04ecd0d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -73,7 +73,7 @@ object Merge { * (picking randomly when several have elements ready). * * When building the [[FlowGraph]] you must connect one or more input pipes/taps - * and one output pipe/drain to the `Merge` vertex. + * and one output pipe/sink to the `Merge` vertex. */ final class Merge[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] { override private[akka] val vertex = this @@ -258,64 +258,64 @@ final class Concat[T](override val name: Option[String]) extends FlowGraphIntern override private[akka] def astNode = Ast.Concat } -object UndefinedDrain { +object UndefinedSink { /** - * Create a new anonymous `UndefinedDrain` vertex with the specified input type. - * Note that a `UndefinedDrain` instance can only be used at one place (one vertex) + * Create a new anonymous `UndefinedSink` vertex with the specified input type. + * Note that a `UndefinedSink` instance can only be used at one place (one vertex) * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`. */ - def apply[T]: UndefinedDrain[T] = new UndefinedDrain[T](None) + def apply[T]: UndefinedSink[T] = new UndefinedSink[T](None) /** - * Create a named `UndefinedDrain` vertex with the specified input type. - * Note that a `UndefinedDrain` with a specific name can only be used at one place (one vertex) + * Create a named `UndefinedSink` vertex with the specified input type. + * Note that a `UndefinedSink` with a specific name can only be used at one place (one vertex) * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. */ - def apply[T](name: String): UndefinedDrain[T] = new UndefinedDrain[T](Some(name)) + def apply[T](name: String): UndefinedSink[T] = new UndefinedSink[T](Some(name)) } /** * It is possible to define a [[PartialFlowGraph]] with output pipes that are not connected * yet by using this placeholder instead of the real [[Drain]]. Later the placeholder can * be replaced with [[FlowGraphBuilder#attachDrain]]. */ -final class UndefinedDrain[-T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { +final class UndefinedSink[-T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { override def minimumInputCount: Int = 1 override def maximumInputCount: Int = 1 override def minimumOutputCount: Int = 0 override def maximumOutputCount: Int = 0 - override private[akka] def astNode = throw new UnsupportedOperationException("Undefined drains cannot be materialized") + override private[akka] def astNode = throw new UnsupportedOperationException("Undefined sinks cannot be materialized") } -object UndefinedTap { +object UndefinedSource { /** - * Create a new anonymous `UndefinedTap` vertex with the specified input type. - * Note that a `UndefinedTap` instance can only be used at one place (one vertex) + * Create a new anonymous `UndefinedSource` vertex with the specified input type. + * Note that a `UndefinedSource` instance can only be used at one place (one vertex) * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`. */ - def apply[T]: UndefinedTap[T] = new UndefinedTap[T](None) + def apply[T]: UndefinedSource[T] = new UndefinedSource[T](None) /** - * Create a named `UndefinedTap` vertex with the specified output type. - * Note that a `UndefinedTap` with a specific name can only be used at one place (one vertex) + * Create a named `UndefinedSource` vertex with the specified output type. + * Note that a `UndefinedSource` with a specific name can only be used at one place (one vertex) * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. */ - def apply[T](name: String): UndefinedTap[T] = new UndefinedTap[T](Some(name)) + def apply[T](name: String): UndefinedSource[T] = new UndefinedSource[T](Some(name)) } /** * It is possible to define a [[PartialFlowGraph]] with input pipes that are not connected * yet by using this placeholder instead of the real [[Tap]]. Later the placeholder can * be replaced with [[FlowGraphBuilder#attachTap]]. */ -final class UndefinedTap[+T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { +final class UndefinedSource[+T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { override def minimumInputCount: Int = 0 override def maximumInputCount: Int = 0 override def minimumOutputCount: Int = 1 override def maximumOutputCount: Int = 1 - override private[akka] def astNode = throw new UnsupportedOperationException("Undefined taps cannot be materialized") + override private[akka] def astNode = throw new UnsupportedOperationException("Undefined sources cannot be materialized") } /** @@ -375,6 +375,9 @@ private[akka] object FlowGraphInternal { val outputPort: Int) { override def toString: String = pipe.toString + + def withPipe(newFlow: Pipe[Any, Nothing]): EdgeLabel = + EdgeLabel(qualifier)(newFlow, inputPort, outputPort) } type EdgeType[T] = LkDiEdge[T] { type L1 = EdgeLabel } @@ -402,97 +405,157 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph private var cyclesAllowed = false - private def addTapPipeEdge[In, Out](tap: Tap[In], pipe: Pipe[In, Out], drain: JunctionInPort[Out]): this.type = { + private def addTapPipeEdge[In, Out](tap: Tap[In], pipe: Pipe[In, Out], junctionIn: JunctionInPort[Out]): this.type = { val tapVertex = TapVertex(tap) checkAddTapDrainPrecondition(tapVertex) - checkJunctionInPortPrecondition(drain) - addGraphEdge(tapVertex, drain.vertex, pipe, inputPort = drain.port, outputPort = UnlabeledPort) + checkJunctionInPortPrecondition(junctionIn) + addGraphEdge(tapVertex, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = UnlabeledPort) this } - private def addPipeDrainEdge[In, Out](tap: JunctionOutPort[In], pipe: Pipe[In, Out], drain: Drain[Out]): this.type = { + private def addPipeDrainEdge[In, Out](junctionOut: JunctionOutPort[In], pipe: Pipe[In, Out], drain: Drain[Out]): this.type = { val drainVertex = DrainVertex(drain) checkAddTapDrainPrecondition(drainVertex) - checkJunctionOutPortPrecondition(tap) - addGraphEdge(tap.vertex, drainVertex, pipe, inputPort = UnlabeledPort, outputPort = tap.port) + checkJunctionOutPortPrecondition(junctionOut) + addGraphEdge(junctionOut.vertex, drainVertex, pipe, inputPort = UnlabeledPort, outputPort = junctionOut.port) this } - def addEdge[In, Out](tap: UndefinedTap[In], flow: Flow[In, Out], drain: JunctionInPort[Out]): this.type = { - checkAddTapDrainPrecondition(tap) - checkJunctionInPortPrecondition(drain) + def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], junctionIn: JunctionInPort[Out]): this.type = { + checkAddTapDrainPrecondition(source) + checkJunctionInPortPrecondition(junctionIn) flow match { case pipe: Pipe[In, Out] ⇒ - addGraphEdge(tap, drain.vertex, pipe, inputPort = drain.port, outputPort = UnlabeledPort) + addGraphEdge(source, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = UnlabeledPort) case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) } this } - def addEdge[In, Out](tap: JunctionOutPort[In], flow: Flow[In, Out], drain: UndefinedDrain[Out]): this.type = { - checkAddTapDrainPrecondition(drain) - checkJunctionOutPortPrecondition(tap) + def addEdge[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = { + checkAddTapDrainPrecondition(sink) + checkJunctionOutPortPrecondition(junctionOut) flow match { case pipe: Pipe[In, Out] ⇒ - addGraphEdge(tap.vertex, drain, pipe, inputPort = UnlabeledPort, outputPort = tap.port) + addGraphEdge(junctionOut.vertex, sink, pipe, inputPort = UnlabeledPort, outputPort = junctionOut.port) case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) } this } - def addEdge[In, Out](tap: JunctionOutPort[In], flow: Flow[In, Out], drain: JunctionInPort[Out]): this.type = { - checkJunctionOutPortPrecondition(tap) - checkJunctionInPortPrecondition(drain) + def addEdge[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], junctionIn: JunctionInPort[Out]): this.type = { + checkJunctionOutPortPrecondition(junctionOut) + checkJunctionInPortPrecondition(junctionIn) flow match { case pipe: Pipe[In, Out] ⇒ - addGraphEdge(tap.vertex, drain.vertex, pipe, inputPort = drain.port, outputPort = tap.port) + addGraphEdge(junctionOut.vertex, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = junctionOut.port) case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) } this } - def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], drain: JunctionInPort[Out]): this.type = { + def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], junctionIn: JunctionInPort[Out]): this.type = { (source, flow) match { case (tap: Tap[In], pipe: Pipe[In, Out]) ⇒ - addTapPipeEdge(tap, pipe, drain) + addTapPipeEdge(tap, pipe, junctionIn) case (spipe: SourcePipe[In], pipe: Pipe[In, Out]) ⇒ - addEdge(spipe.connect(pipe), drain) + addTapPipeEdge(spipe.input, Pipe(spipe.ops).appendPipe(pipe), junctionIn) case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) } this } - def addEdge[Out](source: Source[Out], drain: JunctionInPort[Out]): this.type = { - source match { - case tap: Tap[Out] ⇒ - addTapPipeEdge(tap, Pipe.empty[Out], drain) - case pipe: SourcePipe[Out] ⇒ - addTapPipeEdge(pipe.input, Pipe(pipe.ops), drain) - case _ ⇒ - throw new IllegalArgumentException(OnlyPipesErrorMessage) - } - this - } - - def addEdge[In, Out](tap: JunctionOutPort[In], sink: Sink[In]): this.type = { + def addEdge[In, Out](junctionOut: JunctionOutPort[In], sink: Sink[In]): this.type = { sink match { - case drain: Drain[In] ⇒ addPipeDrainEdge(tap, Pipe.empty[In], drain) - case pipe: SinkPipe[In] ⇒ addPipeDrainEdge(tap, Pipe(pipe.ops), pipe.output) + case drain: Drain[In] ⇒ addPipeDrainEdge(junctionOut, Pipe.empty[In], drain) + case pipe: SinkPipe[In] ⇒ addPipeDrainEdge(junctionOut, Pipe(pipe.ops), pipe.output) case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) } this } - def addEdge[In, Out](tap: JunctionOutPort[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = { + def addEdge[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = { (flow, sink) match { case (pipe: Pipe[In, Out], drain: Drain[Out]) ⇒ - addPipeDrainEdge(tap, pipe, drain) + addPipeDrainEdge(junctionOut, pipe, drain) case (pipe: Pipe[In, Out], spipe: SinkPipe[Out]) ⇒ - addEdge(tap, pipe.connect(spipe)) + addEdge(junctionOut, pipe.connect(spipe)) + case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) + } + this + } + + def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = { + (source, flow, sink) match { + case (tap: Tap[In], pipe: Pipe[In, Out], drain: Drain[Out]) ⇒ + val tapVertex = TapVertex(tap) + val drainVertex = DrainVertex(drain) + checkAddTapDrainPrecondition(tapVertex) + checkAddTapDrainPrecondition(drainVertex) + addGraphEdge(tapVertex, drainVertex, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) + case (sourcePipe: SourcePipe[In], pipe: Pipe[In, Out], sinkPipe: SinkPipe[Out]) ⇒ + val tap = sourcePipe.input + val newPipe = Pipe(sourcePipe.ops).connect(pipe).connect(Pipe(sinkPipe.ops)) + val drain = sinkPipe.output + addEdge(tap, newPipe, drain) // recursive, but now it is a Tap-Pipe-Drain + case (tap: Tap[In], pipe: Pipe[In, Out], sinkPipe: SinkPipe[Out]) ⇒ + val newPipe = pipe.connect(Pipe(sinkPipe.ops)) + val drain = sinkPipe.output + addEdge(tap, newPipe, drain) // recursive, but now it is a Tap-Pipe-Drain + case (sourcePipe: SourcePipe[In], pipe: Pipe[In, Out], drain: Drain[Out]) ⇒ + val tap = sourcePipe.input + val newPipe = Pipe(sourcePipe.ops).connect(pipe) + addEdge(tap, newPipe, drain) // recursive, but now it is a Tap-Pipe-Drain + case _ ⇒ + throw new IllegalArgumentException(OnlyPipesErrorMessage) + } + + this + } + + def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = { + checkAddTapDrainPrecondition(source) + checkAddTapDrainPrecondition(sink) + flow match { + case pipe: Pipe[In, Out] ⇒ + addGraphEdge(source, sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) + case _ ⇒ + throw new IllegalArgumentException(OnlyPipesErrorMessage) + } + this + } + + def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = { + checkAddTapDrainPrecondition(source) + (flow, sink) match { + case (pipe: Pipe[In, Out], drain: Drain[Out]) ⇒ + val drainVertex = DrainVertex(drain) + checkAddTapDrainPrecondition(drainVertex) + addGraphEdge(source, drainVertex, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) + case (pipe: Pipe[In, Out], spipe: SinkPipe[Out]) ⇒ + val drainVertex = DrainVertex(spipe.output) + checkAddTapDrainPrecondition(drainVertex) + addGraphEdge(source, drainVertex, pipe.appendPipe(Pipe(spipe.ops)), inputPort = UnlabeledPort, outputPort = UnlabeledPort) + case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) + } + this + } + + def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = { + checkAddTapDrainPrecondition(sink) + (flow, source) match { + case (pipe: Pipe[In, Out], tap: Tap[In]) ⇒ + val tapVertex = TapVertex(tap) + checkAddTapDrainPrecondition(tapVertex) + addGraphEdge(tapVertex, sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) + case (pipe: Pipe[In, Out], spipe: SourcePipe[Out]) ⇒ + val tapVertex = TapVertex(spipe.input) + checkAddTapDrainPrecondition(tapVertex) + addGraphEdge(tapVertex, sink, Pipe(spipe.ops).appendPipe(pipe), inputPort = UnlabeledPort, outputPort = UnlabeledPort) case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) } this @@ -505,24 +568,51 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph edgeQualifier += 1 } - def attachDrain[Out](token: UndefinedDrain[Out], drain: Drain[Out]): this.type = { + def attachSink[Out](token: UndefinedSink[Out], sink: Sink[Out]): this.type = { graph.find(token) match { case Some(existing) ⇒ val edge = existing.incoming.head graph.remove(existing) - graph.addLEdge(edge.from.value, DrainVertex(drain))(edge.label) - case None ⇒ throw new IllegalArgumentException(s"No matching UndefinedDrain [${token}]") + sink match { + case drain: Drain[Out] ⇒ + val drainVertex = DrainVertex(drain) + checkAddTapDrainPrecondition(drainVertex) + graph.addLEdge(edge.from.value, drainVertex)(edge.label) + case spipe: SinkPipe[Out] ⇒ + val pipe = edge.label.pipe.appendPipe(Pipe(spipe.ops)) + val label = edge.label.withPipe(pipe) + val drainVertex = DrainVertex(spipe.output) + checkAddTapDrainPrecondition(drainVertex) + graph.addLEdge(edge.from.value, drainVertex)(label) + case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) + } + + case None ⇒ throw new IllegalArgumentException(s"No matching UndefinedSink [${token}]") } this } - def attachTap[In](token: UndefinedTap[In], tap: Tap[In]): this.type = { + def attachSource[In](token: UndefinedSource[In], source: Source[In]): this.type = { graph.find(token) match { case Some(existing) ⇒ val edge = existing.outgoing.head graph.remove(existing) - graph.addLEdge(TapVertex(tap), edge.to.value)(edge.label) - case None ⇒ throw new IllegalArgumentException(s"No matching UndefinedTap [${token}]") + source match { + case tap: Tap[In] ⇒ + val tapVertex = TapVertex(tap) + checkAddTapDrainPrecondition(tapVertex) + graph.addLEdge(tapVertex, edge.to.value)(edge.label) + case spipe: SourcePipe[In] ⇒ + val pipe = Pipe(spipe.ops).appendPipe(edge.label.pipe) + val label = edge.label.withPipe(pipe) + val tapVertex = TapVertex(spipe.input) + checkAddTapDrainPrecondition(tapVertex) + graph.addLEdge(tapVertex, edge.to.value)(label) + case _ ⇒ + throw new IllegalArgumentException(OnlyPipesErrorMessage) + } + + case None ⇒ throw new IllegalArgumentException(s"No matching UndefinedSource [${token}]") } this } @@ -599,18 +689,18 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph } private def checkBuildPreconditions(): Unit = { - val undefinedTapsDrains = graph.nodes.filter { + val undefinedSourcesDrains = graph.nodes.filter { _.value match { - case _: UndefinedTap[_] | _: UndefinedDrain[_] ⇒ true - case x ⇒ false + case _: UndefinedSource[_] | _: UndefinedSink[_] ⇒ true + case x ⇒ false } } - if (undefinedTapsDrains.nonEmpty) { - val formatted = undefinedTapsDrains.map(n ⇒ n.value match { - case u: UndefinedTap[_] ⇒ s"$u -> ${n.outgoing.head.label} -> ${n.outgoing.head.to}" - case u: UndefinedDrain[_] ⇒ s"${n.incoming.head.from} -> ${n.incoming.head.label} -> $u" + if (undefinedSourcesDrains.nonEmpty) { + val formatted = undefinedSourcesDrains.map(n ⇒ n.value match { + case u: UndefinedSource[_] ⇒ s"$u -> ${n.outgoing.head.label} -> ${n.outgoing.head.to}" + case u: UndefinedSink[_] ⇒ s"${n.incoming.head.from} -> ${n.incoming.head.label} -> $u" }) - throw new IllegalArgumentException("Undefined taps or drains: " + formatted.mkString(", ")) + throw new IllegalArgumentException("Undefined sources or sinks: " + formatted.mkString(", ")) } graph.nodes.foreach { node ⇒ @@ -634,9 +724,9 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph require(graph.nonEmpty, "Graph must not be empty") require(graph.exists(graph having ((node = { n ⇒ n.isLeaf && n.diSuccessors.isEmpty }))), - "Graph must have at least one drain") + "Graph must have at least one sink") require(graph.exists(graph having ((node = { n ⇒ n.isLeaf && n.diPredecessors.isEmpty }))), - "Graph must have at least one tap") + "Graph must have at least one source") require(graph.isConnected, "Graph must be connected") } @@ -658,7 +748,7 @@ object FlowGraph { /** * Continue building a [[FlowGraph]] from an existing `PartialFlowGraph`. - * For example you can attach undefined taps and drains with + * For example you can attach undefined sources and sinks with * [[FlowGraphBuilder#attachTap]] and [[FlowGraphBuilder#attachDrain]] */ def apply(partialFlowGraph: PartialFlowGraph)(block: FlowGraphBuilder ⇒ Unit): FlowGraph = @@ -688,9 +778,40 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph import FlowGraphInternal._ /** - * Materialize the `FlowGraph` and attach all drains and taps. + * Materialize the `FlowGraph` and attach all sinks and sources. */ - def run()(implicit materializer: FlowMaterializer): MaterializedPipeGraph = { + def run()(implicit materializer: FlowMaterializer): MaterializedFlowGraph = { + val edges = graph.edges + if (edges.size == 1) { + val edge = edges.head + (edge.from.value, edge.to.value) match { + case (tapVertex: TapVertex, drainVertex: DrainVertex) ⇒ + val pipe = edge.label.pipe + runSimple(tapVertex, drainVertex, pipe) + case _ ⇒ + runGraph() + } + } else + runGraph() + } + + /** + * Run FlowGraph that only contains one edge from a `Source` to a `Sink`. + */ + private def runSimple(tapVertex: TapVertex, drainVertex: DrainVertex, pipe: Pipe[Any, Nothing])(implicit materializer: FlowMaterializer): MaterializedFlowGraph = { + val mf = pipe.withTap(tapVertex.tap).withDrain(drainVertex.drain).run() + val materializedSources: Map[TapWithKey[_, _], Any] = tapVertex match { + case TapVertex(tap: TapWithKey[_, _]) ⇒ Map(tap -> mf.getTapFor(tap)) + case _ ⇒ Map.empty + } + val materializedSinks: Map[DrainWithKey[_, _], Any] = drainVertex match { + case DrainVertex(drain: DrainWithKey[_, _]) ⇒ Map(drain -> mf.getDrainFor(drain)) + case _ ⇒ Map.empty + } + new MaterializedFlowGraph(materializedSources, materializedSinks) + } + + private def runGraph()(implicit materializer: FlowMaterializer): MaterializedFlowGraph = { import scalax.collection.GraphTraversal._ // start with drains @@ -731,11 +852,11 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph } edge.from.value match { - case src: TapVertex ⇒ + case tap: TapVertex ⇒ val f = pipe.withDrain(SubscriberDrain(memo.downstreamSubscriber(edge))) // connect the tap with the pipe later memo.copy(visited = memo.visited + edge, - taps = memo.taps.updated(src, f)) + taps = memo.taps.updated(tap, f)) case v: InternalVertex ⇒ if (memo.upstreamPublishers.contains(edge)) { @@ -772,15 +893,15 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph // connect all input taps as the last thing val materializedTaps = result.taps.foldLeft(Map.empty[TapWithKey[_, _], Any]) { - case (acc, (TapVertex(src), pipe)) ⇒ - val mf = pipe.withTap(src).run() - src match { - case srcKey: TapWithKey[_, _] ⇒ acc.updated(srcKey, mf.getTapFor(srcKey)) + case (acc, (TapVertex(tap), pipe)) ⇒ + val mf = pipe.withTap(tap).run() + tap match { + case tapKey: TapWithKey[_, _] ⇒ acc.updated(tapKey, mf.getTapFor(tapKey)) case _ ⇒ acc } } - new MaterializedPipeGraph(materializedTaps, result.materializedDrains) + new MaterializedFlowGraph(materializedTaps, result.materializedDrains) } } @@ -819,7 +940,7 @@ object PartialFlowGraph { } /** - * `PartialFlowGraph` may have taps and drains that are not attached, and it can therefore not + * `PartialFlowGraph` may have sources and sinks that are not attached, and it can therefore not * be `run` until those are attached. * * Build a `PartialFlowGraph` by starting with one of the `apply` methods in @@ -828,14 +949,14 @@ object PartialFlowGraph { class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) { import FlowGraphInternal._ - def undefinedTaps: Set[UndefinedTap[_]] = + def undefinedSources: Set[UndefinedSource[_]] = graph.nodes.iterator.map(_.value).collect { - case n: UndefinedTap[_] ⇒ n + case n: UndefinedSource[_] ⇒ n }.toSet - def undefinedDrains: Set[UndefinedDrain[_]] = + def undefinedSinks: Set[UndefinedSink[_]] = graph.nodes.iterator.map(_.value).collect { - case n: UndefinedDrain[_] ⇒ n + case n: UndefinedSink[_] ⇒ n }.toSet } @@ -845,7 +966,7 @@ class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[Fl * accessor method to retrieve the materialized `Tap` or `Drain`, e.g. * [[SubscriberTap#subscriber]] or [[PublisherDrain#publisher]]. */ -class MaterializedPipeGraph(materializedTaps: Map[TapWithKey[_, _], Any], materializedDrains: Map[DrainWithKey[_, _], Any]) +class MaterializedFlowGraph(materializedTaps: Map[TapWithKey[_, _], Any], materializedDrains: Map[DrainWithKey[_, _], Any]) extends MaterializedTap with MaterializedDrain { /** @@ -874,69 +995,75 @@ class MaterializedPipeGraph(materializedTaps: Map[TapWithKey[_, _], Any], materi */ object FlowGraphImplicits { - class SourceNextStep[In, Out](source: Source[In], flow: Flow[In, Out], builder: FlowGraphBuilder) { - def ~>(drain: JunctionInPort[Out]): JunctionOutPort[drain.NextT] = { - builder.addEdge(source, flow, drain) - drain.next + implicit class SourceOps[Out](val source: Source[Out]) extends AnyVal { + + def ~>[O](flow: Flow[Out, O])(implicit builder: FlowGraphBuilder): SourceNextStep[Out, O] = + new SourceNextStep(source, flow, builder) + + def ~>(junctionIn: JunctionInPort[Out])(implicit builder: FlowGraphBuilder): JunctionOutPort[junctionIn.NextT] = { + builder.addEdge(source, Pipe.empty[Out], junctionIn) + junctionIn.next } + + def ~>(sink: Sink[Out])(implicit builder: FlowGraphBuilder): Unit = + builder.addEdge(source, Pipe.empty[Out], sink) + } + + class SourceNextStep[In, Out](source: Source[In], flow: Flow[In, Out], builder: FlowGraphBuilder) { + def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = { + builder.addEdge(source, flow, junctionIn) + junctionIn.next + } + + def ~>(sink: Sink[Out]): Unit = + builder.addEdge(source, flow, sink) } implicit class JunctionOps[In](val junction: JunctionOutPort[In]) extends AnyVal { def ~>[Out](flow: Flow[In, Out])(implicit builder: FlowGraphBuilder): JunctionNextStep[In, Out] = new JunctionNextStep(junction, flow, builder) - def ~>(drain: UndefinedDrain[In])(implicit builder: FlowGraphBuilder): Unit = - builder.addEdge(junction, Pipe.empty[In], drain) + def ~>(sink: UndefinedSink[In])(implicit builder: FlowGraphBuilder): Unit = + builder.addEdge(junction, Pipe.empty[In], sink) - def ~>(drain: JunctionInPort[In])(implicit builder: FlowGraphBuilder): JunctionOutPort[drain.NextT] = { - builder.addEdge(junction, Pipe.empty[In], drain) - drain.next + def ~>(junctionIn: JunctionInPort[In])(implicit builder: FlowGraphBuilder): JunctionOutPort[junctionIn.NextT] = { + builder.addEdge(junction, Pipe.empty[In], junctionIn) + junctionIn.next } def ~>(sink: Sink[In])(implicit builder: FlowGraphBuilder): Unit = builder.addEdge(junction, sink) } - class JunctionNextStep[In, Out](junction: JunctionOutPort[In], flow: Flow[In, Out], builder: FlowGraphBuilder) { - def ~>(drain: JunctionInPort[Out]): JunctionOutPort[drain.NextT] = { - builder.addEdge(junction, flow, drain) - drain.next + class JunctionNextStep[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], builder: FlowGraphBuilder) { + def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = { + builder.addEdge(junctionOut, flow, junctionIn) + junctionIn.next } def ~>(sink: Sink[Out]): Unit = { - builder.addEdge(junction, flow, sink) + builder.addEdge(junctionOut, flow, sink) } - def ~>(drain: UndefinedDrain[Out]): Unit = { - builder.addEdge(junction, flow, drain) + def ~>(sink: UndefinedSink[Out]): Unit = { + builder.addEdge(junctionOut, flow, sink) } } - implicit class SourceOps[Out](val source: Source[Out]) extends AnyVal { + implicit class UndefinedSourceOps[In](val source: UndefinedSource[In]) extends AnyVal { + def ~>[Out](flow: Flow[In, Out])(implicit builder: FlowGraphBuilder): UndefinedSourceNextStep[In, Out] = + new UndefinedSourceNextStep(source, flow, builder) - def ~>[O](flow: Flow[Out, O])(implicit builder: FlowGraphBuilder): SourceNextStep[Out, O] = - new SourceNextStep(source, flow, builder) - - def ~>(drain: JunctionInPort[Out])(implicit builder: FlowGraphBuilder): JunctionOutPort[drain.NextT] = { - builder.addEdge(source, drain) - drain.next - } - } - - implicit class UndefinedTapOps[In](val tap: UndefinedTap[In]) extends AnyVal { - def ~>[Out](flow: Flow[In, Out])(implicit builder: FlowGraphBuilder): UndefinedTapNextStep[In, Out] = - new UndefinedTapNextStep(tap, flow, builder) - - def ~>(drain: JunctionInPort[In])(implicit builder: FlowGraphBuilder): JunctionOutPort[drain.NextT] = { - builder.addEdge(tap, Pipe.empty[In], drain) - drain.next + def ~>(junctionIn: JunctionInPort[In])(implicit builder: FlowGraphBuilder): JunctionOutPort[junctionIn.NextT] = { + builder.addEdge(source, Pipe.empty[In], junctionIn) + junctionIn.next } } - class UndefinedTapNextStep[In, Out](tap: UndefinedTap[In], flow: Flow[In, Out], builder: FlowGraphBuilder) { - def ~>(drain: JunctionInPort[Out]): JunctionOutPort[drain.NextT] = { - builder.addEdge(tap, flow, drain) - drain.next + class UndefinedSourceNextStep[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], builder: FlowGraphBuilder) { + def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = { + builder.addEdge(source, flow, junctionIn) + junctionIn.next } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala index 9e1504c36a..ec0dabc20c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala @@ -226,9 +226,9 @@ private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends override protected def andThen[U](op: AstNode): Repr[U] = this.copy(ops = op :: ops) - def withDrain(out: Drain[Out]): SinkPipe[In] = SinkPipe(out, ops) + private[scaladsl2] def withDrain(out: Drain[Out]): SinkPipe[In] = SinkPipe(out, ops) - def withTap(in: Tap[In]): SourcePipe[Out] = SourcePipe(in, ops) + private[scaladsl2] def withTap(in: Tap[In]): SourcePipe[Out] = SourcePipe(in, ops) override def connect[T](flow: Flow[Out, T]): Flow[In, T] = flow match { case p: Pipe[T, In] ⇒ Pipe(p.ops ++: ops) @@ -240,6 +240,8 @@ private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends case d: Drain[Out] ⇒ this.withDrain(d) case _ ⇒ throw new IllegalArgumentException(Pipe.OnlyPipesErrorMessage) } + + private[scaladsl2] def appendPipe[T](pipe: Pipe[Out, T]): Pipe[In, T] = Pipe(pipe.ops ++: ops) } /** @@ -247,9 +249,9 @@ private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends */ private[scaladsl2] final case class SinkPipe[-In](output: Drain[_], ops: List[AstNode]) extends Sink[In] { - def withTap(in: Tap[In]): RunnablePipe = RunnablePipe(in, output, ops) + private[scaladsl2] def withTap(in: Tap[In]): RunnablePipe = RunnablePipe(in, output, ops) - def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops) + private[scaladsl2] def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops) override def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] = { val subIn = SubscriberTap[In]() @@ -266,9 +268,9 @@ private[scaladsl2] final case class SourcePipe[+Out](input: Tap[_], ops: List[As override protected def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, op :: ops) - def withDrain(out: Drain[Out]): RunnablePipe = RunnablePipe(input, out, ops) + private[scaladsl2] def withDrain(out: Drain[Out]): RunnablePipe = RunnablePipe(input, out, ops) - def appendPipe[T](pipe: Pipe[Out, T]): SourcePipe[T] = SourcePipe(input, pipe.ops ++: ops) + private[scaladsl2] def appendPipe[T](pipe: Pipe[Out, T]): SourcePipe[T] = SourcePipe(input, pipe.ops ++: ops) override def connect[T](flow: Flow[Out, T]): Source[T] = flow match { case p: Pipe[Out, T] ⇒ appendPipe(p)