Fix akka-bench-jmh warnings. (#27438)

This commit is contained in:
Helena Edelson 2019-08-02 01:44:27 -07:00 committed by Arnout Engelen
parent a2ecd915cb
commit d12bc13fcb
17 changed files with 65 additions and 65 deletions

View file

@ -94,20 +94,20 @@ object BenchmarkActors {
} }
private def startPingPongActorPairs(messagesPerPair: Int, numPairs: Int, dispatcher: String)( private def startPingPongActorPairs(messagesPerPair: Int, numPairs: Int, dispatcher: String)(
implicit system: ActorSystem) = { implicit system: ActorSystem): (Vector[(ActorRef, ActorRef)], CountDownLatch) = {
val fullPathToDispatcher = "akka.actor." + dispatcher val fullPathToDispatcher = "akka.actor." + dispatcher
val latch = new CountDownLatch(numPairs * 2) val latch = new CountDownLatch(numPairs * 2)
val actors = for { val actors = List
i <- (1 to numPairs).toVector .fill(numPairs) {
} yield {
val ping = system.actorOf(PingPong.props(messagesPerPair, latch).withDispatcher(fullPathToDispatcher)) val ping = system.actorOf(PingPong.props(messagesPerPair, latch).withDispatcher(fullPathToDispatcher))
val pong = system.actorOf(PingPong.props(messagesPerPair, latch).withDispatcher(fullPathToDispatcher)) val pong = system.actorOf(PingPong.props(messagesPerPair, latch).withDispatcher(fullPathToDispatcher))
(ping, pong) (ping, pong)
} }
.toVector
(actors, latch) (actors, latch)
} }
private def initiatePingPongForPairs(refs: Vector[(ActorRef, ActorRef)], inFlight: Int) = { private def initiatePingPongForPairs(refs: Vector[(ActorRef, ActorRef)], inFlight: Int): Unit = {
for { for {
(ping, pong) <- refs (ping, pong) <- refs
_ <- 1 to inFlight _ <- 1 to inFlight
@ -117,7 +117,7 @@ object BenchmarkActors {
} }
private def startEchoActorPairs(messagesPerPair: Int, numPairs: Int, dispatcher: String, batchSize: Int)( private def startEchoActorPairs(messagesPerPair: Int, numPairs: Int, dispatcher: String, batchSize: Int)(
implicit system: ActorSystem) = { implicit system: ActorSystem): (Vector[ActorRef], CountDownLatch) = {
val fullPathToDispatcher = "akka.actor." + dispatcher val fullPathToDispatcher = "akka.actor." + dispatcher
val latch = new CountDownLatch(numPairs) val latch = new CountDownLatch(numPairs)
@ -127,18 +127,18 @@ object BenchmarkActors {
(actors, latch) (actors, latch)
} }
private def initiateEchoPairs(refs: Vector[ActorRef]) = { private def initiateEchoPairs(refs: Vector[ActorRef]): Unit = {
refs.foreach(_ ! Message) refs.foreach(_ ! Message)
} }
def printProgress(totalMessages: Long, numActors: Int, startNanoTime: Long) = { def printProgress(totalMessages: Long, numActors: Int, startNanoTime: Long): Unit = {
val durationMicros = (System.nanoTime() - startNanoTime) / 1000 val durationMicros = (System.nanoTime() - startNanoTime) / 1000
println( println(
f" $totalMessages messages by $numActors actors took ${durationMicros / 1000} ms, " + f" $totalMessages messages by $numActors actors took ${durationMicros / 1000} ms, " +
f"${totalMessages.toDouble / durationMicros}%,.2f M msg/s") f"${totalMessages.toDouble / durationMicros}%,.2f M msg/s")
} }
def requireRightNumberOfCores(numCores: Int) = def requireRightNumberOfCores(numCores: Int): Unit =
require( require(
Runtime.getRuntime.availableProcessors == numCores, Runtime.getRuntime.availableProcessors == numCores,
s"Update the cores constant to ${Runtime.getRuntime.availableProcessors}") s"Update the cores constant to ${Runtime.getRuntime.availableProcessors}")

View file

@ -23,7 +23,7 @@ class DirectByteBufferPoolBenchmark {
val random = new Random val random = new Random
var arteryPool: DirectByteBufferPool = _ private[akka] var arteryPool: DirectByteBufferPool = _
@Setup(Level.Trial) @Setup(Level.Trial)
def setup(): Unit = { def setup(): Unit = {

View file

@ -67,14 +67,14 @@ class TellOnlyBenchmark {
probe.send(actor, message) probe.send(actor, message)
probe.expectMsg(message) probe.expectMsg(message)
probe.send(actor, flipDrop) probe.send(actor, flipDrop)
probe.expectNoMsg(200.millis) probe.expectNoMessage(200.millis)
System.gc() System.gc()
} }
@TearDown(Level.Iteration) @TearDown(Level.Iteration)
def shutdownIteration(): Unit = { def shutdownIteration(): Unit = {
probe.send(actor, flipDrop) probe.send(actor, flipDrop)
probe.expectNoMsg(200.millis) probe.expectNoMessage(200.millis)
actor ! stop actor ! stop
probe.expectTerminated(actor, timeout) probe.expectTerminated(actor, timeout)
actor = null actor = null

View file

@ -51,8 +51,10 @@ class ORSetSerializationBenchmark {
private val ref2 = (1 to 10).map(n => system2.actorOf(Props.empty, s"ref2-$n")) private val ref2 = (1 to 10).map(n => system2.actorOf(Props.empty, s"ref2-$n"))
private val orSet = { private val orSet = {
val set1 = ref1.foldLeft(ORSet.empty[ActorRef]) { case (acc, r) => acc.add(Cluster(system1), r) } val selfUniqueAddress1 = SelfUniqueAddress(Cluster(system1).selfUniqueAddress)
val set2 = ref2.foldLeft(ORSet.empty[ActorRef]) { case (acc, r) => acc.add(Cluster(system2), r) } val selfUniqueAddress2 = SelfUniqueAddress(Cluster(system2).selfUniqueAddress)
val set1 = ref1.foldLeft(ORSet.empty[ActorRef]) { case (acc, r) => acc.add(selfUniqueAddress1, r) }
val set2 = ref2.foldLeft(ORSet.empty[ActorRef]) { case (acc, r) => acc.add(selfUniqueAddress2, r) }
set1.merge(set2) set1.merge(set2)
} }

View file

@ -47,7 +47,7 @@ class VersionVectorBenchmark {
@Setup(Level.Trial) @Setup(Level.Trial)
def setup(): Unit = { def setup(): Unit = {
vv1 = (1 to size).foldLeft(VersionVector.empty)((vv, n) => vv + nextNode()) vv1 = (1 to size).foldLeft(VersionVector.empty)((vv, _) => vv + nextNode())
vv2 = vv1 + nextNode() vv2 = vv1 + nextNode()
vv3 = vv1 + nextNode() vv3 = vv1 + nextNode()
dot1 = VersionVector(nodeA, vv1.versionAt(nodeA)) dot1 = VersionVector(nodeA, vv1.versionAt(nodeA))

View file

@ -88,7 +88,7 @@ class `persistAsync, defer`(respondAfter: Int) extends PersistentActor {
override def receiveCommand = { override def receiveCommand = {
case n: Int => case n: Int =>
persistAsync(Evt(n)) { e => persistAsync(Evt(n)) { _ =>
} }
deferAsync(Evt(n)) { e => deferAsync(Evt(n)) { e =>
if (e.i == respondAfter) sender() ! e.i if (e.i == respondAfter) sender() ! e.i
@ -104,9 +104,9 @@ class `persistAsync, defer, respond ASAP`(respondAfter: Int) extends PersistentA
override def receiveCommand = { override def receiveCommand = {
case n: Int => case n: Int =>
persistAsync(Evt(n)) { e => persistAsync(Evt(n)) { _ =>
} }
deferAsync(Evt(n)) { e => deferAsync(Evt(n)) { _ =>
} }
if (n == respondAfter) sender() ! n if (n == respondAfter) sender() ! n
} }

View file

@ -141,7 +141,7 @@ class PersistPersistentActorWithAtLeastOnceDelivery(
override def receiveCommand = { override def receiveCommand = {
case n: Int => case n: Int =>
persist(MsgSent(n)) { e => persist(MsgSent(n)) { _ =>
deliver(downStream)(deliveryId => Msg(deliveryId, n)) deliver(downStream)(deliveryId => Msg(deliveryId, n))
if (n == respondAfter) if (n == respondAfter)
//switch to wait all message confirmed //switch to wait all message confirmed
@ -180,7 +180,7 @@ class PersistAsyncPersistentActorWithAtLeastOnceDelivery(
override def receiveCommand = { override def receiveCommand = {
case n: Int => case n: Int =>
persistAsync(MsgSent(n)) { e => persistAsync(MsgSent(n)) { _ =>
deliver(downStream)(deliveryId => Msg(deliveryId, n)) deliver(downStream)(deliveryId => Msg(deliveryId, n))
if (n == respondAfter) if (n == respondAfter)
//switch to wait all message confirmed //switch to wait all message confirmed

View file

@ -4,31 +4,34 @@
package akka.remote.artery package akka.remote.artery
import akka.actor._
import akka.Done
import akka.NotUsed
import akka.remote._
import akka.remote.artery.compress._
import akka.serialization.{ BaseSerializer, ByteBufferSerializer, SerializationExtension }
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl._
import akka.util.OptionVal
import com.typesafe.config.ConfigFactory
import java.io.IOException import java.io.IOException
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.ByteOrder import java.nio.ByteOrder
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import akka.remote.artery.Decoder.InboundCompressionAccess
import org.openjdk.jmh.annotations._
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.Done
import akka.NotUsed
import akka.actor._
import akka.remote._
import akka.remote.artery.Decoder.InboundCompressionAccess
import akka.remote.artery.compress._
import akka.serialization.BaseSerializer
import akka.serialization.ByteBufferSerializer
import akka.serialization.SerializationExtension
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl._
import akka.util.OptionVal
import com.github.ghik.silencer.silent
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
@State(Scope.Benchmark) @State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS) @OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.Throughput)) @BenchmarkMode(Array(Mode.Throughput))
@ -38,6 +41,7 @@ import scala.concurrent.duration._
class CodecBenchmark { class CodecBenchmark {
import CodecBenchmark._ import CodecBenchmark._
@silent("immutable val") // JMH updates this via reflection
@Param(Array(Standard, RemoteInstrument)) @Param(Array(Standard, RemoteInstrument))
private var configType: String = _ private var configType: String = _
@ -66,11 +70,11 @@ class CodecBenchmark {
override def publishDropped(inbound: InboundEnvelope, reason: String): Unit = () override def publishDropped(inbound: InboundEnvelope, reason: String): Unit = ()
} }
private var materializer: ActorMaterializer = _ @silent("never used") private var materializer: ActorMaterializer = _
private var remoteRefB: RemoteActorRef = _ @silent("never used") private var remoteRefB: RemoteActorRef = _
private var resolvedRef: InternalActorRef = _ @silent("never used") private var resolvedRef: InternalActorRef = _
private var senderStringA: String = _ @silent("never used") private var senderStringA: String = _
private var recipientStringB: String = _ @silent("never used") private var recipientStringB: String = _
private var encodeGraph: Flow[String, Unit, NotUsed] = _ private var encodeGraph: Flow[String, Unit, NotUsed] = _
private var decodeGraph: Flow[String, Unit, NotUsed] = _ private var decodeGraph: Flow[String, Unit, NotUsed] = _
@ -112,7 +116,7 @@ class CodecBenchmark {
val actorOnSystemA = system.actorOf(Props.empty, "a") val actorOnSystemA = system.actorOf(Props.empty, "a")
senderStringA = actorOnSystemA.path.toSerializationFormatWithAddress(uniqueLocalAddress.address) senderStringA = actorOnSystemA.path.toSerializationFormatWithAddress(uniqueLocalAddress.address)
val actorOnSystemB = systemB.actorOf(Props.empty, "b") systemB.actorOf(Props.empty, "b")
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val rootB = RootActorPath(addressB) val rootB = RootActorPath(addressB)
remoteRefB = Await remoteRefB = Await
@ -147,7 +151,7 @@ class CodecBenchmark {
debugLogSend = false, debugLogSend = false,
version = ArteryTransport.HighestVersion)) version = ArteryTransport.HighestVersion))
val encoderInput: Flow[String, OutboundEnvelope, NotUsed] = val encoderInput: Flow[String, OutboundEnvelope, NotUsed] =
Flow[String].map(msg => outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB))) Flow[String].map(_ => outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB)))
val compressions = new InboundCompressionsImpl(system, inboundContext, inboundContext.settings.Advanced.Compression) val compressions = new InboundCompressionsImpl(system, inboundContext, inboundContext.settings.Advanced.Compression)
val decoder: Flow[EnvelopeBuffer, InboundEnvelope, InboundCompressionAccess] = val decoder: Flow[EnvelopeBuffer, InboundEnvelope, InboundCompressionAccess] =
Flow.fromGraph( Flow.fromGraph(

View file

@ -17,6 +17,7 @@ import akka.actor._
import akka.serialization.Serialization import akka.serialization.Serialization
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest import akka.serialization.SerializerWithStringManifest
import com.github.ghik.silencer.silent
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._ import org.openjdk.jmh.annotations._
@ -184,6 +185,7 @@ class JacksonSerializationBench {
var system: ActorSystem = _ var system: ActorSystem = _
var serialization: Serialization = _ var serialization: Serialization = _
@silent("immutable val") // JMH updates this via reflection
@Param(Array("jackson-json", "jackson-cbor")) // "java" @Param(Array("jackson-json", "jackson-cbor")) // "java"
private var serializerName: String = _ private var serializerName: String = _

View file

@ -16,7 +16,7 @@ import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._ import org.openjdk.jmh.annotations._
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{ Await, Future } import scala.concurrent.Await
object AskBenchmark { object AskBenchmark {
final val OperationsPerInvocation = 100000 final val OperationsPerInvocation = 100000

View file

@ -100,7 +100,7 @@ class FlowMapBenchmark {
// source setup // source setup
private def mkMaps[O, Mat](source: Source[O, Mat], count: Int)(flow: => Graph[FlowShape[O, O], _]): Source[O, Mat] = { private def mkMaps[O, Mat](source: Source[O, Mat], count: Int)(flow: => Graph[FlowShape[O, O], _]): Source[O, Mat] = {
var f = source var f = source
for (i <- 1 to count) for (_ <- 1 to count)
f = f.via(flow) f = f.via(flow)
f f
} }

View file

@ -48,7 +48,7 @@ object MaterializationBenchmark {
val broadcast = b.add(Broadcast[Unit](numOfJunctions)) val broadcast = b.add(Broadcast[Unit](numOfJunctions))
val merge = b.add(Merge[Unit](numOfJunctions)) val merge = b.add(Merge[Unit](numOfJunctions))
for (i <- 0 until numOfJunctions) { for (_ <- 0 until numOfJunctions) {
broadcast ~> merge broadcast ~> merge
} }
@ -62,7 +62,7 @@ object MaterializationBenchmark {
import GraphDSL.Implicits._ import GraphDSL.Implicits._
val flow = Flow[Unit].map(identity) val flow = Flow[Unit].map(identity)
var out: Outlet[Unit] = source.out var out: Outlet[Unit] = source.out
for (i <- 0 until numOfFlows) { for (_ <- 0 until numOfFlows) {
val flowShape = b.add(flow) val flowShape = b.add(flow)
out ~> flowShape out ~> flowShape
out = flowShape.outlet out = flowShape.outlet

View file

@ -4,7 +4,6 @@
package akka.stream package akka.stream
/*
import java.util import java.util
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -18,7 +17,6 @@ import org.reactivestreams.{ Publisher, Subscriber, Subscription }
@OutputTimeUnit(TimeUnit.MILLISECONDS) @OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.Throughput)) @BenchmarkMode(Array(Mode.Throughput))
class NewLayoutBenchmark { class NewLayoutBenchmark {
// TODO: This benchmark is heavily copy-pasta. This is a temporary benchmark as these two implementations // TODO: This benchmark is heavily copy-pasta. This is a temporary benchmark as these two implementations
// will never exist at the same time. This needs to be turned into a better one once the design // will never exist at the same time. This needs to be turned into a better one once the design
// settles. // settles.
@ -360,6 +358,4 @@ class NewLayoutBenchmark {
def mat_source_flow_and_sink_old(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = { def mat_source_flow_and_sink_old(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = {
testMaterializeOld(sourceFlowSinkOld, blackhole: org.openjdk.jmh.infra.Blackhole) testMaterializeOld(sourceFlowSinkOld, blackhole: org.openjdk.jmh.infra.Blackhole)
} }
} }
*/

View file

@ -69,7 +69,7 @@ class PartitionHubBenchmark {
val source = testSource.runWith( val source = testSource.runWith(
PartitionHub.sink[java.lang.Integer]( PartitionHub.sink[java.lang.Integer](
(size, elem) => elem.intValue % NumberOfStreams, (_, elem) => elem.intValue % NumberOfStreams,
startAfterNrOfConsumers = NumberOfStreams, startAfterNrOfConsumers = NumberOfStreams,
bufferSize = BufferSize))(materializer) bufferSize = BufferSize))(materializer)

View file

@ -4,17 +4,17 @@
package akka.stream.impl package akka.stream.impl
import java.io.OutputStream
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import akka.Done import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Keep, Sink, StreamConverters } import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.StreamConverters
import org.openjdk.jmh.annotations.TearDown import org.openjdk.jmh.annotations.TearDown
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import org.openjdk.jmh.annotations._ import org.openjdk.jmh.annotations._
object OutputStreamSourceStageBenchmark { object OutputStreamSourceStageBenchmark {
@ -30,9 +30,6 @@ class OutputStreamSourceStageBenchmark {
private val bytes: Array[Byte] = Array.emptyByteArray private val bytes: Array[Byte] = Array.emptyByteArray
private var os: OutputStream = _
private var done: Future[Done] = _
@Benchmark @Benchmark
@OperationsPerInvocation(WritesPerBench) @OperationsPerInvocation(WritesPerBench)
def consumeWrites(): Unit = { def consumeWrites(): Unit = {

View file

@ -35,7 +35,7 @@ class FileSourcesScaleBenchmark {
val files: Seq[Path] = { val files: Seq[Path] = {
val line = ByteString("x" * 2048 + "\n") val line = ByteString("x" * 2048 + "\n")
(1 to FILES_NUMBER).map(i => { (1 to FILES_NUMBER).map(i => {
val f = Files.createTempFile(getClass.getName, i + ".bench.tmp") val f = Files.createTempFile(getClass.getName, s"$i.bench.tmp")
val ft = Source val ft = Source
.fromIterator(() => Iterator.continually(line)) .fromIterator(() => Iterator.continually(line))

View file

@ -21,7 +21,6 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport {
// We allow warnings in docs to get the 'snippets' right // We allow warnings in docs to get the 'snippets' right
"akka-docs", "akka-docs",
// To be reviewed // To be reviewed
"akka-bench-jmh",
"akka-bench-jmh-typed") "akka-bench-jmh-typed")
val strictProjects = Set("akka-discovery", "akka-protobuf", "akka-coordination") val strictProjects = Set("akka-discovery", "akka-protobuf", "akka-coordination")