From 0ecadf7235b9b0fea1281bf926a27085ec04a130 Mon Sep 17 00:00:00 2001 From: Roman Filonenko Date: Wed, 7 Mar 2018 15:12:34 +0100 Subject: [PATCH] deduplicate logic for IODispatcher #24604 (#24619) * deduplicate logic for IODispatcher #24604 * introduce a resolveDispatcher helper in ActorAttributes * mention akka.stream.materializer.blocking-io-dispatcher instead of akka.stream.blocking-io-dispatcher in scaladocs * fix a flaky test * cosmetic changes in the touched files * move resolveDispather helper to the Dispatcher companion object under a new name resolve * filter out mima warning * fix mima excludes after the 2.5.11 release * address review comments * update stream-io.md with the correct dispatcher config key * mark ActorAttributes.Dispatcher#resolve as internal API * use the dispatche config key in ActorMaterializer * add private[akka] to the resolve methods --- .../src/main/paradox/stream/stream-io.md | 2 +- .../docs/stream/io/StreamFileDocSpec.scala | 1 - .../remote/artery/FlightRecorderSpec.scala | 6 ++- .../akka/stream/scaladsl/AttributesSpec.scala | 23 ++++++--- .../mima-filters/2.5.11.backwards.excludes | 2 + .../scala/akka/stream/ActorMaterializer.scala | 16 +++--- .../main/scala/akka/stream/Attributes.scala | 35 ++++++++++--- .../stream/impl/ActorMaterializerImpl.scala | 12 +++-- .../impl/PhasedFusingActorMaterializer.scala | 49 +++++++------------ .../scala/akka/stream/impl/io/IOSinks.scala | 18 ++----- .../scala/akka/stream/impl/io/IOSources.scala | 10 ++-- .../scala/akka/stream/javadsl/FileIO.scala | 20 ++++---- .../scala/akka/stream/javadsl/Source.scala | 4 +- .../stream/javadsl/StreamConverters.scala | 16 +++--- .../scala/akka/stream/scaladsl/FileIO.scala | 9 ++-- .../scala/akka/stream/scaladsl/Source.scala | 4 +- .../stream/scaladsl/StreamConverters.scala | 11 ++--- 17 files changed, 127 insertions(+), 111 deletions(-) create mode 100644 akka-stream/src/main/mima-filters/2.5.11.backwards.excludes diff --git a/akka-docs/src/main/paradox/stream/stream-io.md b/akka-docs/src/main/paradox/stream/stream-io.md index b76f6623a6..52f459b1fb 100644 --- a/akka-docs/src/main/paradox/stream/stream-io.md +++ b/akka-docs/src/main/paradox/stream/stream-io.md @@ -163,7 +163,7 @@ Java Please note that these processing stages are backed by Actors and by default are configured to run on a pre-configured threadpool-backed dispatcher dedicated for File IO. This is very important as it isolates the blocking file IO operations from the rest of the ActorSystem allowing each dispatcher to be utilised in the most efficient way. If you want to configure a custom -dispatcher for file IO operations globally, you can do so by changing the `akka.stream.blocking-io-dispatcher`, +dispatcher for file IO operations globally, you can do so by changing the `akka.stream.materializer.blocking-io-dispatcher`, or for a specific stage by specifying a custom Dispatcher in code, like this: Scala diff --git a/akka-docs/src/test/scala/docs/stream/io/StreamFileDocSpec.scala b/akka-docs/src/test/scala/docs/stream/io/StreamFileDocSpec.scala index 609936933e..37654eeb07 100644 --- a/akka-docs/src/test/scala/docs/stream/io/StreamFileDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/io/StreamFileDocSpec.scala @@ -8,7 +8,6 @@ import java.nio.file.{ Files, Paths } import akka.stream._ import akka.stream.scaladsl.{ FileIO, Sink } import akka.stream.testkit.Utils._ -import akka.stream.testkit._ import akka.util.ByteString import akka.testkit.AkkaSpec diff --git a/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala index 429812ab7a..425e0fc1d3 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala @@ -19,9 +19,13 @@ class FlightRecorderSpec extends AkkaSpec { "Flight Recorder" must { - "properly initialize AFR file when created" in withFlightRecorder { (recorder, reader, channel) ⇒ + "properly initialize AFR file when created" in withFlightRecorder { (_, reader, channel) ⇒ channel.force(false) + + // otherwise isAfter assertion below can randomly fail + Thread.sleep(1) val currentTime = Instant.now() + reader.rereadStructure() currentTime.isAfter(reader.structure.startTime) should be(true) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala index a82827c46d..1d3f69371a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala @@ -3,8 +3,7 @@ */ package akka.stream.scaladsl -import java.util.Optional -import java.util.concurrent.{ CompletableFuture, CompletionStage, TimeUnit } +import java.util.concurrent.{ CompletionStage, TimeUnit } import akka.actor.ActorSystem import akka.{ Done, NotUsed } @@ -535,16 +534,17 @@ class AttributesSpec extends StreamSpec(ConfigFactory.parseString( } "use the default-io-dispatcher by default" in { + import ActorAttributes._ val threadName = Source.fromGraph(new ThreadNameSnitchingStage(None) - // FIXME surprising that we don't have something shorter than this - .addAttributes(Attributes.none and ActorAttributes.IODispatcher)).runWith(Sink.head) + .addAttributes(Attributes(IODispatcher))).runWith(Sink.head) threadName.futureValue should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher") } "allow for specifying a custom default io-dispatcher" in { + import ActorAttributes._ val system = ActorSystem("AttributesSpec-io-dispatcher-override", config) try { @@ -552,8 +552,7 @@ class AttributesSpec extends StreamSpec(ConfigFactory.parseString( val mat = ActorMaterializer()(system) val threadName = Source.fromGraph(new ThreadNameSnitchingStage(None) - // FIXME surprising that we don't have something shorter than this - .addAttributes(Attributes.none and ActorAttributes.IODispatcher)).runWith(Sink.head)(mat) + .addAttributes(Attributes(IODispatcher))).runWith(Sink.head)(mat) threadName.futureValue should startWith("AttributesSpec-io-dispatcher-override-my-io-dispatcher-") @@ -561,6 +560,18 @@ class AttributesSpec extends StreamSpec(ConfigFactory.parseString( TestKit.shutdownActorSystem(system) } } + + "resolve the dispatcher attribute" in { + import ActorAttributes._ + + Dispatcher.resolve(dispatcher("my-dispatcher"), materializer.settings) should be("my-dispatcher") + } + + "resolve the blocking io dispatcher attribute" in { + import ActorAttributes._ + + Dispatcher.resolve(Attributes(IODispatcher), materializer.settings) should be("akka.stream.default-blocking-io-dispatcher") + } } } diff --git a/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes new file mode 100644 index 0000000000..22bab86520 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes @@ -0,0 +1,2 @@ +# #24604 Deduplicate logic for IODispatcher +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.ActorAttributes$Dispatcher$") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index 529cf73187..9008b48cf4 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -76,7 +76,7 @@ object ActorMaterializer { } /** - * Scala API: * Scala API: Creates an ActorMaterializer that can materialize stream blueprints as running streams. + * Scala API: Creates an ActorMaterializer that can materialize stream blueprints as running streams. * * The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) @@ -255,13 +255,13 @@ object ActorMaterializerSettings { fuzzingMode: Boolean, autoFusing: Boolean, maxFixedBufferSize: Int) = { - // these sins were comitted in the name of bin comp: + // these sins were committed in the name of bin comp: val config = ConfigFactory.defaultReference new ActorMaterializerSettings( initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024), StreamRefSettings(config.getConfig("akka.stream.materializer.stream-ref")), - config.getString("akka.stream.blocking-io-dispatcher") + config.getString(ActorAttributes.IODispatcher.dispatcher) ) } @@ -307,13 +307,13 @@ object ActorMaterializerSettings { fuzzingMode: Boolean, autoFusing: Boolean, maxFixedBufferSize: Int) = { - // these sins were comitted in the name of bin comp: + // these sins were committed in the name of bin comp: val config = ConfigFactory.defaultReference new ActorMaterializerSettings( initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024), StreamRefSettings(config.getConfig("akka.stream.materializer.stream-ref")), - config.getString("akka.stream.blocking-io-dispatcher")) + config.getString(ActorAttributes.IODispatcher.dispatcher)) } /** @@ -383,7 +383,7 @@ final class ActorMaterializerSettings @InternalApi private ( this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, ioSettings, StreamRefSettings(ConfigFactory.defaultReference().getConfig("akka.stream.materializer.stream-ref")), - ConfigFactory.defaultReference().getString("akka.stream.blocking-io-dispatcher") + ConfigFactory.defaultReference().getString(ActorAttributes.IODispatcher.dispatcher) ) // backwards compatibility when added IOSettings, shouldn't be needed since private, but added to satisfy mima @@ -404,7 +404,7 @@ final class ActorMaterializerSettings @InternalApi private ( this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, IOSettings(tcpWriteBufferSize = 16 * 1024), StreamRefSettings(ConfigFactory.defaultReference().getConfig("akka.stream.materializer.stream-ref")), - ConfigFactory.defaultReference().getString("akka.stream.blocking-io-dispatcher") + ConfigFactory.defaultReference().getString(ActorAttributes.IODispatcher.dispatcher) ) // backwards compatibility when added IOSettings, shouldn't be needed since private, but added to satisfy mima @@ -424,7 +424,7 @@ final class ActorMaterializerSettings @InternalApi private ( this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024), StreamRefSettings(ConfigFactory.defaultReference().getConfig("akka.stream.materializer.stream-ref")), - ConfigFactory.defaultReference().getString("akka.stream.blocking-io-dispatcher") + ConfigFactory.defaultReference().getString(ActorAttributes.IODispatcher.dispatcher) ) private def copy( diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index d97cfc6358..f78c876a74 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -11,7 +11,6 @@ import scala.annotation.tailrec import scala.reflect.{ ClassTag, classTag } import akka.japi.function import java.net.URLEncoder -import java.util.concurrent.TimeUnit import akka.annotation.InternalApi import akka.stream.impl.TraversalBuilder @@ -71,7 +70,7 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { * This is the expected way for stages to access attributes. */ def getAttribute[T <: Attribute](c: Class[T]): Optional[T] = - (attributeList.collectFirst { case attr if c.isInstance(attr) ⇒ c.cast(attr) }).asJava + attributeList.collectFirst { case attr if c.isInstance(attr) ⇒ c.cast(attr) }.asJava /** * Scala API: Get the most specific attribute value for a given Attribute type or subclass thereof or @@ -129,7 +128,7 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { find(attributeList) match { case OptionVal.Some(t) ⇒ t.asInstanceOf[T] - case OptionVal.None ⇒ throw new IllegalStateException(s"Mandatory attribute ${c} not found") + case OptionVal.None ⇒ throw new IllegalStateException(s"Mandatory attribute [$c] not found") } } @@ -334,7 +333,7 @@ object Attributes { * Passing in null as any of the arguments sets the level to its default value, which is: * `Debug` for `onElement` and `onFinish`, and `Error` for `onFailure`. */ - def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel) = + def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel): Attributes = logLevels( onElement = Option(onElement).getOrElse(Logging.DebugLevel), onFinish = Option(onFinish).getOrElse(Logging.DebugLevel), @@ -365,10 +364,34 @@ object Attributes { object ActorAttributes { import Attributes._ final case class Dispatcher(dispatcher: String) extends MandatoryAttribute + + object Dispatcher { + /** + * INTERNAL API + * Resolves the dispatcher's name with a fallback to the default blocking IO dispatcher. + * Note that `IODispatcher.dispatcher` is not used here as the config used to create [[ActorMaterializerSettings]] + * is not easily accessible, instead the name is taken from `settings.blockingIoDispatcher` + */ + @InternalApi + private[akka] def resolve(attributes: Attributes, settings: ActorMaterializerSettings): String = + attributes.mandatoryAttribute[Dispatcher] match { + case IODispatcher ⇒ settings.blockingIoDispatcher + case Dispatcher(dispatcher) ⇒ dispatcher + } + + /** + * INTERNAL API + * Resolves the dispatcher name with a fallback to the default blocking IO dispatcher. + */ + @InternalApi + private[akka] def resolve(context: MaterializationContext): String = + resolve(context.effectiveAttributes, ActorMaterializerHelper.downcast(context.materializer).settings) + } + final case class SupervisionStrategy(decider: Supervision.Decider) extends MandatoryAttribute // this is actually a config key that needs reading and itself will contain the actual dispatcher name - val IODispatcher: Dispatcher = ActorAttributes.Dispatcher("akka.stream.blocking-io-dispatcher") + val IODispatcher: Dispatcher = ActorAttributes.Dispatcher("akka.stream.materializer.blocking-io-dispatcher") /** * Specifies the name of the dispatcher. This also adds an async boundary. @@ -402,7 +425,7 @@ object ActorAttributes { * Passing in null as any of the arguments sets the level to its default value, which is: * `Debug` for `onElement` and `onFinish`, and `Error` for `onFailure`. */ - def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel) = + def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel): Attributes = logLevels( onElement = Option(onElement).getOrElse(Logging.DebugLevel), onFinish = Option(onFinish).getOrElse(Logging.DebugLevel), diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 097f7d6d12..6f1e2a6a55 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -43,13 +43,15 @@ import scala.concurrent.{ Await, ExecutionContextExecutor } * INTERNAL API */ @InternalApi private[akka] override def actorOf(context: MaterializationContext, props: Props): ActorRef = { - val effectiveProps = - if (props.dispatcher == Dispatchers.DefaultDispatcherId) + val effectiveProps = props.dispatcher match { + case Dispatchers.DefaultDispatcherId ⇒ props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher) - else if (props.dispatcher == ActorAttributes.IODispatcher.dispatcher) + case ActorAttributes.IODispatcher.dispatcher ⇒ // this one is actually not a dispatcher but a relative config key pointing containing the actual dispatcher name props.withDispatcher(settings.blockingIoDispatcher) - else props + case _ ⇒ props + } + actorOf(effectiveProps, context.islandName) } @@ -174,7 +176,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa @InternalApi private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor { import akka.stream.impl.StreamSupervisor._ - override def supervisorStrategy = SupervisorStrategy.stoppingStrategy + override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy def receive = { case Materialize(props, name) ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index d57c461a6d..6833192e98 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -4,7 +4,6 @@ package akka.stream.impl import java.util -import java.util.ArrayList import java.util.concurrent.atomic.AtomicBoolean import akka.NotUsed @@ -12,10 +11,8 @@ import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancel import akka.annotation.{ DoNotInherit, InternalApi } import akka.dispatch.Dispatchers import akka.event.{ Logging, LoggingAdapter } -import akka.stream.ActorAttributes.Dispatcher import akka.stream.Attributes.InputBuffer import akka.stream._ -import akka.stream.impl.Stages.DefaultAttributes.IODispatcher import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.fusing.ActorGraphInterpreter.{ ActorOutputBoundary, BatchingActorInputBoundary } import akka.stream.impl.fusing.GraphInterpreter.Connection @@ -150,12 +147,12 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff private var currentIslandGlobalOffset = 0 // The number of slots that belong to segments of other islands encountered so far, from the // beginning of the island - private var currentIslandSkippetSlots = 0 + private var currentIslandSkippedSlots = 0 - private var segments: java.util.ArrayList[SegmentInfo] = null - private var activePhases: java.util.ArrayList[PhaseIsland[Any]] = null - private var forwardWires: java.util.ArrayList[ForwardWire] = null - private var islandStateStack: java.util.ArrayList[SavedIslandData] = null + private var segments: java.util.ArrayList[SegmentInfo] = _ + private var activePhases: java.util.ArrayList[PhaseIsland[Any]] = _ + private var forwardWires: java.util.ArrayList[ForwardWire] = _ + private var islandStateStack: java.util.ArrayList[SavedIslandData] = _ private var currentPhase: PhaseIsland[Any] = defaultPhase.apply(settings, attributes, materializer, nextIslandName()) @@ -176,7 +173,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff globalislandOffset = currentIslandGlobalOffset, length = currentGlobalOffset - currentSegmentGlobalOffset, globalBaseOffset = currentSegmentGlobalOffset, - relativeBaseOffset = currentSegmentGlobalOffset - currentIslandGlobalOffset - currentIslandSkippetSlots, + relativeBaseOffset = currentSegmentGlobalOffset - currentIslandGlobalOffset - currentIslandSkippedSlots, currentPhase) // Segment tracking is by demand, we only allocate this list if it is used. @@ -193,7 +190,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff completeSegment() val previousPhase = currentPhase val previousIslandOffset = currentIslandGlobalOffset - islandStateStack.add(SavedIslandData(previousIslandOffset, currentGlobalOffset, currentIslandSkippetSlots, previousPhase)) + islandStateStack.add(SavedIslandData(previousIslandOffset, currentGlobalOffset, currentIslandSkippedSlots, previousPhase)) currentPhase = phases(tag)(settings, attributes, materializer, nextIslandName()) activePhases.add(currentPhase) @@ -203,7 +200,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff // The base offset of this segment is the current global offset currentSegmentGlobalOffset = currentGlobalOffset - currentIslandSkippetSlots = 0 + currentIslandSkippedSlots = 0 if (Debug) println(s"Entering island starting at offset = $currentIslandGlobalOffset phase = $currentPhase") } @@ -217,7 +214,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff // We restore data for the island currentIslandGlobalOffset = parentIsland.islandGlobalOffset currentPhase = parentIsland.phase - currentIslandSkippetSlots = parentIsland.skippedSlots + (currentGlobalOffset - parentIsland.lastVisitedOffset) + currentIslandSkippedSlots = parentIsland.skippedSlots + (currentGlobalOffset - parentIsland.lastVisitedOffset) if (Debug) println(s"Exited to island starting at offset = $currentIslandGlobalOffset phase = $currentPhase") } @@ -225,7 +222,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff @InternalApi private[akka] def wireIn(in: InPort, logic: Any): Unit = { // The slot for this InPort always belong to the current segment, so resolving its local // offset/slot is simple - val localInSlot = currentGlobalOffset - currentIslandGlobalOffset - currentIslandSkippetSlots + val localInSlot = currentGlobalOffset - currentIslandGlobalOffset - currentIslandSkippedSlots if (Debug) println(s" wiring port $in inOffs absolute = $currentGlobalOffset local = $localInSlot") // Assign the logic belonging to the current port to its calculated local slot in the island @@ -278,8 +275,8 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff if (absoluteOffset >= currentSegmentGlobalOffset) { // Wiring is in the same segment, no complex lookup needed - val localInSlot = absoluteOffset - currentIslandGlobalOffset - currentIslandSkippetSlots - if (Debug) println(s" in-segment wiring to local ($absoluteOffset - $currentIslandGlobalOffset - $currentIslandSkippetSlots) = $localInSlot") + val localInSlot = absoluteOffset - currentIslandGlobalOffset - currentIslandSkippedSlots + if (Debug) println(s" in-segment wiring to local ($absoluteOffset - $currentIslandGlobalOffset - $currentIslandSkippedSlots) = $localInSlot") currentPhase.assignPort(out, localInSlot, logic) } else { // Wiring is cross-segment, but we don't know if it is cross-island or not yet @@ -383,7 +380,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff * When these attributes are needed later in the materialization process it is important that the * they are gotten through the attributes and not through the [[ActorMaterializerSettings]] */ - val defaultAttributes = { + val defaultAttributes: Attributes = { Attributes( Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) :: ActorAttributes.SupervisionStrategy(settings.supervisionDecider) :: @@ -609,7 +606,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff subflowFuser: OptionVal[GraphInterpreterShell ⇒ ActorRef]) extends PhaseIsland[GraphStageLogic] { // TODO: remove these private val logicArrayType = Array.empty[GraphStageLogic] - private[this] val logics = new ArrayList[GraphStageLogic](16) + private[this] val logics = new util.ArrayList[GraphStageLogic](16) private var connections = new Array[Connection](16) private var maxConnections = 0 @@ -734,18 +731,13 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff shell.logics = logics.toArray(logicArrayType) subflowFuser match { - case OptionVal.Some(fuseIntoExistingInterperter) ⇒ - fuseIntoExistingInterperter(shell) + case OptionVal.Some(fuseIntoExistingInterpreter) ⇒ + fuseIntoExistingInterpreter(shell) case _ ⇒ - val dispatcher = - effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher] match { - case ActorAttributes.IODispatcher ⇒ settings.blockingIoDispatcher - case ActorAttributes.Dispatcher(dispatcher) ⇒ dispatcher - } - val props = ActorGraphInterpreter.props(shell) - .withDispatcher(dispatcher) + val props = ActorGraphInterpreter.props(shell).withDispatcher(ActorAttributes.Dispatcher.resolve(effectiveAttributes, settings)) + val actorName = fullIslandName match { case OptionVal.Some(n) ⇒ n case OptionVal.None ⇒ islandName @@ -887,10 +879,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (NotUsed, Any) = { val tls = mod.asInstanceOf[TlsModule] - val dispatcher = attributes.mandatoryAttribute[Dispatcher] match { - case IODispatcher ⇒ materializer.settings.blockingIoDispatcher - case Dispatcher(name) ⇒ name - } + val dispatcher = ActorAttributes.Dispatcher.resolve(attributes, materializer.settings) val maxInputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer].max val props = diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index aeb8833718..82698cbb3d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -7,10 +7,9 @@ import java.io.OutputStream import java.nio.file.{ OpenOption, Path } import akka.annotation.InternalApi +import akka.stream.ActorAttributes.Dispatcher import akka.stream._ import akka.stream.impl.SinkModule -import akka.stream.impl.Stages.DefaultAttributes.IODispatcher -import akka.stream.ActorAttributes.Dispatcher import akka.util.ByteString import scala.collection.immutable @@ -34,12 +33,8 @@ import scala.concurrent.{ Future, Promise } val ioResultPromise = Promise[IOResult]() val props = FileSubscriber.props(f, ioResultPromise, maxInputBufferSize, startPosition, options) - val dispatcher = context.effectiveAttributes.mandatoryAttribute[Dispatcher] match { - case IODispatcher ⇒ ActorMaterializerHelper.downcast(context.materializer).settings.blockingIoDispatcher - case Dispatcher(name) ⇒ name - } + val ref = materializer.actorOf(context, props.withDispatcher(Dispatcher.resolve(context))) - val ref = materializer.actorOf(context, props.withDispatcher(dispatcher)) (akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future) } @@ -65,12 +60,9 @@ import scala.concurrent.{ Future, Promise } val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max - val dispatcher = context.effectiveAttributes.mandatoryAttribute[Dispatcher] match { - case IODispatcher ⇒ ActorMaterializerHelper.downcast(context.materializer).settings.blockingIoDispatcher - case Dispatcher(name) ⇒ name - } - - val props = OutputStreamSubscriber.props(os, ioResultPromise, maxInputBufferSize, autoFlush).withDispatcher(dispatcher) + val props = OutputStreamSubscriber + .props(os, ioResultPromise, maxInputBufferSize, autoFlush) + .withDispatcher(Dispatcher.resolve(context)) val ref = materializer.actorOf(context, props) (akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index bcfaa75766..cb2759d595 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -12,7 +12,6 @@ import akka.Done import akka.annotation.InternalApi import akka.stream.ActorAttributes.Dispatcher import akka.stream.Attributes.InputBuffer -import akka.stream.impl.Stages.DefaultAttributes.IODispatcher import akka.stream.impl.{ ErrorPublisher, SourceModule } import akka.stream.stage._ import akka.stream.{ IOResult, _ } @@ -155,12 +154,9 @@ private[akka] final class FileSource(path: Path, chunkSize: Int, startPosition: val pub = try { val is = createInputStream() // can throw, i.e. FileNotFound - val dispatcher = context.effectiveAttributes.mandatoryAttribute[Dispatcher] match { - case IODispatcher ⇒ ActorMaterializerHelper.downcast(context.materializer).settings.blockingIoDispatcher - case Dispatcher(name) ⇒ name - } - - val props = InputStreamPublisher.props(is, ioResultPromise, chunkSize).withDispatcher(dispatcher) + val props = InputStreamPublisher + .props(is, ioResultPromise, chunkSize) + .withDispatcher(Dispatcher.resolve(context)) val ref = materializer.actorOf(context, props) akka.stream.actor.ActorPublisher[ByteString](ref) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala index 1b7dd1a647..4672747c02 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala @@ -26,7 +26,7 @@ object FileIO { * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * @param f The file to write to @@ -42,7 +42,7 @@ object FileIO { * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * Accepts as arguments a set of [[java.nio.file.StandardOpenOption]], which will determine @@ -62,7 +62,7 @@ object FileIO { * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * @param f The file to write to @@ -78,7 +78,7 @@ object FileIO { * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * Accepts as arguments a set of [[java.nio.file.StandardOpenOption]], which will determine @@ -99,7 +99,7 @@ object FileIO { * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * Accepts as arguments a set of [[java.nio.file.StandardOpenOption]], which will determine @@ -120,7 +120,7 @@ object FileIO { * Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes, * except the last element, which will be up to 8192 in size. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, @@ -136,7 +136,7 @@ object FileIO { * Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes, * except the last element, which will be up to 8192 in size. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, @@ -151,7 +151,7 @@ object FileIO { * Emitted elements are `chunkSize` sized [[ByteString]] elements, * except the last element, which will be up to `chunkSize` in size. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, @@ -168,7 +168,7 @@ object FileIO { * Emitted elements are `chunkSize` sized [[ByteString]] elements, * except the last element, which will be up to `chunkSize` in size. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, @@ -185,7 +185,7 @@ object FileIO { * Emitted elements are `chunkSize` sized [[ByteString]] elements, * except the last element, which will be up to `chunkSize` in size. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 5323cd516b..6cebb1631d 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -416,7 +416,7 @@ object Source { * `Restart` supervision strategy will close and create blocking IO again. Default strategy is `Stop` which means * that stream will be terminated on error in `read` function by default. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. @@ -444,7 +444,7 @@ object Source { * `Restart` supervision strategy will close and create resource. Default strategy is `Stop` which means * that stream will be terminated on error in `read` function (or future) by default. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala index 145d3f2521..178e4291f1 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala @@ -23,7 +23,7 @@ object StreamConverters { * Materializes a [[CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * This method uses no auto flush for the [[java.io.OutputStream]] @see [[#fromOutputStream(function.Creator, Boolean)]] if you want to override it. @@ -41,7 +41,7 @@ object StreamConverters { * Materializes a [[CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * The [[OutputStream]] will be closed when the stream flowing into this [[Sink]] is completed. The [[Sink]] @@ -62,7 +62,7 @@ object StreamConverters { * * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and @@ -76,7 +76,7 @@ object StreamConverters { * * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and @@ -94,7 +94,7 @@ object StreamConverters { * [[java.io.InputStream]] returns on each read invocation. Such chunks will * never be larger than chunkSize though. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * It materializes a [[CompletionStage]] containing the number of bytes read from the source file upon completion. @@ -111,7 +111,7 @@ object StreamConverters { * [[java.io.InputStream]] returns on each read invocation. Such chunks will * never be larger than chunkSize though. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * It materializes a [[CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, @@ -127,7 +127,7 @@ object StreamConverters { * * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]] @@ -145,7 +145,7 @@ object StreamConverters { * * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala index 4d12e60955..792f5ddaa3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala @@ -4,7 +4,7 @@ package akka.stream.scaladsl import java.io.File -import java.nio.file.{ OpenOption, Path, StandardOpenOption } +import java.nio.file.{ OpenOption, Path } import java.nio.file.StandardOpenOption._ import akka.stream.impl.Stages.DefaultAttributes @@ -20,14 +20,13 @@ import scala.concurrent.Future object FileIO { import Sink.{ shape ⇒ sinkShape } - import Source.{ shape ⇒ sourceShape } /** * Creates a Source from a files contents. * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, * except the final element, which will be up to `chunkSize` in size. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, @@ -45,7 +44,7 @@ object FileIO { * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, * except the final element, which will be up to `chunkSize` in size. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, @@ -62,7 +61,7 @@ object FileIO { * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, * except the final element, which will be up to `chunkSize` in size. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 4cdacace92..d91bd16569 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -631,7 +631,7 @@ object Source { * `Restart` supervision strategy will close and create blocking IO again. Default strategy is `Stop` which means * that stream will be terminated on error in `read` function by default. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. @@ -654,7 +654,7 @@ object Source { * `Restart` supervision strategy will close and create resource. Default strategy is `Stop` which means * that stream will be terminated on error in `read` function (or future) by default. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala index 407a894deb..e3709926f4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala @@ -5,7 +5,6 @@ package akka.stream.scaladsl import java.io.{ OutputStream, InputStream } import java.util.Spliterators -import java.util.concurrent.atomic.AtomicReference import java.util.stream.{ Collector, StreamSupport } import akka.stream.{ Attributes, SinkShape, IOResult } @@ -34,7 +33,7 @@ object StreamConverters { * [[java.io.InputStream]] returns on each read invocation. Such chunks will * never be larger than chunkSize though. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, @@ -54,7 +53,7 @@ object StreamConverters { * * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]] @@ -71,7 +70,7 @@ object StreamConverters { * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[akka.stream.ActorAttributes]]. * If `autoFlush` is true the OutputStream will be flushed whenever a byte array is written, defaults to false. * @@ -87,7 +86,7 @@ object StreamConverters { * * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and @@ -169,7 +168,7 @@ object StreamConverters { .mapMaterializedValue(queue ⇒ StreamSupport.stream( Spliterators.spliteratorUnknownSize(new java.util.Iterator[T] { var nextElementFuture: Future[Option[T]] = queue.pull() - var nextElement: Option[T] = null + var nextElement: Option[T] = _ override def hasNext: Boolean = { nextElement = Await.result(nextElementFuture, Inf)