+act #2075 Added possibility to pass an ExecutionContext to an ActorSystem

This commit is contained in:
Dario Rexin 2014-01-16 23:24:06 +01:00
parent 32b76adb9a
commit 4b2d98c5cb
8 changed files with 173 additions and 23 deletions

View file

@ -7,12 +7,11 @@ import language.postfixOps
import akka.testkit._ import akka.testkit._
import org.scalatest.junit.JUnitSuiteLike import org.scalatest.junit.JUnitSuiteLike
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import scala.concurrent.Await import scala.concurrent.{ ExecutionContext, Await, Future }
import scala.concurrent.duration._ import scala.concurrent.duration._
import java.util.concurrent.{ RejectedExecutionException, ConcurrentLinkedQueue } import java.util.concurrent.{ RejectedExecutionException, ConcurrentLinkedQueue }
import akka.util.Timeout import akka.util.Timeout
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import scala.concurrent.Future
import akka.pattern.ask import akka.pattern.ask
import akka.dispatch._ import akka.dispatch._
import com.typesafe.config.Config import com.typesafe.config.Config
@ -110,6 +109,19 @@ object ActorSystemSpec {
override def dispatcher(): MessageDispatcher = instance override def dispatcher(): MessageDispatcher = instance
} }
class TestExecutionContext(testActor: ActorRef, underlying: ExecutionContext) extends ExecutionContext {
def execute(runnable: Runnable): Unit = {
testActor ! "called"
underlying.execute(runnable)
}
def reportFailure(t: Throwable): Unit = {
testActor ! "failed"
underlying.reportFailure(t)
}
}
val config = s""" val config = s"""
akka.extensions = ["akka.actor.TestExtension"] akka.extensions = ["akka.actor.TestExtension"]
slow { slow {
@ -305,6 +317,54 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend
} }
} }
"work with a passed in ExecutionContext" in {
val ecProbe = TestProbe()
val ec = new ActorSystemSpec.TestExecutionContext(ecProbe.ref, ExecutionContexts.global())
val system2 = ActorSystem(name = "default", defaultExecutionContext = Some(ec))
try {
val ref = system2.actorOf(Props(new Actor {
def receive = {
case "ping" sender ! "pong"
}
}))
val probe = TestProbe()
ref.tell("ping", probe.ref)
ecProbe.expectMsg(1.second, "called")
probe.expectMsg(1.second, "pong")
} finally {
shutdown(system2)
}
}
"not use passed in ExecutionContext if executor is configured" in {
val ecProbe = TestProbe()
val ec = new ActorSystemSpec.TestExecutionContext(ecProbe.ref, ExecutionContexts.global())
val config = ConfigFactory.parseString("akka.actor.default-dispatcher.executor = \"fork-join-executor\"")
val system2 = ActorSystem(name = "default", config = Some(config), defaultExecutionContext = Some(ec))
try {
val ref = system2.actorOf(Props(new Actor {
def receive = {
case "ping" sender ! "pong"
}
}))
val probe = TestProbe()
ref.tell("ping", probe.ref)
ecProbe.expectNoMsg()
probe.expectMsg(1.second, "pong")
} finally {
shutdown(system2)
}
}
} }
} }

View file

@ -72,13 +72,19 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
{ {
c.getString("type") should equal("Dispatcher") c.getString("type") should equal("Dispatcher")
c.getString("executor") should equal("fork-join-executor") c.getString("executor") should equal("default-executor")
c.getDuration("shutdown-timeout", TimeUnit.MILLISECONDS) should equal(1 * 1000) c.getDuration("shutdown-timeout", TimeUnit.MILLISECONDS) should equal(1 * 1000)
c.getInt("throughput") should equal(5) c.getInt("throughput") should equal(5)
c.getDuration("throughput-deadline-time", TimeUnit.MILLISECONDS) should equal(0) c.getDuration("throughput-deadline-time", TimeUnit.MILLISECONDS) should equal(0)
c.getBoolean("attempt-teamwork") should equal(true) c.getBoolean("attempt-teamwork") should equal(true)
} }
//Default executor config
{
val pool = c.getConfig("default-executor")
pool.getString("fallback") should equal("fork-join-executor")
}
//Fork join executor config //Fork join executor config
{ {

View file

@ -238,10 +238,23 @@ akka {
# Which kind of ExecutorService to use for this dispatcher # Which kind of ExecutorService to use for this dispatcher
# Valid options: # Valid options:
# - "default-executor" requires a "default-executor" section
# - "fork-join-executor" requires a "fork-join-executor" section # - "fork-join-executor" requires a "fork-join-executor" section
# - "thread-pool-executor" requires a "thread-pool-executor" section # - "thread-pool-executor" requires a "thread-pool-executor" section
# - A FQCN of a class extending ExecutorServiceConfigurator # - A FQCN of a class extending ExecutorServiceConfigurator
executor = "fork-join-executor" executor = "default-executor"
# This will be used if you have set "executor = "default-executor"".
# If an ActorSystem is created with a given ExecutionContext, this
# ExecutionContext will be used as the default executor for all
# dispatchers in the ActorSystem configured with
# executor = "default-executor". Note that "default-executor"
# is the default value for executor, and therefore used if not
# specified otherwise. If no ExecutionContext is given,
# the executor configured in "fallback" will be used.
default-executor {
fallback = "fork-join-executor"
}
# This will be used if you have set "executor = "fork-join-executor"" # This will be used if you have set "executor = "fork-join-executor""
fork-join-executor { fork-join-executor {

View file

@ -56,7 +56,7 @@ object ActorSystem {
def create(name: String): ActorSystem = apply(name) def create(name: String): ActorSystem = apply(name)
/** /**
* Creates a new ActorSystem with the name "default", and the specified Config, then * Creates a new ActorSystem with the specified name, and the specified Config, then
* obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader, * obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader,
* then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader * then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader
* associated with the ActorSystem class. * associated with the ActorSystem class.
@ -66,12 +66,29 @@ object ActorSystem {
def create(name: String, config: Config): ActorSystem = apply(name, config) def create(name: String, config: Config): ActorSystem = apply(name, config)
/** /**
* Creates a new ActorSystem with the name "default", the specified Config, and specified ClassLoader * Creates a new ActorSystem with the specified name, the specified Config, and specified ClassLoader
* *
* @see <a href="http://typesafehub.github.io/config/v1.2.0/" target="_blank">The Typesafe Config Library API Documentation</a> * @see <a href="http://typesafehub.github.io/config/v1.2.0/" target="_blank">The Typesafe Config Library API Documentation</a>
*/ */
def create(name: String, config: Config, classLoader: ClassLoader): ActorSystem = apply(name, config, classLoader) def create(name: String, config: Config, classLoader: ClassLoader): ActorSystem = apply(name, config, classLoader)
/**
* Creates a new ActorSystem with the specified name, the specified Config, the specified ClassLoader,
* and the specified ExecutionContext. The ExecutionContext will be used as the default executor inside this ActorSystem.
* If [[null]] is passed in for the Config, ClassLoader and/or ExecutionContext parameters, the respective default value
* will be used. If no Config is given, the default reference config will be obtained from the ClassLoader.
* If no ClassLoader is given, it obtains the current ClassLoader by first inspecting the current
* threads' getContextClassLoader, then tries to walk the stack to find the callers class loader, then
* falls back to the ClassLoader associated with the ActorSystem class. If no ExecutionContext is given, the
* system will fallback to the executor configured under "akka.actor.default-dispatcher.default-executor.fallback".
* Note that the given ExecutionContext will be used by all dispatchers that have been configured with
* executor = "default-executor", including those that have not defined the executor setting and thereby fallback
* to the default of "default-dispatcher.executor".
*
* @see <a href="http://typesafehub.github.io/config/v1.2.0/" target="_blank">The Typesafe Config Library API Documentation</a>
*/
def create(name: String, config: Config, classLoader: ClassLoader, defaultExecutionContext: ExecutionContext): ActorSystem = apply(name, Option(config), Option(classLoader), Option(defaultExecutionContext))
/** /**
* Creates a new ActorSystem with the name "default", * Creates a new ActorSystem with the name "default",
* obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader, * obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader,
@ -88,27 +105,41 @@ object ActorSystem {
* associated with the ActorSystem class. * associated with the ActorSystem class.
* Then it loads the default reference configuration using the ClassLoader. * Then it loads the default reference configuration using the ClassLoader.
*/ */
def apply(name: String): ActorSystem = { def apply(name: String): ActorSystem = apply(name, None, None, None)
val classLoader = findClassLoader()
apply(name, ConfigFactory.load(classLoader), classLoader)
}
/** /**
* Creates a new ActorSystem with the name "default", and the specified Config, then * Creates a new ActorSystem with the specified name, and the specified Config, then
* obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader, * obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader,
* then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader * then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader
* associated with the ActorSystem class. * associated with the ActorSystem class.
* *
* @see <a href="http://typesafehub.github.io/config/v1.2.0/" target="_blank">The Typesafe Config Library API Documentation</a> * @see <a href="http://typesafehub.github.io/config/v1.2.0/" target="_blank">The Typesafe Config Library API Documentation</a>
*/ */
def apply(name: String, config: Config): ActorSystem = apply(name, config, findClassLoader()) def apply(name: String, config: Config): ActorSystem = apply(name, Option(config), None, None)
/** /**
* Creates a new ActorSystem with the name "default", the specified Config, and specified ClassLoader * Creates a new ActorSystem with the specified name, the specified Config, and specified ClassLoader
* *
* @see <a href="http://typesafehub.github.io/config/v1.2.0/" target="_blank">The Typesafe Config Library API Documentation</a> * @see <a href="http://typesafehub.github.io/config/v1.2.0/" target="_blank">The Typesafe Config Library API Documentation</a>
*/ */
def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem = new ActorSystemImpl(name, config, classLoader).start() def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem = apply(name, Option(config), Option(classLoader), None)
/**
* Creates a new ActorSystem with the specified name,
* the specified ClassLoader if given, otherwise obtains the current ClassLoader by first inspecting the current
* threads' getContextClassLoader, then tries to walk the stack to find the callers class loader, then
* falls back to the ClassLoader associated with the ActorSystem class.
* If an ExecutionContext is given, it will be used as the default executor inside this ActorSystem.
* If no ExecutionContext is given, the system will fallback to the executor configured under "akka.actor.default-dispatcher.default-executor.fallback".
* The system will use the passed in config, or falls back to the deafult reference configuration using the ClassLoader.
*
* @see <a href="http://typesafehub.github.io/config/v1.2.0/" target="_blank">The Typesafe Config Library API Documentation</a>
*/
def apply(name: String, config: Option[Config] = None, classLoader: Option[ClassLoader] = None, defaultExecutionContext: Option[ExecutionContext] = None): ActorSystem = {
val cl = classLoader.getOrElse(findClassLoader())
val appConfig = config.getOrElse(ConfigFactory.load(cl))
new ActorSystemImpl(name, appConfig, cl, defaultExecutionContext).start()
}
/** /**
* Settings are the overall ActorSystem Settings which also provides a convenient access to the Config object. * Settings are the overall ActorSystem Settings which also provides a convenient access to the Config object.
@ -454,7 +485,7 @@ abstract class ExtendedActorSystem extends ActorSystem {
private[akka] def printTree: String private[akka] def printTree: String
} }
private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, classLoader: ClassLoader) extends ExtendedActorSystem { private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, classLoader: ClassLoader, defaultExecutionContext: Option[ExecutionContext]) extends ExtendedActorSystem {
if (!name.matches("""^[a-zA-Z0-9][a-zA-Z0-9-]*$""")) if (!name.matches("""^[a-zA-Z0-9][a-zA-Z0-9-]*$"""))
throw new IllegalArgumentException( throw new IllegalArgumentException(
@ -552,7 +583,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess, deadLetters) val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess, deadLetters)
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes)) threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext))
val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher

View file

@ -5,11 +5,11 @@
package akka.dispatch package akka.dispatch
import java.util.concurrent._ import java.util.concurrent._
import akka.event.Logging.{ Error, LogEventException } import akka.event.Logging.{ Debug, Error, LogEventException }
import akka.actor._ import akka.actor._
import akka.dispatch.sysmsg._ import akka.dispatch.sysmsg._
import akka.event.EventStream import akka.event.{ BusLogging, EventStream }
import com.typesafe.config.Config import com.typesafe.config.{ ConfigFactory, Config }
import akka.util.{ Unsafe, Index } import akka.util.{ Unsafe, Index }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool } import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool }
@ -19,6 +19,7 @@ import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal import scala.util.control.NonFatal
import scala.util.Try import scala.util.Try
import java.{ util ju }
final case class Envelope private (val message: Any, val sender: ActorRef) final case class Envelope private (val message: Any, val sender: ActorRef)
@ -318,7 +319,7 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites:
def dispatcher(): MessageDispatcher def dispatcher(): MessageDispatcher
def configureExecutor(): ExecutorServiceConfigurator = { def configureExecutor(): ExecutorServiceConfigurator = {
config.getString("executor") match { def configurator(executor: String): ExecutorServiceConfigurator = executor match {
case null | "" | "fork-join-executor" new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) case null | "" | "fork-join-executor" new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
case "thread-pool-executor" new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) case "thread-pool-executor" new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
case fqcn case fqcn
@ -332,6 +333,11 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites:
.format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception) .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)
}).get }).get
} }
config.getString("executor") match {
case "default-executor" new DefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback")))
case other configurator(other)
}
} }
} }
@ -423,3 +429,26 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
config.getInt("parallelism-max"))) config.getInt("parallelism-max")))
} }
} }
class DefaultExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites, fallback: ExecutorServiceConfigurator) extends ExecutorServiceConfigurator(config, prerequisites) {
val provider: ExecutorServiceFactoryProvider =
prerequisites.defaultExecutionContext match {
case Some(ec)
prerequisites.eventStream.publish(Debug("DefaultExecutorServiceConfigurator", this.getClass, s"Using passed in ExecutionContext as default executor for this ActorSystem. If you want to use a different executor, please specify one in akka.actor.default-dispatcher.default-executor."))
new AbstractExecutorService with ExecutorServiceFactory with ExecutorServiceFactoryProvider {
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = this
def createExecutorService: ExecutorService = this
def shutdown(): Unit = ()
def isTerminated: Boolean = false
def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = false
def shutdownNow(): ju.List[Runnable] = ju.Collections.emptyList()
def execute(command: Runnable): Unit = ec.execute(command)
def isShutdown: Boolean = false
}
case None fallback
}
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
provider.createExecutorServiceFactory(id, threadFactory)
}

View file

@ -13,6 +13,7 @@ import scala.concurrent.duration.Duration
import akka.ConfigurationException import akka.ConfigurationException
import akka.actor.Deploy import akka.actor.Deploy
import akka.util.Helpers.ConfigOps import akka.util.Helpers.ConfigOps
import scala.concurrent.ExecutionContext
/** /**
* DispatcherPrerequisites represents useful contextual pieces when constructing a MessageDispatcher * DispatcherPrerequisites represents useful contextual pieces when constructing a MessageDispatcher
@ -24,6 +25,7 @@ trait DispatcherPrerequisites {
def dynamicAccess: DynamicAccess def dynamicAccess: DynamicAccess
def settings: ActorSystem.Settings def settings: ActorSystem.Settings
def mailboxes: Mailboxes def mailboxes: Mailboxes
def defaultExecutionContext: Option[ExecutionContext]
} }
/** /**
@ -35,7 +37,8 @@ private[akka] case class DefaultDispatcherPrerequisites(
val scheduler: Scheduler, val scheduler: Scheduler,
val dynamicAccess: DynamicAccess, val dynamicAccess: DynamicAccess,
val settings: ActorSystem.Settings, val settings: ActorSystem.Settings,
val mailboxes: Mailboxes) extends DispatcherPrerequisites val mailboxes: Mailboxes,
val defaultExecutionContext: Option[ExecutionContext]) extends DispatcherPrerequisites
object Dispatchers { object Dispatchers {
/** /**

View file

@ -11,7 +11,11 @@ Default dispatcher
------------------ ------------------
Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``. Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``.
The default dispatcher can be configured, and is by default a ``Dispatcher`` with a "fork-join-executor", which gives excellent performance in most cases. The default dispatcher can be configured, and is by default a ``Dispatcher`` with the specified ``default-executor``.
If an ActorSystem is created with an ExecutionContext passed in, this ExecutionContext will be used as the default executor for all
dispatchers in this ActorSystem. If no ExecutionContext is given, it will fallback to the executor specified in
``akka.actor.default-dispatcher.default-executor.fallback``. By default this is a "fork-join-executor", which
gives excellent performance in most cases.
.. _dispatcher-lookup-java: .. _dispatcher-lookup-java:

View file

@ -11,7 +11,11 @@ Default dispatcher
------------------ ------------------
Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``. Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``.
The default dispatcher can be configured, and is by default a ``Dispatcher`` with a "fork-join-executor", which gives excellent performance in most cases. The default dispatcher can be configured, and is by default a ``Dispatcher`` with the specified ``default-executor``.
If an ActorSystem is created with an ExecutionContext passed in, this ExecutionContext will be used as the default executor for all
dispatchers in this ActorSystem. If no ExecutionContext is given, it will fallback to the executor specified in
``akka.actor.default-dispatcher.default-executor.fallback``. By default this is a "fork-join-executor", which
gives excellent performance in most cases.
.. _dispatcher-lookup-scala: .. _dispatcher-lookup-scala: