From 1044c4996e4694ce09c01d652f468fc979bbf65b Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 15 Mar 2021 15:38:28 +0400 Subject: [PATCH] Extend GroupedWeightedWithin to accept maxWeight and maxNumber simultaneously #30020 --- CONTRIBUTING.md | 2 +- .../Source-or-Flow/groupedWeightedWithin.md | 4 +- .../scaladsl/FlowGroupedWithinSpec.scala | 46 +++++++++++++++++-- ...42-extended-group-weighted-within.excludes | 2 + .../scala/akka/stream/impl/fusing/Ops.scala | 22 ++++++--- .../main/scala/akka/stream/javadsl/Flow.scala | 26 +++++++++++ .../scala/akka/stream/javadsl/Source.scala | 26 +++++++++++ .../scala/akka/stream/javadsl/SubFlow.scala | 26 +++++++++++ .../scala/akka/stream/javadsl/SubSource.scala | 26 +++++++++++ .../scala/akka/stream/scaladsl/Flow.scala | 29 +++++++++++- .../src/main/scala/akka/Plugin.scala | 2 +- 11 files changed, 194 insertions(+), 17 deletions(-) create mode 100644 akka-stream/src/main/mima-filters/2.6.13.backwards.excludes/pr-30042-extended-group-weighted-within.excludes diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 53aba98267..56ed0213ed 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -315,7 +315,7 @@ For a pull request to be considered at all it has to meet these requirements: Some additional guidelines regarding source code are: - Keep the code [DRY](http://programmer.97things.oreilly.com/wiki/index.php/Don%27t_Repeat_Yourself). -- Apply the [Boy Scout Rule](http://programmer.97things.oreilly.com/wiki/index.php/The_Boy_Scout_Rule) whenever you have the chance to. +- Apply the [Boy Scout Rule](https://www.oreilly.com/library/view/97-things-every/9780596809515/ch08.html) whenever you have the chance to. - Never delete or change existing copyright notices, just add additional info. - Do not use ``@author`` tags since it does not encourage [Collective Code Ownership](http://www.extremeprogramming.org/rules/collective.html). 
- Contributors , each project should make sure that the contributors gets the credit they deserve—in a text file or page on the project website and in the release notes etc. diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md index 9decabf30b..0ddc48dc5a 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md @@ -12,8 +12,8 @@ Chunk up this stream into groups of elements received within a time window, or l ## Description -Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, -whatever happens first. Empty groups will not be emitted if no elements are received from upstream. +Chunk up this stream into groups of elements received within a time window, or limited by the weight and number of +the elements, whatever happens first. Empty groups will not be emitted if no elements are received from upstream. The last group before end-of-stream will contain the buffered elements since the previously emitted group. 
See also: diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala index d2bcd4ab4c..354776c516 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala @@ -252,19 +252,19 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { } "not emit an empty group if first element is heavier than maxWeight" taggedAs TimingTest in { - val וupstream = TestPublisher.probe[Long]() + val upstream = TestPublisher.probe[Long]() val downstream = TestSubscriber.probe[immutable.Seq[Long]]() Source - .fromPublisher(וupstream) + .fromPublisher(upstream) .groupedWeightedWithin(10, 50.millis)(identity) .to(Sink.fromSubscriber(downstream)) .run() downstream.ensureSubscription() downstream.request(1) - וupstream.sendNext(11) + upstream.sendNext(11) downstream.expectNext(Vector(11): immutable.Seq[Long]) - וupstream.sendComplete() + upstream.sendComplete() downstream.expectComplete() } @@ -288,5 +288,43 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { upstream.sendComplete() downstream.expectComplete() } + + "group by max weight and max number of elements reached" taggedAs TimingTest in { + val upstream = TestPublisher.probe[Long]() + val downstream = TestSubscriber.probe[immutable.Seq[Long]]() + Source + .fromPublisher(upstream) + .groupedWeightedWithin(10, 3, 30.seconds)(identity) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.ensureSubscription() + upstream.sendNext(1) + upstream.sendNext(2) + upstream.sendNext(3) + upstream.sendNext(4) + upstream.sendNext(5) + upstream.sendNext(6) + upstream.sendNext(11) + upstream.sendNext(7) + upstream.sendNext(2) + upstream.sendComplete() + downstream.request(1) + // split because of maxNumber: 3 elements + downstream.expectNext(Vector(1, 2, 3): 
immutable.Seq[Long]) + downstream.request(1) + // split because of maxWeight: 9=4+5, one more element did not fit + downstream.expectNext(Vector(4, 5): immutable.Seq[Long]) + downstream.request(1) + // split because of maxWeight: 6, one more element did not fit + downstream.expectNext(Vector(6): immutable.Seq[Long]) + downstream.request(1) + // split because of maxWeight: 11 + downstream.expectNext(Vector(11): immutable.Seq[Long]) + downstream.request(1) + // no split + downstream.expectNext(Vector(7, 2): immutable.Seq[Long]) + downstream.expectComplete() + } } } diff --git a/akka-stream/src/main/mima-filters/2.6.13.backwards.excludes/pr-30042-extended-group-weighted-within.excludes b/akka-stream/src/main/mima-filters/2.6.13.backwards.excludes/pr-30042-extended-group-weighted-within.excludes new file mode 100644 index 0000000000..6766f6f518 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.13.backwards.excludes/pr-30042-extended-group-weighted-within.excludes @@ -0,0 +1,2 @@ +# disable compatibility check for @InternalApi private[akka] class +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GroupedWeightedWithin.this") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index ce89040b07..5fc89f2409 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1714,10 +1714,12 @@ private[stream] object Collect { */ @InternalApi private[akka] final class GroupedWeightedWithin[T]( val maxWeight: Long, - costFn: T => Long, + val maxNumber: Int, + val costFn: T => Long, val interval: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { require(maxWeight > 0, "maxWeight must be greater than 0") + require(maxNumber > 0, "maxNumber must be greater than 0") require(interval > Duration.Zero) val in = Inlet[T]("in") @@ -1747,6 
+1749,7 @@ private[stream] object Collect { private var groupEmitted = true private var finished = false private var totalWeight = 0L + private var totalNumber = 0 private var hasElements = false override def preStart() = { @@ -1761,15 +1764,17 @@ private[stream] object Collect { failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed")) else { hasElements = true - if (totalWeight + cost <= maxWeight) { + // if there is room (both weight and number) for `elem` in the current group + if (totalWeight + cost <= maxWeight && totalNumber + 1 <= maxNumber) { buf += elem totalWeight += cost + totalNumber += 1 - if (totalWeight < maxWeight) pull(in) + // if there may still be room (both weight and number) for one more element in the current group + if (totalWeight < maxWeight && totalNumber < maxNumber) pull(in) else { - // `totalWeight >= maxWeight` which means that downstream can get the next group. if (!isAvailable(out)) { - // We should emit group when downstream becomes available + // we should emit group when downstream becomes available pushEagerly = true // we want to pull anyway, since we allow for zero weight elements // but since `emitGroup()` will pull internally (by calling `startNewGroup()`) @@ -1781,10 +1786,11 @@ private[stream] object Collect { } } } else { - //we have a single heavy element that weighs more than the limit - if (totalWeight == 0L) { + // if there is a single heavy element that weighs more than the limit + if (totalWeight == 0L && totalNumber == 0) { buf += elem totalWeight += cost + totalNumber += 1 pushEagerly = true } else { pending = elem @@ -1813,12 +1819,14 @@ private[stream] object Collect { private def startNewGroup(): Unit = { if (pending != null) { totalWeight = pendingWeight + totalNumber = 1 pendingWeight = 0L buf += pending pending = null.asInstanceOf[T] groupEmitted = false } else { totalWeight = 0L + totalNumber = 0 hasElements = false } pushEagerly = false diff --git 
a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index d797564fea..4712191932 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1315,6 +1315,32 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] = groupedWeightedWithin(maxWeight, costFn, d.asScala) + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the weight and number of the elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last emitted group, or the weight or number limit is reached + * + * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than + * `maxWeight` or has more than `maxNumber` elements + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `maxWeight` must be positive, `maxNumber` must be positive, and `d` must be greater than 0 seconds, + * otherwise IllegalArgumentException is thrown. + */ + def groupedWeightedWithin( + maxWeight: Long, + maxNumber: Int, + costFn: function.Function[Out, java.lang.Long], + d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] = + new Flow(delegate.groupedWeightedWithin(maxWeight, maxNumber, d.asScala)(costFn.apply).map(_.asJava)) + /** * Shifts elements emission in time by a specified amount. It allows to store elements * in internal buffer while waiting for next element to be emitted. 
Depending on the defined diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 108333f887..6dc8498d61 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -2755,6 +2755,32 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = groupedWeightedWithin(maxWeight, costFn, d.asScala) + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the weight and number of the elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last emitted group, or the weight or number limit is reached + * + * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than + * `maxWeight` or has more than `maxNumber` elements + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `maxWeight` must be positive, `maxNumber` must be positive, and `d` must be greater than 0 seconds, + * otherwise IllegalArgumentException is thrown. + */ + def groupedWeightedWithin( + maxWeight: Long, + maxNumber: Int, + costFn: function.Function[Out, java.lang.Long], + d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = + new Source(delegate.groupedWeightedWithin(maxWeight, maxNumber, d.asScala)(costFn.apply).map(_.asJava)) + /** * Shifts elements emission in time by a specified amount. It allows to store elements * in internal buffer while waiting for next element to be emitted. 
Depending on the defined diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 9b83a5310e..6c07d2fc15 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -752,6 +752,32 @@ class SubFlow[In, Out, Mat]( d: java.time.Duration): javadsl.SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = groupedWeightedWithin(maxWeight, costFn, d.asScala) + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the weight and number of the elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last emitted group, or the weight or number limit is reached + * + * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than + * `maxWeight` or has more than `maxNumber` elements + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `maxWeight` must be positive, `maxNumber` must be positive, and `d` must be greater than 0 seconds, + * otherwise IllegalArgumentException is thrown. + */ + def groupedWeightedWithin( + maxWeight: Long, + maxNumber: Int, + costFn: function.Function[Out, java.lang.Long], + d: java.time.Duration): javadsl.SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = + new SubFlow(delegate.groupedWeightedWithin(maxWeight, maxNumber, d.asScala)(costFn.apply).map(_.asJava)) + /** * Shifts elements emission in time by a specified amount. It allows to store elements * in internal buffer while waiting for next element to be emitted. 
Depending on the defined diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index fc97db3157..3b2142e2b6 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -741,6 +741,32 @@ class SubSource[Out, Mat]( d: java.time.Duration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] = groupedWeightedWithin(maxWeight, costFn, d.asScala) + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the weight and number of the elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last emitted group, or the weight or number limit is reached + * + * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than + * `maxWeight` or has more than `maxNumber` elements + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `maxWeight` must be positive, `maxNumber` must be positive, and `d` must be greater than 0 seconds, + * otherwise IllegalArgumentException is thrown. + */ + def groupedWeightedWithin( + maxWeight: Long, + maxNumber: Int, + costFn: function.Function[Out, java.lang.Long], + d: java.time.Duration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] = + new SubSource(delegate.groupedWeightedWithin(maxWeight, maxNumber, d.asScala)(costFn.apply).map(_.asJava)) + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. 
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 764bd1ee7d..3a2a5f935b 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1612,7 +1612,9 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream completes */ def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]] = - via(new GroupedWeightedWithin[Out](n, ConstantFun.oneLong, d).withAttributes(DefaultAttributes.groupedWithin)) + via( + new GroupedWeightedWithin[Out](Long.MaxValue, n, ConstantFun.zeroLong, d) + .withAttributes(DefaultAttributes.groupedWithin)) /** * Chunk up this stream into groups of elements received within a time window, @@ -1633,7 +1635,30 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream completes */ def groupedWeightedWithin(maxWeight: Long, d: FiniteDuration)(costFn: Out => Long): Repr[immutable.Seq[Out]] = - via(new GroupedWeightedWithin[Out](maxWeight, costFn, d)) + via(new GroupedWeightedWithin[Out](maxWeight, Int.MaxValue, costFn, d)) + + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the weight and number of the elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * `maxWeight` must be positive, `maxNumber` must be positive, and `d` must be greater than 0 seconds, + * otherwise IllegalArgumentException is thrown. 
+ * + * '''Emits when''' the configured time elapses since the last emitted group, or the weight or number limit is reached + * + * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than + * `maxWeight` or has more than `maxNumber` elements + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + */ + def groupedWeightedWithin(maxWeight: Long, maxNumber: Int, d: FiniteDuration)( + costFn: Out => Long): Repr[immutable.Seq[Out]] = + via(new GroupedWeightedWithin[Out](maxWeight, maxNumber, costFn, d)) /** * Shifts elements emission in time by a specified amount. It allows to store elements diff --git a/plugins/serialversion-remover-plugin/src/main/scala/akka/Plugin.scala b/plugins/serialversion-remover-plugin/src/main/scala/akka/Plugin.scala index 331d68e702..017dc8d129 100644 --- a/plugins/serialversion-remover-plugin/src/main/scala/akka/Plugin.scala +++ b/plugins/serialversion-remover-plugin/src/main/scala/akka/Plugin.scala @@ -35,7 +35,7 @@ class SerialVersionRemoverPhase extends PluginPhase { if (tree.symbol.getAnnotation(defn.SerialVersionUIDAnnot).isDefined && tree.symbol.is(Trait)) { tree.symbol.removeAnnotation(defn.SerialVersionUIDAnnot) } - + tree }