diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 8c67a70031..80b763e25d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -7,12 +7,11 @@ import language.postfixOps import akka.testkit._ import org.scalatest.junit.JUnitSuiteLike import com.typesafe.config.ConfigFactory -import scala.concurrent.Await +import scala.concurrent.{ ExecutionContext, Await, Future } import scala.concurrent.duration._ import java.util.concurrent.{ RejectedExecutionException, ConcurrentLinkedQueue } import akka.util.Timeout import akka.japi.Util.immutableSeq -import scala.concurrent.Future import akka.pattern.ask import akka.dispatch._ import com.typesafe.config.Config @@ -110,6 +109,19 @@ object ActorSystemSpec { override def dispatcher(): MessageDispatcher = instance } + class TestExecutionContext(testActor: ActorRef, underlying: ExecutionContext) extends ExecutionContext { + + def execute(runnable: Runnable): Unit = { + testActor ! "called" + underlying.execute(runnable) + } + + def reportFailure(t: Throwable): Unit = { + testActor ! "failed" + underlying.reportFailure(t) + } + } + val config = s""" akka.extensions = ["akka.actor.TestExtension"] slow { @@ -305,6 +317,54 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend } } + "work with a passed in ExecutionContext" in { + val ecProbe = TestProbe() + val ec = new ActorSystemSpec.TestExecutionContext(ecProbe.ref, ExecutionContexts.global()) + + val system2 = ActorSystem(name = "default", defaultExecutionContext = Some(ec)) + + try { + val ref = system2.actorOf(Props(new Actor { + def receive = { + case "ping" ⇒ sender ! "pong" + } + })) + + val probe = TestProbe() + + ref.tell("ping", probe.ref) + + ecProbe.expectMsg(1.second, "called") + probe.expectMsg(1.second, "pong") + } finally { + shutdown(system2) + } + } + + "not use passed in ExecutionContext if executor is configured" in { + val ecProbe = TestProbe() + val ec = new ActorSystemSpec.TestExecutionContext(ecProbe.ref, ExecutionContexts.global()) + + val config = ConfigFactory.parseString("akka.actor.default-dispatcher.executor = \"fork-join-executor\"") + val system2 = ActorSystem(name = "default", config = Some(config), defaultExecutionContext = Some(ec)) + + try { + val ref = system2.actorOf(Props(new Actor { + def receive = { + case "ping" ⇒ sender ! "pong" + } + })) + + val probe = TestProbe() + + ref.tell("ping", probe.ref) + + ecProbe.expectNoMsg() + probe.expectMsg(1.second, "pong") + } finally { + shutdown(system2) + } + } } } diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 6d0225e9af..c86e315bc5 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -72,13 +72,19 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin { c.getString("type") should equal("Dispatcher") - c.getString("executor") should equal("fork-join-executor") + c.getString("executor") should equal("default-executor") c.getDuration("shutdown-timeout", TimeUnit.MILLISECONDS) should equal(1 * 1000) c.getInt("throughput") should equal(5) c.getDuration("throughput-deadline-time", TimeUnit.MILLISECONDS) should equal(0) c.getBoolean("attempt-teamwork") should equal(true) } + //Default executor config + { + val pool = c.getConfig("default-executor") + pool.getString("fallback") should equal("fork-join-executor") + } + //Fork join executor config { diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 2bbe9cc242..3201e7c06f 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -238,10 +238,23 @@ akka { # Which kind of ExecutorService to use for this dispatcher # Valid options: + # - "default-executor" requires a "default-executor" section # - "fork-join-executor" requires a "fork-join-executor" section # - "thread-pool-executor" requires a "thread-pool-executor" section # - A FQCN of a class extending ExecutorServiceConfigurator - executor = "fork-join-executor" + executor = "default-executor" + + # This will be used if you have set "executor = "default-executor"". + # If an ActorSystem is created with a given ExecutionContext, this + # ExecutionContext will be used as the default executor for all + # dispatchers in the ActorSystem configured with + # executor = "default-executor". Note that "default-executor" + # is the default value for executor, and therefore used if not + # specified otherwise. If no ExecutionContext is given, + # the executor configured in "fallback" will be used. + default-executor { + fallback = "fork-join-executor" + } # This will be used if you have set "executor = "fork-join-executor"" fork-join-executor { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 55077d2ef8..3840415309 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -56,7 +56,7 @@ object ActorSystem { def create(name: String): ActorSystem = apply(name) /** - * Creates a new ActorSystem with the name "default", and the specified Config, then + * Creates a new ActorSystem with the specified name, and the specified Config, then * obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader, * then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader * associated with the ActorSystem class. @@ -66,12 +66,29 @@ object ActorSystem { def create(name: String, config: Config): ActorSystem = apply(name, config) /** - * Creates a new ActorSystem with the name "default", the specified Config, and specified ClassLoader + * Creates a new ActorSystem with the specified name, the specified Config, and specified ClassLoader * * @see The Typesafe Config Library API Documentation */ def create(name: String, config: Config, classLoader: ClassLoader): ActorSystem = apply(name, config, classLoader) + /** + * Creates a new ActorSystem with the specified name, the specified Config, the specified ClassLoader, + * and the specified ExecutionContext. The ExecutionContext will be used as the default executor inside this ActorSystem. + * If [[null]] is passed in for the Config, ClassLoader and/or ExecutionContext parameters, the respective default value + * will be used. If no Config is given, the default reference config will be obtained from the ClassLoader. + * If no ClassLoader is given, it obtains the current ClassLoader by first inspecting the current + * threads' getContextClassLoader, then tries to walk the stack to find the callers class loader, then + * falls back to the ClassLoader associated with the ActorSystem class. If no ExecutionContext is given, the + * system will fallback to the executor configured under "akka.actor.default-dispatcher.default-executor.fallback". + * Note that the given ExecutionContext will be used by all dispatchers that have been configured with + * executor = "default-executor", including those that have not defined the executor setting and thereby fallback + * to the default of "default-dispatcher.executor". + * + * @see The Typesafe Config Library API Documentation + */ + def create(name: String, config: Config, classLoader: ClassLoader, defaultExecutionContext: ExecutionContext): ActorSystem = apply(name, Option(config), Option(classLoader), Option(defaultExecutionContext)) + /** * Creates a new ActorSystem with the name "default", * obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader, @@ -88,27 +105,41 @@ object ActorSystem { * associated with the ActorSystem class. * Then it loads the default reference configuration using the ClassLoader. */ - def apply(name: String): ActorSystem = { - val classLoader = findClassLoader() - apply(name, ConfigFactory.load(classLoader), classLoader) - } + def apply(name: String): ActorSystem = apply(name, None, None, None) /** - * Creates a new ActorSystem with the name "default", and the specified Config, then + * Creates a new ActorSystem with the specified name, and the specified Config, then * obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader, * then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader * associated with the ActorSystem class. * * @see The Typesafe Config Library API Documentation */ - def apply(name: String, config: Config): ActorSystem = apply(name, config, findClassLoader()) + def apply(name: String, config: Config): ActorSystem = apply(name, Option(config), None, None) /** - * Creates a new ActorSystem with the name "default", the specified Config, and specified ClassLoader + * Creates a new ActorSystem with the specified name, the specified Config, and specified ClassLoader * * @see The Typesafe Config Library API Documentation */ - def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem = new ActorSystemImpl(name, config, classLoader).start() + def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem = apply(name, Option(config), Option(classLoader), None) + + /** + * Creates a new ActorSystem with the specified name, + * the specified ClassLoader if given, otherwise obtains the current ClassLoader by first inspecting the current + * threads' getContextClassLoader, then tries to walk the stack to find the callers class loader, then + * falls back to the ClassLoader associated with the ActorSystem class. + * If an ExecutionContext is given, it will be used as the default executor inside this ActorSystem. + * If no ExecutionContext is given, the system will fallback to the executor configured under "akka.actor.default-dispatcher.default-executor.fallback". + * The system will use the passed in config, or falls back to the deafult reference configuration using the ClassLoader. + * + * @see The Typesafe Config Library API Documentation + */ + def apply(name: String, config: Option[Config] = None, classLoader: Option[ClassLoader] = None, defaultExecutionContext: Option[ExecutionContext] = None): ActorSystem = { + val cl = classLoader.getOrElse(findClassLoader()) + val appConfig = config.getOrElse(ConfigFactory.load(cl)) + new ActorSystemImpl(name, appConfig, cl, defaultExecutionContext).start() + } /** * Settings are the overall ActorSystem Settings which also provides a convenient access to the Config object. @@ -454,7 +485,7 @@ abstract class ExtendedActorSystem extends ActorSystem { private[akka] def printTree: String } -private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, classLoader: ClassLoader) extends ExtendedActorSystem { +private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, classLoader: ClassLoader, defaultExecutionContext: Option[ExecutionContext]) extends ExtendedActorSystem { if (!name.matches("""^[a-zA-Z0-9][a-zA-Z0-9-]*$""")) throw new IllegalArgumentException( @@ -552,7 +583,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess, deadLetters) val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( - threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes)) + threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext)) val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 9ee9cfca8f..182f577e5e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -5,11 +5,11 @@ package akka.dispatch import java.util.concurrent._ -import akka.event.Logging.{ Error, LogEventException } +import akka.event.Logging.{ Debug, Error, LogEventException } import akka.actor._ import akka.dispatch.sysmsg._ -import akka.event.EventStream -import com.typesafe.config.Config +import akka.event.{ BusLogging, EventStream } +import com.typesafe.config.{ ConfigFactory, Config } import akka.util.{ Unsafe, Index } import scala.annotation.tailrec import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool } @@ -19,6 +19,7 @@ import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal import scala.util.Try +import java.{ util ⇒ ju } final case class Envelope private (val message: Any, val sender: ActorRef) @@ -318,7 +319,7 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: def dispatcher(): MessageDispatcher def configureExecutor(): ExecutorServiceConfigurator = { - config.getString("executor") match { + def configurator(executor: String): ExecutorServiceConfigurator = executor match { case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) case fqcn ⇒ @@ -332,6 +333,11 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception) }).get } + + config.getString("executor") match { + case "default-executor" ⇒ new DefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback"))) + case other ⇒ configurator(other) + } } } @@ -423,3 +429,26 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer config.getInt("parallelism-max"))) } } + +class DefaultExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites, fallback: ExecutorServiceConfigurator) extends ExecutorServiceConfigurator(config, prerequisites) { + val provider: ExecutorServiceFactoryProvider = + prerequisites.defaultExecutionContext match { + case Some(ec) ⇒ + prerequisites.eventStream.publish(Debug("DefaultExecutorServiceConfigurator", this.getClass, s"Using passed in ExecutionContext as default executor for this ActorSystem. If you want to use a different executor, please specify one in akka.actor.default-dispatcher.default-executor.")) + + new AbstractExecutorService with ExecutorServiceFactory with ExecutorServiceFactoryProvider { + def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = this + def createExecutorService: ExecutorService = this + def shutdown(): Unit = () + def isTerminated: Boolean = false + def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = false + def shutdownNow(): ju.List[Runnable] = ju.Collections.emptyList() + def execute(command: Runnable): Unit = ec.execute(command) + def isShutdown: Boolean = false + } + case None ⇒ fallback + } + + def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = + provider.createExecutorServiceFactory(id, threadFactory) +} diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 3d98a29d11..a8f69a32a6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -13,6 +13,7 @@ import scala.concurrent.duration.Duration import akka.ConfigurationException import akka.actor.Deploy import akka.util.Helpers.ConfigOps +import scala.concurrent.ExecutionContext /** * DispatcherPrerequisites represents useful contextual pieces when constructing a MessageDispatcher @@ -24,6 +25,7 @@ trait DispatcherPrerequisites { def dynamicAccess: DynamicAccess def settings: ActorSystem.Settings def mailboxes: Mailboxes + def defaultExecutionContext: Option[ExecutionContext] } /** @@ -35,7 +37,8 @@ private[akka] case class DefaultDispatcherPrerequisites( val scheduler: Scheduler, val dynamicAccess: DynamicAccess, val settings: ActorSystem.Settings, - val mailboxes: Mailboxes) extends DispatcherPrerequisites + val mailboxes: Mailboxes, + val defaultExecutionContext: Option[ExecutionContext]) extends DispatcherPrerequisites object Dispatchers { /** diff --git a/akka-docs/rst/java/dispatchers.rst b/akka-docs/rst/java/dispatchers.rst index e7b99b22be..6e16647824 100644 --- a/akka-docs/rst/java/dispatchers.rst +++ b/akka-docs/rst/java/dispatchers.rst @@ -11,7 +11,11 @@ Default dispatcher ------------------ Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``. -The default dispatcher can be configured, and is by default a ``Dispatcher`` with a "fork-join-executor", which gives excellent performance in most cases. +The default dispatcher can be configured, and is by default a ``Dispatcher`` with the specified ``default-executor``. +If an ActorSystem is created with an ExecutionContext passed in, this ExecutionContext will be used as the default executor for all +dispatchers in this ActorSystem. If no ExecutionContext is given, it will fallback to the executor specified in +``akka.actor.default-dispatcher.default-executor.fallback``. By default this is a "fork-join-executor", which +gives excellent performance in most cases. .. _dispatcher-lookup-java: diff --git a/akka-docs/rst/scala/dispatchers.rst b/akka-docs/rst/scala/dispatchers.rst index 89c403af03..14b0eec1af 100644 --- a/akka-docs/rst/scala/dispatchers.rst +++ b/akka-docs/rst/scala/dispatchers.rst @@ -11,7 +11,11 @@ Default dispatcher ------------------ Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``. -The default dispatcher can be configured, and is by default a ``Dispatcher`` with a "fork-join-executor", which gives excellent performance in most cases. +The default dispatcher can be configured, and is by default a ``Dispatcher`` with the specified ``default-executor``. +If an ActorSystem is created with an ExecutionContext passed in, this ExecutionContext will be used as the default executor for all +dispatchers in this ActorSystem. If no ExecutionContext is given, it will fallback to the executor specified in +``akka.actor.default-dispatcher.default-executor.fallback``. By default this is a "fork-join-executor", which +gives excellent performance in most cases. .. _dispatcher-lookup-scala: