add sub-stream benchmark
* and remove invalid graph_with_nested_imports
This commit is contained in:
parent
c3c058b6bb
commit
b51e241126
2 changed files with 31 additions and 24 deletions
|
|
@ -26,11 +26,8 @@ class GraphBuilderBenchmark {
|
||||||
def graph_with_junctions(): RunnableGraph[NotUsed] =
|
def graph_with_junctions(): RunnableGraph[NotUsed] =
|
||||||
MaterializationBenchmark.graphWithJunctionsBuilder(complexity)
|
MaterializationBenchmark.graphWithJunctionsBuilder(complexity)
|
||||||
|
|
||||||
@Benchmark
|
|
||||||
def graph_with_nested_imports(): RunnableGraph[NotUsed] =
|
|
||||||
MaterializationBenchmark.graphWithNestedImportsBuilder(complexity)
|
|
||||||
|
|
||||||
@Benchmark
|
@Benchmark
|
||||||
def graph_with_imported_flow(): RunnableGraph[NotUsed] =
|
def graph_with_imported_flow(): RunnableGraph[NotUsed] =
|
||||||
MaterializationBenchmark.graphWithImportedFlowBuilder(complexity)
|
MaterializationBenchmark.graphWithImportedFlowBuilder(complexity)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,9 @@ import akka.stream.scaladsl._
|
||||||
import org.openjdk.jmh.annotations._
|
import org.openjdk.jmh.annotations._
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
import scala.concurrent.Future
|
||||||
|
import akka.stream.impl.ConstantFun
|
||||||
|
import akka.Done
|
||||||
|
|
||||||
object MaterializationBenchmark {
|
object MaterializationBenchmark {
|
||||||
|
|
||||||
|
|
@ -40,21 +43,6 @@ object MaterializationBenchmark {
|
||||||
ClosedShape
|
ClosedShape
|
||||||
})
|
})
|
||||||
|
|
||||||
val graphWithNestedImportsBuilder = (numOfNestedGraphs: Int) => {
|
|
||||||
var flow: Graph[FlowShape[Unit, Unit], NotUsed] = Flow[Unit].map(identity)
|
|
||||||
for (_ <- 1 to numOfNestedGraphs) {
|
|
||||||
flow = GraphDSL.create(flow) { b ⇒ flow ⇒
|
|
||||||
FlowShape(flow.in, flow.out)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
RunnableGraph.fromGraph(GraphDSL.create(flow) { implicit b ⇒ flow ⇒
|
|
||||||
import GraphDSL.Implicits._
|
|
||||||
Source.single(()) ~> flow ~> Sink.ignore
|
|
||||||
ClosedShape
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
val graphWithImportedFlowBuilder = (numOfFlows: Int) =>
|
val graphWithImportedFlowBuilder = (numOfFlows: Int) =>
|
||||||
RunnableGraph.fromGraph(GraphDSL.create(Source.single(())) { implicit b ⇒ source ⇒
|
RunnableGraph.fromGraph(GraphDSL.create(Source.single(())) { implicit b ⇒ source ⇒
|
||||||
import GraphDSL.Implicits._
|
import GraphDSL.Implicits._
|
||||||
|
|
@ -68,6 +56,24 @@ object MaterializationBenchmark {
|
||||||
out ~> Sink.ignore
|
out ~> Sink.ignore
|
||||||
ClosedShape
|
ClosedShape
|
||||||
})
|
})
|
||||||
|
|
||||||
|
final val subStreamCount = 10000
|
||||||
|
|
||||||
|
val subStreamBuilder: Int => RunnableGraph[Future[Unit]] = numOfCombinators => {
|
||||||
|
|
||||||
|
val subFlow = {
|
||||||
|
var flow = Flow[Unit]
|
||||||
|
for (_ <- 1 to numOfCombinators) {
|
||||||
|
flow = flow.map(identity)
|
||||||
|
}
|
||||||
|
flow
|
||||||
|
}
|
||||||
|
|
||||||
|
Source.repeat(Source.single(()))
|
||||||
|
.take(subStreamCount)
|
||||||
|
.flatMapConcat(_.via(subFlow))
|
||||||
|
.toMat(Sink.last)(Keep.right)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
|
|
@ -81,18 +87,18 @@ class MaterializationBenchmark {
|
||||||
|
|
||||||
var flowWithMap: RunnableGraph[NotUsed] = _
|
var flowWithMap: RunnableGraph[NotUsed] = _
|
||||||
var graphWithJunctions: RunnableGraph[NotUsed] = _
|
var graphWithJunctions: RunnableGraph[NotUsed] = _
|
||||||
var graphWithNestedImports: RunnableGraph[NotUsed] = _
|
|
||||||
var graphWithImportedFlow: RunnableGraph[NotUsed] = _
|
var graphWithImportedFlow: RunnableGraph[NotUsed] = _
|
||||||
|
var subStream: RunnableGraph[Future[Unit]] = _
|
||||||
|
|
||||||
@Param(Array("1", "10", "100", "1000"))
|
@Param(Array("1", "10", "100"))
|
||||||
var complexity = 0
|
var complexity = 0
|
||||||
|
|
||||||
@Setup
|
@Setup
|
||||||
def setup(): Unit = {
|
def setup(): Unit = {
|
||||||
flowWithMap = flowWithMapBuilder(complexity)
|
flowWithMap = flowWithMapBuilder(complexity)
|
||||||
graphWithJunctions = graphWithJunctionsBuilder(complexity)
|
graphWithJunctions = graphWithJunctionsBuilder(complexity)
|
||||||
graphWithNestedImports = graphWithNestedImportsBuilder(complexity)
|
|
||||||
graphWithImportedFlow = graphWithImportedFlowBuilder(complexity)
|
graphWithImportedFlow = graphWithImportedFlowBuilder(complexity)
|
||||||
|
subStream = subStreamBuilder(complexity)
|
||||||
}
|
}
|
||||||
|
|
||||||
@TearDown
|
@TearDown
|
||||||
|
|
@ -107,8 +113,12 @@ class MaterializationBenchmark {
|
||||||
def graph_with_junctions(): NotUsed = graphWithJunctions.run()
|
def graph_with_junctions(): NotUsed = graphWithJunctions.run()
|
||||||
|
|
||||||
@Benchmark
|
@Benchmark
|
||||||
def graph_with_nested_imports(): NotUsed = graphWithNestedImports.run()
|
def graph_with_imported_flow(): NotUsed = graphWithImportedFlow.run()
|
||||||
|
|
||||||
@Benchmark
|
@Benchmark
|
||||||
def graph_with_imported_flow(): NotUsed = graphWithImportedFlow.run()
|
@OperationsPerInvocation(subStreamCount)
|
||||||
|
def sub_stream(): Done = {
|
||||||
|
Await.result(subStream.run(), 5.seconds)
|
||||||
|
Done
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue