Handle dispatcher aliases for stash config (#29485)
This commit is contained in:
parent
57d4368bed
commit
cf0fe82c16
4 changed files with 61 additions and 1 deletions
|
|
@ -63,6 +63,8 @@ object ActorWithBoundedStashSpec {
|
|||
|
||||
val dispatcherId1 = "my-dispatcher-1"
|
||||
val dispatcherId2 = "my-dispatcher-2"
|
||||
val aliasedDispatcherId1 = "my-aliased-dispatcher-1"
|
||||
val aliasedDispatcherId2 = "my-aliased-dispatcher-2"
|
||||
val mailboxId1 = "my-mailbox-1"
|
||||
val mailboxId2 = "my-mailbox-2"
|
||||
|
||||
|
|
@ -75,6 +77,8 @@ object ActorWithBoundedStashSpec {
|
|||
mailbox-type = "${classOf[Bounded100].getName}"
|
||||
stash-capacity = 20
|
||||
}
|
||||
$aliasedDispatcherId1 = $dispatcherId1
|
||||
$aliasedDispatcherId2 = $aliasedDispatcherId1
|
||||
$mailboxId1 {
|
||||
mailbox-type = "${classOf[Bounded10].getName}"
|
||||
stash-capacity = 20
|
||||
|
|
@ -154,5 +158,10 @@ class ActorWithBoundedStashSpec
|
|||
val stasher = system.actorOf(Props[StashingActorWithOverflow]().withMailbox(mailboxId2))
|
||||
testStashOverflowException(stasher)
|
||||
}
|
||||
|
||||
"get stash capacity from aliased dispatchers" in {
|
||||
val stasher = system.actorOf(Props[StashingActor]().withDispatcher(aliasedDispatcherId2))
|
||||
testDeadLetters(stasher)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,6 +38,8 @@ object DispatchersSpec {
|
|||
mymailbox {
|
||||
mailbox-type = "akka.actor.dispatch.DispatchersSpec$OneShotMailboxType"
|
||||
}
|
||||
my-aliased-dispatcher = myapp.mydispatcher
|
||||
missing-aliased-dispatcher = myapp.missing-dispatcher
|
||||
}
|
||||
akka.actor.deployment {
|
||||
/echo1 {
|
||||
|
|
@ -183,6 +185,38 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend
|
|||
d1 should ===(d2)
|
||||
}
|
||||
|
||||
"provide lookup of aliased dispatchers" in {
|
||||
val d1 = lookup("myapp.mydispatcher")
|
||||
val d2 = lookup("myapp.my-aliased-dispatcher")
|
||||
d1 should ===(d2)
|
||||
}
|
||||
|
||||
"complain about missing aliased dispatchers" in {
|
||||
intercept[ConfigurationException] {
|
||||
lookup("myapp.missing-aliased-dispatcher")
|
||||
}
|
||||
}
|
||||
|
||||
"get config for dispatcher" in {
|
||||
val config = Dispatchers.getConfig(settings.config, "myapp.mydispatcher")
|
||||
config.getInt("throughput") should ===(17)
|
||||
}
|
||||
|
||||
"get config for aliased dispatcher" in {
|
||||
val config = Dispatchers.getConfig(settings.config, "myapp.my-aliased-dispatcher")
|
||||
config.getInt("throughput") should ===(17)
|
||||
}
|
||||
|
||||
"return empty config for missing dispatcher" in {
|
||||
val config = Dispatchers.getConfig(settings.config, "myapp.missing-dispatcher")
|
||||
config shouldBe empty
|
||||
}
|
||||
|
||||
"return empty config for missing aliased dispatcher" in {
|
||||
val config = Dispatchers.getConfig(settings.config, "myapp.missing-aliased-dispatcher")
|
||||
config shouldBe empty
|
||||
}
|
||||
|
||||
"include system name and dispatcher id in thread names for fork-join-executor" in {
|
||||
assertMyDispatcherIsUsed(system.actorOf(Props[ThreadNameEcho]().withDispatcher("myapp.mydispatcher")))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -67,6 +67,23 @@ object Dispatchers {
|
|||
private[akka] final val InternalDispatcherId = "akka.actor.internal-dispatcher"
|
||||
|
||||
private val MaxDispatcherAliasDepth = 20
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Get (possibly aliased) dispatcher config. Returns empty config if not found.
|
||||
*/
|
||||
private[akka] def getConfig(config: Config, id: String, depth: Int = 0): Config = {
|
||||
if (depth > MaxDispatcherAliasDepth)
|
||||
ConfigFactory.empty(s"Didn't find dispatcher config after $MaxDispatcherAliasDepth aliases")
|
||||
else if (config.hasPath(id)) {
|
||||
config.getValue(id).valueType match {
|
||||
case ConfigValueType.STRING => getConfig(config, config.getString(id), depth + 1)
|
||||
case ConfigValueType.OBJECT => config.getConfig(id)
|
||||
case unexpected => ConfigFactory.empty(s"Expected either config or alias at [$id] but found [$unexpected]")
|
||||
}
|
||||
} else ConfigFactory.empty(s"Dispatcher [$id] not configured")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -303,7 +303,7 @@ private[akka] class Mailboxes(
|
|||
}
|
||||
|
||||
private def stashCapacityFromConfig(dispatcher: String, mailbox: String): Int = {
|
||||
val disp = settings.config.getConfig(dispatcher)
|
||||
val disp = Dispatchers.getConfig(settings.config, dispatcher)
|
||||
val fallback = disp.withFallback(settings.config.getConfig(Mailboxes.DefaultMailboxId))
|
||||
val config =
|
||||
if (mailbox == Mailboxes.DefaultMailboxId) fallback
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue