diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index e51e934999..3cd4c6e154 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -992,12 +992,27 @@ Drop elements until a timeout has fired groupedWithin ^^^^^^^^^^^^^ -Chunk up the stream into groups of elements received within a time window, or limited by the given number of elements, -whichever happens first. +Chunk up this stream into groups of elements received within a time window, or limited by the number of the elements, +whatever happens first. Empty groups will not be emitted if no elements are received from upstream. +The last group before end-of-stream will contain the buffered elements since the previously emitted group. -**emits** when the configured time elapses since the last group has been emitted +**emits** when the configured time elapses since the last group has been emitted, +but not if no elements has been grouped (i.e: no empty groups), or when limit has been reached. -**backpressures** when the group has been assembled (the duration elapsed) and downstream backpressures +**backpressures** downstream backpressures, and there are `n+1` buffered elements + +**completes** when upstream completes + +groupedWeightedWithin +^^^^^^^^^^^^^ +Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, +whatever happens first. Empty groups will not be emitted if no elements are received from upstream. +The last group before end-of-stream will contain the buffered elements since the previously emitted group. + +**emits** when the configured time elapses since the last group has been emitted, +but not if no elements has been grouped (i.e: no empty groups), or when weight limit has been reached. + +**backpressures** downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` **completes** when upstream completes diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index d14bfb4440..026d246b6b 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -981,12 +981,27 @@ Drop elements until a timeout has fired groupedWithin ^^^^^^^^^^^^^ -Chunk up the stream into groups of elements received within a time window, or limited by the given number of elements, -whichever happens first. +Chunk up this stream into groups of elements received within a time window, or limited by the number of the elements, +whatever happens first. Empty groups will not be emitted if no elements are received from upstream. +The last group before end-of-stream will contain the buffered elements since the previously emitted group. -**emits** when the configured time elapses since the last group has been emitted +**emits** when the configured time elapses since the last group has been emitted, +but not if no elements has been grouped (i.e: no empty groups), or when limit has been reached. -**backpressures** when the group has been assembled (the duration elapsed) and downstream backpressures +**backpressures** downstream backpressures, and there are `n+1` buffered elements + +**completes** when upstream completes + +groupedWeightedWithin +^^^^^^^^^^^^^ +Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, +whatever happens first. Empty groups will not be emitted if no elements are received from upstream. +The last group before end-of-stream will contain the buffered elements since the previously emitted group. + +**emits** when the configured time elapses since the last group has been emitted, +but not if no elements has been grouped (i.e: no empty groups), or when weight limit has been reached. + +**backpressures** downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` **completes** when upstream completes diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala index 9f42197494..238f7b4805 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala @@ -97,7 +97,6 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { c.expectNoMsg(600.millis) pSub.sendComplete() c.expectComplete - c.expectNoMsg(100.millis) } "not emit empty group when finished while not being pushed" taggedAs TimingTest in { @@ -113,7 +112,6 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { } "reset time window when max elements reached" taggedAs TimingTest in { - val inputs = Iterator.from(1) val upstream = TestPublisher.probe[Int]() val downstream = TestSubscriber.probe[immutable.Seq[Int]]() Source.fromPublisher(upstream).groupedWithin(3, 2.second).to(Sink.fromSubscriber(downstream)).run() @@ -121,7 +119,7 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { downstream.request(2) downstream.expectNoMsg(1000.millis) - (1 to 4) foreach { _ ⇒ upstream.sendNext(inputs.next()) } + (1 to 4).foreach(upstream.sendNext) downstream.within(1000.millis) { downstream.expectNext((1 to 3).toVector) } @@ -133,10 +131,26 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { } upstream.sendComplete() - downstream.expectComplete + downstream.expectComplete() downstream.expectNoMsg(100.millis) } + "reset time window when exact max elements reached" taggedAs TimingTest in { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[immutable.Seq[Int]]() + Source.fromPublisher(upstream).groupedWithin(3, 1.second).to(Sink.fromSubscriber(downstream)).run() + + downstream.request(2) + + (1 to 3).foreach(upstream.sendNext) + downstream.within(1000.millis) { + downstream.expectNext((1 to 3).toVector) + } + + upstream.sendComplete() + downstream.expectComplete() + } + "group evenly" taggedAs TimingTest in { def script = Script(TestConfig.RandomTestRange map { _ ⇒ val x, y, z = random.nextInt(); Seq(x, y, z) → Seq(immutable.Seq(x, y, z)) }: _*) TestConfig.RandomTestRange foreach (_ ⇒ runScript(script, settings)(_.groupedWithin(3, 10.minutes))) @@ -157,4 +171,86 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { } + "A GroupedWeightedWithin" must { + "handle elements larger than the limit" taggedAs TimingTest in { + val downstream = TestSubscriber.probe[immutable.Seq[Int]]() + Source(List(1, 2, 3, 101, 4, 5, 6)) + .groupedWeightedWithin(100, 100.millis)(_.toLong) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.request(1) + downstream.expectNext((1 to 3).toVector) + downstream.request(1) + downstream.expectNext(Vector(101)) + downstream.request(1) + downstream.expectNext((4 to 6).toVector) + downstream.expectComplete() + } + + "not drop a pending last element on upstream finish" taggedAs TimingTest in { + val upstream = TestPublisher.probe[Long]() + val downstream = TestSubscriber.probe[immutable.Seq[Long]]() + Source + .fromPublisher(upstream) + .groupedWeightedWithin(5, 50.millis)(identity) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.ensureSubscription() + downstream.expectNoMsg(100.millis) + upstream.sendNext(1) + upstream.sendNext(2) + upstream.sendNext(3) + upstream.sendComplete() + downstream.request(1) + downstream.expectNext(Vector(1, 2): immutable.Seq[Long]) + downstream.expectNoMsg(100.millis) + downstream.request(1) + downstream.expectNext(Vector(3): immutable.Seq[Long]) + downstream.expectComplete() + } + + "append zero weighted elements to a full group before timeout received, if downstream hasn't pulled yet" taggedAs TimingTest in { + val upstream = TestPublisher.probe[String]() + val downstream = TestSubscriber.probe[immutable.Seq[String]]() + Source + .fromPublisher(upstream) + .groupedWeightedWithin(5, 50.millis)(_.length.toLong) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.ensureSubscription() + upstream.sendNext("333") + upstream.sendNext("22") + upstream.sendNext("") + upstream.sendNext("") + upstream.sendNext("") + downstream.request(1) + downstream.expectNext(Vector("333", "22", "", "", ""): immutable.Seq[String]) + upstream.sendNext("") + upstream.sendNext("") + upstream.sendComplete() + downstream.request(1) + downstream.expectNext(Vector("", ""): immutable.Seq[String]) + downstream.expectComplete() + } + + "not emit an empty group if first element is heavier than maxWeight" taggedAs TimingTest in { + val וupstream = TestPublisher.probe[Long]() + val downstream = TestSubscriber.probe[immutable.Seq[Long]]() + Source + .fromPublisher(וupstream) + .groupedWeightedWithin(10, 50.millis)(identity) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.ensureSubscription() + downstream.request(1) + וupstream.sendNext(11) + downstream.expectNext(Vector(11): immutable.Seq[Long]) + וupstream.sendComplete() + downstream.expectComplete() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index cce0acd844..0efaf4a46b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -30,6 +30,7 @@ import akka.stream._ val mapAsyncUnordered = name("mapAsyncUnordered") val grouped = name("grouped") val groupedWithin = name("groupedWithin") + val groupedWeightedWithin = name("groupedWeightedWithin") val limit = name("limit") val limitWeighted = name("limitWeighted") val sliding = name("sliding") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 66fba3a99e..f1af46a881 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1399,52 +1399,90 @@ private[stream] object Collect { } +@InternalApi private[akka] object GroupedWeightedWithin { + val groupedWeightedWithinTimer = "GroupedWeightedWithinTimer" +} /** * INTERNAL API */ -@InternalApi private[akka] final class GroupedWithin[T](val n: Int, val d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { - require(n > 0, "n must be greater than 0") - require(d > Duration.Zero) +@InternalApi private[akka] final class GroupedWeightedWithin[T](val maxWeight: Long, costFn: T ⇒ Long, val interval: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { + require(maxWeight > 0, "maxWeight must be greater than 0") + require(interval > Duration.Zero) val in = Inlet[T]("in") val out = Outlet[immutable.Seq[T]]("out") - override def initialAttributes = DefaultAttributes.groupedWithin + override def initialAttributes = DefaultAttributes.groupedWeightedWithin val shape = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { private val buf: VectorBuilder[T] = new VectorBuilder + private var pending: T = null.asInstanceOf[T] + private var pendingWeight: Long = 0L // True if: // - buf is nonEmpty // AND - // - timer fired OR group is full - private var groupClosed = false + // - (timer fired + // OR + // totalWeight >= maxWeight + // OR + // pending != null + // OR + // upstream completed) + private var pushEagerly = false private var groupEmitted = true private var finished = false - private var elements = 0 - - private val GroupedWithinTimer = "GroupedWithinTimer" + private var totalWeight = 0L override def preStart() = { - schedulePeriodically(GroupedWithinTimer, d) + schedulePeriodically(GroupedWeightedWithin.groupedWeightedWithinTimer, interval) pull(in) } private def nextElement(elem: T): Unit = { groupEmitted = false - buf += elem - elements += 1 - if (elements == n) { - schedulePeriodically(GroupedWithinTimer, d) - closeGroup() - } else pull(in) + val cost = costFn(elem) + if (cost < 0) failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed")) + else { + if (totalWeight + cost <= maxWeight) { + buf += elem + totalWeight += cost + + if (totalWeight < maxWeight) pull(in) + else { + // `totalWeight >= maxWeight` which means that downstream can get the next group. + if (!isAvailable(out)) { + // We should emit group when downstream becomes available + pushEagerly = true + // we want to pull anyway, since we allow for zero weight elements + // but since `emitGroup()` will pull internally (by calling `startNewGroup()`) + // we also have to pull if downstream hasn't yet requested an element. + pull(in) + } else { + schedulePeriodically(GroupedWeightedWithin.groupedWeightedWithinTimer, interval) + emitGroup() + } + } + } else { + if (totalWeight == 0L) { + buf += elem + totalWeight += cost + pushEagerly = true + } else { + pending = elem + pendingWeight = cost + } + schedulePeriodically(GroupedWeightedWithin.groupedWeightedWithinTimer, interval) + tryCloseGroup() + } + } } - private def closeGroup(): Unit = { - groupClosed = true + private def tryCloseGroup(): Unit = { if (isAvailable(out)) emitGroup() + else if (pending != null || finished) pushEagerly = true } private def emitGroup(): Unit = { @@ -1452,30 +1490,41 @@ private[stream] object Collect { push(out, buf.result()) buf.clear() if (!finished) startNewGroup() + else if (pending != null) emit(out, Vector(pending), () ⇒ completeStage()) else completeStage() } private def startNewGroup(): Unit = { - elements = 0 - groupClosed = false + if (pending != null) { + totalWeight = pendingWeight + pendingWeight = 0L + buf += pending + pending = null.asInstanceOf[T] + groupEmitted = false + } else { + totalWeight = 0 + } + pushEagerly = false if (isAvailable(in)) nextElement(grab(in)) else if (!hasBeenPulled(in)) pull(in) } override def onPush(): Unit = { - if (!groupClosed) nextElement(grab(in)) // otherwise keep the element for next round + if (pending == null) nextElement(grab(in)) // otherwise keep the element for next round } - override def onPull(): Unit = if (groupClosed) emitGroup() + override def onPull(): Unit = if (pushEagerly) emitGroup() override def onUpstreamFinish(): Unit = { finished = true if (groupEmitted) completeStage() - else closeGroup() + else tryCloseGroup() } - override protected def onTimer(timerKey: Any) = if (elements > 0) closeGroup() - + override protected def onTimer(timerKey: Any) = if (totalWeight > 0) { + if (isAvailable(out)) emitGroup() + else pushEagerly = true + } setHandlers(in, out, this) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 15a487a701..013559f163 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -702,9 +702,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * - * '''Emits when''' the configured time elapses since the last group has been emitted + * '''Emits when''' the configured time elapses since the last group has been emitted or `n` elements is buffered * - * '''Backpressures when''' the configured time elapses since the last group has been emitted + * '''Backpressures when''' downstream backpressures, and there are `n+1` buffered elements * * '''Completes when''' upstream completes (emits last group) * @@ -716,6 +716,27 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def groupedWithin(n: Int, d: FiniteDuration): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the weight of the elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached + * + * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise + * IllegalArgumentException is thrown. + */ + def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, Long], d: FiniteDuration): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = + new Flow(delegate.groupedWeightedWithin(maxWeight, d)(costFn.apply).map(_.asJava)) + /** * Shifts elements emission in time by a specified amount. It allows to store elements * in internal buffer while waiting for next element to be emitted. Depending on the defined diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 73a801bce4..21962ad1f8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1443,9 +1443,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * - * '''Emits when''' the configured time elapses since the last group has been emitted + * '''Emits when''' the configured time elapses since the last group has been emitted or `n` elements is buffered * - * '''Backpressures when''' the configured time elapses since the last group has been emitted + * '''Backpressures when''' downstream backpressures, and there are `n+1` buffered elements * * '''Completes when''' upstream completes (emits last group) * @@ -1457,6 +1457,27 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def groupedWithin(n: Int, d: FiniteDuration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = new Source(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the weight of the elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached + * + * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise + * IllegalArgumentException is thrown. + */ + def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, Long], d: FiniteDuration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = + new Source(delegate.groupedWeightedWithin(maxWeight, d)(costFn.apply).map(_.asJava)) + /** * Shifts elements emission in time by a specified amount. It allows to store elements * in internal buffer while waiting for next element to be emitted. Depending on the defined diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 253c9d2ee5..61f080f56d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -536,9 +536,9 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * - * '''Emits when''' the configured time elapses since the last group has been emitted + * '''Emits when''' the configured time elapses since the last group has been emitted or `n` elements is buffered * - * '''Backpressures when''' the configured time elapses since the last group has been emitted + * '''Backpressures when''' downstream backpressures, and there are `n+1` buffered elements * * '''Completes when''' upstream completes (emits last group) * @@ -550,6 +550,27 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def groupedWithin(n: Int, d: FiniteDuration): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = new SubFlow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the weight of the elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached + * + * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise + * IllegalArgumentException is thrown. + */ + def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, Long], d: FiniteDuration): javadsl.SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = + new SubFlow(delegate.groupedWeightedWithin(maxWeight, d)(costFn.apply).map(_.asJava)) + /** * Shifts elements emission in time by a specified amount. It allows to store elements * in internal buffer while waiting for next element to be emitted. Depending on the defined diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 8bb8f13292..7fa5a1466c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -536,9 +536,9 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * - * '''Emits when''' the configured time elapses since the last group has been emitted + * '''Emits when''' the configured time elapses since the last group has been emitted or `n` elements is buffered * - * '''Backpressures when''' the configured time elapses since the last group has been emitted + * '''Backpressures when''' downstream backpressures, and there are `n+1` buffered elements * * '''Completes when''' upstream completes (emits last group) * @@ -550,6 +550,27 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def groupedWithin(n: Int, d: FiniteDuration): SubSource[java.util.List[Out @uncheckedVariance], Mat] = new SubSource(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the weight of the elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached + * + * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise + * IllegalArgumentException is thrown. + */ + def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, Long], d: FiniteDuration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] = + new SubSource(delegate.groupedWeightedWithin(maxWeight, d)(costFn.apply).map(_.asJava)) + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index e973df6b77..d6b38ec173 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -981,16 +981,37 @@ trait FlowOps[+Out, +Mat] { * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. * - * '''Emits when''' the configured time elapses since the last group has been emitted + * '''Emits when''' the configured time elapses since the last group has been emitted or `n` elements is buffered * - * '''Backpressures when''' the configured time elapses since the last group has been emitted + * '''Backpressures when''' downstream backpressures, and there are `n+1` buffered elements * * '''Completes when''' upstream completes (emits last group) * * '''Cancels when''' downstream completes */ def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]] = - via(new GroupedWithin[Out](n, d)) + via(new GroupedWeightedWithin[Out](n, ConstantFun.oneLong, d).withAttributes(DefaultAttributes.groupedWithin)) + + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the weight of the elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise + * IllegalArgumentException is thrown. + * + * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached + * + * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + */ + def groupedWeightedWithin(maxWeight: Long, d: FiniteDuration)(costFn: Out ⇒ Long): Repr[immutable.Seq[Out]] = + via(new GroupedWeightedWithin[Out](maxWeight, costFn, d)) /** * Shifts elements emission in time by a specified amount. It allows to store elements diff --git a/project/MiMa.scala b/project/MiMa.scala index f272134a6c..eae610a357 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -1072,6 +1072,9 @@ object MiMa extends AutoPlugin { FilterAnyProblemStartingWith("akka.remote.artery") ), "2.4.17" -> Seq( + // #22711 changes to groupedWithin internal classes + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.groupedWeightedWithin"), + // #22277 changes to internal classes ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.transport.netty.TcpServerHandler.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.transport.netty.TcpClientHandler.this"),