=str Sugar to make flows between vertices optional

This commit is contained in:
Patrik Nordwall 2014-09-11 07:45:19 +02:00
parent 8bfd0c077e
commit b8c5019185
4 changed files with 51 additions and 3 deletions

View file

@ -369,6 +369,11 @@ trait FlowOps[-In, +Out] {
andThen(TimerTransform(name, mkTransformer.asInstanceOf[() TimerTransformer[Any, Any]]))
}
object ProcessorFlow {
private val emptyInstance = ProcessorFlow[Any, Any](ops = Nil)
def empty[T]: ProcessorFlow[T, T] = emptyInstance.asInstanceOf[ProcessorFlow[T, T]]
}
/**
* Flow without attached input and without attached output, can be used as a `Processor`.
*/

View file

@ -574,6 +574,11 @@ object FlowGraphImplicits {
def ~>[Out](flow: ProcessorFlow[In, Out])(implicit builder: FlowGraphBuilder): SourceNextStep[In, Out] = {
new SourceNextStep(source, flow, builder)
}
def ~>(sink: Junction[In])(implicit builder: FlowGraphBuilder): Junction[In] = {
builder.addEdge(source, ProcessorFlow.empty[In], sink)
sink
}
}
class SourceNextStep[In, Out](source: Source[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) {
@ -587,6 +592,20 @@ object FlowGraphImplicits {
def ~>[Out](flow: ProcessorFlow[In, Out])(implicit builder: FlowGraphBuilder): JunctionNextStep[In, Out] = {
new JunctionNextStep(junction, flow, builder)
}
def ~>(sink: Sink[In])(implicit builder: FlowGraphBuilder): Unit =
builder.addEdge(junction, ProcessorFlow.empty[In], sink)
def ~>(sink: UndefinedSink[In])(implicit builder: FlowGraphBuilder): Unit =
builder.addEdge(junction, ProcessorFlow.empty[In], sink)
def ~>(sink: Junction[In])(implicit builder: FlowGraphBuilder): Junction[In] = {
builder.addEdge(junction, ProcessorFlow.empty[In], sink)
sink
}
def ~>(flow: FlowWithSink[In, _])(implicit builder: FlowGraphBuilder): Unit =
builder.addEdge(junction, flow)
}
class JunctionNextStep[In, Out](junction: Junction[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) {
@ -611,12 +630,16 @@ object FlowGraphImplicits {
}
}
// FIXME add more for FlowWithSource and FlowWithSink, and shortcuts injecting identity flows
implicit class UndefinedSourceOps[In](val source: UndefinedSource[In]) extends AnyVal {
def ~>[Out](flow: ProcessorFlow[In, Out])(implicit builder: FlowGraphBuilder): UndefinedSourceNextStep[In, Out] = {
new UndefinedSourceNextStep(source, flow, builder)
}
def ~>(sink: Junction[In])(implicit builder: FlowGraphBuilder): Junction[In] = {
builder.addEdge(source, ProcessorFlow.empty[In], sink)
sink
}
}
class UndefinedSourceNextStep[In, Out](source: UndefinedSource[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) {

View file

@ -19,7 +19,7 @@ object FlowFrom {
* Helper to create `Flow` without [[Source]].
* Example usage: `FlowFrom[Int]`
*/
def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil)
def apply[T]: ProcessorFlow[T, T] = ProcessorFlow.empty[T]
/**
* Helper to create `Flow` with [[Source]] from `Publisher`.

View file

@ -209,5 +209,25 @@ class FlowGraphCompileSpec extends AkkaSpec {
}.run()
}
"make it optional to specify flows" in {
FlowGraph { implicit b
val merge = Merge[String]
val bcast = Broadcast[String]
import FlowGraphImplicits._
in1 ~> merge ~> bcast ~> out1
in2 ~> merge
bcast ~> out2
}.run()
}
"use FlowWithSource and FlowWithSink" in {
FlowGraph { implicit b
val bcast = Broadcast[String]
import FlowGraphImplicits._
f1.withSource(in1) ~> bcast ~> f2.withSink(out1)
bcast ~> f3.withSink(out2)
}.run()
}
}
}