Merge pull request #1883 from akka/wip-3720-actor-size-and-speed-regression-master-ban
=act #3720 Optimze actor creation speed and actor size
This commit is contained in:
commit
f50d293fdc
8 changed files with 423 additions and 46 deletions
|
|
@ -5,9 +5,10 @@
|
||||||
package akka
|
package akka
|
||||||
|
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import java.net.{ SocketAddress, ServerSocket, DatagramSocket, InetSocketAddress }
|
import scala.concurrent.duration.Duration
|
||||||
|
import java.net.{ SocketAddress, InetSocketAddress }
|
||||||
import java.nio.channels.{ DatagramChannel, ServerSocketChannel }
|
import java.nio.channels.{ DatagramChannel, ServerSocketChannel }
|
||||||
import akka.actor.{ Terminated, ActorSystem, ActorRef }
|
import akka.actor.{ ActorSystem, ActorRef }
|
||||||
import akka.testkit.TestProbe
|
import akka.testkit.TestProbe
|
||||||
|
|
||||||
object TestUtils {
|
object TestUtils {
|
||||||
|
|
@ -33,10 +34,10 @@ object TestUtils {
|
||||||
} collect { case (socket, address) ⇒ socket.close(); address }
|
} collect { case (socket, address) ⇒ socket.close(); address }
|
||||||
}
|
}
|
||||||
|
|
||||||
def verifyActorTermination(actor: ActorRef)(implicit system: ActorSystem): Unit = {
|
def verifyActorTermination(actor: ActorRef, max: Duration = Duration.Undefined)(implicit system: ActorSystem): Unit = {
|
||||||
val watcher = TestProbe()
|
val watcher = TestProbe()
|
||||||
watcher.watch(actor)
|
watcher.watch(actor)
|
||||||
watcher.expectTerminated(actor)
|
watcher.expectTerminated(actor, max)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
150
akka-actor-tests/src/test/scala/akka/actor/ActorPerfSpec.scala
Normal file
150
akka-actor-tests/src/test/scala/akka/actor/ActorPerfSpec.scala
Normal file
|
|
@ -0,0 +1,150 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.actor
|
||||||
|
|
||||||
|
import scala.language.postfixOps
|
||||||
|
|
||||||
|
import akka.testkit.{ PerformanceTest, ImplicitSender, AkkaSpec }
|
||||||
|
import java.lang.management.ManagementFactory
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import akka.TestUtils
|
||||||
|
import scala.util.Try
|
||||||
|
|
||||||
|
object ActorPerfSpec {
|
||||||
|
|
||||||
|
case class Create(number: Int, props: () ⇒ Props)
|
||||||
|
case object Created
|
||||||
|
case object IsAlive
|
||||||
|
case object Alive
|
||||||
|
case class WaitForChildren(number: Int)
|
||||||
|
case object Waited
|
||||||
|
|
||||||
|
class EmptyActor extends Actor {
|
||||||
|
def receive = {
|
||||||
|
case IsAlive ⇒ sender ! Alive
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class EmptyArgsActor(val foo: Int, val bar: Int) extends Actor {
|
||||||
|
def receive = {
|
||||||
|
case IsAlive ⇒ sender ! Alive
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class Driver extends Actor {
|
||||||
|
|
||||||
|
def receive = {
|
||||||
|
case IsAlive ⇒
|
||||||
|
sender ! Alive
|
||||||
|
case Create(number, propsCreator) ⇒
|
||||||
|
for (i ← 1 to number) {
|
||||||
|
context.actorOf(propsCreator.apply())
|
||||||
|
}
|
||||||
|
sender ! Created
|
||||||
|
case WaitForChildren(number) ⇒
|
||||||
|
context.children.foreach(_ ! IsAlive)
|
||||||
|
context.become(waiting(number, sender), false)
|
||||||
|
}
|
||||||
|
|
||||||
|
def waiting(number: Int, replyTo: ActorRef): Receive = {
|
||||||
|
var current = number
|
||||||
|
|
||||||
|
{
|
||||||
|
case Alive ⇒
|
||||||
|
current -= 1
|
||||||
|
if (current == 0) {
|
||||||
|
replyTo ! Waited
|
||||||
|
context.unbecome()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
|
class ActorPerfSpec extends AkkaSpec("akka.actor.serialize-messages = off") with ImplicitSender {
|
||||||
|
|
||||||
|
import ActorPerfSpec._
|
||||||
|
|
||||||
|
val warmUp: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.warmUp", 50000)
|
||||||
|
val numberOfActors: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.numberOfActors", 100000)
|
||||||
|
val numberOfRepeats: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.numberOfRepeats", 2)
|
||||||
|
|
||||||
|
def testActorCreation(name: String, propsCreator: () ⇒ Props): Unit = {
|
||||||
|
val actorName = name.replaceAll("[ #\\?/!\\*%\\(\\)\\[\\]]", "_")
|
||||||
|
if (warmUp > 0)
|
||||||
|
measure(s"${actorName}_warmup", warmUp, propsCreator)
|
||||||
|
val results = for (i ← 1 to numberOfRepeats) yield measure(s"${actorName}_driver_$i", numberOfActors, propsCreator)
|
||||||
|
results.foreach {
|
||||||
|
case (duration, memory) ⇒
|
||||||
|
val micros = duration.toMicros
|
||||||
|
val avgMicros = micros.toDouble / numberOfActors
|
||||||
|
val avgMemory = memory.toDouble / numberOfActors
|
||||||
|
println(s"$name Created $numberOfActors");
|
||||||
|
println(s"In $micros us, avg: ${avgMicros}")
|
||||||
|
println(s"Footprint ${memory / 1024} KB, avg: ${avgMemory} B")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def measure(name: String, number: Int, propsCreator: () ⇒ Props): (Duration, Long) = {
|
||||||
|
val memMx = ManagementFactory.getMemoryMXBean()
|
||||||
|
val driver = system.actorOf(Props[Driver], name)
|
||||||
|
driver ! IsAlive
|
||||||
|
expectMsg(Alive)
|
||||||
|
System.gc()
|
||||||
|
val memBefore = memMx.getHeapMemoryUsage
|
||||||
|
val start = System.nanoTime()
|
||||||
|
driver ! Create(number, propsCreator)
|
||||||
|
expectMsgPF(15 seconds, s"$name waiting for Created") { case Created ⇒ }
|
||||||
|
val stop = System.nanoTime()
|
||||||
|
val duration = Duration.fromNanos(stop - start)
|
||||||
|
driver ! WaitForChildren(number)
|
||||||
|
expectMsgPF(15 seconds, s"$name waiting for Waited") { case Waited ⇒ }
|
||||||
|
System.gc()
|
||||||
|
val memAfter = memMx.getHeapMemoryUsage
|
||||||
|
driver ! PoisonPill
|
||||||
|
TestUtils.verifyActorTermination(driver, 15 seconds)
|
||||||
|
(duration, memAfter.getUsed - memBefore.getUsed)
|
||||||
|
}
|
||||||
|
|
||||||
|
"Actor creation with actorFor" must {
|
||||||
|
|
||||||
|
"measure time for Props[EmptyActor] with new Props" taggedAs PerformanceTest in {
|
||||||
|
testActorCreation("Props[EmptyActor] new", () ⇒ { Props[EmptyActor] })
|
||||||
|
}
|
||||||
|
|
||||||
|
"measure time for Props[EmptyActor] with same Props" taggedAs PerformanceTest in {
|
||||||
|
val props = Props[EmptyActor]
|
||||||
|
testActorCreation("Props[EmptyActor] same", () ⇒ { props })
|
||||||
|
}
|
||||||
|
|
||||||
|
"measure time for Props(new EmptyActor) with new Props" taggedAs PerformanceTest in {
|
||||||
|
testActorCreation("Props(new EmptyActor) new", () ⇒ { Props(new EmptyActor) })
|
||||||
|
}
|
||||||
|
|
||||||
|
"measure time for Props(new EmptyActor) with same Props" taggedAs PerformanceTest in {
|
||||||
|
val props = Props(new EmptyActor)
|
||||||
|
testActorCreation("Props(new EmptyActor) same", () ⇒ { props })
|
||||||
|
}
|
||||||
|
|
||||||
|
"measure time for Props(classOf[EmptyArgsActor], ...) with new Props" taggedAs PerformanceTest in {
|
||||||
|
testActorCreation("Props(classOf[EmptyArgsActor], ...) new", () ⇒ { Props(classOf[EmptyArgsActor], 4711, 1729) })
|
||||||
|
}
|
||||||
|
|
||||||
|
"measure time for Props(classOf[EmptyArgsActor], ...) with same Props" taggedAs PerformanceTest in {
|
||||||
|
val props = Props(classOf[EmptyArgsActor], 4711, 1729)
|
||||||
|
testActorCreation("Props(classOf[EmptyArgsActor], ...) same", () ⇒ { props })
|
||||||
|
}
|
||||||
|
|
||||||
|
"measure time for Props(new EmptyArgsActor(...)) with new Props" taggedAs PerformanceTest in {
|
||||||
|
testActorCreation("Props(new EmptyArgsActor(...)) new", () ⇒ { Props(new EmptyArgsActor(4711, 1729)) })
|
||||||
|
}
|
||||||
|
|
||||||
|
"measure time for Props(new EmptyArgsActor(...)) with same Props" taggedAs PerformanceTest in {
|
||||||
|
val props = Props(new EmptyArgsActor(4711, 1729))
|
||||||
|
testActorCreation("Props(new EmptyArgsActor(...)) same", () ⇒ { props })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
override def expectedTestDuration = 2 minutes
|
||||||
|
}
|
||||||
|
|
@ -66,7 +66,7 @@ object Props {
|
||||||
* Scala API: Returns a Props that has default values except for "creator" which will be a function that creates an instance
|
* Scala API: Returns a Props that has default values except for "creator" which will be a function that creates an instance
|
||||||
* of the supplied type using the default constructor.
|
* of the supplied type using the default constructor.
|
||||||
*/
|
*/
|
||||||
def apply[T <: Actor: ClassTag](): Props = apply(defaultDeploy, implicitly[ClassTag[T]].runtimeClass, Vector.empty)
|
def apply[T <: Actor: ClassTag](): Props = apply(defaultDeploy, implicitly[ClassTag[T]].runtimeClass, List.empty)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scala API: Returns a Props that has default values except for "creator" which will be a function that creates an instance
|
* Scala API: Returns a Props that has default values except for "creator" which will be a function that creates an instance
|
||||||
|
|
@ -120,13 +120,13 @@ object Props {
|
||||||
/**
|
/**
|
||||||
* Scala API: create a Props given a class and its constructor arguments.
|
* Scala API: create a Props given a class and its constructor arguments.
|
||||||
*/
|
*/
|
||||||
def apply(clazz: Class[_], args: Any*): Props = apply(defaultDeploy, clazz, args.toVector)
|
def apply(clazz: Class[_], args: Any*): Props = apply(defaultDeploy, clazz, args.toList)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API: create a Props given a class and its constructor arguments.
|
* Java API: create a Props given a class and its constructor arguments.
|
||||||
*/
|
*/
|
||||||
@varargs
|
@varargs
|
||||||
def create(clazz: Class[_], args: AnyRef*): Props = apply(defaultDeploy, clazz, args.toVector)
|
def create(clazz: Class[_], args: AnyRef*): Props = apply(defaultDeploy, clazz, args.toList)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create new Props from the given [[akka.japi.Creator]].
|
* Create new Props from the given [[akka.japi.Creator]].
|
||||||
|
|
@ -176,34 +176,28 @@ final case class Props(deploy: Deploy, clazz: Class[_], args: immutable.Seq[Any]
|
||||||
|
|
||||||
// derived property, does not need to be serialized
|
// derived property, does not need to be serialized
|
||||||
@transient
|
@transient
|
||||||
private[this] var _constructor: Constructor[_] = _
|
private[this] var _producer: IndirectActorProducer = _
|
||||||
|
|
||||||
// derived property, does not need to be serialized
|
// derived property, does not need to be serialized
|
||||||
@transient
|
@transient
|
||||||
private[this] var _cachedActorClass: Class[_ <: Actor] = _
|
private[this] var _cachedActorClass: Class[_ <: Actor] = _
|
||||||
|
|
||||||
private[this] def constructor: Constructor[_] = {
|
private[this] def producer: IndirectActorProducer = {
|
||||||
if (_constructor eq null)
|
if (_producer eq null)
|
||||||
_constructor = Reflect.findConstructor(clazz, args)
|
_producer = IndirectActorProducer(clazz, args)
|
||||||
|
|
||||||
_constructor
|
_producer
|
||||||
}
|
}
|
||||||
|
|
||||||
private[this] def cachedActorClass: Class[_ <: Actor] = {
|
private[this] def cachedActorClass: Class[_ <: Actor] = {
|
||||||
if (_cachedActorClass eq null)
|
if (_cachedActorClass eq null)
|
||||||
_cachedActorClass =
|
_cachedActorClass = producer.actorClass
|
||||||
if (classOf[IndirectActorProducer].isAssignableFrom(clazz))
|
|
||||||
Reflect.instantiate(constructor, args).asInstanceOf[IndirectActorProducer].actorClass
|
|
||||||
else if (classOf[Actor].isAssignableFrom(clazz))
|
|
||||||
clazz.asInstanceOf[Class[_ <: Actor]]
|
|
||||||
else
|
|
||||||
throw new IllegalArgumentException(s"unknown actor creator [$clazz]")
|
|
||||||
|
|
||||||
_cachedActorClass
|
_cachedActorClass
|
||||||
}
|
}
|
||||||
|
|
||||||
// validate constructor signature; throws IllegalArgumentException if invalid
|
// validate producer constructor signature; throws IllegalArgumentException if invalid
|
||||||
constructor
|
producer
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* No-args constructor that sets all the default values.
|
* No-args constructor that sets all the default values.
|
||||||
|
|
@ -211,7 +205,7 @@ final case class Props(deploy: Deploy, clazz: Class[_], args: immutable.Seq[Any]
|
||||||
* @deprecated use `Props.create(clazz, args ...)` instead
|
* @deprecated use `Props.create(clazz, args ...)` instead
|
||||||
*/
|
*/
|
||||||
@deprecated("use Props.create()", "2.2")
|
@deprecated("use Props.create()", "2.2")
|
||||||
def this() = this(Props.defaultDeploy, classOf[CreatorFunctionConsumer], Vector(Props.defaultCreator))
|
def this() = this(Props.defaultDeploy, classOf[CreatorFunctionConsumer], List(Props.defaultCreator))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API: create Props from an [[UntypedActorFactory]]
|
* Java API: create Props from an [[UntypedActorFactory]]
|
||||||
|
|
@ -222,7 +216,7 @@ final case class Props(deploy: Deploy, clazz: Class[_], args: immutable.Seq[Any]
|
||||||
* non-serializable
|
* non-serializable
|
||||||
*/
|
*/
|
||||||
@deprecated("use Props.create()", "2.2")
|
@deprecated("use Props.create()", "2.2")
|
||||||
def this(factory: UntypedActorFactory) = this(Props.defaultDeploy, classOf[UntypedActorFactoryConsumer], Vector(factory))
|
def this(factory: UntypedActorFactory) = this(Props.defaultDeploy, classOf[UntypedActorFactoryConsumer], List(factory))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API: create Props from a given [[java.lang.Class]]
|
* Java API: create Props from a given [[java.lang.Class]]
|
||||||
|
|
@ -231,7 +225,7 @@ final case class Props(deploy: Deploy, clazz: Class[_], args: immutable.Seq[Any]
|
||||||
* another API
|
* another API
|
||||||
*/
|
*/
|
||||||
@deprecated("use Props.create()", "2.2")
|
@deprecated("use Props.create()", "2.2")
|
||||||
def this(actorClass: Class[_ <: Actor]) = this(Props.defaultDeploy, actorClass, Vector.empty)
|
def this(actorClass: Class[_ <: Actor]) = this(Props.defaultDeploy, actorClass, List.empty)
|
||||||
|
|
||||||
@deprecated("There is no use-case for this method anymore", "2.2")
|
@deprecated("There is no use-case for this method anymore", "2.2")
|
||||||
def creator: () ⇒ Actor = newActor
|
def creator: () ⇒ Actor = newActor
|
||||||
|
|
@ -327,11 +321,7 @@ final case class Props(deploy: Deploy, clazz: Class[_], args: immutable.Seq[Any]
|
||||||
* used within the implementation of [[IndirectActorProducer#produce]].
|
* used within the implementation of [[IndirectActorProducer#produce]].
|
||||||
*/
|
*/
|
||||||
private[akka] def newActor(): Actor = {
|
private[akka] def newActor(): Actor = {
|
||||||
Reflect.instantiate(constructor, args) match {
|
producer.produce()
|
||||||
case a: Actor ⇒ a
|
|
||||||
case i: IndirectActorProducer ⇒ i.produce()
|
|
||||||
case _ ⇒ throw new IllegalArgumentException(s"unknown actor creator [$clazz]")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -362,6 +352,37 @@ trait IndirectActorProducer {
|
||||||
def actorClass: Class[_ <: Actor]
|
def actorClass: Class[_ <: Actor]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private[akka] object IndirectActorProducer {
|
||||||
|
val UntypedActorFactoryConsumerClass = classOf[UntypedActorFactoryConsumer]
|
||||||
|
val CreatorFunctionConsumerClass = classOf[CreatorFunctionConsumer]
|
||||||
|
val CreatorConsumerClass = classOf[CreatorConsumer]
|
||||||
|
val TypedCreatorFunctionConsumerClass = classOf[TypedCreatorFunctionConsumer]
|
||||||
|
|
||||||
|
def apply(clazz: Class[_], args: immutable.Seq[Any]): IndirectActorProducer = {
|
||||||
|
if (classOf[IndirectActorProducer].isAssignableFrom(clazz)) {
|
||||||
|
def get1stArg[T]: T = args.head.asInstanceOf[T]
|
||||||
|
def get2ndArg[T]: T = args.tail.head.asInstanceOf[T]
|
||||||
|
// The cost of doing reflection to create these for every props
|
||||||
|
// is rather high, so we match on them and do new instead
|
||||||
|
clazz match {
|
||||||
|
case TypedCreatorFunctionConsumerClass ⇒
|
||||||
|
new TypedCreatorFunctionConsumer(get1stArg, get2ndArg)
|
||||||
|
case UntypedActorFactoryConsumerClass ⇒
|
||||||
|
new UntypedActorFactoryConsumer(get1stArg)
|
||||||
|
case CreatorFunctionConsumerClass ⇒
|
||||||
|
new CreatorFunctionConsumer(get1stArg)
|
||||||
|
case CreatorConsumerClass ⇒
|
||||||
|
new CreatorConsumer(get1stArg, get2ndArg)
|
||||||
|
case _ ⇒
|
||||||
|
Reflect.instantiate(clazz, args).asInstanceOf[IndirectActorProducer]
|
||||||
|
}
|
||||||
|
} else if (classOf[Actor].isAssignableFrom(clazz)) {
|
||||||
|
if (args.isEmpty) new NoArgsReflectConstructor(clazz.asInstanceOf[Class[_ <: Actor]])
|
||||||
|
else new ArgsReflectConstructor(clazz.asInstanceOf[Class[_ <: Actor]], args)
|
||||||
|
} else throw new IllegalArgumentException(s"unknown actor creator [$clazz]")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
|
|
@ -393,3 +414,21 @@ private[akka] class TypedCreatorFunctionConsumer(clz: Class[_ <: Actor], creator
|
||||||
override def actorClass = clz
|
override def actorClass = clz
|
||||||
override def produce() = creator()
|
override def produce() = creator()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[akka] class ArgsReflectConstructor(clz: Class[_ <: Actor], args: immutable.Seq[Any]) extends IndirectActorProducer {
|
||||||
|
private[this] val constructor: Constructor[_] = Reflect.findConstructor(clz, args)
|
||||||
|
override def actorClass = clz
|
||||||
|
override def produce() = Reflect.instantiate(constructor, args).asInstanceOf[Actor]
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[akka] class NoArgsReflectConstructor(clz: Class[_ <: Actor]) extends IndirectActorProducer {
|
||||||
|
Reflect.findConstructor(clz, List.empty)
|
||||||
|
override def actorClass = clz
|
||||||
|
override def produce() = Reflect.instantiate(clz)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,9 +18,6 @@ import scala.concurrent.ExecutionContext
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
import scala.util.Try
|
import scala.util.Try
|
||||||
import scala.util.Failure
|
|
||||||
import akka.util.Reflect
|
|
||||||
import java.lang.reflect.ParameterizedType
|
|
||||||
|
|
||||||
final case class Envelope private (val message: Any, val sender: ActorRef)
|
final case class Envelope private (val message: Any, val sender: ActorRef)
|
||||||
|
|
||||||
|
|
@ -308,7 +305,9 @@ abstract class ExecutorServiceConfigurator(config: Config, prerequisites: Dispat
|
||||||
/**
|
/**
|
||||||
* Base class to be used for hooking in new dispatchers into Dispatchers.
|
* Base class to be used for hooking in new dispatchers into Dispatchers.
|
||||||
*/
|
*/
|
||||||
abstract class MessageDispatcherConfigurator(val config: Config, val prerequisites: DispatcherPrerequisites) {
|
abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: DispatcherPrerequisites) {
|
||||||
|
|
||||||
|
val config: Config = new CachingConfig(_config)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns an instance of MessageDispatcher given the configuration.
|
* Returns an instance of MessageDispatcher given the configuration.
|
||||||
|
|
|
||||||
171
akka-actor/src/main/scala/akka/dispatch/CachingConfig.scala
Normal file
171
akka-actor/src/main/scala/akka/dispatch/CachingConfig.scala
Normal file
|
|
@ -0,0 +1,171 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.dispatch
|
||||||
|
|
||||||
|
import com.typesafe.config._
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import scala.util.{ Failure, Success, Try }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[akka] object CachingConfig {
|
||||||
|
val emptyConfig = ConfigFactory.empty()
|
||||||
|
|
||||||
|
sealed abstract trait PathEntry {
|
||||||
|
val valid: Boolean
|
||||||
|
val exists: Boolean
|
||||||
|
val config: Config
|
||||||
|
}
|
||||||
|
case class ValuePathEntry(valid: Boolean, exists: Boolean, config: Config = emptyConfig) extends PathEntry
|
||||||
|
case class StringPathEntry(valid: Boolean, exists: Boolean, config: Config, value: String) extends PathEntry
|
||||||
|
|
||||||
|
val invalidPathEntry = ValuePathEntry(false, true)
|
||||||
|
val nonExistingPathEntry = ValuePathEntry(true, false)
|
||||||
|
val emptyPathEntry = ValuePathEntry(true, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*
|
||||||
|
* A CachingConfig is a Config that wraps another Config and is used to cache path lookup and string
|
||||||
|
* retrieval, which we happen to do a lot in some critical paths of the actor creation and mailbox
|
||||||
|
* selection code.
|
||||||
|
*
|
||||||
|
* All other Config operations are delegated to the wrapped Config.
|
||||||
|
*/
|
||||||
|
private[akka] class CachingConfig(_config: Config) extends Config {
|
||||||
|
|
||||||
|
import CachingConfig._
|
||||||
|
|
||||||
|
private val (config: Config, entryMap: ConcurrentHashMap[String, PathEntry]) = _config match {
|
||||||
|
case cc: CachingConfig ⇒ (cc.config, cc.entryMap)
|
||||||
|
case _ ⇒ (_config, new ConcurrentHashMap[String, PathEntry])
|
||||||
|
}
|
||||||
|
|
||||||
|
private def getPathEntry(path: String): PathEntry = entryMap.get(path) match {
|
||||||
|
case null ⇒
|
||||||
|
val ne = Try { config.hasPath(path) } match {
|
||||||
|
case Failure(e) ⇒ invalidPathEntry
|
||||||
|
case Success(false) ⇒ nonExistingPathEntry
|
||||||
|
case _ ⇒
|
||||||
|
Try { config.getValue(path) } match {
|
||||||
|
case Failure(e) ⇒
|
||||||
|
emptyPathEntry
|
||||||
|
case Success(v) ⇒
|
||||||
|
if (v.valueType() == ConfigValueType.STRING)
|
||||||
|
StringPathEntry(true, true, v.atKey("cached"), v.unwrapped().asInstanceOf[String])
|
||||||
|
else
|
||||||
|
ValuePathEntry(true, true, v.atKey("cached"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
entryMap.putIfAbsent(path, ne) match {
|
||||||
|
case null ⇒ ne
|
||||||
|
case e ⇒ e
|
||||||
|
}
|
||||||
|
|
||||||
|
case e ⇒ e
|
||||||
|
}
|
||||||
|
|
||||||
|
def checkValid(reference: Config, restrictToPaths: String*) {
|
||||||
|
config.checkValid(reference, restrictToPaths: _*)
|
||||||
|
}
|
||||||
|
|
||||||
|
def root() = config.root()
|
||||||
|
|
||||||
|
def origin() = config.origin()
|
||||||
|
|
||||||
|
def withFallback(other: ConfigMergeable) = new CachingConfig(config.withFallback(other))
|
||||||
|
|
||||||
|
def resolve() = resolve(ConfigResolveOptions.defaults())
|
||||||
|
|
||||||
|
def resolve(options: ConfigResolveOptions) = {
|
||||||
|
val resolved = config.resolve(options)
|
||||||
|
if (resolved eq config) this
|
||||||
|
else new CachingConfig(resolved)
|
||||||
|
}
|
||||||
|
|
||||||
|
def hasPath(path: String) = {
|
||||||
|
val entry = getPathEntry(path)
|
||||||
|
if (entry.valid)
|
||||||
|
entry.exists
|
||||||
|
else // run the real code to get proper exceptions
|
||||||
|
config.hasPath(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
def isEmpty = config.isEmpty
|
||||||
|
|
||||||
|
def entrySet() = config.entrySet()
|
||||||
|
|
||||||
|
def getBoolean(path: String) = config.getBoolean(path)
|
||||||
|
|
||||||
|
def getNumber(path: String) = config.getNumber(path)
|
||||||
|
|
||||||
|
def getInt(path: String) = config.getInt(path)
|
||||||
|
|
||||||
|
def getLong(path: String) = config.getLong(path)
|
||||||
|
|
||||||
|
def getDouble(path: String) = config.getDouble(path)
|
||||||
|
|
||||||
|
def getString(path: String) = {
|
||||||
|
getPathEntry(path) match {
|
||||||
|
case StringPathEntry(_, _, _, string) ⇒
|
||||||
|
string
|
||||||
|
case e ⇒ e.config.getString("cached")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def getObject(path: String) = config.getObject(path)
|
||||||
|
|
||||||
|
def getConfig(path: String) = config.getConfig(path)
|
||||||
|
|
||||||
|
def getAnyRef(path: String) = config.getAnyRef(path)
|
||||||
|
|
||||||
|
def getValue(path: String) = config.getValue(path)
|
||||||
|
|
||||||
|
def getBytes(path: String) = config.getBytes(path)
|
||||||
|
|
||||||
|
def getMilliseconds(path: String) = config.getMilliseconds(path)
|
||||||
|
|
||||||
|
def getNanoseconds(path: String) = config.getNanoseconds(path)
|
||||||
|
|
||||||
|
def getList(path: String) = config.getList(path)
|
||||||
|
|
||||||
|
def getBooleanList(path: String) = config.getBooleanList(path)
|
||||||
|
|
||||||
|
def getNumberList(path: String) = config.getNumberList(path)
|
||||||
|
|
||||||
|
def getIntList(path: String) = config.getIntList(path)
|
||||||
|
|
||||||
|
def getLongList(path: String) = config.getLongList(path)
|
||||||
|
|
||||||
|
def getDoubleList(path: String) = config.getDoubleList(path)
|
||||||
|
|
||||||
|
def getStringList(path: String) = config.getStringList(path)
|
||||||
|
|
||||||
|
def getObjectList(path: String) = config.getObjectList(path)
|
||||||
|
|
||||||
|
def getConfigList(path: String) = config.getConfigList(path)
|
||||||
|
|
||||||
|
def getAnyRefList(path: String) = config.getAnyRefList(path)
|
||||||
|
|
||||||
|
def getBytesList(path: String) = config.getBytesList(path)
|
||||||
|
|
||||||
|
def getMillisecondsList(path: String) = config.getMillisecondsList(path)
|
||||||
|
|
||||||
|
def getNanosecondsList(path: String) = config.getNanosecondsList(path)
|
||||||
|
|
||||||
|
def withOnlyPath(path: String) = new CachingConfig(config.withOnlyPath(path))
|
||||||
|
|
||||||
|
def withoutPath(path: String) = new CachingConfig(config.withoutPath(path))
|
||||||
|
|
||||||
|
def atPath(path: String) = new CachingConfig(config.atPath(path))
|
||||||
|
|
||||||
|
def atKey(key: String) = new CachingConfig(config.atKey(key))
|
||||||
|
|
||||||
|
def withValue(path: String, value: ConfigValue) = new CachingConfig(config.withValue(path, value))
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -56,6 +56,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
|
|
||||||
import Dispatchers._
|
import Dispatchers._
|
||||||
|
|
||||||
|
val cachingConfig = new CachingConfig(settings.config)
|
||||||
|
|
||||||
val defaultDispatcherConfig: Config =
|
val defaultDispatcherConfig: Config =
|
||||||
idConfig(DefaultDispatcherId).withFallback(settings.config.getConfig(DefaultDispatcherId))
|
idConfig(DefaultDispatcherId).withFallback(settings.config.getConfig(DefaultDispatcherId))
|
||||||
|
|
||||||
|
|
@ -80,7 +82,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
* using this dispatcher, because the details can only be checked by trying
|
* using this dispatcher, because the details can only be checked by trying
|
||||||
* to instantiate it, which might be undesirable when just checking.
|
* to instantiate it, which might be undesirable when just checking.
|
||||||
*/
|
*/
|
||||||
def hasDispatcher(id: String): Boolean = settings.config.hasPath(id)
|
def hasDispatcher(id: String): Boolean = cachingConfig.hasPath(id)
|
||||||
|
|
||||||
private def lookupConfigurator(id: String): MessageDispatcherConfigurator = {
|
private def lookupConfigurator(id: String): MessageDispatcherConfigurator = {
|
||||||
dispatcherConfigurators.get(id) match {
|
dispatcherConfigurators.get(id) match {
|
||||||
|
|
@ -89,7 +91,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
// That shouldn't happen often and in case it does the actual ExecutorService isn't
|
// That shouldn't happen often and in case it does the actual ExecutorService isn't
|
||||||
// created until used, i.e. cheap.
|
// created until used, i.e. cheap.
|
||||||
val newConfigurator =
|
val newConfigurator =
|
||||||
if (settings.config.hasPath(id)) configuratorFrom(config(id))
|
if (cachingConfig.hasPath(id)) configuratorFrom(config(id))
|
||||||
else throw new ConfigurationException(s"Dispatcher [$id] not configured")
|
else throw new ConfigurationException(s"Dispatcher [$id] not configured")
|
||||||
|
|
||||||
dispatcherConfigurators.putIfAbsent(id, newConfigurator) match {
|
dispatcherConfigurators.putIfAbsent(id, newConfigurator) match {
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import scala.collection.immutable
|
||||||
import java.lang.reflect.Type
|
import java.lang.reflect.Type
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import java.lang.reflect.ParameterizedType
|
import java.lang.reflect.ParameterizedType
|
||||||
|
import scala.util.Try
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Collection of internal reflection utilities which may or may not be
|
* Collection of internal reflection utilities which may or may not be
|
||||||
|
|
@ -80,17 +81,30 @@ private[akka] object Reflect {
|
||||||
val argClasses = args map safeGetClass mkString ", "
|
val argClasses = args map safeGetClass mkString ", "
|
||||||
throw new IllegalArgumentException(s"$msg found on $clazz for arguments [$argClasses]")
|
throw new IllegalArgumentException(s"$msg found on $clazz for arguments [$argClasses]")
|
||||||
}
|
}
|
||||||
val candidates =
|
|
||||||
clazz.getDeclaredConstructors filter (c ⇒
|
val constructor: Constructor[T] =
|
||||||
c.getParameterTypes.length == args.length &&
|
if (args.isEmpty) Try { clazz.getDeclaredConstructor() } getOrElse (null)
|
||||||
(c.getParameterTypes zip args forall {
|
else {
|
||||||
case (found, required) ⇒
|
val length = args.length
|
||||||
found.isInstance(required) || BoxedType(found).isInstance(required) ||
|
val candidates =
|
||||||
(required == null && !found.isPrimitive)
|
clazz.getDeclaredConstructors.asInstanceOf[Array[Constructor[T]]].iterator filter { c ⇒
|
||||||
}))
|
val parameterTypes = c.getParameterTypes
|
||||||
if (candidates.size == 1) candidates.head.asInstanceOf[Constructor[T]]
|
parameterTypes.length == length &&
|
||||||
else if (candidates.size > 1) error("multiple matching constructors")
|
(parameterTypes.iterator zip args.iterator forall {
|
||||||
else error("no matching constructor")
|
case (found, required) ⇒
|
||||||
|
found.isInstance(required) || BoxedType(found).isInstance(required) ||
|
||||||
|
(required == null && !found.isPrimitive)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if (candidates.hasNext) {
|
||||||
|
val cstrtr = candidates.next()
|
||||||
|
if (candidates.hasNext) error("multiple matching constructors")
|
||||||
|
else cstrtr
|
||||||
|
} else null
|
||||||
|
}
|
||||||
|
|
||||||
|
if (constructor == null) error("no matching constructor")
|
||||||
|
else constructor
|
||||||
}
|
}
|
||||||
|
|
||||||
private def safeGetClass(a: Any): Class[_] =
|
private def safeGetClass(a: Any): Class[_] =
|
||||||
|
|
|
||||||
|
|
@ -7,3 +7,4 @@ import org.scalatest.Tag
|
||||||
|
|
||||||
object TimingTest extends Tag("timing")
|
object TimingTest extends Tag("timing")
|
||||||
object LongRunningTest extends Tag("long-running")
|
object LongRunningTest extends Tag("long-running")
|
||||||
|
object PerformanceTest extends Tag("performance")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue