diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala index e1a227fd54..65be467fc6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala @@ -126,6 +126,12 @@ class AttributesSpec extends StreamSpec(ConfigFactory.parseString( attributes.get[Name] should ===(Some(Attributes.Name("b"))) } + "return a mandatory value without allocating a some" in { + val attributes = Attributes.inputBuffer(2, 2) + + attributes.mandatoryAttribute[InputBuffer] should ===(InputBuffer(2, 2)) + } + } "attributes on a graph stage" must { @@ -485,5 +491,4 @@ class AttributesSpec extends StreamSpec(ConfigFactory.parseString( } } - } diff --git a/akka-stream/src/main/mima-filters/2.5.7.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.7.backwards.excludes index d9a746cdfd..00c723a4be 100644 --- a/akka-stream/src/main/mima-filters/2.5.7.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.7.backwards.excludes @@ -1,2 +1,14 @@ # Attributes overhaul -ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Graph.async") \ No newline at end of file +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Graph.async") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.ActorMaterializer.effectiveSettings") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ActorProcessorImpl.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.FanoutProcessorImpl.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.PhasedFusingActorMaterializer.defaultInitialAttributes") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.PhasedFusingActorMaterializer.effectiveSettings") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.Phase.apply") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.Phase.apply") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.TlsModulePhase.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.FanoutProcessorImpl.props") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.IslandTracking.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.GraphStageIsland.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreterShell.this") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index b647c11516..bcd25edfe3 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -3,7 +3,6 @@ */ package akka.stream -import java.util.Locale import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean @@ -16,7 +15,6 @@ import com.typesafe.config.Config import scala.concurrent.duration._ import akka.japi.function -import akka.stream.impl.fusing.GraphInterpreterShell import akka.stream.stage.GraphStageLogic import scala.util.control.NoStackTrace @@ -24,9 +22,9 @@ import scala.util.control.NoStackTrace object ActorMaterializer { /** - * Scala API: Creates an ActorMaterializer which will execute every step of a transformation - * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] - * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * Scala API: Creates an ActorMaterializer that can materialize stream blueprints as running streams. + * + * The required [[akka.actor.ActorRefFactory]] (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) * will be used to create one actor that in turn creates actors for the transformation steps. * * The materializer's [[akka.stream.ActorMaterializerSettings]] will be obtained from the @@ -44,8 +42,9 @@ object ActorMaterializer { } /** - * Scala API: Creates an ActorMaterializer which will execute every step of a transformation - * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * Scala API: Creates an ActorMaterializer that can materialize stream blueprints as running streams. + * + * The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) * will be used to create these actors, therefore it is *forbidden* to pass this object * to another actor if the factory is an ActorContext. @@ -68,8 +67,9 @@ object ActorMaterializer { } /** - * Scala API: Creates an ActorMaterializer which will execute every step of a transformation - * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * Scala API: * Scala API: Creates an ActorMaterializer that can materialize stream blueprints as running streams. + * + * The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) * will be used to create these actors, therefore it is *forbidden* to pass this object * to another actor if the factory is an ActorContext. @@ -98,8 +98,9 @@ object ActorMaterializer { } /** - * Java API: Creates an ActorMaterializer which will execute every step of a transformation - * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * Java API: Creates an ActorMaterializer that can materialize stream blueprints as running streams. + * + * The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) * will be used to create these actors, therefore it is *forbidden* to pass this object * to another actor if the factory is an ActorContext. @@ -111,8 +112,9 @@ object ActorMaterializer { apply()(context) /** - * Java API: Creates an ActorMaterializer which will execute every step of a transformation - * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * Java API: Creates an ActorMaterializer that can materialize stream blueprints as running streams. + * + * The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) * will be used to create one actor that in turn creates actors for the transformation steps. */ @@ -120,8 +122,9 @@ object ActorMaterializer { apply(Option(settings), None)(context) /** - * Java API: Creates an ActorMaterializer which will execute every step of a transformation - * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * Java API: Creates an ActorMaterializer that can materialize stream blueprints as running streams. + * + * The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) * will be used to create these actors, therefore it is *forbidden* to pass this object * to another actor if the factory is an ActorContext. @@ -162,18 +165,12 @@ private[akka] object ActorMaterializerHelper { } /** - * An ActorMaterializer takes the list of transformations comprising a - * [[akka.stream.scaladsl.Flow]] and materializes them in the form of - * [[org.reactivestreams.Processor]] instances. How transformation - * steps are split up into asynchronous regions is implementation - * dependent. + * An ActorMaterializer takes a stream blueprint and turns it into a running stream. */ abstract class ActorMaterializer extends Materializer with MaterializerLoggingProvider { def settings: ActorMaterializerSettings - def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings - /** * Shuts down this materializer and all the stages that have been materialized through this materializer. After * having shut down, this materializer cannot be used again. Any attempt to materialize stages after having @@ -313,6 +310,12 @@ object ActorMaterializerSettings { * Please refer to the `withX` methods for descriptions of the individual settings. */ final class ActorMaterializerSettings private ( + /* + * Important note: `initialInputBufferSize`, `maxInputBufferSize`, `dispatcher` and + * `supervisionDecider` must not be used as values in the materializer, or anything the materializer phases use + * since these settings allow for overriding using [[Attributes]]. They must always be gotten from the effective + * attributes. + */ val initialInputBufferSize: Int, val maxInputBufferSize: Int, val dispatcher: String, @@ -384,9 +387,12 @@ final class ActorMaterializerSettings private ( /** * Each asynchronous piece of a materialized stream topology is executed by one Actor * that manages an input buffer for all inlets of its shape. This setting configures - * the initial and maximal input buffer in number of elements for each inlet. + * the default for initial and maximal input buffer in number of elements for each inlet. + * This can be overridden for individual parts of the + * stream topology by using [[akka.stream.Attributes#inputBuffer]]. * - * FIXME: Currently only the initialSize is used, auto-tuning is not yet implemented. + * FIXME: this is used for all kinds of buffers, not only the stream actor, some use initial some use max, + * document and or fix if it should not be like that. Search for get[Attributes.InputBuffer] to see how it is used */ def withInputBuffer(initialSize: Int, maxSize: Int): ActorMaterializerSettings = { if (initialSize == this.initialInputBufferSize && maxSize == this.maxInputBufferSize) this @@ -407,6 +413,9 @@ final class ActorMaterializerSettings private ( * Scala API: Decides how exceptions from application code are to be handled, unless * overridden for specific flows of the stream operations with * [[akka.stream.Attributes#supervisionStrategy]]. + * + * Note that supervision in streams are implemented on a per stage basis and is not supported + * by every stage. */ def withSupervisionStrategy(decider: Supervision.Decider): ActorMaterializerSettings = { if (decider eq this.supervisionDecider) this @@ -417,6 +426,9 @@ final class ActorMaterializerSettings private ( * Java API: Decides how exceptions from application code are to be handled, unless * overridden for specific flows of the stream operations with * [[akka.stream.Attributes#supervisionStrategy]]. + * + * Note that supervision in streams are implemented on a per stage basis and is not supported + * by every stage. */ def withSupervisionStrategy(decider: function.Function[Throwable, Supervision.Directive]): ActorMaterializerSettings = { import Supervision._ diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index 3cfefc6e17..c9d57c04ae 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -10,13 +10,13 @@ import akka.event.Logging import scala.annotation.tailrec import scala.reflect.{ ClassTag, classTag } import akka.japi.function -import akka.stream.impl.StreamLayout._ import java.net.URLEncoder +import akka.annotation.InternalApi import akka.stream.impl.TraversalBuilder import scala.compat.java8.OptionConverters._ -import akka.util.ByteString +import akka.util.{ ByteString, OptionVal } /** * Holds attributes which can be used to alter [[akka.stream.scaladsl.Flow]] / [[akka.stream.javadsl.Flow]] @@ -26,12 +26,17 @@ import akka.util.ByteString * * The ``attributeList`` is ordered with the most specific attribute first, least specific last. * Note that the order was the opposite in Akka 2.4.x. + * + * Stages should in general not access the `attributeList` but instead use `get` to get the expected + * value of an attribute. */ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { import Attributes._ /** + * Note that this must only be used during traversal building and not during materialization + * as it will then always return true because of the defaults from the ActorMaterializerSettings * INTERNAL API */ private[stream] def isAsync: Boolean = { @@ -43,74 +48,36 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { } /** - * Java API + * Java API: Get the most specific attribute value for a given Attribute type or subclass thereof. + * If no such attribute exists, return a `default` value. * - * The list is ordered with the most specific attribute first, least specific last. - * Note that the order was the opposite in Akka 2.4.x. - */ - def getAttributeList(): java.util.List[Attribute] = { - import scala.collection.JavaConverters._ - attributeList.asJava - } - - /** - * Java API: Get all attributes of a given `Class` or - * subclass thereof. + * The most specific value is the value that was added closest to the graph or stage itself or if + * the same attribute was added multiple times to the same graph, the last to be added. * - * The list is ordered with the most specific attribute first, least specific last. - * Note that the order was the opposite in Akka 2.4.x. - */ - def getAttributeList[T <: Attribute](c: Class[T]): java.util.List[T] = - if (attributeList.isEmpty) java.util.Collections.emptyList() - else { - val result = new java.util.ArrayList[T] - attributeList.foreach { a ⇒ - if (c.isInstance(a)) - result.add(c.cast(a)) - } - result - } - - /** - * Java API: Get the most specific attribute (added last) of a given `Class` or subclass thereof. - * If no such attribute exists the `default` value is returned. + * This is the expected way for stages to access attributes. */ def getAttribute[T <: Attribute](c: Class[T], default: T): T = getAttribute(c).orElse(default) /** - * Java API: Get the least specific attribute (added first) of a given `Class` or subclass thereof. - * If no such attribute exists the `default` value is returned. - */ - def getFirstAttribute[T <: Attribute](c: Class[T], default: T): T = - getFirstAttribute(c).orElse(default) - - /** - * Java API: Get the most specific attribute (added last) of a given `Class` or subclass thereof. + * Java API: Get the most specific attribute value for a given Attribute type or subclass thereof. + * + * The most specific value is the value that was added closest to the graph or stage itself or if + * the same attribute was added multiple times to the same graph, the last to be added. + * + * This is the expected way for stages to access attributes. */ def getAttribute[T <: Attribute](c: Class[T]): Optional[T] = (attributeList.collectFirst { case attr if c.isInstance(attr) ⇒ c.cast(attr) }).asJava /** - * Java API: Get the least specific attribute (added first) of a given `Class` or subclass thereof. - */ - def getFirstAttribute[T <: Attribute](c: Class[T]): Optional[T] = - attributeList.reverseIterator.collectFirst { case attr if c.isInstance(attr) ⇒ c cast attr }.asJava - - /** - * Scala API: get all attributes of a given type (or subtypes thereof). + * Scala API: Get the most specific attribute value for a given Attribute type or subclass thereof or + * if no such attribute exists, return a default value. * - * The list is ordered with the most specific attribute first, least specific last. - * Note that the order was the opposite in Akka 2.4.x. - */ - def filtered[T <: Attribute: ClassTag]: List[T] = { - val c = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]] - attributeList.collect { case attr if c.isAssignableFrom(attr.getClass) ⇒ c.cast(attr) } - } - - /** - * Scala API: Get the most specific attribute (added last) of a given type parameter T `Class` or subclass thereof. - * If no such attribute exists the `default` value is returned. + * The most specific value is the value that was added closest to the graph or stage itself or if + * the same attribute was added multiple times to the same graph, the last to be added. + * + * This is the expected way for stages to access attributes. */ def get[T <: Attribute: ClassTag](default: T): T = get[T] match { @@ -119,18 +86,14 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { } /** - * Scala API: Get the least specific attribute (added first) of a given type parameter T `Class` or subclass thereof. - * If no such attribute exists the `default` value is returned. - */ - def getFirst[T <: Attribute: ClassTag](default: T): T = { - getFirst[T] match { - case Some(a) ⇒ a - case None ⇒ default - } - } - - /** - * Scala API: Get the most specific attribute (added last) of a given type parameter T `Class` or subclass thereof. + * Scala API: Get the most specific attribute value for a given Attribute type or subclass thereof. + * + * The most specific value is the value that was added closest to the graph or stage itself or if + * the same attribute was added multiple times to the same graph, the last to be added. + * + * This is the expected way for stages to access attributes. + * + * @see [[Attributes#get()]] For providing a default value if the attribute was not set */ def get[T <: Attribute: ClassTag]: Option[T] = { val c = classTag[T].runtimeClass.asInstanceOf[Class[T]] @@ -138,17 +101,34 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { } /** - * Scala API: Get the least specific attribute (added first) of a given type parameter T `Class` or subclass thereof. + * Scala API: Get the most specific of one of the mandatory attributes. Mandatory attributes are guaranteed + * to always be among the attributes when the attributes are coming from a materialization. */ - def getFirst[T <: Attribute: ClassTag]: Option[T] = { + def mandatoryAttribute[T <: MandatoryAttribute: ClassTag]: T = { val c = classTag[T].runtimeClass.asInstanceOf[Class[T]] - attributeList.reverseIterator.collectFirst { case attr if c.isInstance(attr) ⇒ c.cast(attr) } + getMandatoryAttribute(c) } /** - * Test whether the given attribute is contained within this attributes list. + * Java API: Get the most specific of one of the mandatory attributes. Mandatory attributes are guaranteed + * to always be among the attributes when the attributes are coming from a materialization. + * + * @param c A class that is a subtype of [[MandatoryAttribute]] */ - def contains(attr: Attribute): Boolean = attributeList.contains(attr) + def getMandatoryAttribute[T <: MandatoryAttribute](c: Class[T]): T = { + @tailrec + def find(list: List[Attribute]): OptionVal[Attribute] = list match { + case Nil ⇒ OptionVal.None + case head :: tail ⇒ + if (c.isInstance(head)) OptionVal.Some(head) + else find(tail) + } + + find(attributeList) match { + case OptionVal.Some(t) ⇒ t.asInstanceOf[T] + case OptionVal.None ⇒ throw new IllegalStateException(s"Mandatory attribute ${c} not found") + } + } /** * Adds given attributes. Added attributes are considered more specific than @@ -192,7 +172,7 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { /** * INTERNAL API */ - def nameOrDefault(default: String = "unnamed"): String = { + @InternalApi def nameOrDefault(default: String = "unnamed"): String = { @tailrec def find(attrs: List[Attribute]): String = attrs match { case Attributes.Name(name) :: _ ⇒ name case _ :: tail ⇒ find(tail) @@ -201,6 +181,99 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { find(attributeList) } + /** + * Test whether the given attribute is contained within this attributes list. + * + * Note that stages in general should not inspect the whole hierarchy but instead use + * `get` to get the most specific attribute value. + */ + def contains(attr: Attribute): Boolean = attributeList.contains(attr) + + /** + * Java API + * + * The list is ordered with the most specific attribute first, least specific last. + * Note that the order was the opposite in Akka 2.4.x. + * + * Note that stages in general should not inspect the whole hierarchy but instead use + * `get` to get the most specific attribute value. + */ + def getAttributeList(): java.util.List[Attribute] = { + import scala.collection.JavaConverters._ + attributeList.asJava + } + + /** + * Java API: Get all attributes of a given `Class` or + * subclass thereof. + * + * The list is ordered with the most specific attribute first, least specific last. + * Note that the order was the opposite in Akka 2.4.x. + * + * Note that stages in general should not inspect the whole hierarchy but instead use + * `get` to get the most specific attribute value. + */ + def getAttributeList[T <: Attribute](c: Class[T]): java.util.List[T] = + if (attributeList.isEmpty) java.util.Collections.emptyList() + else { + val result = new java.util.ArrayList[T] + attributeList.foreach { a ⇒ + if (c.isInstance(a)) + result.add(c.cast(a)) + } + result + } + + /** + * Scala API: Get all attributes of a given type (or subtypes thereof). + * + * Note that stages in general should not inspect the whole hierarchy but instead use + * `get` to get the most specific attribute value. + * + * The list is ordered with the most specific attribute first, least specific last. + * Note that the order was the opposite in Akka 2.4.x. + */ + def filtered[T <: Attribute: ClassTag]: List[T] = { + val c = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]] + attributeList.collect { case attr if c.isAssignableFrom(attr.getClass) ⇒ c.cast(attr) } + } + + /** + * Java API: Get the least specific attribute (added first) of a given `Class` or subclass thereof. + * If no such attribute exists the `default` value is returned. + */ + @deprecated("Attributes should always be most specific, use getAttribute[T]", "2.5.7") + def getFirstAttribute[T <: Attribute](c: Class[T], default: T): T = + getFirstAttribute(c).orElse(default) + + /** + * Java API: Get the least specific attribute (added first) of a given `Class` or subclass thereof. + */ + @deprecated("Attributes should always be most specific, use get[T]", "2.5.7") + def getFirstAttribute[T <: Attribute](c: Class[T]): Optional[T] = + attributeList.reverseIterator.collectFirst { case attr if c.isInstance(attr) ⇒ c cast attr }.asJava + + /** + * Scala API: Get the least specific attribute (added first) of a given type parameter T `Class` or subclass thereof. + * If no such attribute exists the `default` value is returned. + */ + @deprecated("Attributes should always be most specific, use get[T]", "2.5.7") + def getFirst[T <: Attribute: ClassTag](default: T): T = { + getFirst[T] match { + case Some(a) ⇒ a + case None ⇒ default + } + } + + /** + * Scala API: Get the least specific attribute (added first) of a given type parameter T `Class` or subclass thereof. + */ + @deprecated("Attributes should always be most specific, use get[T]", "2.5.7") + def getFirst[T <: Attribute: ClassTag]: Option[T] = { + val c = classTag[T].runtimeClass.asInstanceOf[Class[T]] + attributeList.reverseIterator.collectFirst { case attr if c.isInstance(attr) ⇒ c.cast(attr) } + } + } /** @@ -209,8 +282,11 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { object Attributes { trait Attribute + + sealed trait MandatoryAttribute extends Attribute + final case class Name(n: String) extends Attribute - final case class InputBuffer(initial: Int, max: Int) extends Attribute + final case class InputBuffer(initial: Int, max: Int) extends MandatoryAttribute final case class LogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel) extends Attribute final case object AsyncBoundary extends Attribute @@ -285,8 +361,8 @@ object Attributes { */ object ActorAttributes { import Attributes._ - final case class Dispatcher(dispatcher: String) extends Attribute - final case class SupervisionStrategy(decider: Supervision.Decider) extends Attribute + final case class Dispatcher(dispatcher: String) extends MandatoryAttribute + final case class SupervisionStrategy(decider: Supervision.Decider) extends MandatoryAttribute val IODispatcher: Dispatcher = ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher") diff --git a/akka-stream/src/main/scala/akka/stream/Materializer.scala b/akka-stream/src/main/scala/akka/stream/Materializer.scala index 13fea06216..f4b3825592 100644 --- a/akka-stream/src/main/scala/akka/stream/Materializer.scala +++ b/akka-stream/src/main/scala/akka/stream/Materializer.scala @@ -4,9 +4,11 @@ package akka.stream import akka.actor.Cancellable +import akka.annotation.InternalApi +import akka.stream.ActorAttributes.Dispatcher +import akka.stream.Attributes.InputBuffer import scala.concurrent.ExecutionContextExecutor - import scala.concurrent.duration.FiniteDuration /** @@ -40,10 +42,14 @@ abstract class Materializer { /** * This method interprets the given Flow description and creates the running - * stream using an explicitly provided [[Attributes]] as top level attributes. The result can be highly - * implementation specific, ranging from local actor chains to remote-deployed processing networks. + * stream using an explicitly provided [[Attributes]] as top level (least specific) attributes that + * will be defaults for the materialized stream. + * The result can be highly implementation specific, ranging from local actor chains to remote-deployed + * processing networks. */ - def materialize[Mat](runnable: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat + def materialize[Mat]( + runnable: Graph[ClosedShape, Mat], + @deprecatedName('initialAttributes) defaultAttributes: Attributes): Mat /** * Running a flow graph will require execution resources, as will computations @@ -76,12 +82,13 @@ abstract class Materializer { /** * INTERNAL API */ +@InternalApi private[akka] object NoMaterializer extends Materializer { override def withNamePrefix(name: String): Materializer = throw new UnsupportedOperationException("NoMaterializer cannot be named") override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = throw new UnsupportedOperationException("NoMaterializer cannot materialize") - override def materialize[Mat](runnable: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat = + override def materialize[Mat](runnable: Graph[ClosedShape, Mat], defaultAttributes: Attributes): Mat = throw new UnsupportedOperationException("NoMaterializer cannot materialize") override def executionContext: ExecutionContextExecutor = @@ -95,10 +102,12 @@ private[akka] object NoMaterializer extends Materializer { } /** - * * Context parameter to the `create` methods of sources and sinks. + * + * INTERNAL API */ -case class MaterializationContext( +@InternalApi +private[akka] case class MaterializationContext( materializer: Materializer, effectiveAttributes: Attributes, islandName: String) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index f20285d499..d351beae6d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -7,6 +7,7 @@ import java.util.concurrent.atomic.AtomicBoolean import akka.actor._ import akka.annotation.{ DoNotInherit, InternalApi } +import akka.dispatch.Dispatchers import akka.event.LoggingAdapter import akka.pattern.ask import akka.stream._ @@ -29,12 +30,12 @@ import scala.concurrent.{ Await, ExecutionContextExecutor } /** INTERNAL API */ @InternalApi def materialize[Mat]( _runnableGraph: Graph[ClosedShape, Mat], - initialAttributes: Attributes): Mat + defaultAttributes: Attributes): Mat /** INTERNAL API */ @InternalApi private[akka] def materialize[Mat]( graph: Graph[ClosedShape, Mat], - initialAttributes: Attributes, + defaultAttributes: Attributes, defaultPhase: Phase[Any], phases: Map[IslandTag, Phase[Any]]): Mat @@ -42,10 +43,13 @@ import scala.concurrent.{ Await, ExecutionContextExecutor } * INTERNAL API */ @InternalApi private[akka] override def actorOf(context: MaterializationContext, props: Props): ActorRef = { - val dispatcher = - if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) effectiveSettings(context.effectiveAttributes).dispatcher - else props.dispatcher - actorOf(props.withDispatcher(dispatcher), context.islandName) + // if the props already have a dispatcher set we respect that, if not + // we take it from the attributes + val effectiveProps = + if (props.dispatcher == Dispatchers.DefaultDispatcherId) + props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher) + else props + actorOf(effectiveProps, context.islandName) } /** @@ -88,8 +92,9 @@ import scala.concurrent.{ Await, ExecutionContextExecutor } */ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMaterializer, registerShell: GraphInterpreterShell ⇒ ActorRef) extends Materializer { val subFusingPhase = new Phase[Any] { - override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = { - new GraphStageIsland(settings, materializer, islandName, OptionVal(registerShell)).asInstanceOf[PhaseIsland[Any]] + override def apply(settings: ActorMaterializerSettings, attributes: Attributes, + materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = { + new GraphStageIsland(settings, attributes, materializer, islandName, OptionVal(registerShell)).asInstanceOf[PhaseIsland[Any]] } } @@ -98,18 +103,18 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = delegate match { case am: PhasedFusingActorMaterializer ⇒ - materialize(runnable, am.defaultInitialAttributes) + materialize(runnable, am.defaultAttributes) case other ⇒ throw new IllegalStateException(s"SubFusing only supported by [PhasedFusingActorMaterializer], " + s"yet was used with [${other.getClass.getName}]!") } - override def materialize[Mat](runnable: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat = { + override def materialize[Mat](runnable: Graph[ClosedShape, Mat], defaultAttributes: Attributes): Mat = { if (PhasedFusingActorMaterializer.Debug) println(s"Using [${getClass.getSimpleName}] to materialize [${runnable}]") val phases = PhasedFusingActorMaterializer.DefaultPhases - delegate.materialize(runnable, initialAttributes, subFusingPhase, phases) + delegate.materialize(runnable, defaultAttributes, subFusingPhase, phases) } override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable = delegate.scheduleOnce(delay, task) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index ff4312544d..d8c9cf5bf2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -5,7 +5,7 @@ package akka.stream.impl import akka.actor._ import akka.annotation.InternalApi -import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings } +import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings, Attributes } import akka.stream.actor.ActorSubscriber.OnSubscribe import akka.stream.actor.ActorSubscriberMessage.{ OnComplete, OnError, OnNext } import org.reactivestreams.{ Processor, Subscriber, Subscription } @@ -248,13 +248,16 @@ import akka.event.Logging /** * INTERNAL API */ -@InternalApi private[akka] abstract class ActorProcessorImpl(val settings: ActorMaterializerSettings) +@InternalApi private[akka] abstract class ActorProcessorImpl(attributes: Attributes, val settings: ActorMaterializerSettings) extends Actor with ActorLogging with Pump { - protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) { - override def inputOnError(e: Throwable): Unit = ActorProcessorImpl.this.onError(e) + protected val primaryInputs: Inputs = { + val initialInputBufferSize = attributes.mandatoryAttribute[Attributes.InputBuffer].initial + new BatchingInputBuffer(initialInputBufferSize, this) { + override def inputOnError(e: Throwable): Unit = ActorProcessorImpl.this.onError(e) + } } protected val primaryOutputs: Outputs = new SimpleOutputs(self, this) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index 0711fe3b37..bce39c2a59 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -2,7 +2,7 @@ package akka.stream.impl import akka.actor.{ Actor, ActorRef, Deploy, Props } import akka.annotation.{ DoNotInherit, InternalApi } -import akka.stream.ActorMaterializerSettings +import akka.stream.{ ActorMaterializerSettings, Attributes } import org.reactivestreams.Subscriber /** @@ -97,19 +97,21 @@ import org.reactivestreams.Subscriber * INTERNAL API */ @InternalApi private[akka] object FanoutProcessorImpl { - def props(actorMaterializerSettings: ActorMaterializerSettings): Props = - Props(new FanoutProcessorImpl(actorMaterializerSettings)).withDeploy(Deploy.local) + def props(attributes: Attributes, actorMaterializerSettings: ActorMaterializerSettings): Props = + Props(new FanoutProcessorImpl(attributes, actorMaterializerSettings)).withDeploy(Deploy.local) } /** * INTERNAL API */ -@InternalApi private[akka] class FanoutProcessorImpl(_settings: ActorMaterializerSettings) - extends ActorProcessorImpl(_settings) { +@InternalApi private[akka] class FanoutProcessorImpl(attributes: Attributes, _settings: ActorMaterializerSettings) + extends ActorProcessorImpl(attributes, _settings) { - override val primaryOutputs: FanoutOutputs = - new FanoutOutputs(settings.maxInputBufferSize, settings.initialInputBufferSize, self, this) { + override val primaryOutputs: FanoutOutputs = { + val inputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer] + new FanoutOutputs(inputBuffer.max, inputBuffer.initial, self, this) { override def afterShutdown(): Unit = afterFlush() } + } val running: TransferPhase = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ primaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement()) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index 5fb884a893..a590ea0a61 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -6,6 +6,7 @@ package akka.stream.impl import akka.NotUsed import akka.actor._ import akka.annotation.{ DoNotInherit, InternalApi } +import akka.dispatch.Dispatchers import akka.stream._ import akka.stream.impl.StreamLayout.AtomicModule import org.reactivestreams._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 07a457bdb6..b81fe24a8f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -36,29 +36,34 @@ import akka.util.OptionVal val Debug = false val DefaultPhase: Phase[Any] = new Phase[Any] { - override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = - new GraphStageIsland(settings, materializer, islandName, subflowFuser = OptionVal.None).asInstanceOf[PhaseIsland[Any]] + override def apply(settings: ActorMaterializerSettings, effectiveAttributes: Attributes, + materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = + new GraphStageIsland(settings, effectiveAttributes, materializer, islandName, subflowFuser = OptionVal.None).asInstanceOf[PhaseIsland[Any]] } val DefaultPhases: Map[IslandTag, Phase[Any]] = Map[IslandTag, Phase[Any]]( SinkModuleIslandTag → new Phase[Any] { - override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, - islandName: String): PhaseIsland[Any] = + override def apply(settings: ActorMaterializerSettings, effectiveAttributes: Attributes, + materializer: PhasedFusingActorMaterializer, + islandName: String): PhaseIsland[Any] = new SinkModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]] }, SourceModuleIslandTag → new Phase[Any] { - override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, - islandName: String): PhaseIsland[Any] = + override def apply(settings: ActorMaterializerSettings, effectiveAttributes: Attributes, + materializer: PhasedFusingActorMaterializer, + islandName: String): PhaseIsland[Any] = new SourceModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]] }, ProcessorModuleIslandTag → new Phase[Any] { - override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, - islandName: String): PhaseIsland[Any] = + override def apply(settings: ActorMaterializerSettings, effectiveAttributes: Attributes, + materializer: PhasedFusingActorMaterializer, + islandName: String): PhaseIsland[Any] = new ProcessorModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]] }, TlsModuleIslandTag → new Phase[Any] { - def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = - new TlsModulePhase(settings, materializer, islandName).asInstanceOf[PhaseIsland[Any]] + def apply(settings: ActorMaterializerSettings, effectiveAttributes: Attributes, + materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = + new TlsModulePhase(effectiveAttributes, materializer, islandName).asInstanceOf[PhaseIsland[Any]] }, GraphStageTag → DefaultPhase) @@ -125,6 +130,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff @InternalApi private[akka] class IslandTracking( val phases: Map[IslandTag, Phase[Any]], val settings: ActorMaterializerSettings, + attributes: Attributes, defaultPhase: Phase[Any], val materializer: PhasedFusingActorMaterializer, islandNamePrefix: String) { @@ -150,7 +156,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff private var forwardWires: java.util.ArrayList[ForwardWire] = null private var islandStateStack: java.util.ArrayList[SavedIslandData] = null - private var currentPhase: PhaseIsland[Any] = defaultPhase.apply(settings, materializer, nextIslandName()) + private var currentPhase: PhaseIsland[Any] = defaultPhase.apply(settings, attributes, materializer, nextIslandName()) @InternalApi private[akka] def getCurrentPhase: PhaseIsland[Any] = currentPhase @InternalApi private[akka] def getCurrentOffset: Int = currentGlobalOffset @@ -188,8 +194,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff val previousIslandOffset = currentIslandGlobalOffset islandStateStack.add(SavedIslandData(previousIslandOffset, currentGlobalOffset, currentIslandSkippetSlots, previousPhase)) - val effectiveSettings = materializer.effectiveSettings(attributes) - currentPhase = phases(tag)(effectiveSettings, materializer, nextIslandName()) + currentPhase = phases(tag)(settings, attributes, materializer, nextIslandName()) activePhases.add(currentPhase) // Resolve the phase to be used to materialize this island @@ -368,36 +373,23 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff private[this] def createFlowName(): String = flowNames.next() - /** INTERNAL API */ - private[akka] val defaultInitialAttributes = { - val a = Attributes( + /** + * Default attributes for the materializer, based on the [[ActorMaterializerSettings]] and + * are always seen as least specific, so any attribute specified in the graph "wins" over these. + * In addition to that this also guarantees that the attributes `InputBuffer`, `SupervisionStrategy`, + * and `Dispatcher` is _always_ present in the attributes. + * + * When these attributes are needed later in the materialization process it is important that the + * they are gotten through the attributes and not through the [[ActorMaterializerSettings]] + */ + val defaultAttributes = { + Attributes( Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) :: ActorAttributes.SupervisionStrategy(settings.supervisionDecider) :: - Nil) - if (settings.dispatcher == Deploy.NoDispatcherGiven) a - else a and ActorAttributes.dispatcher(settings.dispatcher) - } - - override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = { - import ActorAttributes._ - import Attributes._ - @tailrec def applyAttributes(attrs: List[Attribute], s: ActorMaterializerSettings, - inputBufferDone: Boolean, dispatcherDone: Boolean, supervisorDone: Boolean): ActorMaterializerSettings = { - attrs match { - case InputBuffer(initial, max) :: tail if !inputBufferDone ⇒ - applyAttributes(tail, s.withInputBuffer(initial, max), inputBufferDone = true, dispatcherDone, supervisorDone) - case Dispatcher(dispatcher) :: tail if !dispatcherDone ⇒ - applyAttributes(tail, s.withDispatcher(dispatcher), inputBufferDone, dispatcherDone = true, supervisorDone) - case SupervisionStrategy(decider) :: tail if !supervisorDone ⇒ - applyAttributes(tail, s.withSupervisionStrategy(decider), inputBufferDone, dispatcherDone, supervisorDone = true) - case _ if inputBufferDone || dispatcherDone || supervisorDone ⇒ s - case _ :: tail ⇒ - applyAttributes(tail, s, inputBufferDone, dispatcherDone, supervisorDone) - case Nil ⇒ - s - } - } - applyAttributes(opAttr.attributeList, settings, false, false, false) + ActorAttributes.Dispatcher( + if (settings.dispatcher == Deploy.NoDispatcherGiven) Dispatchers.DefaultDispatcherId + else settings.dispatcher + ) :: Nil) } override lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match { @@ -412,28 +404,28 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff system.scheduler.scheduleOnce(delay, task)(executionContext) override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat = - materialize(_runnableGraph, defaultInitialAttributes) + materialize(_runnableGraph, defaultAttributes) override def materialize[Mat]( _runnableGraph: Graph[ClosedShape, Mat], - initialAttributes: Attributes): Mat = + defaultAttributes: Attributes): Mat = materialize( _runnableGraph, - initialAttributes, + defaultAttributes, PhasedFusingActorMaterializer.DefaultPhase, PhasedFusingActorMaterializer.DefaultPhases) override def materialize[Mat]( graph: Graph[ClosedShape, Mat], - initialAttributes: Attributes, + defaultAttributes: Attributes, defaultPhase: Phase[Any], phases: Map[IslandTag, Phase[Any]]): Mat = { - val islandTracking = new IslandTracking(phases, settings, defaultPhase, this, islandNamePrefix = createFlowName() + "-") + val islandTracking = new IslandTracking(phases, settings, defaultAttributes, defaultPhase, this, islandNamePrefix = createFlowName() + "-") var current: Traversal = graph.traversalBuilder.traversal val attributesStack = new java.util.ArrayDeque[Attributes](8) - attributesStack.addLast(initialAttributes and graph.traversalBuilder.attributes) + attributesStack.addLast(defaultAttributes and graph.traversalBuilder.attributes) val traversalStack = new java.util.ArrayDeque[Traversal](16) traversalStack.addLast(current) @@ -564,9 +556,10 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff */ @DoNotInherit private[akka] trait Phase[M] { def apply( - effectiveSettings: ActorMaterializerSettings, - materializer: PhasedFusingActorMaterializer, - islandName: String): PhaseIsland[M] + settings: ActorMaterializerSettings, + effectiveAttributes: Attributes, + materializer: PhasedFusingActorMaterializer, + islandName: String): PhaseIsland[M] } /** @@ -599,10 +592,11 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff * INTERNAL API */ @InternalApi private[akka] final class GraphStageIsland( - effectiveSettings: ActorMaterializerSettings, - materializer: PhasedFusingActorMaterializer, - islandName: String, - subflowFuser: OptionVal[GraphInterpreterShell ⇒ ActorRef]) extends PhaseIsland[GraphStageLogic] { + settings: ActorMaterializerSettings, + effectiveAttributes: Attributes, + materializer: PhasedFusingActorMaterializer, + islandName: String, + subflowFuser: OptionVal[GraphInterpreterShell ⇒ ActorRef]) extends PhaseIsland[GraphStageLogic] { // TODO: remove these private val logicArrayType = Array.empty[GraphStageLogic] private[this] val logics = new ArrayList[GraphStageLogic](16) @@ -615,7 +609,8 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff val shell = new GraphInterpreterShell( connections = null, logics = null, - effectiveSettings, + settings, + effectiveAttributes, materializer) override def name: String = "Fusing GraphStages phase" @@ -697,7 +692,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = { val connection = conn(slot) // TODO: proper input port debug string (currently prints the stage) - val bufferSize = connection.inOwner.attributes.get[InputBuffer].get.max + val bufferSize = connection.inOwner.attributes.mandatoryAttribute[InputBuffer].max val boundary = new BatchingActorInputBoundary(bufferSize, shell, publisher, connection.inOwner.toString) logics.add(boundary) @@ -734,7 +729,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff case _ ⇒ val props = ActorGraphInterpreter.props(shell) - .withDispatcher(effectiveSettings.dispatcher) + .withDispatcher(effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher) val actorName = fullIslandName match { case OptionVal.Some(n) ⇒ n case OptionVal.None ⇒ islandName @@ -867,7 +862,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff /** * INTERNAL API */ -@InternalApi private[akka] final class TlsModulePhase(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String) extends PhaseIsland[NotUsed] { +@InternalApi private[akka] final class TlsModulePhase(attributes: Attributes, materializer: PhasedFusingActorMaterializer, islandName: String) extends PhaseIsland[NotUsed] { def name: String = "TlsModulePhase" var tlsActor: ActorRef = _ @@ -876,8 +871,11 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (NotUsed, Any) = { val tls = mod.asInstanceOf[TlsModule] + val dispatcher = attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher + val maxInputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer].max + val props = - TLSActor.props(settings, tls.createSSLEngine, tls.verifySession, tls.closing).withDispatcher(settings.dispatcher) + TLSActor.props(maxInputBuffer, tls.createSSLEngine, tls.verifySession, tls.closing).withDispatcher(dispatcher) tlsActor = materializer.actorOf(props, islandName) def factory(id: Int) = new ActorPublisher[Any](tlsActor) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index f6080066b0..b0a9cca2e9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -113,7 +113,7 @@ import scala.collection.generic.CanBuildFrom val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer) val impl = actorMaterializer.actorOf( context, - FanoutProcessorImpl.props(actorMaterializer.effectiveSettings(context.effectiveAttributes))) + FanoutProcessorImpl.props(context.effectiveAttributes, actorMaterializer.settings)) val fanoutProcessor = new ActorProcessor[In, In](impl) impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]]) // Resolve cyclic dependency with actor. This MUST be the first message no matter what. @@ -174,10 +174,10 @@ import scala.collection.generic.CanBuildFrom override def create(context: MaterializationContext) = { val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer) - val effectiveSettings = actorMaterializer.effectiveSettings(context.effectiveAttributes) + val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max val subscriberRef = actorMaterializer.actorOf( context, - ActorRefSinkActor.props(ref, effectiveSettings.maxInputBufferSize, onCompleteMessage)) + ActorRefSinkActor.props(ref, maxInputBufferSize, onCompleteMessage)) (akka.stream.actor.ActorSubscriber[In](subscriberRef), NotUsed) } @@ -469,7 +469,7 @@ import scala.collection.generic.CanBuildFrom override def toString: String = "LazySink" override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(stoppingDecider) + lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider var completed = false val promise = Promise[M]() diff --git a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala index 110715f500..296e5a3fa2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala @@ -24,7 +24,7 @@ import scala.util.control.NonFatal override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSource def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { - lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider var open = false var blockingStream: S = _ setHandler(out, this) diff --git a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala index f643e05687..4c9aee91a8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala @@ -27,7 +27,7 @@ import scala.util.control.NonFatal override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSourceAsync def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { - lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider var resource = Promise[S]() var open = false implicit val context = ExecutionContexts.sameThreadExecutionContext diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 49202510ef..f845ab9371 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -442,6 +442,7 @@ import scala.util.control.NonFatal var connections: Array[Connection], var logics: Array[GraphStageLogic], settings: ActorMaterializerSettings, + attributes: Attributes, val mat: ExtendedActorMaterializer) { import ActorGraphInterpreter._ @@ -524,7 +525,7 @@ import scala.util.control.NonFatal * because no data can enter “fast enough” from the outside */ // TODO: Fix event limit heuristic - val shellEventLimit = settings.maxInputBufferSize * 16 + val shellEventLimit = attributes.mandatoryAttribute[Attributes.InputBuffer].max * 16 // Limits the number of events processed by the interpreter on an abort event. // TODO: Better heuristic here private val abortLimit = shellEventLimit * 2 diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 7adde88edc..92b5197a61 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -346,7 +346,7 @@ import scala.util.control.NonFatal def onFutureSourceCompleted(result: Try[Graph[SourceShape[T], M]]): Unit = { result.map { graph ⇒ val runnable = Source.fromGraph(graph).toMat(sinkIn.sink)(Keep.left) - val matVal = interpreter.subFusingMaterializer.materialize(runnable, initialAttributes = attr) + val matVal = interpreter.subFusingMaterializer.materialize(runnable, defaultAttributes = attr) materialized.success(matVal) setHandler(out, this) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index b16200f7c7..d11a2c4812 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -41,7 +41,7 @@ import akka.util.OptionVal override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { private def decider = - inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider override def onPush(): Unit = { try { @@ -70,7 +70,7 @@ import akka.util.OptionVal override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler with InHandler { - def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider override def onPush(): Unit = { try { @@ -106,7 +106,7 @@ import akka.util.OptionVal new GraphStageLogic(shape) with OutHandler with InHandler { override def toString = "TakeWhileLogic" - def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider override def onPush(): Unit = { try { @@ -167,7 +167,7 @@ import akka.util.OptionVal * INTERNAL API */ @DoNotInherit private[akka] abstract class SupervisedGraphStageLogic(inheritedAttributes: Attributes, shape: Shape) extends GraphStageLogic(shape) { - private lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider def withSupervision[T](f: () ⇒ T): Option[T] = try { @@ -363,7 +363,7 @@ private[stream] object Collect { self ⇒ private var aggregator = zero - private lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider import Supervision.{ Stop, Resume, Restart } import shape.{ in, out } @@ -430,7 +430,7 @@ private[stream] object Collect { private def ec = ExecutionContexts.sameThreadExecutionContext - private lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider private val ZeroHandler: OutHandler with InHandler = new OutHandler with InHandler { override def onPush(): Unit = @@ -542,7 +542,7 @@ private[stream] object Collect { private var aggregator: Out = zero private def decider = - inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider override def onPush(): Unit = { val elem = grab(in) @@ -596,7 +596,7 @@ private[stream] object Collect { def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { - val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider private var aggregator: Out = zero private var aggregating: Future[Out] = Future.successful(aggregator) @@ -945,7 +945,7 @@ private[stream] object Collect { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { - lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider private var agg: Out = null.asInstanceOf[Out] private var left: Long = max @@ -1166,8 +1166,8 @@ private[stream] object Collect { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { - lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider) - .getOrElse(Supervision.stoppingDecider) + lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + var buffer: BufferImpl[Holder[Out]] = _ private val futureCB = getAsyncCallback[Holder[Out]](holder ⇒ holder.elem match { @@ -1180,8 +1180,6 @@ private[stream] object Collect { } }) - private var buffer: BufferImpl[Holder[Out]] = _ - override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer) override def onPull(): Unit = pushNextIfPossible() @@ -1266,7 +1264,7 @@ private[stream] object Collect { override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)" val decider = - inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider private var inFlight = 0 private var buffer: BufferImpl[Out] = _ @@ -1345,7 +1343,7 @@ private[stream] object Collect { private var logLevels: LogLevels = _ private var log: LoggingAdapter = _ - def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider override def preStart(): Unit = { logLevels = inheritedAttributes.get[LogLevels](DefaultLogLevels) @@ -1596,11 +1594,8 @@ private[stream] object Collect { override def initialAttributes: Attributes = DefaultAttributes.delay override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { - val size = - inheritedAttributes.get[InputBuffer] match { - case None ⇒ throw new IllegalStateException(s"Couldn't find InputBuffer Attribute for $this") - case Some(InputBuffer(min, max)) ⇒ max - } + val size = inheritedAttributes.mandatoryAttribute[InputBuffer].max + val delayMillis = d.toMillis var buffer: BufferImpl[(Long, T)] = _ // buffer has pairs timestamp with upstream element @@ -1773,7 +1768,7 @@ private[stream] object Collect { var aggregator: T = _ private def decider = - inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider def setInitialInHandler(): Unit = { // Initial input handler @@ -1887,7 +1882,7 @@ private[stream] object Collect { override def initialAttributes: Attributes = DefaultAttributes.statefulMapConcat def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { - lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider var currentIterator: Iterator[Out] = _ var plainFun = f() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index f3cdaa5ff9..23e7a5d47e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -87,7 +87,7 @@ import scala.collection.JavaConverters._ sinkIn.pull() sources += sinkIn val graph = Source.fromGraph(source).to(sinkIn.sink) - interpreter.subFusingMaterializer.materialize(graph, initialAttributes = enclosingAttributes) + interpreter.subFusingMaterializer.materialize(graph, defaultAttributes = enclosingAttributes) } def removeSource(src: SubSinkInlet[T]): Unit = { @@ -223,7 +223,7 @@ import scala.collection.JavaConverters._ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with OutHandler with InHandler { parent ⇒ - lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider private val activeSubstreamsMap = new java.util.HashMap[Any, SubstreamSource]() private val closedSubstreams = new java.util.HashSet[Any]() private var timeout: FiniteDuration = _ diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index e1962da1e5..4eb5d366fe 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -28,10 +28,11 @@ import scala.concurrent.{ Future, Promise } override def create(context: MaterializationContext) = { val materializer = ActorMaterializerHelper.downcast(context.materializer) - val settings = materializer.effectiveSettings(context.effectiveAttributes) + + val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max val ioResultPromise = Promise[IOResult]() - val props = FileSubscriber.props(f, ioResultPromise, settings.maxInputBufferSize, startPosition, options) + val props = FileSubscriber.props(f, ioResultPromise, maxInputBufferSize, startPosition, options) val dispatcher = context.effectiveAttributes.get[Dispatcher](IODispatcher).dispatcher val ref = materializer.actorOf(context, props.withDispatcher(dispatcher)) @@ -54,12 +55,13 @@ import scala.concurrent.{ Future, Promise } override def create(context: MaterializationContext) = { val materializer = ActorMaterializerHelper.downcast(context.materializer) - val settings = materializer.effectiveSettings(context.effectiveAttributes) val ioResultPromise = Promise[IOResult]() val os = createOutput() // if it fails, we fail the materialization - val props = OutputStreamSubscriber.props(os, ioResultPromise, settings.maxInputBufferSize, autoFlush) + val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max + + val props = OutputStreamSubscriber.props(os, ioResultPromise, maxInputBufferSize, autoFlush) val ref = materializer.actorOf(context, props) (akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala index 31316c6ccc..4ceac8c983 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala @@ -29,12 +29,12 @@ import scala.util.{ Failure, Success, Try } @InternalApi private[stream] object TLSActor { def props( - settings: ActorMaterializerSettings, - createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 - verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 - closing: TLSClosing, - tracing: Boolean = false): Props = - Props(new TLSActor(settings, createSSLEngine, verifySession, closing, tracing)).withDeploy(Deploy.local) + maxInputBufferSize: Int, + createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + closing: TLSClosing, + tracing: Boolean = false): Props = + Props(new TLSActor(maxInputBufferSize, createSSLEngine, verifySession, closing, tracing)).withDeploy(Deploy.local) final val TransportIn = 0 final val TransportOut = 0 @@ -47,11 +47,11 @@ import scala.util.{ Failure, Success, Try } * INTERNAL API. */ @InternalApi private[stream] class TLSActor( - settings: ActorMaterializerSettings, - createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 - verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 - closing: TLSClosing, - tracing: Boolean) + maxInputBufferSize: Int, + createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + closing: TLSClosing, + tracing: Boolean) extends Actor with ActorLogging with Pump { import TLSActor._ @@ -59,7 +59,7 @@ import scala.util.{ Failure, Success, Try } protected val outputBunch = new OutputBunch(outputCount = 2, self, this) outputBunch.markAllOutputs() - protected val inputBunch = new InputBunch(inputCount = 2, settings.maxInputBufferSize, this) { + protected val inputBunch = new InputBunch(inputCount = 2, maxInputBufferSize, this) { override def onError(input: Int, e: Throwable): Unit = fail(e) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 52217a4931..0e0dacc116 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -3,27 +3,26 @@ */ package akka.stream.scaladsl -import akka.stream.impl.Stages.DefaultAttributes -import akka.util.ConstantFun -import akka.{ Done, NotUsed } +import java.util.concurrent.CompletionStage + import akka.actor.{ ActorRef, Cancellable, Props } import akka.stream.actor.ActorPublisher +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.fusing.GraphStages import akka.stream.impl.fusing.GraphStages._ -import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, PublisherSource, _ } +import akka.stream.impl.{ PublisherSource, _ } +import akka.stream.stage.GraphStageWithMaterializedValue import akka.stream.{ Outlet, SourceShape, _ } +import akka.util.ConstantFun +import akka.{ Done, NotUsed } import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable +import scala.compat.java8.FutureConverters._ import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } -import java.util.concurrent.CompletionStage - -import akka.stream.stage.{ GraphStage, GraphStageWithMaterializedValue } - -import scala.compat.java8.FutureConverters._ /** * A `Source` is a set of stream processing steps that has one open output. It can comprise