+str #16038 Create Source/Flow/Sink from partial flow graphs

This commit is contained in:
Björn Antonsson 2014-10-10 10:39:29 +02:00
parent bea750f005
commit 5570704f6c
9 changed files with 783 additions and 84 deletions

View file

@ -0,0 +1,324 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import akka.stream.MaterializerSettings
import akka.stream.testkit.StreamTestKit.SubscriberProbe
import akka.stream.testkit.{ StreamTestKit, AkkaSpec }
object GraphFlowSpec {
val tap1 = Source(0 to 3)
val inMerge = Merge[Int]("m1")
val outMerge = Merge[String]("m3")
val partialGraph = PartialFlowGraph { implicit b
import FlowGraphImplicits._
val tap2 = Source(4 to 9)
val tap3 = Source.empty[Int]
val tap4 = Source.empty[String]
val m2 = Merge[Int]("m2")
inMerge ~> Flow[Int].map(_ * 2) ~> m2 ~> Flow[Int].map(_ / 2).map(i (i + 1).toString) ~> outMerge
tap2 ~> inMerge
tap3 ~> m2
tap4 ~> outMerge
}
val stdRequests = 10
val stdResult = Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
}
class GraphFlowSpec extends AkkaSpec {
import GraphFlowSpec._
val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
.withFanOutBuffer(initialSize = 1, maxSize = 16)
implicit val materializer = FlowMaterializer(settings)
def validateProbe(probe: SubscriberProbe[Int], requests: Int, result: Set[Int]): Unit = {
val subscription = probe.expectSubscription()
val collected = (1 to requests).map { _
subscription.request(1)
probe.expectNext()
}.toSet
collected should be(result)
probe.expectComplete()
}
"FlowGraphs" when {
"turned into flows" should {
"work with a Tap and Drain" in {
val in = UndefinedSource[Int]
val out = UndefinedSink[Int]
val probe = StreamTestKit.SubscriberProbe[Int]()
val flow = Flow(partialGraph) { implicit b
import FlowGraphImplicits._
in ~> inMerge
outMerge ~> Flow[String].map(_.toInt) ~> out
in -> out
}
tap1.connect(flow).connect(Sink(probe)).run()
validateProbe(probe, stdRequests, stdResult)
}
"be transformable with a Pipe" in {
val in = UndefinedSource[Int]
val out = UndefinedSink[String]
val probe = StreamTestKit.SubscriberProbe[Int]()
val flow = Flow[Int, String](partialGraph) { implicit b
import FlowGraphImplicits._
in ~> inMerge
outMerge ~> out
in -> out
}
tap1.connect(flow).map(_.toInt).connect(Sink(probe)).run()
validateProbe(probe, stdRequests, stdResult)
}
"work with another GraphFlow" in {
val in1 = UndefinedSource[Int]
val out1 = UndefinedSink[String]
val in2 = UndefinedSource[String]
val out2 = UndefinedSink[Int]
val probe = StreamTestKit.SubscriberProbe[Int]()
val flow1 = Flow(partialGraph) { implicit b
import FlowGraphImplicits._
in1 ~> inMerge
outMerge ~> out1
in1 -> out1
}
val flow2 = Flow() { implicit b
import FlowGraphImplicits._
in2 ~> Flow[String].map(_.toInt) ~> out2
in2 -> out2
}
tap1.connect(flow1).connect(flow2).connect(Sink(probe)).run()
validateProbe(probe, stdRequests, stdResult)
}
"be reusable multiple times" in {
val in = UndefinedSource[Int]
val out = UndefinedSink[Int]
val probe = StreamTestKit.SubscriberProbe[Int]()
val flow = Flow() { implicit b
import FlowGraphImplicits._
in ~> Flow[Int].map(_ * 2) ~> out
in -> out
}
FlowGraph { implicit b
import FlowGraphImplicits._
Source(1 to 5) ~> flow ~> flow ~> Sink(probe)
}.run()
validateProbe(probe, 5, Set(4, 8, 12, 16, 20))
}
}
"turned into sources" should {
"work with a Drain" in {
val out = UndefinedSink[Int]
val probe = StreamTestKit.SubscriberProbe[Int]()
val source = Source(partialGraph) { implicit b
import FlowGraphImplicits._
tap1 ~> inMerge
outMerge ~> Flow[String].map(_.toInt) ~> out
out
}
source.connect(Sink(probe)).run()
validateProbe(probe, stdRequests, stdResult)
}
"be transformable with a Pipe" in {
val out = UndefinedSink[String]
val probe = StreamTestKit.SubscriberProbe[Int]()
val source = Source[String](partialGraph) { implicit b
import FlowGraphImplicits._
tap1 ~> inMerge
outMerge ~> out
out
}
source.map(_.toInt).connect(Sink(probe)).run()
validateProbe(probe, stdRequests, stdResult)
}
"work with an GraphFlow" in {
val out1 = UndefinedSink[String]
val in2 = UndefinedSource[String]
val out2 = UndefinedSink[Int]
val probe = StreamTestKit.SubscriberProbe[Int]()
val source = Source(partialGraph) { implicit b
import FlowGraphImplicits._
tap1 ~> inMerge
outMerge ~> out1
out1
}
val flow = Flow() { implicit b
import FlowGraphImplicits._
in2 ~> Flow[String].map(_.toInt) ~> out2
in2 -> out2
}
source.connect(flow).connect(Sink(probe)).run()
validateProbe(probe, stdRequests, stdResult)
}
"be reusable multiple times" in {
val out = UndefinedSink[Int]
val probe = StreamTestKit.SubscriberProbe[Int]()
val source = Source[Int]() { implicit b
import FlowGraphImplicits._
Source(1 to 5) ~> Flow[Int].map(_ * 2) ~> out
out
}
FlowGraph { implicit b
import FlowGraphImplicits._
val merge = Merge[Int]("merge")
source ~> merge ~> Sink(probe)
source ~> Flow[Int].map(_ * 10) ~> merge
}.run()
validateProbe(probe, 10, Set(2, 4, 6, 8, 10, 20, 40, 60, 80, 100))
}
}
"turned into sinks" should {
"work with a Tap" in {
val in = UndefinedSource[Int]
val probe = StreamTestKit.SubscriberProbe[Int]()
val sink = Sink(partialGraph) { implicit b
import FlowGraphImplicits._
in ~> inMerge
outMerge ~> Flow[String].map(_.toInt) ~> Sink(probe)
in
}
tap1.connect(sink).run()
validateProbe(probe, stdRequests, stdResult)
}
"be transformable with a Pipe" in {
val in = UndefinedSource[String]
val probe = StreamTestKit.SubscriberProbe[Int]()
val sink = Sink(partialGraph) { implicit b
import FlowGraphImplicits._
in ~> Flow[String].map(_.toInt) ~> inMerge
outMerge ~> Flow[String].map(_.toInt) ~> Sink(probe)
in
}
val iSink = Flow[Int].map(_.toString).connect(sink)
tap1.connect(iSink).run()
validateProbe(probe, stdRequests, stdResult)
}
"work with a GraphFlow" in {
val in1 = UndefinedSource[Int]
val out1 = UndefinedSink[String]
val in2 = UndefinedSource[String]
val probe = StreamTestKit.SubscriberProbe[Int]()
val flow = Flow(partialGraph) { implicit b
import FlowGraphImplicits._
in1 ~> inMerge
outMerge ~> out1
in1 -> out1
}
val sink = Sink() { implicit b
import FlowGraphImplicits._
in2 ~> Flow[String].map(_.toInt) ~> Sink(probe)
in2
}
tap1.connect(flow).connect(sink).run()
validateProbe(probe, stdRequests, stdResult)
}
}
"used together" should {
"materialize properly" in {
val probe = StreamTestKit.SubscriberProbe[Int]()
val inSource = SubscriberTap[Int]
val outSink = PublisherDrain[Int]
val flow = Flow(partialGraph) { implicit b
import FlowGraphImplicits._
val in = UndefinedSource[Int]
val out = UndefinedSink[Int]
in ~> inMerge
outMerge ~> Flow[String].map(_.toInt) ~> out
in -> out
}
val source = Source[String]() { implicit b
import FlowGraphImplicits._
val out = UndefinedSink[String]
inSource ~> Flow[Int].map(_.toString) ~> out
out
}
val sink = Sink() { implicit b
import FlowGraphImplicits._
val in = UndefinedSource[String]
in ~> Flow[String].map(_.toInt) ~> outSink
in
}
val mm = FlowGraph { implicit b
import FlowGraphImplicits._
source ~> Flow[String].map(_.toInt) ~> flow ~> Flow[Int].map(_.toString) ~> sink
}.run()
val subscriber = mm.materializedTap(inSource)
val publisher = mm.materializedDrain(outSink)
tap1.runWith(PublisherDrain()).subscribe(subscriber)
publisher.subscribe(probe)
validateProbe(probe, stdRequests, stdResult)
}
}
}
}

View file

@ -26,8 +26,6 @@ import java.util.concurrent.atomic.AtomicReference
* Drain nodes themselves (or construct an ActorBasedFlowMaterializer).
*/
trait Drain[-In] extends Sink[In] {
override def runWith(tap: TapWithKey[In])(implicit materializer: FlowMaterializer): tap.MaterializedType =
tap.connect(this).run().materializedTap(tap)
}
/**
@ -46,19 +44,20 @@ trait SimpleDrain[-In] extends Drain[In] {
* @param flowName the name of the current flow, which should be used in log statements or error messages
*/
def attach(flowPublisher: Publisher[In @uncheckedVariance], materializer: ActorBasedFlowMaterializer, flowName: String): Unit
/**
* This method is only used for Drains that return true from [[#isActive]], which then must
* implement it.
*/
def create(materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[In] @uncheckedVariance =
throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true")
/**
* This method indicates whether this Drain can create a Subscriber instead of being
* attached to a Publisher. This is only used if the Flow does not contain any
* operations.
*/
def isActive: Boolean = false
}
/**
@ -82,12 +81,14 @@ trait DrainWithKey[-In] extends Drain[In] {
* @param flowName the name of the current flow, which should be used in log statements or error messages
*/
def attach(flowPublisher: Publisher[In @uncheckedVariance], materializer: ActorBasedFlowMaterializer, flowName: String): MaterializedType
/**
* This method is only used for Drains that return true from [[#isActive]], which then must
* implement it.
*/
def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Subscriber[In] @uncheckedVariance, MaterializedType) =
throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true")
/**
* This method indicates whether this Drain can create a Subscriber instead of being
* attached to a Publisher. This is only used if the Flow does not contain any

View file

@ -34,7 +34,10 @@ trait Flow[-In, +Out] extends FlowOps[Out] {
* the materialized values of the `Tap` and `Drain`, e.g. the `Subscriber` of a [[SubscriberTap]] and
* and `Publisher` of a [[PublisherDrain]].
*/
def runWith(tap: TapWithKey[In], drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): (tap.MaterializedType, drain.MaterializedType)
def runWith(tap: TapWithKey[In], drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): (tap.MaterializedType, drain.MaterializedType) = {
val m = tap.connect(this).connect(drain).run()
(m.materializedTap(tap), m.materializedDrain(drain))
}
}
object Flow {
@ -43,6 +46,26 @@ object Flow {
* Example usage: `Flow[Int]`
*/
def apply[T]: Flow[T, T] = Pipe.empty[T]
/**
* Creates a `Flow` by using an empty [[FlowGraphBuilder]] on a block that expects a [[FlowGraphBuilder]] and
* returns the `UndefinedSource` and `UndefinedSink`.
*/
def apply[I, O]()(block: FlowGraphBuilder (UndefinedSource[I], UndefinedSink[O])): Flow[I, O] =
createFlowFromBuilder(new FlowGraphBuilder(), block)
/**
* Creates a `Flow` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects
* a [[FlowGraphBuilder]] and returns the `UndefinedSource` and `UndefinedSink`.
*/
def apply[I, O](graph: PartialFlowGraph)(block: FlowGraphBuilder (UndefinedSource[I], UndefinedSink[O])): Flow[I, O] =
createFlowFromBuilder(new FlowGraphBuilder(graph.graph), block)
private def createFlowFromBuilder[I, O](builder: FlowGraphBuilder,
block: FlowGraphBuilder (UndefinedSource[I], UndefinedSink[O])): Flow[I, O] = {
val (in, out) = block(builder)
builder.partialBuild().toFlow(in, out)
}
}
/**
@ -415,7 +438,7 @@ trait FlowOps[+Out] {
/** INTERNAL API */
// Storing ops in reverse order
protected def andThen[U](op: AstNode): Repr[U]
private[scaladsl2] def andThen[U](op: AstNode): Repr[U]
}
/**

View file

@ -84,6 +84,8 @@ final class Merge[T](override val name: Option[String]) extends FlowGraphInterna
override val maximumOutputCount: Int = 1
override private[akka] def astNode = Ast.Merge
final override private[scaladsl2] def newInstance() = new Merge[T](None)
}
object MergePreferred {
@ -131,6 +133,8 @@ final class MergePreferred[T](override val name: Option[String]) extends FlowGra
override val maximumOutputCount: Int = 1
override private[akka] def astNode = Ast.MergePreferred
final override private[scaladsl2] def newInstance() = new MergePreferred[T](None)
}
object Broadcast {
@ -163,6 +167,8 @@ final class Broadcast[T](override val name: Option[String]) extends FlowGraphInt
override def maximumOutputCount: Int = Int.MaxValue
override private[akka] def astNode = Ast.Broadcast
final override private[scaladsl2] def newInstance() = new Broadcast[T](None)
}
object Balance {
@ -195,6 +201,8 @@ final class Balance[T](override val name: Option[String]) extends FlowGraphInter
override def maximumOutputCount: Int = Int.MaxValue
override private[akka] def astNode = Ast.Balance
final override private[scaladsl2] def newInstance() = new Balance[T](None)
}
object Zip {
@ -243,6 +251,8 @@ final class Zip[A, B](override val name: Option[String]) extends FlowGraphIntern
override def maximumOutputCount: Int = 1
override private[akka] def astNode = Ast.Zip
final override private[scaladsl2] def newInstance() = new Zip[A, B](name = None)
}
object Unzip {
@ -289,6 +299,8 @@ final class Unzip[A, B](override val name: Option[String]) extends FlowGraphInte
override def maximumOutputCount: Int = 2
override private[akka] def astNode = Ast.Unzip
final override private[scaladsl2] def newInstance() = new Unzip[A, B](name = None)
}
object Concat {
@ -337,6 +349,8 @@ final class Concat[T](override val name: Option[String]) extends FlowGraphIntern
override def maximumOutputCount: Int = 1
override private[akka] def astNode = Ast.Concat
final override private[scaladsl2] def newInstance() = new Concat[T](name = None)
}
object UndefinedSink {
@ -361,12 +375,15 @@ object UndefinedSink {
* be replaced with [[FlowGraphBuilder#attachDrain]].
*/
final class UndefinedSink[-T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
override def minimumInputCount: Int = 1
override def maximumInputCount: Int = 1
override def minimumOutputCount: Int = 0
override def maximumOutputCount: Int = 0
override private[akka] def astNode = throw new UnsupportedOperationException("Undefined sinks cannot be materialized")
final override private[scaladsl2] def newInstance() = new UndefinedSink[T](name = None)
}
object UndefinedSource {
@ -397,28 +414,40 @@ final class UndefinedSource[+T](override val name: Option[String]) extends FlowG
override def maximumOutputCount: Int = 1
override private[akka] def astNode = throw new UnsupportedOperationException("Undefined sources cannot be materialized")
final override private[scaladsl2] def newInstance() = new UndefinedSource[T](name = None)
}
/**
* INTERNAL API
*/
private[akka] object FlowGraphInternal {
val OnlyPipesErrorMessage = "Only pipes are supported currently!"
def throwUnsupportedValue(x: Any): Nothing =
throw new IllegalArgumentException(s"Unsupported value [$x] of type [${x.getClass.getName}]. Only Pipes and Graphs are supported!")
def UnlabeledPort = -1
sealed trait Vertex
sealed trait Vertex {
// must return a new instance that is uniquely identifiable (i.e. no name for hashCode or equality)
private[scaladsl2] def newInstance(): Vertex
}
case class TapVertex(tap: Tap[_]) extends Vertex {
override def toString = tap.toString
// these are unique keys, case class equality would break them
final override def equals(other: Any): Boolean = super.equals(other)
final override def hashCode: Int = super.hashCode
final override private[scaladsl2] def newInstance() = this.copy()
}
case class DrainVertex(drain: Drain[_]) extends Vertex {
override def toString = drain.toString
// these are unique keys, case class equality would break them
final override def equals(other: Any): Boolean = super.equals(other)
final override def hashCode: Int = super.hashCode
final override private[scaladsl2] def newInstance() = this.copy()
}
sealed trait InternalVertex extends Vertex {
@ -466,6 +495,13 @@ private[akka] object FlowGraphInternal {
}
object FlowGraphBuilder {
private[scaladsl2] def apply[T](partialFlowGraph: PartialFlowGraph)(block: FlowGraphBuilder T): T = {
val builder = new FlowGraphBuilder(partialFlowGraph.graph)
block(builder)
}
}
/**
* Builder of [[FlowGraph]] and [[PartialFlowGraph]].
* Syntactic sugar is provided by [[FlowGraphImplicits]].
@ -498,72 +534,104 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
this
}
def addEdge[T](source: UndefinedSource[T], junctionIn: JunctionInPort[T]): this.type = addEdge(source, Pipe.empty[T], junctionIn)
def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], junctionIn: JunctionInPort[Out]): this.type = {
checkJunctionInPortPrecondition(junctionIn)
flow match {
case pipe: Pipe[In, Out]
addGraphEdge(source, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = UnlabeledPort)
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
case gflow: GraphFlow[In, _, _, Out]
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(source, tOut)
addEdge(tIn, junctionIn)
connect(tOut, gflow, tIn)
case x throwUnsupportedValue(x)
}
this
}
def addEdge[T](junctionOut: JunctionOutPort[T], sink: UndefinedSink[T]): this.type =
addEdge(junctionOut, Pipe.empty[T], sink)
def addEdge[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = {
checkJunctionOutPortPrecondition(junctionOut)
flow match {
case pipe: Pipe[In, Out]
addGraphEdge(junctionOut.vertex, sink, pipe, inputPort = UnlabeledPort, outputPort = junctionOut.port)
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
case gflow: GraphFlow[In, _, _, Out]
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(junctionOut, tOut)
addEdge(tIn, sink)
connect(tOut, gflow, tIn)
case x throwUnsupportedValue(x)
}
this
}
def addEdge[T](junctionOut: JunctionOutPort[T], junctionIn: JunctionInPort[T]): this.type =
addEdge(junctionOut, Pipe.empty[T], junctionIn)
def addEdge[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], junctionIn: JunctionInPort[Out]): this.type = {
checkJunctionOutPortPrecondition(junctionOut)
checkJunctionInPortPrecondition(junctionIn)
flow match {
case pipe: Pipe[In, Out]
addGraphEdge(junctionOut.vertex, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = junctionOut.port)
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
case gflow: GraphFlow[In, _, _, Out]
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(junctionOut, tOut)
addEdge(tIn, junctionIn)
connect(tOut, gflow, tIn)
case x throwUnsupportedValue(x)
}
this
}
def addEdge[T](source: Source[T], junctionIn: JunctionInPort[T]): this.type = addEdge(source, Pipe.empty[T], junctionIn)
def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], junctionIn: JunctionInPort[Out]): this.type = {
(source, flow) match {
case (tap: Tap[In], pipe: Pipe[In, Out])
addTapPipeEdge(tap, pipe, junctionIn)
case (spipe: SourcePipe[In], pipe: Pipe[In, Out])
addTapPipeEdge(spipe.input, Pipe(spipe.ops).appendPipe(pipe), junctionIn)
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
case (gsource: GraphSource[_, In], _)
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(gsource, tOut)
addEdge(tIn, junctionIn)
connect(tOut, flow, tIn)
case x throwUnsupportedValue(x)
}
this
}
def addEdge[In, Out](junctionOut: JunctionOutPort[In], sink: Sink[In]): this.type = {
sink match {
case drain: Drain[In] addPipeDrainEdge(junctionOut, Pipe.empty[In], drain)
case pipe: SinkPipe[In] addPipeDrainEdge(junctionOut, Pipe(pipe.ops), pipe.output)
case _ throw new IllegalArgumentException(OnlyPipesErrorMessage)
}
this
}
def addEdge[T](junctionOut: JunctionOutPort[T], sink: Sink[T]): this.type =
addEdge(junctionOut, Pipe.empty[T], sink)
def addEdge[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = {
(flow, sink) match {
case (pipe: Pipe[In, Out], drain: Drain[Out])
addPipeDrainEdge(junctionOut, pipe, drain)
case (pipe: Pipe[In, Out], spipe: SinkPipe[Out])
addEdge(junctionOut, pipe.connect(spipe))
case _ throw new IllegalArgumentException(OnlyPipesErrorMessage)
addPipeDrainEdge(junctionOut, pipe.appendPipe(Pipe(spipe.ops)), spipe.output)
case (_, gsink: GraphSink[Out, _])
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(tIn, gsink)
addEdge(junctionOut, tOut)
connect(tOut, flow, tIn)
case x throwUnsupportedValue(x)
}
this
}
def addEdge[T](source: Source[T], sink: Sink[T]): this.type = addEdge(source, Pipe.empty[T], sink)
def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = {
(source, flow, sink) match {
case (tap: Tap[In], pipe: Pipe[In, Out], drain: Drain[Out])
@ -581,54 +649,98 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
val tap = sourcePipe.input
val newPipe = Pipe(sourcePipe.ops).connect(pipe)
addEdge(tap, newPipe, drain) // recursive, but now it is a Tap-Pipe-Drain
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
case (_, gflow: GraphFlow[In, _, _, Out], _)
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(source, tOut)
addEdge(tIn, sink)
connect(tOut, gflow, tIn)
case x throwUnsupportedValue(x)
}
this
}
def addEdge[T](source: UndefinedSource[T], sink: UndefinedSink[T]): this.type = addEdge(source, Pipe.empty[T], sink)
def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = {
flow match {
case pipe: Pipe[In, Out]
addGraphEdge(source, sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
case gflow: GraphFlow[In, _, _, Out]
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(source, tOut)
addEdge(tIn, sink)
connect(tOut, gflow, tIn)
case x throwUnsupportedValue(x)
}
this
}
def addEdge[T](source: UndefinedSource[T], sink: Sink[T]): this.type = addEdge(source, Pipe.empty[T], sink)
def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = {
(flow, sink) match {
case (pipe: Pipe[In, Out], drain: Drain[Out])
addGraphEdge(source, DrainVertex(drain), pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case (pipe: Pipe[In, Out], spipe: SinkPipe[Out])
addGraphEdge(source, DrainVertex(spipe.output), pipe.appendPipe(Pipe(spipe.ops)), inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case _ throw new IllegalArgumentException(OnlyPipesErrorMessage)
case (gflow: GraphFlow[In, _, _, Out], _)
val tOut = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(source, tOut)
addEdge(tIn, sink)
connect(tOut, gflow, tIn)
case (_, gSink: GraphSink[Out, _])
val oOut = UndefinedSink[Out]
addEdge(source, flow, oOut)
gSink.importAndConnect(this, oOut)
case x throwUnsupportedValue(x)
}
this
}
def addEdge[T](source: Source[T], sink: UndefinedSink[T]): this.type = addEdge(source, Pipe.empty[T], sink)
def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = {
(flow, source) match {
case (pipe: Pipe[In, Out], tap: Tap[In])
addGraphEdge(TapVertex(tap), sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case (pipe: Pipe[In, Out], spipe: SourcePipe[Out])
addGraphEdge(TapVertex(spipe.input), sink, Pipe(spipe.ops).appendPipe(pipe), inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case _ throw new IllegalArgumentException(OnlyPipesErrorMessage)
case (_, gsource: GraphSource[_, In])
val tOut1 = UndefinedSource[In]
val tOut2 = UndefinedSink[In]
val tIn = UndefinedSource[Out]
addEdge(tOut1, tOut2)
gsource.importAndConnect(this, tOut1)
addEdge(tIn, sink)
connect(tOut2, flow, tIn)
case x throwUnsupportedValue(x)
}
this
}
private def addGraphEdge[In, Out](from: Vertex, to: Vertex, pipe: Pipe[In, Out], inputPort: Int, outputPort: Int): Unit = {
private def uncheckedAddGraphEdge[In, Out](from: Vertex, to: Vertex, pipe: Pipe[In, Out], inputPort: Int, outputPort: Int): Unit = {
if (edgeQualifier == Int.MaxValue) throw new IllegalArgumentException(s"Too many edges")
checkAddTapDrainPrecondition(from)
checkAddTapDrainPrecondition(to)
val label = EdgeLabel(edgeQualifier)(pipe.asInstanceOf[Pipe[Any, Nothing]], inputPort, outputPort)
graph.addLEdge(from, to)(label)
edgeQualifier += 1
}
private def addGraphEdge[In, Out](from: Vertex, to: Vertex, pipe: Pipe[In, Out], inputPort: Int, outputPort: Int): Unit = {
checkAddTapDrainPrecondition(from)
checkAddTapDrainPrecondition(to)
uncheckedAddGraphEdge(from, to, pipe, inputPort, outputPort)
}
private def addOrReplaceGraphEdge[In, Out](from: Vertex, to: Vertex, pipe: Pipe[In, Out], inputPort: Int, outputPort: Int): Unit = {
checkAddOrReplaceTapDrainPrecondition(from)
checkAddOrReplaceTapDrainPrecondition(to)
uncheckedAddGraphEdge(from, to, pipe, inputPort, outputPort)
}
def attachSink[Out](token: UndefinedSink[Out], sink: Sink[Out]): this.type = {
graph.find(token) match {
case Some(existing)
@ -640,7 +752,9 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
case spipe: SinkPipe[Out]
val pipe = edge.label.pipe.appendPipe(Pipe(spipe.ops))
addGraphEdge(edge.from.value, DrainVertex(spipe.output), pipe, edge.label.inputPort, edge.label.outputPort)
case _ throw new IllegalArgumentException(OnlyPipesErrorMessage)
case gsink: GraphSink[Out, _]
gsink.importAndConnect(this, token)
case x throwUnsupportedValue(x)
}
case None throw new IllegalArgumentException(s"No matching UndefinedSink [${token}]")
@ -659,8 +773,9 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
case spipe: SourcePipe[In]
val pipe = Pipe(spipe.ops).appendPipe(edge.label.pipe)
addGraphEdge(TapVertex(spipe.input), edge.to.value, pipe, edge.label.inputPort, edge.label.outputPort)
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
case gsource: GraphSource[_, In]
gsource.importAndConnect(this, token)
case x throwUnsupportedValue(x)
}
case None throw new IllegalArgumentException(s"No matching UndefinedSource [${token}]")
@ -685,9 +800,10 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
val newPipe = outEdge.label.pipe.appendPipe(pipe.asInstanceOf[Pipe[Any, Nothing]]).appendPipe(inEdge.label.pipe)
graph.remove(out)
graph.remove(in)
addGraphEdge(outEdge.from.value, inEdge.to.value, newPipe, inEdge.label.inputPort, outEdge.label.outputPort)
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
addOrReplaceGraphEdge(outEdge.from.value, inEdge.to.value, newPipe, inEdge.label.inputPort, outEdge.label.outputPort)
case gflow: GraphFlow[A, _, _, B]
gflow.importAndConnect(this, out, in)
case x throwUnsupportedValue(x)
}
this
@ -716,6 +832,17 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
addGraphEdge(edge.from.value, edge.to.value, edge.label.pipe, edge.label.inputPort, edge.label.outputPort)
}
private[scaladsl2] def remapPartialFlowGraph(partialFlowGraph: PartialFlowGraph, vertexMapping: Map[Vertex, Vertex]): this.type = {
val mapping = collection.mutable.Map.empty[Vertex, Vertex] ++ vertexMapping
def get(vertex: Vertex): Vertex = mapping.getOrElseUpdate(vertex, vertex.newInstance())
partialFlowGraph.graph.edges.foreach { edge
addGraphEdge(get(edge.from.value), get(edge.to.value), edge.label.pipe, edge.label.inputPort, edge.label.outputPort)
}
this
}
/**
* Flow graphs with cycles are in general dangerous as it can result in deadlocks.
* Therefore, cycles in the graph are by default disallowed. `IllegalArgumentException` will
@ -729,11 +856,20 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
private def checkAddTapDrainPrecondition(vertex: Vertex): Unit = {
vertex match {
case node @ (_: UndefinedSource[_] | _: UndefinedSink[_])
require(graph.find(node) == None, s"[$node] instance is already used in this flow graph")
require(!graph.contains(node), s"[$node] instance is already used in this flow graph")
case _ // ok
}
}
private def checkAddOrReplaceTapDrainPrecondition(vertex: Vertex): Unit = {
vertex match {
// it is ok to add or replace edges with new or existing undefined sources or sinks
case node @ (_: UndefinedSource[_] | _: UndefinedSink[_])
// all other nodes must already exist in the graph
case node require(graph.contains(node), s"[$node] instance is not in this flow graph")
}
}
private def checkJunctionInPortPrecondition(junction: JunctionInPort[_]): Unit = {
junction.vertex match {
case iv: InternalVertex
@ -741,7 +877,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
case Some(node)
require(
(node.inDegree + 1) <= iv.maximumInputCount,
s"${node.value} must have at most ${iv.maximumInputCount} incoming edges")
s"${node.value} must have at most ${iv.maximumInputCount} incoming edges, , has ${node.inDegree}\n${graph.edges}")
case _ // ok
}
case _ // ok, no checks here
@ -755,7 +891,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
case Some(node)
require(
(node.outDegree + 1) <= iv.maximumOutputCount,
s"${node.value} must have at most ${iv.maximumOutputCount} outgoing edges")
s"${node.value} must have at most ${iv.maximumOutputCount} outgoing edges, has ${node.outDegree}\n${graph.edges}")
case _ // ok
}
case _ // ok, no checks here
@ -884,13 +1020,13 @@ object FlowGraph {
* Build a `FlowGraph` by starting with one of the `apply` methods in
* in [[FlowGraph$ companion object]]. Syntactic sugar is provided by [[FlowGraphImplicits]].
*/
class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) {
class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) extends RunnableFlow {
import FlowGraphInternal._
/**
* Materialize the `FlowGraph` and attach all sinks and sources.
*/
def run()(implicit materializer: FlowMaterializer): MaterializedMap = {
override def run()(implicit materializer: FlowMaterializer): MaterializedMap = {
val edges = graph.edges
if (edges.size == 1) {
val edge = edges.head
@ -1046,7 +1182,6 @@ object PartialFlowGraph {
block(builder)
builder.partialBuild()
}
}
/**
@ -1069,6 +1204,48 @@ class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[Fl
case n: UndefinedSink[_] n
}.toSet
/**
* Creates a [[Source]] from this `PartialFlowGraph`. There needs to be only one [[UndefinedSink]] and
* no [[UndefinedSource]] in the graph, and you need to provide it as a parameter.
*/
def toSource[O](out: UndefinedSink[O]): Source[O] = {
require(graph.contains(out), s"Couldn't create Source with [$out], no matching UndefinedSink")
checkUndefinedSinksAndSources(sources = Nil, sinks = List(out), description = "Source")
GraphSource(this, out, Pipe.empty[O])
}
/**
* Creates a [[Flow]] from this `PartialFlowGraph`. There needs to be only one [[UndefinedSource]] and
* one [[UndefinedSink]] in the graph, and you need to provide them as parameters.
*/
def toFlow[I, O](in: UndefinedSource[I], out: UndefinedSink[O]): Flow[I, O] = {
checkUndefinedSinksAndSources(sources = List(in), sinks = List(out), description = "Flow")
GraphFlow(Pipe.empty[I], in, this, out, Pipe.empty[O])
}
/**
* Creates a [[Sink]] from this `PartialFlowGraph`. There needs to be only one [[UndefinedSource]] and
* no [[UndefinedSink]] in the graph, and you need to provide it as a parameter.
*/
def toSink[I](in: UndefinedSource[I]): Sink[I] = {
checkUndefinedSinksAndSources(sources = List(in), sinks = Nil, description = "Sink")
GraphSink(Pipe.empty[I], in, this)
}
private def checkUndefinedSinksAndSources(sources: List[UndefinedSource[_]], sinks: List[UndefinedSink[_]], description: String): Unit = {
def expected(name: String, num: Int): String = s"Couldn't create $description, expected $num undefined $name${if (num == 1) "" else "s"}, but found"
def checkNodes(nodes: List[Vertex], nodeDescription: String): Int = (0 /: nodes) {
case (size, node)
require(graph.contains(node), s"Couldn't create $description with [$node], no matching $nodeDescription")
size + 1
}
val numSources = checkNodes(sources, "UndefinedSource")
val numSinks = checkNodes(sinks, "UndefinedSink")
val uSources = undefinedSources
require(uSources.size == numSources, s"${expected("source", numSources)} ${uSources}")
val uSinks = undefinedSinks
require(uSinks.size == numSinks, s"${expected("sink", numSinks)} ${uSinks}")
}
}
/**
@ -1104,20 +1281,26 @@ object FlowGraphImplicits {
new SourceNextStep(source, flow, builder)
def ~>(junctionIn: JunctionInPort[Out])(implicit builder: FlowGraphBuilder): JunctionOutPort[junctionIn.NextT] = {
builder.addEdge(source, Pipe.empty[Out], junctionIn)
builder.addEdge(source, junctionIn)
junctionIn.next
}
def ~>(sink: Sink[Out])(implicit builder: FlowGraphBuilder): Unit =
builder.addEdge(source, Pipe.empty[Out], sink)
builder.addEdge(source, sink)
}
class SourceNextStep[In, Out](source: Source[In], flow: Flow[In, Out], builder: FlowGraphBuilder) {
def ~>[O](otherflow: Flow[Out, O])(implicit builder: FlowGraphBuilder): SourceNextStep[In, O] =
new SourceNextStep(source, flow.connect(otherflow), builder)
def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = {
builder.addEdge(source, flow, junctionIn)
junctionIn.next
}
def ~>(sink: UndefinedSink[Out])(implicit builder: FlowGraphBuilder): Unit =
builder.addEdge(source, flow, sink)
def ~>(sink: Sink[Out]): Unit =
builder.addEdge(source, flow, sink)
}
@ -1130,7 +1313,7 @@ object FlowGraphImplicits {
builder.addEdge(junction, Pipe.empty[In], sink)
def ~>(junctionIn: JunctionInPort[In])(implicit builder: FlowGraphBuilder): JunctionOutPort[junctionIn.NextT] = {
builder.addEdge(junction, Pipe.empty[In], junctionIn)
builder.addEdge(junction, junctionIn)
junctionIn.next
}
@ -1157,7 +1340,7 @@ object FlowGraphImplicits {
new UndefinedSourceNextStep(source, flow, builder)
def ~>(junctionIn: JunctionInPort[In])(implicit builder: FlowGraphBuilder): JunctionOutPort[junctionIn.NextT] = {
builder.addEdge(source, Pipe.empty[In], junctionIn)
builder.addEdge(source, junctionIn)
junctionIn.next
}
@ -1168,6 +1351,16 @@ object FlowGraphImplicits {
builder.addEdge(source, flow, junctionIn)
junctionIn.next
}
def ~>[T](otherFlow: Flow[Out, T])(implicit builder: FlowGraphBuilder): UndefinedSourceNextStep[In, T] =
new UndefinedSourceNextStep(source, flow.connect(otherFlow), builder)
def ~>(sink: Sink[Out]): Unit = {
builder.addEdge(source, flow, sink)
}
def ~>(sink: UndefinedSink[Out]): Unit = {
builder.addEdge(source, flow, sink)
}
}
}

View file

@ -0,0 +1,128 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import akka.stream.impl2.Ast.AstNode
import scala.annotation.unchecked.uncheckedVariance
private[scaladsl2] case class GraphFlow[-In, CIn, COut, +Out](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Flow[In, Out] {
override type Repr[+O] = GraphFlow[In @uncheckedVariance, CIn, COut, O]
private[scaladsl2] def prepend[T](pipe: Pipe[T, In]): GraphFlow[T, CIn, COut, Out] = copy(inPipe = pipe.appendPipe(inPipe))
private[scaladsl2] def prepend(pipe: SourcePipe[In]): GraphSource[COut, Out] = {
val newGraph = PartialFlowGraph(graph) { b
b.attachSource(in, pipe.appendPipe(inPipe))
}
GraphSource(newGraph, out, outPipe)
}
private[scaladsl2] def remap(builder: FlowGraphBuilder): (UndefinedSource[CIn], UndefinedSink[COut]) = {
val nIn = UndefinedSource[CIn]
val nOut = UndefinedSink[COut]
builder.remapPartialFlowGraph(graph, Map(in -> nIn, out -> nOut))
(nIn, nOut)
}
private[scaladsl2] def importAndConnect(builder: FlowGraphBuilder, oOut: UndefinedSink[In @uncheckedVariance], oIn: UndefinedSource[Out @uncheckedVariance]): Unit = {
val (nIn, nOut) = remap(builder)
builder.connect(oOut, inPipe, nIn)
builder.connect(nOut, outPipe, oIn)
}
def connect[T](flow: Flow[Out, T]): Flow[In, T] = flow match {
case pipe: Pipe[Out, T] copy(outPipe = outPipe.appendPipe(pipe))
case gFlow: GraphFlow[Out, _, _, T]
val (newGraph, nOut) = FlowGraphBuilder(graph) { b
val (oIn, oOut) = gFlow.remap(b)
b.connect(out, outPipe.connect(gFlow.inPipe), oIn)
(b.partialBuild(), oOut)
}
GraphFlow(inPipe, in, newGraph, nOut, gFlow.outPipe)
}
override def connect(sink: Sink[Out]) = sink match {
case drain: Drain[Out] connect(Pipe.empty.withDrain(drain)) // recursive, but now it is a SinkPipe
case sinkPipe: SinkPipe[Out]
val newGraph = PartialFlowGraph(this.graph) { builder
builder.attachSink(out, outPipe.connect(sinkPipe))
}
GraphSink(inPipe, in, newGraph)
case gSink: GraphSink[Out, Out]
val newGraph = PartialFlowGraph(graph) { b
val oIn = gSink.remap(b)
b.connect(out, outPipe.connect(gSink.inPipe), oIn)
}
GraphSink(inPipe, in, newGraph)
}
override private[scaladsl2] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op))
}
private[scaladsl2] case class GraphSource[COut, +Out](graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Source[Out] {
override type Repr[+O] = GraphSource[COut, O]
private[scaladsl2] def remap(builder: FlowGraphBuilder): UndefinedSink[COut] = {
val nOut = UndefinedSink[COut]
builder.remapPartialFlowGraph(graph, Map(out -> nOut))
nOut
}
private[scaladsl2] def importAndConnect(builder: FlowGraphBuilder, oIn: UndefinedSource[Out @uncheckedVariance]): Unit = {
val nOut = remap(builder)
builder.connect(nOut, outPipe, oIn)
}
override def connect[T](flow: Flow[Out, T]): Source[T] = flow match {
case pipe: Pipe[Out, T] copy(outPipe = outPipe.appendPipe(pipe))
case gFlow: GraphFlow[Out, _, _, T]
val (newGraph, nOut) = FlowGraphBuilder(graph) { b
val (oIn, oOut) = gFlow.remap(b)
b.connect(out, outPipe.connect(gFlow.inPipe), oIn)
(b.partialBuild(), oOut)
}
GraphSource(newGraph, nOut, gFlow.outPipe)
}
override def connect(sink: Sink[Out]): RunnableFlow = sink match {
case drain: Drain[Out]
connect(Pipe.empty.withDrain(drain)) // recursive, but now it is a SinkPipe
case sinkPipe: SinkPipe[Out]
FlowGraph(this.graph) { implicit builder
builder.attachSink(out, outPipe.connect(sinkPipe))
}
case gSink: GraphSink[Out, _]
FlowGraph(graph) { b
val oIn = gSink.remap(b)
b.connect(out, outPipe.connect(gSink.inPipe), oIn)
}
}
override private[scaladsl2] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op))
}
private[scaladsl2] case class GraphSink[-In, CIn](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph) extends Sink[In] {
private[scaladsl2] def remap(builder: FlowGraphBuilder): UndefinedSource[CIn] = {
val nIn = UndefinedSource[CIn]
builder.remapPartialFlowGraph(graph, Map(in -> nIn))
nIn
}
private[scaladsl2] def prepend(pipe: SourcePipe[In]): FlowGraph = {
FlowGraph(this.graph) { b
b.attachSource(in, pipe.connect(inPipe))
}
}
private[scaladsl2] def prepend[T](pipe: Pipe[T, In]): GraphSink[T, CIn] = {
GraphSink(pipe.appendPipe(inPipe), in, graph)
}
private[scaladsl2] def importAndConnect(builder: FlowGraphBuilder, oOut: UndefinedSink[In @uncheckedVariance]): Unit = {
val nIn = remap(builder)
builder.connect(oOut, inPipe, nIn)
}
}

View file

@ -13,8 +13,6 @@ import scala.language.existentials
private[scaladsl2] object Pipe {
private val emptyInstance = Pipe[Any, Any](ops = Nil)
def empty[T]: Pipe[T, T] = emptyInstance.asInstanceOf[Pipe[T, T]]
val OnlyPipesErrorMessage = "Only pipes are supported currently!"
}
/**
@ -23,7 +21,7 @@ private[scaladsl2] object Pipe {
private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends Flow[In, Out] {
override type Repr[+O] = Pipe[In @uncheckedVariance, O]
override protected def andThen[U](op: AstNode): Repr[U] = this.copy(ops = op :: ops)
override private[scaladsl2] def andThen[U](op: AstNode): Repr[U] = this.copy(ops = op :: ops)
private[scaladsl2] def withDrain(out: Drain[Out]): SinkPipe[In] = SinkPipe(out, ops)
@ -31,18 +29,15 @@ private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends
override def connect[T](flow: Flow[Out, T]): Flow[In, T] = flow match {
case p: Pipe[T, In] Pipe(p.ops ++: ops)
case _ throw new IllegalArgumentException(Pipe.OnlyPipesErrorMessage)
case gf: GraphFlow[Out, _, _, T] gf.prepend(this)
case x FlowGraphInternal.throwUnsupportedValue(x)
}
override def connect(sink: Sink[Out]): Sink[In] = sink match {
case sp: SinkPipe[Out] sp.prependPipe(this)
case d: Drain[Out] this.withDrain(d)
case _ throw new IllegalArgumentException(Pipe.OnlyPipesErrorMessage)
}
override def runWith(tap: TapWithKey[In], drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): (tap.MaterializedType, drain.MaterializedType) = {
val m = tap.connect(this).connect(drain).run()
(m.materializedTap(tap), m.materializedDrain(drain))
case sp: SinkPipe[Out] sp.prependPipe(this)
case gs: GraphSink[Out, _] gs.prepend(this)
case x FlowGraphInternal.throwUnsupportedValue(x)
}
private[scaladsl2] def appendPipe[T](pipe: Pipe[Out, T]): Pipe[In, T] = Pipe(pipe.ops ++: ops)
@ -56,10 +51,6 @@ private[scaladsl2] final case class SinkPipe[-In](output: Drain[_], ops: List[As
private[scaladsl2] def withTap(in: Tap[In]): RunnablePipe = RunnablePipe(in, output, ops)
private[scaladsl2] def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops)
override def runWith(tap: TapWithKey[In])(implicit materializer: FlowMaterializer): tap.MaterializedType =
tap.connect(this).run().materializedTap(tap)
}
/**
@ -68,7 +59,7 @@ private[scaladsl2] final case class SinkPipe[-In](output: Drain[_], ops: List[As
private[scaladsl2] final case class SourcePipe[+Out](input: Tap[_], ops: List[AstNode]) extends Source[Out] {
override type Repr[+O] = SourcePipe[O]
override protected def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, op :: ops)
override private[scaladsl2] def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, op :: ops)
private[scaladsl2] def withDrain(out: Drain[Out]): RunnablePipe = RunnablePipe(input, out, ops)
@ -76,18 +67,16 @@ private[scaladsl2] final case class SourcePipe[+Out](input: Tap[_], ops: List[As
override def connect[T](flow: Flow[Out, T]): Source[T] = flow match {
case p: Pipe[Out, T] appendPipe(p)
case _ throw new IllegalArgumentException(Pipe.OnlyPipesErrorMessage)
case g: GraphFlow[Out, _, _, T] g.prepend(this)
case x FlowGraphInternal.throwUnsupportedValue(x)
}
override def connect(sink: Sink[Out]): RunnableFlow = sink match {
case sp: SinkPipe[Out] RunnablePipe(input, sp.output, sp.ops ++: ops)
case d: Drain[Out] this.withDrain(d)
case _ throw new IllegalArgumentException(Pipe.OnlyPipesErrorMessage)
case g: GraphSink[Out, _] g.prepend(this)
case x FlowGraphInternal.throwUnsupportedValue(x)
}
override def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType =
withDrain(drain).run().materializedDrain(drain)
}
/**

View file

@ -18,7 +18,9 @@ trait Sink[-In] {
* Connect this `Sink` to a `Tap` and run it. The returned value is the materialized value
* of the `Tap`, e.g. the `Subscriber` of a [[SubscriberTap]].
*/
def runWith(tap: TapWithKey[In])(implicit materializer: FlowMaterializer): tap.MaterializedType
def runWith(tap: TapWithKey[In])(implicit materializer: FlowMaterializer): tap.MaterializedType =
tap.connect(this).run().materializedTap(tap)
}
object Sink {
@ -26,4 +28,23 @@ object Sink {
* Helper to create [[Sink]] from `Subscriber`.
*/
def apply[T](subscriber: Subscriber[T]): Drain[T] = SubscriberDrain(subscriber)
/**
* Creates a `Sink` by using an empty [[FlowGraphBuilder]] on a block that expects a [[FlowGraphBuilder]] and
* returns the `UndefinedSource`.
*/
def apply[T]()(block: FlowGraphBuilder UndefinedSource[T]): Sink[T] =
createSinkFromBuilder(new FlowGraphBuilder(), block)
/**
* Creates a `Sink` by using a FlowGraphBuilder from this [[PartialFlowGraph]] on a block that expects
* a [[FlowGraphBuilder]] and returns the `UndefinedSource`.
*/
def apply[T](graph: PartialFlowGraph)(block: FlowGraphBuilder UndefinedSource[T]): Sink[T] =
createSinkFromBuilder(new FlowGraphBuilder(graph.graph), block)
private def createSinkFromBuilder[T](builder: FlowGraphBuilder, block: FlowGraphBuilder UndefinedSource[T]): Sink[T] = {
val in = block(builder)
builder.partialBuild().toSink(in)
}
}

View file

@ -35,7 +35,8 @@ trait Source[+Out] extends FlowOps[Out] {
* Connect this `Source` to a `Drain` and run it. The returned value is the materialized value
* of the `Drain`, e.g. the `Publisher` of a [[PublisherDrain]].
*/
def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType
def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType =
connect(drain).run().materializedDrain(drain)
/**
* Shortcut for running this `Source` with a fold function.
@ -136,4 +137,22 @@ object Source {
*/
def failed[T](cause: Throwable): Source[T] = apply(ErrorPublisher(cause))
/**
* Creates a `Source` by using an empty [[FlowGraphBuilder]] on a block that expects a [[FlowGraphBuilder]] and
* returns the `UndefinedSink`.
*/
def apply[T]()(block: FlowGraphBuilder UndefinedSink[T]): Source[T] =
createSourceFromBuilder(new FlowGraphBuilder(), block)
/**
* Creates a `Source` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects
* a [[FlowGraphBuilder]] and returns the `UndefinedSink`.
*/
def apply[T](graph: PartialFlowGraph)(block: FlowGraphBuilder UndefinedSink[T]): Source[T] =
createSourceFromBuilder(new FlowGraphBuilder(graph.graph), block)
private def createSourceFromBuilder[T](builder: FlowGraphBuilder, block: FlowGraphBuilder UndefinedSink[T]): Source[T] = {
val out = block(builder)
builder.partialBuild().toSource(out)
}
}

View file

@ -34,11 +34,8 @@ trait Tap[+Out] extends Source[Out] {
override def connect(sink: Sink[Out]): RunnableFlow = sourcePipe.connect(sink)
override def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType =
connect(drain).run().materializedDrain(drain)
/** INTERNAL API */
override protected def andThen[U](op: AstNode) = SourcePipe(this, List(op))
override private[scaladsl2] def andThen[U](op: AstNode) = SourcePipe(this, List(op))
}
/**
@ -57,12 +54,14 @@ trait SimpleTap[+Out] extends Tap[Out] {
* @param flowName the name of the current flow, which should be used in log statements or error messages
*/
def attach(flowSubscriber: Subscriber[Out] @uncheckedVariance, materializer: ActorBasedFlowMaterializer, flowName: String): Unit
/**
* This method is only used for Taps that return true from [[#isActive]], which then must
* implement it.
*/
def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[Out] @uncheckedVariance =
throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true")
/**
* This method indicates whether this Tap can create a Publisher instead of being
* attached to a Subscriber. This is only used if the Flow does not contain any
@ -96,12 +95,14 @@ trait TapWithKey[+Out] extends Tap[Out] {
* @param flowName the name of the current flow, which should be used in log statements or error messages
*/
def attach(flowSubscriber: Subscriber[Out] @uncheckedVariance, materializer: ActorBasedFlowMaterializer, flowName: String): MaterializedType
/**
* This method is only used for Taps that return true from [[#isActive]], which then must
* implement it.
*/
def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Publisher[Out] @uncheckedVariance, MaterializedType) =
throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true")
/**
* This method indicates whether this Tap can create a Publisher instead of being
* attached to a Subscriber. This is only used if the Flow does not contain any