diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/grouped.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/grouped.md
index d3c5d4822a..c58a5204f6 100644
--- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/grouped.md
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/grouped.md
@@ -15,6 +15,12 @@ Accumulate incoming events until the specified number of elements have been accu
Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of
elements downstream.
+See also:
+
+* @ref[groupedWeighted](groupedWeighted.md) for a variant that groups based on element weight
+* @ref[groupedWithin](groupedWithin.md) for a variant that groups based on number of elements and a time window
+* @ref[groupedWeightedWithin](groupedWeightedWithin.md) for a variant that groups based on element weight and a time window
+
## Examples
The below example demonstrates how `grouped` groups the accumulated elements into @scala[`Seq`] @java[`List`]
diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeighted.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeighted.md
new file mode 100644
index 0000000000..21d2416025
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeighted.md
@@ -0,0 +1,46 @@
+# groupedWeighted
+
+Accumulate incoming events until the combined weight of elements is greater than or equal to the minimum weight and then pass the collection of elements downstream.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Source.groupedWeighted](Source) { scala="#groupedWeighted(minWeight:Long)(costFn:Out=>Long):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#groupedWeighted(long,akka.japi.function.Function)" }
+@apidoc[Flow.groupedWeighted](Flow) { scala="#groupedWeighted(minWeight:Long)(costFn:Out=>Long):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#groupedWeighted(long,akka.japi.function.Function)" }
+
+
+## Description
+
+Chunk up this stream into groups of elements that have a cumulative weight greater than or equal to the `minWeight`, with the last group possibly smaller than requested `minWeight` due to end-of-stream.
+
+See also:
+
+* @ref[grouped](grouped.md) for a variant that groups based on number of elements
+* @ref[groupedWithin](groupedWithin.md) for a variant that groups based on number of elements and a time window
+* @ref[groupedWeightedWithin](groupedWeightedWithin.md) for a variant that groups based on element weight and a time window
+
+## Examples
+
+The below example demonstrates how `groupedWeighted` groups the accumulated elements into @scala[`Seq`] @java[`List`]
+and maps with other operation.
+
+Scala
+: @@snip [groupedWeighted.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedWeighted.scala) { #groupedWeighted }
+
+Java
+: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #groupedWeighted }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the cumulative weight of elements is greater than or equal to the minimum weight or upstream completed
+
+**backpressures** when a group has been assembled and downstream backpressures
+
+**completes** when upstream completes
+
+@@@
+
+
diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md
index afe45cd9ef..9decabf30b 100644
--- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md
@@ -16,6 +16,12 @@ Chunk up this stream into groups of elements received within a time window, or l
whatever happens first. Empty groups will not be emitted if no elements are received from upstream.
The last group before end-of-stream will contain the buffered elements since the previously emitted group.
+See also:
+
+* @ref[grouped](grouped.md) for a variant that groups based on number of elements
+* @ref[groupedWeighted](groupedWeighted.md) for a variant that groups based on element weight
+* @ref[groupedWithin](groupedWithin.md) for a variant that groups based on number of elements and a time window
+
## Reactive Streams semantics
@@@div { .callout }
diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWithin.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWithin.md
index 1cefe4fcd1..34ef4cdd1f 100644
--- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWithin.md
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWithin.md
@@ -16,6 +16,12 @@ Chunk up this stream into groups of elements received within a time window, or l
whatever happens first. Empty groups will not be emitted if no elements are received from upstream.
The last group before end-of-stream will contain the buffered elements since the previously emitted group.
+See also:
+
+* @ref[grouped](grouped.md) for a variant that groups based on number of elements
+* @ref[groupedWeighted](groupedWeighted.md) for a variant that groups based on element weight
+* @ref[groupedWeightedWithin](groupedWeightedWithin.md) for a variant that groups based on element weight and a time window
+
## Reactive Streams semantics
@@@div { .callout }
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index af7d108932..dd48e0b12c 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -154,6 +154,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`|
|Flow|@ref[futureFlow](Flow/futureFlow.md)|Streams the elements through the given future flow once it successfully completes.|
|Source/Flow|@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.|
+|Source/Flow|@ref[groupedWeighted](Source-or-Flow/groupedWeighted.md)|Accumulate incoming events until the combined weight of elements is greater than or equal to the minimum weight and then pass the collection of elements downstream.|
|Source/Flow|@ref[intersperse](Source-or-Flow/intersperse.md)|Intersperse stream with provided element similar to `List.mkString`.|
|Flow|@ref[lazyCompletionStageFlow](Flow/lazyCompletionStageFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.|
|Flow|@ref[lazyFlow](Flow/lazyFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.|
@@ -442,6 +443,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [futureSource](Source/futureSource.md)
* [groupBy](Source-or-Flow/groupBy.md)
* [grouped](Source-or-Flow/grouped.md)
+* [groupedWeighted](Source-or-Flow/groupedWeighted.md)
* [groupedWeightedWithin](Source-or-Flow/groupedWeightedWithin.md)
* [groupedWithin](Source-or-Flow/groupedWithin.md)
* [gunzip](Compression/gunzip.md)
diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
index 5c1a8b47cc..7d9bd7af26 100644
--- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
+++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
@@ -46,6 +46,7 @@ import akka.stream.Attributes;
// #log
import java.time.Duration;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.IntSupplier;
@@ -340,6 +341,22 @@ class SourceOrFlow {
// #grouped
}
+ void groupedWeightedExample() {
+ // #groupedWeighted
+ Source.from(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5, 6)))
+ .groupedWeighted(4, x -> (long) x.size())
+ .runForeach(System.out::println, system);
+ // [[1, 2], [3, 4]]
+ // [[5, 6]]
+
+ Source.from(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5, 6)))
+ .groupedWeighted(3, x -> (long) x.size())
+ .runForeach(System.out::println, system);
+ // [[1, 2], [3, 4]]
+ // [[5, 6]]
+ // #groupedWeighted
+ }
+
static
// #fold // #foldAsync
class Histogram {
diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedWeighted.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedWeighted.scala
new file mode 100644
index 0000000000..73675f1d7c
--- /dev/null
+++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedWeighted.scala
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2019-2021 Lightbend Inc.
+ */
+
+package docs.stream.operators.sourceorflow
+import akka.stream.scaladsl.Source
+import scala.collection.immutable
+
+object GroupedWeighted {
+ def groupedWeightedExample(): Unit = {
+ import akka.actor.ActorSystem
+
+ implicit val system: ActorSystem = ActorSystem()
+
+ //#groupedWeighted
+ val collections = immutable.Iterable(Seq(1, 2), Seq(3, 4), Seq(5, 6))
+ Source[Seq[Int]](collections).groupedWeighted(4)(_.length).runForeach(println)
+ // Vector(Seq(1, 2), Seq(3, 4))
+ // Vector(Seq(5, 6))
+
+ Source[Seq[Int]](collections).groupedWeighted(3)(_.length).runForeach(println)
+ // Vector(Seq(1, 2), Seq(3, 4))
+ // Vector(Seq(5, 6))
+ //#groupedWeighted
+ }
+
+}
diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala
index 5773bb5a6b..e2d8b84e43 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala
@@ -223,7 +223,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
lastEvents() should be(Set(OnComplete, OnNext(0)))
}
- "implement grouped" in new OneBoundedSetup[Int](Grouped(3)) {
+ "implement grouped" in new OneBoundedSetup[Int](GroupedWeighted(3, ConstantFun.oneLong)) {
lastEvents() should be(Set.empty)
downstream.requestOne()
diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala
index bb381872e0..b03e5f4249 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala
@@ -5,9 +5,9 @@
package akka.stream.impl.fusing
import scala.util.control.NoStackTrace
-
import akka.stream.Supervision
import akka.stream.testkit.StreamSpec
+import akka.util.ConstantFun
class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit {
@@ -102,7 +102,7 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit
Supervision.resumingDecider,
Map((x: Int) => x + 1),
Map((x: Int) => if (x <= 0) throw TE else x + 10),
- Grouped(3)) {
+ GroupedWeighted(3, ConstantFun.oneLong)) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
@@ -123,7 +123,7 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit
Supervision.resumingDecider,
Map((x: Int) => x + 1),
Map((x: Int) => if (x <= 0) throw TE else x + 10),
- Grouped(1000)) {
+ GroupedWeighted(1000, ConstantFun.oneLong)) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWeightedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWeightedSpec.scala
new file mode 100644
index 0000000000..30019e36c9
--- /dev/null
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWeightedSpec.scala
@@ -0,0 +1,105 @@
+/*
+ * Copyright (C) 2020-2021 Lightbend Inc.
+ */
+
+package akka.stream.scaladsl
+
+import akka.stream.testkit.{ ScriptedTest, StreamSpec, TestPublisher, TestSubscriber }
+import akka.testkit.TimingTest
+import akka.util.unused
+
+import scala.collection.immutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class FlowGroupedWeightedSpec extends StreamSpec("""
+ akka.stream.materializer.initial-input-buffer-size = 2
+ """) with ScriptedTest {
+
+ "A GroupedWeighted" must {
+ "produce no group (empty sink sequence) when source is empty" in {
+ val input = immutable.Seq.empty
+ def costFn(@unused e: Int): Long = 999999L // set to an arbitrarily big value
+ val future = Source(input).groupedWeighted(1)(costFn).runWith(Sink.seq)
+ val result = Await.result(future, remainingOrDefault)
+ result should be(Seq.empty)
+ }
+
+ "always exhaust a source into a single group if cost is 0" in {
+ val input = (1 to 15)
+ def costFn(@unused e: Int): Long = 0L
+ val minWeight = 1 // chose the least possible value for minWeight
+ val future = Source(input).groupedWeighted(minWeight)(costFn).runWith(Sink.seq)
+ val result = Await.result(future, remainingOrDefault)
+ result should be(Seq(input))
+ }
+
+ "exhaust source into one group if minWeight equals the accumulated cost of the source" in {
+ val input = (1 to 16)
+ def costFn(@unused e: Int): Long = 1L
+ val minWeight = input.length
+ val future = Source(input).groupedWeighted(minWeight)(costFn).runWith(Sink.seq)
+ val result = Await.result(future, remainingOrDefault)
+ result should be(Seq(input))
+ }
+
+ "exhaust a source into one group if minWeight is greater than the accumulated cost of source" in {
+ val input = List("this", "is", "some", "string")
+ def costFn(e: String): Long = e.length
+ val minWeight = Long.MaxValue
+ val future = Source(input).groupedWeighted(minWeight)(costFn).runWith(Sink.seq)
+ val result = Await.result(future, remainingOrDefault)
+ result should be(Seq(input))
+ }
+
+ "emit a group each time the grouped weight is greater than minWeight" in {
+ val input = List(1, 2, 1, 2)
+ def costFn(e: Int): Long = e
+ val minWeight = 2 // must produce two groups of List(1, 2)
+ val future = Source(input).groupedWeighted(minWeight)(costFn).runWith(Sink.seq)
+ val result: Seq[Seq[Int]] = Await.result(future, remainingOrDefault)
+ result should be(Seq(Seq(1, 2), Seq(1, 2)))
+ }
+
+ "not emit group when grouped weight is less than minWeight and upstream has not completed" taggedAs TimingTest in {
+ val p = TestPublisher.probe[Int]()
+ val c = TestSubscriber.probe[immutable.Seq[Int]]()
+ // Note that the cost function set to zero here means the stream will accumulate elements until completed
+ Source.fromPublisher(p).groupedWeighted(10)(_ => 0L).to(Sink.fromSubscriber(c)).run()
+ p.sendNext(1)
+ c.expectSubscription().request(1) // create downstream demand so Grouped pulls on upstream
+ c.expectNoMessage(50.millis) // message should not be emitted yet
+ p.sendComplete() // Force Grouped to emit the small group
+ c.expectNext(50.millis, immutable.Seq(1))
+ c.expectComplete()
+ }
+
+ "fail during stream initialization when minWeight is negative" in {
+ val ex = the[IllegalArgumentException] thrownBy Source(1 to 5)
+ .groupedWeighted(-1)(_ => 1L)
+ .to(Sink.collection)
+ .run()
+ ex.getMessage should be("requirement failed: minWeight must be greater than 0")
+ }
+
+ "fail during stream initialization when minWeight is 0" in {
+ val ex = the[IllegalArgumentException] thrownBy Source(1 to 5)
+ .groupedWeighted(0)(_ => 1L)
+ .to(Sink.collection)
+ .run()
+ ex.getMessage should be("requirement failed: minWeight must be greater than 0")
+ }
+
+ "fail the stage when costFn has a negative result" in {
+ val p = TestPublisher.probe[Int]()
+ val c = TestSubscriber.probe[immutable.Seq[Int]]()
+ Source.fromPublisher(p).groupedWeighted(1)(_ => -1L).to(Sink.fromSubscriber(c)).run()
+ c.expectSubscription().request(1) // create downstream demand so Grouped pulls on upstream
+ c.expectNoMessage(50.millis) // shouldn't fail until the message is sent
+ p.sendNext(1) // Send a message that will result in negative cost
+ val error = c.expectError()
+ error shouldBe an[IllegalArgumentException]
+ error.getMessage should be("Negative weight [-1] for element [1] is not allowed")
+ }
+ }
+}
diff --git a/akka-stream/src/main/mima-filters/2.6.11.backwards.excludes/pr-29086-rename-grouped.excludes b/akka-stream/src/main/mima-filters/2.6.11.backwards.excludes/pr-29086-rename-grouped.excludes
new file mode 100644
index 0000000000..e06667f835
--- /dev/null
+++ b/akka-stream/src/main/mima-filters/2.6.11.backwards.excludes/pr-29086-rename-grouped.excludes
@@ -0,0 +1,3 @@
+# Disable compatibility check for @InternalApi private[akka] class
+ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.Grouped$")
+ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.Grouped")
\ No newline at end of file
diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
index b04c90b53e..8515388d3a 100755
--- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
@@ -33,6 +33,7 @@ import akka.stream.Attributes._
val ask = name("ask")
val grouped = name("grouped")
val groupedWithin = name("groupedWithin")
+ val groupedWeighted = name("groupedWeighted")
val groupedWeightedWithin = name("groupedWeightedWithin")
val limit = name("limit")
val limitWeighted = name("limitWeighted")
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
index 2c40163ba7..acc4837ac7 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
@@ -762,34 +762,37 @@ private[stream] object Collect {
/**
* INTERNAL API
*/
-@InternalApi private[akka] final case class Grouped[T](n: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
- require(n > 0, "n must be greater than 0")
+@InternalApi private[akka] final case class GroupedWeighted[T](minWeight: Long, costFn: T => Long)
+ extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
+ require(minWeight > 0, "minWeight must be greater than 0")
- val in = Inlet[T]("Grouped.in")
- val out = Outlet[immutable.Seq[T]]("Grouped.out")
+ val in = Inlet[T]("GroupedWeighted.in")
+ val out = Outlet[immutable.Seq[T]]("GroupedWeighted.out")
override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out)
- override protected val initialAttributes: Attributes = DefaultAttributes.grouped
+ override def initialAttributes: Attributes = DefaultAttributes.groupedWeighted
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
- private val buf = {
- val b = Vector.newBuilder[T]
- b.sizeHint(n)
- b
- }
- var left = n
+ private val buf = Vector.newBuilder[T]
+ var left: Long = minWeight
override def onPush(): Unit = {
- buf += grab(in)
- left -= 1
- if (left == 0) {
- val elements = buf.result()
- buf.clear()
- left = n
- push(out, elements)
- } else {
- pull(in)
+ val elem = grab(in)
+ val cost = costFn(elem)
+ if (cost < 0L)
+ failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed"))
+ else {
+ buf += elem
+ left -= cost
+ if (left <= 0) {
+ val elements = buf.result()
+ buf.clear()
+ left = minWeight
+ push(out, elements)
+ } else {
+ pull(in)
+ }
}
}
@@ -798,12 +801,11 @@ private[stream] object Collect {
}
override def onUpstreamFinish(): Unit = {
- // This means the buf is filled with some elements but not enough (left < n) to group together.
- // Since the upstream has finished we have to push them to downstream though.
- if (left < n) {
- val elements = buf.result()
+ // Since the upstream has finished we have to push any buffered elements downstream.
+ val elements = buf.result()
+ if (elements.nonEmpty) {
buf.clear()
- left = n
+ left = minWeight
push(out, elements)
}
completeStage()
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index 3cd3537a74..f99711e973 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -934,6 +934,26 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def grouped(n: Int): javadsl.Flow[In, java.util.List[Out], Mat] =
new Flow(delegate.grouped(n).map(_.asJava)) // TODO optimize to one step
+ /**
+ * Chunk up this stream into groups of elements that have a cumulative weight greater than or equal to
+ * the `minWeight`, with the last group possibly smaller than requested `minWeight` due to end-of-stream.
+ *
+ * `minWeight` must be positive, otherwise IllegalArgumentException is thrown.
+ * `costFn` must return a non-negative result for all inputs, otherwise the stage will fail
+ * with an IllegalArgumentException.
+ *
+ * '''Emits when''' the cumulative weight of elements is greater than or equal to the `minWeight` or upstream completed
+ *
+ * '''Backpressures when''' a buffered group weighs more than `minWeight` and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def groupedWeighted(minWeight: Long)(
+ costFn: java.util.function.Function[Out, java.lang.Long]): javadsl.Flow[In, java.util.List[Out], Mat] =
+ new Flow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step
+
/**
* Ensure stream boundedness by limiting the number of elements from upstream.
* If the number of incoming elements exceeds max, it will signal
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index 6603cc7b1a..4bbbb0a678 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -2384,6 +2384,26 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def grouped(n: Int): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
new Source(delegate.grouped(n).map(_.asJava))
+ /**
+ * Chunk up this stream into groups of elements that have a cumulative weight greater than or equal to
+ * the `minWeight`, with the last group possibly smaller than requested `minWeight` due to end-of-stream.
+ *
+ * `minWeight` must be positive, otherwise IllegalArgumentException is thrown.
+ * `costFn` must return a non-negative result for all inputs, otherwise the stage will fail
+ * with an IllegalArgumentException.
+ *
+ * '''Emits when''' the cumulative weight of elements is greater than or equal to the `minWeight` or upstream completed
+ *
+ * '''Backpressures when''' a buffered group weighs more than `minWeight` and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def groupedWeighted(minWeight: Long)(costFn: java.util.function.Function[Out, java.lang.Long])
+ : javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
+ new Source(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava))
+
/**
* Ensure stream boundedness by limiting the number of elements from upstream.
* If the number of incoming elements exceeds max, it will signal
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
index 69ad00960d..e7e8b1634e 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
@@ -380,6 +380,26 @@ class SubFlow[In, Out, Mat](
def grouped(n: Int): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
new SubFlow(delegate.grouped(n).map(_.asJava)) // TODO optimize to one step
+ /**
+ * Chunk up this stream into groups of elements that have a cumulative weight greater than or equal to
+ * the `minWeight`, with the last group possibly smaller than requested `minWeight` due to end-of-stream.
+ *
+ * `minWeight` must be positive, otherwise IllegalArgumentException is thrown.
+ * `costFn` must return a non-negative result for all inputs, otherwise the stage will fail
+ * with an IllegalArgumentException.
+ *
+ * '''Emits when''' the cumulative weight of elements is greater than or equal to the `minWeight` or upstream completed
+ *
+ * '''Backpressures when''' a buffered group weighs more than `minWeight` and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def groupedWeighted(minWeight: Long)(
+ costFn: function.Function[Out, java.lang.Long]): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
+ new SubFlow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step
+
/**
* Ensure stream boundedness by limiting the number of elements from upstream.
* If the number of incoming elements exceeds max, it will signal
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
index c18b0c2a83..a2981c7763 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
@@ -371,6 +371,26 @@ class SubSource[Out, Mat](
def grouped(n: Int): SubSource[java.util.List[Out @uncheckedVariance], Mat] =
new SubSource(delegate.grouped(n).map(_.asJava)) // TODO optimize to one step
+ /**
+ * Chunk up this stream into groups of elements that have a cumulative weight greater than or equal to
+ * the `minWeight`, with the last group possibly smaller than requested `minWeight` due to end-of-stream.
+ *
+ * `minWeight` must be positive, otherwise IllegalArgumentException is thrown.
+ * `costFn` must return a non-negative result for all inputs, otherwise the stage will fail
+ * with an IllegalArgumentException.
+ *
+ * '''Emits when''' the cumulative weight of elements is greater than or equal to the `minWeight` or upstream completed
+ *
+ * '''Backpressures when''' a buffered group weighs more than `minWeight` and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def groupedWeighted(minWeight: Long)(
+ costFn: function.Function[Out, java.lang.Long]): SubSource[java.util.List[Out @uncheckedVariance], Mat] =
+ new SubSource(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step
+
/**
* Apply a sliding window over the stream and return the windows as groups of elements, with the last group
* possibly smaller than requested due to end-of-stream.
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index 9034377b2d..764bd1ee7d 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -1316,7 +1316,27 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*/
- def grouped(n: Int): Repr[immutable.Seq[Out]] = via(Grouped(n))
+ def grouped(n: Int): Repr[immutable.Seq[Out]] =
+ via(GroupedWeighted[Out](n, ConstantFun.oneLong).withAttributes(DefaultAttributes.grouped))
+
+ /**
+ * Chunk up this stream into groups of elements that have a cumulative weight greater than or equal to
+ * the `minWeight`, with the last group possibly smaller than requested `minWeight` due to end-of-stream.
+ *
+ * `minWeight` must be positive, otherwise IllegalArgumentException is thrown.
+ * `costFn` must return a non-negative result for all inputs, otherwise the stage will fail
+ * with an IllegalArgumentException.
+ *
+ * '''Emits when''' the cumulative weight of elements is greater than or equal to the `minWeight` or upstream completed
+ *
+ * '''Backpressures when''' a buffered group weighs more than `minWeight` and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def groupedWeighted(minWeight: Long)(costFn: Out => Long): Repr[immutable.Seq[Out]] =
+ via(GroupedWeighted[Out](minWeight, costFn))
/**
* Ensure stream boundedness by limiting the number of elements from upstream.