+str #15946 Support flows without junctions in FlowGraph

* run simple flow with source/sink without any junctions in FlowGraph
* partial flow graph with undefined source/sink and flows, without junctions
* replace undefined source/sink with flow with source/sink
This commit is contained in:
Patrik Nordwall 2014-10-01 09:41:34 +02:00
parent 2f6499bcdf
commit f836339996
4 changed files with 402 additions and 169 deletions

View file

@ -1,12 +1,12 @@
package akka.stream.scaladsl2
import akka.stream.testkit.AkkaSpec
import akka.stream.{ OverflowStrategy, MaterializerSettings }
import akka.stream.testkit.{ StreamTestKit, AkkaSpec }
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.scaladsl2.FlowGraphImplicits._
import akka.stream.testkit.StreamTestKit.SubscriberProbe
class GraphOpsIntegrationSpec extends AkkaSpec {
@ -98,6 +98,54 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
Await.result(g.getDrainFor(resultFuture), 3.seconds) should contain theSameElementsAs (Seq(2, 4, 6, 5, 7, 9))
}
"be able to run plain flow" in {
val p = Source(List(1, 2, 3)).toPublisher()
val s = SubscriberProbe[Int]
val flow = Flow[Int].map(_ * 2)
FlowGraph { implicit builder
import FlowGraphImplicits._
PublisherTap(p) ~> flow ~> SubscriberDrain(s)
}.run()
val sub = s.expectSubscription()
sub.request(10)
s.expectNext(1 * 2)
s.expectNext(2 * 2)
s.expectNext(3 * 2)
s.expectComplete()
}
"support continued transformation from undefined source/sink" in {
val input1 = UndefinedSource[Int]
val output1 = UndefinedSink[Int]
val output2 = UndefinedSink[String]
val partial = PartialFlowGraph { implicit builder
val bcast = Broadcast[String]("bcast")
input1 ~> Flow[Int].map(_.toString) ~> bcast ~> Flow[String].map(_.toInt) ~> output1
bcast ~> Flow[String].map("elem-" + _) ~> output2
}
val s1 = SubscriberProbe[Int]
val s2 = SubscriberProbe[String]
FlowGraph(partial) { builder
builder.attachSource(input1, Source(List(0, 1, 2).map(_ + 1)))
builder.attachSink(output1, Flow[Int].filter(n (n % 2) != 0).connect(SubscriberDrain(s1)))
builder.attachSink(output2, Flow[String].map(_.toUpperCase).connect(SubscriberDrain(s2)))
}.run()
val sub1 = s1.expectSubscription()
val sub2 = s2.expectSubscription()
sub1.request(10)
sub2.request(10)
s1.expectNext(1)
s1.expectNext(3)
s1.expectComplete()
s2.expectNext("ELEM-1")
s2.expectNext("ELEM-2")
s2.expectNext("ELEM-3")
s2.expectComplete()
}
}
}