diff --git a/akka-bench-jmh/src/main/scala/akka/actor/BenchmarkActors.scala b/akka-bench-jmh/src/main/scala/akka/actor/BenchmarkActors.scala index 10b5f404ed..4556c641e5 100644 --- a/akka-bench-jmh/src/main/scala/akka/actor/BenchmarkActors.scala +++ b/akka-bench-jmh/src/main/scala/akka/actor/BenchmarkActors.scala @@ -94,20 +94,20 @@ object BenchmarkActors { } private def startPingPongActorPairs(messagesPerPair: Int, numPairs: Int, dispatcher: String)( - implicit system: ActorSystem) = { + implicit system: ActorSystem): (Vector[(ActorRef, ActorRef)], CountDownLatch) = { val fullPathToDispatcher = "akka.actor." + dispatcher val latch = new CountDownLatch(numPairs * 2) - val actors = for { - i <- (1 to numPairs).toVector - } yield { - val ping = system.actorOf(PingPong.props(messagesPerPair, latch).withDispatcher(fullPathToDispatcher)) - val pong = system.actorOf(PingPong.props(messagesPerPair, latch).withDispatcher(fullPathToDispatcher)) - (ping, pong) - } + val actors = List + .fill(numPairs) { + val ping = system.actorOf(PingPong.props(messagesPerPair, latch).withDispatcher(fullPathToDispatcher)) + val pong = system.actorOf(PingPong.props(messagesPerPair, latch).withDispatcher(fullPathToDispatcher)) + (ping, pong) + } + .toVector (actors, latch) } - private def initiatePingPongForPairs(refs: Vector[(ActorRef, ActorRef)], inFlight: Int) = { + private def initiatePingPongForPairs(refs: Vector[(ActorRef, ActorRef)], inFlight: Int): Unit = { for { (ping, pong) <- refs _ <- 1 to inFlight @@ -117,7 +117,7 @@ object BenchmarkActors { } private def startEchoActorPairs(messagesPerPair: Int, numPairs: Int, dispatcher: String, batchSize: Int)( - implicit system: ActorSystem) = { + implicit system: ActorSystem): (Vector[ActorRef], CountDownLatch) = { val fullPathToDispatcher = "akka.actor." + dispatcher val latch = new CountDownLatch(numPairs) @@ -127,18 +127,18 @@ object BenchmarkActors { (actors, latch) } - private def initiateEchoPairs(refs: Vector[ActorRef]) = { + private def initiateEchoPairs(refs: Vector[ActorRef]): Unit = { refs.foreach(_ ! Message) } - def printProgress(totalMessages: Long, numActors: Int, startNanoTime: Long) = { + def printProgress(totalMessages: Long, numActors: Int, startNanoTime: Long): Unit = { val durationMicros = (System.nanoTime() - startNanoTime) / 1000 println( f" $totalMessages messages by $numActors actors took ${durationMicros / 1000} ms, " + f"${totalMessages.toDouble / durationMicros}%,.2f M msg/s") } - def requireRightNumberOfCores(numCores: Int) = + def requireRightNumberOfCores(numCores: Int): Unit = require( Runtime.getRuntime.availableProcessors == numCores, s"Update the cores constant to ${Runtime.getRuntime.availableProcessors}") diff --git a/akka-bench-jmh/src/main/scala/akka/actor/DirectByteBufferPoolBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/DirectByteBufferPoolBenchmark.scala index 9dd17f213c..929e43454a 100644 --- a/akka-bench-jmh/src/main/scala/akka/actor/DirectByteBufferPoolBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/actor/DirectByteBufferPoolBenchmark.scala @@ -23,7 +23,7 @@ class DirectByteBufferPoolBenchmark { val random = new Random - var arteryPool: DirectByteBufferPool = _ + private[akka] var arteryPool: DirectByteBufferPool = _ @Setup(Level.Trial) def setup(): Unit = { diff --git a/akka-bench-jmh/src/main/scala/akka/actor/TellOnlyBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/TellOnlyBenchmark.scala index e62341cf09..9ca90c2aa5 100644 --- a/akka-bench-jmh/src/main/scala/akka/actor/TellOnlyBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/actor/TellOnlyBenchmark.scala @@ -67,14 +67,14 @@ class TellOnlyBenchmark { probe.send(actor, message) probe.expectMsg(message) probe.send(actor, flipDrop) - probe.expectNoMsg(200.millis) + probe.expectNoMessage(200.millis) System.gc() } @TearDown(Level.Iteration) def shutdownIteration(): Unit = { probe.send(actor, flipDrop) - probe.expectNoMsg(200.millis) + probe.expectNoMessage(200.millis) actor ! stop probe.expectTerminated(actor, timeout) actor = null diff --git a/akka-bench-jmh/src/main/scala/akka/cluster/ddata/ORSetSerializationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/cluster/ddata/ORSetSerializationBenchmark.scala index b273bbba88..581c9bda70 100644 --- a/akka-bench-jmh/src/main/scala/akka/cluster/ddata/ORSetSerializationBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/cluster/ddata/ORSetSerializationBenchmark.scala @@ -51,8 +51,10 @@ class ORSetSerializationBenchmark { private val ref2 = (1 to 10).map(n => system2.actorOf(Props.empty, s"ref2-$n")) private val orSet = { - val set1 = ref1.foldLeft(ORSet.empty[ActorRef]) { case (acc, r) => acc.add(Cluster(system1), r) } - val set2 = ref2.foldLeft(ORSet.empty[ActorRef]) { case (acc, r) => acc.add(Cluster(system2), r) } + val selfUniqueAddress1 = SelfUniqueAddress(Cluster(system1).selfUniqueAddress) + val selfUniqueAddress2 = SelfUniqueAddress(Cluster(system2).selfUniqueAddress) + val set1 = ref1.foldLeft(ORSet.empty[ActorRef]) { case (acc, r) => acc.add(selfUniqueAddress1, r) } + val set2 = ref2.foldLeft(ORSet.empty[ActorRef]) { case (acc, r) => acc.add(selfUniqueAddress2, r) } set1.merge(set2) } diff --git a/akka-bench-jmh/src/main/scala/akka/cluster/ddata/VersionVectorBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/cluster/ddata/VersionVectorBenchmark.scala index aba04cdaaa..996e6e4fc0 100644 --- a/akka-bench-jmh/src/main/scala/akka/cluster/ddata/VersionVectorBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/cluster/ddata/VersionVectorBenchmark.scala @@ -47,7 +47,7 @@ class VersionVectorBenchmark { @Setup(Level.Trial) def setup(): Unit = { - vv1 = (1 to size).foldLeft(VersionVector.empty)((vv, n) => vv + nextNode()) + vv1 = (1 to size).foldLeft(VersionVector.empty)((vv, _) => vv + nextNode()) vv2 = vv1 + nextNode() vv3 = vv1 + nextNode() dot1 = VersionVector(nodeA, vv1.versionAt(nodeA)) diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala index 896cc40191..55788d2ba9 100644 --- a/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala @@ -88,7 +88,7 @@ class `persistAsync, defer`(respondAfter: Int) extends PersistentActor { override def receiveCommand = { case n: Int => - persistAsync(Evt(n)) { e => + persistAsync(Evt(n)) { _ => } deferAsync(Evt(n)) { e => if (e.i == respondAfter) sender() ! e.i @@ -104,9 +104,9 @@ class `persistAsync, defer, respond ASAP`(respondAfter: Int) extends PersistentA override def receiveCommand = { case n: Int => - persistAsync(Evt(n)) { e => + persistAsync(Evt(n)) { _ => } - deferAsync(Evt(n)) { e => + deferAsync(Evt(n)) { _ => } if (n == respondAfter) sender() ! n } diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala index 8547136d29..8ab28ddd14 100644 --- a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala @@ -141,7 +141,7 @@ class PersistPersistentActorWithAtLeastOnceDelivery( override def receiveCommand = { case n: Int => - persist(MsgSent(n)) { e => + persist(MsgSent(n)) { _ => deliver(downStream)(deliveryId => Msg(deliveryId, n)) if (n == respondAfter) //switch to wait all message confirmed @@ -180,7 +180,7 @@ class PersistAsyncPersistentActorWithAtLeastOnceDelivery( override def receiveCommand = { case n: Int => - persistAsync(MsgSent(n)) { e => + persistAsync(MsgSent(n)) { _ => deliver(downStream)(deliveryId => Msg(deliveryId, n)) if (n == respondAfter) //switch to wait all message confirmed diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index 7e92418fa4..3dc5621f17 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -4,31 +4,34 @@ package akka.remote.artery -import akka.actor._ -import akka.Done -import akka.NotUsed -import akka.remote._ -import akka.remote.artery.compress._ -import akka.serialization.{ BaseSerializer, ByteBufferSerializer, SerializationExtension } -import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerSettings -import akka.stream.scaladsl._ -import akka.util.OptionVal -import com.typesafe.config.ConfigFactory import java.io.IOException import java.nio.ByteBuffer import java.nio.ByteOrder import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -import akka.remote.artery.Decoder.InboundCompressionAccess -import org.openjdk.jmh.annotations._ - import scala.annotation.tailrec import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ +import akka.Done +import akka.NotUsed +import akka.actor._ +import akka.remote._ +import akka.remote.artery.Decoder.InboundCompressionAccess +import akka.remote.artery.compress._ +import akka.serialization.BaseSerializer +import akka.serialization.ByteBufferSerializer +import akka.serialization.SerializationExtension +import akka.stream.ActorMaterializer +import akka.stream.ActorMaterializerSettings +import akka.stream.scaladsl._ +import akka.util.OptionVal +import com.github.ghik.silencer.silent +import com.typesafe.config.ConfigFactory +import org.openjdk.jmh.annotations._ + @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MILLISECONDS) @BenchmarkMode(Array(Mode.Throughput)) @@ -38,6 +41,7 @@ import scala.concurrent.duration._ class CodecBenchmark { import CodecBenchmark._ + @silent("immutable val") // JMH updates this via reflection @Param(Array(Standard, RemoteInstrument)) private var configType: String = _ @@ -66,11 +70,11 @@ class CodecBenchmark { override def publishDropped(inbound: InboundEnvelope, reason: String): Unit = () } - private var materializer: ActorMaterializer = _ - private var remoteRefB: RemoteActorRef = _ - private var resolvedRef: InternalActorRef = _ - private var senderStringA: String = _ - private var recipientStringB: String = _ + @silent("never used") private var materializer: ActorMaterializer = _ + @silent("never used") private var remoteRefB: RemoteActorRef = _ + @silent("never used") private var resolvedRef: InternalActorRef = _ + @silent("never used") private var senderStringA: String = _ + @silent("never used") private var recipientStringB: String = _ private var encodeGraph: Flow[String, Unit, NotUsed] = _ private var decodeGraph: Flow[String, Unit, NotUsed] = _ @@ -112,7 +116,7 @@ class CodecBenchmark { val actorOnSystemA = system.actorOf(Props.empty, "a") senderStringA = actorOnSystemA.path.toSerializationFormatWithAddress(uniqueLocalAddress.address) - val actorOnSystemB = systemB.actorOf(Props.empty, "b") + systemB.actorOf(Props.empty, "b") val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress val rootB = RootActorPath(addressB) remoteRefB = Await @@ -147,7 +151,7 @@ class CodecBenchmark { debugLogSend = false, version = ArteryTransport.HighestVersion)) val encoderInput: Flow[String, OutboundEnvelope, NotUsed] = - Flow[String].map(msg => outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB))) + Flow[String].map(_ => outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB))) val compressions = new InboundCompressionsImpl(system, inboundContext, inboundContext.settings.Advanced.Compression) val decoder: Flow[EnvelopeBuffer, InboundEnvelope, InboundCompressionAccess] = Flow.fromGraph( diff --git a/akka-bench-jmh/src/main/scala/akka/serialization/jackson/JacksonSerializationBench.scala b/akka-bench-jmh/src/main/scala/akka/serialization/jackson/JacksonSerializationBench.scala index 029464e320..5ed62697fc 100644 --- a/akka-bench-jmh/src/main/scala/akka/serialization/jackson/JacksonSerializationBench.scala +++ b/akka-bench-jmh/src/main/scala/akka/serialization/jackson/JacksonSerializationBench.scala @@ -17,6 +17,7 @@ import akka.actor._ import akka.serialization.Serialization import akka.serialization.SerializationExtension import akka.serialization.SerializerWithStringManifest +import com.github.ghik.silencer.silent import com.typesafe.config.ConfigFactory import org.openjdk.jmh.annotations._ @@ -184,6 +185,7 @@ class JacksonSerializationBench { var system: ActorSystem = _ var serialization: Serialization = _ + @silent("immutable val") // JMH updates this via reflection @Param(Array("jackson-json", "jackson-cbor")) // "java" private var serializerName: String = _ diff --git a/akka-bench-jmh/src/main/scala/akka/stream/AskBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/AskBenchmark.scala index e39605ae12..afb178afa2 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/AskBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/AskBenchmark.scala @@ -16,7 +16,7 @@ import com.typesafe.config.ConfigFactory import org.openjdk.jmh.annotations._ import scala.concurrent.duration._ -import scala.concurrent.{ Await, Future } +import scala.concurrent.Await object AskBenchmark { final val OperationsPerInvocation = 100000 diff --git a/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala index de8d7f3baf..1435636859 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala @@ -100,7 +100,7 @@ class FlowMapBenchmark { // source setup private def mkMaps[O, Mat](source: Source[O, Mat], count: Int)(flow: => Graph[FlowShape[O, O], _]): Source[O, Mat] = { var f = source - for (i <- 1 to count) + for (_ <- 1 to count) f = f.via(flow) f } diff --git a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala index 6fcb0ebd4d..609f017afb 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala @@ -48,7 +48,7 @@ object MaterializationBenchmark { val broadcast = b.add(Broadcast[Unit](numOfJunctions)) val merge = b.add(Merge[Unit](numOfJunctions)) - for (i <- 0 until numOfJunctions) { + for (_ <- 0 until numOfJunctions) { broadcast ~> merge } @@ -62,7 +62,7 @@ object MaterializationBenchmark { import GraphDSL.Implicits._ val flow = Flow[Unit].map(identity) var out: Outlet[Unit] = source.out - for (i <- 0 until numOfFlows) { + for (_ <- 0 until numOfFlows) { val flowShape = b.add(flow) out ~> flowShape out = flowShape.outlet diff --git a/akka-bench-jmh/src/main/scala/akka/stream/NewLayoutBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/NewLayoutBenchmark.template similarity index 99% rename from akka-bench-jmh/src/main/scala/akka/stream/NewLayoutBenchmark.scala rename to akka-bench-jmh/src/main/scala/akka/stream/NewLayoutBenchmark.template index c61dc8c3dd..31e34c5e98 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/NewLayoutBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/NewLayoutBenchmark.template @@ -4,7 +4,6 @@ package akka.stream -/* import java.util import java.util.concurrent.TimeUnit @@ -18,7 +17,6 @@ import org.reactivestreams.{ Publisher, Subscriber, Subscription } @OutputTimeUnit(TimeUnit.MILLISECONDS) @BenchmarkMode(Array(Mode.Throughput)) class NewLayoutBenchmark { - // TODO: This benchmark is heavily copy-pasta. This is a temporary benchmark as these two implementations // will never exist at the same time. This needs to be turned into a better one once the design // settles. @@ -360,6 +358,4 @@ class NewLayoutBenchmark { def mat_source_flow_and_sink_old(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = { testMaterializeOld(sourceFlowSinkOld, blackhole: org.openjdk.jmh.infra.Blackhole) } - } - */ diff --git a/akka-bench-jmh/src/main/scala/akka/stream/PartitionHubBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/PartitionHubBenchmark.scala index eeb7f08fc6..f63f57f682 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/PartitionHubBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/PartitionHubBenchmark.scala @@ -69,7 +69,7 @@ class PartitionHubBenchmark { val source = testSource.runWith( PartitionHub.sink[java.lang.Integer]( - (size, elem) => elem.intValue % NumberOfStreams, + (_, elem) => elem.intValue % NumberOfStreams, startAfterNrOfConsumers = NumberOfStreams, bufferSize = BufferSize))(materializer) diff --git a/akka-bench-jmh/src/main/scala/akka/stream/impl/OutputStreamSourceStageBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/impl/OutputStreamSourceStageBenchmark.scala index 26655089b3..ebeccc3a5b 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/impl/OutputStreamSourceStageBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/impl/OutputStreamSourceStageBenchmark.scala @@ -4,17 +4,17 @@ package akka.stream.impl -import java.io.OutputStream import java.util.concurrent.TimeUnit -import akka.Done +import scala.concurrent.Await +import scala.concurrent.duration._ + import akka.actor.ActorSystem import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{ Keep, Sink, StreamConverters } +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.StreamConverters import org.openjdk.jmh.annotations.TearDown - -import scala.concurrent.{ Await, Future } -import scala.concurrent.duration._ import org.openjdk.jmh.annotations._ object OutputStreamSourceStageBenchmark { @@ -30,9 +30,6 @@ class OutputStreamSourceStageBenchmark { private val bytes: Array[Byte] = Array.emptyByteArray - private var os: OutputStream = _ - private var done: Future[Done] = _ - @Benchmark @OperationsPerInvocation(WritesPerBench) def consumeWrites(): Unit = { diff --git a/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesScaleBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesScaleBenchmark.scala index aedf05484c..0e2f08a634 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesScaleBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesScaleBenchmark.scala @@ -35,7 +35,7 @@ class FileSourcesScaleBenchmark { val files: Seq[Path] = { val line = ByteString("x" * 2048 + "\n") (1 to FILES_NUMBER).map(i => { - val f = Files.createTempFile(getClass.getName, i + ".bench.tmp") + val f = Files.createTempFile(getClass.getName, s"$i.bench.tmp") val ft = Source .fromIterator(() => Iterator.continually(line)) diff --git a/project/AkkaDisciplinePlugin.scala b/project/AkkaDisciplinePlugin.scala index e25fe2fdf6..bf36a3687b 100644 --- a/project/AkkaDisciplinePlugin.scala +++ b/project/AkkaDisciplinePlugin.scala @@ -21,7 +21,6 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport { // We allow warnings in docs to get the 'snippets' right "akka-docs", // To be reviewed - "akka-bench-jmh", "akka-bench-jmh-typed") val strictProjects = Set("akka-discovery", "akka-protobuf", "akka-coordination")