Added lookup method in Dispatchers to provide a registry of configured dispatchers to be shared between actors. See #1458
This commit is contained in:
parent
03e731e098
commit
eede488fd3
6 changed files with 57 additions and 20 deletions
|
|
@ -10,8 +10,18 @@ import akka.testkit.AkkaSpec
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
|
object DispatchersSpec {
|
||||||
|
val config = """
|
||||||
|
myapp {
|
||||||
|
mydispatcher {
|
||||||
|
throughput = 17
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
class DispatchersSpec extends AkkaSpec {
|
class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) {
|
||||||
|
|
||||||
val df = system.dispatcherFactory
|
val df = system.dispatcherFactory
|
||||||
import df._
|
import df._
|
||||||
|
|
@ -34,14 +44,6 @@ class DispatchersSpec extends AkkaSpec {
|
||||||
|
|
||||||
val defaultDispatcherConfig = settings.config.getConfig("akka.actor.default-dispatcher")
|
val defaultDispatcherConfig = settings.config.getConfig("akka.actor.default-dispatcher")
|
||||||
|
|
||||||
val dispatcherConf = ConfigFactory.parseString("""
|
|
||||||
myapp {
|
|
||||||
mydispatcher {
|
|
||||||
throughput = 17
|
|
||||||
}
|
|
||||||
}
|
|
||||||
""")
|
|
||||||
|
|
||||||
lazy val allDispatchers: Map[String, Option[MessageDispatcher]] = {
|
lazy val allDispatchers: Map[String, Option[MessageDispatcher]] = {
|
||||||
validTypes.map(t ⇒ (t, from(ConfigFactory.parseMap(Map(tipe -> t).asJava).withFallback(defaultDispatcherConfig)))).toMap
|
validTypes.map(t ⇒ (t, from(ConfigFactory.parseMap(Map(tipe -> t).asJava).withFallback(defaultDispatcherConfig)))).toMap
|
||||||
}
|
}
|
||||||
|
|
@ -59,15 +61,20 @@ class DispatchersSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"use defined properties when newFromConfig" in {
|
"use defined properties when newFromConfig" in {
|
||||||
val dispatcher = newFromConfig("myapp.mydispatcher", defaultGlobalDispatcher, dispatcherConf)
|
val dispatcher = newFromConfig("myapp.mydispatcher")
|
||||||
dispatcher.throughput must be(17)
|
dispatcher.throughput must be(17)
|
||||||
}
|
}
|
||||||
|
|
||||||
"use specific name when newFromConfig" in {
|
"use specific name when newFromConfig" in {
|
||||||
val dispatcher = newFromConfig("myapp.mydispatcher", defaultGlobalDispatcher, dispatcherConf)
|
val dispatcher = newFromConfig("myapp.mydispatcher")
|
||||||
dispatcher.name must be("mydispatcher")
|
dispatcher.name must be("mydispatcher")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"use default dispatcher when not configured" in {
|
||||||
|
val dispatcher = newFromConfig("myapp.other-dispatcher")
|
||||||
|
dispatcher must be === defaultGlobalDispatcher
|
||||||
|
}
|
||||||
|
|
||||||
"throw IllegalArgumentException if type does not exist" in {
|
"throw IllegalArgumentException if type does not exist" in {
|
||||||
intercept[IllegalArgumentException] {
|
intercept[IllegalArgumentException] {
|
||||||
from(ConfigFactory.parseMap(Map(tipe -> "typedoesntexist").asJava).withFallback(defaultDispatcherConfig))
|
from(ConfigFactory.parseMap(Map(tipe -> "typedoesntexist").asJava).withFallback(defaultDispatcherConfig))
|
||||||
|
|
@ -81,6 +88,13 @@ class DispatchersSpec extends AkkaSpec {
|
||||||
assert(typesAndValidators.forall(tuple ⇒ tuple._2(allDispatchers(tuple._1).get)))
|
assert(typesAndValidators.forall(tuple ⇒ tuple._2(allDispatchers(tuple._1).get)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"provide lookup of dispatchers by key" in {
|
||||||
|
val d1 = lookup("myapp.mydispatcher")
|
||||||
|
val d2 = lookup("myapp.mydispatcher")
|
||||||
|
d1 must be === d2
|
||||||
|
d1.name must be("mydispatcher")
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,10 +4,12 @@
|
||||||
|
|
||||||
package akka.dispatch
|
package akka.dispatch
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
|
||||||
import akka.actor.LocalActorRef
|
import akka.actor.LocalActorRef
|
||||||
import akka.actor.newUuid
|
import akka.actor.newUuid
|
||||||
import akka.util.{ Duration, ReflectiveAccess }
|
import akka.util.{ Duration, ReflectiveAccess }
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
import akka.event.EventStream
|
import akka.event.EventStream
|
||||||
import akka.actor.Scheduler
|
import akka.actor.Scheduler
|
||||||
|
|
@ -29,8 +31,8 @@ case class DefaultDispatcherPrerequisites(
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* It is recommended to define the dispatcher in configuration to allow for tuning
|
* It is recommended to define the dispatcher in configuration to allow for tuning
|
||||||
* for different environments. Use the `newFromConfig` method to create a dispatcher
|
* for different environments. Use the `lookup` or `newFromConfig` method to create
|
||||||
* as specified in configuration.
|
* a dispatcher as specified in configuration.
|
||||||
*
|
*
|
||||||
* Scala API. Dispatcher factory.
|
* Scala API. Dispatcher factory.
|
||||||
* <p/>
|
* <p/>
|
||||||
|
|
@ -72,6 +74,26 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
throw new ConfigurationException("Wrong configuration [akka.actor.default-dispatcher]")
|
throw new ConfigurationException("Wrong configuration [akka.actor.default-dispatcher]")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private val dispatchers = new ConcurrentHashMap[String, MessageDispatcher]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a dispatcher as specified in configuration, or if not defined it uses
|
||||||
|
* the default dispatcher. The same dispatcher instance is returned for subsequent
|
||||||
|
* lookups.
|
||||||
|
*/
|
||||||
|
def lookup(key: String): MessageDispatcher = {
|
||||||
|
dispatchers.get(key) match {
|
||||||
|
case null ⇒
|
||||||
|
// doesn't matter if we create a dispatcher that isn't used due to concurrent lookup
|
||||||
|
val newDispatcher = newFromConfig(key)
|
||||||
|
dispatchers.putIfAbsent(key, newDispatcher) match {
|
||||||
|
case null ⇒ newDispatcher
|
||||||
|
case existing ⇒ existing
|
||||||
|
}
|
||||||
|
case existing ⇒ existing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an thread based dispatcher serving a single actor through the same single thread.
|
* Creates an thread based dispatcher serving a single actor through the same single thread.
|
||||||
* Uses the default timeout
|
* Uses the default timeout
|
||||||
|
|
@ -176,6 +198,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒
|
ThreadPoolConfigDispatcherBuilder(config ⇒
|
||||||
new BalancingDispatcher(prerequisites, name, throughput, throughputDeadline, mailboxType,
|
new BalancingDispatcher(prerequisites, name, throughput, throughputDeadline, mailboxType,
|
||||||
config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new dispatcher as specified in configuration
|
* Creates a new dispatcher as specified in configuration
|
||||||
* or if not defined it uses the supplied dispatcher.
|
* or if not defined it uses the supplied dispatcher.
|
||||||
|
|
|
||||||
|
|
@ -73,7 +73,7 @@ public class UntypedActorTestBase {
|
||||||
public void propsActorOf() {
|
public void propsActorOf() {
|
||||||
ActorSystem system = ActorSystem.create("MySystem");
|
ActorSystem system = ActorSystem.create("MySystem");
|
||||||
//#creating-props
|
//#creating-props
|
||||||
MessageDispatcher dispatcher = system.dispatcherFactory().newFromConfig("my-dispatcher");
|
MessageDispatcher dispatcher = system.dispatcherFactory().lookup("my-dispatcher");
|
||||||
ActorRef myActor = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher(dispatcher),
|
ActorRef myActor = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher(dispatcher),
|
||||||
"myactor");
|
"myactor");
|
||||||
//#creating-props
|
//#creating-props
|
||||||
|
|
|
||||||
|
|
@ -187,7 +187,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
||||||
"creating actor with Props" in {
|
"creating actor with Props" in {
|
||||||
//#creating-props
|
//#creating-props
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
val dispatcher = system.dispatcherFactory.newFromConfig("my-dispatcher")
|
val dispatcher = system.dispatcherFactory.lookup("my-dispatcher")
|
||||||
val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor")
|
val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor")
|
||||||
//#creating-props
|
//#creating-props
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -66,14 +66,14 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
|
||||||
"defining dispatcher" in {
|
"defining dispatcher" in {
|
||||||
//#defining-dispatcher
|
//#defining-dispatcher
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
val dispatcher = system.dispatcherFactory.newFromConfig("my-dispatcher")
|
val dispatcher = system.dispatcherFactory.lookup("my-dispatcher")
|
||||||
val myActor1 = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor1")
|
val myActor1 = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor1")
|
||||||
val myActor2 = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor2")
|
val myActor2 = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor2")
|
||||||
//#defining-dispatcher
|
//#defining-dispatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
"defining dispatcher with bounded queue" in {
|
"defining dispatcher with bounded queue" in {
|
||||||
val dispatcher = system.dispatcherFactory.newFromConfig("my-dispatcher-bounded-queue")
|
val dispatcher = system.dispatcherFactory.lookup("my-dispatcher-bounded-queue")
|
||||||
}
|
}
|
||||||
|
|
||||||
"defining priority dispatcher" in {
|
"defining priority dispatcher" in {
|
||||||
|
|
@ -122,7 +122,7 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
|
||||||
}
|
}
|
||||||
|
|
||||||
"defining balancing dispatcher" in {
|
"defining balancing dispatcher" in {
|
||||||
val dispatcher = system.dispatcherFactory.newFromConfig("my-balancing-dispatcher")
|
val dispatcher = system.dispatcherFactory.lookup("my-balancing-dispatcher")
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -46,7 +46,7 @@ There are 4 different types of message dispatchers:
|
||||||
|
|
||||||
It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments.
|
It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments.
|
||||||
|
|
||||||
Example of a custom event-based dispatcher, which can be created with ``system.dispatcherFactory.newFromConfig("my-dispatcher")``
|
Example of a custom event-based dispatcher, which can be fetched with ``system.dispatcherFactory.lookup("my-dispatcher")``
|
||||||
as in the example above:
|
as in the example above:
|
||||||
|
|
||||||
.. includecode:: code/DispatcherDocSpec.scala#my-dispatcher-config
|
.. includecode:: code/DispatcherDocSpec.scala#my-dispatcher-config
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue