diff --git a/akka-actor/src/main/mima-filters/2.5.10.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.10.backwards.excludes index 4c284b396f..e69de29bb2 100644 --- a/akka-actor/src/main/mima-filters/2.5.10.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.10.backwards.excludes @@ -1,8 +0,0 @@ -# Disable phases in Coordinated Shutdown -ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.copy") -ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.this") -ProblemFilters.exclude[MissingTypesProblem]("akka.actor.CoordinatedShutdown$Phase$") -ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.apply") - -# Path based WriteFile command #23902 -ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.TcpConnection.PendingWriteFile") \ No newline at end of file diff --git a/akka-actor/src/main/mima-filters/2.5.9.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.9.backwards.excludes index 6da73fdc7b..c5370b95c2 100644 --- a/akka-actor/src/main/mima-filters/2.5.9.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.9.backwards.excludes @@ -3,3 +3,12 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ActorSystem.get # #23770 typed actor context ask ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.PromiseActorRef.apply") + +# Disable phases in Coordinated Shutdown +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.copy") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.this") +ProblemFilters.exclude[MissingTypesProblem]("akka.actor.CoordinatedShutdown$Phase$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.apply") + +# Path based WriteFile command #23902 +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.TcpConnection.PendingWriteFile") \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala index 65be467fc6..a82827c46d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala @@ -6,12 +6,14 @@ package akka.stream.scaladsl import java.util.Optional import java.util.concurrent.{ CompletableFuture, CompletionStage, TimeUnit } +import akka.actor.ActorSystem import akka.{ Done, NotUsed } import akka.stream.Attributes._ import akka.stream._ import akka.stream.javadsl import akka.stream.stage._ import akka.stream.testkit._ +import akka.testkit.TestKit import com.typesafe.config.ConfigFactory object AttributesSpec { @@ -76,10 +78,12 @@ object AttributesSpec { } } - class ThreadNameSnitchingStage(initialDispatcher: String) extends GraphStage[SourceShape[String]] { + class ThreadNameSnitchingStage(initialDispatcher: Option[String]) extends GraphStage[SourceShape[String]] { + def this(initialDispatcher: String) = this(Some(initialDispatcher)) val out = Outlet[String]("out") override val shape = SourceShape.of(out) - override protected def initialAttributes: Attributes = ActorAttributes.dispatcher(initialDispatcher) + override protected def initialAttributes: Attributes = + initialDispatcher.fold(Attributes.none)(name ⇒ ActorAttributes.dispatcher(name)) def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { setHandler(out, new OutHandler { def onPull(): Unit = { @@ -105,7 +109,10 @@ class AttributesSpec extends StreamSpec(ConfigFactory.parseString( } throughput = 1 } - """).withFallback(Utils.UnboundedMailboxConfig)) { + """) + // we need to revert to the regular mailbox or else the test suite will complain + // about using non-test worthy dispatchers + .withFallback(Utils.UnboundedMailboxConfig)) { import AttributesSpec._ @@ -491,4 +498,69 @@ class AttributesSpec extends StreamSpec(ConfigFactory.parseString( } } + + "the default dispatcher attributes" must { + + val config = ConfigFactory.parseString(s""" + my-dispatcher { + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + fixed-pool-size = 1 + } + throughput = 1 + } + my-io-dispatcher = $${my-dispatcher} + akka.stream.materializer.dispatcher = "my-dispatcher" + akka.stream.materializer.blocking-io-dispatcher = "my-io-dispatcher" + """) + // we need to revert to the regular mailbox or else the test suite will complain + // about using non-test worthy dispatchers + .withFallback(Utils.UnboundedMailboxConfig).resolve() + + "allow for specifying a custom default dispatcher" in { + + val system = ActorSystem("AttributesSpec-default-dispatcher-override", config) + try { + + val mat = ActorMaterializer()(system) + val threadName = + Source.fromGraph(new ThreadNameSnitchingStage(None)).runWith(Sink.head)(mat) + + threadName.futureValue should startWith("AttributesSpec-default-dispatcher-override-my-dispatcher-") + + } finally { + TestKit.shutdownActorSystem(system) + } + } + + "use the default-io-dispatcher by default" in { + + val threadName = + Source.fromGraph(new ThreadNameSnitchingStage(None) + // FIXME surprising that we don't have something shorter than this + .addAttributes(Attributes.none and ActorAttributes.IODispatcher)).runWith(Sink.head) + + threadName.futureValue should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher") + } + + "allow for specifying a custom default io-dispatcher" in { + + val system = ActorSystem("AttributesSpec-io-dispatcher-override", config) + try { + + val mat = ActorMaterializer()(system) + val threadName = + Source.fromGraph(new ThreadNameSnitchingStage(None) + // FIXME surprising that we don't have something shorter than this + .addAttributes(Attributes.none and ActorAttributes.IODispatcher)).runWith(Sink.head)(mat) + + threadName.futureValue should startWith("AttributesSpec-io-dispatcher-override-my-io-dispatcher-") + + } finally { + TestKit.shutdownActorSystem(system) + } + } + } + } diff --git a/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes index 67c4ddf287..4cd97056f0 100644 --- a/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.9.backwards.excludes @@ -13,6 +13,9 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSou ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefBackpressureSinkStage.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorRefSourceActor.this") +# 24357 io-dispatcher setting +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.ActorMaterializerSettings.this") + # #24254 add collectType ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.collectType") diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index b6c3621dcd..6d1bc2fe22 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -18,6 +18,8 @@ akka { # When this value is left empty, the default-dispatcher will be used. dispatcher = "" + blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher" + # Cleanup leaked publishers and subscribers when they are not used within a given # deadline subscription-timeout { @@ -111,10 +113,9 @@ akka { //#stream-ref } - # Fully qualified config path which holds the dispatcher configuration - # to be used by ActorMaterializer when creating Actors for IO operations, - # such as FileSource, FileSink and others. - blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher" + # Deprecated, use akka.stream.materializer.blocking-io-dispatcher, this setting + # was never applied because of bug #24357 + blocking-io-dispatcher = "" default-blocking-io-dispatcher { type = "Dispatcher" diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index ef542d0372..529cf73187 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -6,7 +6,8 @@ package akka.stream import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean -import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ActorSystemImpl, ExtendedActorSystem, Props } +import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props } +import akka.annotation.InternalApi import akka.event.LoggingAdapter import akka.util.Helpers.toRootLowerCase import akka.stream.impl._ @@ -253,11 +254,16 @@ object ActorMaterializerSettings { outputBurstLimit: Int, fuzzingMode: Boolean, autoFusing: Boolean, - maxFixedBufferSize: Int) = + maxFixedBufferSize: Int) = { + // these sins were comitted in the name of bin comp: + val config = ConfigFactory.defaultReference new ActorMaterializerSettings( initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024), - StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref"))) + StreamRefSettings(config.getConfig("akka.stream.materializer.stream-ref")), + config.getString("akka.stream.blocking-io-dispatcher") + ) + } /** * Create [[ActorMaterializerSettings]] from the settings of an [[akka.actor.ActorSystem]] (Scala). @@ -282,7 +288,8 @@ object ActorMaterializerSettings { maxFixedBufferSize = config.getInt("max-fixed-buffer-size"), syncProcessingLimit = config.getInt("sync-processing-limit"), ioSettings = IOSettings(config.getConfig("io")), - streamRefSettings = StreamRefSettings(config.getConfig("stream-ref"))) + streamRefSettings = StreamRefSettings(config.getConfig("stream-ref")), + blockingIoDispatcher = config.getString("blocking-io-dispatcher")) /** * Create [[ActorMaterializerSettings]] from individual settings (Java). @@ -299,11 +306,15 @@ object ActorMaterializerSettings { outputBurstLimit: Int, fuzzingMode: Boolean, autoFusing: Boolean, - maxFixedBufferSize: Int) = + maxFixedBufferSize: Int) = { + // these sins were comitted in the name of bin comp: + val config = ConfigFactory.defaultReference new ActorMaterializerSettings( initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024), - StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref"))) + StreamRefSettings(config.getConfig("akka.stream.materializer.stream-ref")), + config.getString("akka.stream.blocking-io-dispatcher")) + } /** * Create [[ActorMaterializerSettings]] from the settings of an [[akka.actor.ActorSystem]] (Java). @@ -322,8 +333,10 @@ object ActorMaterializerSettings { /** * This class describes the configurable properties of the [[ActorMaterializer]]. * Please refer to the `withX` methods for descriptions of the individual settings. + * + * The constructor is not public API, use create or apply on the [[ActorMaterializerSettings]] companion instead. */ -final class ActorMaterializerSettings private ( +final class ActorMaterializerSettings @InternalApi private ( /* * Important note: `initialInputBufferSize`, `maxInputBufferSize`, `dispatcher` and * `supervisionDecider` must not be used as values in the materializer, or anything the materializer phases use @@ -342,7 +355,8 @@ final class ActorMaterializerSettings private ( val maxFixedBufferSize: Int, val syncProcessingLimit: Int, val ioSettings: IOSettings, - val streamRefSettings: StreamRefSettings) { + val streamRefSettings: StreamRefSettings, + val blockingIoDispatcher: String) { require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") require(syncProcessingLimit > 0, "syncProcessingLimit must be > 0") @@ -351,6 +365,7 @@ final class ActorMaterializerSettings private ( require(initialInputBufferSize <= maxInputBufferSize, s"initialInputBufferSize($initialInputBufferSize) must be <= maxInputBufferSize($maxInputBufferSize)") // backwards compatibility when added IOSettings, shouldn't be needed since private, but added to satisfy mima + @deprecated("Use ActorMaterializerSettings.apply or ActorMaterializerSettings.create instead", "2.5.10") def this( initialInputBufferSize: Int, maxInputBufferSize: Int, @@ -364,11 +379,15 @@ final class ActorMaterializerSettings private ( maxFixedBufferSize: Int, syncProcessingLimit: Int, ioSettings: IOSettings) = + // using config like this is not quite right but the only way to solve backwards comp without hard coding settings this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, ioSettings, - StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref"))) + StreamRefSettings(ConfigFactory.defaultReference().getConfig("akka.stream.materializer.stream-ref")), + ConfigFactory.defaultReference().getString("akka.stream.blocking-io-dispatcher") + ) // backwards compatibility when added IOSettings, shouldn't be needed since private, but added to satisfy mima + @deprecated("Use ActorMaterializerSettings.apply or ActorMaterializerSettings.create instead", "2.5.10") def this( initialInputBufferSize: Int, maxInputBufferSize: Int, @@ -381,11 +400,15 @@ final class ActorMaterializerSettings private ( autoFusing: Boolean, maxFixedBufferSize: Int, syncProcessingLimit: Int) = + // using config like this is not quite right but the only way to solve backwards comp without hard coding settings this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, - IOSettings(tcpWriteBufferSize = 16 * 1024), StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref"))) + IOSettings(tcpWriteBufferSize = 16 * 1024), StreamRefSettings(ConfigFactory.defaultReference().getConfig("akka.stream.materializer.stream-ref")), + ConfigFactory.defaultReference().getString("akka.stream.blocking-io-dispatcher") + ) // backwards compatibility when added IOSettings, shouldn't be needed since private, but added to satisfy mima + @deprecated("Use ActorMaterializerSettings.apply or ActorMaterializerSettings.create instead", "2.5.10") def this( initialInputBufferSize: Int, maxInputBufferSize: Int, @@ -397,9 +420,12 @@ final class ActorMaterializerSettings private ( fuzzingMode: Boolean, autoFusing: Boolean, maxFixedBufferSize: Int) = + // using config like this is not quite right but the only way to solve backwards comp without hard coding settings this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024), - StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref"))) + StreamRefSettings(ConfigFactory.defaultReference().getConfig("akka.stream.materializer.stream-ref")), + ConfigFactory.defaultReference().getString("akka.stream.blocking-io-dispatcher") + ) private def copy( initialInputBufferSize: Int = this.initialInputBufferSize, @@ -414,10 +440,11 @@ final class ActorMaterializerSettings private ( maxFixedBufferSize: Int = this.maxFixedBufferSize, syncProcessingLimit: Int = this.syncProcessingLimit, ioSettings: IOSettings = this.ioSettings, - streamRefSettings: StreamRefSettings = this.streamRefSettings) = { + streamRefSettings: StreamRefSettings = this.streamRefSettings, + blockingIoDispatcher: String = this.blockingIoDispatcher) = { new ActorMaterializerSettings( initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, - outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, ioSettings, streamRefSettings) + outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, ioSettings, streamRefSettings, blockingIoDispatcher) } /** @@ -541,6 +568,10 @@ final class ActorMaterializerSettings private ( if (streamRefSettings == this.streamRefSettings) this else copy(streamRefSettings = streamRefSettings) + def withBlockingIoDispatcher(newBlockingIoDispatcher: String): ActorMaterializerSettings = + if (newBlockingIoDispatcher == blockingIoDispatcher) this + else copy(blockingIoDispatcher = newBlockingIoDispatcher) + private def requirePowerOfTwo(n: Integer, name: String): Unit = { require(n > 0, s"$name must be > 0") require((n & (n - 1)) == 0, s"$name must be a power of two") @@ -558,7 +589,8 @@ final class ActorMaterializerSettings private ( s.syncProcessingLimit == syncProcessingLimit && s.fuzzingMode == fuzzingMode && s.autoFusing == autoFusing && - s.ioSettings == ioSettings + s.ioSettings == ioSettings && + s.blockingIoDispatcher == blockingIoDispatcher case _ ⇒ false } diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index 9ea43836fd..d97cfc6358 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -367,7 +367,8 @@ object ActorAttributes { final case class Dispatcher(dispatcher: String) extends MandatoryAttribute final case class SupervisionStrategy(decider: Supervision.Decider) extends MandatoryAttribute - val IODispatcher: Dispatcher = ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher") + // this is actually a config key that needs reading and itself will contain the actual dispatcher name + val IODispatcher: Dispatcher = ActorAttributes.Dispatcher("akka.stream.blocking-io-dispatcher") /** * Specifies the name of the dispatcher. This also adds an async boundary. diff --git a/akka-stream/src/main/scala/akka/stream/Materializer.scala b/akka-stream/src/main/scala/akka/stream/Materializer.scala index f4b3825592..42f4038b23 100644 --- a/akka-stream/src/main/scala/akka/stream/Materializer.scala +++ b/akka-stream/src/main/scala/akka/stream/Materializer.scala @@ -56,6 +56,8 @@ abstract class Materializer { * within Sources, Sinks, etc. This [[scala.concurrent.ExecutionContextExecutor]] * can be used by parts of the flow to submit processing jobs for execution, * run Future callbacks, etc. + * + * Note that this is not necessarily the same execution context the stream stage itself is running on. */ implicit def executionContext: ExecutionContextExecutor diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index d351beae6d..097f7d6d12 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -43,11 +43,12 @@ import scala.concurrent.{ Await, ExecutionContextExecutor } * INTERNAL API */ @InternalApi private[akka] override def actorOf(context: MaterializationContext, props: Props): ActorRef = { - // if the props already have a dispatcher set we respect that, if not - // we take it from the attributes val effectiveProps = if (props.dispatcher == Dispatchers.DefaultDispatcherId) props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher) + else if (props.dispatcher == ActorAttributes.IODispatcher.dispatcher) + // this one is actually not a dispatcher but a relative config key pointing containing the actual dispatcher name + props.withDispatcher(settings.blockingIoDispatcher) else props actorOf(effectiveProps, context.islandName) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 959bc79001..d57c461a6d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -12,8 +12,10 @@ import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancel import akka.annotation.{ DoNotInherit, InternalApi } import akka.dispatch.Dispatchers import akka.event.{ Logging, LoggingAdapter } +import akka.stream.ActorAttributes.Dispatcher import akka.stream.Attributes.InputBuffer import akka.stream._ +import akka.stream.impl.Stages.DefaultAttributes.IODispatcher import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.fusing.ActorGraphInterpreter.{ ActorOutputBoundary, BatchingActorInputBoundary } import akka.stream.impl.fusing.GraphInterpreter.Connection @@ -376,7 +378,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff * Default attributes for the materializer, based on the [[ActorMaterializerSettings]] and * are always seen as least specific, so any attribute specified in the graph "wins" over these. * In addition to that this also guarantees that the attributes `InputBuffer`, `SupervisionStrategy`, - * and `Dispatcher` is _always_ present in the attributes. + * and `Dispatcher` is _always_ present in the attributes and can be accessed through `Attributes.mandatoryAttribute` * * When these attributes are needed later in the materialization process it is important that the * they are gotten through the attributes and not through the [[ActorMaterializerSettings]] @@ -736,8 +738,14 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff fuseIntoExistingInterperter(shell) case _ ⇒ + + val dispatcher = + effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher] match { + case ActorAttributes.IODispatcher ⇒ settings.blockingIoDispatcher + case ActorAttributes.Dispatcher(dispatcher) ⇒ dispatcher + } val props = ActorGraphInterpreter.props(shell) - .withDispatcher(effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher) + .withDispatcher(dispatcher) val actorName = fullIslandName match { case OptionVal.Some(n) ⇒ n case OptionVal.None ⇒ islandName @@ -879,7 +887,10 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (NotUsed, Any) = { val tls = mod.asInstanceOf[TlsModule] - val dispatcher = attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher + val dispatcher = attributes.mandatoryAttribute[Dispatcher] match { + case IODispatcher ⇒ materializer.settings.blockingIoDispatcher + case Dispatcher(name) ⇒ name + } val maxInputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer].max val props = diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index 4eb5d366fe..aeb8833718 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -4,7 +4,7 @@ package akka.stream.impl.io import java.io.OutputStream -import java.nio.file.{ Path, OpenOption } +import java.nio.file.{ OpenOption, Path } import akka.annotation.InternalApi import akka.stream._ @@ -33,7 +33,11 @@ import scala.concurrent.{ Future, Promise } val ioResultPromise = Promise[IOResult]() val props = FileSubscriber.props(f, ioResultPromise, maxInputBufferSize, startPosition, options) - val dispatcher = context.effectiveAttributes.get[Dispatcher](IODispatcher).dispatcher + + val dispatcher = context.effectiveAttributes.mandatoryAttribute[Dispatcher] match { + case IODispatcher ⇒ ActorMaterializerHelper.downcast(context.materializer).settings.blockingIoDispatcher + case Dispatcher(name) ⇒ name + } val ref = materializer.actorOf(context, props.withDispatcher(dispatcher)) (akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future) @@ -61,7 +65,12 @@ import scala.concurrent.{ Future, Promise } val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max - val props = OutputStreamSubscriber.props(os, ioResultPromise, maxInputBufferSize, autoFlush) + val dispatcher = context.effectiveAttributes.mandatoryAttribute[Dispatcher] match { + case IODispatcher ⇒ ActorMaterializerHelper.downcast(context.materializer).settings.blockingIoDispatcher + case Dispatcher(name) ⇒ name + } + + val props = OutputStreamSubscriber.props(os, ioResultPromise, maxInputBufferSize, autoFlush).withDispatcher(dispatcher) val ref = materializer.actorOf(context, props) (akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index a5c7496df6..bcfaa75766 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -10,7 +10,9 @@ import java.nio.file.{ Files, NoSuchFileException, Path, StandardOpenOption } import akka.Done import akka.annotation.InternalApi +import akka.stream.ActorAttributes.Dispatcher import akka.stream.Attributes.InputBuffer +import akka.stream.impl.Stages.DefaultAttributes.IODispatcher import akka.stream.impl.{ ErrorPublisher, SourceModule } import akka.stream.stage._ import akka.stream.{ IOResult, _ } @@ -153,7 +155,12 @@ private[akka] final class FileSource(path: Path, chunkSize: Int, startPosition: val pub = try { val is = createInputStream() // can throw, i.e. FileNotFound - val props = InputStreamPublisher.props(is, ioResultPromise, chunkSize) + val dispatcher = context.effectiveAttributes.mandatoryAttribute[Dispatcher] match { + case IODispatcher ⇒ ActorMaterializerHelper.downcast(context.materializer).settings.blockingIoDispatcher + case Dispatcher(name) ⇒ name + } + + val props = InputStreamPublisher.props(is, ioResultPromise, chunkSize).withDispatcher(dispatcher) val ref = materializer.actorOf(context, props) akka.stream.actor.ActorPublisher[ByteString](ref) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala index 65b71c2a4b..ae97b00977 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala @@ -6,22 +6,18 @@ package akka.stream.impl.io import java.io.{ IOException, OutputStream } import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ BlockingQueue, LinkedBlockingQueue } -import akka.stream.{ Outlet, SourceShape, Attributes } + import akka.stream.Attributes.InputBuffer import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.io.OutputStreamSourceStage._ import akka.stream.stage._ +import akka.stream.{ ActorMaterializerHelper, Attributes, Outlet, SourceShape } import akka.util.ByteString + import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ Await, Future, Promise } +import scala.concurrent.{ Await, ExecutionContext, Future, Promise } import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } -import akka.stream.ActorAttributes -import akka.stream.impl.Stages.DefaultAttributes.IODispatcher -import akka.stream.ActorAttributes.Dispatcher -import scala.concurrent.ExecutionContext -import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerHelper private[stream] object OutputStreamSourceStage { sealed trait AdapterToStageMessage @@ -42,8 +38,6 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, OutputStream) = { val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max - val dispatcherId = inheritedAttributes.get[Dispatcher](IODispatcher).dispatcher - require(maxBuffer > 0, "Buffer size must be greater than 0") val dataQueue = new LinkedBlockingQueue[ByteString](maxBuffer) @@ -109,8 +103,10 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration } override def preStart(): Unit = { - dispatcher = ActorMaterializerHelper.downcast(materializer).system.dispatchers.lookup(dispatcherId) - super.preStart() + // this stage is running on the blocking IO dispatcher by default, but we also want to schedule futures + // that are blocking, so we need to look it up + val actorMat = ActorMaterializerHelper.downcast(materializer) + dispatcher = actorMat.system.dispatchers.lookup(actorMat.settings.blockingIoDispatcher) } setHandler(out, new OutHandler { diff --git a/akka-testkit/src/main/mima-filters/2.5.10.backwards.excludes b/akka-testkit/src/main/mima-filters/2.5.9.backwards.excludes similarity index 100% rename from akka-testkit/src/main/mima-filters/2.5.10.backwards.excludes rename to akka-testkit/src/main/mima-filters/2.5.9.backwards.excludes