diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index feaec7e64b..1abbb9f6fb 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -123,14 +123,8 @@ private[akka] trait StashSupport { /* The capacity of the stash. Configured in the actor's mailbox or dispatcher config. */ - private val capacity: Int = { - val dispatcher = context.system.settings.config.getConfig(context.props.dispatcher) - val fallback = dispatcher.withFallback(context.system.settings.config.getConfig(Mailboxes.DefaultMailboxId)) - val config = - if (context.props.mailbox == Mailboxes.DefaultMailboxId) fallback - else context.system.settings.config.getConfig(context.props.mailbox).withFallback(fallback) - config.getInt("stash-capacity") - } + private val capacity: Int = + context.system.mailboxes.stashCapacity(context.props.dispatcher, context.props.mailbox) /** * INTERNAL API. diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala b/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala index 9f69bcd76a..59c70ee65c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala @@ -6,7 +6,6 @@ package akka.dispatch import java.lang.reflect.ParameterizedType import java.util.concurrent.ConcurrentHashMap - import akka.ConfigurationException import akka.actor.{ Actor, ActorRef, ActorSystem, DeadLetter, Deploy, DynamicAccess, Props } import akka.dispatch.sysmsg.{ EarliestFirstSystemMessageList, LatestFirstSystemMessageList, SystemMessage, SystemMessageList } @@ -14,8 +13,9 @@ import akka.event.EventStream import akka.event.Logging.Warning import akka.util.Reflect import com.typesafe.config.{ Config, ConfigFactory } - import scala.util.control.NonFatal +import java.util.concurrent.atomic.AtomicReference +import scala.annotation.tailrec object Mailboxes { final val DefaultMailboxId = "akka.actor.default-mailbox" @@ -232,4 +232,42 @@ private[akka] class Mailboxes( .withFallback(settings.config.getConfig(id)) .withFallback(defaultMailboxConfig) } + + private val stashCapacityCache = new AtomicReference[Map[String, Int]](Map.empty[String, Int]) + private val defaultStashCapacity: Int = + stashCapacityFromConfig(Dispatchers.DefaultDispatcherId, Mailboxes.DefaultMailboxId) + + /** + * INTERNAL API: The capacity of the stash. Configured in the actor's mailbox or dispatcher config. + */ + private[akka] final def stashCapacity(dispatcher: String, mailbox: String): Int = { + + @tailrec def updateCache(cache: Map[String, Int], key: String, value: Int): Boolean = { + stashCapacityCache.compareAndSet(cache, cache.updated(key, value)) || + updateCache(stashCapacityCache.get, key, value) // recursive, try again + } + + if (dispatcher == Dispatchers.DefaultDispatcherId && mailbox == Mailboxes.DefaultMailboxId) + defaultStashCapacity + else { + val cache = stashCapacityCache.get + val key = dispatcher + "-" + mailbox + cache.get(key) match { + case Some(value) ⇒ value + case None ⇒ + val value = stashCapacityFromConfig(dispatcher, mailbox) + updateCache(cache, key, value) + value + } + } + } + + private def stashCapacityFromConfig(dispatcher: String, mailbox: String): Int = { + val disp = settings.config.getConfig(dispatcher) + val fallback = disp.withFallback(settings.config.getConfig(Mailboxes.DefaultMailboxId)) + val config = + if (mailbox == Mailboxes.DefaultMailboxId) fallback + else settings.config.getConfig(mailbox).withFallback(fallback) + config.getInt("stash-capacity") + } } diff --git a/akka-bench-jmh/src/main/scala/akka/actor/StashCreationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/StashCreationBenchmark.scala new file mode 100644 index 0000000000..54baf8c547 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/StashCreationBenchmark.scala @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.actor + +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory +import org.openjdk.jmh.annotations._ +import java.util.concurrent.TimeUnit + +object StashCreationBenchmark { + class StashingActor extends Actor with Stash { + def receive = { + case msg => sender() ! msg + } + } + + val props = Props[StashingActor] +} + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.SampleTime)) +@Fork(3) +@Warmup(iterations = 5) +@Measurement(iterations = 10) +class StashCreationBenchmark { + val conf = ConfigFactory.parseString(""" + my-dispatcher = { + stash-capacity = 1000 + } + """) + implicit val system: ActorSystem = ActorSystem("StashCreationBenchmark", conf) + val probe = TestProbe() + + @TearDown(Level.Trial) + def shutdown() { + system.terminate() + Await.ready(system.whenTerminated, 15.seconds) + } + + @Benchmark + @OutputTimeUnit(TimeUnit.MICROSECONDS) + def testDefault: Boolean = { + val stash = system.actorOf(StashCreationBenchmark.props) + stash.tell("hello", probe.ref) + probe.expectMsg("hello") + true + } + + @Benchmark + @OutputTimeUnit(TimeUnit.MICROSECONDS) + def testCustom: Boolean = { + val stash = system.actorOf(StashCreationBenchmark.props.withDispatcher("my-dispatcher")) + stash.tell("hello", probe.ref) + probe.expectMsg("hello") + true + } +} +