Extend GroupedWeightedWithin to accept maxWeight and maxNumber simultaneously #30020

This commit is contained in:
Alex 2021-03-15 15:38:28 +04:00 committed by GitHub
parent 23f703fa08
commit 1044c4996e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 194 additions and 17 deletions

View file

@ -315,7 +315,7 @@ For a pull request to be considered at all it has to meet these requirements:
Some additional guidelines regarding source code are: Some additional guidelines regarding source code are:
- Keep the code [DRY](http://programmer.97things.oreilly.com/wiki/index.php/Don%27t_Repeat_Yourself). - Keep the code [DRY](http://programmer.97things.oreilly.com/wiki/index.php/Don%27t_Repeat_Yourself).
- Apply the [Boy Scout Rule](http://programmer.97things.oreilly.com/wiki/index.php/The_Boy_Scout_Rule) whenever you have the chance to. - Apply the [Boy Scout Rule](https://www.oreilly.com/library/view/97-things-every/9780596809515/ch08.html) whenever you have the chance to.
- Never delete or change existing copyright notices, just add additional info. - Never delete or change existing copyright notices, just add additional info.
- Do not use ``@author`` tags since it does not encourage [Collective Code Ownership](http://www.extremeprogramming.org/rules/collective.html). - Do not use ``@author`` tags since it does not encourage [Collective Code Ownership](http://www.extremeprogramming.org/rules/collective.html).
- Contributors , each project should make sure that the contributors gets the credit they deserve—in a text file or page on the project website and in the release notes etc. - Contributors , each project should make sure that the contributors gets the credit they deserve—in a text file or page on the project website and in the release notes etc.

View file

@ -12,8 +12,8 @@ Chunk up this stream into groups of elements received within a time window, or l
## Description ## Description
Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, Chunk up this stream into groups of elements received within a time window, or limited by the weight and number of
whatever happens first. Empty groups will not be emitted if no elements are received from upstream. the elements, whatever happens first. Empty groups will not be emitted if no elements are received from upstream.
The last group before end-of-stream will contain the buffered elements since the previously emitted group. The last group before end-of-stream will contain the buffered elements since the previously emitted group.
See also: See also:

View file

@ -252,19 +252,19 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest {
} }
"not emit an empty group if first element is heavier than maxWeight" taggedAs TimingTest in { "not emit an empty group if first element is heavier than maxWeight" taggedAs TimingTest in {
val וupstream = TestPublisher.probe[Long]() val upstream = TestPublisher.probe[Long]()
val downstream = TestSubscriber.probe[immutable.Seq[Long]]() val downstream = TestSubscriber.probe[immutable.Seq[Long]]()
Source Source
.fromPublisher(וupstream) .fromPublisher(upstream)
.groupedWeightedWithin(10, 50.millis)(identity) .groupedWeightedWithin(10, 50.millis)(identity)
.to(Sink.fromSubscriber(downstream)) .to(Sink.fromSubscriber(downstream))
.run() .run()
downstream.ensureSubscription() downstream.ensureSubscription()
downstream.request(1) downstream.request(1)
וupstream.sendNext(11) upstream.sendNext(11)
downstream.expectNext(Vector(11): immutable.Seq[Long]) downstream.expectNext(Vector(11): immutable.Seq[Long])
וupstream.sendComplete() upstream.sendComplete()
downstream.expectComplete() downstream.expectComplete()
} }
@ -288,5 +288,43 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest {
upstream.sendComplete() upstream.sendComplete()
downstream.expectComplete() downstream.expectComplete()
} }
"group by max weight and max number of elements reached" taggedAs TimingTest in {
val upstream = TestPublisher.probe[Long]()
val downstream = TestSubscriber.probe[immutable.Seq[Long]]()
Source
.fromPublisher(upstream)
.groupedWeightedWithin(10, 3, 30.seconds)(identity)
.to(Sink.fromSubscriber(downstream))
.run()
downstream.ensureSubscription()
upstream.sendNext(1)
upstream.sendNext(2)
upstream.sendNext(3)
upstream.sendNext(4)
upstream.sendNext(5)
upstream.sendNext(6)
upstream.sendNext(11)
upstream.sendNext(7)
upstream.sendNext(2)
upstream.sendComplete()
downstream.request(1)
// split because of maxNumber: 3 element
downstream.expectNext(Vector(1, 2, 3): immutable.Seq[Long])
downstream.request(1)
// split because of maxWeight: 9=4+5, one more element did not fit
downstream.expectNext(Vector(4, 5): immutable.Seq[Long])
downstream.request(1)
// split because of maxWeight: 6, one more element did not fit
downstream.expectNext(Vector(6): immutable.Seq[Long])
downstream.request(1)
// split because of maxWeight: 11
downstream.expectNext(Vector(11): immutable.Seq[Long])
downstream.request(1)
// no split
downstream.expectNext(Vector(7, 2): immutable.Seq[Long])
downstream.expectComplete()
}
} }
} }

