diff --git a/akka-bench-jmh-dev/src/main/scala/akka/BenchRunner.scala b/akka-bench-jmh-dev/src/main/scala/akka/BenchRunner.scala new file mode 100644 index 0000000000..b6fa9780f5 --- /dev/null +++ b/akka-bench-jmh-dev/src/main/scala/akka/BenchRunner.scala @@ -0,0 +1,28 @@ +package akka + +import org.openjdk.jmh.results.RunResult +import org.openjdk.jmh.runner.Runner +import org.openjdk.jmh.runner.options.CommandLineOptions + +object BenchRunner extends App { + import scala.collection.JavaConversions._ + + val args2 = args.toList match { + case "quick" :: tail => "-i 1 -wi 1 -f1 -t1".split(" ").toList ::: tail + case "full" :: tail => "-i 10 -wi 4 -f3 -t1".split(" ").toList ::: tail + case other => other + } + + val opts = new CommandLineOptions(args2: _*) + val results = new Runner(opts).run() + + val report = results.map { result: RunResult ⇒ + val bench = result.getParams.getBenchmark + val params = result.getParams.getParamsKeys.map(key => s"$key=${result.getParams.getParam(key)}").mkString("_") + val score = result.getAggregatedResult.getPrimaryResult.getScore.round + val unit = result.getAggregatedResult.getPrimaryResult.getScoreUnit + s"\t${bench}_${params}\t$score\t$unit" + } + + report.toList.sorted.foreach(println) +} diff --git a/akka-bench-jmh-dev/src/main/scala/akka/http/HttpBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/http/HttpBenchmark.scala index f1157a0660..1d23a3fe0f 100644 --- a/akka-bench-jmh-dev/src/main/scala/akka/http/HttpBenchmark.scala +++ b/akka-bench-jmh-dev/src/main/scala/akka/http/HttpBenchmark.scala @@ -20,8 +20,8 @@ import scala.util.Try import com.typesafe.config.ConfigFactory @State(Scope.Benchmark) -@OutputTimeUnit(TimeUnit.MICROSECONDS) -@BenchmarkMode(Array(Mode.SampleTime)) +@OutputTimeUnit(TimeUnit.SECONDS) +@BenchmarkMode(Array(Mode.Throughput)) class HttpBenchmark { val config = ConfigFactory.parseString( diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/FlowMapBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/FlowMapBenchmark.scala new file mode 100644 index 0000000000..9ce0a7e34d --- /dev/null +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/FlowMapBenchmark.scala @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ + +package akka.stream + +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem +import akka.stream.scaladsl._ +import com.typesafe.config.ConfigFactory +import org.openjdk.jmh.annotations._ + +import scala.concurrent.Lock +import scala.util.Success + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class FlowMapBenchmark { + + val config = ConfigFactory.parseString( + """ + akka { + log-config-on-start = off + log-dead-letters-during-shutdown = off + loglevel = "WARNING" + + test { + timefactor = 1.0 + filter-leeway = 3s + single-expect-default = 3s + default-timeout = 5s + calling-thread-dispatcher { + type = akka.testkit.CallingThreadDispatcherConfigurator + } + } + }""".stripMargin).withFallback(ConfigFactory.load()) + + implicit val system = ActorSystem("test", config) + + var materializer: ActorMaterializer = _ + + + // manual, and not via @Param, because we want @OperationsPerInvocation on our tests + final val data100k = (1 to 100000).toVector + + final val successMarker = Success(1) + final val successFailure = Success(new Exception) + + // safe to be benchmark scoped because the flows we construct in this bench are stateless + var flow: Source[Int, Unit] = _ + + @Param(Array("2", "8")) // todo + val initialInputBufferSize = 0 + + @Param(Array("1", "5", "10")) + val numberOfMapOps = 0 + + @Setup + def setup() { + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialInputBufferSize, 16) + + materializer = ActorMaterializer(settings) + + flow = mkMaps(Source(data100k), numberOfMapOps)(identity) + } + + @TearDown + def shutdown() { + system.shutdown() + system.awaitTermination() + } + + @Benchmark + @OperationsPerInvocation(100000) + def flow_map_100k_elements() { + val lock = new Lock() // todo rethink what is the most lightweight way to await for a streams completion + lock.acquire() + + flow.runWith(Sink.onComplete(_ ⇒ lock.release()))(materializer) + + lock.acquire() + } + + // source setup + private def mkMaps[O, Mat](source: Source[O, Mat], count: Int)(op: O ⇒ O): Source[O, Mat] = { + var f = source + for (i ← 1 to count) + f = f.map(op) + f + } + + +} diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/GraphBuilderBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/GraphBuilderBenchmark.scala new file mode 100644 index 0000000000..8cf52d7172 --- /dev/null +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/GraphBuilderBenchmark.scala @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ + +package akka.stream + +import java.util.concurrent.TimeUnit +import org.openjdk.jmh.annotations._ + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.SECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class GraphBuilderBenchmark { + + @Param(Array("1", "10", "100", "1000")) + val complexity = 0 + + @Benchmark + def flow_with_map() { + MaterializationBenchmark.flowWithMapBuilder(complexity) + } + + @Benchmark + def graph_with_junctions() { + MaterializationBenchmark.graphWithJunctionsBuilder(complexity) + } + + @Benchmark + def graph_with_nested_imports() { + MaterializationBenchmark.graphWithNestedImportsBuilder(complexity) + } + + @Benchmark + def graph_with_imported_flow() { + MaterializationBenchmark.graphWithImportedFlowBuilder(complexity) + } +} diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/MaterializationBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/MaterializationBenchmark.scala new file mode 100644 index 0000000000..b06d726031 --- /dev/null +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/MaterializationBenchmark.scala @@ -0,0 +1,122 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ + +package akka.stream + +import java.util.concurrent.TimeUnit +import akka.actor.ActorSystem +import akka.stream.scaladsl._ +import org.openjdk.jmh.annotations._ + +object MaterializationBenchmark { + + val flowWithMapBuilder = (numOfCombinators: Int) => { + var source = Source.single(()) + for (_ <- 1 to numOfCombinators) { + source = source.map(identity) + } + source.to(Sink.ignore) + } + + val graphWithJunctionsBuilder = (numOfJunctions: Int) => + FlowGraph.closed() { implicit b => + import FlowGraph.Implicits._ + + val broadcast = b.add(Broadcast[Unit](numOfJunctions)) + var outlet = broadcast.out(0) + for (i <- 1 until numOfJunctions) { + val merge = b.add(Merge[Unit](2)) + outlet ~> merge + broadcast.out(i) ~> merge + outlet = merge.out + } + + Source.single(()) ~> broadcast + outlet ~> Sink.ignore + } + + val graphWithNestedImportsBuilder = (numOfNestedGraphs: Int) => { + var flow: Graph[FlowShape[Unit, Unit], Unit] = Flow[Unit].map(identity) + for (_ <- 1 to numOfNestedGraphs) { + flow = FlowGraph.partial(flow) { b ⇒ + flow ⇒ + FlowShape(flow.inlet, flow.outlet) + } + } + + FlowGraph.closed(flow) { implicit b ⇒ + flow ⇒ + import FlowGraph.Implicits._ + Source.single(()) ~> flow ~> Sink.ignore + } + } + + val graphWithImportedFlowBuilder = (numOfFlows: Int) => { + val flow = Flow[Unit].map(identity) + FlowGraph.closed() { b ⇒ + val source = b.add(Source.single(())) + var outlet = source + for (i <- 0 until numOfFlows) { + val flowShape = b.add(flow) + b.addEdge(outlet, flowShape.inlet) + outlet = flowShape.outlet + } + + val sink = b.add(Sink.ignore) + b.addEdge(outlet, sink) + } + } +} + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.SECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class MaterializationBenchmark { + import MaterializationBenchmark._ + + implicit val system = ActorSystem("MaterializationBenchmark") + implicit val mat = ActorMaterializer() + + var flowWithMap: RunnableGraph[Unit] = _ + var graphWithJunctions: RunnableGraph[Unit] = _ + var graphWithNestedImports: RunnableGraph[Unit] = _ + var graphWithImportedFlow: RunnableGraph[Unit] = _ + + @Param(Array("1", "10", "100", "1000")) + val complexity = 0 + + @Setup + def setup() { + flowWithMap = flowWithMapBuilder(complexity) + graphWithJunctions = graphWithJunctionsBuilder(complexity) + graphWithNestedImports = graphWithNestedImportsBuilder(complexity) + graphWithImportedFlow = graphWithImportedFlowBuilder(complexity) + } + + @TearDown + def shutdown() { + system.shutdown() + system.awaitTermination() + } + + @Benchmark + def flow_with_map() { + flowWithMap.run() + } + + @Benchmark + def graph_with_junctions() { + graphWithJunctions.run() + } + + @Benchmark + def graph_with_nested_imports() { + graphWithNestedImports.run() + } + + @Benchmark + def graph_with_imported_flow() { + graphWithImportedFlow.run() + } +} diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala new file mode 100644 index 0000000000..100c15de09 --- /dev/null +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ + +package akka.stream.io + +import java.io.{FileInputStream, File} +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem +import akka.stream.{Attributes, ActorMaterializer} +import akka.stream.scaladsl._ +import akka.util.ByteString +import org.openjdk.jmh.annotations._ + +import scala.concurrent.duration._ +import scala.concurrent.{Promise, Await, Future} + +/** + * Benchmark (bufSize) Mode Cnt Score Error Units + * FileSourcesBenchmark.fileChannel 2048 avgt 100 1140.192 ± 55.184 ms/op + */ +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@BenchmarkMode(Array(Mode.AverageTime)) +class FileSourcesBenchmark { + + implicit val system = ActorSystem("file-sources-benchmark") + implicit val mat = ActorMaterializer() + + val file: File = { + val line = ByteString("x" * 2048 + "\n") + + val f = File.createTempFile(getClass.getName, ".bench.tmp") + f.deleteOnExit() + + val ft = Source(() ⇒ Iterator.continually(line)) + .take(10 * 39062) // adjust as needed + .runWith(SynchronousFileSink(f)) + Await.result(ft, 30.seconds) + + f + } + + @Param(Array("2048")) + val bufSize = 0 + + var fileChannelSource: Source[ByteString, Future[Long]] = _ + var fileInputStreamSource: Source[ByteString, Future[Long]] = _ + var ioSourceLinesIterator: Source[ByteString, Unit] = _ + + @Setup + def setup() { + fileChannelSource = SynchronousFileSource(file, bufSize) + fileInputStreamSource = InputStreamSource(() ⇒ new FileInputStream(file), bufSize) + ioSourceLinesIterator = Source(() ⇒ scala.io.Source.fromFile(file).getLines()).map(ByteString(_)) + } + + @TearDown + def teardown(): Unit = { + file.delete() + } + + @TearDown + def shutdown() { + system.shutdown() + system.awaitTermination() + } + + @Benchmark + def fileChannel() = { + val h = fileChannelSource.to(Sink.ignore).run() + + Await.result(h, 30.seconds) + } + + @Benchmark + def fileChannel_noReadAhead() = { + val h = fileChannelSource.withAttributes(Attributes.inputBuffer(1, 1)).to(Sink.ignore).run() + + Await.result(h, 30.seconds) + } + + @Benchmark + def inputStream() = { + val h = fileInputStreamSource.to(Sink.ignore).run() + + Await.result(h, 30.seconds) + } + + /* + * The previous status quo was very slow: + * Benchmark Mode Cnt Score Error Units + * FileSourcesBenchmark.naive_ioSourceLinesIterator avgt 20 7067.944 ± 1341.847 ms/op + */ + @Benchmark + def naive_ioSourceLinesIterator() = { + val p = Promise[Unit]() + ioSourceLinesIterator.to(Sink.onComplete(p.complete(_))).run() + + Await.result(p.future, 30.seconds) + } + + +}