add tests for large flows
This commit is contained in:
parent
4ff237667c
commit
aeb990deb2
1 changed files with 63 additions and 20 deletions
|
|
@ -25,6 +25,8 @@ class StreamLayoutSpec extends AkkaSpec {
|
|||
def testSource(): Module = testAtomic(0, 1)
|
||||
def testSink(): Module = testAtomic(1, 0)
|
||||
|
||||
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
|
||||
|
||||
"StreamLayout" must {
|
||||
|
||||
"be able to model simple linear stages" in {
|
||||
|
|
@ -95,14 +97,6 @@ class StreamLayoutSpec extends AkkaSpec {
|
|||
runnable0123a.isSource should be(false)
|
||||
}
|
||||
|
||||
"be able to model hierarchic linear modules" in {
|
||||
pending
|
||||
}
|
||||
|
||||
"be able to model graph layouts" in {
|
||||
pending
|
||||
}
|
||||
|
||||
"be able to materialize linear layouts" in {
|
||||
val source = testSource()
|
||||
val stage1 = testStage()
|
||||
|
|
@ -116,24 +110,73 @@ class StreamLayoutSpec extends AkkaSpec {
|
|||
checkMaterialized(runnable)
|
||||
}
|
||||
|
||||
"be able to materialize DAG layouts" in {
|
||||
pending
|
||||
val tooDeepForStack = 50000
|
||||
|
||||
}
|
||||
"be able to materialize cyclic layouts" in {
|
||||
pending
|
||||
"fail fusing when value computation is too complex" in {
|
||||
// this tests that the canary in to coal mine actually works
|
||||
val g = (1 to tooDeepForStack)
|
||||
.foldLeft(Flow[Int].mapMaterializedValue(_ ⇒ 1)) { (flow, i) ⇒
|
||||
flow.mapMaterializedValue(x ⇒ x + i)
|
||||
}
|
||||
a[StackOverflowError] shouldBe thrownBy {
|
||||
Fusing.aggressive(g)
|
||||
}
|
||||
}
|
||||
|
||||
"be able to model hierarchic graph modules" in {
|
||||
pending
|
||||
"not fail materialization when building a large graph with simple computation" when {
|
||||
|
||||
"starting from a Source" in {
|
||||
val g = (1 to tooDeepForStack)
|
||||
.foldLeft(Source.single(42).mapMaterializedValue(_ ⇒ 1))(
|
||||
(f, i) ⇒ f.map(identity))
|
||||
val (mat, fut) = g.toMat(Sink.seq)(Keep.both).run()
|
||||
mat should ===(1)
|
||||
fut.futureValue should ===(List(42))
|
||||
}
|
||||
|
||||
"starting from a Flow" in {
|
||||
val g = (1 to tooDeepForStack).foldLeft(Flow[Int])((f, i) ⇒ f.map(identity))
|
||||
val (mat, fut) = g.runWith(Source.single(42).mapMaterializedValue(_ ⇒ 1), Sink.seq)
|
||||
mat should ===(1)
|
||||
fut.futureValue should ===(List(42))
|
||||
}
|
||||
|
||||
"using .via" in {
|
||||
val g = (1 to tooDeepForStack)
|
||||
.foldLeft(Source.single(42).mapMaterializedValue(_ ⇒ 1))(
|
||||
(f, i) ⇒ f.via(Flow[Int].map(identity)))
|
||||
val (mat, fut) = g.toMat(Sink.seq)(Keep.both).run()
|
||||
mat should ===(1)
|
||||
fut.futureValue should ===(List(42))
|
||||
}
|
||||
}
|
||||
|
||||
"be able to model hierarchic attributes" in {
|
||||
pending
|
||||
}
|
||||
"not fail fusing & materialization when building a large graph with simple computation" when {
|
||||
|
||||
"be able to model hierarchic cycle detection" in {
|
||||
pending
|
||||
"starting from a Source" in {
|
||||
val g = Source fromGraph Fusing.aggressive((1 to tooDeepForStack)
|
||||
.foldLeft(Source.single(42).mapMaterializedValue(_ ⇒ 1))(
|
||||
(f, i) ⇒ f.map(identity)))
|
||||
val (mat, fut) = g.toMat(Sink.seq)(Keep.both).run()
|
||||
mat should ===(1)
|
||||
fut.futureValue should ===(List(42))
|
||||
}
|
||||
|
||||
"starting from a Flow" in {
|
||||
val g = Flow fromGraph Fusing.aggressive((1 to tooDeepForStack).foldLeft(Flow[Int])((f, i) ⇒ f.map(identity)))
|
||||
val (mat, fut) = g.runWith(Source.single(42).mapMaterializedValue(_ ⇒ 1), Sink.seq)
|
||||
mat should ===(1)
|
||||
fut.futureValue should ===(List(42))
|
||||
}
|
||||
|
||||
"using .via" in {
|
||||
val g = Source fromGraph Fusing.aggressive((1 to tooDeepForStack)
|
||||
.foldLeft(Source.single(42).mapMaterializedValue(_ ⇒ 1))(
|
||||
(f, i) ⇒ f.via(Flow[Int].map(identity))))
|
||||
val (mat, fut) = g.toMat(Sink.seq)(Keep.both).run()
|
||||
mat should ===(1)
|
||||
fut.futureValue should ===(List(42))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue