From 897fecdd9790b9aeba7a06abbe7b156bc71169fc Mon Sep 17 00:00:00 2001 From: Kopaniev Vladyslav Date: Fri, 5 Apr 2019 12:04:25 +0300 Subject: [PATCH] Use BenchTestSource in streams JMH benchmarks #26563 --- .../akka/remote/artery/BenchTestSource.scala | 2 +- .../akka/stream/FlatMapMergeBenchmark.scala | 4 +- .../scala/akka/stream/FlowMapBenchmark.scala | 37 +++---------------- .../scala/akka/stream/FramingBenchmark.scala | 8 ++-- .../akka/stream/SourceRefBenchmark.scala | 31 ++-------------- 5 files changed, 17 insertions(+), 65 deletions(-) diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/BenchTestSource.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/BenchTestSource.scala index 811662f0be..fb0f710025 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/BenchTestSource.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/BenchTestSource.scala @@ -19,7 +19,7 @@ import akka.stream.stage.OutHandler class BenchTestSource(elementCount: Int) extends GraphStage[SourceShape[java.lang.Integer]] { private val elements = new Array[java.lang.Integer](elementCount) - (1 to elementCount).map(n => elements(n - 1) = n) + (1 to elementCount).foreach(n => elements(n - 1) = n) val out: Outlet[java.lang.Integer] = Outlet("BenchTestSource") override val shape: SourceShape[java.lang.Integer] = SourceShape(out) diff --git a/akka-bench-jmh/src/main/scala/akka/stream/FlatMapMergeBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/FlatMapMergeBenchmark.scala index e31cf55eba..35dd5962a8 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/FlatMapMergeBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/FlatMapMergeBenchmark.scala @@ -8,6 +8,7 @@ import akka.{ Done, NotUsed } import akka.actor.ActorSystem import akka.stream.scaladsl._ import java.util.concurrent.TimeUnit +import akka.remote.artery.BenchTestSourceSameElement import org.openjdk.jmh.annotations._ import scala.concurrent._ import scala.concurrent.duration._ @@ -27,7 +28,8 @@ class FlatMapMergeBenchmark { var graph: RunnableGraph[Future[Done]] = _ - def createSource(count: Int): Graph[SourceShape[Int], NotUsed] = Source.repeat(1).take(count) + def createSource(count: Int): Graph[SourceShape[java.lang.Integer], NotUsed] = + new BenchTestSourceSameElement(count, 1) @Setup def setup(): Unit = { diff --git a/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala index dbf41643cd..de8d7f3baf 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala @@ -7,13 +7,13 @@ package akka.stream import java.util.concurrent.TimeUnit import akka.NotUsed import akka.actor.ActorSystem +import akka.remote.artery.BenchTestSource import akka.stream.scaladsl._ import com.typesafe.config.ConfigFactory import org.openjdk.jmh.annotations._ import java.util.concurrent.Semaphore import scala.util.Success import akka.stream.impl.fusing.GraphStages -import org.reactivestreams._ import scala.concurrent.Await import scala.concurrent.duration._ @@ -59,7 +59,7 @@ class FlowMapBenchmark { final val successFailure = Success(new Exception) // safe to be benchmark scoped because the flows we construct in this bench are stateless - var flow: Source[Int, NotUsed] = _ + var flow: Source[java.lang.Integer, NotUsed] = _ @Param(Array("8", "32", "128")) var initialInputBufferSize = 0 @@ -73,38 +73,11 @@ class FlowMapBenchmark { materializer = ActorMaterializer(settings) - // Important to use a synchronous, zero overhead source, otherwise the slowness of the source - // might bias the benchmark, since the stream always adjusts the rate to the slowest stage. - val syncTestPublisher = new Publisher[Int] { - override def subscribe(s: Subscriber[_ >: Int]): Unit = { - val sub = new Subscription { - var counter = 0 // Piggyback on caller thread, no need for volatile - - override def request(n: Long): Unit = { - var i = n - while (i > 0) { - s.onNext(counter) - counter += 1 - if (counter == 100000) { - s.onComplete() - return - } - i -= 1 - } - } - - override def cancel(): Unit = () - } - - s.onSubscribe(sub) - } - } - - flow = mkMaps(Source.fromPublisher(syncTestPublisher), numberOfMapOps) { + flow = mkMaps(Source.fromGraph(new BenchTestSource(100000)), numberOfMapOps) { if (UseGraphStageIdentity) - GraphStages.identity[Int] + GraphStages.identity[java.lang.Integer] else - Flow[Int].map(identity) + Flow[java.lang.Integer].map(identity) } } diff --git a/akka-bench-jmh/src/main/scala/akka/stream/FramingBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/FramingBenchmark.scala index f371d2f067..d5825f1a89 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/FramingBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/FramingBenchmark.scala @@ -8,6 +8,7 @@ import java.util.concurrent.{ Semaphore, TimeUnit } import akka.NotUsed import akka.actor.ActorSystem +import akka.remote.artery.BenchTestSourceSameElement import akka.stream.scaladsl.{ Framing, Sink, Source } import akka.util.ByteString import com.typesafe.config.{ Config, ConfigFactory } @@ -64,9 +65,10 @@ class FramingBenchmark { materializer = ActorMaterializer() val frame = List.range(0, messageSize, 1).map(_ => Random.nextPrintableChar()).mkString + "\n" - flow = Source - .repeat(ByteString(List.range(0, framePerSeq, 1).map(_ => frame).mkString)) - .take(100000) + val messageChunk = ByteString(List.range(0, framePerSeq, 1).map(_ => frame).mkString) + + Source + .fromGraph(new BenchTestSourceSameElement(100000, messageChunk)) .via(Framing.delimiter(ByteString("\n"), Int.MaxValue)) } diff --git a/akka-bench-jmh/src/main/scala/akka/stream/SourceRefBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/SourceRefBenchmark.scala index 5393393f37..cb00eebcf3 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/SourceRefBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/SourceRefBenchmark.scala @@ -7,10 +7,10 @@ package akka.stream import java.util.concurrent.{ Semaphore, TimeUnit } import akka.actor.ActorSystem +import akka.remote.artery.BenchTestSource import akka.stream.scaladsl._ import com.typesafe.config.ConfigFactory import org.openjdk.jmh.annotations._ -import org.reactivestreams._ import scala.concurrent.Await import scala.concurrent.duration._ @@ -40,39 +40,14 @@ class SourceRefBenchmark { final val successFailure = Success(new Exception) // safe to be benchmark scoped because the flows we construct in this bench are stateless - var sourceRef: SourceRef[Int] = _ + var sourceRef: SourceRef[java.lang.Integer] = _ // @Param(Array("16", "32", "128")) // var initialInputBufferSize = 0 @Setup(Level.Invocation) def setup(): Unit = { - val sourcePublisher = new Publisher[Int] { - override def subscribe(s: Subscriber[_ >: Int]): Unit = { - val sub = new Subscription { - var counter = 0 // Piggyback on caller thread, no need for volatile - - override def request(n: Long): Unit = { - var i = n - while (i > 0) { - s.onNext(counter) - counter += 1 - if (counter == 100000) { - s.onComplete() - return - } - i -= 1 - } - } - - override def cancel(): Unit = () - } - - s.onSubscribe(sub) - } - } - - sourceRef = Await.result(Source.fromPublisher(sourcePublisher).runWith(StreamRefs.sourceRef()), 10.seconds) + sourceRef = Await.result(Source.fromGraph(new BenchTestSource(100000)).runWith(StreamRefs.sourceRef()), 10.seconds) } @TearDown