+str #16191 adds Flow.concat(Source):Flow method

This commit is contained in:
Konrad 'ktoso' Malawski 2014-11-03 12:59:05 +01:00
parent 62fb38b402
commit 026feea69b
3 changed files with 47 additions and 0 deletions

View file

@ -309,6 +309,28 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
val d2: Flow[String, (Boolean, Source[Fruit])] = Flow[String].map(_ new Apple).groupBy(_ true) val d2: Flow[String, (Boolean, Source[Fruit])] = Flow[String].map(_ new Apple).groupBy(_ true)
val d3: Flow[String, (immutable.Seq[Apple], Source[Fruit])] = Flow[String].map(_ new Apple).prefixAndTail(1) val d3: Flow[String, (immutable.Seq[Apple], Source[Fruit])] = Flow[String].map(_ new Apple).prefixAndTail(1)
} }
"be able to concat with a Source" in {
val f1: Flow[Int, String] = Flow[Int].map(_.toString + "-s")
val s1: Source[Int] = Source(List(1, 2, 3))
val s2: Source[Int] = Source(List(4, 5, 6))
val subs = StreamTestKit.SubscriberProbe[String]()
val subSink = Sink.publisher[String]
val (_, res) = f1.concat(s2).runWith(s1, subSink)
res.subscribe(subs)
val sub = subs.expectSubscription()
sub.request(9)
subs.expectNext("1-s")
subs.expectNext("2-s")
subs.expectNext("3-s")
subs.expectNext("4-s")
subs.expectNext("5-s")
subs.expectNext("6-s")
subs.expectComplete()
}
} }
"A Flow with multiple subscribers (FanOutBox)" must { "A Flow with multiple subscribers (FanOutBox)" must {

View file

@ -369,6 +369,13 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) {
def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): javadsl.Flow[In, U] = def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): javadsl.Flow[In, U] =
new Flow(delegate.flatten(strategy)) new Flow(delegate.flatten(strategy))
/**
* Returns a new `Flow` that concatenates a secondary `Source` to this flow so that,
* the first element emitted by the given ("second") source is emitted after the last element of this Flow.
*/
def concat(second: javadsl.Source[In]): javadsl.Flow[In, Out] =
new Flow(delegate.concat(second.asScala))
} }
/** /**

View file

@ -40,6 +40,24 @@ trait Flow[-In, +Out] extends FlowOps[Out] {
(m.get(source), m.get(sink)) (m.get(source), m.get(sink))
} }
/**
* Returns a new `Flow` that concatenates a secondary `Source` to this flow so that,
* the first element emitted by the given ("second") source is emitted after the last element of this Flow.
*/
def concat(second: Source[In]): Flow[In, Out] = {
Flow() { b
val concatter = Concat[Out]
val source = UndefinedSource[In]
val sink = UndefinedSink[Out]
b.addEdge(source, this, concatter.first)
.addEdge(second, this, concatter.second)
.addEdge(concatter.out, sink)
source sink
}
}
} }
object Flow { object Flow {