diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphFlowSpec.scala new file mode 100644 index 0000000000..0930ad8ba6 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphFlowSpec.scala @@ -0,0 +1,324 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.MaterializerSettings +import akka.stream.testkit.StreamTestKit.SubscriberProbe +import akka.stream.testkit.{ StreamTestKit, AkkaSpec } + +object GraphFlowSpec { + val tap1 = Source(0 to 3) + val inMerge = Merge[Int]("m1") + val outMerge = Merge[String]("m3") + + val partialGraph = PartialFlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + val tap2 = Source(4 to 9) + val tap3 = Source.empty[Int] + val tap4 = Source.empty[String] + val m2 = Merge[Int]("m2") + + inMerge ~> Flow[Int].map(_ * 2) ~> m2 ~> Flow[Int].map(_ / 2).map(i ⇒ (i + 1).toString) ~> outMerge + tap2 ~> inMerge + tap3 ~> m2 + tap4 ~> outMerge + } + + val stdRequests = 10 + val stdResult = Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) +} + +class GraphFlowSpec extends AkkaSpec { + + import GraphFlowSpec._ + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) + + implicit val materializer = FlowMaterializer(settings) + + def validateProbe(probe: SubscriberProbe[Int], requests: Int, result: Set[Int]): Unit = { + val subscription = probe.expectSubscription() + + val collected = (1 to requests).map { _ ⇒ + subscription.request(1) + probe.expectNext() + }.toSet + + collected should be(result) + probe.expectComplete() + + } + + "FlowGraphs" when { + "turned into flows" should { + "work with a Tap and Drain" in { + val in = UndefinedSource[Int] + val out = UndefinedSink[Int] + val probe = StreamTestKit.SubscriberProbe[Int]() + + val flow = Flow(partialGraph) { implicit b ⇒ + import FlowGraphImplicits._ + in ~> inMerge + outMerge ~> Flow[String].map(_.toInt) ~> out + in -> out + } + + tap1.connect(flow).connect(Sink(probe)).run() + + validateProbe(probe, stdRequests, stdResult) + } + + "be transformable with a Pipe" in { + val in = UndefinedSource[Int] + val out = UndefinedSink[String] + + val probe = StreamTestKit.SubscriberProbe[Int]() + + val flow = Flow[Int, String](partialGraph) { implicit b ⇒ + import FlowGraphImplicits._ + in ~> inMerge + outMerge ~> out + in -> out + } + + tap1.connect(flow).map(_.toInt).connect(Sink(probe)).run() + + validateProbe(probe, stdRequests, stdResult) + } + + "work with another GraphFlow" in { + val in1 = UndefinedSource[Int] + val out1 = UndefinedSink[String] + + val in2 = UndefinedSource[String] + val out2 = UndefinedSink[Int] + + val probe = StreamTestKit.SubscriberProbe[Int]() + + val flow1 = Flow(partialGraph) { implicit b ⇒ + import FlowGraphImplicits._ + in1 ~> inMerge + outMerge ~> out1 + in1 -> out1 + } + + val flow2 = Flow() { implicit b ⇒ + import FlowGraphImplicits._ + in2 ~> Flow[String].map(_.toInt) ~> out2 + in2 -> out2 + } + + tap1.connect(flow1).connect(flow2).connect(Sink(probe)).run() + + validateProbe(probe, stdRequests, stdResult) + } + + "be reusable multiple times" in { + val in = UndefinedSource[Int] + val out = UndefinedSink[Int] + val probe = StreamTestKit.SubscriberProbe[Int]() + + val flow = Flow() { implicit b ⇒ + import FlowGraphImplicits._ + in ~> Flow[Int].map(_ * 2) ~> out + in -> out + } + + FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + Source(1 to 5) ~> flow ~> flow ~> Sink(probe) + }.run() + + validateProbe(probe, 5, Set(4, 8, 12, 16, 20)) + } + } + + "turned into sources" should { + "work with a Drain" in { + val out = UndefinedSink[Int] + val probe = StreamTestKit.SubscriberProbe[Int]() + + val source = Source(partialGraph) { implicit b ⇒ + import FlowGraphImplicits._ + tap1 ~> inMerge + outMerge ~> Flow[String].map(_.toInt) ~> out + out + } + + source.connect(Sink(probe)).run() + + validateProbe(probe, stdRequests, stdResult) + } + + "be transformable with a Pipe" in { + val out = UndefinedSink[String] + + val probe = StreamTestKit.SubscriberProbe[Int]() + + val source = Source[String](partialGraph) { implicit b ⇒ + import FlowGraphImplicits._ + tap1 ~> inMerge + outMerge ~> out + out + } + + source.map(_.toInt).connect(Sink(probe)).run() + + validateProbe(probe, stdRequests, stdResult) + } + + "work with an GraphFlow" in { + val out1 = UndefinedSink[String] + + val in2 = UndefinedSource[String] + val out2 = UndefinedSink[Int] + + val probe = StreamTestKit.SubscriberProbe[Int]() + + val source = Source(partialGraph) { implicit b ⇒ + import FlowGraphImplicits._ + tap1 ~> inMerge + outMerge ~> out1 + out1 + } + + val flow = Flow() { implicit b ⇒ + import FlowGraphImplicits._ + in2 ~> Flow[String].map(_.toInt) ~> out2 + in2 -> out2 + } + + source.connect(flow).connect(Sink(probe)).run() + + validateProbe(probe, stdRequests, stdResult) + } + + "be reusable multiple times" in { + val out = UndefinedSink[Int] + val probe = StreamTestKit.SubscriberProbe[Int]() + + val source = Source[Int]() { implicit b ⇒ + import FlowGraphImplicits._ + Source(1 to 5) ~> Flow[Int].map(_ * 2) ~> out + out + } + + FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + val merge = Merge[Int]("merge") + source ~> merge ~> Sink(probe) + source ~> Flow[Int].map(_ * 10) ~> merge + }.run() + + validateProbe(probe, 10, Set(2, 4, 6, 8, 10, 20, 40, 60, 80, 100)) + } + } + "turned into sinks" should { + "work with a Tap" in { + val in = UndefinedSource[Int] + val probe = StreamTestKit.SubscriberProbe[Int]() + + val sink = Sink(partialGraph) { implicit b ⇒ + import FlowGraphImplicits._ + in ~> inMerge + outMerge ~> Flow[String].map(_.toInt) ~> Sink(probe) + in + } + + tap1.connect(sink).run() + + validateProbe(probe, stdRequests, stdResult) + } + + "be transformable with a Pipe" in { + val in = UndefinedSource[String] + + val probe = StreamTestKit.SubscriberProbe[Int]() + + val sink = Sink(partialGraph) { implicit b ⇒ + import FlowGraphImplicits._ + in ~> Flow[String].map(_.toInt) ~> inMerge + outMerge ~> Flow[String].map(_.toInt) ~> Sink(probe) + in + } + + val iSink = Flow[Int].map(_.toString).connect(sink) + tap1.connect(iSink).run() + + validateProbe(probe, stdRequests, stdResult) + } + + "work with a GraphFlow" in { + val in1 = UndefinedSource[Int] + val out1 = UndefinedSink[String] + + val in2 = UndefinedSource[String] + + val probe = StreamTestKit.SubscriberProbe[Int]() + + val flow = Flow(partialGraph) { implicit b ⇒ + import FlowGraphImplicits._ + in1 ~> inMerge + outMerge ~> out1 + in1 -> out1 + } + + val sink = Sink() { implicit b ⇒ + import FlowGraphImplicits._ + in2 ~> Flow[String].map(_.toInt) ~> Sink(probe) + in2 + } + + tap1.connect(flow).connect(sink).run() + + validateProbe(probe, stdRequests, stdResult) + } + } + + "used together" should { + "materialize properly" in { + val probe = StreamTestKit.SubscriberProbe[Int]() + val inSource = SubscriberTap[Int] + val outSink = PublisherDrain[Int] + + val flow = Flow(partialGraph) { implicit b ⇒ + import FlowGraphImplicits._ + val in = UndefinedSource[Int] + val out = UndefinedSink[Int] + in ~> inMerge + outMerge ~> Flow[String].map(_.toInt) ~> out + in -> out + } + + val source = Source[String]() { implicit b ⇒ + import FlowGraphImplicits._ + val out = UndefinedSink[String] + inSource ~> Flow[Int].map(_.toString) ~> out + out + } + + val sink = Sink() { implicit b ⇒ + import FlowGraphImplicits._ + val in = UndefinedSource[String] + in ~> Flow[String].map(_.toInt) ~> outSink + in + } + + val mm = FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + source ~> Flow[String].map(_.toInt) ~> flow ~> Flow[Int].map(_.toString) ~> sink + }.run() + + val subscriber = mm.materializedTap(inSource) + val publisher = mm.materializedDrain(outSink) + tap1.runWith(PublisherDrain()).subscribe(subscriber) + publisher.subscribe(probe) + + validateProbe(probe, stdRequests, stdResult) + } + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala index f38d7eb854..c0d4122d6c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Drain.scala @@ -26,8 +26,6 @@ import java.util.concurrent.atomic.AtomicReference * Drain nodes themselves (or construct an ActorBasedFlowMaterializer). */ trait Drain[-In] extends Sink[In] { - override def runWith(tap: TapWithKey[In])(implicit materializer: FlowMaterializer): tap.MaterializedType = - tap.connect(this).run().materializedTap(tap) } /** @@ -46,19 +44,20 @@ trait SimpleDrain[-In] extends Drain[In] { * @param flowName the name of the current flow, which should be used in log statements or error messages */ def attach(flowPublisher: Publisher[In @uncheckedVariance], materializer: ActorBasedFlowMaterializer, flowName: String): Unit + /** * This method is only used for Drains that return true from [[#isActive]], which then must * implement it. */ def create(materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[In] @uncheckedVariance = throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true") + /** * This method indicates whether this Drain can create a Subscriber instead of being * attached to a Publisher. This is only used if the Flow does not contain any * operations. */ def isActive: Boolean = false - } /** @@ -82,12 +81,14 @@ trait DrainWithKey[-In] extends Drain[In] { * @param flowName the name of the current flow, which should be used in log statements or error messages */ def attach(flowPublisher: Publisher[In @uncheckedVariance], materializer: ActorBasedFlowMaterializer, flowName: String): MaterializedType + /** * This method is only used for Drains that return true from [[#isActive]], which then must * implement it. */ def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Subscriber[In] @uncheckedVariance, MaterializedType) = throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true") + /** * This method indicates whether this Drain can create a Subscriber instead of being * attached to a Publisher. This is only used if the Flow does not contain any diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index e73ae46b1d..921b4b033c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -34,7 +34,10 @@ trait Flow[-In, +Out] extends FlowOps[Out] { * the materialized values of the `Tap` and `Drain`, e.g. the `Subscriber` of a [[SubscriberTap]] and * and `Publisher` of a [[PublisherDrain]]. */ - def runWith(tap: TapWithKey[In], drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): (tap.MaterializedType, drain.MaterializedType) + def runWith(tap: TapWithKey[In], drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): (tap.MaterializedType, drain.MaterializedType) = { + val m = tap.connect(this).connect(drain).run() + (m.materializedTap(tap), m.materializedDrain(drain)) + } } object Flow { @@ -43,6 +46,26 @@ object Flow { * Example usage: `Flow[Int]` */ def apply[T]: Flow[T, T] = Pipe.empty[T] + + /** + * Creates a `Flow` by using an empty [[FlowGraphBuilder]] on a block that expects a [[FlowGraphBuilder]] and + * returns the `UndefinedSource` and `UndefinedSink`. + */ + def apply[I, O]()(block: FlowGraphBuilder ⇒ (UndefinedSource[I], UndefinedSink[O])): Flow[I, O] = + createFlowFromBuilder(new FlowGraphBuilder(), block) + + /** + * Creates a `Flow` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects + * a [[FlowGraphBuilder]] and returns the `UndefinedSource` and `UndefinedSink`. + */ + def apply[I, O](graph: PartialFlowGraph)(block: FlowGraphBuilder ⇒ (UndefinedSource[I], UndefinedSink[O])): Flow[I, O] = + createFlowFromBuilder(new FlowGraphBuilder(graph.graph), block) + + private def createFlowFromBuilder[I, O](builder: FlowGraphBuilder, + block: FlowGraphBuilder ⇒ (UndefinedSource[I], UndefinedSink[O])): Flow[I, O] = { + val (in, out) = block(builder) + builder.partialBuild().toFlow(in, out) + } } /** @@ -415,7 +438,7 @@ trait FlowOps[+Out] { /** INTERNAL API */ // Storing ops in reverse order - protected def andThen[U](op: AstNode): Repr[U] + private[scaladsl2] def andThen[U](op: AstNode): Repr[U] } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index 3a69efe315..fdf94214f9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -84,6 +84,8 @@ final class Merge[T](override val name: Option[String]) extends FlowGraphInterna override val maximumOutputCount: Int = 1 override private[akka] def astNode = Ast.Merge + + final override private[scaladsl2] def newInstance() = new Merge[T](None) } object MergePreferred { @@ -131,6 +133,8 @@ final class MergePreferred[T](override val name: Option[String]) extends FlowGra override val maximumOutputCount: Int = 1 override private[akka] def astNode = Ast.MergePreferred + + final override private[scaladsl2] def newInstance() = new MergePreferred[T](None) } object Broadcast { @@ -163,6 +167,8 @@ final class Broadcast[T](override val name: Option[String]) extends FlowGraphInt override def maximumOutputCount: Int = Int.MaxValue override private[akka] def astNode = Ast.Broadcast + + final override private[scaladsl2] def newInstance() = new Broadcast[T](None) } object Balance { @@ -195,6 +201,8 @@ final class Balance[T](override val name: Option[String]) extends FlowGraphInter override def maximumOutputCount: Int = Int.MaxValue override private[akka] def astNode = Ast.Balance + + final override private[scaladsl2] def newInstance() = new Balance[T](None) } object Zip { @@ -243,6 +251,8 @@ final class Zip[A, B](override val name: Option[String]) extends FlowGraphIntern override def maximumOutputCount: Int = 1 override private[akka] def astNode = Ast.Zip + + final override private[scaladsl2] def newInstance() = new Zip[A, B](name = None) } object Unzip { @@ -289,6 +299,8 @@ final class Unzip[A, B](override val name: Option[String]) extends FlowGraphInte override def maximumOutputCount: Int = 2 override private[akka] def astNode = Ast.Unzip + + final override private[scaladsl2] def newInstance() = new Unzip[A, B](name = None) } object Concat { @@ -337,6 +349,8 @@ final class Concat[T](override val name: Option[String]) extends FlowGraphIntern override def maximumOutputCount: Int = 1 override private[akka] def astNode = Ast.Concat + + final override private[scaladsl2] def newInstance() = new Concat[T](name = None) } object UndefinedSink { @@ -361,12 +375,15 @@ object UndefinedSink { * be replaced with [[FlowGraphBuilder#attachDrain]]. */ final class UndefinedSink[-T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { + override def minimumInputCount: Int = 1 override def maximumInputCount: Int = 1 override def minimumOutputCount: Int = 0 override def maximumOutputCount: Int = 0 override private[akka] def astNode = throw new UnsupportedOperationException("Undefined sinks cannot be materialized") + + final override private[scaladsl2] def newInstance() = new UndefinedSink[T](name = None) } object UndefinedSource { @@ -397,28 +414,40 @@ final class UndefinedSource[+T](override val name: Option[String]) extends FlowG override def maximumOutputCount: Int = 1 override private[akka] def astNode = throw new UnsupportedOperationException("Undefined sources cannot be materialized") + + final override private[scaladsl2] def newInstance() = new UndefinedSource[T](name = None) } /** * INTERNAL API */ private[akka] object FlowGraphInternal { - val OnlyPipesErrorMessage = "Only pipes are supported currently!" + def throwUnsupportedValue(x: Any): Nothing = + throw new IllegalArgumentException(s"Unsupported value [$x] of type [${x.getClass.getName}]. Only Pipes and Graphs are supported!") def UnlabeledPort = -1 - sealed trait Vertex + sealed trait Vertex { + // must return a new instance that is uniquely identifiable (i.e. no name for hashCode or equality) + private[scaladsl2] def newInstance(): Vertex + } + case class TapVertex(tap: Tap[_]) extends Vertex { override def toString = tap.toString // these are unique keys, case class equality would break them final override def equals(other: Any): Boolean = super.equals(other) final override def hashCode: Int = super.hashCode + + final override private[scaladsl2] def newInstance() = this.copy() } + case class DrainVertex(drain: Drain[_]) extends Vertex { override def toString = drain.toString // these are unique keys, case class equality would break them final override def equals(other: Any): Boolean = super.equals(other) final override def hashCode: Int = super.hashCode + + final override private[scaladsl2] def newInstance() = this.copy() } sealed trait InternalVertex extends Vertex { @@ -466,6 +495,13 @@ private[akka] object FlowGraphInternal { } +object FlowGraphBuilder { + private[scaladsl2] def apply[T](partialFlowGraph: PartialFlowGraph)(block: FlowGraphBuilder ⇒ T): T = { + val builder = new FlowGraphBuilder(partialFlowGraph.graph) + block(builder) + } +} + /** * Builder of [[FlowGraph]] and [[PartialFlowGraph]]. * Syntactic sugar is provided by [[FlowGraphImplicits]]. @@ -498,72 +534,104 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph this } + def addEdge[T](source: UndefinedSource[T], junctionIn: JunctionInPort[T]): this.type = addEdge(source, Pipe.empty[T], junctionIn) + def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], junctionIn: JunctionInPort[Out]): this.type = { checkJunctionInPortPrecondition(junctionIn) flow match { case pipe: Pipe[In, Out] ⇒ addGraphEdge(source, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = UnlabeledPort) - case _ ⇒ - throw new IllegalArgumentException(OnlyPipesErrorMessage) + case gflow: GraphFlow[In, _, _, Out] ⇒ + val tOut = UndefinedSink[In] + val tIn = UndefinedSource[Out] + addEdge(source, tOut) + addEdge(tIn, junctionIn) + connect(tOut, gflow, tIn) + case x ⇒ throwUnsupportedValue(x) } this } + def addEdge[T](junctionOut: JunctionOutPort[T], sink: UndefinedSink[T]): this.type = + addEdge(junctionOut, Pipe.empty[T], sink) + def addEdge[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = { checkJunctionOutPortPrecondition(junctionOut) flow match { case pipe: Pipe[In, Out] ⇒ addGraphEdge(junctionOut.vertex, sink, pipe, inputPort = UnlabeledPort, outputPort = junctionOut.port) - case _ ⇒ - throw new IllegalArgumentException(OnlyPipesErrorMessage) + case gflow: GraphFlow[In, _, _, Out] ⇒ + val tOut = UndefinedSink[In] + val tIn = UndefinedSource[Out] + addEdge(junctionOut, tOut) + addEdge(tIn, sink) + connect(tOut, gflow, tIn) + case x ⇒ throwUnsupportedValue(x) } this } + def addEdge[T](junctionOut: JunctionOutPort[T], junctionIn: JunctionInPort[T]): this.type = + addEdge(junctionOut, Pipe.empty[T], junctionIn) + def addEdge[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], junctionIn: JunctionInPort[Out]): this.type = { checkJunctionOutPortPrecondition(junctionOut) checkJunctionInPortPrecondition(junctionIn) flow match { case pipe: Pipe[In, Out] ⇒ addGraphEdge(junctionOut.vertex, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = junctionOut.port) - case _ ⇒ - throw new IllegalArgumentException(OnlyPipesErrorMessage) + case gflow: GraphFlow[In, _, _, Out] ⇒ + val tOut = UndefinedSink[In] + val tIn = UndefinedSource[Out] + addEdge(junctionOut, tOut) + addEdge(tIn, junctionIn) + connect(tOut, gflow, tIn) + case x ⇒ throwUnsupportedValue(x) } this } + def addEdge[T](source: Source[T], junctionIn: JunctionInPort[T]): this.type = addEdge(source, Pipe.empty[T], junctionIn) + def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], junctionIn: JunctionInPort[Out]): this.type = { (source, flow) match { case (tap: Tap[In], pipe: Pipe[In, Out]) ⇒ addTapPipeEdge(tap, pipe, junctionIn) case (spipe: SourcePipe[In], pipe: Pipe[In, Out]) ⇒ addTapPipeEdge(spipe.input, Pipe(spipe.ops).appendPipe(pipe), junctionIn) - case _ ⇒ - throw new IllegalArgumentException(OnlyPipesErrorMessage) + case (gsource: GraphSource[_, In], _) ⇒ + val tOut = UndefinedSink[In] + val tIn = UndefinedSource[Out] + addEdge(gsource, tOut) + addEdge(tIn, junctionIn) + connect(tOut, flow, tIn) + case x ⇒ throwUnsupportedValue(x) } this } - def addEdge[In, Out](junctionOut: JunctionOutPort[In], sink: Sink[In]): this.type = { - sink match { - case drain: Drain[In] ⇒ addPipeDrainEdge(junctionOut, Pipe.empty[In], drain) - case pipe: SinkPipe[In] ⇒ addPipeDrainEdge(junctionOut, Pipe(pipe.ops), pipe.output) - case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) - } - this - } + def addEdge[T](junctionOut: JunctionOutPort[T], sink: Sink[T]): this.type = + addEdge(junctionOut, Pipe.empty[T], sink) def addEdge[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = { (flow, sink) match { case (pipe: Pipe[In, Out], drain: Drain[Out]) ⇒ addPipeDrainEdge(junctionOut, pipe, drain) case (pipe: Pipe[In, Out], spipe: SinkPipe[Out]) ⇒ - addEdge(junctionOut, pipe.connect(spipe)) - case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) + addPipeDrainEdge(junctionOut, pipe.appendPipe(Pipe(spipe.ops)), spipe.output) + case (_, gsink: GraphSink[Out, _]) ⇒ + val tOut = UndefinedSink[In] + val tIn = UndefinedSource[Out] + addEdge(tIn, gsink) + addEdge(junctionOut, tOut) + connect(tOut, flow, tIn) + case x ⇒ throwUnsupportedValue(x) } this } + def addEdge[T](source: Source[T], sink: Sink[T]): this.type = addEdge(source, Pipe.empty[T], sink) + def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = { (source, flow, sink) match { case (tap: Tap[In], pipe: Pipe[In, Out], drain: Drain[Out]) ⇒ @@ -581,54 +649,98 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph val tap = sourcePipe.input val newPipe = Pipe(sourcePipe.ops).connect(pipe) addEdge(tap, newPipe, drain) // recursive, but now it is a Tap-Pipe-Drain - case _ ⇒ - throw new IllegalArgumentException(OnlyPipesErrorMessage) + case (_, gflow: GraphFlow[In, _, _, Out], _) ⇒ + val tOut = UndefinedSink[In] + val tIn = UndefinedSource[Out] + addEdge(source, tOut) + addEdge(tIn, sink) + connect(tOut, gflow, tIn) + case x ⇒ throwUnsupportedValue(x) } this } + def addEdge[T](source: UndefinedSource[T], sink: UndefinedSink[T]): this.type = addEdge(source, Pipe.empty[T], sink) + def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = { flow match { case pipe: Pipe[In, Out] ⇒ addGraphEdge(source, sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) - case _ ⇒ - throw new IllegalArgumentException(OnlyPipesErrorMessage) + case gflow: GraphFlow[In, _, _, Out] ⇒ + val tOut = UndefinedSink[In] + val tIn = UndefinedSource[Out] + addEdge(source, tOut) + addEdge(tIn, sink) + connect(tOut, gflow, tIn) + case x ⇒ throwUnsupportedValue(x) } this } + def addEdge[T](source: UndefinedSource[T], sink: Sink[T]): this.type = addEdge(source, Pipe.empty[T], sink) + def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = { (flow, sink) match { case (pipe: Pipe[In, Out], drain: Drain[Out]) ⇒ addGraphEdge(source, DrainVertex(drain), pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) case (pipe: Pipe[In, Out], spipe: SinkPipe[Out]) ⇒ addGraphEdge(source, DrainVertex(spipe.output), pipe.appendPipe(Pipe(spipe.ops)), inputPort = UnlabeledPort, outputPort = UnlabeledPort) - case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) + case (gflow: GraphFlow[In, _, _, Out], _) ⇒ + val tOut = UndefinedSink[In] + val tIn = UndefinedSource[Out] + addEdge(source, tOut) + addEdge(tIn, sink) + connect(tOut, gflow, tIn) + case (_, gSink: GraphSink[Out, _]) ⇒ + val oOut = UndefinedSink[Out] + addEdge(source, flow, oOut) + gSink.importAndConnect(this, oOut) + case x ⇒ throwUnsupportedValue(x) } this } + def addEdge[T](source: Source[T], sink: UndefinedSink[T]): this.type = addEdge(source, Pipe.empty[T], sink) + def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = { (flow, source) match { case (pipe: Pipe[In, Out], tap: Tap[In]) ⇒ addGraphEdge(TapVertex(tap), sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort) case (pipe: Pipe[In, Out], spipe: SourcePipe[Out]) ⇒ addGraphEdge(TapVertex(spipe.input), sink, Pipe(spipe.ops).appendPipe(pipe), inputPort = UnlabeledPort, outputPort = UnlabeledPort) - case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) + case (_, gsource: GraphSource[_, In]) ⇒ + val tOut1 = UndefinedSource[In] + val tOut2 = UndefinedSink[In] + val tIn = UndefinedSource[Out] + addEdge(tOut1, tOut2) + gsource.importAndConnect(this, tOut1) + addEdge(tIn, sink) + connect(tOut2, flow, tIn) + case x ⇒ throwUnsupportedValue(x) } this } - private def addGraphEdge[In, Out](from: Vertex, to: Vertex, pipe: Pipe[In, Out], inputPort: Int, outputPort: Int): Unit = { + private def uncheckedAddGraphEdge[In, Out](from: Vertex, to: Vertex, pipe: Pipe[In, Out], inputPort: Int, outputPort: Int): Unit = { if (edgeQualifier == Int.MaxValue) throw new IllegalArgumentException(s"Too many edges") - checkAddTapDrainPrecondition(from) - checkAddTapDrainPrecondition(to) val label = EdgeLabel(edgeQualifier)(pipe.asInstanceOf[Pipe[Any, Nothing]], inputPort, outputPort) graph.addLEdge(from, to)(label) edgeQualifier += 1 } + private def addGraphEdge[In, Out](from: Vertex, to: Vertex, pipe: Pipe[In, Out], inputPort: Int, outputPort: Int): Unit = { + checkAddTapDrainPrecondition(from) + checkAddTapDrainPrecondition(to) + uncheckedAddGraphEdge(from, to, pipe, inputPort, outputPort) + } + + private def addOrReplaceGraphEdge[In, Out](from: Vertex, to: Vertex, pipe: Pipe[In, Out], inputPort: Int, outputPort: Int): Unit = { + checkAddOrReplaceTapDrainPrecondition(from) + checkAddOrReplaceTapDrainPrecondition(to) + uncheckedAddGraphEdge(from, to, pipe, inputPort, outputPort) + } + def attachSink[Out](token: UndefinedSink[Out], sink: Sink[Out]): this.type = { graph.find(token) match { case Some(existing) ⇒ @@ -640,7 +752,9 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph case spipe: SinkPipe[Out] ⇒ val pipe = edge.label.pipe.appendPipe(Pipe(spipe.ops)) addGraphEdge(edge.from.value, DrainVertex(spipe.output), pipe, edge.label.inputPort, edge.label.outputPort) - case _ ⇒ throw new IllegalArgumentException(OnlyPipesErrorMessage) + case gsink: GraphSink[Out, _] ⇒ + gsink.importAndConnect(this, token) + case x ⇒ throwUnsupportedValue(x) } case None ⇒ throw new IllegalArgumentException(s"No matching UndefinedSink [${token}]") @@ -659,8 +773,9 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph case spipe: SourcePipe[In] ⇒ val pipe = Pipe(spipe.ops).appendPipe(edge.label.pipe) addGraphEdge(TapVertex(spipe.input), edge.to.value, pipe, edge.label.inputPort, edge.label.outputPort) - case _ ⇒ - throw new IllegalArgumentException(OnlyPipesErrorMessage) + case gsource: GraphSource[_, In] ⇒ + gsource.importAndConnect(this, token) + case x ⇒ throwUnsupportedValue(x) } case None ⇒ throw new IllegalArgumentException(s"No matching UndefinedSource [${token}]") @@ -685,9 +800,10 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph val newPipe = outEdge.label.pipe.appendPipe(pipe.asInstanceOf[Pipe[Any, Nothing]]).appendPipe(inEdge.label.pipe) graph.remove(out) graph.remove(in) - addGraphEdge(outEdge.from.value, inEdge.to.value, newPipe, inEdge.label.inputPort, outEdge.label.outputPort) - case _ ⇒ - throw new IllegalArgumentException(OnlyPipesErrorMessage) + addOrReplaceGraphEdge(outEdge.from.value, inEdge.to.value, newPipe, inEdge.label.inputPort, outEdge.label.outputPort) + case gflow: GraphFlow[A, _, _, B] ⇒ + gflow.importAndConnect(this, out, in) + case x ⇒ throwUnsupportedValue(x) } this @@ -716,6 +832,17 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph addGraphEdge(edge.from.value, edge.to.value, edge.label.pipe, edge.label.inputPort, edge.label.outputPort) } + private[scaladsl2] def remapPartialFlowGraph(partialFlowGraph: PartialFlowGraph, vertexMapping: Map[Vertex, Vertex]): this.type = { + val mapping = collection.mutable.Map.empty[Vertex, Vertex] ++ vertexMapping + def get(vertex: Vertex): Vertex = mapping.getOrElseUpdate(vertex, vertex.newInstance()) + + partialFlowGraph.graph.edges.foreach { edge ⇒ + addGraphEdge(get(edge.from.value), get(edge.to.value), edge.label.pipe, edge.label.inputPort, edge.label.outputPort) + } + + this + } + /** * Flow graphs with cycles are in general dangerous as it can result in deadlocks. * Therefore, cycles in the graph are by default disallowed. `IllegalArgumentException` will @@ -729,11 +856,20 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph private def checkAddTapDrainPrecondition(vertex: Vertex): Unit = { vertex match { case node @ (_: UndefinedSource[_] | _: UndefinedSink[_]) ⇒ - require(graph.find(node) == None, s"[$node] instance is already used in this flow graph") + require(!graph.contains(node), s"[$node] instance is already used in this flow graph") case _ ⇒ // ok } } + private def checkAddOrReplaceTapDrainPrecondition(vertex: Vertex): Unit = { + vertex match { + // it is ok to add or replace edges with new or existing undefined sources or sinks + case node @ (_: UndefinedSource[_] | _: UndefinedSink[_]) ⇒ + // all other nodes must already exist in the graph + case node ⇒ require(graph.contains(node), s"[$node] instance is not in this flow graph") + } + } + private def checkJunctionInPortPrecondition(junction: JunctionInPort[_]): Unit = { junction.vertex match { case iv: InternalVertex ⇒ @@ -741,7 +877,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph case Some(node) ⇒ require( (node.inDegree + 1) <= iv.maximumInputCount, - s"${node.value} must have at most ${iv.maximumInputCount} incoming edges") + s"${node.value} must have at most ${iv.maximumInputCount} incoming edges, , has ${node.inDegree}\n${graph.edges}") case _ ⇒ // ok } case _ ⇒ // ok, no checks here @@ -755,7 +891,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph case Some(node) ⇒ require( (node.outDegree + 1) <= iv.maximumOutputCount, - s"${node.value} must have at most ${iv.maximumOutputCount} outgoing edges") + s"${node.value} must have at most ${iv.maximumOutputCount} outgoing edges, has ${node.outDegree}\n${graph.edges}") case _ ⇒ // ok } case _ ⇒ // ok, no checks here @@ -884,13 +1020,13 @@ object FlowGraph { * Build a `FlowGraph` by starting with one of the `apply` methods in * in [[FlowGraph$ companion object]]. Syntactic sugar is provided by [[FlowGraphImplicits]]. */ -class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) { +class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) extends RunnableFlow { import FlowGraphInternal._ /** * Materialize the `FlowGraph` and attach all sinks and sources. */ - def run()(implicit materializer: FlowMaterializer): MaterializedMap = { + override def run()(implicit materializer: FlowMaterializer): MaterializedMap = { val edges = graph.edges if (edges.size == 1) { val edge = edges.head @@ -1046,7 +1182,6 @@ object PartialFlowGraph { block(builder) builder.partialBuild() } - } /** @@ -1069,6 +1204,48 @@ class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[Fl case n: UndefinedSink[_] ⇒ n }.toSet + /** + * Creates a [[Source]] from this `PartialFlowGraph`. There needs to be only one [[UndefinedSink]] and + * no [[UndefinedSource]] in the graph, and you need to provide it as a parameter. + */ + def toSource[O](out: UndefinedSink[O]): Source[O] = { + require(graph.contains(out), s"Couldn't create Source with [$out], no matching UndefinedSink") + checkUndefinedSinksAndSources(sources = Nil, sinks = List(out), description = "Source") + GraphSource(this, out, Pipe.empty[O]) + } + + /** + * Creates a [[Flow]] from this `PartialFlowGraph`. There needs to be only one [[UndefinedSource]] and + * one [[UndefinedSink]] in the graph, and you need to provide them as parameters. + */ + def toFlow[I, O](in: UndefinedSource[I], out: UndefinedSink[O]): Flow[I, O] = { + checkUndefinedSinksAndSources(sources = List(in), sinks = List(out), description = "Flow") + GraphFlow(Pipe.empty[I], in, this, out, Pipe.empty[O]) + } + + /** + * Creates a [[Sink]] from this `PartialFlowGraph`. There needs to be only one [[UndefinedSource]] and + * no [[UndefinedSink]] in the graph, and you need to provide it as a parameter. + */ + def toSink[I](in: UndefinedSource[I]): Sink[I] = { + checkUndefinedSinksAndSources(sources = List(in), sinks = Nil, description = "Sink") + GraphSink(Pipe.empty[I], in, this) + } + + private def checkUndefinedSinksAndSources(sources: List[UndefinedSource[_]], sinks: List[UndefinedSink[_]], description: String): Unit = { + def expected(name: String, num: Int): String = s"Couldn't create $description, expected $num undefined $name${if (num == 1) "" else "s"}, but found" + def checkNodes(nodes: List[Vertex], nodeDescription: String): Int = (0 /: nodes) { + case (size, node) ⇒ + require(graph.contains(node), s"Couldn't create $description with [$node], no matching $nodeDescription") + size + 1 + } + val numSources = checkNodes(sources, "UndefinedSource") + val numSinks = checkNodes(sinks, "UndefinedSink") + val uSources = undefinedSources + require(uSources.size == numSources, s"${expected("source", numSources)} ${uSources}") + val uSinks = undefinedSinks + require(uSinks.size == numSinks, s"${expected("sink", numSinks)} ${uSinks}") + } } /** @@ -1104,20 +1281,26 @@ object FlowGraphImplicits { new SourceNextStep(source, flow, builder) def ~>(junctionIn: JunctionInPort[Out])(implicit builder: FlowGraphBuilder): JunctionOutPort[junctionIn.NextT] = { - builder.addEdge(source, Pipe.empty[Out], junctionIn) + builder.addEdge(source, junctionIn) junctionIn.next } def ~>(sink: Sink[Out])(implicit builder: FlowGraphBuilder): Unit = - builder.addEdge(source, Pipe.empty[Out], sink) + builder.addEdge(source, sink) } class SourceNextStep[In, Out](source: Source[In], flow: Flow[In, Out], builder: FlowGraphBuilder) { + def ~>[O](otherflow: Flow[Out, O])(implicit builder: FlowGraphBuilder): SourceNextStep[In, O] = + new SourceNextStep(source, flow.connect(otherflow), builder) + def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = { builder.addEdge(source, flow, junctionIn) junctionIn.next } + def ~>(sink: UndefinedSink[Out])(implicit builder: FlowGraphBuilder): Unit = + builder.addEdge(source, flow, sink) + def ~>(sink: Sink[Out]): Unit = builder.addEdge(source, flow, sink) } @@ -1130,7 +1313,7 @@ object FlowGraphImplicits { builder.addEdge(junction, Pipe.empty[In], sink) def ~>(junctionIn: JunctionInPort[In])(implicit builder: FlowGraphBuilder): JunctionOutPort[junctionIn.NextT] = { - builder.addEdge(junction, Pipe.empty[In], junctionIn) + builder.addEdge(junction, junctionIn) junctionIn.next } @@ -1157,7 +1340,7 @@ object FlowGraphImplicits { new UndefinedSourceNextStep(source, flow, builder) def ~>(junctionIn: JunctionInPort[In])(implicit builder: FlowGraphBuilder): JunctionOutPort[junctionIn.NextT] = { - builder.addEdge(source, Pipe.empty[In], junctionIn) + builder.addEdge(source, junctionIn) junctionIn.next } @@ -1168,6 +1351,16 @@ object FlowGraphImplicits { builder.addEdge(source, flow, junctionIn) junctionIn.next } - } + def ~>[T](otherFlow: Flow[Out, T])(implicit builder: FlowGraphBuilder): UndefinedSourceNextStep[In, T] = + new UndefinedSourceNextStep(source, flow.connect(otherFlow), builder) + + def ~>(sink: Sink[Out]): Unit = { + builder.addEdge(source, flow, sink) + } + + def ~>(sink: UndefinedSink[Out]): Unit = { + builder.addEdge(source, flow, sink) + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/GraphFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/GraphFlow.scala new file mode 100644 index 0000000000..fae11c7e09 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/GraphFlow.scala @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.impl2.Ast.AstNode + +import scala.annotation.unchecked.uncheckedVariance + +private[scaladsl2] case class GraphFlow[-In, CIn, COut, +Out](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Flow[In, Out] { + override type Repr[+O] = GraphFlow[In @uncheckedVariance, CIn, COut, O] + + private[scaladsl2] def prepend[T](pipe: Pipe[T, In]): GraphFlow[T, CIn, COut, Out] = copy(inPipe = pipe.appendPipe(inPipe)) + + private[scaladsl2] def prepend(pipe: SourcePipe[In]): GraphSource[COut, Out] = { + val newGraph = PartialFlowGraph(graph) { b ⇒ + b.attachSource(in, pipe.appendPipe(inPipe)) + } + GraphSource(newGraph, out, outPipe) + } + + private[scaladsl2] def remap(builder: FlowGraphBuilder): (UndefinedSource[CIn], UndefinedSink[COut]) = { + val nIn = UndefinedSource[CIn] + val nOut = UndefinedSink[COut] + builder.remapPartialFlowGraph(graph, Map(in -> nIn, out -> nOut)) + (nIn, nOut) + } + + private[scaladsl2] def importAndConnect(builder: FlowGraphBuilder, oOut: UndefinedSink[In @uncheckedVariance], oIn: UndefinedSource[Out @uncheckedVariance]): Unit = { + val (nIn, nOut) = remap(builder) + builder.connect(oOut, inPipe, nIn) + builder.connect(nOut, outPipe, oIn) + } + + def connect[T](flow: Flow[Out, T]): Flow[In, T] = flow match { + case pipe: Pipe[Out, T] ⇒ copy(outPipe = outPipe.appendPipe(pipe)) + case gFlow: GraphFlow[Out, _, _, T] ⇒ + val (newGraph, nOut) = FlowGraphBuilder(graph) { b ⇒ + val (oIn, oOut) = gFlow.remap(b) + b.connect(out, outPipe.connect(gFlow.inPipe), oIn) + (b.partialBuild(), oOut) + } + GraphFlow(inPipe, in, newGraph, nOut, gFlow.outPipe) + } + + override def connect(sink: Sink[Out]) = sink match { + case drain: Drain[Out] ⇒ connect(Pipe.empty.withDrain(drain)) // recursive, but now it is a SinkPipe + case sinkPipe: SinkPipe[Out] ⇒ + val newGraph = PartialFlowGraph(this.graph) { builder ⇒ + builder.attachSink(out, outPipe.connect(sinkPipe)) + } + GraphSink(inPipe, in, newGraph) + case gSink: GraphSink[Out, Out] ⇒ + val newGraph = PartialFlowGraph(graph) { b ⇒ + val oIn = gSink.remap(b) + b.connect(out, outPipe.connect(gSink.inPipe), oIn) + } + GraphSink(inPipe, in, newGraph) + } + + override private[scaladsl2] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op)) +} + +private[scaladsl2] case class GraphSource[COut, +Out](graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Source[Out] { + override type Repr[+O] = GraphSource[COut, O] + + private[scaladsl2] def remap(builder: FlowGraphBuilder): UndefinedSink[COut] = { + val nOut = UndefinedSink[COut] + builder.remapPartialFlowGraph(graph, Map(out -> nOut)) + nOut + } + + private[scaladsl2] def importAndConnect(builder: FlowGraphBuilder, oIn: UndefinedSource[Out @uncheckedVariance]): Unit = { + val nOut = remap(builder) + builder.connect(nOut, outPipe, oIn) + } + + override def connect[T](flow: Flow[Out, T]): Source[T] = flow match { + case pipe: Pipe[Out, T] ⇒ copy(outPipe = outPipe.appendPipe(pipe)) + case gFlow: GraphFlow[Out, _, _, T] ⇒ + val (newGraph, nOut) = FlowGraphBuilder(graph) { b ⇒ + val (oIn, oOut) = gFlow.remap(b) + b.connect(out, outPipe.connect(gFlow.inPipe), oIn) + (b.partialBuild(), oOut) + } + GraphSource(newGraph, nOut, gFlow.outPipe) + } + + override def connect(sink: Sink[Out]): RunnableFlow = sink match { + case drain: Drain[Out] ⇒ + connect(Pipe.empty.withDrain(drain)) // recursive, but now it is a SinkPipe + case sinkPipe: SinkPipe[Out] ⇒ + FlowGraph(this.graph) { implicit builder ⇒ + builder.attachSink(out, outPipe.connect(sinkPipe)) + } + case gSink: GraphSink[Out, _] ⇒ + FlowGraph(graph) { b ⇒ + val oIn = gSink.remap(b) + b.connect(out, outPipe.connect(gSink.inPipe), oIn) + } + } + + override private[scaladsl2] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op)) +} + +private[scaladsl2] case class GraphSink[-In, CIn](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph) extends Sink[In] { + + private[scaladsl2] def remap(builder: FlowGraphBuilder): UndefinedSource[CIn] = { + val nIn = UndefinedSource[CIn] + builder.remapPartialFlowGraph(graph, Map(in -> nIn)) + nIn + } + + private[scaladsl2] def prepend(pipe: SourcePipe[In]): FlowGraph = { + FlowGraph(this.graph) { b ⇒ + b.attachSource(in, pipe.connect(inPipe)) + } + } + + private[scaladsl2] def prepend[T](pipe: Pipe[T, In]): GraphSink[T, CIn] = { + GraphSink(pipe.appendPipe(inPipe), in, graph) + } + + private[scaladsl2] def importAndConnect(builder: FlowGraphBuilder, oOut: UndefinedSink[In @uncheckedVariance]): Unit = { + val nIn = remap(builder) + builder.connect(oOut, inPipe, nIn) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala index 2dcbff2b15..76095d0c70 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala @@ -13,8 +13,6 @@ import scala.language.existentials private[scaladsl2] object Pipe { private val emptyInstance = Pipe[Any, Any](ops = Nil) def empty[T]: Pipe[T, T] = emptyInstance.asInstanceOf[Pipe[T, T]] - - val OnlyPipesErrorMessage = "Only pipes are supported currently!" } /** @@ -23,26 +21,23 @@ private[scaladsl2] object Pipe { private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends Flow[In, Out] { override type Repr[+O] = Pipe[In @uncheckedVariance, O] - override protected def andThen[U](op: AstNode): Repr[U] = this.copy(ops = op :: ops) + override private[scaladsl2] def andThen[U](op: AstNode): Repr[U] = this.copy(ops = op :: ops) private[scaladsl2] def withDrain(out: Drain[Out]): SinkPipe[In] = SinkPipe(out, ops) private[scaladsl2] def withTap(in: Tap[In]): SourcePipe[Out] = SourcePipe(in, ops) override def connect[T](flow: Flow[Out, T]): Flow[In, T] = flow match { - case p: Pipe[T, In] ⇒ Pipe(p.ops ++: ops) - case _ ⇒ throw new IllegalArgumentException(Pipe.OnlyPipesErrorMessage) + case p: Pipe[T, In] ⇒ Pipe(p.ops ++: ops) + case gf: GraphFlow[Out, _, _, T] ⇒ gf.prepend(this) + case x ⇒ FlowGraphInternal.throwUnsupportedValue(x) } override def connect(sink: Sink[Out]): Sink[In] = sink match { - case sp: SinkPipe[Out] ⇒ sp.prependPipe(this) - case d: Drain[Out] ⇒ this.withDrain(d) - case _ ⇒ throw new IllegalArgumentException(Pipe.OnlyPipesErrorMessage) - } - - override def runWith(tap: TapWithKey[In], drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): (tap.MaterializedType, drain.MaterializedType) = { - val m = tap.connect(this).connect(drain).run() - (m.materializedTap(tap), m.materializedDrain(drain)) + case d: Drain[Out] ⇒ this.withDrain(d) + case sp: SinkPipe[Out] ⇒ sp.prependPipe(this) + case gs: GraphSink[Out, _] ⇒ gs.prepend(this) + case x ⇒ FlowGraphInternal.throwUnsupportedValue(x) } private[scaladsl2] def appendPipe[T](pipe: Pipe[Out, T]): Pipe[In, T] = Pipe(pipe.ops ++: ops) @@ -56,10 +51,6 @@ private[scaladsl2] final case class SinkPipe[-In](output: Drain[_], ops: List[As private[scaladsl2] def withTap(in: Tap[In]): RunnablePipe = RunnablePipe(in, output, ops) private[scaladsl2] def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops) - - override def runWith(tap: TapWithKey[In])(implicit materializer: FlowMaterializer): tap.MaterializedType = - tap.connect(this).run().materializedTap(tap) - } /** @@ -68,26 +59,24 @@ private[scaladsl2] final case class SinkPipe[-In](output: Drain[_], ops: List[As private[scaladsl2] final case class SourcePipe[+Out](input: Tap[_], ops: List[AstNode]) extends Source[Out] { override type Repr[+O] = SourcePipe[O] - override protected def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, op :: ops) + override private[scaladsl2] def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, op :: ops) private[scaladsl2] def withDrain(out: Drain[Out]): RunnablePipe = RunnablePipe(input, out, ops) private[scaladsl2] def appendPipe[T](pipe: Pipe[Out, T]): SourcePipe[T] = SourcePipe(input, pipe.ops ++: ops) override def connect[T](flow: Flow[Out, T]): Source[T] = flow match { - case p: Pipe[Out, T] ⇒ appendPipe(p) - case _ ⇒ throw new IllegalArgumentException(Pipe.OnlyPipesErrorMessage) + case p: Pipe[Out, T] ⇒ appendPipe(p) + case g: GraphFlow[Out, _, _, T] ⇒ g.prepend(this) + case x ⇒ FlowGraphInternal.throwUnsupportedValue(x) } override def connect(sink: Sink[Out]): RunnableFlow = sink match { - case sp: SinkPipe[Out] ⇒ RunnablePipe(input, sp.output, sp.ops ++: ops) - case d: Drain[Out] ⇒ this.withDrain(d) - case _ ⇒ throw new IllegalArgumentException(Pipe.OnlyPipesErrorMessage) + case sp: SinkPipe[Out] ⇒ RunnablePipe(input, sp.output, sp.ops ++: ops) + case d: Drain[Out] ⇒ this.withDrain(d) + case g: GraphSink[Out, _] ⇒ g.prepend(this) + case x ⇒ FlowGraphInternal.throwUnsupportedValue(x) } - - override def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType = - withDrain(drain).run().materializedDrain(drain) - } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala index 9aa76947ee..c982edd1cd 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala @@ -18,7 +18,9 @@ trait Sink[-In] { * Connect this `Sink` to a `Tap` and run it. The returned value is the materialized value * of the `Tap`, e.g. the `Subscriber` of a [[SubscriberTap]]. */ - def runWith(tap: TapWithKey[In])(implicit materializer: FlowMaterializer): tap.MaterializedType + def runWith(tap: TapWithKey[In])(implicit materializer: FlowMaterializer): tap.MaterializedType = + tap.connect(this).run().materializedTap(tap) + } object Sink { @@ -26,4 +28,23 @@ object Sink { * Helper to create [[Sink]] from `Subscriber`. */ def apply[T](subscriber: Subscriber[T]): Drain[T] = SubscriberDrain(subscriber) + + /** + * Creates a `Sink` by using an empty [[FlowGraphBuilder]] on a block that expects a [[FlowGraphBuilder]] and + * returns the `UndefinedSource`. + */ + def apply[T]()(block: FlowGraphBuilder ⇒ UndefinedSource[T]): Sink[T] = + createSinkFromBuilder(new FlowGraphBuilder(), block) + + /** + * Creates a `Sink` by using a FlowGraphBuilder from this [[PartialFlowGraph]] on a block that expects + * a [[FlowGraphBuilder]] and returns the `UndefinedSource`. + */ + def apply[T](graph: PartialFlowGraph)(block: FlowGraphBuilder ⇒ UndefinedSource[T]): Sink[T] = + createSinkFromBuilder(new FlowGraphBuilder(graph.graph), block) + + private def createSinkFromBuilder[T](builder: FlowGraphBuilder, block: FlowGraphBuilder ⇒ UndefinedSource[T]): Sink[T] = { + val in = block(builder) + builder.partialBuild().toSink(in) + } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala index dc1904721d..12f83729ec 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala @@ -35,7 +35,8 @@ trait Source[+Out] extends FlowOps[Out] { * Connect this `Source` to a `Drain` and run it. The returned value is the materialized value * of the `Drain`, e.g. the `Publisher` of a [[PublisherDrain]]. */ - def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType + def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType = + connect(drain).run().materializedDrain(drain) /** * Shortcut for running this `Source` with a fold function. @@ -136,4 +137,22 @@ object Source { */ def failed[T](cause: Throwable): Source[T] = apply(ErrorPublisher(cause)) + /** + * Creates a `Source` by using an empty [[FlowGraphBuilder]] on a block that expects a [[FlowGraphBuilder]] and + * returns the `UndefinedSink`. + */ + def apply[T]()(block: FlowGraphBuilder ⇒ UndefinedSink[T]): Source[T] = + createSourceFromBuilder(new FlowGraphBuilder(), block) + + /** + * Creates a `Source` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects + * a [[FlowGraphBuilder]] and returns the `UndefinedSink`. + */ + def apply[T](graph: PartialFlowGraph)(block: FlowGraphBuilder ⇒ UndefinedSink[T]): Source[T] = + createSourceFromBuilder(new FlowGraphBuilder(graph.graph), block) + + private def createSourceFromBuilder[T](builder: FlowGraphBuilder, block: FlowGraphBuilder ⇒ UndefinedSink[T]): Source[T] = { + val out = block(builder) + builder.partialBuild().toSource(out) + } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala index cd0b3895da..1d807e2aee 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Tap.scala @@ -34,11 +34,8 @@ trait Tap[+Out] extends Source[Out] { override def connect(sink: Sink[Out]): RunnableFlow = sourcePipe.connect(sink) - override def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType = - connect(drain).run().materializedDrain(drain) - /** INTERNAL API */ - override protected def andThen[U](op: AstNode) = SourcePipe(this, List(op)) + override private[scaladsl2] def andThen[U](op: AstNode) = SourcePipe(this, List(op)) } /** @@ -57,12 +54,14 @@ trait SimpleTap[+Out] extends Tap[Out] { * @param flowName the name of the current flow, which should be used in log statements or error messages */ def attach(flowSubscriber: Subscriber[Out] @uncheckedVariance, materializer: ActorBasedFlowMaterializer, flowName: String): Unit + /** * This method is only used for Taps that return true from [[#isActive]], which then must * implement it. */ def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[Out] @uncheckedVariance = throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true") + /** * This method indicates whether this Tap can create a Publisher instead of being * attached to a Subscriber. This is only used if the Flow does not contain any @@ -96,12 +95,14 @@ trait TapWithKey[+Out] extends Tap[Out] { * @param flowName the name of the current flow, which should be used in log statements or error messages */ def attach(flowSubscriber: Subscriber[Out] @uncheckedVariance, materializer: ActorBasedFlowMaterializer, flowName: String): MaterializedType + /** * This method is only used for Taps that return true from [[#isActive]], which then must * implement it. */ def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Publisher[Out] @uncheckedVariance, MaterializedType) = throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true") + /** * This method indicates whether this Tap can create a Publisher instead of being * attached to a Subscriber. This is only used if the Flow does not contain any