Add getAttributes on Graph (#31264)

This commit is contained in:
梦境迷离 2022-03-24 22:42:27 +08:00 committed by GitHub
parent c3be49bb7b
commit f8d7e0fb34
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 74 additions and 0 deletions

View file

@ -8,6 +8,8 @@ import scala.collection.immutable
import akka.NotUsed
import akka.stream.testkit.TestSubscriber.ManualProbe
import akka.stream.testkit.{ StreamSpec, TestSubscriber }
import akka.stream.Attributes
import akka.stream.Attributes.Attribute
import scala.concurrent.duration._
@ -134,4 +136,45 @@ class GraphMergePrioritizedNSpec extends StreamSpec {
.initialDelay(50.millis)
.to(Sink.fromSubscriber(probe))
}
"get Priority from graph" in {
val elementCount = 10
case class MyPriority(priority: Int) extends Attribute
val myAttributes1 = Attributes(MyPriority(6))
val myAttributes2 = Attributes(MyPriority(3))
val myAttributes3 = Attributes(MyPriority(1))
val defaultPriority = MyPriority(-1)
val source1: Source[Int, NotUsed] =
Source.fromIterator(() => Iterator.continually(1).take(elementCount)).addAttributes(myAttributes1)
val source2 = Source.fromIterator[Int](() => Iterator.empty).addAttributes(myAttributes2)
val source3: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.empty).addAttributes(myAttributes3)
val sourcesAndPriorities = Seq(
(source1, source1.getAttributes.get[MyPriority](defaultPriority).priority),
(source2, source2.getAttributes.get[MyPriority](defaultPriority).priority),
(source3, source3.getAttributes.get[MyPriority](defaultPriority).priority));
val probe = TestSubscriber.manualProbe[Int]()
threeSourceMerge(sourcesAndPriorities, probe).run()
val subscription = probe.expectSubscription()
var collected = Seq.empty[Int]
for (_ <- 1 to elementCount) {
subscription.request(1)
collected :+= probe.expectNext()
}
val ones = collected.count(_ == 1)
val twos = collected.count(_ == 2)
val threes = collected.count(_ == 3)
ones shouldEqual elementCount
twos shouldEqual 0
threes shouldEqual 0
}
}

View file

@ -0,0 +1,9 @@
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Graph.getAttributes")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Flow.getAttributes")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.BidiFlow.getAttributes")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Sink.getAttributes")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Source.getAttributes")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.BidiFlow.getAttributes")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.Sink.getAttributes")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.Source.getAttributes")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.RunnableGraph.getAttributes")

View file

@ -78,6 +78,9 @@ trait Graph[+S <: Shape, +M] {
* less specific than attributes set directly on the individual graphs of the composite.
*/
def addAttributes(attr: Attributes): Graph[S, M] = withAttributes(traversalBuilder.attributes and attr)
def getAttributes: Attributes = traversalBuilder.attributes
}
object Graph {

View file

@ -271,4 +271,6 @@ final class BidiFlow[I1, O1, I2, O2, Mat](delegate: scaladsl.BidiFlow[I1, O1, I2
*/
override def async(dispatcher: String, inputBufferSize: Int): BidiFlow[I1, O1, I2, O2, Mat] =
new BidiFlow(delegate.async(dispatcher, inputBufferSize))
override def getAttributes: Attributes = delegate.getAttributes
}

View file

@ -4008,6 +4008,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
})
.asJava
override def getAttributes: Attributes = delegate.getAttributes
}
object RunnableGraph {

View file

@ -587,4 +587,6 @@ final class Sink[In, Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkSh
override def async(dispatcher: String, inputBufferSize: Int): javadsl.Sink[In, Mat] =
new Sink(delegate.async(dispatcher, inputBufferSize))
override def getAttributes: Attributes = delegate.getAttributes
}

View file

@ -4576,4 +4576,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
})
.asJava
override def getAttributes: Attributes = delegate.getAttributes
}

View file

@ -218,6 +218,8 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](
override def async(dispatcher: String, inputBufferSize: Int): BidiFlow[I1, O1, I2, O2, Mat] =
super.async(dispatcher, inputBufferSize).asInstanceOf[BidiFlow[I1, O1, I2, O2, Mat]]
override def getAttributes: Attributes = traversalBuilder.attributes
}
object BidiFlow {

View file

@ -774,6 +774,8 @@ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBui
/** Converts this Scala DSL element to it's Java DSL counterpart. */
def asJava: javadsl.RunnableGraph[Mat] = javadsl.RunnableGraph.fromGraph(this)
override def getAttributes: Attributes = traversalBuilder.attributes
}
/**

View file

@ -125,6 +125,9 @@ final class Sink[-In, +Mat](override val traversalBuilder: LinearTraversalBuilde
* Converts this Scala DSL element to it's Java DSL counterpart.
*/
def asJava[JIn <: In]: javadsl.Sink[JIn, Mat @uncheckedVariance] = new javadsl.Sink(this)
override def getAttributes: Attributes = traversalBuilder.attributes
}
object Sink {

View file

@ -238,6 +238,8 @@ final class Source[+Out, +Mat](
**/
def asSourceWithContext[Ctx](f: Out => Ctx): SourceWithContext[Out, Ctx, Mat] =
new SourceWithContext(this.map(e => (e, f(e))))
override def getAttributes: Attributes = traversalBuilder.attributes
}
object Source {