diff --git a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst index 02950a54c1..f4eb50cb5b 100644 --- a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst +++ b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst @@ -285,6 +285,17 @@ stages and adds additional protocol and type-safety. You can learn all about it You should also read the blog post series on the official team blog, starting with `Mastering GraphStages, part I`_, which explains using and implementing GraphStages in more practical terms than the reference documentation. +Order of Attributes List +------------------------ + +Imporant performance improvement could be achieved by reverting the order of the ``attributesList`` in ``Attributes``. + +The ``attributeList`` is ordered with the most specific attribute first, least specific last. +Note that the order was the opposite in Akka 2.4.x (but it was not really documented). + +The semantics of the convenience methods, such as ``get`` and ``getFirst`` are the same, but if you use the ``attributesList`` +directly or via ``filtered`` or ``getAttributeList`` you need to take the new order into consideration. + .. _Mastering GraphStages, part I: http://blog.akka.io/streams/2016/07/30/mastering-graph-stage-part-1 Remote diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java index d92f87a220..7d76685b45 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java @@ -35,7 +35,7 @@ public class AttributesTest extends StreamTest { @Test public void mustGetAttributesByClass() { assertEquals( - Arrays.asList(new Attributes.Name("a"), new Attributes.Name("b")), + Arrays.asList(new Attributes.Name("b"), new Attributes.Name("a")), attributes.getAttributeList(Attributes.Name.class)); assertEquals( Collections.singletonList(new Attributes.InputBuffer(1, 2)), diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala index 30ad250a4f..9093c5d0a8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala @@ -178,8 +178,7 @@ class FlowFlattenMergeSpec extends StreamSpec { } setHandler(out, this) } - } - ) + }) "propagate attributes to inner streams" in assertAllStagesStopped { val f = Source.single(attributesSource.addAttributes(Attributes.name("inner"))) @@ -190,7 +189,7 @@ class FlowFlattenMergeSpec extends StreamSpec { val attributes = Await.result(f, 3.seconds).attributeList attributes should contain(Attributes.Name("inner")) attributes should contain(Attributes.Name("outer")) - attributes.indexOf(Attributes.Name("outer")) < attributes.indexOf(Attributes.Name("inner")) should be(true) + attributes.indexOf(Attributes.Name("inner")) < attributes.indexOf(Attributes.Name("outer")) should be(true) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala index 5bfc75b1f4..14298abf27 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala @@ -102,8 +102,7 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } setHandler(out, this) } - } - ) + }) "propagate attributes to inner streams" in assertAllStagesStopped { val f = Source.single(attributesSource.addAttributes(Attributes.name("inner"))) @@ -114,7 +113,7 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { val attributes = f.futureValue.attributeList attributes should contain(Attributes.Name("inner")) attributes should contain(Attributes.Name("outer")) - attributes.indexOf(Attributes.Name("outer")) < attributes.indexOf(Attributes.Name("inner")) should be(true) + attributes.indexOf(Attributes.Name("inner")) < attributes.indexOf(Attributes.Name("outer")) should be(true) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index c59b0aa996..51490cfbe4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -128,7 +128,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { import Attributes._ val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none).named("name") - s.traversalBuilder.attributes.filtered[Name] shouldEqual List(Name("headSink"), Name("name")) + s.traversalBuilder.attributes.filtered[Name] shouldEqual List(Name("name"), Name("headSink")) s.traversalBuilder.attributes.getFirst[AsyncBoundary.type] shouldEqual (Some(AsyncBoundary)) } @@ -136,7 +136,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { import Attributes._ val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none).named("name") - s.traversalBuilder.attributes.filtered[Name] shouldEqual List(Name("headSink"), Name("name")) + s.traversalBuilder.attributes.filtered[Name] shouldEqual List(Name("name"), Name("headSink")) } "given one attribute of a class should correctly get it as last attribute with default value" in { diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index 11a3ea968d..41a0de334a 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -22,6 +22,9 @@ import scala.compat.java8.OptionConverters._ * or [[akka.stream.scaladsl.GraphDSL]] / [[akka.stream.javadsl.GraphDSL]] materialization. * * Note that more attributes for the [[ActorMaterializer]] are defined in [[ActorAttributes]]. + * + * The ``attributeList`` is ordered with the most specific attribute first, least specific last. + * Note that the order was the opposite in Akka 2.4.x. */ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { @@ -40,6 +43,9 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { /** * Java API + * + * The list is ordered with the most specific attribute first, least specific last. + * Note that the order was the opposite in Akka 2.4.x. */ def getAttributeList(): java.util.List[Attribute] = { import scala.collection.JavaConverters._ @@ -49,6 +55,9 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { /** * Java API: Get all attributes of a given `Class` or * subclass thereof. + * + * The list is ordered with the most specific attribute first, least specific last. + * Note that the order was the opposite in Akka 2.4.x. */ def getAttributeList[T <: Attribute](c: Class[T]): java.util.List[T] = if (attributeList.isEmpty) java.util.Collections.emptyList() @@ -62,71 +71,77 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { } /** - * Java API: Get the last (most specific) attribute of a given `Class` or subclass thereof. + * Java API: Get the most specific attribute (added last) of a given `Class` or subclass thereof. * If no such attribute exists the `default` value is returned. */ def getAttribute[T <: Attribute](c: Class[T], default: T): T = getAttribute(c).orElse(default) /** - * Java API: Get the first (least specific) attribute of a given `Class` or subclass thereof. + * Java API: Get the least specific attribute (added first) of a given `Class` or subclass thereof. * If no such attribute exists the `default` value is returned. */ def getFirstAttribute[T <: Attribute](c: Class[T], default: T): T = getFirstAttribute(c).orElse(default) /** - * Java API: Get the last (most specific) attribute of a given `Class` or subclass thereof. + * Java API: Get the most specific attribute (added last) of a given `Class` or subclass thereof. */ def getAttribute[T <: Attribute](c: Class[T]): Optional[T] = - Optional.ofNullable(attributeList.foldLeft( - null.asInstanceOf[T])( - (acc, attr) ⇒ if (c.isInstance(attr)) c.cast(attr) else acc)) + (attributeList.collectFirst { case attr if c.isInstance(attr) ⇒ c.cast(attr) }).asJava /** - * Java API: Get the first (least specific) attribute of a given `Class` or subclass thereof. + * Java API: Get the least specific attribute (added first) of a given `Class` or subclass thereof. */ def getFirstAttribute[T <: Attribute](c: Class[T]): Optional[T] = - attributeList.collectFirst { case attr if c.isInstance(attr) ⇒ c cast attr }.asJava + attributeList.reverseIterator.collectFirst { case attr if c.isInstance(attr) ⇒ c cast attr }.asJava /** * Scala API: get all attributes of a given type (or subtypes thereof). + * + * The list is ordered with the most specific attribute first, least specific last. + * Note that the order was the opposite in Akka 2.4.x. */ def filtered[T <: Attribute: ClassTag]: List[T] = { val c = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]] - attributeList.collect { - case attr if c.isAssignableFrom(attr.getClass) ⇒ c.cast(attr) + attributeList.collect { case attr if c.isAssignableFrom(attr.getClass) ⇒ c.cast(attr) } + } + + /** + * Scala API: Get the most specific attribute (added last) of a given type parameter T `Class` or subclass thereof. + * If no such attribute exists the `default` value is returned. + */ + def get[T <: Attribute: ClassTag](default: T): T = + get[T] match { + case Some(a) ⇒ a + case None ⇒ default + } + + /** + * Scala API: Get the least specific attribute (added first) of a given type parameter T `Class` or subclass thereof. + * If no such attribute exists the `default` value is returned. + */ + def getFirst[T <: Attribute: ClassTag](default: T): T = { + getFirst[T] match { + case Some(a) ⇒ a + case None ⇒ default } } /** - * Scala API: Get the last (most specific) attribute of a given type parameter T `Class` or subclass thereof. - * If no such attribute exists the `default` value is returned. - */ - def get[T <: Attribute: ClassTag](default: T): T = - getAttribute(classTag[T].runtimeClass.asInstanceOf[Class[T]], default) - - /** - * Scala API: Get the first (least specific) attribute of a given type parameter T `Class` or subclass thereof. - * If no such attribute exists the `default` value is returned. - */ - def getFirst[T <: Attribute: ClassTag](default: T): T = - getFirstAttribute(classTag[T].runtimeClass.asInstanceOf[Class[T]], default) - - /** - * Scala API: Get the last (most specific) attribute of a given type parameter T `Class` or subclass thereof. + * Scala API: Get the most specific attribute (added last) of a given type parameter T `Class` or subclass thereof. */ def get[T <: Attribute: ClassTag]: Option[T] = { val c = classTag[T].runtimeClass.asInstanceOf[Class[T]] - attributeList.reverseIterator.collectFirst[T] { case attr if c.isInstance(attr) ⇒ c.cast(attr) } + attributeList.collectFirst { case attr if c.isInstance(attr) ⇒ c.cast(attr) } } /** - * Scala API: Get the first (least specific) attribute of a given type parameter T `Class` or subclass thereof. + * Scala API: Get the least specific attribute (added first) of a given type parameter T `Class` or subclass thereof. */ def getFirst[T <: Attribute: ClassTag]: Option[T] = { val c = classTag[T].runtimeClass.asInstanceOf[Class[T]] - attributeList.collectFirst { case attr if c.isInstance(attr) ⇒ c.cast(attr) } + attributeList.reverseIterator.collectFirst { case attr if c.isInstance(attr) ⇒ c.cast(attr) } } /** @@ -135,18 +150,22 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { def contains(attr: Attribute): Boolean = attributeList.contains(attr) /** - * Adds given attributes to the end of these attributes. + * Adds given attributes. Added attributes are considered more specific than + * already existing attributes of the same type. */ - def and(other: Attributes): Attributes = + def and(other: Attributes): Attributes = { if (attributeList.isEmpty) other else if (other.attributeList.isEmpty) this - else Attributes(attributeList ::: other.attributeList) + else if (other.attributeList.tail.isEmpty) Attributes(other.attributeList.head :: attributeList) + else Attributes(other.attributeList ::: attributeList) + } /** - * Adds given attribute to the end of these attributes. + * Adds given attribute. Added attribute is considered more specific than + * already existing attributes of the same type. */ def and(other: Attribute): Attributes = - Attributes(attributeList :+ other) + Attributes(other :: attributeList) /** * Extracts Name attributes and concatenates them. @@ -173,7 +192,7 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { else if (buf eq null) first else buf.toString - concatNames(attributeList.iterator, null, null) match { + concatNames(attributeList.reverseIterator, null, null) match { case null ⇒ default case some ⇒ some } @@ -201,7 +220,7 @@ object Attributes { * INTERNAL API */ def apply(attribute: Attribute): Attributes = - apply(List(attribute)) + apply(attribute :: Nil) val none: Attributes = Attributes() diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 0b06c647ac..db9751089f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -356,23 +356,35 @@ case class PhasedFusingActorMaterializer( private[this] def createFlowName(): String = flowNames.next() - private val defaultInitialAttributes = Attributes( - Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) :: - ActorAttributes.Dispatcher(settings.dispatcher) :: - ActorAttributes.SupervisionStrategy(settings.supervisionDecider) :: - Nil) + private val defaultInitialAttributes = { + val a = Attributes( + Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) :: + ActorAttributes.SupervisionStrategy(settings.supervisionDecider) :: + Nil) + if (settings.dispatcher == Deploy.NoDispatcherGiven) a + else a and ActorAttributes.dispatcher(settings.dispatcher) + } override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = { import ActorAttributes._ import Attributes._ - opAttr.attributeList.foldLeft(settings) { (s, attr) ⇒ - attr match { - case InputBuffer(initial, max) ⇒ s.withInputBuffer(initial, max) - case Dispatcher(dispatcher) ⇒ s.withDispatcher(dispatcher) - case SupervisionStrategy(decider) ⇒ s.withSupervisionStrategy(decider) - case _ ⇒ s + @tailrec def applyAttributes(attrs: List[Attribute], s: ActorMaterializerSettings, + inputBufferDone: Boolean, dispatcherDone: Boolean, supervisorDone: Boolean): ActorMaterializerSettings = { + attrs match { + case InputBuffer(initial, max) :: tail if !inputBufferDone ⇒ + applyAttributes(tail, s.withInputBuffer(initial, max), inputBufferDone = true, dispatcherDone, supervisorDone) + case Dispatcher(dispatcher) :: tail if !dispatcherDone ⇒ + applyAttributes(tail, s.withDispatcher(dispatcher), inputBufferDone, dispatcherDone = true, supervisorDone) + case SupervisionStrategy(decider) :: tail if !supervisorDone ⇒ + applyAttributes(tail, s.withSupervisionStrategy(decider), inputBufferDone, dispatcherDone, supervisorDone = true) + case _ if inputBufferDone || dispatcherDone || supervisorDone ⇒ s + case _ :: tail ⇒ + applyAttributes(tail, s, inputBufferDone, dispatcherDone, supervisorDone) + case Nil ⇒ + s } } + applyAttributes(opAttr.attributeList, settings, false, false, false) } override lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match {