=ben add benchmark runner and move stream benchmarks to dev project
This commit is contained in:
parent
15d9a1eed4
commit
bab83e9d64
6 changed files with 390 additions and 2 deletions
28
akka-bench-jmh-dev/src/main/scala/akka/BenchRunner.scala
Normal file
28
akka-bench-jmh-dev/src/main/scala/akka/BenchRunner.scala
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
package akka
|
||||
|
||||
import org.openjdk.jmh.results.RunResult
|
||||
import org.openjdk.jmh.runner.Runner
|
||||
import org.openjdk.jmh.runner.options.CommandLineOptions
|
||||
|
||||
object BenchRunner extends App {
|
||||
import scala.collection.JavaConversions._
|
||||
|
||||
val args2 = args.toList match {
|
||||
case "quick" :: tail => "-i 1 -wi 1 -f1 -t1".split(" ").toList ::: tail
|
||||
case "full" :: tail => "-i 10 -wi 4 -f3 -t1".split(" ").toList ::: tail
|
||||
case other => other
|
||||
}
|
||||
|
||||
val opts = new CommandLineOptions(args2: _*)
|
||||
val results = new Runner(opts).run()
|
||||
|
||||
val report = results.map { result: RunResult ⇒
|
||||
val bench = result.getParams.getBenchmark
|
||||
val params = result.getParams.getParamsKeys.map(key => s"$key=${result.getParams.getParam(key)}").mkString("_")
|
||||
val score = result.getAggregatedResult.getPrimaryResult.getScore.round
|
||||
val unit = result.getAggregatedResult.getPrimaryResult.getScoreUnit
|
||||
s"\t${bench}_${params}\t$score\t$unit"
|
||||
}
|
||||
|
||||
report.toList.sorted.foreach(println)
|
||||
}
|
||||
|
|
@ -20,8 +20,8 @@ import scala.util.Try
|
|||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
@State(Scope.Benchmark)
|
||||
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||
@BenchmarkMode(Array(Mode.SampleTime))
|
||||
@OutputTimeUnit(TimeUnit.SECONDS)
|
||||
@BenchmarkMode(Array(Mode.Throughput))
|
||||
class HttpBenchmark {
|
||||
|
||||
val config = ConfigFactory.parseString(
|
||||
|
|
|
|||
|
|
@ -0,0 +1,96 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.stream
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.scaladsl._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.openjdk.jmh.annotations._
|
||||
|
||||
import scala.concurrent.Lock
|
||||
import scala.util.Success
|
||||
|
||||
@State(Scope.Benchmark)
|
||||
@OutputTimeUnit(TimeUnit.MILLISECONDS)
|
||||
@BenchmarkMode(Array(Mode.Throughput))
|
||||
class FlowMapBenchmark {
|
||||
|
||||
val config = ConfigFactory.parseString(
|
||||
"""
|
||||
akka {
|
||||
log-config-on-start = off
|
||||
log-dead-letters-during-shutdown = off
|
||||
loglevel = "WARNING"
|
||||
|
||||
test {
|
||||
timefactor = 1.0
|
||||
filter-leeway = 3s
|
||||
single-expect-default = 3s
|
||||
default-timeout = 5s
|
||||
calling-thread-dispatcher {
|
||||
type = akka.testkit.CallingThreadDispatcherConfigurator
|
||||
}
|
||||
}
|
||||
}""".stripMargin).withFallback(ConfigFactory.load())
|
||||
|
||||
implicit val system = ActorSystem("test", config)
|
||||
|
||||
var materializer: ActorMaterializer = _
|
||||
|
||||
|
||||
// manual, and not via @Param, because we want @OperationsPerInvocation on our tests
|
||||
final val data100k = (1 to 100000).toVector
|
||||
|
||||
final val successMarker = Success(1)
|
||||
final val successFailure = Success(new Exception)
|
||||
|
||||
// safe to be benchmark scoped because the flows we construct in this bench are stateless
|
||||
var flow: Source[Int, Unit] = _
|
||||
|
||||
@Param(Array("2", "8")) // todo
|
||||
val initialInputBufferSize = 0
|
||||
|
||||
@Param(Array("1", "5", "10"))
|
||||
val numberOfMapOps = 0
|
||||
|
||||
@Setup
|
||||
def setup() {
|
||||
val settings = ActorMaterializerSettings(system)
|
||||
.withInputBuffer(initialInputBufferSize, 16)
|
||||
|
||||
materializer = ActorMaterializer(settings)
|
||||
|
||||
flow = mkMaps(Source(data100k), numberOfMapOps)(identity)
|
||||
}
|
||||
|
||||
@TearDown
|
||||
def shutdown() {
|
||||
system.shutdown()
|
||||
system.awaitTermination()
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(100000)
|
||||
def flow_map_100k_elements() {
|
||||
val lock = new Lock() // todo rethink what is the most lightweight way to await for a streams completion
|
||||
lock.acquire()
|
||||
|
||||
flow.runWith(Sink.onComplete(_ ⇒ lock.release()))(materializer)
|
||||
|
||||
lock.acquire()
|
||||
}
|
||||
|
||||
// source setup
|
||||
private def mkMaps[O, Mat](source: Source[O, Mat], count: Int)(op: O ⇒ O): Source[O, Mat] = {
|
||||
var f = source
|
||||
for (i ← 1 to count)
|
||||
f = f.map(op)
|
||||
f
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,37 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.stream
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
import org.openjdk.jmh.annotations._
|
||||
|
||||
@State(Scope.Benchmark)
|
||||
@OutputTimeUnit(TimeUnit.SECONDS)
|
||||
@BenchmarkMode(Array(Mode.Throughput))
|
||||
class GraphBuilderBenchmark {
|
||||
|
||||
@Param(Array("1", "10", "100", "1000"))
|
||||
val complexity = 0
|
||||
|
||||
@Benchmark
|
||||
def flow_with_map() {
|
||||
MaterializationBenchmark.flowWithMapBuilder(complexity)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
def graph_with_junctions() {
|
||||
MaterializationBenchmark.graphWithJunctionsBuilder(complexity)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
def graph_with_nested_imports() {
|
||||
MaterializationBenchmark.graphWithNestedImportsBuilder(complexity)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
def graph_with_imported_flow() {
|
||||
MaterializationBenchmark.graphWithImportedFlowBuilder(complexity)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,122 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.stream
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.scaladsl._
|
||||
import org.openjdk.jmh.annotations._
|
||||
|
||||
object MaterializationBenchmark {
|
||||
|
||||
val flowWithMapBuilder = (numOfCombinators: Int) => {
|
||||
var source = Source.single(())
|
||||
for (_ <- 1 to numOfCombinators) {
|
||||
source = source.map(identity)
|
||||
}
|
||||
source.to(Sink.ignore)
|
||||
}
|
||||
|
||||
val graphWithJunctionsBuilder = (numOfJunctions: Int) =>
|
||||
FlowGraph.closed() { implicit b =>
|
||||
import FlowGraph.Implicits._
|
||||
|
||||
val broadcast = b.add(Broadcast[Unit](numOfJunctions))
|
||||
var outlet = broadcast.out(0)
|
||||
for (i <- 1 until numOfJunctions) {
|
||||
val merge = b.add(Merge[Unit](2))
|
||||
outlet ~> merge
|
||||
broadcast.out(i) ~> merge
|
||||
outlet = merge.out
|
||||
}
|
||||
|
||||
Source.single(()) ~> broadcast
|
||||
outlet ~> Sink.ignore
|
||||
}
|
||||
|
||||
val graphWithNestedImportsBuilder = (numOfNestedGraphs: Int) => {
|
||||
var flow: Graph[FlowShape[Unit, Unit], Unit] = Flow[Unit].map(identity)
|
||||
for (_ <- 1 to numOfNestedGraphs) {
|
||||
flow = FlowGraph.partial(flow) { b ⇒
|
||||
flow ⇒
|
||||
FlowShape(flow.inlet, flow.outlet)
|
||||
}
|
||||
}
|
||||
|
||||
FlowGraph.closed(flow) { implicit b ⇒
|
||||
flow ⇒
|
||||
import FlowGraph.Implicits._
|
||||
Source.single(()) ~> flow ~> Sink.ignore
|
||||
}
|
||||
}
|
||||
|
||||
val graphWithImportedFlowBuilder = (numOfFlows: Int) => {
|
||||
val flow = Flow[Unit].map(identity)
|
||||
FlowGraph.closed() { b ⇒
|
||||
val source = b.add(Source.single(()))
|
||||
var outlet = source
|
||||
for (i <- 0 until numOfFlows) {
|
||||
val flowShape = b.add(flow)
|
||||
b.addEdge(outlet, flowShape.inlet)
|
||||
outlet = flowShape.outlet
|
||||
}
|
||||
|
||||
val sink = b.add(Sink.ignore)
|
||||
b.addEdge(outlet, sink)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@State(Scope.Benchmark)
|
||||
@OutputTimeUnit(TimeUnit.SECONDS)
|
||||
@BenchmarkMode(Array(Mode.Throughput))
|
||||
class MaterializationBenchmark {
|
||||
import MaterializationBenchmark._
|
||||
|
||||
implicit val system = ActorSystem("MaterializationBenchmark")
|
||||
implicit val mat = ActorMaterializer()
|
||||
|
||||
var flowWithMap: RunnableGraph[Unit] = _
|
||||
var graphWithJunctions: RunnableGraph[Unit] = _
|
||||
var graphWithNestedImports: RunnableGraph[Unit] = _
|
||||
var graphWithImportedFlow: RunnableGraph[Unit] = _
|
||||
|
||||
@Param(Array("1", "10", "100", "1000"))
|
||||
val complexity = 0
|
||||
|
||||
@Setup
|
||||
def setup() {
|
||||
flowWithMap = flowWithMapBuilder(complexity)
|
||||
graphWithJunctions = graphWithJunctionsBuilder(complexity)
|
||||
graphWithNestedImports = graphWithNestedImportsBuilder(complexity)
|
||||
graphWithImportedFlow = graphWithImportedFlowBuilder(complexity)
|
||||
}
|
||||
|
||||
@TearDown
|
||||
def shutdown() {
|
||||
system.shutdown()
|
||||
system.awaitTermination()
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
def flow_with_map() {
|
||||
flowWithMap.run()
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
def graph_with_junctions() {
|
||||
graphWithJunctions.run()
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
def graph_with_nested_imports() {
|
||||
graphWithNestedImports.run()
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
def graph_with_imported_flow() {
|
||||
graphWithImportedFlow.run()
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,105 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.stream.io
|
||||
|
||||
import java.io.{FileInputStream, File}
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.{Attributes, ActorMaterializer}
|
||||
import akka.stream.scaladsl._
|
||||
import akka.util.ByteString
|
||||
import org.openjdk.jmh.annotations._
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{Promise, Await, Future}
|
||||
|
||||
/**
|
||||
* Benchmark (bufSize) Mode Cnt Score Error Units
|
||||
* FileSourcesBenchmark.fileChannel 2048 avgt 100 1140.192 ± 55.184 ms/op
|
||||
*/
|
||||
@State(Scope.Benchmark)
|
||||
@OutputTimeUnit(TimeUnit.MILLISECONDS)
|
||||
@BenchmarkMode(Array(Mode.AverageTime))
|
||||
class FileSourcesBenchmark {
|
||||
|
||||
implicit val system = ActorSystem("file-sources-benchmark")
|
||||
implicit val mat = ActorMaterializer()
|
||||
|
||||
val file: File = {
|
||||
val line = ByteString("x" * 2048 + "\n")
|
||||
|
||||
val f = File.createTempFile(getClass.getName, ".bench.tmp")
|
||||
f.deleteOnExit()
|
||||
|
||||
val ft = Source(() ⇒ Iterator.continually(line))
|
||||
.take(10 * 39062) // adjust as needed
|
||||
.runWith(SynchronousFileSink(f))
|
||||
Await.result(ft, 30.seconds)
|
||||
|
||||
f
|
||||
}
|
||||
|
||||
@Param(Array("2048"))
|
||||
val bufSize = 0
|
||||
|
||||
var fileChannelSource: Source[ByteString, Future[Long]] = _
|
||||
var fileInputStreamSource: Source[ByteString, Future[Long]] = _
|
||||
var ioSourceLinesIterator: Source[ByteString, Unit] = _
|
||||
|
||||
@Setup
|
||||
def setup() {
|
||||
fileChannelSource = SynchronousFileSource(file, bufSize)
|
||||
fileInputStreamSource = InputStreamSource(() ⇒ new FileInputStream(file), bufSize)
|
||||
ioSourceLinesIterator = Source(() ⇒ scala.io.Source.fromFile(file).getLines()).map(ByteString(_))
|
||||
}
|
||||
|
||||
@TearDown
|
||||
def teardown(): Unit = {
|
||||
file.delete()
|
||||
}
|
||||
|
||||
@TearDown
|
||||
def shutdown() {
|
||||
system.shutdown()
|
||||
system.awaitTermination()
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
def fileChannel() = {
|
||||
val h = fileChannelSource.to(Sink.ignore).run()
|
||||
|
||||
Await.result(h, 30.seconds)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
def fileChannel_noReadAhead() = {
|
||||
val h = fileChannelSource.withAttributes(Attributes.inputBuffer(1, 1)).to(Sink.ignore).run()
|
||||
|
||||
Await.result(h, 30.seconds)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
def inputStream() = {
|
||||
val h = fileInputStreamSource.to(Sink.ignore).run()
|
||||
|
||||
Await.result(h, 30.seconds)
|
||||
}
|
||||
|
||||
/*
|
||||
* The previous status quo was very slow:
|
||||
* Benchmark Mode Cnt Score Error Units
|
||||
* FileSourcesBenchmark.naive_ioSourceLinesIterator avgt 20 7067.944 ± 1341.847 ms/op
|
||||
*/
|
||||
@Benchmark
|
||||
def naive_ioSourceLinesIterator() = {
|
||||
val p = Promise[Unit]()
|
||||
ioSourceLinesIterator.to(Sink.onComplete(p.complete(_))).run()
|
||||
|
||||
Await.result(p.future, 30.seconds)
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue