diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index dbeaf3a7a6..cf49c7f157 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -4,9 +4,9 @@ package akka.actor.dungeon -import akka.dispatch.sysmsg.{Unwatch, Watch, DeathWatchNotification} -import akka.event.Logging.{Warning, Debug} -import akka.actor.{InternalActorRef, Address, Terminated, Actor, ActorRefScope, ActorCell, ActorRef, MinimalActorRef} +import akka.dispatch.sysmsg.{ Unwatch, Watch, DeathWatchNotification } +import akka.event.Logging.{ Warning, Debug } +import akka.actor.{ InternalActorRef, Address, Terminated, Actor, ActorRefScope, ActorCell, ActorRef, MinimalActorRef } import akka.event.AddressTerminatedTopic private[akka] trait DeathWatch { this: ActorCell ⇒ diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java index b298dd6deb..ed133e47ae 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java @@ -7,6 +7,7 @@ import static org.junit.Assert.assertEquals; import java.util.Arrays; import java.util.Collections; +import java.util.Optional; import org.junit.ClassRule; import org.junit.Test; @@ -47,4 +48,32 @@ public class AttributesTest extends StreamTest { attributes.getAttribute(Attributes.Name.class, new Attributes.Name("default"))); } + @Test + public void mustGetMissingAttributeByClass() { + assertEquals( + Optional.empty(), + attributes.getAttribute(Attributes.LogLevels.class)); + } + + @Test + public void mustGetPossiblyMissingAttributeByClass() { + assertEquals( + Optional.of(new Attributes.Name("b")), + attributes.getAttribute(Attributes.Name.class)); + } + + @Test + public void mustGetPossiblyMissingFirstAttributeByClass() { + assertEquals( + Optional.of(new Attributes.Name("a")), + attributes.getFirstAttribute(Attributes.Name.class)); + } + + @Test + public void mustGetMissingFirstAttributeByClass() { + assertEquals( + Optional.empty(), + attributes.getFirstAttribute(Attributes.LogLevels.class)); + } + } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala index 1b354abf65..19e0f9e5cb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala @@ -60,6 +60,17 @@ class AttributesSpec extends AkkaSpec with ScalaFutures { attributes.get[Name] should contain(Name("attributesSink")) } } + + val attributes = Attributes.name("a") and Attributes.name("b") and Attributes.inputBuffer(1, 2) + + "give access to first attribute" in { + attributes.getFirst[Name] should ===(Some(Attributes.Name("a"))) + } + + "give access to attribute byt type" in { + attributes.get[Name] should ===(Some(Attributes.Name("b"))) + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index f895e731cb..bfc82b2212 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -3,12 +3,15 @@ */ package akka.stream +import java.util.Optional + import akka.event.Logging import scala.annotation.tailrec import scala.reflect.{ classTag, ClassTag } import akka.japi.function import akka.stream.impl.StreamLayout._ import java.net.URLEncoder +import scala.compat.java8.OptionConverters._ /** * Holds attributes which can be used to alter [[akka.stream.scaladsl.Flow]] / [[akka.stream.javadsl.Flow]] @@ -48,32 +51,30 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { * If no such attribute exists the `default` value is returned. */ def getAttribute[T <: Attribute](c: Class[T], default: T): T = - getAttribute(c) match { - case Some(a) ⇒ a - case None ⇒ default - } + getAttribute(c).orElse(default) /** * Java API: Get the first (least specific) attribute of a given `Class` or subclass thereof. * If no such attribute exists the `default` value is returned. */ def getFirstAttribute[T <: Attribute](c: Class[T], default: T): T = - getFirstAttribute(c) match { - case Some(a) ⇒ a - case None ⇒ default - } + getFirstAttribute(c).orElse(default) /** * Java API: Get the last (most specific) attribute of a given `Class` or subclass thereof. */ - def getAttribute[T <: Attribute](c: Class[T]): Option[T] = - Option(attributeList.foldLeft(null.asInstanceOf[T])((acc, attr) ⇒ if (c.isInstance(attr)) c.cast(attr) else acc)) + def getAttribute[T <: Attribute](c: Class[T]): Optional[T] = + Optional.ofNullable(attributeList.foldLeft( + null.asInstanceOf[T] + )( + (acc, attr) ⇒ if (c.isInstance(attr)) c.cast(attr) else acc) + ) /** * Java API: Get the first (least specific) attribute of a given `Class` or subclass thereof. */ - def getFirstAttribute[T <: Attribute](c: Class[T]): Option[T] = - attributeList.find(c isInstance _).map(c cast _) + def getFirstAttribute[T <: Attribute](c: Class[T]): Optional[T] = + attributeList.collectFirst { case attr if c.isInstance(attr) => c cast attr }.asJava /** * Scala API: get all attributes of a given type (or subtypes thereof). @@ -81,7 +82,7 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { def filtered[T <: Attribute: ClassTag]: List[T] = { val c = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]] attributeList.collect { - case a if c.isAssignableFrom(a.getClass) ⇒ c.cast(a) + case attr if c.isAssignableFrom(attr.getClass) ⇒ c.cast(attr) } } @@ -102,14 +103,18 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { /** * Scala API: Get the last (most specific) attribute of a given type parameter T `Class` or subclass thereof. */ - def get[T <: Attribute: ClassTag]: Option[T] = - getAttribute(classTag[T].runtimeClass.asInstanceOf[Class[T]]) + def get[T <: Attribute: ClassTag]: Option[T] = { + val c = classTag[T].runtimeClass.asInstanceOf[Class[T]] + attributeList.reverseIterator.collectFirst[T] { case attr if c.isInstance(attr) => c.cast(attr) } + } /** * Scala API: Get the first (least specific) attribute of a given type parameter T `Class` or subclass thereof. */ - def getFirst[T <: Attribute: ClassTag]: Option[T] = - getFirstAttribute(classTag[T].runtimeClass.asInstanceOf[Class[T]]) + def getFirst[T <: Attribute: ClassTag]: Option[T] = { + val c = classTag[T].runtimeClass.asInstanceOf[Class[T]] + attributeList.collectFirst { case attr if c.isInstance(attr) => c.cast(attr) } + } /** * Test whether the given attribute is contained within this attributes list. diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 669bff29c6..2f5e06a01d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -445,7 +445,7 @@ private[akka] final case class Batch[In, Out](max: Long, costFn: In ⇒ Long, se override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - val decider = inheritedAttributes.getAttribute(classOf[SupervisionStrategy]).map(_.decider).getOrElse(Supervision.stoppingDecider) + val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) private var agg: Out = null.asInstanceOf[Out] private var left: Long = max @@ -635,7 +635,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut override def toString = s"MapAsync.Logic(buffer=$buffer)" //FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync? - val decider = inheritedAttributes.getAttribute(classOf[SupervisionStrategy]).map(_.decider).getOrElse(Supervision.stoppingDecider) + val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) val buffer = new BoundedBuffer[Holder[Try[Out]]](parallelism) def todo = buffer.used @@ -713,7 +713,7 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)" val decider = - inheritedAttributes.getAttribute(classOf[SupervisionStrategy]) + inheritedAttributes.get[SupervisionStrategy] .map(_.decider).getOrElse(Supervision.stoppingDecider) var inFlight = 0 @@ -951,7 +951,7 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS override def initialAttributes: Attributes = DefaultAttributes.delay override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { val size = - inheritedAttributes.getAttribute(classOf[InputBuffer]) match { + inheritedAttributes.get[InputBuffer] match { case None ⇒ throw new IllegalStateException(s"Couldn't find InputBuffer Attribute for $this") case Some(InputBuffer(min, max)) ⇒ max }