Merge branch 'master' into wip-1377-context-patriknw

This commit is contained in:
Patrik Nordwall 2011-12-07 08:07:58 +01:00
commit 1a93ddb7c0
10 changed files with 161 additions and 49 deletions

View file

@ -276,7 +276,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
(intercept[java.lang.IllegalStateException] { (intercept[java.lang.IllegalStateException] {
in.readObject in.readObject
}).getMessage must be === "Trying to deserialize a serialized ActorRef without an ActorSystem in scope." + }).getMessage must be === "Trying to deserialize a serialized ActorRef without an ActorSystem in scope." +
" Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }" " Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'"
} }
"must return deadLetters on deserialize if not present in actor hierarchy (and remoting is not enabled)" in { "must return deadLetters on deserialize if not present in actor hierarchy (and remoting is not enabled)" in {

View file

@ -105,8 +105,13 @@ akka {
# A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
name = "DefaultDispatcher" # Optional, will be a generated UUID if omitted name = "DefaultDispatcher" # Optional, will be a generated UUID if omitted
keep-alive-time = 60s # Keep alive time for threads keep-alive-time = 60s # Keep alive time for threads
core-pool-size-min = 8 # minimum number of threads to cap factor-based core number to
core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor) core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor)
core-pool-size-max = 4096 # maximum number of threads to cap factor-based number to
# Hint: max-pool-size is only used for bounded task queues
max-pool-size-min = 8 # minimum number of threads to cap factor-based max number to
max-pool-size-factor = 8.0 # Max no of threads ... ceil(available processors * factor) max-pool-size-factor = 8.0 # Max no of threads ... ceil(available processors * factor)
max-pool-size-max = 4096 # maximum number of threads to cap factor-based max number to
task-queue-size = -1 # Specifies the bounded capacity of the task queue (< 1 == unbounded) task-queue-size = -1 # Specifies the bounded capacity of the task queue (< 1 == unbounded)
task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default) task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default)
allow-core-timeout = on # Allow core threads to time out allow-core-timeout = on # Allow core threads to time out

View file

@ -86,7 +86,8 @@ class ActorKilledException private[akka] (message: String, cause: Throwable)
case class InvalidActorNameException(message: String) extends AkkaException(message) case class InvalidActorNameException(message: String) extends AkkaException(message)
case class ActorInitializationException private[akka] (actor: ActorRef, message: String, cause: Throwable = null) case class ActorInitializationException private[akka] (actor: ActorRef, message: String, cause: Throwable = null)
extends AkkaException(message, cause) with NoStackTrace { extends AkkaException(message, cause)
with NoStackTrace {
def this(msg: String) = this(null, msg, null); def this(msg: String) = this(null, msg, null);
} }
@ -102,11 +103,13 @@ class InvalidMessageException private[akka] (message: String, cause: Throwable =
} }
case class DeathPactException private[akka] (dead: ActorRef) case class DeathPactException private[akka] (dead: ActorRef)
extends AkkaException("monitored actor " + dead + " terminated") extends AkkaException("Monitored actor [" + dead + "] terminated")
with NoStackTrace with NoStackTrace
// must not pass InterruptedException to other threads // must not pass InterruptedException to other threads
case class ActorInterruptedException private[akka] (cause: Throwable) extends AkkaException(cause.getMessage, cause) with NoStackTrace case class ActorInterruptedException private[akka] (cause: Throwable)
extends AkkaException(cause.getMessage, cause)
with NoStackTrace
/** /**
* This message is thrown by default when an Actors behavior doesn't match a message * This message is thrown by default when an Actors behavior doesn't match a message
@ -117,7 +120,7 @@ case class UnhandledMessageException(msg: Any, ref: ActorRef = null) extends Exc
// constructor with 'null' ActorRef needed to work with client instantiation of remote exception // constructor with 'null' ActorRef needed to work with client instantiation of remote exception
override def getMessage = override def getMessage =
if (ref ne null) "Actor %s does not handle [%s]".format(ref, msg) if (ref ne null) "Actor [%s] does not handle [%s]".format(ref, msg)
else "Actor does not handle [%s]".format(msg) else "Actor does not handle [%s]".format(msg)
override def fillInStackTrace() = this //Don't waste cycles generating stack trace override def fillInStackTrace() = this //Don't waste cycles generating stack trace
@ -169,7 +172,7 @@ object Actor {
object emptyBehavior extends Receive { object emptyBehavior extends Receive {
def isDefinedAt(x: Any) = false def isDefinedAt(x: Any) = false
def apply(x: Any) = throw new UnsupportedOperationException("empty behavior apply()") def apply(x: Any) = throw new UnsupportedOperationException("Empty behavior apply()")
} }
} }

View file

@ -17,6 +17,31 @@ import akka.japi.Procedure
* The actor context - the view of the actor cell from the actor. * The actor context - the view of the actor cell from the actor.
* Exposes contextual information for the actor and the current message. * Exposes contextual information for the actor and the current message.
* TODO: everything here for current compatibility - could be limited more * TODO: everything here for current compatibility - could be limited more
*
* There are several possibilities for creating actors (see [[akka.actor.Props]]
* for details on `props`):
*
* {{{
* // Java or Scala
* context.actorOf(props, "name")
* context.actorOf(props)
*
* // Scala
* context.actorOf[MyActor]("name")
* context.actorOf[MyActor]
* context.actorOf(new MyActor(...))
*
* // Java
* context.actorOf(classOf[MyActor]);
* context.actorOf(new Creator<MyActor>() {
* public MyActor create() { ... }
* });
* context.actorOf(new Creator<MyActor>() {
* public MyActor create() { ... }
* }, "name");
* }}}
*
* Where no name is given explicitly, one will be automatically generated.
*/ */
trait ActorContext extends ActorRefFactory { trait ActorContext extends ActorRefFactory {
@ -177,14 +202,20 @@ private[akka] class ActorCell(
var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs
private def _actorOf(props: Props, name: String): ActorRef = {
val actor = provider.actorOf(systemImpl, props, guardian, name, false)
childrenRefs = childrenRefs.updated(name, ChildRestartStats(actor))
actor
}
def actorOf(props: Props): ActorRef = _actorOf(props, randomName())
def actorOf(props: Props, name: String): ActorRef = { def actorOf(props: Props, name: String): ActorRef = {
if (name == null || name == "" || name.charAt(0) == '$') if (name == null || name == "" || name.charAt(0) == '$')
throw new InvalidActorNameException("actor name must not be null, empty or start with $") throw new InvalidActorNameException("actor name must not be null, empty or start with $")
if (childrenRefs contains name) if (childrenRefs contains name)
throw new InvalidActorNameException("actor name " + name + " is not unique!") throw new InvalidActorNameException("actor name " + name + " is not unique!")
val actor = provider.actorOf(systemImpl, props, guardian, name, false) _actorOf(props, name)
childrenRefs = childrenRefs.updated(name, ChildRestartStats(actor))
actor
} }
var currentMessage: Envelope = null var currentMessage: Envelope = null
@ -199,7 +230,7 @@ private[akka] class ActorCell(
var nextNameSequence: Long = 0 var nextNameSequence: Long = 0
//Not thread safe, so should only be used inside the actor that inhabits this ActorCell //Not thread safe, so should only be used inside the actor that inhabits this ActorCell
override protected def randomName(): String = { protected def randomName(): String = {
val n = nextNameSequence + 1 val n = nextNameSequence + 1
nextNameSequence = n nextNameSequence = n
Helpers.base64(n) Helpers.base64(n)

View file

@ -215,7 +215,8 @@ class LocalActorRef private[akka] (
/** /**
* Is the actor terminated? * Is the actor terminated?
* If this method returns true, it will never return false again, but if it returns false, you cannot be sure if it's alive still (race condition) * If this method returns true, it will never return false again, but if it
* returns false, you cannot be sure if it's alive still (race condition)
*/ */
override def isTerminated: Boolean = actorCell.isTerminated override def isTerminated: Boolean = actorCell.isTerminated
@ -314,7 +315,7 @@ case class SerializedActorRef(path: String) {
def readResolve(): AnyRef = currentSystem.value match { def readResolve(): AnyRef = currentSystem.value match {
case null throw new IllegalStateException( case null throw new IllegalStateException(
"Trying to deserialize a serialized ActorRef without an ActorSystem in scope." + "Trying to deserialize a serialized ActorRef without an ActorSystem in scope." +
" Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }") " Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'")
case someSystem someSystem.actorFor(path) case someSystem someSystem.actorFor(path)
} }
} }
@ -329,7 +330,7 @@ trait MinimalActorRef extends InternalActorRef {
if (name.size == 1 && name.head.isEmpty) this if (name.size == 1 && name.head.isEmpty) this
else Nobody else Nobody
//FIXME REMOVE THIS, ticket #1416 //FIXME REMOVE THIS, ticket #1416
//FIXME REMOVE THIS, ticket #1415 //FIXME REMOVE THIS, ticket #1415
def suspend(): Unit = () def suspend(): Unit = ()
def resume(): Unit = () def resume(): Unit = ()
@ -341,7 +342,7 @@ trait MinimalActorRef extends InternalActorRef {
def !(message: Any)(implicit sender: ActorRef = null): Unit = () def !(message: Any)(implicit sender: ActorRef = null): Unit = ()
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName)) throw new UnsupportedOperationException("Not supported for [%s]".format(getClass.getName))
def sendSystemMessage(message: SystemMessage): Unit = () def sendSystemMessage(message: SystemMessage): Unit = ()
def restart(cause: Throwable): Unit = () def restart(cause: Throwable): Unit = ()
@ -378,7 +379,7 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
private[akka] def init(dispatcher: MessageDispatcher, rootPath: ActorPath) { private[akka] def init(dispatcher: MessageDispatcher, rootPath: ActorPath) {
_path = rootPath / "null" _path = rootPath / "null"
brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(dispatcher) brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef - promises are always broken.")))(dispatcher)
} }
override def isTerminated(): Boolean = true override def isTerminated(): Boolean = true

View file

@ -120,7 +120,8 @@ trait ActorRefProvider {
} }
/** /**
* Interface implemented by ActorSystem and AkkaContext, the only two places from which you can get fresh actors. * Interface implemented by ActorSystem and AkkaContext, the only two places
* from which you can get fresh actors.
*/ */
trait ActorRefFactory { trait ActorRefFactory {
@ -137,16 +138,18 @@ trait ActorRefFactory {
protected def lookupRoot: InternalActorRef protected def lookupRoot: InternalActorRef
protected def randomName(): String
/** /**
* Create new actor as child of this context and give it an automatically * Create new actor as child of this context and give it an automatically
* generated name (currently similar to base64-encoded integer count, * generated name (currently similar to base64-encoded integer count,
* reversed and with $ prepended, may change in the future). * reversed and with $ prepended, may change in the future).
* *
* See [[akka.actor.Props]] for details on how to obtain a `Props` object. * See [[akka.actor.Props]] for details on how to obtain a `Props` object.
*
* When invoked on ActorSystem, this method sends a message to the guardian
* actor and blocks waiting for a reply, see `akka.actor.creation-timeout` in
* the `reference.conf`.
*/ */
def actorOf(props: Props): ActorRef = provider.actorOf(systemImpl, props, guardian, randomName(), false) def actorOf(props: Props): ActorRef
/** /**
* Create new actor as child of this context with the given name, which must * Create new actor as child of this context with the given name, which must
@ -154,6 +157,10 @@ trait ActorRefFactory {
* and `InvalidActorNameException` is thrown. * and `InvalidActorNameException` is thrown.
* *
* See [[akka.actor.Props]] for details on how to obtain a `Props` object. * See [[akka.actor.Props]] for details on how to obtain a `Props` object.
*
* When invoked on ActorSystem, this method sends a message to the guardian
* actor and blocks waiting for a reply, see `akka.actor.creation-timeout` in
* the `reference.conf`.
*/ */
def actorOf(props: Props, name: String): ActorRef def actorOf(props: Props, name: String): ActorRef
@ -162,6 +169,10 @@ trait ActorRefFactory {
* generated name (currently similar to base64-encoded integer count, * generated name (currently similar to base64-encoded integer count,
* reversed and with $ prepended, may change in the future). The type must have * reversed and with $ prepended, may change in the future). The type must have
* a no-arg constructor which will be invoked using reflection. * a no-arg constructor which will be invoked using reflection.
*
* When invoked on ActorSystem, this method sends a message to the guardian
* actor and blocks waiting for a reply, see `akka.actor.creation-timeout` in
* the `reference.conf`.
*/ */
def actorOf[T <: Actor](implicit m: Manifest[T]): ActorRef = actorOf(Props(m.erasure.asInstanceOf[Class[_ <: Actor]])) def actorOf[T <: Actor](implicit m: Manifest[T]): ActorRef = actorOf(Props(m.erasure.asInstanceOf[Class[_ <: Actor]]))
@ -170,6 +181,10 @@ trait ActorRefFactory {
* not be null, empty or start with $. If the given name is already in use, * not be null, empty or start with $. If the given name is already in use,
* and `InvalidActorNameException` is thrown. The type must have * and `InvalidActorNameException` is thrown. The type must have
* a no-arg constructor which will be invoked using reflection. * a no-arg constructor which will be invoked using reflection.
*
* When invoked on ActorSystem, this method sends a message to the guardian
* actor and blocks waiting for a reply, see `akka.actor.creation-timeout` in
* the `reference.conf`.
*/ */
def actorOf[T <: Actor](name: String)(implicit m: Manifest[T]): ActorRef = def actorOf[T <: Actor](name: String)(implicit m: Manifest[T]): ActorRef =
actorOf(Props(m.erasure.asInstanceOf[Class[_ <: Actor]]), name) actorOf(Props(m.erasure.asInstanceOf[Class[_ <: Actor]]), name)
@ -179,6 +194,10 @@ trait ActorRefFactory {
* generated name (currently similar to base64-encoded integer count, * generated name (currently similar to base64-encoded integer count,
* reversed and with $ prepended, may change in the future). The class must have * reversed and with $ prepended, may change in the future). The class must have
* a no-arg constructor which will be invoked using reflection. * a no-arg constructor which will be invoked using reflection.
*
* When invoked on ActorSystem, this method sends a message to the guardian
* actor and blocks waiting for a reply, see `akka.actor.creation-timeout` in
* the `reference.conf`.
*/ */
def actorOf[T <: Actor](clazz: Class[T]): ActorRef = actorOf(Props(clazz)) def actorOf[T <: Actor](clazz: Class[T]): ActorRef = actorOf(Props(clazz))
@ -188,6 +207,10 @@ trait ActorRefFactory {
* reversed and with $ prepended, may change in the future). Use this * reversed and with $ prepended, may change in the future). Use this
* method to pass constructor arguments to the [[akka.actor.Actor]] while using * method to pass constructor arguments to the [[akka.actor.Actor]] while using
* only default [[akka.actor.Props]]; otherwise refer to `actorOf(Props)`. * only default [[akka.actor.Props]]; otherwise refer to `actorOf(Props)`.
*
* When invoked on ActorSystem, this method sends a message to the guardian
* actor and blocks waiting for a reply, see `akka.actor.creation-timeout` in
* the `reference.conf`.
*/ */
def actorOf(factory: Actor): ActorRef = actorOf(Props(() factory)) def actorOf(factory: Actor): ActorRef = actorOf(Props(() factory))
@ -197,6 +220,10 @@ trait ActorRefFactory {
* count, reversed and with $ prepended, may change in the future). * count, reversed and with $ prepended, may change in the future).
* *
* Identical to `actorOf(Props(() => creator.create()))`. * Identical to `actorOf(Props(() => creator.create()))`.
*
* When invoked on ActorSystem, this method sends a message to the guardian
* actor and blocks waiting for a reply, see `akka.actor.creation-timeout` in
* the `reference.conf`.
*/ */
def actorOf(creator: UntypedActorFactory): ActorRef = actorOf(Props(() creator.create())) def actorOf(creator: UntypedActorFactory): ActorRef = actorOf(Props(() creator.create()))
@ -206,6 +233,10 @@ trait ActorRefFactory {
* and `InvalidActorNameException` is thrown. * and `InvalidActorNameException` is thrown.
* *
* Identical to `actorOf(Props(() => creator.create()), name)`. * Identical to `actorOf(Props(() => creator.create()), name)`.
*
* When invoked on ActorSystem, this method sends a message to the guardian
* actor and blocks waiting for a reply, see `akka.actor.creation-timeout` in
* the `reference.conf`.
*/ */
def actorOf(creator: UntypedActorFactory, name: String): ActorRef = actorOf(Props(() creator.create()), name) def actorOf(creator: UntypedActorFactory, name: String): ActorRef = actorOf(Props(() creator.create()), name)
@ -298,6 +329,11 @@ class ActorRefProviderException(message: String) extends AkkaException(message)
*/ */
private[akka] case class CreateChild(props: Props, name: String) private[akka] case class CreateChild(props: Props, name: String)
/**
* Internal Akka use only, used in implementation of system.actorOf.
*/
private[akka] case class CreateRandomNameChild(props: Props)
/** /**
* Local ActorRef provider. * Local ActorRef provider.
*/ */
@ -366,9 +402,10 @@ class LocalActorRefProvider(
private class Guardian extends Actor { private class Guardian extends Actor {
def receive = { def receive = {
case Terminated(_) context.self.stop() case Terminated(_) context.self.stop()
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e }) case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e })
case m deadLetters ! DeadLetter(m, sender, self) case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case e: Exception e })
case m deadLetters ! DeadLetter(m, sender, self)
} }
} }
@ -377,8 +414,9 @@ class LocalActorRefProvider(
case Terminated(_) case Terminated(_)
eventStream.stopDefaultLoggers() eventStream.stopDefaultLoggers()
context.self.stop() context.self.stop()
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e }) case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e })
case m deadLetters ! DeadLetter(m, sender, self) case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case e: Exception e })
case m deadLetters ! DeadLetter(m, sender, self)
} }
} }

View file

@ -29,17 +29,17 @@ object ActorSystem {
val Version = "2.0-SNAPSHOT" val Version = "2.0-SNAPSHOT"
val envHome = System.getenv("AKKA_HOME") match { val EnvHome = System.getenv("AKKA_HOME") match {
case null | "" | "." None case null | "" | "." None
case value Some(value) case value Some(value)
} }
val systemHome = System.getProperty("akka.home") match { val SystemHome = System.getProperty("akka.home") match {
case null | "" None case null | "" None
case value Some(value) case value Some(value)
} }
val GlobalHome = systemHome orElse envHome val GlobalHome = SystemHome orElse EnvHome
def create(name: String, config: Config): ActorSystem = apply(name, config) def create(name: String, config: Config): ActorSystem = apply(name, config)
def apply(name: String, config: Config): ActorSystem = new ActorSystemImpl(name, config).start() def apply(name: String, config: Config): ActorSystem = new ActorSystemImpl(name, config).start()
@ -48,6 +48,7 @@ object ActorSystem {
* Uses the standard default Config from ConfigFactory.load(), since none is provided. * Uses the standard default Config from ConfigFactory.load(), since none is provided.
*/ */
def create(name: String): ActorSystem = apply(name) def create(name: String): ActorSystem = apply(name)
/** /**
* Uses the standard default Config from ConfigFactory.load(), since none is provided. * Uses the standard default Config from ConfigFactory.load(), since none is provided.
*/ */
@ -107,7 +108,6 @@ object ActorSystem {
"] does not match the provided config version [" + ConfigVersion + "]") "] does not match the provided config version [" + ConfigVersion + "]")
override def toString: String = config.root.render override def toString: String = config.root.render
} }
// TODO move to migration kit // TODO move to migration kit
@ -151,15 +151,38 @@ object ActorSystem {
} catch { case _ None } } catch { case _ None }
private def emptyConfig = ConfigFactory.systemProperties private def emptyConfig = ConfigFactory.systemProperties
} }
} }
/** /**
* An actor system is a hierarchical group of actors which share common * An actor system is a hierarchical group of actors which share common
* configuration, e.g. dispatchers, deployments, remote capabilities and * configuration, e.g. dispatchers, deployments, remote capabilities and
* addresses. It is also the entry point for creating or looking up actors. * addresses. It is also the entry point for creating or looking up actors.
*
* There are several possibilities for creating actors (see [[akka.actor.Props]]
* for details on `props`):
*
* {{{
* // Java or Scala
* system.actorOf(props, "name")
* system.actorOf(props)
*
* // Scala
* system.actorOf[MyActor]("name")
* system.actorOf[MyActor]
* system.actorOf(new MyActor(...))
*
* // Java
* system.actorOf(classOf[MyActor]);
* system.actorOf(new Creator<MyActor>() {
* public MyActor create() { ... }
* });
* system.actorOf(new Creator<MyActor>() {
* public MyActor create() { ... }
* }, "name");
* }}}
*
* Where no name is given explicitly, one will be automatically generated.
*/ */
abstract class ActorSystem extends ActorRefFactory { abstract class ActorSystem extends ActorRefFactory {
import ActorSystem._ import ActorSystem._
@ -317,6 +340,14 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
} }
} }
def actorOf(props: Props): ActorRef = {
implicit val timeout = settings.CreationTimeout
(guardian ? CreateRandomNameChild(props)).get match {
case ref: ActorRef ref
case ex: Exception throw ex
}
}
import settings._ import settings._
// this provides basic logging (to stdout) until .start() is called below // this provides basic logging (to stdout) until .start() is called below
@ -370,9 +401,6 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
def nodename: String = provider.nodename def nodename: String = provider.nodename
def clustername: String = provider.clustername def clustername: String = provider.clustername
private final val nextName = new AtomicLong
override protected def randomName(): String = Helpers.base64(nextName.incrementAndGet())
def /(actorName: String): ActorPath = guardian.path / actorName def /(actorName: String): ActorPath = guardian.path / actorName
def /(path: Iterable[String]): ActorPath = guardian.path / path def /(path: Iterable[String]): ActorPath = guardian.path / path
@ -408,7 +436,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
protected def createScheduler(): Scheduler = { protected def createScheduler(): Scheduler = {
val threadFactory = new MonitorableThreadFactory("DefaultScheduler") val threadFactory = new MonitorableThreadFactory("DefaultScheduler")
val hwt = new HashedWheelTimer(log, threadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel) val hwt = new HashedWheelTimer(log, threadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel)
// note that dispatcher is by-name parameter in DefaultScheduler constructor, // note that dispatcher is by-name parameter in DefaultScheduler constructor,
// because dispatcher is not initialized when the scheduler is created // because dispatcher is not initialized when the scheduler is created
def safeDispatcher = { def safeDispatcher = {
if (dispatcher eq null) { if (dispatcher eq null) {

View file

@ -310,22 +310,26 @@ abstract class MessageDispatcherConfigurator() {
settings: Settings, settings: Settings,
createDispatcher: (ThreadPoolConfig) MessageDispatcher): ThreadPoolConfigDispatcherBuilder = { createDispatcher: (ThreadPoolConfig) MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
import ThreadPoolConfigDispatcherBuilder.conf_? import ThreadPoolConfigDispatcherBuilder.conf_?
import scala.math.{ min, max }
//Apply the following options to the config if they are present in the config //Apply the following options to the config if they are present in the config
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure( ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig())
conf_?(Some(config getMilliseconds "keep-alive-time"))(time _.setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS))), .setKeepAliveTime(Duration(config getMilliseconds "keep-alive-time", TimeUnit.MILLISECONDS))
conf_?(Some(config getDouble "core-pool-size-factor"))(factor _.setCorePoolSizeFromFactor(factor)), .setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout")
conf_?(Some(config getDouble "max-pool-size-factor"))(factor _.setMaxPoolSizeFromFactor(factor)), .setCorePoolSize(min(max(ThreadPoolConfig.scaledPoolSize(config getDouble "core-pool-size-factor"),
conf_?(Some(config getBoolean "allow-core-timeout"))(allow _.setAllowCoreThreadTimeout(allow)), config getInt "core-pool-size-min"), config getInt "core-pool-size-max"))
conf_?(Some(config getInt "task-queue-size") flatMap { .setMaxPoolSize(min(max(ThreadPoolConfig.scaledPoolSize(config getDouble "max-pool-size-factor"),
case size if size > 0 config getInt "max-pool-size-min"), config getInt "max-pool-size-max"))
Some(config getString "task-queue-type") map { .configure(
case "array" ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness? conf_?(Some(config getInt "task-queue-size") flatMap {
case "" | "linked" ThreadPoolConfig.linkedBlockingQueue(size) case size if size > 0
case x throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x) Some(config getString "task-queue-type") map {
} case "array" ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness?
case _ None case "" | "linked" ThreadPoolConfig.linkedBlockingQueue(size)
})(queueFactory _.setQueueFactory(queueFactory))) case x throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x)
}
case _ None
})(queueFactory _.setQueueFactory(queueFactory)))
} }
} }

