From f8d7e0fb34c93750eeac37f2af328d0009ae866f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A2=A6=E5=A2=83=E8=BF=B7=E7=A6=BB?= <568845948@qq.com> Date: Thu, 24 Mar 2022 22:42:27 +0800 Subject: [PATCH] Add `getAttributes` on `Graph` (#31264) --- .../scaladsl/GraphMergePrioritizedNSpec.scala | 43 +++++++++++++++++++ .../issue-31142-Attributes.excludes | 9 ++++ .../src/main/scala/akka/stream/Graph.scala | 3 ++ .../scala/akka/stream/javadsl/BidiFlow.scala | 2 + .../main/scala/akka/stream/javadsl/Flow.scala | 3 ++ .../main/scala/akka/stream/javadsl/Sink.scala | 2 + .../scala/akka/stream/javadsl/Source.scala | 3 ++ .../scala/akka/stream/scaladsl/BidiFlow.scala | 2 + .../scala/akka/stream/scaladsl/Flow.scala | 2 + .../scala/akka/stream/scaladsl/Sink.scala | 3 ++ .../scala/akka/stream/scaladsl/Source.scala | 2 + 11 files changed, 74 insertions(+) create mode 100644 akka-stream/src/main/mima-filters/2.6.18.backwards.excludes/issue-31142-Attributes.excludes diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePrioritizedNSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePrioritizedNSpec.scala index 2d9f192572..ea91027031 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePrioritizedNSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePrioritizedNSpec.scala @@ -8,6 +8,8 @@ import scala.collection.immutable import akka.NotUsed import akka.stream.testkit.TestSubscriber.ManualProbe import akka.stream.testkit.{ StreamSpec, TestSubscriber } +import akka.stream.Attributes +import akka.stream.Attributes.Attribute import scala.concurrent.duration._ @@ -134,4 +136,45 @@ class GraphMergePrioritizedNSpec extends StreamSpec { .initialDelay(50.millis) .to(Sink.fromSubscriber(probe)) } + + "get Priority from graph" in { + val elementCount = 10 + case class MyPriority(priority: Int) extends Attribute + + val myAttributes1 = Attributes(MyPriority(6)) + val myAttributes2 = Attributes(MyPriority(3)) + val myAttributes3 = Attributes(MyPriority(1)) + + val defaultPriority = MyPriority(-1) + + val source1: Source[Int, NotUsed] = + Source.fromIterator(() => Iterator.continually(1).take(elementCount)).addAttributes(myAttributes1) + val source2 = Source.fromIterator[Int](() => Iterator.empty).addAttributes(myAttributes2) + val source3: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.empty).addAttributes(myAttributes3) + + val sourcesAndPriorities = Seq( + (source1, source1.getAttributes.get[MyPriority](defaultPriority).priority), + (source2, source2.getAttributes.get[MyPriority](defaultPriority).priority), + (source3, source3.getAttributes.get[MyPriority](defaultPriority).priority)); + + val probe = TestSubscriber.manualProbe[Int]() + + threeSourceMerge(sourcesAndPriorities, probe).run() + + val subscription = probe.expectSubscription() + + var collected = Seq.empty[Int] + for (_ <- 1 to elementCount) { + subscription.request(1) + collected :+= probe.expectNext() + } + + val ones = collected.count(_ == 1) + val twos = collected.count(_ == 2) + val threes = collected.count(_ == 3) + + ones shouldEqual elementCount + twos shouldEqual 0 + threes shouldEqual 0 + } } diff --git a/akka-stream/src/main/mima-filters/2.6.18.backwards.excludes/issue-31142-Attributes.excludes b/akka-stream/src/main/mima-filters/2.6.18.backwards.excludes/issue-31142-Attributes.excludes new file mode 100644 index 0000000000..2cd97143e4 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.18.backwards.excludes/issue-31142-Attributes.excludes @@ -0,0 +1,9 @@ +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Graph.getAttributes") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Flow.getAttributes") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.BidiFlow.getAttributes") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Sink.getAttributes") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.Source.getAttributes") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.BidiFlow.getAttributes") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.Sink.getAttributes") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.Source.getAttributes") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.RunnableGraph.getAttributes") diff --git a/akka-stream/src/main/scala/akka/stream/Graph.scala b/akka-stream/src/main/scala/akka/stream/Graph.scala index 3220ba0cdc..bf134543bc 100644 --- a/akka-stream/src/main/scala/akka/stream/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/Graph.scala @@ -78,6 +78,9 @@ trait Graph[+S <: Shape, +M] { * less specific than attributes set directly on the individual graphs of the composite. */ def addAttributes(attr: Attributes): Graph[S, M] = withAttributes(traversalBuilder.attributes and attr) + + def getAttributes: Attributes = traversalBuilder.attributes + } object Graph { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala index 267d0e9fea..b4fe97030c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala @@ -271,4 +271,6 @@ final class BidiFlow[I1, O1, I2, O2, Mat](delegate: scaladsl.BidiFlow[I1, O1, I2 */ override def async(dispatcher: String, inputBufferSize: Int): BidiFlow[I1, O1, I2, O2, Mat] = new BidiFlow(delegate.async(dispatcher, inputBufferSize)) + + override def getAttributes: Attributes = delegate.getAttributes } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 116fe57ab1..2ae85e0c78 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -4008,6 +4008,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala) }) .asJava + + override def getAttributes: Attributes = delegate.getAttributes + } object RunnableGraph { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index a6951202bf..ea34e588b7 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -587,4 +587,6 @@ final class Sink[In, Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkSh override def async(dispatcher: String, inputBufferSize: Int): javadsl.Sink[In, Mat] = new Sink(delegate.async(dispatcher, inputBufferSize)) + override def getAttributes: Attributes = delegate.getAttributes + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 486e913afb..b9f5ca936f 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -4576,4 +4576,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala) }) .asJava + + override def getAttributes: Attributes = delegate.getAttributes + } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index af9bcbfd65..3d447c9fa9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -218,6 +218,8 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat]( override def async(dispatcher: String, inputBufferSize: Int): BidiFlow[I1, O1, I2, O2, Mat] = super.async(dispatcher, inputBufferSize).asInstanceOf[BidiFlow[I1, O1, I2, O2, Mat]] + override def getAttributes: Attributes = traversalBuilder.attributes + } object BidiFlow { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 27119fa70a..16ea4cb0a2 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -774,6 +774,8 @@ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBui /** Converts this Scala DSL element to it's Java DSL counterpart. */ def asJava: javadsl.RunnableGraph[Mat] = javadsl.RunnableGraph.fromGraph(this) + + override def getAttributes: Attributes = traversalBuilder.attributes } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 4fae6944af..9568675483 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -125,6 +125,9 @@ final class Sink[-In, +Mat](override val traversalBuilder: LinearTraversalBuilde * Converts this Scala DSL element to it's Java DSL counterpart. */ def asJava[JIn <: In]: javadsl.Sink[JIn, Mat @uncheckedVariance] = new javadsl.Sink(this) + + override def getAttributes: Attributes = traversalBuilder.attributes + } object Sink { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 2923f18934..9365a2fe7e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -238,6 +238,8 @@ final class Source[+Out, +Mat]( **/ def asSourceWithContext[Ctx](f: Out => Ctx): SourceWithContext[Out, Ctx, Mat] = new SourceWithContext(this.map(e => (e, f(e)))) + + override def getAttributes: Attributes = traversalBuilder.attributes } object Source {