View file

@ -0,0 +1,2 @@
# disable compatibility check for @InternalApi private[akka] class
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GroupedWeightedWithin.this")

View file

@ -1714,10 +1714,12 @@ private[stream] object Collect {
*/ */
@InternalApi private[akka] final class GroupedWeightedWithin[T]( @InternalApi private[akka] final class GroupedWeightedWithin[T](
val maxWeight: Long, val maxWeight: Long,
costFn: T => Long, val maxNumber: Int,
val costFn: T => Long,
val interval: FiniteDuration) val interval: FiniteDuration)
extends GraphStage[FlowShape[T, immutable.Seq[T]]] { extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
require(maxWeight > 0, "maxWeight must be greater than 0") require(maxWeight > 0, "maxWeight must be greater than 0")
require(maxNumber > 0, "maxNumber must be greater than 0")
require(interval > Duration.Zero) require(interval > Duration.Zero)
val in = Inlet[T]("in") val in = Inlet[T]("in")
@ -1747,6 +1749,7 @@ private[stream] object Collect {
private var groupEmitted = true private var groupEmitted = true
private var finished = false private var finished = false
private var totalWeight = 0L private var totalWeight = 0L
private var totalNumber = 0
private var hasElements = false private var hasElements = false
override def preStart() = { override def preStart() = {
@ -1761,15 +1764,17 @@ private[stream] object Collect {
failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed")) failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed"))
else { else {
hasElements = true hasElements = true
if (totalWeight + cost <= maxWeight) { // if there is place (both weight and number) for `elem` in the current group
if (totalWeight + cost <= maxWeight && totalNumber + 1 <= maxNumber) {
buf += elem buf += elem
totalWeight += cost totalWeight += cost
totalNumber += 1;
if (totalWeight < maxWeight) pull(in) // if potentially there is a place (both weight and number) for one more element in the current group
if (totalWeight < maxWeight && totalNumber < maxNumber) pull(in)
else { else {
// `totalWeight >= maxWeight` which means that downstream can get the next group.
if (!isAvailable(out)) { if (!isAvailable(out)) {
// We should emit group when downstream becomes available // we should emit group when downstream becomes available
pushEagerly = true pushEagerly = true
// we want to pull anyway, since we allow for zero weight elements // we want to pull anyway, since we allow for zero weight elements
// but since `emitGroup()` will pull internally (by calling `startNewGroup()`) // but since `emitGroup()` will pull internally (by calling `startNewGroup()`)
@ -1781,10 +1786,11 @@ private[stream] object Collect {
} }
} }
} else { } else {
//we have a single heavy element that weighs more than the limit // if there is a single heavy element that weighs more than the limit
if (totalWeight == 0L) { if (totalWeight == 0L && totalNumber == 0) {
buf += elem buf += elem
totalWeight += cost totalWeight += cost
totalNumber += 1;
pushEagerly = true pushEagerly = true
} else { } else {
pending = elem pending = elem
@ -1813,12 +1819,14 @@ private[stream] object Collect {
private def startNewGroup(): Unit = { private def startNewGroup(): Unit = {
if (pending != null) { if (pending != null) {
totalWeight = pendingWeight totalWeight = pendingWeight
totalNumber = 1
pendingWeight = 0L pendingWeight = 0L
buf += pending buf += pending
pending = null.asInstanceOf[T] pending = null.asInstanceOf[T]
groupEmitted = false groupEmitted = false
} else { } else {
totalWeight = 0L totalWeight = 0L
totalNumber = 0
hasElements = false hasElements = false
} }
pushEagerly = false pushEagerly = false

View file

@ -1315,6 +1315,32 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] = d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] =
groupedWeightedWithin(maxWeight, costFn, d.asScala) groupedWeightedWithin(maxWeight, costFn, d.asScala)
/**
* Chunk up this stream into groups of elements received within a time window,
* or limited by the weight and number of the elements, whatever happens first.
* Empty groups will not be emitted if no elements are received from upstream.
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than
* `maxWeight` or has more than `maxNumber` elements
*
* '''Completes when''' upstream completes (emits last group)
*
* '''Cancels when''' downstream completes
*
* `maxWeight` must be positive, `maxNumber` must be positive, and `d` must be greater than 0 seconds,
* otherwise IllegalArgumentException is thrown.
*/
def groupedWeightedWithin(
maxWeight: Long,
maxNumber: Int,
costFn: function.Function[Out, java.lang.Long],
d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] =
new Flow(delegate.groupedWeightedWithin(maxWeight, maxNumber, d.asScala)(costFn.apply).map(_.asJava))
/** /**
* Shifts elements emission in time by a specified amount. It allows to store elements * Shifts elements emission in time by a specified amount. It allows to store elements
* in internal buffer while waiting for next element to be emitted. Depending on the defined * in internal buffer while waiting for next element to be emitted. Depending on the defined

View file

@ -2755,6 +2755,32 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
groupedWeightedWithin(maxWeight, costFn, d.asScala) groupedWeightedWithin(maxWeight, costFn, d.asScala)
/**
* Chunk up this stream into groups of elements received within a time window,
* or limited by the weight and number of the elements, whatever happens first.
* Empty groups will not be emitted if no elements are received from upstream.
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than
* `maxWeight` or has more than `maxNumber` elements
*
* '''Completes when''' upstream completes (emits last group)
*
* '''Cancels when''' downstream completes
*
* `maxWeight` must be positive, `maxNumber` must be positive, and `d` must be greater than 0 seconds,
* otherwise IllegalArgumentException is thrown.
*/
def groupedWeightedWithin(
maxWeight: Long,
maxNumber: Int,
costFn: function.Function[Out, java.lang.Long],
d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
new Source(delegate.groupedWeightedWithin(maxWeight, maxNumber, d.asScala)(costFn.apply).map(_.asJava))
/** /**
* Shifts elements emission in time by a specified amount. It allows to store elements * Shifts elements emission in time by a specified amount. It allows to store elements
* in internal buffer while waiting for next element to be emitted. Depending on the defined * in internal buffer while waiting for next element to be emitted. Depending on the defined

View file

@ -752,6 +752,32 @@ class SubFlow[In, Out, Mat](
d: java.time.Duration): javadsl.SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = d: java.time.Duration): javadsl.SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
groupedWeightedWithin(maxWeight, costFn, d.asScala) groupedWeightedWithin(maxWeight, costFn, d.asScala)
/**
* Chunk up this stream into groups of elements received within a time window,
* or limited by the weight and number of the elements, whatever happens first.
* Empty groups will not be emitted if no elements are received from upstream.
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than
* `maxWeight` or has more than `maxNumber` elements
*
* '''Completes when''' upstream completes (emits last group)
*
* '''Cancels when''' downstream completes
*
* `maxWeight` must be positive, `maxNumber` must be positive, and `d` must be greater than 0 seconds,
* otherwise IllegalArgumentException is thrown.
*/
def groupedWeightedWithin(
maxWeight: Long,
maxNumber: Int,
costFn: function.Function[Out, java.lang.Long],
d: java.time.Duration): javadsl.SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
new SubFlow(delegate.groupedWeightedWithin(maxWeight, maxNumber, d.asScala)(costFn.apply).map(_.asJava))
/** /**
* Shifts elements emission in time by a specified amount. It allows to store elements * Shifts elements emission in time by a specified amount. It allows to store elements
* in internal buffer while waiting for next element to be emitted. Depending on the defined * in internal buffer while waiting for next element to be emitted. Depending on the defined

View file

@ -741,6 +741,32 @@ class SubSource[Out, Mat](
d: java.time.Duration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] = d: java.time.Duration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] =
groupedWeightedWithin(maxWeight, costFn, d.asScala) groupedWeightedWithin(maxWeight, costFn, d.asScala)
/**
* Chunk up this stream into groups of elements received within a time window,
* or limited by the weight and number of the elements, whatever happens first.
* Empty groups will not be emitted if no elements are received from upstream.
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than
* `maxWeight` or has more than `maxNumber` elements
*
* '''Completes when''' upstream completes (emits last group)
*
* '''Cancels when''' downstream completes
*
* `maxWeight` must be positive, `maxNumber` must be positive, and `d` must be greater than 0 seconds,
* otherwise IllegalArgumentException is thrown.
*/
def groupedWeightedWithin(
maxWeight: Long,
maxNumber: Int,
costFn: function.Function[Out, java.lang.Long],
d: java.time.Duration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] =
new SubSource(delegate.groupedWeightedWithin(maxWeight, maxNumber, d.asScala)(costFn.apply).map(_.asJava))
/** /**
* Discard the given number of elements at the beginning of the stream. * Discard the given number of elements at the beginning of the stream.
* No elements will be dropped if `n` is zero or negative. * No elements will be dropped if `n` is zero or negative.

View file

@ -1612,7 +1612,9 @@ trait FlowOps[+Out, +Mat] {
* '''Cancels when''' downstream completes * '''Cancels when''' downstream completes
*/ */
def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]] = def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]] =
via(new GroupedWeightedWithin[Out](n, ConstantFun.oneLong, d).withAttributes(DefaultAttributes.groupedWithin)) via(
new GroupedWeightedWithin[Out](Long.MaxValue, n, ConstantFun.zeroLong, d)
.withAttributes(DefaultAttributes.groupedWithin))
/** /**
* Chunk up this stream into groups of elements received within a time window, * Chunk up this stream into groups of elements received within a time window,
@ -1633,7 +1635,30 @@ trait FlowOps[+Out, +Mat] {
* '''Cancels when''' downstream completes * '''Cancels when''' downstream completes
*/ */
def groupedWeightedWithin(maxWeight: Long, d: FiniteDuration)(costFn: Out => Long): Repr[immutable.Seq[Out]] = def groupedWeightedWithin(maxWeight: Long, d: FiniteDuration)(costFn: Out => Long): Repr[immutable.Seq[Out]] =
via(new GroupedWeightedWithin[Out](maxWeight, costFn, d)) via(new GroupedWeightedWithin[Out](maxWeight, Int.MaxValue, costFn, d))
/**
* Chunk up this stream into groups of elements received within a time window,
* or limited by the weight and number of the elements, whatever happens first.
* Empty groups will not be emitted if no elements are received from upstream.
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* `maxWeight` must be positive, `maxNumber` must be positive, and `d` must be greater than 0 seconds,
* otherwise IllegalArgumentException is thrown.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than
* `maxWeight` or has more than `maxNumber` elements
*
* '''Completes when''' upstream completes (emits last group)
*
* '''Cancels when''' downstream completes
*/
def groupedWeightedWithin(maxWeight: Long, maxNumber: Int, d: FiniteDuration)(
costFn: Out => Long): Repr[immutable.Seq[Out]] =
via(new GroupedWeightedWithin[Out](maxWeight, maxNumber, costFn, d))
/** /**
* Shifts elements emission in time by a specified amount. It allows to store elements * Shifts elements emission in time by a specified amount. It allows to store elements