View file

@ -22,7 +22,7 @@ object LoggingBus {
/** /**
* This trait brings log level handling to the EventStream: it reads the log * This trait brings log level handling to the EventStream: it reads the log
* levels for the initial logging (StandardOutLogger) and the loggers&level * levels for the initial logging (StandardOutLogger) and the loggers & level
* for after-init logging, possibly keeping the StandardOutLogger enabled if * for after-init logging, possibly keeping the StandardOutLogger enabled if
* it is part of the configured loggers. All configured loggers are treated as * it is part of the configured loggers. All configured loggers are treated as
* system services and managed by this trait, i.e. subscribed/unsubscribed in * system services and managed by this trait, i.e. subscribed/unsubscribed in

View file

@ -128,6 +128,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
implicit val davyJones = otherSystem.actorOf(Props(new Actor { implicit val davyJones = otherSystem.actorOf(Props(new Actor {
def receive = { def receive = {
case m: DeadLetter locker :+= m case m: DeadLetter locker :+= m
case "Die!" sender ! "finally gone"; self.stop()
} }
}), "davyJones") }), "davyJones")
@ -148,6 +149,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
system.registerOnTermination(latch.countDown()) system.registerOnTermination(latch.countDown())
system.stop() system.stop()
latch.await(2 seconds) latch.await(2 seconds)
(davyJones ? "Die!").get must be === "finally gone"
// this will typically also contain log messages which were sent after the logger shutdown // this will typically also contain log messages which were sent after the logger shutdown
locker must contain(DeadLetter(42, davyJones, probe.ref)) locker must contain(DeadLetter(42, davyJones, probe.ref))