From 026feea69b62ef775eb6e5116ae59e3e83cf780c Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Mon, 3 Nov 2014 12:59:05 +0100 Subject: [PATCH] +str #16191 adds Flow.concat(Source):Flow method --- .../scala/akka/stream/scaladsl/FlowSpec.scala | 22 +++++++++++++++++++ .../main/scala/akka/stream/javadsl/Flow.scala | 7 ++++++ .../scala/akka/stream/scaladsl/Flow.scala | 18 +++++++++++++++ 3 files changed, 47 insertions(+) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 826fd1a890..7dd4c36642 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -309,6 +309,28 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val d2: Flow[String, (Boolean, Source[Fruit])] = Flow[String].map(_ ⇒ new Apple).groupBy(_ ⇒ true) val d3: Flow[String, (immutable.Seq[Apple], Source[Fruit])] = Flow[String].map(_ ⇒ new Apple).prefixAndTail(1) } + + "be able to concat with a Source" in { + val f1: Flow[Int, String] = Flow[Int].map(_.toString + "-s") + val s1: Source[Int] = Source(List(1, 2, 3)) + val s2: Source[Int] = Source(List(4, 5, 6)) + + val subs = StreamTestKit.SubscriberProbe[String]() + val subSink = Sink.publisher[String] + + val (_, res) = f1.concat(s2).runWith(s1, subSink) + + res.subscribe(subs) + val sub = subs.expectSubscription() + sub.request(9) + subs.expectNext("1-s") + subs.expectNext("2-s") + subs.expectNext("3-s") + subs.expectNext("4-s") + subs.expectNext("5-s") + subs.expectNext("6-s") + subs.expectComplete() + } } "A Flow with multiple subscribers (FanOutBox)" must { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 45aeaaaca0..43fb78d222 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -369,6 +369,13 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): javadsl.Flow[In, U] = new Flow(delegate.flatten(strategy)) + /** + * Returns a new `Flow` that concatenates a secondary `Source` to this flow so that, + * the first element emitted by the given ("second") source is emitted after the last element of this Flow. + */ + def concat(second: javadsl.Source[In]): javadsl.Flow[In, Out] = + new Flow(delegate.concat(second.asScala)) + } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 23cef6f6b5..73b0345313 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -40,6 +40,24 @@ trait Flow[-In, +Out] extends FlowOps[Out] { (m.get(source), m.get(sink)) } + /** + * Returns a new `Flow` that concatenates a secondary `Source` to this flow so that, + * the first element emitted by the given ("second") source is emitted after the last element of this Flow. + */ + def concat(second: Source[In]): Flow[In, Out] = { + Flow() { b ⇒ + val concatter = Concat[Out] + val source = UndefinedSource[In] + val sink = UndefinedSink[Out] + + b.addEdge(source, this, concatter.first) + .addEdge(second, this, concatter.second) + .addEdge(concatter.out, sink) + + source → sink + } + } + } object Flow {