Make SingleConsumerMultiProducer the default mail box for typed (#27857)
* Merge benchmark projects * Make SingleConsumerMultiProducer the default mail box for typed * Make SingleConsumerMultiProducer the default mail box for typed * Add default mailbox back to classic + define default mailbox for typed in reference * Fix custom dispatcher
This commit is contained in:
commit
dfaee6f9a1
14 changed files with 72 additions and 90 deletions
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.actor.typed.internal.adpater
|
||||
|
||||
import akka.actor
|
||||
import akka.actor.typed.Props
|
||||
import akka.actor.typed.internal.adapter.PropsAdapter
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import org.scalatest.Matchers
|
||||
import org.scalatest.WordSpec
|
||||
|
||||
class PropsAdapterSpec extends WordSpec with Matchers {
|
||||
|
||||
"PropsAdapter" should {
|
||||
"default to akka.dispatch.SingleConsumerOnlyUnboundedMailbox" in {
|
||||
val props: Props = Props.empty
|
||||
val pa: actor.Props = PropsAdapter(() => Behaviors.empty, props)
|
||||
pa.mailbox shouldEqual "akka.actor.typed.default-mailbox"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -23,6 +23,11 @@ akka.actor.typed {
|
|||
# behavior. This property defines the capacity in number of messages of the stash
|
||||
# buffer. If the capacity is exceed then additional incoming messages are dropped.
|
||||
restart-stash-capacity = 1000
|
||||
|
||||
# Typed mailbox defaults to the single consumber mailbox as balancing dispatcher is not supported
|
||||
default-mailbox {
|
||||
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
|
||||
}
|
||||
}
|
||||
|
||||
# Load typed extensions by a classic extension.
|
||||
|
|
|
|||
|
|
@ -189,12 +189,12 @@ abstract class MailboxSelector extends Props
|
|||
object MailboxSelector {
|
||||
|
||||
/**
|
||||
* Scala API: The default mailbox is unbounded and backed by a [[java.util.concurrent.ConcurrentLinkedQueue]]
|
||||
* Scala API: The default mailbox is SingleConsumerOnlyUnboundedMailbox
|
||||
*/
|
||||
def default(): MailboxSelector = DefaultMailboxSelector.empty
|
||||
def default(): MailboxSelector = fromConfig("akka.actor.typed.default-mailbox")
|
||||
|
||||
/**
|
||||
* Java API: The default mailbox is unbounded and backed by a [[java.util.concurrent.ConcurrentLinkedQueue]]
|
||||
* Java API: The default mailbox is SingleConsumerOnlyUnboundedMailbox
|
||||
*/
|
||||
def defaultMailbox(): MailboxSelector = default()
|
||||
|
||||
|
|
|
|||
|
|
@ -23,22 +23,22 @@ import akka.dispatch.Mailboxes
|
|||
rethrowTypedFailure: Boolean = true): akka.actor.Props = {
|
||||
val props = akka.actor.Props(new ActorAdapter(behavior(), rethrowTypedFailure))
|
||||
|
||||
val p1 = (deploy.firstOrElse[DispatcherSelector](DispatcherDefault.empty) match {
|
||||
val dispatcherProps = (deploy.firstOrElse[DispatcherSelector](DispatcherDefault.empty) match {
|
||||
case _: DispatcherDefault => props
|
||||
case DispatcherFromConfig(name, _) => props.withDispatcher(name)
|
||||
case _: DispatcherSameAsParent => props.withDispatcher(Deploy.DispatcherSameAsParent)
|
||||
}).withDeploy(Deploy.local) // disallow remote deployment for typed actors
|
||||
|
||||
val p2 = deploy.firstOrElse[MailboxSelector](MailboxSelector.default()) match {
|
||||
case _: DefaultMailboxSelector => p1
|
||||
val mailboxProps = deploy.firstOrElse[MailboxSelector](MailboxSelector.default()) match {
|
||||
case _: DefaultMailboxSelector => dispatcherProps
|
||||
case BoundedMailboxSelector(capacity, _) =>
|
||||
// specific support in classic Mailboxes
|
||||
p1.withMailbox(s"${Mailboxes.BoundedCapacityPrefix}$capacity")
|
||||
dispatcherProps.withMailbox(s"${Mailboxes.BoundedCapacityPrefix}$capacity")
|
||||
case MailboxFromConfigSelector(path, _) =>
|
||||
props.withMailbox(path)
|
||||
dispatcherProps.withMailbox(path)
|
||||
}
|
||||
|
||||
p2.withDeploy(Deploy.local) // disallow remote deployment for typed actors
|
||||
mailboxProps.withDeploy(Deploy.local) // disallow remote deployment for typed actors
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,10 +0,0 @@
|
|||
# Akka Microbenchmarks
|
||||
|
||||
This subproject contains some microbenchmarks excercising key parts of Akka Typed.
|
||||
|
||||
You can run them like:
|
||||
|
||||
project akka-bench-jmh-typed
|
||||
jmh:run -i 3 -wi 3 -f 1 .*ActorCreationBenchmark
|
||||
|
||||
Use 'jmh:run -h' to get an overview of the available options.
|
||||
|
|
@ -1,36 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka
|
||||
|
||||
import org.openjdk.jmh.results.RunResult
|
||||
import org.openjdk.jmh.runner.Runner
|
||||
import org.openjdk.jmh.runner.options.CommandLineOptions
|
||||
|
||||
object BenchRunner {
|
||||
def main(args: Array[String]) = {
|
||||
import akka.util.ccompat.JavaConverters._
|
||||
|
||||
val args2 = args.toList.flatMap {
|
||||
case "quick" => "-i 1 -wi 1 -f1 -t1".split(" ").toList
|
||||
case "full" => "-i 10 -wi 4 -f3 -t1".split(" ").toList
|
||||
case "jitwatch" => "-jvmArgs=-XX:+UnlockDiagnosticVMOptions -XX:+TraceClassLoading -XX:+LogCompilation" :: Nil
|
||||
case other => other :: Nil
|
||||
}
|
||||
|
||||
val opts = new CommandLineOptions(args2: _*)
|
||||
val results = new Runner(opts).run()
|
||||
|
||||
val report = results.asScala.map { result: RunResult =>
|
||||
val bench = result.getParams.getBenchmark
|
||||
val params =
|
||||
result.getParams.getParamsKeys.asScala.map(key => s"$key=${result.getParams.getParam(key)}").mkString("_")
|
||||
val score = result.getAggregatedResult.getPrimaryResult.getScore.round
|
||||
val unit = result.getAggregatedResult.getPrimaryResult.getScoreUnit
|
||||
s"\t${bench}_${params}\t$score\t$unit"
|
||||
}
|
||||
|
||||
report.toList.sorted.foreach(println)
|
||||
}
|
||||
}
|
||||
|
|
@ -10,3 +10,8 @@ You can run them like:
|
|||
jmh:run -i 3 -wi 3 -f 1 .*ActorCreationBenchmark
|
||||
|
||||
Use 'jmh:run -h' to get an overview of the available options.
|
||||
|
||||
Some potentially out of date resources for writing JMH benchmarks:
|
||||
|
||||
* [Studying what's wrong with JMH benchmarks](https://www.researchgate.net/publication/333825812_What's_Wrong_With_My_Benchmark_Results_Studying_Bad_Practices_in_JMH_Benchmarks)
|
||||
* [Writing good benchmarks](http://tutorials.jenkov.com/java-performance/jmh.html#writing-good-benchmarks)
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ class TypedActorBenchmark {
|
|||
@Param(Array("50"))
|
||||
var batchSize = 0
|
||||
|
||||
@Param(Array("akka.dispatch.SingleConsumerOnlyUnboundedMailbox"))
|
||||
@Param(Array("akka.dispatch.SingleConsumerOnlyUnboundedMailbox", "akka.dispatch.UnboundedMailbox"))
|
||||
var mailbox = ""
|
||||
|
||||
@Param(Array("fjp-dispatcher")) // @Param(Array("fjp-dispatcher", "affinity-dispatcher"))
|
||||
|
|
@ -23,6 +23,23 @@ For more details on advanced mailbox config and custom mailbox implementations,
|
|||
|
||||
## Mailbox Selection
|
||||
|
||||
### Default Mailbox
|
||||
|
||||
The default mailbox is used when the mailbox is not specified.
|
||||
This is an unbounded mailbox, backed by a
|
||||
`java.util.concurrent.ConcurrentLinkedQueue`.
|
||||
|
||||
`SingleConsumerOnlyUnboundedMailbox` is an even more efficient mailbox, and
|
||||
it can be used as the default mailbox, but it cannot be used with a BalancingDispatcher.
|
||||
|
||||
Configuration of `SingleConsumerOnlyUnboundedMailbox` as default mailbox:
|
||||
|
||||
```
|
||||
akka.actor.default-mailbox {
|
||||
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
|
||||
}
|
||||
```
|
||||
|
||||
### Requiring a Message Queue Type for an Actor
|
||||
|
||||
It is possible to require a certain type of message queue for a certain type of actor
|
||||
|
|
|
|||
|
|
@ -46,20 +46,7 @@ Java
|
|||
|
||||
### Default Mailbox
|
||||
|
||||
The default mailbox is used when the mailbox is not specified.
|
||||
This is an unbounded mailbox, backed by a
|
||||
`java.util.concurrent.ConcurrentLinkedQueue`.
|
||||
|
||||
`SingleConsumerOnlyUnboundedMailbox` is an even more efficient mailbox, and
|
||||
it can be used as the default mailbox, but it cannot be used with a BalancingDispatcher.
|
||||
|
||||
Configuration of `SingleConsumerOnlyUnboundedMailbox` as default mailbox:
|
||||
|
||||
```
|
||||
akka.actor.default-mailbox {
|
||||
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
|
||||
}
|
||||
```
|
||||
The default mailbox is used when the mailbox is not specified and is the **SingleConsumerOnlyUnboundedMailbox**>
|
||||
|
||||
### Which Configuration is passed to the Mailbox Type
|
||||
|
||||
|
|
@ -75,19 +62,19 @@ fall-back to the default mailbox configuration section.
|
|||
Akka ships with a number of mailbox implementations:
|
||||
|
||||
*
|
||||
**UnboundedMailbox** (default)
|
||||
* The default mailbox
|
||||
* Backed by a `java.util.concurrent.ConcurrentLinkedQueue`
|
||||
* Blocking: No
|
||||
* Bounded: No
|
||||
* Configuration name: `"unbounded"` or `"akka.dispatch.UnboundedMailbox"`
|
||||
*
|
||||
**SingleConsumerOnlyUnboundedMailbox**
|
||||
Depending on your use case, this queue may or may not be faster than the default one — be sure to benchmark properly!
|
||||
**SingleConsumerOnlyUnboundedMailbox** (default)
|
||||
* This is the default
|
||||
* Backed by a Multiple-Producer Single-Consumer queue, cannot be used with `BalancingDispatcher`
|
||||
* Blocking: No
|
||||
* Bounded: No
|
||||
* Configuration name: `"akka.dispatch.SingleConsumerOnlyUnboundedMailbox"`
|
||||
*
|
||||
**UnboundedMailbox**
|
||||
* Backed by a `java.util.concurrent.ConcurrentLinkedQueue`
|
||||
* Blocking: No
|
||||
* Bounded: No
|
||||
* Configuration name: `"unbounded"` or `"akka.dispatch.UnboundedMailbox"`
|
||||
|
||||
*
|
||||
**NonBlockingBoundedMailbox**
|
||||
* Backed by a very efficient Multiple-Producer Single-Consumer queue
|
||||
|
|
|
|||
13
build.sbt
13
build.sbt
|
|
@ -42,7 +42,6 @@ lazy val aggregatedProjects: Seq[ProjectReference] = List[ProjectReference](
|
|||
actorTyped,
|
||||
actorTypedTests,
|
||||
benchJmh,
|
||||
benchJmhTyped,
|
||||
cluster,
|
||||
clusterMetrics,
|
||||
clusterSharding,
|
||||
|
|
@ -77,7 +76,7 @@ lazy val root = Project(id = "akka", base = file("."))
|
|||
.aggregate(aggregatedProjects: _*)
|
||||
.settings(rootSettings: _*)
|
||||
.settings(
|
||||
unidocRootIgnoreProjects := Seq(remoteTests, benchJmh, benchJmhTyped, protobuf, protobufV3, akkaScalaNightly, docs))
|
||||
unidocRootIgnoreProjects := Seq(remoteTests, benchJmh, protobuf, protobufV3, akkaScalaNightly, docs))
|
||||
.settings(unmanagedSources in (Compile, headerCreate) := (baseDirectory.value / "project").**("*.scala").get)
|
||||
.enablePlugins(CopyrightHeaderForBuild)
|
||||
|
||||
|
|
@ -104,21 +103,13 @@ lazy val akkaScalaNightly = akkaModule("akka-scala-nightly")
|
|||
.disablePlugins(ValidatePullRequest, MimaPlugin, CopyrightHeaderInPr)
|
||||
|
||||
lazy val benchJmh = akkaModule("akka-bench-jmh")
|
||||
.dependsOn(Seq(actor, stream, streamTests, persistence, distributedData, jackson, testkit).map(
|
||||
.dependsOn(Seq(actor, actorTyped, stream, streamTests, persistence, distributedData, jackson, testkit).map(
|
||||
_ % "compile->compile;compile->test"): _*)
|
||||
.settings(Dependencies.benchJmh)
|
||||
.settings(javacOptions += "-parameters") // for Jackson
|
||||
.enablePlugins(JmhPlugin, ScaladocNoVerificationOfDiagrams, NoPublish, CopyrightHeader)
|
||||
.disablePlugins(MimaPlugin, WhiteSourcePlugin, ValidatePullRequest, CopyrightHeaderInPr)
|
||||
|
||||
// typed benchmarks only on 2.12+
|
||||
lazy val benchJmhTyped = akkaModule("akka-bench-jmh-typed")
|
||||
.dependsOn(Seq(persistenceTyped, distributedData, clusterTyped, testkit, benchJmh).map(
|
||||
_ % "compile->compile;compile->test"): _*)
|
||||
.settings(Dependencies.benchJmh)
|
||||
.enablePlugins(JmhPlugin, ScaladocNoVerificationOfDiagrams, NoPublish, CopyrightHeader)
|
||||
.disablePlugins(MimaPlugin, WhiteSourcePlugin, ValidatePullRequest, CopyrightHeaderInPr)
|
||||
|
||||
lazy val cluster = akkaModule("akka-cluster")
|
||||
.dependsOn(remote, remoteTests % "test->test", testkit % "test->test", jackson % "test->test")
|
||||
.settings(Dependencies.cluster)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue