Merge pull request #19711 from johanandren/wip-19710-scala-types-attributes-java-api-johanandren
!str #19710 Use Java types in Attributes Java API
This commit is contained in:
commit
f042204d8b
4 changed files with 66 additions and 21 deletions
|
|
@ -7,6 +7,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
@ -47,4 +48,32 @@ public class AttributesTest extends StreamTest {
|
||||||
attributes.getAttribute(Attributes.Name.class, new Attributes.Name("default")));
|
attributes.getAttribute(Attributes.Name.class, new Attributes.Name("default")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mustGetMissingAttributeByClass() {
|
||||||
|
assertEquals(
|
||||||
|
Optional.empty(),
|
||||||
|
attributes.getAttribute(Attributes.LogLevels.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mustGetPossiblyMissingAttributeByClass() {
|
||||||
|
assertEquals(
|
||||||
|
Optional.of(new Attributes.Name("b")),
|
||||||
|
attributes.getAttribute(Attributes.Name.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mustGetPossiblyMissingFirstAttributeByClass() {
|
||||||
|
assertEquals(
|
||||||
|
Optional.of(new Attributes.Name("a")),
|
||||||
|
attributes.getFirstAttribute(Attributes.Name.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mustGetMissingFirstAttributeByClass() {
|
||||||
|
assertEquals(
|
||||||
|
Optional.empty(),
|
||||||
|
attributes.getFirstAttribute(Attributes.LogLevels.class));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -60,6 +60,17 @@ class AttributesSpec extends AkkaSpec with ScalaFutures {
|
||||||
attributes.get[Name] should contain(Name("attributesSink"))
|
attributes.get[Name] should contain(Name("attributesSink"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val attributes = Attributes.name("a") and Attributes.name("b") and Attributes.inputBuffer(1, 2)
|
||||||
|
|
||||||
|
"give access to first attribute" in {
|
||||||
|
attributes.getFirst[Name] should ===(Some(Attributes.Name("a")))
|
||||||
|
}
|
||||||
|
|
||||||
|
"give access to attribute byt type" in {
|
||||||
|
attributes.get[Name] should ===(Some(Attributes.Name("b")))
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,12 +3,15 @@
|
||||||
*/
|
*/
|
||||||
package akka.stream
|
package akka.stream
|
||||||
|
|
||||||
|
import java.util.Optional
|
||||||
|
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.reflect.{ classTag, ClassTag }
|
import scala.reflect.{ classTag, ClassTag }
|
||||||
import akka.japi.function
|
import akka.japi.function
|
||||||
import akka.stream.impl.StreamLayout._
|
import akka.stream.impl.StreamLayout._
|
||||||
import java.net.URLEncoder
|
import java.net.URLEncoder
|
||||||
|
import scala.compat.java8.OptionConverters._
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Holds attributes which can be used to alter [[akka.stream.scaladsl.Flow]] / [[akka.stream.javadsl.Flow]]
|
* Holds attributes which can be used to alter [[akka.stream.scaladsl.Flow]] / [[akka.stream.javadsl.Flow]]
|
||||||
|
|
@ -48,32 +51,30 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
|
||||||
* If no such attribute exists the `default` value is returned.
|
* If no such attribute exists the `default` value is returned.
|
||||||
*/
|
*/
|
||||||
def getAttribute[T <: Attribute](c: Class[T], default: T): T =
|
def getAttribute[T <: Attribute](c: Class[T], default: T): T =
|
||||||
getAttribute(c) match {
|
getAttribute(c).orElse(default)
|
||||||
case Some(a) ⇒ a
|
|
||||||
case None ⇒ default
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API: Get the first (least specific) attribute of a given `Class` or subclass thereof.
|
* Java API: Get the first (least specific) attribute of a given `Class` or subclass thereof.
|
||||||
* If no such attribute exists the `default` value is returned.
|
* If no such attribute exists the `default` value is returned.
|
||||||
*/
|
*/
|
||||||
def getFirstAttribute[T <: Attribute](c: Class[T], default: T): T =
|
def getFirstAttribute[T <: Attribute](c: Class[T], default: T): T =
|
||||||
getFirstAttribute(c) match {
|
getFirstAttribute(c).orElse(default)
|
||||||
case Some(a) ⇒ a
|
|
||||||
case None ⇒ default
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API: Get the last (most specific) attribute of a given `Class` or subclass thereof.
|
* Java API: Get the last (most specific) attribute of a given `Class` or subclass thereof.
|
||||||
*/
|
*/
|
||||||
def getAttribute[T <: Attribute](c: Class[T]): Option[T] =
|
def getAttribute[T <: Attribute](c: Class[T]): Optional[T] =
|
||||||
Option(attributeList.foldLeft(null.asInstanceOf[T])((acc, attr) ⇒ if (c.isInstance(attr)) c.cast(attr) else acc))
|
Optional.ofNullable(attributeList.foldLeft(
|
||||||
|
null.asInstanceOf[T]
|
||||||
|
)(
|
||||||
|
(acc, attr) ⇒ if (c.isInstance(attr)) c.cast(attr) else acc)
|
||||||
|
)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API: Get the first (least specific) attribute of a given `Class` or subclass thereof.
|
* Java API: Get the first (least specific) attribute of a given `Class` or subclass thereof.
|
||||||
*/
|
*/
|
||||||
def getFirstAttribute[T <: Attribute](c: Class[T]): Option[T] =
|
def getFirstAttribute[T <: Attribute](c: Class[T]): Optional[T] =
|
||||||
attributeList.find(c isInstance _).map(c cast _)
|
attributeList.collectFirst { case attr if c.isInstance(attr) => c cast attr }.asJava
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scala API: get all attributes of a given type (or subtypes thereof).
|
* Scala API: get all attributes of a given type (or subtypes thereof).
|
||||||
|
|
@ -81,7 +82,7 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
|
||||||
def filtered[T <: Attribute: ClassTag]: List[T] = {
|
def filtered[T <: Attribute: ClassTag]: List[T] = {
|
||||||
val c = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
|
val c = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
|
||||||
attributeList.collect {
|
attributeList.collect {
|
||||||
case a if c.isAssignableFrom(a.getClass) ⇒ c.cast(a)
|
case attr if c.isAssignableFrom(attr.getClass) ⇒ c.cast(attr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -102,14 +103,18 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
|
||||||
/**
|
/**
|
||||||
* Scala API: Get the last (most specific) attribute of a given type parameter T `Class` or subclass thereof.
|
* Scala API: Get the last (most specific) attribute of a given type parameter T `Class` or subclass thereof.
|
||||||
*/
|
*/
|
||||||
def get[T <: Attribute: ClassTag]: Option[T] =
|
def get[T <: Attribute: ClassTag]: Option[T] = {
|
||||||
getAttribute(classTag[T].runtimeClass.asInstanceOf[Class[T]])
|
val c = classTag[T].runtimeClass.asInstanceOf[Class[T]]
|
||||||
|
attributeList.reverseIterator.collectFirst[T] { case attr if c.isInstance(attr) => c.cast(attr) }
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scala API: Get the first (least specific) attribute of a given type parameter T `Class` or subclass thereof.
|
* Scala API: Get the first (least specific) attribute of a given type parameter T `Class` or subclass thereof.
|
||||||
*/
|
*/
|
||||||
def getFirst[T <: Attribute: ClassTag]: Option[T] =
|
def getFirst[T <: Attribute: ClassTag]: Option[T] = {
|
||||||
getFirstAttribute(classTag[T].runtimeClass.asInstanceOf[Class[T]])
|
val c = classTag[T].runtimeClass.asInstanceOf[Class[T]]
|
||||||
|
attributeList.collectFirst { case attr if c.isInstance(attr) => c.cast(attr) }
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test whether the given attribute is contained within this attributes list.
|
* Test whether the given attribute is contained within this attributes list.
|
||||||
|
|
|
||||||
|
|
@ -445,7 +445,7 @@ private[akka] final case class Batch[In, Out](max: Long, costFn: In ⇒ Long, se
|
||||||
|
|
||||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
|
||||||
|
|
||||||
val decider = inheritedAttributes.getAttribute(classOf[SupervisionStrategy]).map(_.decider).getOrElse(Supervision.stoppingDecider)
|
val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
||||||
|
|
||||||
private var agg: Out = null.asInstanceOf[Out]
|
private var agg: Out = null.asInstanceOf[Out]
|
||||||
private var left: Long = max
|
private var left: Long = max
|
||||||
|
|
@ -637,7 +637,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
|
||||||
override def toString = s"MapAsync.Logic(buffer=$buffer)"
|
override def toString = s"MapAsync.Logic(buffer=$buffer)"
|
||||||
|
|
||||||
//FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync?
|
//FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync?
|
||||||
val decider = inheritedAttributes.getAttribute(classOf[SupervisionStrategy]).map(_.decider).getOrElse(Supervision.stoppingDecider)
|
val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
||||||
|
|
||||||
val buffer = new BoundedBuffer[Holder[Try[Out]]](parallelism)
|
val buffer = new BoundedBuffer[Holder[Try[Out]]](parallelism)
|
||||||
def todo = buffer.used
|
def todo = buffer.used
|
||||||
|
|
@ -715,7 +715,7 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I
|
||||||
override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)"
|
override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)"
|
||||||
|
|
||||||
val decider =
|
val decider =
|
||||||
inheritedAttributes.getAttribute(classOf[SupervisionStrategy])
|
inheritedAttributes.get[SupervisionStrategy]
|
||||||
.map(_.decider).getOrElse(Supervision.stoppingDecider)
|
.map(_.decider).getOrElse(Supervision.stoppingDecider)
|
||||||
|
|
||||||
var inFlight = 0
|
var inFlight = 0
|
||||||
|
|
@ -953,7 +953,7 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS
|
||||||
override def initialAttributes: Attributes = DefaultAttributes.delay
|
override def initialAttributes: Attributes = DefaultAttributes.delay
|
||||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
|
||||||
val size =
|
val size =
|
||||||
inheritedAttributes.getAttribute(classOf[InputBuffer]) match {
|
inheritedAttributes.get[InputBuffer] match {
|
||||||
case None ⇒ throw new IllegalStateException(s"Couldn't find InputBuffer Attribute for $this")
|
case None ⇒ throw new IllegalStateException(s"Couldn't find InputBuffer Attribute for $this")
|
||||||
case Some(InputBuffer(min, max)) ⇒ max
|
case Some(InputBuffer(min, max)) ⇒ max
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue