From b51e241126e4cfe2c5d02d86419aadfa939c2739 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 16 Mar 2017 17:06:51 +0100 Subject: [PATCH] add sub-stream benchmark * and remove invalid graph_with_nested_imports --- .../akka/stream/GraphBuilderBenchmark.scala | 5 +- .../stream/MaterializationBenchmark.scala | 50 +++++++++++-------- 2 files changed, 31 insertions(+), 24 deletions(-) diff --git a/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala index 3a8818906f..3c2da2883d 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala @@ -26,11 +26,8 @@ class GraphBuilderBenchmark { def graph_with_junctions(): RunnableGraph[NotUsed] = MaterializationBenchmark.graphWithJunctionsBuilder(complexity) - @Benchmark - def graph_with_nested_imports(): RunnableGraph[NotUsed] = - MaterializationBenchmark.graphWithNestedImportsBuilder(complexity) - @Benchmark def graph_with_imported_flow(): RunnableGraph[NotUsed] = MaterializationBenchmark.graphWithImportedFlowBuilder(complexity) + } diff --git a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala index 32e8da50e7..d2aa1d86c4 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala @@ -11,6 +11,9 @@ import akka.stream.scaladsl._ import org.openjdk.jmh.annotations._ import scala.concurrent.Await import scala.concurrent.duration._ +import scala.concurrent.Future +import akka.stream.impl.ConstantFun +import akka.Done object MaterializationBenchmark { @@ -40,21 +43,6 @@ object MaterializationBenchmark { ClosedShape }) - val graphWithNestedImportsBuilder = (numOfNestedGraphs: Int) => { - var flow: Graph[FlowShape[Unit, Unit], NotUsed] = Flow[Unit].map(identity) - for (_ <- 1 to numOfNestedGraphs) { - flow = GraphDSL.create(flow) { b ⇒ flow ⇒ - FlowShape(flow.in, flow.out) - } - } - - RunnableGraph.fromGraph(GraphDSL.create(flow) { implicit b ⇒ flow ⇒ - import GraphDSL.Implicits._ - Source.single(()) ~> flow ~> Sink.ignore - ClosedShape - }) - } - val graphWithImportedFlowBuilder = (numOfFlows: Int) => RunnableGraph.fromGraph(GraphDSL.create(Source.single(())) { implicit b ⇒ source ⇒ import GraphDSL.Implicits._ @@ -68,6 +56,24 @@ object MaterializationBenchmark { out ~> Sink.ignore ClosedShape }) + + final val subStreamCount = 10000 + + val subStreamBuilder: Int => RunnableGraph[Future[Unit]] = numOfCombinators => { + + val subFlow = { + var flow = Flow[Unit] + for (_ <- 1 to numOfCombinators) { + flow = flow.map(identity) + } + flow + } + + Source.repeat(Source.single(())) + .take(subStreamCount) + .flatMapConcat(_.via(subFlow)) + .toMat(Sink.last)(Keep.right) + } } @State(Scope.Benchmark) @@ -81,18 +87,18 @@ class MaterializationBenchmark { var flowWithMap: RunnableGraph[NotUsed] = _ var graphWithJunctions: RunnableGraph[NotUsed] = _ - var graphWithNestedImports: RunnableGraph[NotUsed] = _ var graphWithImportedFlow: RunnableGraph[NotUsed] = _ + var subStream: RunnableGraph[Future[Unit]] = _ - @Param(Array("1", "10", "100", "1000")) + @Param(Array("1", "10", "100")) var complexity = 0 @Setup def setup(): Unit = { flowWithMap = flowWithMapBuilder(complexity) graphWithJunctions = graphWithJunctionsBuilder(complexity) - graphWithNestedImports = graphWithNestedImportsBuilder(complexity) graphWithImportedFlow = graphWithImportedFlowBuilder(complexity) + subStream = subStreamBuilder(complexity) } @TearDown @@ -107,8 +113,12 @@ class MaterializationBenchmark { def graph_with_junctions(): NotUsed = graphWithJunctions.run() @Benchmark - def graph_with_nested_imports(): NotUsed = graphWithNestedImports.run() + def graph_with_imported_flow(): NotUsed = graphWithImportedFlow.run() @Benchmark - def graph_with_imported_flow(): NotUsed = graphWithImportedFlow.run() + @OperationsPerInvocation(subStreamCount) + def sub_stream(): Done = { + Await.result(subStream.run(), 5.seconds) + Done + } }