diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index f805a91080..ed423f64ff 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -299,10 +299,12 @@ trait GraphInterpreterSpecKit extends AkkaSpec { } implicit class ToGraphStage[I, O](stage: Stage[I, O]) { - def toGS = + def toGS: PushPullGraphStage[Any, Any, Any] = { + val s = stage new PushPullGraphStage[Any, Any, Any]( - (_) ⇒ stage.asInstanceOf[Stage[Any, Any]], + (_) ⇒ s.asInstanceOf[Stage[Any, Any]], Attributes.none) + } } abstract class OneBoundedSetup[T](_ops: GraphStageWithMaterializedValue[Shape, Any]*) extends Builder { diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala index c273912a3c..ef41d96563 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala @@ -16,11 +16,11 @@ class InterpreterStressSpec extends AkkaSpec with GraphInterpreterSpecKit { val halfLength = chainLength / 2 val repetition = 100 - val f = (x: Int) ⇒ x + 1 + val map = Map((x: Int) ⇒ x + 1, stoppingDecider).toGS "Interpreter" must { - "work with a massive chain of maps" in new OneBoundedSetup[Int](Array.fill(chainLength)(Map(f, stoppingDecider))) { + "work with a massive chain of maps" in new OneBoundedSetup[Int](Vector.fill(chainLength)(map): _*) { lastEvents() should be(Set.empty) val tstamp = System.nanoTime() @@ -42,9 +42,10 @@ class InterpreterStressSpec extends AkkaSpec with GraphInterpreterSpecKit { info(s"Chain finished in $time seconds ${(chainLength * repetition) / (time * 1000 * 1000)} million maps/s") } - "work with a massive chain of maps with early complete" in new OneBoundedSetup[Int](Vector.fill(halfLength)(Map((x: Int) ⇒ x + 1, stoppingDecider)) ++ - Seq(Take(repetition / 2)) ++ - Seq.fill(halfLength)(Map((x: Int) ⇒ x + 1, stoppingDecider))) { + "work with a massive chain of maps with early complete" in new OneBoundedSetup[Int]( + Vector.fill(halfLength)(map) ++ + Seq(Take(repetition / 2).toGS) ++ + Vector.fill(halfLength)(map): _*) { lastEvents() should be(Set.empty) val tstamp = System.nanoTime() @@ -70,7 +71,7 @@ class InterpreterStressSpec extends AkkaSpec with GraphInterpreterSpecKit { info(s"Chain finished in $time seconds ${(chainLength * repetition) / (time * 1000 * 1000)} million maps/s") } - "work with a massive chain of takes" in new OneBoundedSetup[Int](Vector.fill(chainLength)(Take(1))) { + "work with a massive chain of takes" in new OneBoundedSetup[Int](Vector.fill(chainLength / 10)(Take(1))) { lastEvents() should be(Set.empty) downstream.requestOne() @@ -99,7 +100,7 @@ class InterpreterStressSpec extends AkkaSpec with GraphInterpreterSpecKit { } - "work with a massive chain of conflates by overflowing to the heap" in new OneBoundedSetup[Int](Vector.fill(100000)(Conflate( + "work with a massive chain of conflates by overflowing to the heap" in new OneBoundedSetup[Int](Vector.fill(chainLength / 10)(Conflate( (in: Int) ⇒ in, (agg: Int, in: Int) ⇒ agg + in, Supervision.stoppingDecider))) {