Internal dispatcher to protect against starvation (#26816)

* Allow for dispatcher aliases and define a internal dispatcher
* Test checking dispatcher name
* MiMa for Dispatchers
* Migration guide entry
* No need to have custom dispatcher lookup logic in streams anymore
* Default dispatcher size and migration note about that
* Test checking exact config values...
* Typed receptionist on internal dispatcher
* All internal usages of system.dispatcher gone through
This commit is contained in:
Johan Andrén 2019-05-02 22:35:25 +02:00 committed by Patrik Nordwall
parent e34a711adf
commit 81b1e2ef9b
57 changed files with 524 additions and 329 deletions

View file

@ -0,0 +1,167 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor
import akka.ConfigurationException
import akka.actor.setup.ActorSystemSetup
import akka.dispatch.{ Dispatchers, ExecutionContexts }
import akka.testkit.{ AkkaSpec, ImplicitSender, TestActors, TestProbe }
import com.typesafe.config.ConfigFactory
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
object ActorSystemDispatchersSpec {
class SnitchingExecutionContext(testActor: ActorRef, underlying: ExecutionContext) extends ExecutionContext {
def execute(runnable: Runnable): Unit = {
testActor ! "called"
underlying.execute(runnable)
}
def reportFailure(t: Throwable): Unit = {
testActor ! "failed"
underlying.reportFailure(t)
}
}
}
class ActorSystemDispatchersSpec extends AkkaSpec(ConfigFactory.parseString("""
dispatcher-loop-1 = "dispatcher-loop-2"
dispatcher-loop-2 = "dispatcher-loop-1"
""")) with ImplicitSender {
import ActorSystemDispatchersSpec._
"The ActorSystem" must {
"work with a passed in ExecutionContext" in {
val ecProbe = TestProbe()
val ec = new SnitchingExecutionContext(ecProbe.ref, ExecutionContexts.global())
val system2 = ActorSystem(name = "ActorSystemDispatchersSpec-passed-in-ec", defaultExecutionContext = Some(ec))
try {
val ref = system2.actorOf(Props(new Actor {
def receive = {
case "ping" => sender() ! "pong"
}
}))
val probe = TestProbe()
ref.tell("ping", probe.ref)
ecProbe.expectMsg(1.second, "called")
probe.expectMsg(1.second, "pong")
} finally {
shutdown(system2)
}
}
"not use passed in ExecutionContext if executor is configured" in {
val ecProbe = TestProbe()
val ec = new SnitchingExecutionContext(ecProbe.ref, ExecutionContexts.global())
val config = ConfigFactory.parseString("akka.actor.default-dispatcher.executor = \"fork-join-executor\"")
val system2 = ActorSystem(
name = "ActorSystemDispatchersSpec-ec-configured",
config = Some(config),
defaultExecutionContext = Some(ec))
try {
val ref = system2.actorOf(TestActors.echoActorProps)
val probe = TestProbe()
ref.tell("ping", probe.ref)
ecProbe.expectNoMessage(200.millis)
probe.expectMsg(1.second, "ping")
} finally {
shutdown(system2)
}
}
def userGuardianDispatcher(system: ActorSystem): String = {
val impl = system.asInstanceOf[ActorSystemImpl]
impl.guardian.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].dispatcher.id
}
"provide a single place to override the internal dispatcher" in {
val sys = ActorSystem(
"ActorSystemDispatchersSpec-override-internal-disp",
ConfigFactory.parseString("""
akka.actor.internal-dispatcher = akka.actor.default-dispatcher
"""))
try {
// that the user guardian runs on the overriden dispatcher instead of internal
// isn't really a guarantee any internal actor has been made running on the right one
// but it's better than no test coverage at all
userGuardianDispatcher(sys) should ===("akka.actor.default-dispatcher")
} finally {
shutdown(sys)
}
}
"provide internal execution context instance through BootstrapSetup" in {
val ecProbe = TestProbe()
val ec = new SnitchingExecutionContext(ecProbe.ref, ExecutionContexts.global())
// using the default for internal dispatcher and passing a pre-existing execution context
val system2 =
ActorSystem(
name = "ActorSystemDispatchersSpec-passed-in-ec-for-internal",
config = Some(ConfigFactory.parseString("""
akka.actor.internal-dispatcher = akka.actor.default-dispatcher
""")),
defaultExecutionContext = Some(ec))
try {
val ref = system2.actorOf(Props(new Actor {
def receive = {
case "ping" => sender() ! "pong"
}
}).withDispatcher(Dispatchers.InternalDispatcherId))
val probe = TestProbe()
ref.tell("ping", probe.ref)
ecProbe.expectMsg(1.second, "called")
probe.expectMsg(1.second, "pong")
} finally {
shutdown(system2)
}
}
"use an internal dispatcher for the guardian by default" in {
userGuardianDispatcher(system) should ===("akka.actor.internal-dispatcher")
}
"use the default dispatcher by a user provided user guardian" in {
val sys = new ActorSystemImpl(
"ActorSystemDispatchersSpec-custom-user-guardian",
ConfigFactory.defaultReference(),
getClass.getClassLoader,
None,
Some(Props.empty),
ActorSystemSetup.empty)
sys.start()
try {
userGuardianDispatcher(sys) should ===("akka.actor.default-dispatcher")
} finally shutdown(sys)
}
"provide a good error on an dispatcher alias loop in the config" in {
intercept[ConfigurationException] {
system.dispatchers.lookup("dispatcher-loop-1")
}
}
}
}

View file

@ -11,15 +11,14 @@ import akka.actor.setup.ActorSystemSetup
import akka.dispatch._ import akka.dispatch._
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import akka.pattern.ask import akka.pattern.ask
import akka.testkit._ import akka.testkit.{ TestKit, _ }
import akka.testkit.TestKit
import akka.util.Helpers.ConfigOps import akka.util.Helpers.ConfigOps
import akka.util.{ Switch, Timeout } import akka.util.{ Switch, Timeout }
import com.github.ghik.silencer.silent import com.github.ghik.silencer.silent
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future } import scala.concurrent.{ Await, Future }
import scala.language.postfixOps import scala.language.postfixOps
import scala.util.Properties import scala.util.Properties
@ -107,19 +106,6 @@ object ActorSystemSpec {
override def dispatcher(): MessageDispatcher = instance override def dispatcher(): MessageDispatcher = instance
} }
class TestExecutionContext(testActor: ActorRef, underlying: ExecutionContext) extends ExecutionContext {
def execute(runnable: Runnable): Unit = {
testActor ! "called"
underlying.execute(runnable)
}
def reportFailure(t: Throwable): Unit = {
testActor ! "failed"
underlying.reportFailure(t)
}
}
val config = s""" val config = s"""
slow { slow {
type="${classOf[SlowDispatcher].getName}" type="${classOf[SlowDispatcher].getName}"
@ -372,50 +358,6 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend
} }
} }
"work with a passed in ExecutionContext" in {
val ecProbe = TestProbe()
val ec = new ActorSystemSpec.TestExecutionContext(ecProbe.ref, ExecutionContexts.global())
val system2 = ActorSystem(name = "default", defaultExecutionContext = Some(ec))
try {
val ref = system2.actorOf(Props(new Actor {
def receive = {
case "ping" => sender() ! "pong"
}
}))
val probe = TestProbe()
ref.tell("ping", probe.ref)
ecProbe.expectMsg(1.second, "called")
probe.expectMsg(1.second, "pong")
} finally {
shutdown(system2)
}
}
"not use passed in ExecutionContext if executor is configured" in {
val ecProbe = TestProbe()
val ec = new ActorSystemSpec.TestExecutionContext(ecProbe.ref, ExecutionContexts.global())
val config = ConfigFactory.parseString("akka.actor.default-dispatcher.executor = \"fork-join-executor\"")
val system2 = ActorSystem(name = "default", config = Some(config), defaultExecutionContext = Some(ec))
try {
val ref = system2.actorOf(TestActors.echoActorProps)
val probe = TestProbe()
ref.tell("ping", probe.ref)
ecProbe.expectNoMessage(200.millis)
probe.expectMsg(1.second, "ping")
} finally {
shutdown(system2)
}
}
"not allow top-level actor creation with custom guardian" in { "not allow top-level actor creation with custom guardian" in {
val sys = new ActorSystemImpl( val sys = new ActorSystemImpl(
"custom", "custom",

View file

@ -94,7 +94,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
{ {
val pool = c.getConfig("fork-join-executor") val pool = c.getConfig("fork-join-executor")
pool.getInt("parallelism-min") should ===(8) pool.getInt("parallelism-min") should ===(8)
pool.getDouble("parallelism-factor") should ===(3.0) pool.getDouble("parallelism-factor") should ===(1.0)
pool.getInt("parallelism-max") should ===(64) pool.getInt("parallelism-max") should ===(64)
pool.getString("task-peeking-mode") should be("FIFO") pool.getString("task-peeking-mode") should be("FIFO")
} }

View file

@ -23,7 +23,9 @@ class DispatcherShutdownSpec extends WordSpec with Matchers {
.dumpAllThreads(false, false) .dumpAllThreads(false, false)
.toList .toList
.map(_.getThreadName) .map(_.getThreadName)
.filter(_.startsWith("DispatcherShutdownSpec-akka.actor.default")) .filter(name =>
name.startsWith("DispatcherShutdownSpec-akka.actor.default") || name.startsWith(
"DispatcherShutdownSpec-akka.actor.internal")) // nothing is run on default without any user actors started
.size .size
val system = ActorSystem("DispatcherShutdownSpec") val system = ActorSystem("DispatcherShutdownSpec")

View file

@ -4,6 +4,8 @@
package akka.actor.typed package akka.actor.typed
import akka.annotation.InternalApi
import scala.concurrent.ExecutionContextExecutor import scala.concurrent.ExecutionContextExecutor
object Dispatchers { object Dispatchers {
@ -13,6 +15,11 @@ object Dispatchers {
* configuration of the default dispatcher. * configuration of the default dispatcher.
*/ */
final val DefaultDispatcherId = "akka.actor.default-dispatcher" final val DefaultDispatcherId = "akka.actor.default-dispatcher"
/**
* INTERNAL API
*/
@InternalApi final val InternalDispatcherId = "akka.actor.internal-dispatcher"
} }
/** /**

View file

@ -4,14 +4,12 @@
package akka.actor.typed.receptionist package akka.actor.typed.receptionist
import akka.actor.typed.{ ActorRef, ActorSystem, Extension, ExtensionId } import akka.actor.typed.{ ActorRef, ActorSystem, Dispatchers, Extension, ExtensionId, ExtensionSetup, Props }
import akka.actor.typed.internal.receptionist._ import akka.actor.typed.internal.receptionist._
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.reflect.ClassTag import scala.reflect.ClassTag
import akka.actor.typed.ExtensionSetup
import akka.actor.typed.Props
import akka.annotation.InternalApi import akka.annotation.InternalApi
/** /**
@ -51,7 +49,10 @@ abstract class Receptionist extends Extension {
} else LocalReceptionist } else LocalReceptionist
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
system.internalSystemActorOf(provider.behavior, "receptionist", Props.empty) system.internalSystemActorOf(
provider.behavior,
"receptionist",
Props.empty.withDispatcherFromConfig(Dispatchers.InternalDispatcherId))
} }
} }

View file

@ -1,3 +1,5 @@
# excludes for 2.6
ProblemFilters.exclude[MissingClassProblem]("akka.actor.Inbox$") ProblemFilters.exclude[MissingClassProblem]("akka.actor.Inbox$")
ProblemFilters.exclude[MissingClassProblem]("akka.actor.Inbox") ProblemFilters.exclude[MissingClassProblem]("akka.actor.Inbox")
ProblemFilters.exclude[MissingClassProblem]("akka.actor.ActorDSL$") ProblemFilters.exclude[MissingClassProblem]("akka.actor.ActorDSL$")
@ -13,3 +15,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorRefFactory.a
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorSystem.actorFor") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorSystem.actorFor")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ChildActorPath.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ChildActorPath.this")
ProblemFilters.exclude[MissingClassProblem]("akka.actor.dungeon.UndefinedUidActorRef") ProblemFilters.exclude[MissingClassProblem]("akka.actor.dungeon.UndefinedUidActorRef")
# Protect internals against starvation #23576
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.dispatch.Dispatchers.this")

View file

@ -451,7 +451,7 @@ akka {
# The parallelism factor is used to determine thread pool size using the # The parallelism factor is used to determine thread pool size using the
# following formula: ceil(available processors * factor). Resulting size # following formula: ceil(available processors * factor). Resulting size
# is then bounded by the parallelism-min and parallelism-max values. # is then bounded by the parallelism-min and parallelism-max values.
parallelism-factor = 3.0 parallelism-factor = 1.0
# Max number of threads to cap factor-based parallelism number to # Max number of threads to cap factor-based parallelism number to
parallelism-max = 64 parallelism-max = 64
@ -533,6 +533,20 @@ akka {
mailbox-requirement = "" mailbox-requirement = ""
} }
# Default separate internal dispatcher to run Akka internal tasks and actors on
# protecting them against starvation because of accidental blocking in user actors (which run on the
# default dispatcher)
internal-dispatcher {
type = "Dispatcher"
executor = "fork-join-executor"
throughput = 5
fork-join-executor {
parallelism-min = 4
parallelism-factor = 1.0
parallelism-max = 64
}
}
default-blocking-io-dispatcher { default-blocking-io-dispatcher {
type = "Dispatcher" type = "Dispatcher"
executor = "thread-pool-executor" executor = "thread-pool-executor"
@ -855,11 +869,11 @@ akka {
# Fully qualified config path which holds the dispatcher configuration # Fully qualified config path which holds the dispatcher configuration
# for the read/write worker actors # for the read/write worker actors
worker-dispatcher = "akka.actor.default-dispatcher" worker-dispatcher = "akka.actor.internal-dispatcher"
# Fully qualified config path which holds the dispatcher configuration # Fully qualified config path which holds the dispatcher configuration
# for the selector management actors # for the selector management actors
management-dispatcher = "akka.actor.default-dispatcher" management-dispatcher = "akka.actor.internal-dispatcher"
# Fully qualified config path which holds the dispatcher configuration # Fully qualified config path which holds the dispatcher configuration
# on which file IO tasks are scheduled # on which file IO tasks are scheduled
@ -937,11 +951,11 @@ akka {
# Fully qualified config path which holds the dispatcher configuration # Fully qualified config path which holds the dispatcher configuration
# for the read/write worker actors # for the read/write worker actors
worker-dispatcher = "akka.actor.default-dispatcher" worker-dispatcher = "akka.actor.internal-dispatcher"
# Fully qualified config path which holds the dispatcher configuration # Fully qualified config path which holds the dispatcher configuration
# for the selector management actors # for the selector management actors
management-dispatcher = "akka.actor.default-dispatcher" management-dispatcher = "akka.actor.internal-dispatcher"
} }
udp-connected { udp-connected {
@ -993,18 +1007,18 @@ akka {
# Fully qualified config path which holds the dispatcher configuration # Fully qualified config path which holds the dispatcher configuration
# for the read/write worker actors # for the read/write worker actors
worker-dispatcher = "akka.actor.default-dispatcher" worker-dispatcher = "akka.actor.internal-dispatcher"
# Fully qualified config path which holds the dispatcher configuration # Fully qualified config path which holds the dispatcher configuration
# for the selector management actors # for the selector management actors
management-dispatcher = "akka.actor.default-dispatcher" management-dispatcher = "akka.actor.internal-dispatcher"
} }
dns { dns {
# Fully qualified config path which holds the dispatcher configuration # Fully qualified config path which holds the dispatcher configuration
# for the manager and resolver router actors. # for the manager and resolver router actors.
# For actual router configuration see akka.actor.deployment./IO-DNS/* # For actual router configuration see akka.actor.deployment./IO-DNS/*
dispatcher = "akka.actor.default-dispatcher" dispatcher = "akka.actor.internal-dispatcher"
# Name of the subconfig at path akka.io.dns, see inet-address below # Name of the subconfig at path akka.io.dns, see inet-address below
# #

View file

@ -5,21 +5,20 @@
package akka.actor package akka.actor
import akka.dispatch.sysmsg._ import akka.dispatch.sysmsg._
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.dispatch.{ Mailboxes, RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.routing._ import akka.routing._
import akka.event._ import akka.event._
import akka.util.Helpers import akka.util.Helpers
import akka.util.Collections.EmptyImmutableSeq import akka.util.Collections.EmptyImmutableSeq
import scala.util.control.NonFatal import scala.util.control.NonFatal
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{ ExecutionContextExecutor, Future, Promise } import scala.concurrent.{ ExecutionContextExecutor, Future, Promise }
import scala.annotation.implicitNotFound import scala.annotation.implicitNotFound
import akka.ConfigurationException import akka.ConfigurationException
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.dispatch.Mailboxes
import akka.serialization.Serialization import akka.serialization.Serialization
import akka.util.OptionVal import akka.util.OptionVal
@ -484,7 +483,7 @@ private[akka] class LocalActorRefProvider private[akka] (
*/ */
protected def systemGuardianStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy protected def systemGuardianStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
private lazy val defaultDispatcher = system.dispatchers.defaultGlobalDispatcher private def internalDispatcher = system.dispatchers.internalDispatcher
private lazy val defaultMailbox = system.mailboxes.lookup(Mailboxes.DefaultMailboxId) private lazy val defaultMailbox = system.mailboxes.lookup(Mailboxes.DefaultMailboxId)
@ -492,7 +491,7 @@ private[akka] class LocalActorRefProvider private[akka] (
new LocalActorRef( new LocalActorRef(
system, system,
Props(classOf[LocalActorRefProvider.Guardian], rootGuardianStrategy), Props(classOf[LocalActorRefProvider.Guardian], rootGuardianStrategy),
defaultDispatcher, internalDispatcher,
defaultMailbox, defaultMailbox,
theOneWhoWalksTheBubblesOfSpaceTime, theOneWhoWalksTheBubblesOfSpaceTime,
rootPath) { rootPath) {
@ -511,10 +510,16 @@ private[akka] class LocalActorRefProvider private[akka] (
override lazy val guardian: LocalActorRef = { override lazy val guardian: LocalActorRef = {
val cell = rootGuardian.underlying val cell = rootGuardian.underlying
cell.reserveChild("user") cell.reserveChild("user")
// make user provided guardians not run on internal dispatcher
val dispatcher =
system.guardianProps match {
case None => internalDispatcher
case Some(props) => system.dispatchers.lookup(props.dispatcher)
}
val ref = new LocalActorRef( val ref = new LocalActorRef(
system, system,
system.guardianProps.getOrElse(Props(classOf[LocalActorRefProvider.Guardian], guardianStrategy)), system.guardianProps.getOrElse(Props(classOf[LocalActorRefProvider.Guardian], guardianStrategy)),
defaultDispatcher, dispatcher,
defaultMailbox, defaultMailbox,
rootGuardian, rootGuardian,
rootPath / "user") rootPath / "user")
@ -529,7 +534,7 @@ private[akka] class LocalActorRefProvider private[akka] (
val ref = new LocalActorRef( val ref = new LocalActorRef(
system, system,
Props(classOf[LocalActorRefProvider.SystemGuardian], systemGuardianStrategy, guardian), Props(classOf[LocalActorRefProvider.SystemGuardian], systemGuardianStrategy, guardian),
defaultDispatcher, internalDispatcher,
defaultMailbox, defaultMailbox,
rootGuardian, rootGuardian,
rootPath / "system") rootPath / "system")

View file

@ -319,7 +319,12 @@ object ActorSystem {
*/ */
final val config: Config = { final val config: Config = {
val config = cfg.withFallback(ConfigFactory.defaultReference(classLoader)) val config = cfg.withFallback(ConfigFactory.defaultReference(classLoader))
config.checkValid(ConfigFactory.defaultReference(classLoader), "akka")
config.checkValid(
ConfigFactory
.defaultReference(classLoader)
.withoutPath(Dispatchers.InternalDispatcherId), // allow this to be both string and config object
"akka")
config config
} }
@ -840,7 +845,8 @@ private[akka] class ActorSystemImpl(
dynamicAccess, dynamicAccess,
settings, settings,
mailboxes, mailboxes,
defaultExecutionContext)) defaultExecutionContext),
log)
val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher

View file

@ -454,7 +454,7 @@ final class CoordinatedShutdown private[akka] (
*/ */
def run(reason: Reason, fromPhase: Option[String]): Future[Done] = { def run(reason: Reason, fromPhase: Option[String]): Future[Done] = {
if (runStarted.compareAndSet(None, Some(reason))) { if (runStarted.compareAndSet(None, Some(reason))) {
import system.dispatcher implicit val ec = system.dispatchers.internalDispatcher
val debugEnabled = log.isDebugEnabled val debugEnabled = log.isDebugEnabled
def loop(remainingPhases: List[String]): Future[Done] = { def loop(remainingPhases: List[String]): Future[Done] = {
remainingPhases match { remainingPhases match {

View file

@ -6,11 +6,12 @@ package akka.dispatch
import java.util.concurrent.{ ConcurrentHashMap, ThreadFactory } import java.util.concurrent.{ ConcurrentHashMap, ThreadFactory }
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory, ConfigValueType }
import akka.actor.{ ActorSystem, DynamicAccess, Scheduler } import akka.actor.{ ActorSystem, DynamicAccess, Scheduler }
import akka.event.Logging.Warning import akka.event.Logging.Warning
import akka.event.EventStream import akka.event.{ EventStream, LoggingAdapter }
import akka.ConfigurationException import akka.ConfigurationException
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.util.Helpers.ConfigOps import akka.util.Helpers.ConfigOps
import com.github.ghik.silencer.silent import com.github.ghik.silencer.silent
@ -32,6 +33,7 @@ trait DispatcherPrerequisites {
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi
private[akka] final case class DefaultDispatcherPrerequisites( private[akka] final case class DefaultDispatcherPrerequisites(
threadFactory: ThreadFactory, threadFactory: ThreadFactory,
eventStream: EventStream, eventStream: EventStream,
@ -49,6 +51,21 @@ object Dispatchers {
* configuration of the default dispatcher. * configuration of the default dispatcher.
*/ */
final val DefaultDispatcherId = "akka.actor.default-dispatcher" final val DefaultDispatcherId = "akka.actor.default-dispatcher"
/**
* The id of a default dispatcher to use for operations known to be blocking. Note that
* for optimal performance you will want to isolate different blocking resources
* on different thread pools.
*/
final val DefaultBlockingDispatcherId: String = "akka.actor.default-blocking-io-dispatcher"
/**
* INTERNAL API
*/
@InternalApi
private[akka] final val InternalDispatcherId = "akka.actor.internal-dispatcher"
private val MaxDispatcherAliasDepth = 20
} }
/** /**
@ -56,10 +73,19 @@ object Dispatchers {
* for different environments. Use the `lookup` method to create * for different environments. Use the `lookup` method to create
* a dispatcher as specified in configuration. * a dispatcher as specified in configuration.
* *
* A dispatcher config can also be an alias, in that case it is a config string value pointing
* to the actual dispatcher config.
*
* Look in `akka.actor.default-dispatcher` section of the reference.conf * Look in `akka.actor.default-dispatcher` section of the reference.conf
* for documentation of dispatcher options. * for documentation of dispatcher options.
*
* Not for user instantiation or extension
*/ */
class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: DispatcherPrerequisites) { @DoNotInherit
class Dispatchers @InternalApi private[akka] (
val settings: ActorSystem.Settings,
val prerequisites: DispatcherPrerequisites,
logger: LoggingAdapter) {
import Dispatchers._ import Dispatchers._
@ -75,13 +101,23 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
private val dispatcherConfigurators = new ConcurrentHashMap[String, MessageDispatcherConfigurator] private val dispatcherConfigurators = new ConcurrentHashMap[String, MessageDispatcherConfigurator]
/**
* INTERNAL API
*/
private[akka] val internalDispatcher = lookup(Dispatchers.InternalDispatcherId)
/** /**
* Returns a dispatcher as specified in configuration. Please note that this * Returns a dispatcher as specified in configuration. Please note that this
* method _may_ create and return a NEW dispatcher, _every_ call. * method _may_ create and return a NEW dispatcher, _every_ call (depending on the `MessageDispatcherConfigurator` /
* dispatcher config the id points to).
*
* A dispatcher id can also be an alias. In the case it is a string value in the config it is treated as the id
* of the actual dispatcher config to use. If several ids leading to the same actual dispatcher config is used only one
* instance is created. This means that for dispatchers you expect to be shared they will be.
* *
* Throws ConfigurationException if the specified dispatcher cannot be found in the configuration. * Throws ConfigurationException if the specified dispatcher cannot be found in the configuration.
*/ */
def lookup(id: String): MessageDispatcher = lookupConfigurator(id).dispatcher() def lookup(id: String): MessageDispatcher = lookupConfigurator(id, 0).dispatcher()
/** /**
* Checks that the configuration provides a section for the given dispatcher. * Checks that the configuration provides a section for the given dispatcher.
@ -91,15 +127,37 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
*/ */
def hasDispatcher(id: String): Boolean = dispatcherConfigurators.containsKey(id) || cachingConfig.hasPath(id) def hasDispatcher(id: String): Boolean = dispatcherConfigurators.containsKey(id) || cachingConfig.hasPath(id)
private def lookupConfigurator(id: String): MessageDispatcherConfigurator = { private def lookupConfigurator(id: String, depth: Int): MessageDispatcherConfigurator = {
if (depth > MaxDispatcherAliasDepth)
throw new ConfigurationException(
s"Didn't find a concrete dispatcher config after following $MaxDispatcherAliasDepth, " +
s"is there a loop in your config? last looked for id was $id")
dispatcherConfigurators.get(id) match { dispatcherConfigurators.get(id) match {
case null => case null =>
// It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup. // It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup.
// That shouldn't happen often and in case it does the actual ExecutorService isn't // That shouldn't happen often and in case it does the actual ExecutorService isn't
// created until used, i.e. cheap. // created until used, i.e. cheap.
val newConfigurator =
if (cachingConfig.hasPath(id)) configuratorFrom(config(id)) val newConfigurator: MessageDispatcherConfigurator =
else throw new ConfigurationException(s"Dispatcher [$id] not configured") if (cachingConfig.hasPath(id)) {
val valueAtPath = cachingConfig.getValue(id)
valueAtPath.valueType() match {
case ConfigValueType.STRING =>
// a dispatcher key can be an alias of another dispatcher, if it is a string
// we treat that string value as the id of a dispatcher to lookup, it will be stored
// both under the actual id and the alias id in the 'dispatcherConfigurators' cache
val actualId = valueAtPath.unwrapped().asInstanceOf[String]
logger.debug("Dispatcher id [{}] is an alias, actual dispatcher will be [{}]", id, actualId)
lookupConfigurator(actualId, depth + 1)
case ConfigValueType.OBJECT =>
configuratorFrom(config(id))
case unexpected =>
throw new ConfigurationException(
s"Expected either a dispatcher config or an alias at [$id] but found [$unexpected]")
}
} else throw new ConfigurationException(s"Dispatcher [$id] not configured")
dispatcherConfigurators.putIfAbsent(id, newConfigurator) match { dispatcherConfigurators.putIfAbsent(id, newConfigurator) match {
case null => newConfigurator case null => newConfigurator

View file

@ -22,8 +22,7 @@
# Provides periodic statistics collection and publication throughout the cluster. # Provides periodic statistics collection and publication throughout the cluster.
akka.cluster.metrics { akka.cluster.metrics {
# Full path of dispatcher configuration key. # Full path of dispatcher configuration key.
# Use "" for default key `akka.actor.default-dispatcher`. dispatcher = "akka.actor.default-dispatcher"
dispatcher = ""
# How long should any actor wait before starting the periodic tasks. # How long should any actor wait before starting the periodic tasks.
periodic-tasks-initial-delay = 1s periodic-tasks-initial-delay = 1s
# Sigar native library extract location. # Sigar native library extract location.

View file

@ -5,7 +5,6 @@
package akka.cluster.metrics package akka.cluster.metrics
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.dispatch.Dispatchers
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.util.Helpers.Requiring import akka.util.Helpers.Requiring
import akka.util.Helpers.ConfigOps import akka.util.Helpers.ConfigOps
@ -19,10 +18,7 @@ case class ClusterMetricsSettings(config: Config) {
private val cc = config.getConfig("akka.cluster.metrics") private val cc = config.getConfig("akka.cluster.metrics")
// Extension. // Extension.
val MetricsDispatcher: String = cc.getString("dispatcher") match { val MetricsDispatcher: String = cc.getString("dispatcher")
case "" => Dispatchers.DefaultDispatcherId
case id => id
}
val PeriodicTasksInitialDelay: FiniteDuration = cc.getMillisDuration("periodic-tasks-initial-delay") val PeriodicTasksInitialDelay: FiniteDuration = cc.getMillisDuration("periodic-tasks-initial-delay")
val NativeLibraryExtractFolder: String = cc.getString("native-library-extract-folder") val NativeLibraryExtractFolder: String = cc.getString("native-library-extract-folder")

View file

@ -157,11 +157,10 @@ akka.cluster.sharding {
} }
# The id of the dispatcher to use for ClusterSharding actors. # The id of the dispatcher to use for ClusterSharding actors.
# If not specified default dispatcher is used.
# If specified you need to define the settings of the actual dispatcher. # If specified you need to define the settings of the actual dispatcher.
# This dispatcher for the entity actors is defined by the user provided # This dispatcher for the entity actors is defined by the user provided
# Props, i.e. this dispatcher is not used for the entity actors. # Props, i.e. this dispatcher is not used for the entity actors.
use-dispatcher = "" use-dispatcher = "akka.actor.internal-dispatcher"
# Config path of the lease that each shard must acquire before starting entity actors # Config path of the lease that each shard must acquire before starting entity actors
# default is no lease # default is no lease

View file

@ -31,7 +31,6 @@ import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.ddata.Replicator import akka.cluster.ddata.Replicator
import akka.cluster.ddata.ReplicatorSettings import akka.cluster.ddata.ReplicatorSettings
import akka.cluster.singleton.ClusterSingletonManager import akka.cluster.singleton.ClusterSingletonManager
import akka.dispatch.Dispatchers
import akka.event.Logging import akka.event.Logging
import akka.pattern.BackoffOpts import akka.pattern.BackoffOpts
import akka.pattern.ask import akka.pattern.ask
@ -179,10 +178,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
private lazy val guardian: ActorRef = { private lazy val guardian: ActorRef = {
val guardianName: String = val guardianName: String =
system.settings.config.getString("akka.cluster.sharding.guardian-name") system.settings.config.getString("akka.cluster.sharding.guardian-name")
val dispatcher = system.settings.config.getString("akka.cluster.sharding.use-dispatcher") match { val dispatcher = system.settings.config.getString("akka.cluster.sharding.use-dispatcher")
case "" => Dispatchers.DefaultDispatcherId
case id => id
}
system.systemActorOf(Props[ClusterShardingGuardian].withDispatcher(dispatcher), guardianName) system.systemActorOf(Props[ClusterShardingGuardian].withDispatcher(dispatcher), guardianName)
} }

View file

@ -33,9 +33,8 @@ akka.cluster.pub-sub {
send-to-dead-letters-when-no-subscribers = on send-to-dead-letters-when-no-subscribers = on
# The id of the dispatcher to use for DistributedPubSubMediator actors. # The id of the dispatcher to use for DistributedPubSubMediator actors.
# If not specified default dispatcher is used.
# If specified you need to define the settings of the actual dispatcher. # If specified you need to define the settings of the actual dispatcher.
use-dispatcher = "" use-dispatcher = "akka.actor.internal-dispatcher"
} }
# //#pub-sub-ext-config # //#pub-sub-ext-config
@ -74,10 +73,9 @@ akka.cluster.client.receptionist {
# after this time of inactivity. # after this time of inactivity.
response-tunnel-receive-timeout = 30s response-tunnel-receive-timeout = 30s
# The id of the dispatcher to use for ClusterReceptionist actors. # The id of the dispatcher to use for ClusterReceptionist actors.
# If not specified default dispatcher is used.
# If specified you need to define the settings of the actual dispatcher. # If specified you need to define the settings of the actual dispatcher.
use-dispatcher = "" use-dispatcher = "akka.actor.internal-dispatcher"
# How often failure detection heartbeat messages should be received for # How often failure detection heartbeat messages should be received for
# each ClusterClient # each ClusterClient

View file

@ -36,7 +36,6 @@ import akka.routing.ConsistentHash
import akka.routing.MurmurHash import akka.routing.MurmurHash
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.remote.DeadlineFailureDetector import akka.remote.DeadlineFailureDetector
import akka.dispatch.Dispatchers
import akka.util.MessageBuffer import akka.util.MessageBuffer
import akka.util.ccompat._ import akka.util.ccompat._
import scala.collection.immutable.{ HashMap, HashSet } import scala.collection.immutable.{ HashMap, HashSet }
@ -596,10 +595,7 @@ final class ClusterClientReceptionist(system: ExtendedActorSystem) extends Exten
system.deadLetters system.deadLetters
else { else {
val name = config.getString("name") val name = config.getString("name")
val dispatcher = config.getString("use-dispatcher") match { val dispatcher = config.getString("use-dispatcher")
case "" => Dispatchers.DefaultDispatcherId
case id => id
}
// important to use val mediator here to activate it outside of ClusterReceptionist constructor // important to use val mediator here to activate it outside of ClusterReceptionist constructor
val mediator = pubSubMediator val mediator = pubSubMediator
system.systemActorOf( system.systemActorOf(

View file

@ -29,7 +29,6 @@ import akka.routing.BroadcastRoutingLogic
import scala.collection.immutable.TreeMap import scala.collection.immutable.TreeMap
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.dispatch.Dispatchers
object DistributedPubSubSettings { object DistributedPubSubSettings {
@ -930,10 +929,7 @@ class DistributedPubSub(system: ExtendedActorSystem) extends Extension {
system.deadLetters system.deadLetters
else { else {
val name = system.settings.config.getString("akka.cluster.pub-sub.name") val name = system.settings.config.getString("akka.cluster.pub-sub.name")
val dispatcher = system.settings.config.getString("akka.cluster.pub-sub.use-dispatcher") match { val dispatcher = system.settings.config.getString("akka.cluster.pub-sub.use-dispatcher")
case "" => Dispatchers.DefaultDispatcherId
case id => id
}
system.systemActorOf(DistributedPubSubMediator.props(settings).withDispatcher(dispatcher), name) system.systemActorOf(DistributedPubSubMediator.props(settings).withDispatcher(dispatcher), name)
} }
} }

View file

@ -34,6 +34,7 @@ import akka.coordination.lease.LeaseUsageSettings
import akka.pattern.ask import akka.pattern.ask
import akka.util.Timeout import akka.util.Timeout
import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider } import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider }
import akka.dispatch.Dispatchers
import com.github.ghik.silencer.silent import com.github.ghik.silencer.silent
import scala.util.control.NonFatal import scala.util.control.NonFatal
@ -163,7 +164,9 @@ object ClusterSingletonManager {
* Scala API: Factory method for `ClusterSingletonManager` [[akka.actor.Props]]. * Scala API: Factory method for `ClusterSingletonManager` [[akka.actor.Props]].
*/ */
def props(singletonProps: Props, terminationMessage: Any, settings: ClusterSingletonManagerSettings): Props = def props(singletonProps: Props, terminationMessage: Any, settings: ClusterSingletonManagerSettings): Props =
Props(new ClusterSingletonManager(singletonProps, terminationMessage, settings)).withDeploy(Deploy.local) Props(new ClusterSingletonManager(singletonProps, terminationMessage, settings))
.withDispatcher(Dispatchers.InternalDispatcherId)
.withDeploy(Deploy.local)
/** /**
* INTERNAL API * INTERNAL API

View file

@ -6,6 +6,7 @@ package akka.cluster.singleton
import akka.actor._ import akka.actor._
import akka.cluster.{ Cluster, Member, MemberStatus } import akka.cluster.{ Cluster, Member, MemberStatus }
import scala.collection.immutable import scala.collection.immutable
import akka.cluster.ClusterEvent._ import akka.cluster.ClusterEvent._
import akka.cluster.ClusterEvent.MemberRemoved import akka.cluster.ClusterEvent.MemberRemoved
@ -13,6 +14,7 @@ import akka.cluster.ClusterEvent.MemberUp
import akka.actor.RootActorPath import akka.actor.RootActorPath
import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberExited import akka.cluster.ClusterEvent.MemberExited
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.language.postfixOps import scala.language.postfixOps
import com.typesafe.config.Config import com.typesafe.config.Config
@ -21,6 +23,7 @@ import akka.event.Logging
import akka.util.MessageBuffer import akka.util.MessageBuffer
import akka.cluster.ClusterSettings import akka.cluster.ClusterSettings
import akka.cluster.ClusterSettings.DataCenter import akka.cluster.ClusterSettings.DataCenter
import akka.dispatch.Dispatchers
object ClusterSingletonProxySettings { object ClusterSingletonProxySettings {
@ -127,7 +130,9 @@ object ClusterSingletonProxy {
* @param settings see [[ClusterSingletonProxySettings]] * @param settings see [[ClusterSingletonProxySettings]]
*/ */
def props(singletonManagerPath: String, settings: ClusterSingletonProxySettings): Props = def props(singletonManagerPath: String, settings: ClusterSingletonProxySettings): Props =
Props(new ClusterSingletonProxy(singletonManagerPath, settings)).withDeploy(Deploy.local) Props(new ClusterSingletonProxy(singletonManagerPath, settings))
.withDispatcher(Dispatchers.InternalDispatcherId)
.withDeploy(Deploy.local)
private case object TryToIdentifySingleton extends NoSerializationVerificationNeeded private case object TryToIdentifySingleton extends NoSerializationVerificationNeeded

View file

@ -4,12 +4,8 @@
package akka.cluster.ddata.typed.scaladsl package akka.cluster.ddata.typed.scaladsl
import akka.actor.typed.ActorSystem import akka.actor.typed.{ ActorRef, ActorSystem, Extension, ExtensionId, Props }
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.actor.typed.ActorRef
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.actor.typed.Props
import akka.cluster.{ ddata => dd } import akka.cluster.{ ddata => dd }
import akka.cluster.ddata.SelfUniqueAddress import akka.cluster.ddata.SelfUniqueAddress
@ -50,7 +46,10 @@ class DistributedData(system: ActorSystem[_]) extends Extension {
val underlyingReplicator = dd.DistributedData(untypedSystem).replicator val underlyingReplicator = dd.DistributedData(untypedSystem).replicator
val replicatorBehavior = Replicator.behavior(settings, underlyingReplicator) val replicatorBehavior = Replicator.behavior(settings, underlyingReplicator)
system.internalSystemActorOf(replicatorBehavior, ReplicatorSettings.name(system), Props.empty) system.internalSystemActorOf(
replicatorBehavior,
ReplicatorSettings.name(system),
Props.empty.withDispatcherFromConfig(settings.dispatcher))
} }
/** /**

View file

@ -148,10 +148,9 @@ akka {
# Disable with "off". # Disable with "off".
publish-stats-interval = off publish-stats-interval = off
# The id of the dispatcher to use for cluster actors. If not specified # The id of the dispatcher to use for cluster actors.
# default dispatcher is used.
# If specified you need to define the settings of the actual dispatcher. # If specified you need to define the settings of the actual dispatcher.
use-dispatcher = "" use-dispatcher = "akka.actor.internal-dispatcher"
# Gossip to random node with newer or older state information, if any with # Gossip to random node with newer or older state information, if any with
# this probability. Otherwise Gossip to any random live node. # this probability. Otherwise Gossip to any random live node.

View file

@ -12,7 +12,6 @@ import scala.concurrent.duration.Duration
import akka.actor.Address import akka.actor.Address
import akka.actor.AddressFromURIString import akka.actor.AddressFromURIString
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.dispatch.Dispatchers
import akka.util.Helpers.{ toRootLowerCase, ConfigOps, Requiring } import akka.util.Helpers.{ toRootLowerCase, ConfigOps, Requiring }
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
@ -179,10 +178,7 @@ final class ClusterSettings(val config: Config, val systemName: String) {
val RunCoordinatedShutdownWhenDown: Boolean = cc.getBoolean("run-coordinated-shutdown-when-down") val RunCoordinatedShutdownWhenDown: Boolean = cc.getBoolean("run-coordinated-shutdown-when-down")
val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled") val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled")
val JmxMultiMbeansInSameEnabled: Boolean = cc.getBoolean("jmx.multi-mbeans-in-same-jvm") val JmxMultiMbeansInSameEnabled: Boolean = cc.getBoolean("jmx.multi-mbeans-in-same-jvm")
val UseDispatcher: String = cc.getString("use-dispatcher") match { val UseDispatcher: String = cc.getString("use-dispatcher")
case "" => Dispatchers.DefaultDispatcherId
case id => id
}
val GossipDifferentViewProbability: Double = cc.getDouble("gossip-different-view-probability") val GossipDifferentViewProbability: Double = cc.getDouble("gossip-different-view-probability")
val ReduceGossipDifferentViewProbability: Int = cc.getInt("reduce-gossip-different-view-probability") val ReduceGossipDifferentViewProbability: Int = cc.getInt("reduce-gossip-different-view-probability")
val SchedulerTickDuration: FiniteDuration = cc.getMillisDuration("scheduler.tick-duration") val SchedulerTickDuration: FiniteDuration = cc.getMillisDuration("scheduler.tick-duration")

View file

@ -48,7 +48,7 @@ class ClusterConfigSpec extends AkkaSpec {
SelfDataCenter should ===("default") SelfDataCenter should ===("default")
Roles should ===(Set(ClusterSettings.DcRolePrefix + "default")) Roles should ===(Set(ClusterSettings.DcRolePrefix + "default"))
JmxEnabled should ===(true) JmxEnabled should ===(true)
UseDispatcher should ===(Dispatchers.DefaultDispatcherId) UseDispatcher should ===(Dispatchers.InternalDispatcherId)
GossipDifferentViewProbability should ===(0.8 +- 0.0001) GossipDifferentViewProbability should ===(0.8 +- 0.0001)
ReduceGossipDifferentViewProbability should ===(400) ReduceGossipDifferentViewProbability should ===(400)
SchedulerTickDuration should ===(33 millis) SchedulerTickDuration should ===(33 millis)

View file

@ -36,6 +36,6 @@ class LeaseProvider(system: ExtendedActorSystem) extends Extension {
*/ */
def getLease(leaseName: String, configPath: String, ownerName: String): Lease = { def getLease(leaseName: String, configPath: String, ownerName: String): Lease = {
val scalaLease = delegate.getLease(leaseName, configPath, ownerName) val scalaLease = delegate.getLease(leaseName, configPath, ownerName)
new LeaseAdapter(scalaLease)(system.dispatcher) new LeaseAdapter(scalaLease)(system.dispatchers.internalDispatcher)
} }
} }

View file

@ -55,7 +55,7 @@ private[akka] final class AggregateServiceDiscovery(system: ExtendedActorSystem)
val serviceDiscovery = Discovery(system) val serviceDiscovery = Discovery(system)
settings.discoveryMethods.map(mech => (mech, serviceDiscovery.loadServiceDiscovery(mech))) settings.discoveryMethods.map(mech => (mech, serviceDiscovery.loadServiceDiscovery(mech)))
} }
private implicit val ec = system.dispatcher private implicit val ec = system.dispatchers.internalDispatcher
/** /**
* Each discovery method is given the resolveTimeout rather than reducing it each time between methods. * Each discovery method is given the resolveTimeout rather than reducing it each time between methods.

View file

@ -84,7 +84,7 @@ private[akka] class DnsServiceDiscovery(system: ExtendedActorSystem) extends Ser
// (eventually visible) // (eventually visible)
private var asyncDnsCache: OptionVal[AsyncDnsCache] = OptionVal.None private var asyncDnsCache: OptionVal[AsyncDnsCache] = OptionVal.None
import system.dispatcher private implicit val ec = system.dispatchers.internalDispatcher
dns.ask(AsyncDnsManager.GetCache)(Timeout(30.seconds)).onComplete { dns.ask(AsyncDnsManager.GetCache)(Timeout(30.seconds)).onComplete {
case Success(cache: AsyncDnsCache) => case Success(cache: AsyncDnsCache) =>

View file

@ -26,10 +26,9 @@ akka.cluster.distributed-data {
# the replicas. Next chunk will be transferred in next round of gossip. # the replicas. Next chunk will be transferred in next round of gossip.
max-delta-elements = 1000 max-delta-elements = 1000
# The id of the dispatcher to use for Replicator actors. If not specified # The id of the dispatcher to use for Replicator actors.
# default dispatcher is used.
# If specified you need to define the settings of the actual dispatcher. # If specified you need to define the settings of the actual dispatcher.
use-dispatcher = "" use-dispatcher = "akka.actor.internal-dispatcher"
# How often the Replicator checks for pruning of data associated with # How often the Replicator checks for pruning of data associated with
# removed cluster nodes. If this is set to 'off' the pruning feature will # removed cluster nodes. If this is set to 'off' the pruning feature will

View file

@ -243,7 +243,7 @@ final class LmdbDurableStore(config: Config) extends Actor with ActorLogging {
dbPut(OptionVal.None, key, data) dbPut(OptionVal.None, key, data)
} else { } else {
if (pending.isEmpty) if (pending.isEmpty)
context.system.scheduler.scheduleOnce(writeBehindInterval, self, WriteBehind)(context.system.dispatcher) context.system.scheduler.scheduleOnce(writeBehindInterval, self, WriteBehind)(context.dispatcher)
pending.put(key, data) pending.put(key, data)
} }
reply match { reply match {

View file

@ -80,10 +80,7 @@ object ReplicatorSettings {
* the default configuration `akka.cluster.distributed-data`. * the default configuration `akka.cluster.distributed-data`.
*/ */
def apply(config: Config): ReplicatorSettings = { def apply(config: Config): ReplicatorSettings = {
val dispatcher = config.getString("use-dispatcher") match { val dispatcher = config.getString("use-dispatcher")
case "" => Dispatchers.DefaultDispatcherId
case id => id
}
val pruningInterval = toRootLowerCase(config.getString("pruning-interval")) match { val pruningInterval = toRootLowerCase(config.getString("pruning-interval")) match {
case "off" | "false" => Duration.Zero case "off" | "false" => Duration.Zero
@ -299,7 +296,7 @@ final class ReplicatorSettings(
def withDispatcher(dispatcher: String): ReplicatorSettings = { def withDispatcher(dispatcher: String): ReplicatorSettings = {
val d = dispatcher match { val d = dispatcher match {
case "" => Dispatchers.DefaultDispatcherId case "" => Dispatchers.InternalDispatcherId
case id => id case id => id
} }
copy(dispatcher = d) copy(dispatcher = d)

View file

@ -162,7 +162,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
system.scheduler.schedule(cacheTimeToLive, cacheTimeToLive / 2) { system.scheduler.schedule(cacheTimeToLive, cacheTimeToLive / 2) {
readCache.evict() readCache.evict()
writeCache.evict() writeCache.evict()
}(system.dispatcher) }(system.dispatchers.internalDispatcher)
private val writeAckBytes = dm.Empty.getDefaultInstance.toByteArray private val writeAckBytes = dm.Empty.getDefaultInstance.toByteArray
private val dummyAddress = UniqueAddress(Address("a", "b", "c", 2552), 1L) private val dummyAddress = UniqueAddress(Address("a", "b", "c", 2552), 1L)

View file

@ -877,37 +877,10 @@ akka.cluster.log-info-verbose = on
<a id="cluster-dispatcher"></a> <a id="cluster-dispatcher"></a>
### Cluster Dispatcher ### Cluster Dispatcher
Under the hood the cluster extension is implemented with actors and it can be necessary Under the hood the cluster extension is implemented with actors. To protect them against
to create a bulkhead for those actors to avoid disturbance from other actors. Especially disturbance from user actors they are by default run on the internal dispatcher configured
the heartbeating actors that is used for failure detection can generate false positives under `akka.actor.internal-dispatcher`. The cluster actors can potentially be isolated even
if they are not given a chance to run at regular intervals. further onto their own dispatcher using the setting `akka.cluster.use-dispatcher`.
For this purpose you can define a separate dispatcher to be used for the cluster actors:
```
akka.cluster.use-dispatcher = cluster-dispatcher
cluster-dispatcher {
type = "Dispatcher"
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-max = 4
}
}
```
@@@ note
Normally it should not be necessary to configure a separate dispatcher for the Cluster.
The default-dispatcher should be sufficient for performing the Cluster tasks, i.e. `akka.cluster.use-dispatcher`
should not be changed. If you have Cluster related problems when using the default-dispatcher that is typically an
indication that you are running blocking or CPU intensive actors/tasks on the default-dispatcher.
Use dedicated dispatchers for such actors/tasks instead of running them on the default-dispatcher,
because that may starve system internal tasks.
Related config properties: `akka.cluster.use-dispatcher = akka.cluster.cluster-dispatcher`.
Corresponding default values: `akka.cluster.use-dispatcher =`.
@@@
### Configuration Compatibility Check ### Configuration Compatibility Check

View file

@ -25,6 +25,13 @@ dispatchers in this ActorSystem. If no ExecutionContext is given, it will fallba
`akka.actor.default-dispatcher.default-executor.fallback`. By default this is a "fork-join-executor", which `akka.actor.default-dispatcher.default-executor.fallback`. By default this is a "fork-join-executor", which
gives excellent performance in most cases. gives excellent performance in most cases.
## Internal dispatcher
To protect the internal Actors that is spawned by the various Akka modules, a separate internal dispatcher is used by default.
The internal dispatcher can be tuned in a fine grained way with the setting `akka.actor.internal-dispatcher`, it can also
be replaced by another dispatcher by making `akka.actor.internal-dispatcher` an @ref[alias](#dispatcher-aliases).
<a id="dispatcher-lookup"></a> <a id="dispatcher-lookup"></a>
## Looking up a Dispatcher ## Looking up a Dispatcher
@ -184,6 +191,19 @@ is used for `PinnedDispatcher` to keep resource usage down in case of idle actor
thread all the time you need to add `thread-pool-executor.allow-core-timeout=off` to the thread all the time you need to add `thread-pool-executor.allow-core-timeout=off` to the
configuration of the `PinnedDispatcher`. configuration of the `PinnedDispatcher`.
## Dispatcher aliases
When a dispatcher is looked up, and the given setting contains a string rather than a dispatcher config block,
the lookup will treat it as an alias, and follow that string to an alternate location for a dispatcher config.
If the dispatcher config is referenced both through an alias and through the absolute path only one dispatcher will
be used and shared among the two ids.
Example: configuring `internal-dispatcher` to be an alias for `default-dispatcher`:
```
akka.actor.internal-dispatcher = akka.actor.default-dispatcher
```
## Blocking Needs Careful Management ## Blocking Needs Careful Management
In some cases it is unavoidable to do blocking operations, i.e. to put a thread In some cases it is unavoidable to do blocking operations, i.e. to put a thread

View file

@ -34,6 +34,34 @@ Use plain `system.actorOf` instead of the DSL to create Actors if you have been
`actorFor` has been deprecated since `2.2`. Use `ActorSelection` instead. `actorFor` has been deprecated since `2.2`. Use `ActorSelection` instead.
## Internal dispatcher introduced
To protect the Akka internals against starvation when user code blocks the default dispatcher (for example by accidental
use of blocking APIs from actors) a new internal dispatcher has been added. All of Akka's internal, non-blocking actors
now run on the internal dispatcher by default.
The dispatcher can be configured through `akka.actor.internal-dispatcher`.
For maximum performance, you might want to use a single shared dispatcher for all non-blocking,
asynchronous actors, user actors and Akka internal actors. In that case, can configure the
`akka.actor.internal-dispatcher` with a string value of `akka.actor.default-dispatcher`.
This reinstantiates the behavior from previous Akka versions but also removes the isolation between
user and Akka internals. So, use at your own risk!
Several `use-dispatcher` configuration settings that previously accepted an empty value to fall back to the default
dispatcher has now gotten an explicit value of `akka.actor.internal-dispatcher` and no longer accept an empty
string as value. If such an empty value is used in your `application.conf` the same result is achieved by simply removing
that entry completely and having the default apply.
For more details about configuring dispatchers, see the @ref[Dispatchers](../dispatchers.md)
## Default dispatcher size
Previously the factor for the default dispatcher was set a bit high (`3.0`) to give some extra threads in case of accidental
blocking and protect a bit against starving the internal actors. Since the internal actors are now on a separate dispatcher
the default dispatcher has been adjusted down to `1.0` which means the number of threads will be one per core, but at least
`8` and at most `64`. This can be tuned using the individual settings in `akka.actor.default-dispatcher.fork-join-executor`.
## Default remoting is now Artery TCP ## Default remoting is now Artery TCP
@ref[Artery TCP](../remoting-artery.md) is now the default remoting implementation. @ref[Artery TCP](../remoting-artery.md) is now the default remoting implementation.

View file

@ -146,6 +146,8 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
import provider.remoteSettings._ import provider.remoteSettings._
private implicit val ec = system.dispatchers.lookup(Dispatcher)
val transportSupervisor = system.systemActorOf(configureDispatcher(Props[TransportSupervisor]), "transports") val transportSupervisor = system.systemActorOf(configureDispatcher(Props[TransportSupervisor]), "transports")
override def localAddressForRemote(remote: Address): Address = override def localAddressForRemote(remote: Address): Address =
@ -167,7 +169,6 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
endpointManager = None endpointManager = None
} }
import system.dispatcher
(manager ? ShutdownAndFlush) (manager ? ShutdownAndFlush)
.mapTo[Boolean] .mapTo[Boolean]
.andThen { .andThen {
@ -252,7 +253,6 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
override def managementCommand(cmd: Any): Future[Boolean] = endpointManager match { override def managementCommand(cmd: Any): Future[Boolean] = endpointManager match {
case Some(manager) => case Some(manager) =>
import system.dispatcher
implicit val timeout = CommandAckTimeout implicit val timeout = CommandAckTimeout
(manager ? ManagementCommand(cmd)).map { case ManagementCommandAck(status) => status } (manager ? ManagementCommand(cmd)).map { case ManagementCommandAck(status) => status }
case None => case None =>

View file

@ -504,7 +504,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
system.scheduler.schedule(removeAfter, interval) { system.scheduler.schedule(removeAfter, interval) {
if (!isShutdown) if (!isShutdown)
associationRegistry.removeUnusedQuarantined(removeAfter) associationRegistry.removeUnusedQuarantined(removeAfter)
}(system.dispatcher) }(system.dispatchers.internalDispatcher)
} }
// Select inbound lane based on destination to preserve message order, // Select inbound lane based on destination to preserve message order,
@ -559,11 +559,12 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
val a = association(from.address) val a = association(from.address)
// make sure uid is same for active association // make sure uid is same for active association
if (a.associationState.uniqueRemoteAddressValue().contains(from)) { if (a.associationState.uniqueRemoteAddressValue().contains(from)) {
import system.dispatcher
a.changeActorRefCompression(table).foreach { _ => a.changeActorRefCompression(table)
a.sendControl(ActorRefCompressionAdvertisementAck(localAddress, table.version)) .foreach { _ =>
system.eventStream.publish(Events.ReceivedActorRefCompressionTable(from, table)) a.sendControl(ActorRefCompressionAdvertisementAck(localAddress, table.version))
} system.eventStream.publish(Events.ReceivedActorRefCompressionTable(from, table))
}(system.dispatchers.internalDispatcher)
} }
} else } else
log.debug( log.debug(
@ -590,11 +591,11 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
val a = association(from.address) val a = association(from.address)
// make sure uid is same for active association // make sure uid is same for active association
if (a.associationState.uniqueRemoteAddressValue().contains(from)) { if (a.associationState.uniqueRemoteAddressValue().contains(from)) {
import system.dispatcher a.changeClassManifestCompression(table)
a.changeClassManifestCompression(table).foreach { _ => .foreach { _ =>
a.sendControl(ClassManifestCompressionAdvertisementAck(localAddress, table.version)) a.sendControl(ClassManifestCompressionAdvertisementAck(localAddress, table.version))
system.eventStream.publish(Events.ReceivedClassManifestCompressionTable(from, table)) system.eventStream.publish(Events.ReceivedClassManifestCompressionTable(from, table))
} }(system.dispatchers.internalDispatcher)
} }
} else } else
log.debug( log.debug(
@ -681,7 +682,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
"remoteFlushOnShutdown") "remoteFlushOnShutdown")
flushingPromise.future flushingPromise.future
} }
implicit val ec = system.dispatcher implicit val ec = system.dispatchers.internalDispatcher
flushing.recover { case _ => Done }.flatMap(_ => internalShutdown()) flushing.recover { case _ => Done }.flatMap(_ => internalShutdown())
} else { } else {
Future.successful(Done) Future.successful(Done)
@ -689,7 +690,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
} }
private def internalShutdown(): Future[Done] = { private def internalShutdown(): Future[Done] = {
import system.dispatcher implicit val ec = system.dispatchers.internalDispatcher
killSwitch.abort(ShutdownSignal) killSwitch.abort(ShutdownSignal)
topLevelFlightRecorder.loFreq(Transport_KillSwitchPulled, NoMetaData) topLevelFlightRecorder.loFreq(Transport_KillSwitchPulled, NoMetaData)
@ -722,7 +723,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
* Will complete successfully even if one of the stream completion futures failed * Will complete successfully even if one of the stream completion futures failed
*/ */
private def streamsCompleted: Future[Done] = { private def streamsCompleted: Future[Done] = {
implicit val ec = system.dispatcher implicit val ec = system.dispatchers.internalDispatcher
for { for {
_ <- Future.traverse(associationRegistry.allAssociations)(_.streamsCompleted) _ <- Future.traverse(associationRegistry.allAssociations)(_.streamsCompleted)
_ <- Future.sequence(streamMatValues.get().valuesIterator.map { _ <- Future.sequence(streamMatValues.get().valuesIterator.map {

View file

@ -196,7 +196,7 @@ private[remote] class Association(
updateOutboundCompression(c => c.clearCompression()) updateOutboundCompression(c => c.clearCompression())
private def updateOutboundCompression(action: OutboundCompressionAccess => Future[Done]): Future[Done] = { private def updateOutboundCompression(action: OutboundCompressionAccess => Future[Done]): Future[Done] = {
import transport.system.dispatcher implicit val ec = transport.system.dispatchers.internalDispatcher
val c = outboundCompressionAccess val c = outboundCompressionAccess
if (c.isEmpty) Future.successful(Done) if (c.isEmpty) Future.successful(Done)
else if (c.size == 1) action(c.head) else if (c.size == 1) action(c.head)
@ -276,7 +276,7 @@ private[remote] class Association(
// clear outbound compression, it's safe to do that several times if someone else // clear outbound compression, it's safe to do that several times if someone else
// completes handshake at same time, but it's important to clear it before // completes handshake at same time, but it's important to clear it before
// we signal that the handshake is completed (uniqueRemoteAddressPromise.trySuccess) // we signal that the handshake is completed (uniqueRemoteAddressPromise.trySuccess)
import transport.system.dispatcher implicit val ec = transport.system.dispatchers.internalDispatcher
clearOutboundCompression().map { _ => clearOutboundCompression().map { _ =>
current.uniqueRemoteAddressPromise.trySuccess(peer) current.uniqueRemoteAddressPromise.trySuccess(peer)
current.uniqueRemoteAddressValue() match { current.uniqueRemoteAddressValue() match {
@ -572,7 +572,7 @@ private[remote] class Association(
stopQuarantinedTimer.set(Some(transport.system.scheduler.scheduleOnce(advancedSettings.StopQuarantinedAfterIdle) { stopQuarantinedTimer.set(Some(transport.system.scheduler.scheduleOnce(advancedSettings.StopQuarantinedAfterIdle) {
if (associationState.isQuarantined()) if (associationState.isQuarantined())
abortQuarantined() abortQuarantined()
}(transport.system.dispatcher))) }(transport.system.dispatchers.internalDispatcher)))
} }
private def abortQuarantined(): Unit = { private def abortQuarantined(): Unit = {
@ -803,7 +803,7 @@ private[remote] class Association(
val (queueValues, compressionAccessValues, laneCompletedValues) = values.unzip3 val (queueValues, compressionAccessValues, laneCompletedValues) = values.unzip3
import transport.system.dispatcher implicit val ec = transport.system.dispatchers.internalDispatcher
// tear down all parts if one part fails or completes // tear down all parts if one part fails or completes
Future.firstCompletedOf(laneCompletedValues).failed.foreach { reason => Future.firstCompletedOf(laneCompletedValues).failed.foreach { reason =>

View file

@ -255,7 +255,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
private def startAeronErrorLog(): Unit = { private def startAeronErrorLog(): Unit = {
aeronErrorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE), log) aeronErrorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE), log)
val lastTimestamp = new AtomicLong(0L) val lastTimestamp = new AtomicLong(0L)
import system.dispatcher implicit val ec = system.dispatchers.internalDispatcher
aeronErrorLogTask = system.scheduler.schedule(3.seconds, 5.seconds) { aeronErrorLogTask = system.scheduler.schedule(3.seconds, 5.seconds) {
if (!isShutdown) { if (!isShutdown) {
val newLastTimestamp = aeronErrorLog.logErrors(log, lastTimestamp.get) val newLastTimestamp = aeronErrorLog.logErrors(log, lastTimestamp.get)
@ -265,7 +265,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
} }
private def startAeronCounterLog(): Unit = { private def startAeronCounterLog(): Unit = {
import system.dispatcher implicit val ec = system.dispatchers.internalDispatcher
aeronCounterTask = system.scheduler.schedule(5.seconds, 5.seconds) { aeronCounterTask = system.scheduler.schedule(5.seconds, 5.seconds) {
if (!isShutdown && log.isDebugEnabled) { if (!isShutdown && log.isDebugEnabled) {
aeron.countersReader.forEach(new MetaData() { aeron.countersReader.forEach(new MetaData() {
@ -379,7 +379,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
} }
.to(immutable.Vector) .to(immutable.Vector)
import system.dispatcher implicit val ec = system.dispatchers.internalDispatcher
// tear down the upstream hub part if downstream lane fails // tear down the upstream hub part if downstream lane fails
// lanes are not completed with success by themselves so we don't have to care about onSuccess // lanes are not completed with success by themselves so we don't have to care about onSuccess
@ -420,19 +420,20 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
} }
override protected def shutdownTransport(): Future[Done] = { override protected def shutdownTransport(): Future[Done] = {
import system.dispatcher taskRunner
taskRunner.stop().map { _ => .stop()
topLevelFlightRecorder.loFreq(Transport_Stopped, NoMetaData) .map { _ =>
if (aeronErrorLogTask != null) { topLevelFlightRecorder.loFreq(Transport_Stopped, NoMetaData)
aeronErrorLogTask.cancel() if (aeronErrorLogTask != null) {
topLevelFlightRecorder.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData) aeronErrorLogTask.cancel()
} topLevelFlightRecorder.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData)
if (aeron != null) aeron.close() }
if (aeronErrorLog != null) aeronErrorLog.close() if (aeron != null) aeron.close()
if (mediaDriver.get.isDefined) stopMediaDriver() if (aeronErrorLog != null) aeronErrorLog.close()
if (mediaDriver.get.isDefined) stopMediaDriver()
Done Done
} }(system.dispatchers.internalDispatcher)
} }
} }

View file

@ -389,7 +389,7 @@ private[remote] class ArteryTcpTransport(
} }
.to(immutable.Vector) .to(immutable.Vector)
import system.dispatcher implicit val ec = system.dispatchers.internalDispatcher
// tear down the upstream hub part if downstream lane fails // tear down the upstream hub part if downstream lane fails
// lanes are not completed with success by themselves so we don't have to care about onSuccess // lanes are not completed with success by themselves so we don't have to care about onSuccess
@ -433,7 +433,7 @@ private[remote] class ArteryTcpTransport(
} }
override protected def shutdownTransport(): Future[Done] = { override protected def shutdownTransport(): Future[Done] = {
import system.dispatcher implicit val ec = system.dispatchers.internalDispatcher
inboundKillSwitch.shutdown() inboundKillSwitch.shutdown()
unbind().map { _ => unbind().map { _ =>
topLevelFlightRecorder.loFreq(Transport_Stopped, NoMetaData) topLevelFlightRecorder.loFreq(Transport_Stopped, NoMetaData)
@ -444,7 +444,7 @@ private[remote] class ArteryTcpTransport(
private def unbind(): Future[Done] = { private def unbind(): Future[Done] = {
serverBinding match { serverBinding match {
case Some(binding) => case Some(binding) =>
import system.dispatcher implicit val ec = system.dispatchers.internalDispatcher
for { for {
b <- binding b <- binding
_ <- b.unbind() _ <- b.unbind()

View file

@ -156,7 +156,7 @@ object ActorTransportAdapter {
} }
abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorSystem) abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorSystem)
extends AbstractTransportAdapter(wrappedTransport)(system.dispatcher) { extends AbstractTransportAdapter(wrappedTransport)(system.dispatchers.internalDispatcher) {
import ActorTransportAdapter._ import ActorTransportAdapter._

View file

@ -61,7 +61,7 @@ private[remote] object FailureInjectorTransportAdapter {
private[remote] class FailureInjectorTransportAdapter( private[remote] class FailureInjectorTransportAdapter(
wrappedTransport: Transport, wrappedTransport: Transport,
val extendedSystem: ExtendedActorSystem) val extendedSystem: ExtendedActorSystem)
extends AbstractTransportAdapter(wrappedTransport)(extendedSystem.dispatcher) extends AbstractTransportAdapter(wrappedTransport)(extendedSystem.dispatchers.internalDispatcher)
with AssociationEventListener { with AssociationEventListener {
private def rng = ThreadLocalRandom.current() private def rng = ThreadLocalRandom.current()

View file

@ -0,0 +1,33 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream
import akka.dispatch.Dispatchers
import akka.stream.testkit.StreamSpec
class StreamDispatcherSpec extends StreamSpec {
"The default blocking io dispatcher for streams" must {
"be the same as the default blocking io dispatcher for actors" in {
val streamIoDispatcher = system.dispatchers.lookup(ActorAttributes.IODispatcher.dispatcher)
val actorIoDispatcher = system.dispatchers.lookup(Dispatchers.DefaultBlockingDispatcherId)
streamIoDispatcher shouldBe theSameInstanceAs(actorIoDispatcher)
}
}
"The deprecated default stream io dispatcher" must {
"be the same as the default blocking io dispatcher for actors" in {
// in case it is still used
val streamIoDispatcher = system.dispatchers.lookup("akka.stream.default-blocking-io-dispatcher")
val actorIoDispatcher = system.dispatchers.lookup(Dispatchers.DefaultBlockingDispatcherId)
streamIoDispatcher shouldBe theSameInstanceAs(actorIoDispatcher)
}
}
}

View file

@ -8,7 +8,7 @@ import java.nio.file.StandardOpenOption.{ CREATE, WRITE }
import java.nio.file._ import java.nio.file._
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.dispatch.ExecutionContexts import akka.dispatch.{ Dispatchers, ExecutionContexts }
import akka.stream.impl.PhasedFusingActorMaterializer import akka.stream.impl.PhasedFusingActorMaterializer
import akka.stream.impl.StreamSupervisor import akka.stream.impl.StreamSupervisor
import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.StreamSupervisor.Children
@ -157,7 +157,7 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
"use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped { "use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped {
targetFile { f => targetFile { f =>
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val sys = ActorSystem("FileSinkSpec-dispatcher-testing-1", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys) val materializer = ActorMaterializer()(sys)
try { try {
Source.fromIterator(() => Iterator.continually(TestByteStrings.head)).runWith(FileIO.toPath(f))(materializer) Source.fromIterator(() => Iterator.continually(TestByteStrings.head)).runWith(FileIO.toPath(f))(materializer)
@ -167,14 +167,15 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
.supervisor .supervisor
.tell(StreamSupervisor.GetChildren, testActor) .tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSink").get val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSink").get
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") // haven't figured out why this returns the aliased id rather than the id, but the stage is going away so whatever
assertDispatcher(ref, Dispatchers.DefaultBlockingDispatcherId)
} finally shutdown(sys) } finally shutdown(sys)
} }
} }
"allow overriding the dispatcher using Attributes" in assertAllStagesStopped { "allow overriding the dispatcher using Attributes" in assertAllStagesStopped {
targetFile { f => targetFile { f =>
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val sys = ActorSystem("FileSinkSpec-dispatcher-testing-2", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys) val materializer = ActorMaterializer()(sys)
try { try {

View file

@ -9,6 +9,7 @@ import java.nio.file.{ Files, NoSuchFileException }
import java.util.Random import java.util.Random
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.dispatch.Dispatchers
import akka.stream.IOResult._ import akka.stream.IOResult._
import akka.stream._ import akka.stream._
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
@ -251,7 +252,7 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
.supervisor .supervisor
.tell(StreamSupervisor.GetChildren, testActor) .tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get
try assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
finally p.cancel() finally p.cancel()
} finally shutdown(sys) } finally shutdown(sys)
} }

View file

@ -24,6 +24,8 @@ import akka.util.ByteString
import scala.concurrent.duration._ import scala.concurrent.duration._
import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.ThreadLocalRandom
import akka.dispatch.Dispatchers
import scala.concurrent.{ Await, Future } import scala.concurrent.{ Await, Future }
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
@ -223,7 +225,7 @@ class InputStreamSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
.supervisor .supervisor
.tell(StreamSupervisor.GetChildren, testActor) .tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "inputStreamSink").get val ref = expectMsgType[Children].children.find(_.path.toString contains "inputStreamSink").get
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
} finally shutdown(sys) } finally shutdown(sys)
} }

View file

@ -7,6 +7,7 @@ package akka.stream.scaladsl
import java.util.concurrent.{ CompletionStage, TimeUnit } import java.util.concurrent.{ CompletionStage, TimeUnit }
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.dispatch.Dispatchers
import akka.{ Done, NotUsed } import akka.{ Done, NotUsed }
import akka.stream.Attributes._ import akka.stream.Attributes._
import akka.stream._ import akka.stream._
@ -282,8 +283,8 @@ class AttributesSpec
Source Source
.fromGraph( .fromGraph(
// directly on stage // directly on stage
new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher").addAttributes( new ThreadNameSnitchingStage(ActorAttributes.IODispatcher.dispatcher)
ActorAttributes.dispatcher("my-dispatcher"))) .addAttributes(ActorAttributes.dispatcher("my-dispatcher")))
.runWith(Sink.head) .runWith(Sink.head)
.futureValue .futureValue
@ -293,20 +294,20 @@ class AttributesSpec
"use the most specific dispatcher when another one is defined on a surrounding composed graph" in { "use the most specific dispatcher when another one is defined on a surrounding composed graph" in {
val dispatcher = val dispatcher =
Source Source
.fromGraph(new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher")) .fromGraph(new ThreadNameSnitchingStage(ActorAttributes.IODispatcher.dispatcher))
.map(identity) .map(identity)
// this is now for the composed source -> flow graph // this is now for the composed source -> flow graph
.addAttributes(ActorAttributes.dispatcher("my-dispatcher")) .addAttributes(ActorAttributes.dispatcher("my-dispatcher"))
.runWith(Sink.head) .runWith(Sink.head)
.futureValue .futureValue
dispatcher should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher") dispatcher should startWith(s"AttributesSpec-${Dispatchers.DefaultBlockingDispatcherId}")
} }
"not change dispatcher from one defined on a surrounding graph" in { "not change dispatcher from one defined on a surrounding graph" in {
val dispatcher = val dispatcher =
Source Source
.fromGraph(new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher")) .fromGraph(new ThreadNameSnitchingStage(ActorAttributes.IODispatcher.dispatcher))
// this already introduces an async boundary here // this already introduces an async boundary here
.map(identity) .map(identity)
// this is now just for map since there already is one in-between stage and map // this is now just for map since there already is one in-between stage and map
@ -315,13 +316,13 @@ class AttributesSpec
.runWith(Sink.head) .runWith(Sink.head)
.futureValue .futureValue
dispatcher should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher") dispatcher should startWith(s"AttributesSpec-${Dispatchers.DefaultBlockingDispatcherId}")
} }
"change dispatcher when defined directly on top of the async boundary" in { "change dispatcher when defined directly on top of the async boundary" in {
val dispatcher = val dispatcher =
Source Source
.fromGraph(new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher")) .fromGraph(new ThreadNameSnitchingStage(ActorAttributes.IODispatcher.dispatcher))
.async .async
.withAttributes(ActorAttributes.dispatcher("my-dispatcher")) .withAttributes(ActorAttributes.dispatcher("my-dispatcher"))
.runWith(Sink.head) .runWith(Sink.head)
@ -333,7 +334,7 @@ class AttributesSpec
"change dispatcher when defined on the async call" in { "change dispatcher when defined on the async call" in {
val dispatcher = val dispatcher =
Source Source
.fromGraph(new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher")) .fromGraph(new ThreadNameSnitchingStage(ActorAttributes.IODispatcher.dispatcher))
.async("my-dispatcher") .async("my-dispatcher")
.runWith(Sink.head) .runWith(Sink.head)
.futureValue .futureValue
@ -411,7 +412,7 @@ class AttributesSpec
"not change dispatcher from one defined on a surrounding graph" in { "not change dispatcher from one defined on a surrounding graph" in {
val dispatcherF = val dispatcherF =
javadsl.Source javadsl.Source
.fromGraph(new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher")) .fromGraph(new ThreadNameSnitchingStage(ActorAttributes.IODispatcher.dispatcher))
// this already introduces an async boundary here // this already introduces an async boundary here
.detach .detach
// this is now just for map since there already is one in-between stage and map // this is now just for map since there already is one in-between stage and map
@ -421,13 +422,13 @@ class AttributesSpec
val dispatcher = dispatcherF.toCompletableFuture.get(remainingOrDefault.toMillis, TimeUnit.MILLISECONDS) val dispatcher = dispatcherF.toCompletableFuture.get(remainingOrDefault.toMillis, TimeUnit.MILLISECONDS)
dispatcher should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher") dispatcher should startWith(s"AttributesSpec-${Dispatchers.DefaultBlockingDispatcherId}")
} }
"change dispatcher when defined directly on top of the async boundary" in { "change dispatcher when defined directly on top of the async boundary" in {
val dispatcherF = val dispatcherF =
javadsl.Source javadsl.Source
.fromGraph(new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher")) .fromGraph(new ThreadNameSnitchingStage(ActorAttributes.IODispatcher.dispatcher))
.async .async
.withAttributes(ActorAttributes.dispatcher("my-dispatcher")) .withAttributes(ActorAttributes.dispatcher("my-dispatcher"))
.runWith(javadsl.Sink.head(), materializer) .runWith(javadsl.Sink.head(), materializer)
@ -507,12 +508,12 @@ class AttributesSpec
try { try {
val dispatcher = val dispatcher =
Source Source
.fromGraph(new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher")) .fromGraph(new ThreadNameSnitchingStage(ActorAttributes.IODispatcher.dispatcher))
.runWith(Sink.head)(myDispatcherMaterializer) .runWith(Sink.head)(myDispatcherMaterializer)
.futureValue .futureValue
// should not override stage specific dispatcher // should not override stage specific dispatcher
dispatcher should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher") dispatcher should startWith("AttributesSpec-akka.actor.default-blocking-io-dispatcher")
} finally { } finally {
myDispatcherMaterializer.shutdown() myDispatcherMaterializer.shutdown()
@ -565,7 +566,7 @@ class AttributesSpec
val threadName = val threadName =
Source.fromGraph(new ThreadNameSnitchingStage(None).addAttributes(Attributes(IODispatcher))).runWith(Sink.head) Source.fromGraph(new ThreadNameSnitchingStage(None).addAttributes(Attributes(IODispatcher))).runWith(Sink.head)
threadName.futureValue should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher") threadName.futureValue should startWith("AttributesSpec-akka.actor.default-blocking-io-dispatcher")
} }
"allow for specifying a custom default io-dispatcher" in { "allow for specifying a custom default io-dispatcher" in {
@ -586,19 +587,6 @@ class AttributesSpec
TestKit.shutdownActorSystem(system) TestKit.shutdownActorSystem(system)
} }
} }
"resolve the dispatcher attribute" in {
import ActorAttributes._
Dispatcher.resolve(dispatcher("my-dispatcher"), materializer.settings) should be("my-dispatcher")
}
"resolve the blocking io dispatcher attribute" in {
import ActorAttributes._
Dispatcher.resolve(Attributes(IODispatcher), materializer.settings) should be(
"akka.stream.default-blocking-io-dispatcher")
}
} }
} }

View file

@ -7,6 +7,7 @@ package akka.stream.scaladsl
import java.util.stream.Collectors import java.util.stream.Collectors
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.dispatch.Dispatchers
import akka.stream._ import akka.stream._
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.StreamSupervisor.Children
@ -80,7 +81,7 @@ class SinkAsJavaStreamSpec extends StreamSpec(UnboundedMailboxConfig) {
.supervisor .supervisor
.tell(StreamSupervisor.GetChildren, testActor) .tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "asJavaStream").get val ref = expectMsgType[Children].children.find(_.path.toString contains "asJavaStream").get
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
} finally shutdown(sys) } finally shutdown(sys)
} }
} }

View file

@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.Done import akka.Done
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.dispatch.Dispatchers
import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.StreamSupervisor.Children
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
@ -325,7 +326,7 @@ class UnfoldResourceAsyncSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
.supervisor .supervisor
.tell(StreamSupervisor.GetChildren, testActor) .tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "unfoldResourceSourceAsync").get val ref = expectMsgType[Children].children.find(_.path.toString contains "unfoldResourceSourceAsync").get
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
} finally shutdown(sys) } finally shutdown(sys)
} }

View file

@ -10,6 +10,7 @@ import java.nio.file.Files
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.dispatch.Dispatchers
import akka.stream.ActorAttributes._ import akka.stream.ActorAttributes._
import akka.stream.Supervision._ import akka.stream.Supervision._
import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.StreamSupervisor.Children
@ -163,7 +164,7 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
.supervisor .supervisor
.tell(StreamSupervisor.GetChildren, testActor) .tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "unfoldResourceSource").get val ref = expectMsgType[Children].children.find(_.path.toString contains "unfoldResourceSource").get
try assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
finally p.cancel() finally p.cancel()
} finally shutdown(sys) } finally shutdown(sys)
} }

View file

@ -62,10 +62,12 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowO
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWithGraph") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWithGraph")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWith") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWith")
## 2.6 # dispatcher aliases made internal streams dispatcher resolve superfluous #26775
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.ActorAttributes#Dispatcher.resolve")
# #24372 No Future/CompletionStage in StreamRefs # #24372 No Future/CompletionStage in StreamRefs
# FIXME why was change not detected? # no filter because MiMa doesn't check the generic signature
# https://github.com/lightbend/migration-manager/issues/40
# 26188 remove Timed # 26188 remove Timed
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$TimedFlowContext") ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$TimedFlowContext")

View file

@ -14,11 +14,13 @@ akka {
max-input-buffer-size = 16 max-input-buffer-size = 16
# Fully qualified config path which holds the dispatcher configuration # Fully qualified config path which holds the dispatcher configuration
# to be used by ActorMaterializer when creating Actors. # or full dispatcher configuration to be used by ActorMaterializer when creating Actors.
# When this value is left empty, the default-dispatcher will be used. dispatcher = "akka.actor.default-dispatcher"
dispatcher = ""
blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher" # Fully qualified config path which holds the dispatcher configuration
# or full dispatcher configuration to be used by stream operators that
# perform blocking operations
blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher"
# Cleanup leaked publishers and subscribers when they are not used within a given # Cleanup leaked publishers and subscribers when they are not used within a given
# deadline # deadline
@ -122,21 +124,12 @@ akka {
//#stream-ref //#stream-ref
} }
# Deprecated, use akka.stream.materializer.blocking-io-dispatcher, this setting # Deprecated, left here to not break Akka HTTP which refers to it
# was never applied because of bug #24357 blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher"
# It must still have a valid value because used from Akka HTTP.
blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher"
default-blocking-io-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
throughput = 1
thread-pool-executor {
fixed-pool-size = 16
}
}
# Deprecated, will not be used unless user code refer to it, use 'akka.stream.materializer.blocking-io-dispatcher'
# instead, or if from code, prefer the 'ActorAttributes.IODispatcher' attribute
default-blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher"
} }
# configure overrides to ssl-configuration here (to be used by akka-streams, and akka-http i.e. when serving https connections) # configure overrides to ssl-configuration here (to be used by akka-streams, and akka-http i.e. when serving https connections)

View file

@ -405,33 +405,8 @@ object ActorAttributes {
import Attributes._ import Attributes._
final case class Dispatcher(dispatcher: String) extends MandatoryAttribute final case class Dispatcher(dispatcher: String) extends MandatoryAttribute
object Dispatcher {
/**
* INTERNAL API
* Resolves the dispatcher's name with a fallback to the default blocking IO dispatcher.
* Note that `IODispatcher.dispatcher` is not used here as the config used to create [[ActorMaterializerSettings]]
* is not easily accessible, instead the name is taken from `settings.blockingIoDispatcher`
*/
@InternalApi
private[akka] def resolve(attributes: Attributes, settings: ActorMaterializerSettings): String =
attributes.mandatoryAttribute[Dispatcher] match {
case IODispatcher => settings.blockingIoDispatcher
case Dispatcher(dispatcher) => dispatcher
}
/**
* INTERNAL API
* Resolves the dispatcher name with a fallback to the default blocking IO dispatcher.
*/
@InternalApi
private[akka] def resolve(context: MaterializationContext): String =
resolve(context.effectiveAttributes, ActorMaterializerHelper.downcast(context.materializer).settings)
}
final case class SupervisionStrategy(decider: Supervision.Decider) extends MandatoryAttribute final case class SupervisionStrategy(decider: Supervision.Decider) extends MandatoryAttribute
// this is actually a config key that needs reading and itself will contain the actual dispatcher name
val IODispatcher: Dispatcher = ActorAttributes.Dispatcher("akka.stream.materializer.blocking-io-dispatcher") val IODispatcher: Dispatcher = ActorAttributes.Dispatcher("akka.stream.materializer.blocking-io-dispatcher")
/** /**

View file

@ -8,16 +8,7 @@ import java.util
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import akka.NotUsed import akka.NotUsed
import akka.actor.{ import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, ExtendedActorSystem, PoisonPill }
ActorContext,
ActorRef,
ActorRefFactory,
ActorSystem,
Cancellable,
Deploy,
ExtendedActorSystem,
PoisonPill
}
import akka.annotation.{ DoNotInherit, InternalApi } import akka.annotation.{ DoNotInherit, InternalApi }
import akka.dispatch.Dispatchers import akka.dispatch.Dispatchers
import akka.event.{ Logging, LoggingAdapter } import akka.event.{ Logging, LoggingAdapter }
@ -421,14 +412,10 @@ private final case class SavedIslandData(
Attributes( Attributes(
Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) :: Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) ::
ActorAttributes.SupervisionStrategy(settings.supervisionDecider) :: ActorAttributes.SupervisionStrategy(settings.supervisionDecider) ::
ActorAttributes.Dispatcher(if (settings.dispatcher == Deploy.NoDispatcherGiven) Dispatchers.DefaultDispatcherId ActorAttributes.Dispatcher(settings.dispatcher) :: Nil)
else settings.dispatcher) :: Nil)
} }
override lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match { override lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher)
case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId
case other => other
})
override def schedulePeriodically( override def schedulePeriodically(
initialDelay: FiniteDuration, initialDelay: FiniteDuration,
@ -782,7 +769,7 @@ private final case class SavedIslandData(
case _ => case _ =>
val props = ActorGraphInterpreter val props = ActorGraphInterpreter
.props(shell) .props(shell)
.withDispatcher(ActorAttributes.Dispatcher.resolve(effectiveAttributes, settings)) .withDispatcher(effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
val actorName = fullIslandName match { val actorName = fullIslandName match {
case OptionVal.Some(n) => n case OptionVal.Some(n) => n
@ -933,7 +920,7 @@ private final case class SavedIslandData(
def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (NotUsed, Any) = { def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (NotUsed, Any) = {
val tls = mod.asInstanceOf[TlsModule] val tls = mod.asInstanceOf[TlsModule]
val dispatcher = ActorAttributes.Dispatcher.resolve(attributes, materializer.settings) val dispatcher = attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher
val maxInputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer].max val maxInputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer].max
val props = val props =

View file

@ -38,8 +38,9 @@ import scala.concurrent.{ Future, Promise }
val ioResultPromise = Promise[IOResult]() val ioResultPromise = Promise[IOResult]()
val props = FileSubscriber.props(f, ioResultPromise, maxInputBufferSize, startPosition, options) val props = FileSubscriber.props(f, ioResultPromise, maxInputBufferSize, startPosition, options)
val ref = materializer.actorOf(
val ref = materializer.actorOf(context, props.withDispatcher(Dispatcher.resolve(context))) context,
props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[Dispatcher].dispatcher))
(akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future) (akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future)
} }
@ -72,7 +73,7 @@ import scala.concurrent.{ Future, Promise }
val props = OutputStreamSubscriber val props = OutputStreamSubscriber
.props(os, ioResultPromise, maxInputBufferSize, autoFlush) .props(os, ioResultPromise, maxInputBufferSize, autoFlush)
.withDispatcher(Dispatcher.resolve(context)) .withDispatcher(context.effectiveAttributes.mandatoryAttribute[Dispatcher].dispatcher)
val ref = materializer.actorOf(context, props) val ref = materializer.actorOf(context, props)
(akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future) (akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future)

View file

@ -159,7 +159,9 @@ private[akka] final class FileSource(path: Path, chunkSize: Int, startPosition:
val pub = try { val pub = try {
val is = createInputStream() // can throw, i.e. FileNotFound val is = createInputStream() // can throw, i.e. FileNotFound
val props = InputStreamPublisher.props(is, ioResultPromise, chunkSize).withDispatcher(Dispatcher.resolve(context)) val props = InputStreamPublisher
.props(is, ioResultPromise, chunkSize)
.withDispatcher(context.effectiveAttributes.mandatoryAttribute[Dispatcher].dispatcher)
val ref = materializer.actorOf(context, props) val ref = materializer.actorOf(context, props)
akka.stream.actor.ActorPublisher[ByteString](ref) akka.stream.actor.ActorPublisher[ByteString](ref)

View file

@ -33,8 +33,7 @@ object MaterializerState {
def streamSnapshots(mat: Materializer): Future[immutable.Seq[StreamSnapshot]] = { def streamSnapshots(mat: Materializer): Future[immutable.Seq[StreamSnapshot]] = {
mat match { mat match {
case impl: PhasedFusingActorMaterializer => case impl: PhasedFusingActorMaterializer =>
import impl.system.dispatcher requestFromSupervisor(impl.supervisor)(impl.system.dispatchers.internalDispatcher)
requestFromSupervisor(impl.supervisor)
} }
} }