Merge pull request #16010 from akka/wip-15946-simple-flow-graphs-patriknw

+str #15946 Support flows without junctions in FlowGraph
This commit is contained in:
Patrik Nordwall 2014-10-03 14:09:33 +02:00
commit b2e7804512
4 changed files with 402 additions and 169 deletions

View file

@ -164,53 +164,65 @@ class FlowGraphCompileSpec extends AkkaSpec {
}.run()
}
"attachTap and attachDrain" in {
"attachSource and attachSink" in {
val mg = FlowGraph { b
val merge = Merge[String]
val undefinedSrc1 = UndefinedTap[String]
val undefinedSrc2 = UndefinedTap[String]
val undefinedDrain1 = UndefinedDrain[String]
val undefinedSource1 = UndefinedSource[String]
val undefinedSource2 = UndefinedSource[String]
val undefinedSink1 = UndefinedSink[String]
b.
addEdge(undefinedSrc1, f1, merge).
addEdge(UndefinedTap[String]("src2"), f2, merge).
addEdge(merge, f3, undefinedDrain1)
addEdge(undefinedSource1, f1, merge).
addEdge(UndefinedSource[String]("src2"), f2, merge).
addEdge(merge, f3, undefinedSink1)
b.attachTap(undefinedSrc1, in1)
b.attachTap(UndefinedTap[String]("src2"), in2)
b.attachDrain(undefinedDrain1, out1)
b.attachSource(undefinedSource1, in1)
b.attachSource(UndefinedSource[String]("src2"), in2)
b.attachSink(undefinedSink1, out1)
}.run()
out1.publisher(mg) should not be (null)
}
"build partial flow graphs" in {
val undefinedSrc1 = UndefinedTap[String]
val undefinedSrc2 = UndefinedTap[String]
val undefinedDrain1 = UndefinedDrain[String]
val undefinedSource1 = UndefinedSource[String]
val undefinedSource2 = UndefinedSource[String]
val undefinedSink1 = UndefinedSink[String]
val bcast = Broadcast[String]
val partial1 = PartialFlowGraph { implicit b
import FlowGraphImplicits._
val merge = Merge[String]
undefinedSrc1 ~> f1 ~> merge ~> f2 ~> bcast ~> f3 ~> undefinedDrain1
undefinedSrc2 ~> f4 ~> merge
undefinedSource1 ~> f1 ~> merge ~> f2 ~> bcast ~> f3 ~> undefinedSink1
undefinedSource2 ~> f4 ~> merge
}
partial1.undefinedTaps should be(Set(undefinedSrc1, undefinedSrc2))
partial1.undefinedDrains should be(Set(undefinedDrain1))
partial1.undefinedSources should be(Set(undefinedSource1, undefinedSource2))
partial1.undefinedSinks should be(Set(undefinedSink1))
val partial2 = PartialFlowGraph(partial1) { implicit b
import FlowGraphImplicits._
b.attachTap(undefinedSrc1, in1)
b.attachTap(undefinedSrc2, in2)
bcast ~> f5 ~> UndefinedDrain[String]("drain2")
b.attachSource(undefinedSource1, in1)
b.attachSource(undefinedSource2, in2)
bcast ~> f5 ~> UndefinedSink[String]("drain2")
}
partial2.undefinedTaps should be(Set.empty)
partial2.undefinedDrains should be(Set(undefinedDrain1, UndefinedDrain[String]("drain2")))
partial2.undefinedSources should be(Set.empty)
partial2.undefinedSinks should be(Set(undefinedSink1, UndefinedSink[String]("drain2")))
FlowGraph(partial2) { implicit b
b.attachDrain(undefinedDrain1, out1)
b.attachDrain(UndefinedDrain[String]("drain2"), out2)
FlowGraph(partial2) { b
b.attachSink(undefinedSink1, out1)
b.attachSink(UndefinedSink[String]("drain2"), out2)
}.run()
FlowGraph(partial2) { b
b.attachSink(undefinedSink1, f1.connect(out1))
b.attachSink(UndefinedSink[String]("drain2"), f2.connect(out2))
}.run()
FlowGraph(partial1) { implicit b
import FlowGraphImplicits._
b.attachSink(undefinedSink1, f1.connect(out1))
b.attachSource(undefinedSource1, Source(List("a", "b", "c")).connect(f1))
b.attachSource(undefinedSource2, Source(List("d", "e", "f")).connect(f2))
bcast ~> f5 ~> out2
}.run()
}
@ -293,8 +305,8 @@ class FlowGraphCompileSpec extends AkkaSpec {
FlowGraph { b
val merge = Merge[Fruit]
b.
addEdge(Source[Fruit](() Some(new Apple)), merge).
addEdge(Source[Apple](() Some(new Apple)), merge).
addEdge(Source[Fruit](() Some(new Apple)), Flow[Fruit], merge).
addEdge(Source[Apple](() Some(new Apple)), Flow[Apple], merge).
addEdge(merge, Flow[Fruit].map(identity), out)
}
}
@ -315,22 +327,22 @@ class FlowGraphCompileSpec extends AkkaSpec {
inB ~> merge
inA ~> Flow[Fruit].map(identity) ~> merge
inB ~> Flow[Apple].map(identity) ~> merge
UndefinedTap[Apple] ~> merge
UndefinedTap[Apple] ~> Flow[Fruit].map(identity) ~> merge
UndefinedTap[Apple] ~> Flow[Apple].map(identity) ~> merge
UndefinedSource[Apple] ~> merge
UndefinedSource[Apple] ~> Flow[Fruit].map(identity) ~> merge
UndefinedSource[Apple] ~> Flow[Apple].map(identity) ~> merge
merge ~> Flow[Fruit].map(identity) ~> outA
Source[Apple](() Some(new Apple)) ~> Broadcast[Apple] ~> merge
Source[Apple](() Some(new Apple)) ~> Broadcast[Apple] ~> outB
Source[Apple](() Some(new Apple)) ~> Broadcast[Apple] ~> UndefinedDrain[Fruit]
Source[Apple](() Some(new Apple)) ~> Broadcast[Apple] ~> UndefinedSink[Fruit]
inB ~> Broadcast[Apple] ~> merge
Source(List(1 -> "a", 2 -> "b", 3 -> "c")) ~> unzip.in
unzip.right ~> whatever
unzip.left ~> UndefinedDrain[Any]
unzip.left ~> UndefinedSink[Any]
"UndefinedTap[Fruit] ~> Flow[Apple].map(identity) ~> merge" shouldNot compile
"UndefinedTap[Fruit] ~> Broadcast[Apple]" shouldNot compile
"UndefinedSource[Fruit] ~> Flow[Apple].map(identity) ~> merge" shouldNot compile
"UndefinedSource[Fruit] ~> Broadcast[Apple]" shouldNot compile
"merge ~> Broadcast[Apple]" shouldNot compile
"merge ~> Flow[Fruit].map(identity) ~> Broadcast[Apple]" shouldNot compile
"inB ~> merge ~> Broadcast[Apple]" shouldNot compile
@ -338,5 +350,49 @@ class FlowGraphCompileSpec extends AkkaSpec {
}
}
"build with plain flow without junctions" in {
FlowGraph { b
b.addEdge(in1, f1, out1)
}.run()
FlowGraph { b
b.addEdge(in1, f1, f2.connect(out1))
}.run()
FlowGraph { b
b.addEdge(in1.connect(f1), f2, out1)
}.run()
FlowGraph { implicit b
import FlowGraphImplicits._
in1 ~> f1 ~> out1
}.run()
FlowGraph { implicit b
import FlowGraphImplicits._
in1 ~> out1
}.run()
FlowGraph { implicit b
import FlowGraphImplicits._
in1 ~> f1.connect(out1)
}.run()
FlowGraph { implicit b
import FlowGraphImplicits._
in1.connect(f1) ~> out1
}.run()
FlowGraph { implicit b
import FlowGraphImplicits._
in1.connect(f1) ~> f2.connect(out1)
}.run()
}
"build partial with only undefined sources and sinks" in {
PartialFlowGraph { b
b.addEdge(UndefinedSource[String], f1, UndefinedSink[String])
}
PartialFlowGraph { b
b.addEdge(UndefinedSource[String], f1, out1)
}
PartialFlowGraph { b
b.addEdge(in1, f1, UndefinedSink[String])
}
}
}
}

View file

@ -1,12 +1,12 @@
package akka.stream.scaladsl2
import akka.stream.testkit.AkkaSpec
import akka.stream.{ OverflowStrategy, MaterializerSettings }
import akka.stream.testkit.{ StreamTestKit, AkkaSpec }
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.scaladsl2.FlowGraphImplicits._
import akka.stream.testkit.StreamTestKit.SubscriberProbe
class GraphOpsIntegrationSpec extends AkkaSpec {
@ -98,6 +98,54 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
Await.result(g.getDrainFor(resultFuture), 3.seconds) should contain theSameElementsAs (Seq(2, 4, 6, 5, 7, 9))
}
"be able to run plain flow" in {
val p = Source(List(1, 2, 3)).toPublisher()
val s = SubscriberProbe[Int]
val flow = Flow[Int].map(_ * 2)
FlowGraph { implicit builder
import FlowGraphImplicits._
PublisherTap(p) ~> flow ~> SubscriberDrain(s)
}.run()
val sub = s.expectSubscription()
sub.request(10)
s.expectNext(1 * 2)
s.expectNext(2 * 2)
s.expectNext(3 * 2)
s.expectComplete()
}
"support continued transformation from undefined source/sink" in {
val input1 = UndefinedSource[Int]
val output1 = UndefinedSink[Int]
val output2 = UndefinedSink[String]
val partial = PartialFlowGraph { implicit builder
val bcast = Broadcast[String]("bcast")
input1 ~> Flow[Int].map(_.toString) ~> bcast ~> Flow[String].map(_.toInt) ~> output1
bcast ~> Flow[String].map("elem-" + _) ~> output2
}
val s1 = SubscriberProbe[Int]
val s2 = SubscriberProbe[String]
FlowGraph(partial) { builder
builder.attachSource(input1, Source(List(0, 1, 2).map(_ + 1)))
builder.attachSink(output1, Flow[Int].filter(n (n % 2) != 0).connect(SubscriberDrain(s1)))
builder.attachSink(output2, Flow[String].map(_.toUpperCase).connect(SubscriberDrain(s2)))
}.run()
val sub1 = s1.expectSubscription()
val sub2 = s2.expectSubscription()
sub1.request(10)
sub2.request(10)
s1.expectNext(1)
s1.expectNext(3)
s1.expectComplete()
s2.expectNext("ELEM-1")
s2.expectNext("ELEM-2")
s2.expectNext("ELEM-3")
s2.expectComplete()
}
}
}

View file

@ -73,7 +73,7 @@ object Merge {
* (picking randomly when several have elements ready).
*
* When building the [[FlowGraph]] you must connect one or more input pipes/taps
* and one output pipe/drain to the `Merge` vertex.
* and one output pipe/sink to the `Merge` vertex.
*/
final class Merge[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] {
override private[akka] val vertex = this
@ -258,64 +258,64 @@ final class Concat[T](override val name: Option[String]) extends FlowGraphIntern
override private[akka] def astNode = Ast.Concat
}
object UndefinedDrain {
object UndefinedSink {
/**
* Create a new anonymous `UndefinedDrain` vertex with the specified input type.
* Note that a `UndefinedDrain` instance can only be used at one place (one vertex)
* Create a new anonymous `UndefinedSink` vertex with the specified input type.
* Note that a `UndefinedSink` instance can only be used at one place (one vertex)
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
*/
def apply[T]: UndefinedDrain[T] = new UndefinedDrain[T](None)
def apply[T]: UndefinedSink[T] = new UndefinedSink[T](None)
/**
* Create a named `UndefinedDrain` vertex with the specified input type.
* Note that a `UndefinedDrain` with a specific name can only be used at one place (one vertex)
* Create a named `UndefinedSink` vertex with the specified input type.
* Note that a `UndefinedSink` with a specific name can only be used at one place (one vertex)
* in the `FlowGraph`. Calling this method several times with the same name
* returns instances that are `equal`.
*/
def apply[T](name: String): UndefinedDrain[T] = new UndefinedDrain[T](Some(name))
def apply[T](name: String): UndefinedSink[T] = new UndefinedSink[T](Some(name))
}
/**
* It is possible to define a [[PartialFlowGraph]] with output pipes that are not connected
* yet by using this placeholder instead of the real [[Drain]]. Later the placeholder can
* be replaced with [[FlowGraphBuilder#attachDrain]].
*/
final class UndefinedDrain[-T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
final class UndefinedSink[-T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
override def minimumInputCount: Int = 1
override def maximumInputCount: Int = 1
override def minimumOutputCount: Int = 0
override def maximumOutputCount: Int = 0
override private[akka] def astNode = throw new UnsupportedOperationException("Undefined drains cannot be materialized")
override private[akka] def astNode = throw new UnsupportedOperationException("Undefined sinks cannot be materialized")
}
object UndefinedTap {
object UndefinedSource {
/**
* Create a new anonymous `UndefinedTap` vertex with the specified input type.
* Note that a `UndefinedTap` instance can only be used at one place (one vertex)
* Create a new anonymous `UndefinedSource` vertex with the specified input type.
* Note that a `UndefinedSource` instance can only be used at one place (one vertex)
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
*/
def apply[T]: UndefinedTap[T] = new UndefinedTap[T](None)
def apply[T]: UndefinedSource[T] = new UndefinedSource[T](None)
/**
* Create a named `UndefinedTap` vertex with the specified output type.
* Note that a `UndefinedTap` with a specific name can only be used at one place (one vertex)
* Create a named `UndefinedSource` vertex with the specified output type.
* Note that a `UndefinedSource` with a specific name can only be used at one place (one vertex)
* in the `FlowGraph`. Calling this method several times with the same name
* returns instances that are `equal`.
*/
def apply[T](name: String): UndefinedTap[T] = new UndefinedTap[T](Some(name))
def apply[T](name: String): UndefinedSource[T] = new UndefinedSource[T](Some(name))
}
/**
* It is possible to define a [[PartialFlowGraph]] with input pipes that are not connected
* yet by using this placeholder instead of the real [[Tap]]. Later the placeholder can
* be replaced with [[FlowGraphBuilder#attachTap]].
*/
final class UndefinedTap[+T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
final class UndefinedSource[+T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
override def minimumInputCount: Int = 0
override def maximumInputCount: Int = 0
override def minimumOutputCount: Int = 1
override def maximumOutputCount: Int = 1
override private[akka] def astNode = throw new UnsupportedOperationException("Undefined taps cannot be materialized")
override private[akka] def astNode = throw new UnsupportedOperationException("Undefined sources cannot be materialized")
}
/**
@ -375,6 +375,9 @@ private[akka] object FlowGraphInternal {
val outputPort: Int) {
override def toString: String = pipe.toString
def withPipe(newFlow: Pipe[Any, Nothing]): EdgeLabel =
EdgeLabel(qualifier)(newFlow, inputPort, outputPort)
}
type EdgeType[T] = LkDiEdge[T] { type L1 = EdgeLabel }
@ -402,97 +405,157 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
private var cyclesAllowed = false
private def addTapPipeEdge[In, Out](tap: Tap[In], pipe: Pipe[In, Out], drain: JunctionInPort[Out]): this.type = {
private def addTapPipeEdge[In, Out](tap: Tap[In], pipe: Pipe[In, Out], junctionIn: JunctionInPort[Out]): this.type = {
val tapVertex = TapVertex(tap)
checkAddTapDrainPrecondition(tapVertex)
checkJunctionInPortPrecondition(drain)
addGraphEdge(tapVertex, drain.vertex, pipe, inputPort = drain.port, outputPort = UnlabeledPort)
checkJunctionInPortPrecondition(junctionIn)
addGraphEdge(tapVertex, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = UnlabeledPort)
this
}
private def addPipeDrainEdge[In, Out](tap: JunctionOutPort[In], pipe: Pipe[In, Out], drain: Drain[Out]): this.type = {
private def addPipeDrainEdge[In, Out](junctionOut: JunctionOutPort[In], pipe: Pipe[In, Out], drain: Drain[Out]): this.type = {
val drainVertex = DrainVertex(drain)
checkAddTapDrainPrecondition(drainVertex)
checkJunctionOutPortPrecondition(tap)
addGraphEdge(tap.vertex, drainVertex, pipe, inputPort = UnlabeledPort, outputPort = tap.port)
checkJunctionOutPortPrecondition(junctionOut)
addGraphEdge(junctionOut.vertex, drainVertex, pipe, inputPort = UnlabeledPort, outputPort = junctionOut.port)
this
}
def addEdge[In, Out](tap: UndefinedTap[In], flow: Flow[In, Out], drain: JunctionInPort[Out]): this.type = {
checkAddTapDrainPrecondition(tap)
checkJunctionInPortPrecondition(drain)
def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], junctionIn: JunctionInPort[Out]): this.type = {
checkAddTapDrainPrecondition(source)
checkJunctionInPortPrecondition(junctionIn)
flow match {
case pipe: Pipe[In, Out]
addGraphEdge(tap, drain.vertex, pipe, inputPort = drain.port, outputPort = UnlabeledPort)
addGraphEdge(source, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = UnlabeledPort)
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
}
this
}
def addEdge[In, Out](tap: JunctionOutPort[In], flow: Flow[In, Out], drain: UndefinedDrain[Out]): this.type = {
checkAddTapDrainPrecondition(drain)
checkJunctionOutPortPrecondition(tap)
def addEdge[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = {
checkAddTapDrainPrecondition(sink)
checkJunctionOutPortPrecondition(junctionOut)
flow match {
case pipe: Pipe[In, Out]
addGraphEdge(tap.vertex, drain, pipe, inputPort = UnlabeledPort, outputPort = tap.port)
addGraphEdge(junctionOut.vertex, sink, pipe, inputPort = UnlabeledPort, outputPort = junctionOut.port)
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
}
this
}
def addEdge[In, Out](tap: JunctionOutPort[In], flow: Flow[In, Out], drain: JunctionInPort[Out]): this.type = {
checkJunctionOutPortPrecondition(tap)
checkJunctionInPortPrecondition(drain)
def addEdge[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], junctionIn: JunctionInPort[Out]): this.type = {
checkJunctionOutPortPrecondition(junctionOut)
checkJunctionInPortPrecondition(junctionIn)
flow match {
case pipe: Pipe[In, Out]
addGraphEdge(tap.vertex, drain.vertex, pipe, inputPort = drain.port, outputPort = tap.port)
addGraphEdge(junctionOut.vertex, junctionIn.vertex, pipe, inputPort = junctionIn.port, outputPort = junctionOut.port)
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
}
this
}
def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], drain: JunctionInPort[Out]): this.type = {
def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], junctionIn: JunctionInPort[Out]): this.type = {
(source, flow) match {
case (tap: Tap[In], pipe: Pipe[In, Out])
addTapPipeEdge(tap, pipe, drain)
addTapPipeEdge(tap, pipe, junctionIn)
case (spipe: SourcePipe[In], pipe: Pipe[In, Out])
addEdge(spipe.connect(pipe), drain)
addTapPipeEdge(spipe.input, Pipe(spipe.ops).appendPipe(pipe), junctionIn)
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
}
this
}
def addEdge[Out](source: Source[Out], drain: JunctionInPort[Out]): this.type = {
source match {
case tap: Tap[Out]
addTapPipeEdge(tap, Pipe.empty[Out], drain)
case pipe: SourcePipe[Out]
addTapPipeEdge(pipe.input, Pipe(pipe.ops), drain)
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
}
this
}
def addEdge[In, Out](tap: JunctionOutPort[In], sink: Sink[In]): this.type = {
def addEdge[In, Out](junctionOut: JunctionOutPort[In], sink: Sink[In]): this.type = {
sink match {
case drain: Drain[In] addPipeDrainEdge(tap, Pipe.empty[In], drain)
case pipe: SinkPipe[In] addPipeDrainEdge(tap, Pipe(pipe.ops), pipe.output)
case drain: Drain[In] addPipeDrainEdge(junctionOut, Pipe.empty[In], drain)
case pipe: SinkPipe[In] addPipeDrainEdge(junctionOut, Pipe(pipe.ops), pipe.output)
case _ throw new IllegalArgumentException(OnlyPipesErrorMessage)
}
this
}
def addEdge[In, Out](tap: JunctionOutPort[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = {
def addEdge[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = {
(flow, sink) match {
case (pipe: Pipe[In, Out], drain: Drain[Out])
addPipeDrainEdge(tap, pipe, drain)
addPipeDrainEdge(junctionOut, pipe, drain)
case (pipe: Pipe[In, Out], spipe: SinkPipe[Out])
addEdge(tap, pipe.connect(spipe))
addEdge(junctionOut, pipe.connect(spipe))
case _ throw new IllegalArgumentException(OnlyPipesErrorMessage)
}
this
}
def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = {
(source, flow, sink) match {
case (tap: Tap[In], pipe: Pipe[In, Out], drain: Drain[Out])
val tapVertex = TapVertex(tap)
val drainVertex = DrainVertex(drain)
checkAddTapDrainPrecondition(tapVertex)
checkAddTapDrainPrecondition(drainVertex)
addGraphEdge(tapVertex, drainVertex, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case (sourcePipe: SourcePipe[In], pipe: Pipe[In, Out], sinkPipe: SinkPipe[Out])
val tap = sourcePipe.input
val newPipe = Pipe(sourcePipe.ops).connect(pipe).connect(Pipe(sinkPipe.ops))
val drain = sinkPipe.output
addEdge(tap, newPipe, drain) // recursive, but now it is a Tap-Pipe-Drain
case (tap: Tap[In], pipe: Pipe[In, Out], sinkPipe: SinkPipe[Out])
val newPipe = pipe.connect(Pipe(sinkPipe.ops))
val drain = sinkPipe.output
addEdge(tap, newPipe, drain) // recursive, but now it is a Tap-Pipe-Drain
case (sourcePipe: SourcePipe[In], pipe: Pipe[In, Out], drain: Drain[Out])
val tap = sourcePipe.input
val newPipe = Pipe(sourcePipe.ops).connect(pipe)
addEdge(tap, newPipe, drain) // recursive, but now it is a Tap-Pipe-Drain
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
}
this
}
def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = {
checkAddTapDrainPrecondition(source)
checkAddTapDrainPrecondition(sink)
flow match {
case pipe: Pipe[In, Out]
addGraphEdge(source, sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
}
this
}
def addEdge[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], sink: Sink[Out]): this.type = {
checkAddTapDrainPrecondition(source)
(flow, sink) match {
case (pipe: Pipe[In, Out], drain: Drain[Out])
val drainVertex = DrainVertex(drain)
checkAddTapDrainPrecondition(drainVertex)
addGraphEdge(source, drainVertex, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case (pipe: Pipe[In, Out], spipe: SinkPipe[Out])
val drainVertex = DrainVertex(spipe.output)
checkAddTapDrainPrecondition(drainVertex)
addGraphEdge(source, drainVertex, pipe.appendPipe(Pipe(spipe.ops)), inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case _ throw new IllegalArgumentException(OnlyPipesErrorMessage)
}
this
}
def addEdge[In, Out](source: Source[In], flow: Flow[In, Out], sink: UndefinedSink[Out]): this.type = {
checkAddTapDrainPrecondition(sink)
(flow, source) match {
case (pipe: Pipe[In, Out], tap: Tap[In])
val tapVertex = TapVertex(tap)
checkAddTapDrainPrecondition(tapVertex)
addGraphEdge(tapVertex, sink, pipe, inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case (pipe: Pipe[In, Out], spipe: SourcePipe[Out])
val tapVertex = TapVertex(spipe.input)
checkAddTapDrainPrecondition(tapVertex)
addGraphEdge(tapVertex, sink, Pipe(spipe.ops).appendPipe(pipe), inputPort = UnlabeledPort, outputPort = UnlabeledPort)
case _ throw new IllegalArgumentException(OnlyPipesErrorMessage)
}
this
@ -505,24 +568,51 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
edgeQualifier += 1
}
def attachDrain[Out](token: UndefinedDrain[Out], drain: Drain[Out]): this.type = {
def attachSink[Out](token: UndefinedSink[Out], sink: Sink[Out]): this.type = {
graph.find(token) match {
case Some(existing)
val edge = existing.incoming.head
graph.remove(existing)
graph.addLEdge(edge.from.value, DrainVertex(drain))(edge.label)
case None throw new IllegalArgumentException(s"No matching UndefinedDrain [${token}]")
sink match {
case drain: Drain[Out]
val drainVertex = DrainVertex(drain)
checkAddTapDrainPrecondition(drainVertex)
graph.addLEdge(edge.from.value, drainVertex)(edge.label)
case spipe: SinkPipe[Out]
val pipe = edge.label.pipe.appendPipe(Pipe(spipe.ops))
val label = edge.label.withPipe(pipe)
val drainVertex = DrainVertex(spipe.output)
checkAddTapDrainPrecondition(drainVertex)
graph.addLEdge(edge.from.value, drainVertex)(label)
case _ throw new IllegalArgumentException(OnlyPipesErrorMessage)
}
case None throw new IllegalArgumentException(s"No matching UndefinedSink [${token}]")
}
this
}
def attachTap[In](token: UndefinedTap[In], tap: Tap[In]): this.type = {
def attachSource[In](token: UndefinedSource[In], source: Source[In]): this.type = {
graph.find(token) match {
case Some(existing)
val edge = existing.outgoing.head
graph.remove(existing)
graph.addLEdge(TapVertex(tap), edge.to.value)(edge.label)
case None throw new IllegalArgumentException(s"No matching UndefinedTap [${token}]")
source match {
case tap: Tap[In]
val tapVertex = TapVertex(tap)
checkAddTapDrainPrecondition(tapVertex)
graph.addLEdge(tapVertex, edge.to.value)(edge.label)
case spipe: SourcePipe[In]
val pipe = Pipe(spipe.ops).appendPipe(edge.label.pipe)
val label = edge.label.withPipe(pipe)
val tapVertex = TapVertex(spipe.input)
checkAddTapDrainPrecondition(tapVertex)
graph.addLEdge(tapVertex, edge.to.value)(label)
case _
throw new IllegalArgumentException(OnlyPipesErrorMessage)
}
case None throw new IllegalArgumentException(s"No matching UndefinedSource [${token}]")
}
this
}
@ -599,18 +689,18 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
}
private def checkBuildPreconditions(): Unit = {
val undefinedTapsDrains = graph.nodes.filter {
val undefinedSourcesDrains = graph.nodes.filter {
_.value match {
case _: UndefinedTap[_] | _: UndefinedDrain[_] true
case x false
case _: UndefinedSource[_] | _: UndefinedSink[_] true
case x false
}
}
if (undefinedTapsDrains.nonEmpty) {
val formatted = undefinedTapsDrains.map(n n.value match {
case u: UndefinedTap[_] s"$u -> ${n.outgoing.head.label} -> ${n.outgoing.head.to}"
case u: UndefinedDrain[_] s"${n.incoming.head.from} -> ${n.incoming.head.label} -> $u"
if (undefinedSourcesDrains.nonEmpty) {
val formatted = undefinedSourcesDrains.map(n n.value match {
case u: UndefinedSource[_] s"$u -> ${n.outgoing.head.label} -> ${n.outgoing.head.to}"
case u: UndefinedSink[_] s"${n.incoming.head.from} -> ${n.incoming.head.label} -> $u"
})
throw new IllegalArgumentException("Undefined taps or drains: " + formatted.mkString(", "))
throw new IllegalArgumentException("Undefined sources or sinks: " + formatted.mkString(", "))
}
graph.nodes.foreach { node
@ -634,9 +724,9 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, FlowGraph
require(graph.nonEmpty, "Graph must not be empty")
require(graph.exists(graph having ((node = { n n.isLeaf && n.diSuccessors.isEmpty }))),
"Graph must have at least one drain")
"Graph must have at least one sink")
require(graph.exists(graph having ((node = { n n.isLeaf && n.diPredecessors.isEmpty }))),
"Graph must have at least one tap")
"Graph must have at least one source")
require(graph.isConnected, "Graph must be connected")
}
@ -658,7 +748,7 @@ object FlowGraph {
/**
* Continue building a [[FlowGraph]] from an existing `PartialFlowGraph`.
* For example you can attach undefined taps and drains with
* For example you can attach undefined sources and sinks with
* [[FlowGraphBuilder#attachTap]] and [[FlowGraphBuilder#attachDrain]]
*/
def apply(partialFlowGraph: PartialFlowGraph)(block: FlowGraphBuilder Unit): FlowGraph =
@ -688,9 +778,40 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
import FlowGraphInternal._
/**
* Materialize the `FlowGraph` and attach all drains and taps.
* Materialize the `FlowGraph` and attach all sinks and sources.
*/
def run()(implicit materializer: FlowMaterializer): MaterializedPipeGraph = {
def run()(implicit materializer: FlowMaterializer): MaterializedFlowGraph = {
val edges = graph.edges
if (edges.size == 1) {
val edge = edges.head
(edge.from.value, edge.to.value) match {
case (tapVertex: TapVertex, drainVertex: DrainVertex)
val pipe = edge.label.pipe
runSimple(tapVertex, drainVertex, pipe)
case _
runGraph()
}
} else
runGraph()
}
/**
* Run FlowGraph that only contains one edge from a `Source` to a `Sink`.
*/
private def runSimple(tapVertex: TapVertex, drainVertex: DrainVertex, pipe: Pipe[Any, Nothing])(implicit materializer: FlowMaterializer): MaterializedFlowGraph = {
val mf = pipe.withTap(tapVertex.tap).withDrain(drainVertex.drain).run()
val materializedSources: Map[TapWithKey[_, _], Any] = tapVertex match {
case TapVertex(tap: TapWithKey[_, _]) Map(tap -> mf.getTapFor(tap))
case _ Map.empty
}
val materializedSinks: Map[DrainWithKey[_, _], Any] = drainVertex match {
case DrainVertex(drain: DrainWithKey[_, _]) Map(drain -> mf.getDrainFor(drain))
case _ Map.empty
}
new MaterializedFlowGraph(materializedSources, materializedSinks)
}
private def runGraph()(implicit materializer: FlowMaterializer): MaterializedFlowGraph = {
import scalax.collection.GraphTraversal._
// start with drains
@ -731,11 +852,11 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
}
edge.from.value match {
case src: TapVertex
case tap: TapVertex
val f = pipe.withDrain(SubscriberDrain(memo.downstreamSubscriber(edge)))
// connect the tap with the pipe later
memo.copy(visited = memo.visited + edge,
taps = memo.taps.updated(src, f))
taps = memo.taps.updated(tap, f))
case v: InternalVertex
if (memo.upstreamPublishers.contains(edge)) {
@ -772,15 +893,15 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
// connect all input taps as the last thing
val materializedTaps = result.taps.foldLeft(Map.empty[TapWithKey[_, _], Any]) {
case (acc, (TapVertex(src), pipe))
val mf = pipe.withTap(src).run()
src match {
case srcKey: TapWithKey[_, _] acc.updated(srcKey, mf.getTapFor(srcKey))
case (acc, (TapVertex(tap), pipe))
val mf = pipe.withTap(tap).run()
tap match {
case tapKey: TapWithKey[_, _] acc.updated(tapKey, mf.getTapFor(tapKey))
case _ acc
}
}
new MaterializedPipeGraph(materializedTaps, result.materializedDrains)
new MaterializedFlowGraph(materializedTaps, result.materializedDrains)
}
}
@ -819,7 +940,7 @@ object PartialFlowGraph {
}
/**
* `PartialFlowGraph` may have taps and drains that are not attached, and it can therefore not
* `PartialFlowGraph` may have sources and sinks that are not attached, and it can therefore not
* be `run` until those are attached.
*
* Build a `PartialFlowGraph` by starting with one of the `apply` methods in
@ -828,14 +949,14 @@ object PartialFlowGraph {
class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, FlowGraphInternal.EdgeType]) {
import FlowGraphInternal._
def undefinedTaps: Set[UndefinedTap[_]] =
def undefinedSources: Set[UndefinedSource[_]] =
graph.nodes.iterator.map(_.value).collect {
case n: UndefinedTap[_] n
case n: UndefinedSource[_] n
}.toSet
def undefinedDrains: Set[UndefinedDrain[_]] =
def undefinedSinks: Set[UndefinedSink[_]] =
graph.nodes.iterator.map(_.value).collect {
case n: UndefinedDrain[_] n
case n: UndefinedSink[_] n
}.toSet
}
@ -845,7 +966,7 @@ class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[Fl
* accessor method to retrieve the materialized `Tap` or `Drain`, e.g.
* [[SubscriberTap#subscriber]] or [[PublisherDrain#publisher]].
*/
class MaterializedPipeGraph(materializedTaps: Map[TapWithKey[_, _], Any], materializedDrains: Map[DrainWithKey[_, _], Any])
class MaterializedFlowGraph(materializedTaps: Map[TapWithKey[_, _], Any], materializedDrains: Map[DrainWithKey[_, _], Any])
extends MaterializedTap with MaterializedDrain {
/**
@ -874,69 +995,75 @@ class MaterializedPipeGraph(materializedTaps: Map[TapWithKey[_, _], Any], materi
*/
object FlowGraphImplicits {
class SourceNextStep[In, Out](source: Source[In], flow: Flow[In, Out], builder: FlowGraphBuilder) {
def ~>(drain: JunctionInPort[Out]): JunctionOutPort[drain.NextT] = {
builder.addEdge(source, flow, drain)
drain.next
implicit class SourceOps[Out](val source: Source[Out]) extends AnyVal {
def ~>[O](flow: Flow[Out, O])(implicit builder: FlowGraphBuilder): SourceNextStep[Out, O] =
new SourceNextStep(source, flow, builder)
def ~>(junctionIn: JunctionInPort[Out])(implicit builder: FlowGraphBuilder): JunctionOutPort[junctionIn.NextT] = {
builder.addEdge(source, Pipe.empty[Out], junctionIn)
junctionIn.next
}
def ~>(sink: Sink[Out])(implicit builder: FlowGraphBuilder): Unit =
builder.addEdge(source, Pipe.empty[Out], sink)
}
class SourceNextStep[In, Out](source: Source[In], flow: Flow[In, Out], builder: FlowGraphBuilder) {
def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = {
builder.addEdge(source, flow, junctionIn)
junctionIn.next
}
def ~>(sink: Sink[Out]): Unit =
builder.addEdge(source, flow, sink)
}
implicit class JunctionOps[In](val junction: JunctionOutPort[In]) extends AnyVal {
def ~>[Out](flow: Flow[In, Out])(implicit builder: FlowGraphBuilder): JunctionNextStep[In, Out] =
new JunctionNextStep(junction, flow, builder)
def ~>(drain: UndefinedDrain[In])(implicit builder: FlowGraphBuilder): Unit =
builder.addEdge(junction, Pipe.empty[In], drain)
def ~>(sink: UndefinedSink[In])(implicit builder: FlowGraphBuilder): Unit =
builder.addEdge(junction, Pipe.empty[In], sink)
def ~>(drain: JunctionInPort[In])(implicit builder: FlowGraphBuilder): JunctionOutPort[drain.NextT] = {
builder.addEdge(junction, Pipe.empty[In], drain)
drain.next
def ~>(junctionIn: JunctionInPort[In])(implicit builder: FlowGraphBuilder): JunctionOutPort[junctionIn.NextT] = {
builder.addEdge(junction, Pipe.empty[In], junctionIn)
junctionIn.next
}
def ~>(sink: Sink[In])(implicit builder: FlowGraphBuilder): Unit = builder.addEdge(junction, sink)
}
class JunctionNextStep[In, Out](junction: JunctionOutPort[In], flow: Flow[In, Out], builder: FlowGraphBuilder) {
def ~>(drain: JunctionInPort[Out]): JunctionOutPort[drain.NextT] = {
builder.addEdge(junction, flow, drain)
drain.next
class JunctionNextStep[In, Out](junctionOut: JunctionOutPort[In], flow: Flow[In, Out], builder: FlowGraphBuilder) {
def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = {
builder.addEdge(junctionOut, flow, junctionIn)
junctionIn.next
}
def ~>(sink: Sink[Out]): Unit = {
builder.addEdge(junction, flow, sink)
builder.addEdge(junctionOut, flow, sink)
}
def ~>(drain: UndefinedDrain[Out]): Unit = {
builder.addEdge(junction, flow, drain)
def ~>(sink: UndefinedSink[Out]): Unit = {
builder.addEdge(junctionOut, flow, sink)
}
}
implicit class SourceOps[Out](val source: Source[Out]) extends AnyVal {
implicit class UndefinedSourceOps[In](val source: UndefinedSource[In]) extends AnyVal {
def ~>[Out](flow: Flow[In, Out])(implicit builder: FlowGraphBuilder): UndefinedSourceNextStep[In, Out] =
new UndefinedSourceNextStep(source, flow, builder)
def ~>[O](flow: Flow[Out, O])(implicit builder: FlowGraphBuilder): SourceNextStep[Out, O] =
new SourceNextStep(source, flow, builder)
def ~>(drain: JunctionInPort[Out])(implicit builder: FlowGraphBuilder): JunctionOutPort[drain.NextT] = {
builder.addEdge(source, drain)
drain.next
}
}
implicit class UndefinedTapOps[In](val tap: UndefinedTap[In]) extends AnyVal {
def ~>[Out](flow: Flow[In, Out])(implicit builder: FlowGraphBuilder): UndefinedTapNextStep[In, Out] =
new UndefinedTapNextStep(tap, flow, builder)
def ~>(drain: JunctionInPort[In])(implicit builder: FlowGraphBuilder): JunctionOutPort[drain.NextT] = {
builder.addEdge(tap, Pipe.empty[In], drain)
drain.next
def ~>(junctionIn: JunctionInPort[In])(implicit builder: FlowGraphBuilder): JunctionOutPort[junctionIn.NextT] = {
builder.addEdge(source, Pipe.empty[In], junctionIn)
junctionIn.next
}
}
class UndefinedTapNextStep[In, Out](tap: UndefinedTap[In], flow: Flow[In, Out], builder: FlowGraphBuilder) {
def ~>(drain: JunctionInPort[Out]): JunctionOutPort[drain.NextT] = {
builder.addEdge(tap, flow, drain)
drain.next
class UndefinedSourceNextStep[In, Out](source: UndefinedSource[In], flow: Flow[In, Out], builder: FlowGraphBuilder) {
def ~>(junctionIn: JunctionInPort[Out]): JunctionOutPort[junctionIn.NextT] = {
builder.addEdge(source, flow, junctionIn)
junctionIn.next
}
}

View file

@ -226,9 +226,9 @@ private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends
override protected def andThen[U](op: AstNode): Repr[U] = this.copy(ops = op :: ops)
def withDrain(out: Drain[Out]): SinkPipe[In] = SinkPipe(out, ops)
private[scaladsl2] def withDrain(out: Drain[Out]): SinkPipe[In] = SinkPipe(out, ops)
def withTap(in: Tap[In]): SourcePipe[Out] = SourcePipe(in, ops)
private[scaladsl2] def withTap(in: Tap[In]): SourcePipe[Out] = SourcePipe(in, ops)
override def connect[T](flow: Flow[Out, T]): Flow[In, T] = flow match {
case p: Pipe[T, In] Pipe(p.ops ++: ops)
@ -240,6 +240,8 @@ private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends
case d: Drain[Out] this.withDrain(d)
case _ throw new IllegalArgumentException(Pipe.OnlyPipesErrorMessage)
}
private[scaladsl2] def appendPipe[T](pipe: Pipe[Out, T]): Pipe[In, T] = Pipe(pipe.ops ++: ops)
}
/**
@ -247,9 +249,9 @@ private[scaladsl2] final case class Pipe[-In, +Out](ops: List[AstNode]) extends
*/
private[scaladsl2] final case class SinkPipe[-In](output: Drain[_], ops: List[AstNode]) extends Sink[In] {
def withTap(in: Tap[In]): RunnablePipe = RunnablePipe(in, output, ops)
private[scaladsl2] def withTap(in: Tap[In]): RunnablePipe = RunnablePipe(in, output, ops)
def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops)
private[scaladsl2] def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops)
override def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] = {
val subIn = SubscriberTap[In]()
@ -266,9 +268,9 @@ private[scaladsl2] final case class SourcePipe[+Out](input: Tap[_], ops: List[As
override protected def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, op :: ops)
def withDrain(out: Drain[Out]): RunnablePipe = RunnablePipe(input, out, ops)
private[scaladsl2] def withDrain(out: Drain[Out]): RunnablePipe = RunnablePipe(input, out, ops)
def appendPipe[T](pipe: Pipe[Out, T]): SourcePipe[T] = SourcePipe(input, pipe.ops ++: ops)
private[scaladsl2] def appendPipe[T](pipe: Pipe[Out, T]): SourcePipe[T] = SourcePipe(input, pipe.ops ++: ops)
override def connect[T](flow: Flow[Out, T]): Source[T] = flow match {
case p: Pipe[Out, T] appendPipe(p)