From 6bf2f53a21b915d18c2799d57e82abb3b15f296b Mon Sep 17 00:00:00 2001 From: Kaspar Fischer Date: Mon, 28 Dec 2015 20:51:17 -0800 Subject: [PATCH] Benchmark for the flatMapMerge operator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a JMH benchmark for Flow.flatMapMerge(). The benchmark processes 100k elements that – depending on the parameter NumberOfStreams – come from: - NumberOfStreams=0: a single source. In this case, flatMapMerge() is not used. This is the base line against which the other tests run. - NumberOfStreams>0: the 100k elements come from NumberOfStreams-many sources, each of which produces (100k/NumberOfStreams)-many elements. The sources are merged together via flatMapMerge(). Notice that in the latter case, the sources are pre-fused to make the comparison as fair as possible. --- .../akka/stream/FlatMapMergeBenchmark.scala | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 akka-bench-jmh-dev/src/main/scala/akka/stream/FlatMapMergeBenchmark.scala diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/FlatMapMergeBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/FlatMapMergeBenchmark.scala new file mode 100644 index 0000000000..06c87028c9 --- /dev/null +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/FlatMapMergeBenchmark.scala @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ + +package akka.stream + +import akka.actor.ActorSystem +import akka.stream.scaladsl._ +import java.util.concurrent.TimeUnit +import org.openjdk.jmh.annotations._ +import scala.concurrent._ +import scala.concurrent.duration.Duration.Inf + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class FlatMapMergeBenchmark { + implicit val system = ActorSystem("FlatMapMergeBenchmark") + val materializerSettings = ActorMaterializerSettings(system).withDispatcher("akka.test.stream-dispatcher") + implicit val materializer = ActorMaterializer(materializerSettings) + + val NumberOfElements = 100000 + + @Param(Array("0", "1", "10")) + val NumberOfStreams = 0 + + var graph: RunnableGraph[Future[Unit]] = _ + + def createSource(count: Int): Graph[SourceShape[Int], Unit] = akka.stream.Fusing.aggressive(Source.repeat(1).take(count)) + + @Setup + def setup() { + val source = NumberOfStreams match { + // Base line: process NumberOfElements-many elements from a single source without using flatMapMerge + case 0 => createSource(NumberOfElements) + // Stream merging: process NumberOfElements-many elements from n sources, each producing (NumberOfElements/n)-many elements + case n => + val subSource = createSource(NumberOfElements / n) + Source.repeat(()).take(n).flatMapMerge(n, _ => subSource) + } + graph = Source.fromGraph(source).toMat(Sink.ignore)(Keep.right) + } + + @TearDown + def shutdown() { + system.shutdown() + system.awaitTermination() + } + + @Benchmark + @OperationsPerInvocation(100000) // Note: needs to match NumberOfElements. + def flat_map_merge_100k_elements() { + Await.result(graph.run(), Inf) + } +}