deduplicate logic for IODispatcher #24604 (#24619)

* deduplicate logic for IODispatcher #24604
 * introduce a resolveDispatcher helper in ActorAttributes
 * mention akka.stream.materializer.blocking-io-dispatcher instead of akka.stream.blocking-io-dispatcher in scaladocs
 * fix a flaky test
 * cosmetic changes in the touched files

* move resolveDispather helper to the Dispatcher companion object under a new name resolve

* filter out mima warning

* fix mima excludes after the 2.5.11 release

* address review comments
 * update stream-io.md with the correct dispatcher config key
 * mark ActorAttributes.Dispatcher#resolve as internal API
 * use the dispatche config key in ActorMaterializer

* add private[akka] to the resolve methods
This commit is contained in:
Roman Filonenko 2018-03-07 15:12:34 +01:00 committed by Patrik Nordwall
parent a13f5cab00
commit 0ecadf7235
17 changed files with 127 additions and 111 deletions

View file

@ -163,7 +163,7 @@ Java
Please note that these processing stages are backed by Actors and by default are configured to run on a pre-configured
threadpool-backed dispatcher dedicated for File IO. This is very important as it isolates the blocking file IO operations from the rest
of the ActorSystem allowing each dispatcher to be utilised in the most efficient way. If you want to configure a custom
dispatcher for file IO operations globally, you can do so by changing the `akka.stream.blocking-io-dispatcher`,
dispatcher for file IO operations globally, you can do so by changing the `akka.stream.materializer.blocking-io-dispatcher`,
or for a specific stage by specifying a custom Dispatcher in code, like this:
Scala

View file

@ -8,7 +8,6 @@ import java.nio.file.{ Files, Paths }
import akka.stream._
import akka.stream.scaladsl.{ FileIO, Sink }
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.util.ByteString
import akka.testkit.AkkaSpec

View file

@ -19,9 +19,13 @@ class FlightRecorderSpec extends AkkaSpec {
"Flight Recorder" must {
"properly initialize AFR file when created" in withFlightRecorder { (recorder, reader, channel)
"properly initialize AFR file when created" in withFlightRecorder { (_, reader, channel)
channel.force(false)
// otherwise isAfter assertion below can randomly fail
Thread.sleep(1)
val currentTime = Instant.now()
reader.rereadStructure()
currentTime.isAfter(reader.structure.startTime) should be(true)

View file

@ -3,8 +3,7 @@
*/
package akka.stream.scaladsl
import java.util.Optional
import java.util.concurrent.{ CompletableFuture, CompletionStage, TimeUnit }
import java.util.concurrent.{ CompletionStage, TimeUnit }
import akka.actor.ActorSystem
import akka.{ Done, NotUsed }
@ -535,16 +534,17 @@ class AttributesSpec extends StreamSpec(ConfigFactory.parseString(
}
"use the default-io-dispatcher by default" in {
import ActorAttributes._
val threadName =
Source.fromGraph(new ThreadNameSnitchingStage(None)
// FIXME surprising that we don't have something shorter than this
.addAttributes(Attributes.none and ActorAttributes.IODispatcher)).runWith(Sink.head)
.addAttributes(Attributes(IODispatcher))).runWith(Sink.head)
threadName.futureValue should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher")
}
"allow for specifying a custom default io-dispatcher" in {
import ActorAttributes._
val system = ActorSystem("AttributesSpec-io-dispatcher-override", config)
try {
@ -552,8 +552,7 @@ class AttributesSpec extends StreamSpec(ConfigFactory.parseString(
val mat = ActorMaterializer()(system)
val threadName =
Source.fromGraph(new ThreadNameSnitchingStage(None)
// FIXME surprising that we don't have something shorter than this
.addAttributes(Attributes.none and ActorAttributes.IODispatcher)).runWith(Sink.head)(mat)
.addAttributes(Attributes(IODispatcher))).runWith(Sink.head)(mat)
threadName.futureValue should startWith("AttributesSpec-io-dispatcher-override-my-io-dispatcher-")
@ -561,6 +560,18 @@ class AttributesSpec extends StreamSpec(ConfigFactory.parseString(
TestKit.shutdownActorSystem(system)
}
}
"resolve the dispatcher attribute" in {
import ActorAttributes._
Dispatcher.resolve(dispatcher("my-dispatcher"), materializer.settings) should be("my-dispatcher")
}
"resolve the blocking io dispatcher attribute" in {
import ActorAttributes._
Dispatcher.resolve(Attributes(IODispatcher), materializer.settings) should be("akka.stream.default-blocking-io-dispatcher")
}
}
}

View file

@ -0,0 +1,2 @@
# #24604 Deduplicate logic for IODispatcher
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.ActorAttributes$Dispatcher$")

View file

@ -76,7 +76,7 @@ object ActorMaterializer {
}
/**
* Scala API: * Scala API: Creates an ActorMaterializer that can materialize stream blueprints as running streams.
* Scala API: Creates an ActorMaterializer that can materialize stream blueprints as running streams.
*
* The required [[akka.actor.ActorRefFactory]]
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
@ -255,13 +255,13 @@ object ActorMaterializerSettings {
fuzzingMode: Boolean,
autoFusing: Boolean,
maxFixedBufferSize: Int) = {
// these sins were comitted in the name of bin comp:
// these sins were committed in the name of bin comp:
val config = ConfigFactory.defaultReference
new ActorMaterializerSettings(
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024),
StreamRefSettings(config.getConfig("akka.stream.materializer.stream-ref")),
config.getString("akka.stream.blocking-io-dispatcher")
config.getString(ActorAttributes.IODispatcher.dispatcher)
)
}
@ -307,13 +307,13 @@ object ActorMaterializerSettings {
fuzzingMode: Boolean,
autoFusing: Boolean,
maxFixedBufferSize: Int) = {
// these sins were comitted in the name of bin comp:
// these sins were committed in the name of bin comp:
val config = ConfigFactory.defaultReference
new ActorMaterializerSettings(
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024),
StreamRefSettings(config.getConfig("akka.stream.materializer.stream-ref")),
config.getString("akka.stream.blocking-io-dispatcher"))
config.getString(ActorAttributes.IODispatcher.dispatcher))
}
/**
@ -383,7 +383,7 @@ final class ActorMaterializerSettings @InternalApi private (
this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, ioSettings,
StreamRefSettings(ConfigFactory.defaultReference().getConfig("akka.stream.materializer.stream-ref")),
ConfigFactory.defaultReference().getString("akka.stream.blocking-io-dispatcher")
ConfigFactory.defaultReference().getString(ActorAttributes.IODispatcher.dispatcher)
)
// backwards compatibility when added IOSettings, shouldn't be needed since private, but added to satisfy mima
@ -404,7 +404,7 @@ final class ActorMaterializerSettings @InternalApi private (
this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit,
IOSettings(tcpWriteBufferSize = 16 * 1024), StreamRefSettings(ConfigFactory.defaultReference().getConfig("akka.stream.materializer.stream-ref")),
ConfigFactory.defaultReference().getString("akka.stream.blocking-io-dispatcher")
ConfigFactory.defaultReference().getString(ActorAttributes.IODispatcher.dispatcher)
)
// backwards compatibility when added IOSettings, shouldn't be needed since private, but added to satisfy mima
@ -424,7 +424,7 @@ final class ActorMaterializerSettings @InternalApi private (
this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024),
StreamRefSettings(ConfigFactory.defaultReference().getConfig("akka.stream.materializer.stream-ref")),
ConfigFactory.defaultReference().getString("akka.stream.blocking-io-dispatcher")
ConfigFactory.defaultReference().getString(ActorAttributes.IODispatcher.dispatcher)
)
private def copy(

View file

@ -11,7 +11,6 @@ import scala.annotation.tailrec
import scala.reflect.{ ClassTag, classTag }
import akka.japi.function
import java.net.URLEncoder
import java.util.concurrent.TimeUnit
import akka.annotation.InternalApi
import akka.stream.impl.TraversalBuilder
@ -71,7 +70,7 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
* This is the expected way for stages to access attributes.
*/
def getAttribute[T <: Attribute](c: Class[T]): Optional[T] =
(attributeList.collectFirst { case attr if c.isInstance(attr) c.cast(attr) }).asJava
attributeList.collectFirst { case attr if c.isInstance(attr) c.cast(attr) }.asJava
/**
* Scala API: Get the most specific attribute value for a given Attribute type or subclass thereof or
@ -129,7 +128,7 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
find(attributeList) match {
case OptionVal.Some(t) t.asInstanceOf[T]
case OptionVal.None throw new IllegalStateException(s"Mandatory attribute ${c} not found")
case OptionVal.None throw new IllegalStateException(s"Mandatory attribute [$c] not found")
}
}
@ -334,7 +333,7 @@ object Attributes {
* Passing in null as any of the arguments sets the level to its default value, which is:
* `Debug` for `onElement` and `onFinish`, and `Error` for `onFailure`.
*/
def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel) =
def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel): Attributes =
logLevels(
onElement = Option(onElement).getOrElse(Logging.DebugLevel),
onFinish = Option(onFinish).getOrElse(Logging.DebugLevel),
@ -365,10 +364,34 @@ object Attributes {
object ActorAttributes {
import Attributes._
final case class Dispatcher(dispatcher: String) extends MandatoryAttribute
object Dispatcher {
/**
* INTERNAL API
* Resolves the dispatcher's name with a fallback to the default blocking IO dispatcher.
* Note that `IODispatcher.dispatcher` is not used here as the config used to create [[ActorMaterializerSettings]]
* is not easily accessible, instead the name is taken from `settings.blockingIoDispatcher`
*/
@InternalApi
private[akka] def resolve(attributes: Attributes, settings: ActorMaterializerSettings): String =
attributes.mandatoryAttribute[Dispatcher] match {
case IODispatcher settings.blockingIoDispatcher
case Dispatcher(dispatcher) dispatcher
}
/**
* INTERNAL API
* Resolves the dispatcher name with a fallback to the default blocking IO dispatcher.
*/
@InternalApi
private[akka] def resolve(context: MaterializationContext): String =
resolve(context.effectiveAttributes, ActorMaterializerHelper.downcast(context.materializer).settings)
}
final case class SupervisionStrategy(decider: Supervision.Decider) extends MandatoryAttribute
// this is actually a config key that needs reading and itself will contain the actual dispatcher name
val IODispatcher: Dispatcher = ActorAttributes.Dispatcher("akka.stream.blocking-io-dispatcher")
val IODispatcher: Dispatcher = ActorAttributes.Dispatcher("akka.stream.materializer.blocking-io-dispatcher")
/**
* Specifies the name of the dispatcher. This also adds an async boundary.
@ -402,7 +425,7 @@ object ActorAttributes {
* Passing in null as any of the arguments sets the level to its default value, which is:
* `Debug` for `onElement` and `onFinish`, and `Error` for `onFailure`.
*/
def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel) =
def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel): Attributes =
logLevels(
onElement = Option(onElement).getOrElse(Logging.DebugLevel),
onFinish = Option(onFinish).getOrElse(Logging.DebugLevel),

View file

@ -43,13 +43,15 @@ import scala.concurrent.{ Await, ExecutionContextExecutor }
* INTERNAL API
*/
@InternalApi private[akka] override def actorOf(context: MaterializationContext, props: Props): ActorRef = {
val effectiveProps =
if (props.dispatcher == Dispatchers.DefaultDispatcherId)
val effectiveProps = props.dispatcher match {
case Dispatchers.DefaultDispatcherId
props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
else if (props.dispatcher == ActorAttributes.IODispatcher.dispatcher)
case ActorAttributes.IODispatcher.dispatcher
// this one is actually not a dispatcher but a relative config key pointing containing the actual dispatcher name
props.withDispatcher(settings.blockingIoDispatcher)
else props
case _ props
}
actorOf(effectiveProps, context.islandName)
}
@ -174,7 +176,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa
@InternalApi private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor {
import akka.stream.impl.StreamSupervisor._
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy
def receive = {
case Materialize(props, name)

View file

@ -4,7 +4,6 @@
package akka.stream.impl
import java.util
import java.util.ArrayList
import java.util.concurrent.atomic.AtomicBoolean
import akka.NotUsed
@ -12,10 +11,8 @@ import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancel
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.dispatch.Dispatchers
import akka.event.{ Logging, LoggingAdapter }
import akka.stream.ActorAttributes.Dispatcher
import akka.stream.Attributes.InputBuffer
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes.IODispatcher
import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.impl.fusing.ActorGraphInterpreter.{ ActorOutputBoundary, BatchingActorInputBoundary }
import akka.stream.impl.fusing.GraphInterpreter.Connection
@ -150,12 +147,12 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
private var currentIslandGlobalOffset = 0
// The number of slots that belong to segments of other islands encountered so far, from the
// beginning of the island
private var currentIslandSkippetSlots = 0
private var currentIslandSkippedSlots = 0
private var segments: java.util.ArrayList[SegmentInfo] = null
private var activePhases: java.util.ArrayList[PhaseIsland[Any]] = null
private var forwardWires: java.util.ArrayList[ForwardWire] = null
private var islandStateStack: java.util.ArrayList[SavedIslandData] = null
private var segments: java.util.ArrayList[SegmentInfo] = _
private var activePhases: java.util.ArrayList[PhaseIsland[Any]] = _
private var forwardWires: java.util.ArrayList[ForwardWire] = _
private var islandStateStack: java.util.ArrayList[SavedIslandData] = _
private var currentPhase: PhaseIsland[Any] = defaultPhase.apply(settings, attributes, materializer, nextIslandName())
@ -176,7 +173,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
globalislandOffset = currentIslandGlobalOffset,
length = currentGlobalOffset - currentSegmentGlobalOffset,
globalBaseOffset = currentSegmentGlobalOffset,
relativeBaseOffset = currentSegmentGlobalOffset - currentIslandGlobalOffset - currentIslandSkippetSlots,
relativeBaseOffset = currentSegmentGlobalOffset - currentIslandGlobalOffset - currentIslandSkippedSlots,
currentPhase)
// Segment tracking is by demand, we only allocate this list if it is used.
@ -193,7 +190,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
completeSegment()
val previousPhase = currentPhase
val previousIslandOffset = currentIslandGlobalOffset
islandStateStack.add(SavedIslandData(previousIslandOffset, currentGlobalOffset, currentIslandSkippetSlots, previousPhase))
islandStateStack.add(SavedIslandData(previousIslandOffset, currentGlobalOffset, currentIslandSkippedSlots, previousPhase))
currentPhase = phases(tag)(settings, attributes, materializer, nextIslandName())
activePhases.add(currentPhase)
@ -203,7 +200,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
// The base offset of this segment is the current global offset
currentSegmentGlobalOffset = currentGlobalOffset
currentIslandSkippetSlots = 0
currentIslandSkippedSlots = 0
if (Debug) println(s"Entering island starting at offset = $currentIslandGlobalOffset phase = $currentPhase")
}
@ -217,7 +214,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
// We restore data for the island
currentIslandGlobalOffset = parentIsland.islandGlobalOffset
currentPhase = parentIsland.phase
currentIslandSkippetSlots = parentIsland.skippedSlots + (currentGlobalOffset - parentIsland.lastVisitedOffset)
currentIslandSkippedSlots = parentIsland.skippedSlots + (currentGlobalOffset - parentIsland.lastVisitedOffset)
if (Debug) println(s"Exited to island starting at offset = $currentIslandGlobalOffset phase = $currentPhase")
}
@ -225,7 +222,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
@InternalApi private[akka] def wireIn(in: InPort, logic: Any): Unit = {
// The slot for this InPort always belong to the current segment, so resolving its local
// offset/slot is simple
val localInSlot = currentGlobalOffset - currentIslandGlobalOffset - currentIslandSkippetSlots
val localInSlot = currentGlobalOffset - currentIslandGlobalOffset - currentIslandSkippedSlots
if (Debug) println(s" wiring port $in inOffs absolute = $currentGlobalOffset local = $localInSlot")
// Assign the logic belonging to the current port to its calculated local slot in the island
@ -278,8 +275,8 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
if (absoluteOffset >= currentSegmentGlobalOffset) {
// Wiring is in the same segment, no complex lookup needed
val localInSlot = absoluteOffset - currentIslandGlobalOffset - currentIslandSkippetSlots
if (Debug) println(s" in-segment wiring to local ($absoluteOffset - $currentIslandGlobalOffset - $currentIslandSkippetSlots) = $localInSlot")
val localInSlot = absoluteOffset - currentIslandGlobalOffset - currentIslandSkippedSlots
if (Debug) println(s" in-segment wiring to local ($absoluteOffset - $currentIslandGlobalOffset - $currentIslandSkippedSlots) = $localInSlot")
currentPhase.assignPort(out, localInSlot, logic)
} else {
// Wiring is cross-segment, but we don't know if it is cross-island or not yet
@ -383,7 +380,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
* When these attributes are needed later in the materialization process it is important that the
* they are gotten through the attributes and not through the [[ActorMaterializerSettings]]
*/
val defaultAttributes = {
val defaultAttributes: Attributes = {
Attributes(
Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) ::
ActorAttributes.SupervisionStrategy(settings.supervisionDecider) ::
@ -609,7 +606,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
subflowFuser: OptionVal[GraphInterpreterShell ActorRef]) extends PhaseIsland[GraphStageLogic] {
// TODO: remove these
private val logicArrayType = Array.empty[GraphStageLogic]
private[this] val logics = new ArrayList[GraphStageLogic](16)
private[this] val logics = new util.ArrayList[GraphStageLogic](16)
private var connections = new Array[Connection](16)
private var maxConnections = 0
@ -734,18 +731,13 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
shell.logics = logics.toArray(logicArrayType)
subflowFuser match {
case OptionVal.Some(fuseIntoExistingInterperter)
fuseIntoExistingInterperter(shell)
case OptionVal.Some(fuseIntoExistingInterpreter)
fuseIntoExistingInterpreter(shell)
case _
val dispatcher =
effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher] match {
case ActorAttributes.IODispatcher settings.blockingIoDispatcher
case ActorAttributes.Dispatcher(dispatcher) dispatcher
}
val props = ActorGraphInterpreter.props(shell)
.withDispatcher(dispatcher)
val props = ActorGraphInterpreter.props(shell).withDispatcher(ActorAttributes.Dispatcher.resolve(effectiveAttributes, settings))
val actorName = fullIslandName match {
case OptionVal.Some(n) n
case OptionVal.None islandName
@ -887,10 +879,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (NotUsed, Any) = {
val tls = mod.asInstanceOf[TlsModule]
val dispatcher = attributes.mandatoryAttribute[Dispatcher] match {
case IODispatcher materializer.settings.blockingIoDispatcher
case Dispatcher(name) name
}
val dispatcher = ActorAttributes.Dispatcher.resolve(attributes, materializer.settings)
val maxInputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer].max
val props =

View file

@ -7,10 +7,9 @@ import java.io.OutputStream
import java.nio.file.{ OpenOption, Path }
import akka.annotation.InternalApi
import akka.stream.ActorAttributes.Dispatcher
import akka.stream._
import akka.stream.impl.SinkModule
import akka.stream.impl.Stages.DefaultAttributes.IODispatcher
import akka.stream.ActorAttributes.Dispatcher
import akka.util.ByteString
import scala.collection.immutable
@ -34,12 +33,8 @@ import scala.concurrent.{ Future, Promise }
val ioResultPromise = Promise[IOResult]()
val props = FileSubscriber.props(f, ioResultPromise, maxInputBufferSize, startPosition, options)
val dispatcher = context.effectiveAttributes.mandatoryAttribute[Dispatcher] match {
case IODispatcher ActorMaterializerHelper.downcast(context.materializer).settings.blockingIoDispatcher
case Dispatcher(name) name
}
val ref = materializer.actorOf(context, props.withDispatcher(Dispatcher.resolve(context)))
val ref = materializer.actorOf(context, props.withDispatcher(dispatcher))
(akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future)
}
@ -65,12 +60,9 @@ import scala.concurrent.{ Future, Promise }
val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max
val dispatcher = context.effectiveAttributes.mandatoryAttribute[Dispatcher] match {
case IODispatcher ActorMaterializerHelper.downcast(context.materializer).settings.blockingIoDispatcher
case Dispatcher(name) name
}
val props = OutputStreamSubscriber.props(os, ioResultPromise, maxInputBufferSize, autoFlush).withDispatcher(dispatcher)
val props = OutputStreamSubscriber
.props(os, ioResultPromise, maxInputBufferSize, autoFlush)
.withDispatcher(Dispatcher.resolve(context))
val ref = materializer.actorOf(context, props)
(akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future)

View file

@ -12,7 +12,6 @@ import akka.Done
import akka.annotation.InternalApi
import akka.stream.ActorAttributes.Dispatcher
import akka.stream.Attributes.InputBuffer
import akka.stream.impl.Stages.DefaultAttributes.IODispatcher
import akka.stream.impl.{ ErrorPublisher, SourceModule }
import akka.stream.stage._
import akka.stream.{ IOResult, _ }
@ -155,12 +154,9 @@ private[akka] final class FileSource(path: Path, chunkSize: Int, startPosition:
val pub = try {
val is = createInputStream() // can throw, i.e. FileNotFound
val dispatcher = context.effectiveAttributes.mandatoryAttribute[Dispatcher] match {
case IODispatcher ActorMaterializerHelper.downcast(context.materializer).settings.blockingIoDispatcher
case Dispatcher(name) name
}
val props = InputStreamPublisher.props(is, ioResultPromise, chunkSize).withDispatcher(dispatcher)
val props = InputStreamPublisher
.props(is, ioResultPromise, chunkSize)
.withDispatcher(Dispatcher.resolve(context))
val ref = materializer.actorOf(context, props)
akka.stream.actor.ActorPublisher[ByteString](ref)

View file

@ -26,7 +26,7 @@ object FileIO {
* Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
* and a possible exception if IO operation was not completed successfully.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param f The file to write to
@ -42,7 +42,7 @@ object FileIO {
* Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
* and a possible exception if IO operation was not completed successfully.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* Accepts as arguments a set of [[java.nio.file.StandardOpenOption]], which will determine
@ -62,7 +62,7 @@ object FileIO {
* Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
* and a possible exception if IO operation was not completed successfully.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param f The file to write to
@ -78,7 +78,7 @@ object FileIO {
* Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
* and a possible exception if IO operation was not completed successfully.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* Accepts as arguments a set of [[java.nio.file.StandardOpenOption]], which will determine
@ -99,7 +99,7 @@ object FileIO {
* Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
* and a possible exception if IO operation was not completed successfully.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* Accepts as arguments a set of [[java.nio.file.StandardOpenOption]], which will determine
@ -120,7 +120,7 @@ object FileIO {
* Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes,
* except the last element, which will be up to 8192 in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
@ -136,7 +136,7 @@ object FileIO {
* Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes,
* except the last element, which will be up to 8192 in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
@ -151,7 +151,7 @@ object FileIO {
* Emitted elements are `chunkSize` sized [[ByteString]] elements,
* except the last element, which will be up to `chunkSize` in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
@ -168,7 +168,7 @@ object FileIO {
* Emitted elements are `chunkSize` sized [[ByteString]] elements,
* except the last element, which will be up to `chunkSize` in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
@ -185,7 +185,7 @@ object FileIO {
* Emitted elements are `chunkSize` sized [[ByteString]] elements,
* except the last element, which will be up to `chunkSize` in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion,

View file

@ -416,7 +416,7 @@ object Source {
* `Restart` supervision strategy will close and create blocking IO again. Default strategy is `Stop` which means
* that stream will be terminated on error in `read` function by default.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
@ -444,7 +444,7 @@ object Source {
* `Restart` supervision strategy will close and create resource. Default strategy is `Stop` which means
* that stream will be terminated on error in `read` function (or future) by default.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.

View file

@ -23,7 +23,7 @@ object StreamConverters {
* Materializes a [[CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
* and a possible exception if IO operation was not completed successfully.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* This method uses no auto flush for the [[java.io.OutputStream]] @see [[#fromOutputStream(function.Creator, Boolean)]] if you want to override it.
@ -41,7 +41,7 @@ object StreamConverters {
* Materializes a [[CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
* and a possible exception if IO operation was not completed successfully.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* The [[OutputStream]] will be closed when the stream flowing into this [[Sink]] is completed. The [[Sink]]
@ -62,7 +62,7 @@ object StreamConverters {
*
* This Sink is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and
@ -76,7 +76,7 @@ object StreamConverters {
*
* This Sink is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and
@ -94,7 +94,7 @@ object StreamConverters {
* [[java.io.InputStream]] returns on each read invocation. Such chunks will
* never be larger than chunkSize though.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* It materializes a [[CompletionStage]] containing the number of bytes read from the source file upon completion.
@ -111,7 +111,7 @@ object StreamConverters {
* [[java.io.InputStream]] returns on each read invocation. Such chunks will
* never be larger than chunkSize though.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* It materializes a [[CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
@ -127,7 +127,7 @@ object StreamConverters {
*
* This Source is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]]
@ -145,7 +145,7 @@ object StreamConverters {
*
* This Source is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]]

View file

@ -4,7 +4,7 @@
package akka.stream.scaladsl
import java.io.File
import java.nio.file.{ OpenOption, Path, StandardOpenOption }
import java.nio.file.{ OpenOption, Path }
import java.nio.file.StandardOpenOption._
import akka.stream.impl.Stages.DefaultAttributes
@ -20,14 +20,13 @@ import scala.concurrent.Future
object FileIO {
import Sink.{ shape sinkShape }
import Source.{ shape sourceShape }
/**
* Creates a Source from a files contents.
* Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
* except the final element, which will be up to `chunkSize` in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
@ -45,7 +44,7 @@ object FileIO {
* Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
* except the final element, which will be up to `chunkSize` in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
@ -62,7 +61,7 @@ object FileIO {
* Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
* except the final element, which will be up to `chunkSize` in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,

View file

@ -631,7 +631,7 @@ object Source {
* `Restart` supervision strategy will close and create blocking IO again. Default strategy is `Stop` which means
* that stream will be terminated on error in `read` function by default.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
@ -654,7 +654,7 @@ object Source {
* `Restart` supervision strategy will close and create resource. Default strategy is `Stop` which means
* that stream will be terminated on error in `read` function (or future) by default.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.

View file

@ -5,7 +5,6 @@ package akka.stream.scaladsl
import java.io.{ OutputStream, InputStream }
import java.util.Spliterators
import java.util.concurrent.atomic.AtomicReference
import java.util.stream.{ Collector, StreamSupport }
import akka.stream.{ Attributes, SinkShape, IOResult }
@ -34,7 +33,7 @@ object StreamConverters {
* [[java.io.InputStream]] returns on each read invocation. Such chunks will
* never be larger than chunkSize though.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
@ -54,7 +53,7 @@ object StreamConverters {
*
* This Source is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]]
@ -71,7 +70,7 @@ object StreamConverters {
* Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
* and a possible exception if IO operation was not completed successfully.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
* If `autoFlush` is true the OutputStream will be flushed whenever a byte array is written, defaults to false.
*
@ -87,7 +86,7 @@ object StreamConverters {
*
* This Sink is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and
@ -169,7 +168,7 @@ object StreamConverters {
.mapMaterializedValue(queue StreamSupport.stream(
Spliterators.spliteratorUnknownSize(new java.util.Iterator[T] {
var nextElementFuture: Future[Option[T]] = queue.pull()
var nextElement: Option[T] = null
var nextElement: Option[T] = _
override def hasNext: Boolean = {
nextElement = Await.result(nextElementFuture, Inf)