From aeb990deb29fb0dff236585616f17bbddc95ae05 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Thu, 17 Mar 2016 12:45:12 +0100 Subject: [PATCH] add tests for large flows --- .../akka/stream/impl/StreamLayoutSpec.scala | 83 ++++++++++++++----- 1 file changed, 63 insertions(+), 20 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala index 71c766438e..6e25c173d1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala @@ -25,6 +25,8 @@ class StreamLayoutSpec extends AkkaSpec { def testSource(): Module = testAtomic(0, 1) def testSink(): Module = testAtomic(1, 0) + implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false)) + "StreamLayout" must { "be able to model simple linear stages" in { @@ -95,14 +97,6 @@ class StreamLayoutSpec extends AkkaSpec { runnable0123a.isSource should be(false) } - "be able to model hierarchic linear modules" in { - pending - } - - "be able to model graph layouts" in { - pending - } - "be able to materialize linear layouts" in { val source = testSource() val stage1 = testStage() @@ -116,24 +110,73 @@ class StreamLayoutSpec extends AkkaSpec { checkMaterialized(runnable) } - "be able to materialize DAG layouts" in { - pending + val tooDeepForStack = 50000 - } - "be able to materialize cyclic layouts" in { - pending + "fail fusing when value computation is too complex" in { + // this tests that the canary in to coal mine actually works + val g = (1 to tooDeepForStack) + .foldLeft(Flow[Int].mapMaterializedValue(_ ⇒ 1)) { (flow, i) ⇒ + flow.mapMaterializedValue(x ⇒ x + i) + } + a[StackOverflowError] shouldBe thrownBy { + Fusing.aggressive(g) + } } - "be able to model hierarchic graph modules" in { - pending + "not fail materialization when building a large graph with simple computation" when { + + "starting from a Source" in { + val g = (1 to tooDeepForStack) + .foldLeft(Source.single(42).mapMaterializedValue(_ ⇒ 1))( + (f, i) ⇒ f.map(identity)) + val (mat, fut) = g.toMat(Sink.seq)(Keep.both).run() + mat should ===(1) + fut.futureValue should ===(List(42)) + } + + "starting from a Flow" in { + val g = (1 to tooDeepForStack).foldLeft(Flow[Int])((f, i) ⇒ f.map(identity)) + val (mat, fut) = g.runWith(Source.single(42).mapMaterializedValue(_ ⇒ 1), Sink.seq) + mat should ===(1) + fut.futureValue should ===(List(42)) + } + + "using .via" in { + val g = (1 to tooDeepForStack) + .foldLeft(Source.single(42).mapMaterializedValue(_ ⇒ 1))( + (f, i) ⇒ f.via(Flow[Int].map(identity))) + val (mat, fut) = g.toMat(Sink.seq)(Keep.both).run() + mat should ===(1) + fut.futureValue should ===(List(42)) + } } - "be able to model hierarchic attributes" in { - pending - } + "not fail fusing & materialization when building a large graph with simple computation" when { - "be able to model hierarchic cycle detection" in { - pending + "starting from a Source" in { + val g = Source fromGraph Fusing.aggressive((1 to tooDeepForStack) + .foldLeft(Source.single(42).mapMaterializedValue(_ ⇒ 1))( + (f, i) ⇒ f.map(identity))) + val (mat, fut) = g.toMat(Sink.seq)(Keep.both).run() + mat should ===(1) + fut.futureValue should ===(List(42)) + } + + "starting from a Flow" in { + val g = Flow fromGraph Fusing.aggressive((1 to tooDeepForStack).foldLeft(Flow[Int])((f, i) ⇒ f.map(identity))) + val (mat, fut) = g.runWith(Source.single(42).mapMaterializedValue(_ ⇒ 1), Sink.seq) + mat should ===(1) + fut.futureValue should ===(List(42)) + } + + "using .via" in { + val g = Source fromGraph Fusing.aggressive((1 to tooDeepForStack) + .foldLeft(Source.single(42).mapMaterializedValue(_ ⇒ 1))( + (f, i) ⇒ f.via(Flow[Int].map(identity)))) + val (mat, fut) = g.toMat(Sink.seq)(Keep.both).run() + mat should ===(1) + fut.futureValue should ===(List(42)) + } } }