Actually use the relative blocking io dispatcher setting #24357

* Remove docs and have only deprecation comment on old setting
* ConfigFactory.load fixed
Johan Andrén 2018-02-22 13:42:59 +01:00 committed by Patrik Nordwall
parent 89d5b5d00e
commit fd6f30673a
14 changed files with 187 additions and 51 deletions
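
To see what the fix enables in practice, here is a minimal sketch of pointing stream IO stages at a custom dispatcher through the now-effective `akka.stream.materializer.blocking-io-dispatcher` setting (the dispatcher name and pool size are illustrative, mirroring the test config further down):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory

object BlockingIoDispatcherExample extends App {
  // "my-blocking-io-dispatcher" is an illustrative name, not something Akka defines
  val config = ConfigFactory.parseString(
    """
    my-blocking-io-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor.fixed-pool-size = 4
    }
    # the setting this commit actually wires up
    akka.stream.materializer.blocking-io-dispatcher = "my-blocking-io-dispatcher"
    """).withFallback(ConfigFactory.load())

  val system = ActorSystem("blocking-io-example", config)
  implicit val materializer: ActorMaterializer = ActorMaterializer()(system)
  // FileSource, FileSink and other IO stages now run on my-blocking-io-dispatcher

  system.terminate()
}
```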

View file

@ -1,8 +0,0 @@
# Disable phases in Coordinated Shutdown
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.this")
ProblemFilters.exclude[MissingTypesProblem]("akka.actor.CoordinatedShutdown$Phase$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.apply")
# Path based WriteFile command #23902
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.TcpConnection.PendingWriteFile")

View file

@ -3,3 +3,12 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ActorSystem.get
# #23770 typed actor context ask
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.PromiseActorRef.apply")
# Disable phases in Coordinated Shutdown
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.this")
ProblemFilters.exclude[MissingTypesProblem]("akka.actor.CoordinatedShutdown$Phase$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.apply")
# Path based WriteFile command #23902
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.TcpConnection.PendingWriteFile")

View file

@ -6,12 +6,14 @@ package akka.stream.scaladsl
import java.util.Optional
import java.util.concurrent.{ CompletableFuture, CompletionStage, TimeUnit }
import akka.actor.ActorSystem
import akka.{ Done, NotUsed }
import akka.stream.Attributes._
import akka.stream._
import akka.stream.javadsl
import akka.stream.stage._
import akka.stream.testkit._
import akka.testkit.TestKit
import com.typesafe.config.ConfigFactory
object AttributesSpec {
@ -76,10 +78,12 @@ object AttributesSpec {
}
}
class ThreadNameSnitchingStage(initialDispatcher: Option[String]) extends GraphStage[SourceShape[String]] {
def this(initialDispatcher: String) = this(Some(initialDispatcher))
val out = Outlet[String]("out")
override val shape = SourceShape.of(out)
override protected def initialAttributes: Attributes =
initialDispatcher.fold(Attributes.none)(name ⇒ ActorAttributes.dispatcher(name))
def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
def onPull(): Unit = {
@ -105,7 +109,10 @@ class AttributesSpec extends StreamSpec(ConfigFactory.parseString(
}
throughput = 1
}
""")
// we need to revert to the regular mailbox or else the test suite will complain
// about using non-test worthy dispatchers
.withFallback(Utils.UnboundedMailboxConfig)) {
import AttributesSpec._
@ -491,4 +498,69 @@ class AttributesSpec extends StreamSpec(ConfigFactory.parseString(
}
}
"the default dispatcher attributes" must {
val config = ConfigFactory.parseString(s"""
my-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 1
}
throughput = 1
}
my-io-dispatcher = $${my-dispatcher}
akka.stream.materializer.dispatcher = "my-dispatcher"
akka.stream.materializer.blocking-io-dispatcher = "my-io-dispatcher"
""")
// we need to revert to the regular mailbox or else the test suite will complain
// about using non-test worthy dispatchers
.withFallback(Utils.UnboundedMailboxConfig).resolve()
"allow for specifying a custom default dispatcher" in {
val system = ActorSystem("AttributesSpec-default-dispatcher-override", config)
try {
val mat = ActorMaterializer()(system)
val threadName =
Source.fromGraph(new ThreadNameSnitchingStage(None)).runWith(Sink.head)(mat)
threadName.futureValue should startWith("AttributesSpec-default-dispatcher-override-my-dispatcher-")
} finally {
TestKit.shutdownActorSystem(system)
}
}
"use the default-io-dispatcher by default" in {
val threadName =
Source.fromGraph(new ThreadNameSnitchingStage(None)
// FIXME surprising that we don't have something shorter than this
.addAttributes(Attributes.none and ActorAttributes.IODispatcher)).runWith(Sink.head)
threadName.futureValue should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher")
}
"allow for specifying a custom default io-dispatcher" in {
val system = ActorSystem("AttributesSpec-io-dispatcher-override", config)
try {
val mat = ActorMaterializer()(system)
val threadName =
Source.fromGraph(new ThreadNameSnitchingStage(None)
// FIXME surprising that we don't have something shorter than this
.addAttributes(Attributes.none and ActorAttributes.IODispatcher)).runWith(Sink.head)(mat)
threadName.futureValue should startWith("AttributesSpec-io-dispatcher-override-my-io-dispatcher-")
} finally {
TestKit.shutdownActorSystem(system)
}
}
}
}

View file

@ -13,6 +13,9 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSou
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefBackpressureSinkStage.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSourceActor.this")
# 24357 io-dispatcher setting
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.ActorMaterializerSettings.this")
# #24254 add collectType
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.collectType")

View file

@ -18,6 +18,8 @@ akka {
# When this value is left empty, the default-dispatcher will be used.
dispatcher = ""
blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher"
# Cleanup leaked publishers and subscribers when they are not used within a given
# deadline
subscription-timeout {
@ -111,10 +113,9 @@ akka {
//#stream-ref
}
# Deprecated, use akka.stream.materializer.blocking-io-dispatcher, this setting
# was never applied because of bug #24357
blocking-io-dispatcher = "<deprecated>"
default-blocking-io-dispatcher {
type = "Dispatcher"

View file

@ -6,7 +6,8 @@ package akka.stream
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props }
import akka.annotation.InternalApi
import akka.event.LoggingAdapter
import akka.util.Helpers.toRootLowerCase
import akka.stream.impl._
@ -253,11 +254,16 @@ object ActorMaterializerSettings {
outputBurstLimit: Int,
fuzzingMode: Boolean,
autoFusing: Boolean,
maxFixedBufferSize: Int) = {
// these sins were committed in the name of binary compatibility:
val config = ConfigFactory.defaultReference
new ActorMaterializerSettings(
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024),
StreamRefSettings(config.getConfig("akka.stream.materializer.stream-ref")),
config.getString("akka.stream.blocking-io-dispatcher")
)
}
/**
* Create [[ActorMaterializerSettings]] from the settings of an [[akka.actor.ActorSystem]] (Scala).
@ -282,7 +288,8 @@ object ActorMaterializerSettings {
maxFixedBufferSize = config.getInt("max-fixed-buffer-size"),
syncProcessingLimit = config.getInt("sync-processing-limit"),
ioSettings = IOSettings(config.getConfig("io")),
streamRefSettings = StreamRefSettings(config.getConfig("stream-ref")),
blockingIoDispatcher = config.getString("blocking-io-dispatcher"))
/**
* Create [[ActorMaterializerSettings]] from individual settings (Java).
@ -299,11 +306,15 @@ object ActorMaterializerSettings {
outputBurstLimit: Int,
fuzzingMode: Boolean,
autoFusing: Boolean,
maxFixedBufferSize: Int) = {
// these sins were committed in the name of binary compatibility:
val config = ConfigFactory.defaultReference
new ActorMaterializerSettings(
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024),
StreamRefSettings(config.getConfig("akka.stream.materializer.stream-ref")),
config.getString("akka.stream.blocking-io-dispatcher"))
}
/**
* Create [[ActorMaterializerSettings]] from the settings of an [[akka.actor.ActorSystem]] (Java).
@ -322,8 +333,10 @@ object ActorMaterializerSettings {
/**
* This class describes the configurable properties of the [[ActorMaterializer]].
* Please refer to the `withX` methods for descriptions of the individual settings.
*
* The constructor is not public API, use create or apply on the [[ActorMaterializerSettings]] companion instead.
*/
final class ActorMaterializerSettings @InternalApi private (
/*
* Important note: `initialInputBufferSize`, `maxInputBufferSize`, `dispatcher` and
* `supervisionDecider` must not be used as values in the materializer, or anything the materializer phases use
@ -342,7 +355,8 @@ final class ActorMaterializerSettings private (
val maxFixedBufferSize: Int,
val syncProcessingLimit: Int,
val ioSettings: IOSettings,
val streamRefSettings: StreamRefSettings,
val blockingIoDispatcher: String) {
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
require(syncProcessingLimit > 0, "syncProcessingLimit must be > 0")
@ -351,6 +365,7 @@ final class ActorMaterializerSettings private (
require(initialInputBufferSize <= maxInputBufferSize, s"initialInputBufferSize($initialInputBufferSize) must be <= maxInputBufferSize($maxInputBufferSize)")
// backwards compatibility when added IOSettings, shouldn't be needed since private, but added to satisfy mima
@deprecated("Use ActorMaterializerSettings.apply or ActorMaterializerSettings.create instead", "2.5.10")
def this(
initialInputBufferSize: Int,
maxInputBufferSize: Int,
@ -364,11 +379,15 @@ final class ActorMaterializerSettings private (
maxFixedBufferSize: Int,
syncProcessingLimit: Int,
ioSettings: IOSettings) =
// using config like this is not quite right but the only way to solve backwards comp without hard coding settings
this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, ioSettings,
StreamRefSettings(ConfigFactory.defaultReference().getConfig("akka.stream.materializer.stream-ref")),
ConfigFactory.defaultReference().getString("akka.stream.blocking-io-dispatcher")
)
// backwards compatibility when added IOSettings, shouldn't be needed since private, but added to satisfy mima
@deprecated("Use ActorMaterializerSettings.apply or ActorMaterializerSettings.create instead", "2.5.10")
def this(
initialInputBufferSize: Int,
maxInputBufferSize: Int,
@ -381,11 +400,15 @@ final class ActorMaterializerSettings private (
autoFusing: Boolean,
maxFixedBufferSize: Int,
syncProcessingLimit: Int) =
// using config like this is not quite right but the only way to solve backwards comp without hard coding settings
this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit,
IOSettings(tcpWriteBufferSize = 16 * 1024), StreamRefSettings(ConfigFactory.defaultReference().getConfig("akka.stream.materializer.stream-ref")),
ConfigFactory.defaultReference().getString("akka.stream.blocking-io-dispatcher")
)
// backwards compatibility when added IOSettings, shouldn't be needed since private, but added to satisfy mima
@deprecated("Use ActorMaterializerSettings.apply or ActorMaterializerSettings.create instead", "2.5.10")
def this(
initialInputBufferSize: Int,
maxInputBufferSize: Int,
@ -397,9 +420,12 @@ final class ActorMaterializerSettings private (
fuzzingMode: Boolean,
autoFusing: Boolean,
maxFixedBufferSize: Int) =
// using config like this is not quite right but the only way to solve backwards comp without hard coding settings
this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024),
StreamRefSettings(ConfigFactory.defaultReference().getConfig("akka.stream.materializer.stream-ref")),
ConfigFactory.defaultReference().getString("akka.stream.blocking-io-dispatcher")
)
private def copy(
initialInputBufferSize: Int = this.initialInputBufferSize,
@ -414,10 +440,11 @@ final class ActorMaterializerSettings private (
maxFixedBufferSize: Int = this.maxFixedBufferSize,
syncProcessingLimit: Int = this.syncProcessingLimit,
ioSettings: IOSettings = this.ioSettings,
streamRefSettings: StreamRefSettings = this.streamRefSettings,
blockingIoDispatcher: String = this.blockingIoDispatcher) = {
new ActorMaterializerSettings(
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, ioSettings, streamRefSettings, blockingIoDispatcher)
}
/**
@ -541,6 +568,10 @@ final class ActorMaterializerSettings private (
if (streamRefSettings == this.streamRefSettings) this
else copy(streamRefSettings = streamRefSettings)
def withBlockingIoDispatcher(newBlockingIoDispatcher: String): ActorMaterializerSettings =
if (newBlockingIoDispatcher == blockingIoDispatcher) this
else copy(blockingIoDispatcher = newBlockingIoDispatcher)
private def requirePowerOfTwo(n: Integer, name: String): Unit = {
require(n > 0, s"$name must be > 0")
require((n & (n - 1)) == 0, s"$name must be a power of two")
@ -558,7 +589,8 @@ final class ActorMaterializerSettings private (
s.syncProcessingLimit == syncProcessingLimit &&
s.fuzzingMode == fuzzingMode &&
s.autoFusing == autoFusing &&
s.ioSettings == ioSettings &&
s.blockingIoDispatcher == blockingIoDispatcher
case _ ⇒ false
}
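
A small usage sketch of the new `withBlockingIoDispatcher` setter; "my-io-dispatcher" is assumed to be a dispatcher configured on the system, not something provided by Akka:

```scala
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }

val system = ActorSystem("io-settings-sample")
// route all blocking IO stages of this materializer to a custom dispatcher
val settings = ActorMaterializerSettings(system)
  .withBlockingIoDispatcher("my-io-dispatcher")
val materializer = ActorMaterializer(settings)(system)
```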

View file

@ -367,7 +367,8 @@ object ActorAttributes {
final case class Dispatcher(dispatcher: String) extends MandatoryAttribute
final case class SupervisionStrategy(decider: Supervision.Decider) extends MandatoryAttribute
// this is actually a config key that needs reading and itself will contain the actual dispatcher name
val IODispatcher: Dispatcher = ActorAttributes.Dispatcher("akka.stream.blocking-io-dispatcher")
/**
* Specifies the name of the dispatcher. This also adds an async boundary.
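
Mirroring the test above, a stage can be marked as IO-bound through attributes; the materializer then resolves `IODispatcher` to whatever the blocking-io-dispatcher setting points at (a sketch, the source itself is arbitrary):

```scala
import akka.NotUsed
import akka.stream.{ ActorAttributes, Attributes }
import akka.stream.scaladsl.Source

// the attribute marks this part of the graph as IO-bound; the actual dispatcher
// is resolved from the materializer settings at materialization time
val blockingSource: Source[Int, NotUsed] =
  Source(1 to 100)
    .addAttributes(Attributes.none and ActorAttributes.IODispatcher)
```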

View file

@ -56,6 +56,8 @@ abstract class Materializer {
* within Sources, Sinks, etc. This [[scala.concurrent.ExecutionContextExecutor]]
* can be used by parts of the flow to submit processing jobs for execution,
* run Future callbacks, etc.
*
* Note that this is not necessarily the same execution context the stream stage itself is running on.
*/
implicit def executionContext: ExecutionContextExecutor
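
A small sketch of using this execution context for Future callbacks around a stream (system and materializer setup are illustrative); per the note above, it is not necessarily the dispatcher the stages themselves run on:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.ExecutionContextExecutor

val system = ActorSystem("materializer-ec-sample")
implicit val materializer: ActorMaterializer = ActorMaterializer()(system)
implicit val ec: ExecutionContextExecutor = materializer.executionContext

// the callback on the materialized Future runs on the materializer's execution context
Source(1 to 10).runWith(Sink.seq).foreach(println)
```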

View file

@ -43,11 +43,12 @@ import scala.concurrent.{ Await, ExecutionContextExecutor }
* INTERNAL API
*/
@InternalApi private[akka] override def actorOf(context: MaterializationContext, props: Props): ActorRef = {
// if the props already have a dispatcher set we respect that, if not
// we take it from the attributes
val effectiveProps =
if (props.dispatcher == Dispatchers.DefaultDispatcherId)
props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
else if (props.dispatcher == ActorAttributes.IODispatcher.dispatcher)
// this one is actually not a dispatcher but a relative config key pointing to the actual dispatcher name
props.withDispatcher(settings.blockingIoDispatcher)
else props
actorOf(effectiveProps, context.islandName)
}

View file

@ -12,8 +12,10 @@ import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancel
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.dispatch.Dispatchers
import akka.event.{ Logging, LoggingAdapter }
import akka.stream.ActorAttributes.Dispatcher
import akka.stream.Attributes.InputBuffer
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes.IODispatcher
import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.impl.fusing.ActorGraphInterpreter.{ ActorOutputBoundary, BatchingActorInputBoundary }
import akka.stream.impl.fusing.GraphInterpreter.Connection
@ -376,7 +378,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
* Default attributes for the materializer, based on the [[ActorMaterializerSettings]] and
* are always seen as least specific, so any attribute specified in the graph "wins" over these.
* In addition to that this also guarantees that the attributes `InputBuffer`, `SupervisionStrategy`,
* and `Dispatcher` are _always_ present in the attributes and can be accessed through `Attributes.mandatoryAttribute`
*
* When these attributes are needed later in the materialization process it is important that
* they are gotten through the attributes and not through the [[ActorMaterializerSettings]]
@ -736,8 +738,14 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
fuseIntoExistingInterperter(shell)
case _ ⇒
val dispatcher =
effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher] match {
case ActorAttributes.IODispatcher ⇒ settings.blockingIoDispatcher
case ActorAttributes.Dispatcher(dispatcher) ⇒ dispatcher
}
val props = ActorGraphInterpreter.props(shell)
.withDispatcher(dispatcher)
val actorName = fullIslandName match {
case OptionVal.Some(n) ⇒ n
case OptionVal.None ⇒ islandName
@ -879,7 +887,10 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (NotUsed, Any) = {
val tls = mod.asInstanceOf[TlsModule]
val dispatcher = attributes.mandatoryAttribute[Dispatcher] match {
case IODispatcher ⇒ materializer.settings.blockingIoDispatcher
case Dispatcher(name) ⇒ name
}
val maxInputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer].max
val props =

View file

@ -4,7 +4,7 @@
package akka.stream.impl.io
import java.io.OutputStream
import java.nio.file.{ OpenOption, Path }
import akka.annotation.InternalApi
import akka.stream._
@ -33,7 +33,11 @@ import scala.concurrent.{ Future, Promise }
val ioResultPromise = Promise[IOResult]()
val props = FileSubscriber.props(f, ioResultPromise, maxInputBufferSize, startPosition, options)
val dispatcher = context.effectiveAttributes.get[Dispatcher](IODispatcher).dispatcher
val dispatcher = context.effectiveAttributes.mandatoryAttribute[Dispatcher] match {
case IODispatcher ⇒ ActorMaterializerHelper.downcast(context.materializer).settings.blockingIoDispatcher
case Dispatcher(name) ⇒ name
}
val ref = materializer.actorOf(context, props.withDispatcher(dispatcher))
(akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future)
@ -61,7 +65,12 @@ import scala.concurrent.{ Future, Promise }
val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max
val dispatcher = context.effectiveAttributes.mandatoryAttribute[Dispatcher] match {
case IODispatcher ⇒ ActorMaterializerHelper.downcast(context.materializer).settings.blockingIoDispatcher
case Dispatcher(name) ⇒ name
}
val props = OutputStreamSubscriber.props(os, ioResultPromise, maxInputBufferSize, autoFlush).withDispatcher(dispatcher)
val ref = materializer.actorOf(context, props)
(akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future)

View file

@ -10,7 +10,9 @@ import java.nio.file.{ Files, NoSuchFileException, Path, StandardOpenOption }
import akka.Done
import akka.annotation.InternalApi
import akka.stream.ActorAttributes.Dispatcher
import akka.stream.Attributes.InputBuffer
import akka.stream.impl.Stages.DefaultAttributes.IODispatcher
import akka.stream.impl.{ ErrorPublisher, SourceModule }
import akka.stream.stage._
import akka.stream.{ IOResult, _ }
@ -153,7 +155,12 @@ private[akka] final class FileSource(path: Path, chunkSize: Int, startPosition:
val pub = try {
val is = createInputStream() // can throw, i.e. FileNotFound
val dispatcher = context.effectiveAttributes.mandatoryAttribute[Dispatcher] match {
case IODispatcher ⇒ ActorMaterializerHelper.downcast(context.materializer).settings.blockingIoDispatcher
case Dispatcher(name) ⇒ name
}
val props = InputStreamPublisher.props(is, ioResultPromise, chunkSize).withDispatcher(dispatcher)
val ref = materializer.actorOf(context, props)
akka.stream.actor.ActorPublisher[ByteString](ref)

View file

@ -6,22 +6,18 @@ package akka.stream.impl.io
import java.io.{ IOException, OutputStream }
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ BlockingQueue, LinkedBlockingQueue }
import akka.stream.{ Outlet, SourceShape, Attributes }
import akka.stream.Attributes.InputBuffer
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.io.OutputStreamSourceStage._
import akka.stream.stage._
import akka.stream.{ ActorMaterializerHelper, Attributes, Outlet, SourceShape }
import akka.util.ByteString
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
import akka.stream.ActorAttributes
import akka.stream.impl.Stages.DefaultAttributes.IODispatcher
import akka.stream.ActorAttributes.Dispatcher
import scala.concurrent.ExecutionContext
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerHelper
private[stream] object OutputStreamSourceStage {
sealed trait AdapterToStageMessage
@ -42,8 +38,6 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, OutputStream) = {
val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
val dispatcherId = inheritedAttributes.get[Dispatcher](IODispatcher).dispatcher
require(maxBuffer > 0, "Buffer size must be greater than 0")
val dataQueue = new LinkedBlockingQueue[ByteString](maxBuffer)
@ -109,8 +103,10 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration
}
override def preStart(): Unit = {
// this stage is running on the blocking IO dispatcher by default, but we also want to schedule futures
// that are blocking, so we need to look it up
val actorMat = ActorMaterializerHelper.downcast(materializer)
dispatcher = actorMat.system.dispatchers.lookup(actorMat.settings.blockingIoDispatcher)
}
setHandler(out, new OutHandler {
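
The preStart change above boils down to resolving a dispatcher id from the settings and using it as an ExecutionContext for blocking work. A standalone sketch of that lookup, using the default id from reference.conf outside of any stage:

```scala
import akka.actor.ActorSystem
import scala.concurrent.{ ExecutionContext, Future }

val system = ActorSystem("dispatcher-lookup-sample")
// in the stage this id comes from ActorMaterializerSettings.blockingIoDispatcher;
// here the default from reference.conf is used directly
val blockingEc: ExecutionContext =
  system.dispatchers.lookup("akka.stream.default-blocking-io-dispatcher")

// blocking work stays off the default dispatcher
Future { Thread.sleep(100); "done" }(blockingEc)
```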