diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index ab9000620e..09557cf29d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -369,6 +369,11 @@ trait FlowOps[-In, +Out] { andThen(TimerTransform(name, mkTransformer.asInstanceOf[() ⇒ TimerTransformer[Any, Any]])) } +object ProcessorFlow { + private val emptyInstance = ProcessorFlow[Any, Any](ops = Nil) + def empty[T]: ProcessorFlow[T, T] = emptyInstance.asInstanceOf[ProcessorFlow[T, T]] +} + /** * Flow without attached input and without attached output, can be used as a `Processor`. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index ff39c568c8..6fcf1114a4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -588,6 +588,11 @@ object FlowGraphImplicits { def ~>[Out](flow: ProcessorFlow[In, Out])(implicit builder: FlowGraphBuilder): SourceNextStep[In, Out] = { new SourceNextStep(source, flow, builder) } + + def ~>(sink: Junction[In])(implicit builder: FlowGraphBuilder): Junction[In] = { + builder.addEdge(source, ProcessorFlow.empty[In], sink) + sink + } } class SourceNextStep[In, Out](source: Source[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) { @@ -601,6 +606,20 @@ object FlowGraphImplicits { def ~>[Out](flow: ProcessorFlow[In, Out])(implicit builder: FlowGraphBuilder): JunctionNextStep[In, Out] = { new JunctionNextStep(junction, flow, builder) } + + def ~>(sink: Sink[In])(implicit builder: FlowGraphBuilder): Unit = + builder.addEdge(junction, ProcessorFlow.empty[In], sink) + + def ~>(sink: UndefinedSink[In])(implicit builder: FlowGraphBuilder): Unit = + builder.addEdge(junction, ProcessorFlow.empty[In], sink) + + def ~>(sink: Junction[In])(implicit builder: FlowGraphBuilder): Junction[In] = { + builder.addEdge(junction, ProcessorFlow.empty[In], sink) + sink + } + + def ~>(flow: FlowWithSink[In, _])(implicit builder: FlowGraphBuilder): Unit = + builder.addEdge(junction, flow) } class JunctionNextStep[In, Out](junction: Junction[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) { @@ -625,12 +644,16 @@ object FlowGraphImplicits { } } - // FIXME add more for FlowWithSource and FlowWithSink, and shortcuts injecting identity flows - implicit class UndefinedSourceOps[In](val source: UndefinedSource[In]) extends AnyVal { def ~>[Out](flow: ProcessorFlow[In, Out])(implicit builder: FlowGraphBuilder): UndefinedSourceNextStep[In, Out] = { new UndefinedSourceNextStep(source, flow, builder) } + + def ~>(sink: Junction[In])(implicit builder: FlowGraphBuilder): Junction[In] = { + builder.addEdge(source, ProcessorFlow.empty[In], sink) + sink + } + } class UndefinedSourceNextStep[In, Out](source: UndefinedSource[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala index fc66571808..bc6a24c9de 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala @@ -19,7 +19,7 @@ object FlowFrom { * Helper to create `Flow` without [[Source]]. * Example usage: `FlowFrom[Int]` */ - def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil) + def apply[T]: ProcessorFlow[T, T] = ProcessorFlow.empty[T] /** * Helper to create `Flow` with [[Source]] from `Publisher`. diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala index 100a7d8b47..73f2bc1ca7 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala @@ -204,5 +204,25 @@ class FlowGraphCompileSpec extends AkkaSpec { }.run() } + "make it optional to specify flows" in { + FlowGraph { implicit b ⇒ + val merge = Merge[String] + val bcast = Broadcast[String] + import FlowGraphImplicits._ + in1 ~> merge ~> bcast ~> out1 + in2 ~> merge + bcast ~> out2 + }.run() + } + + "use FlowWithSource and FlowWithSink" in { + FlowGraph { implicit b ⇒ + val bcast = Broadcast[String] + import FlowGraphImplicits._ + f1.withSource(in1) ~> bcast ~> f2.withSink(out1) + bcast ~> f3.withSink(out2) + }.run() + } + } }