Use BenchTestSource in streams JMH benchmarks #26563

This commit is contained in:
Kopaniev Vladyslav 2019-04-05 12:04:25 +03:00 committed by Johan Andrén
parent ca6d0f8c22
commit 897fecdd97
5 changed files with 17 additions and 65 deletions

View file

@ -19,7 +19,7 @@ import akka.stream.stage.OutHandler
class BenchTestSource(elementCount: Int) extends GraphStage[SourceShape[java.lang.Integer]] {
private val elements = new Array[java.lang.Integer](elementCount)
(1 to elementCount).map(n => elements(n - 1) = n)
(1 to elementCount).foreach(n => elements(n - 1) = n)
val out: Outlet[java.lang.Integer] = Outlet("BenchTestSource")
override val shape: SourceShape[java.lang.Integer] = SourceShape(out)

View file

@ -8,6 +8,7 @@ import akka.{ Done, NotUsed }
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import java.util.concurrent.TimeUnit
import akka.remote.artery.BenchTestSourceSameElement
import org.openjdk.jmh.annotations._
import scala.concurrent._
import scala.concurrent.duration._
@ -27,7 +28,8 @@ class FlatMapMergeBenchmark {
var graph: RunnableGraph[Future[Done]] = _
def createSource(count: Int): Graph[SourceShape[Int], NotUsed] = Source.repeat(1).take(count)
def createSource(count: Int): Graph[SourceShape[java.lang.Integer], NotUsed] =
new BenchTestSourceSameElement(count, 1)
@Setup
def setup(): Unit = {

View file

@ -7,13 +7,13 @@ package akka.stream
import java.util.concurrent.TimeUnit
import akka.NotUsed
import akka.actor.ActorSystem
import akka.remote.artery.BenchTestSource
import akka.stream.scaladsl._
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
import java.util.concurrent.Semaphore
import scala.util.Success
import akka.stream.impl.fusing.GraphStages
import org.reactivestreams._
import scala.concurrent.Await
import scala.concurrent.duration._
@ -59,7 +59,7 @@ class FlowMapBenchmark {
final val successFailure = Success(new Exception)
// safe to be benchmark scoped because the flows we construct in this bench are stateless
var flow: Source[Int, NotUsed] = _
var flow: Source[java.lang.Integer, NotUsed] = _
@Param(Array("8", "32", "128"))
var initialInputBufferSize = 0
@ -73,38 +73,11 @@ class FlowMapBenchmark {
materializer = ActorMaterializer(settings)
// Important to use a synchronous, zero overhead source, otherwise the slowness of the source
// might bias the benchmark, since the stream always adjusts the rate to the slowest stage.
val syncTestPublisher = new Publisher[Int] {
override def subscribe(s: Subscriber[_ >: Int]): Unit = {
val sub = new Subscription {
var counter = 0 // Piggyback on caller thread, no need for volatile
override def request(n: Long): Unit = {
var i = n
while (i > 0) {
s.onNext(counter)
counter += 1
if (counter == 100000) {
s.onComplete()
return
}
i -= 1
}
}
override def cancel(): Unit = ()
}
s.onSubscribe(sub)
}
}
flow = mkMaps(Source.fromPublisher(syncTestPublisher), numberOfMapOps) {
flow = mkMaps(Source.fromGraph(new BenchTestSource(100000)), numberOfMapOps) {
if (UseGraphStageIdentity)
GraphStages.identity[Int]
GraphStages.identity[java.lang.Integer]
else
Flow[Int].map(identity)
Flow[java.lang.Integer].map(identity)
}
}

View file

@ -8,6 +8,7 @@ import java.util.concurrent.{ Semaphore, TimeUnit }
import akka.NotUsed
import akka.actor.ActorSystem
import akka.remote.artery.BenchTestSourceSameElement
import akka.stream.scaladsl.{ Framing, Sink, Source }
import akka.util.ByteString
import com.typesafe.config.{ Config, ConfigFactory }
@ -64,9 +65,10 @@ class FramingBenchmark {
materializer = ActorMaterializer()
val frame = List.range(0, messageSize, 1).map(_ => Random.nextPrintableChar()).mkString + "\n"
flow = Source
.repeat(ByteString(List.range(0, framePerSeq, 1).map(_ => frame).mkString))
.take(100000)
val messageChunk = ByteString(List.range(0, framePerSeq, 1).map(_ => frame).mkString)
Source
.fromGraph(new BenchTestSourceSameElement(100000, messageChunk))
.via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
}

View file

@ -7,10 +7,10 @@ package akka.stream
import java.util.concurrent.{ Semaphore, TimeUnit }
import akka.actor.ActorSystem
import akka.remote.artery.BenchTestSource
import akka.stream.scaladsl._
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
import org.reactivestreams._
import scala.concurrent.Await
import scala.concurrent.duration._
@ -40,39 +40,14 @@ class SourceRefBenchmark {
final val successFailure = Success(new Exception)
// safe to be benchmark scoped because the flows we construct in this bench are stateless
var sourceRef: SourceRef[Int] = _
var sourceRef: SourceRef[java.lang.Integer] = _
// @Param(Array("16", "32", "128"))
// var initialInputBufferSize = 0
@Setup(Level.Invocation)
def setup(): Unit = {
val sourcePublisher = new Publisher[Int] {
override def subscribe(s: Subscriber[_ >: Int]): Unit = {
val sub = new Subscription {
var counter = 0 // Piggyback on caller thread, no need for volatile
override def request(n: Long): Unit = {
var i = n
while (i > 0) {
s.onNext(counter)
counter += 1
if (counter == 100000) {
s.onComplete()
return
}
i -= 1
}
}
override def cancel(): Unit = ()
}
s.onSubscribe(sub)
}
}
sourceRef = Await.result(Source.fromPublisher(sourcePublisher).runWith(StreamRefs.sourceRef()), 10.seconds)
sourceRef = Await.result(Source.fromGraph(new BenchTestSource(100000)).runWith(StreamRefs.sourceRef()), 10.seconds)
}
@TearDown