diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md
new file mode 100644
index 0000000000..0d970777c2
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md
@@ -0,0 +1,43 @@
+# groupedAdjacentBy
+
+Partitions this stream into chunks by a delimiter function.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Source.groupedAdjacentBy](Source) { scala="#groupedAdjacentBy(f:Out=>T):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#groupedAdjacentBy(org.apache.pekko.japi.function.Function)" }
+@apidoc[Flow.groupedAdjacentBy](Flow) { scala="#groupedAdjacentBy(f:Out=>T):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#groupedAdjacentBy(org.apache.pekko.japi.function.Function)" }
+
+
+## Description
+
+Partitions this stream into chunks by a delimiter function.
+
+See also:
+
+* @ref[groupedAdjacentByWeighted](groupedAdjacentByWeighted.md) for a variant that groups with weight limit too.
+
+## Examples
+
+The example below demonstrates how `groupedAdjacentBy` partitions the elements into @scala[`Seq`] @java[`List`].
+
+Scala
+: @@snip [GroupedAdjacentBy.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala) { #groupedAdjacentBy }
+
+Java
+: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #groupedAdjacentBy }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the delimiter function returns a different value than the previous element's result
+
+**backpressures** when a chunk has been assembled and downstream backpressures
+
+**completes** when upstream completes
+
+@@@
+
+
diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md
new file mode 100644
index 0000000000..4aaa68020c
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md
@@ -0,0 +1,42 @@
+# groupedAdjacentByWeighted
+
+Partitions this stream into chunks by a delimiter function and a weight limit.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Source.groupedAdjacentByWeighted](Source) { scala="#groupedAdjacentByWeighted(f:Out=>T,maxWeight:Long)(costFn:Out=>Long):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#groupedAdjacentBy(org.apache.pekko.japi.function.Function,long,org.apache.pekko.japi.function.Function)" }
+@apidoc[Flow.groupedAdjacentByWeighted](Flow) { scala="#groupedAdjacentByWeighted(f:Out=>T,maxWeight:Long)(costFn:Out=>Long):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#groupedAdjacentBy(org.apache.pekko.japi.function.Function,long,org.apache.pekko.japi.function.Function)" }
+
+## Description
+
+Partitions this stream into chunks by a delimiter function.
+
+See also:
+
+* @ref[groupedAdjacentBy](groupedAdjacentBy.md) for a simpler variant.
+
+## Examples
+
+The example below demonstrates how `groupedAdjacentByWeighted` partitions the elements into @scala[`Seq`] @java[`List`].
+
+Scala
+: @@snip [GroupedAdjacentBy.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala) { #groupedAdjacentByWeighted }
+
+Java
+: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #groupedAdjacentByWeighted }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
+
+**backpressures** when a chunk has been assembled and downstream backpressures
+
+**completes** when upstream completes
+
+@@@
+
+
diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md
index 37e2d6fdcc..8d0dfabcba 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -169,6 +169,8 @@ depending on being backpressured by downstream or not.
|Source/Flow|@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`|
|Flow|@ref[futureFlow](Flow/futureFlow.md)|Streams the elements through the given future flow once it successfully completes.|
|Source/Flow|@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.|
+|Source/Flow|@ref[groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md)|Partitions this stream into chunks by a delimiter function.|
+|Source/Flow|@ref[groupedAdjacentByWeighted](Source-or-Flow/groupedAdjacentByWeighted.md)|Partitions this stream into chunks by a delimiter function and a weight limit.|
|Source/Flow|@ref[groupedWeighted](Source-or-Flow/groupedWeighted.md)|Accumulate incoming events until the combined weight of elements is greater than or equal to the minimum weight and then pass the collection of elements downstream.|
|Source/Flow|@ref[intersperse](Source-or-Flow/intersperse.md)|Intersperse stream with provided element similar to `List.mkString`.|
|Flow|@ref[lazyCompletionStageFlow](Flow/lazyCompletionStageFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.|
@@ -499,6 +501,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [futureSource](Source/futureSource.md)
* [groupBy](Source-or-Flow/groupBy.md)
* [grouped](Source-or-Flow/grouped.md)
+* [groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md)
+* [groupedAdjacentByWeighted](Source-or-Flow/groupedAdjacentByWeighted.md)
* [groupedWeighted](Source-or-Flow/groupedWeighted.md)
* [groupedWeightedWithin](Source-or-Flow/groupedWeightedWithin.md)
* [groupedWithin](Source-or-Flow/groupedWithin.md)
diff --git a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
index 4a9d058d42..65c985d849 100644
--- a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
+++ b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
@@ -467,6 +467,32 @@ class SourceOrFlow {
// #groupedWeighted
}
+ void groupedAdjacentByExample() {
+ // #groupedAdjacentBy
+ Source.from(Arrays.asList("Hello", "Hi", "Greetings", "Hey"))
+ .groupedAdjacentBy(str -> str.charAt(0))
+ .runForeach(System.out::println, system);
+ // prints:
+ // [Hello, Hi]
+ // [Greetings]
+ // [Hey]
+ // #groupedAdjacentBy
+ }
+
+ void groupedAdjacentByWeightedExample() {
+ // #groupedAdjacentByWeighted
+ Source.from(Arrays.asList("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey"))
+ .groupedAdjacentByWeighted(str -> str.charAt(0), 4, str -> (long) str.length())
+ .runForeach(System.out::println, system);
+ // prints:
+ // [Hello]
+ // [HiHi]
+ // [Hi, Hi]
+ // [Greetings]
+ // [Hey]
+ // #groupedAdjacentByWeighted
+ }
+
static
// #fold // #foldAsync
class Histogram {
diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala b/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala
new file mode 100644
index 0000000000..a8f1f51f51
--- /dev/null
+++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2019-2022 Lightbend Inc.
+ */
+
+package docs.stream.operators.sourceorflow
+
+import org.apache.pekko.stream.scaladsl.Source
+
+import scala.collection.immutable
+
+object GroupedAdjacentBy {
+ def groupedAdjacentByExample(): Unit = {
+ import org.apache.pekko.actor.ActorSystem
+ implicit val system: ActorSystem = ActorSystem()
+
+ // #groupedAdjacentBy
+ Source(List("Hello", "Hi", "Greetings", "Hey"))
+ .groupedAdjacentBy(_.head)
+ .runForeach(println)
+ // prints:
+ // Vector(Hello, Hi)
+ // Vector(Greetings)
+ // Vector(Hey)
+ // #groupedAdjacentBy
+ }
+
+ def groupedAdjacentByWeightedExample(): Unit = {
+ import org.apache.pekko.actor.ActorSystem
+ implicit val system: ActorSystem = ActorSystem()
+
+ // #groupedAdjacentByWeighted
+ Source(List("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey"))
+ .groupedAdjacentByWeighted(_.head, 4)(_.length)
+ .runForeach(println)
+ // prints:
+ // Vector(Hello)
+ // Vector(HiHi)
+ // Vector(Hi, Hi)
+ // Vector(Greetings)
+ // Vector(Hey)
+ // #groupedAdjacentByWeighted
+ }
+
+}
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index 1b6b400f85..0a45acc2bd 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -13,6 +13,7 @@
package org.apache.pekko.stream.javadsl;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
@@ -122,6 +123,32 @@ public class FlowTest extends StreamTest {
probe.expectMsgEquals("de");
}
+ @Test
+ public void mustBeAbleToUseGroupedAdjacentBy() {
+ Source.from(Arrays.asList("Hello", "Hi", "Greetings", "Hey"))
+ .groupedAdjacentBy(str -> str.charAt(0))
+ .runWith(TestSink.probe(system), system)
+ .request(4)
+ .expectNext(Lists.newArrayList("Hello", "Hi"))
+ .expectNext(Lists.newArrayList("Greetings"))
+ .expectNext(Lists.newArrayList("Hey"))
+ .expectComplete();
+ }
+
+ @Test
+ public void mustBeAbleToUseGroupedAdjacentByWeighted() {
+ Source.from(Arrays.asList("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey"))
+ .groupedAdjacentByWeighted(str -> str.charAt(0), 4, str -> (long) str.length())
+ .runWith(TestSink.probe(system), system)
+ .request(6)
+ .expectNext(Lists.newArrayList("Hello"))
+ .expectNext(Lists.newArrayList("HiHi"))
+ .expectNext(Lists.newArrayList("Hi", "Hi"))
+ .expectNext(Lists.newArrayList("Greetings"))
+ .expectNext(Lists.newArrayList("Hey"))
+ .expectComplete();
+ }
+
@Test
public void mustBeAbleToUseContraMap() {
final Source source = Source.from(Arrays.asList("1", "2", "3"));
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala
new file mode 100644
index 0000000000..b6b53fdf18
--- /dev/null
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko
+import pekko.stream.testkit.scaladsl.TestSink
+import pekko.stream.testkit.{ ScriptedTest, StreamSpec }
+
+class FlowGroupedAdjacentByWeightedSpec extends StreamSpec("""
+ pekko.stream.materializer.initial-input-buffer-size = 2
+ """) with ScriptedTest {
+
+ "A GroupedAdjacentByWeighted" must {
+ "produce no group when source is empty" in {
+ Source.empty[String]
+ .groupedAdjacentBy(identity(_))
+ .runWith(TestSink.probe[Seq[String]])
+ .request(1)
+ .expectComplete()
+ }
+
+ "group adjacent elements by predicate" in {
+ val input = List("a", "a", "b", "b", "c", "c")
+ Source(input)
+ .groupedAdjacentBy(identity(_))
+ .runWith(TestSink.probe[Seq[String]])
+ .request(6)
+ .expectNext(Seq("a", "a"))
+ .expectNext(Seq("b", "b"))
+ .expectNext(Seq("c", "c"))
+ .expectComplete()
+ }
+
+ "group adjust elements by leading char" in {
+ val input = List("Hello", "Hi", "Greetings", "Hey")
+ Source(input)
+ .groupedAdjacentBy(_.head)
+ .runWith(TestSink.probe[Seq[String]])
+ .request(4)
+ .expectNext(Seq("Hello", "Hi"))
+ .expectNext(Seq("Greetings"))
+ .expectNext(Seq("Hey"))
+ .expectComplete()
+ }
+
+ "be able to act like bufferUntilChanged" in {
+ Source(List(1, 1, 2, 2, 3, 3, 1))
+ .groupedAdjacentBy(identity(_))
+ .runWith(TestSink.probe[Seq[Int]])
+ .request(7)
+ .expectNext(Seq(1, 1))
+ .expectNext(Seq(2, 2))
+ .expectNext(Seq(3, 3))
+ .expectNext(Seq(1))
+ .expectComplete()
+ }
+
+ "Be able to limit the chunk size" in {
+ Source(List("Hello", "Hi", "Hey", "Greetings", "Hey"))
+ .groupedAdjacentByWeighted(_.head, 2)(_ => 1L)
+ .runWith(TestSink.probe[Seq[String]])
+ .request(5)
+ .expectNext(Seq("Hello", "Hi"))
+ .expectNext(Seq("Hey"))
+ .expectNext(Seq("Greetings"))
+ .expectNext(Seq("Hey"))
+ .expectComplete()
+ }
+
+ "Be able to handle single heavy weighted element" in {
+ Source(List("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey"))
+ .groupedAdjacentByWeighted(_.head, 4)(_.length)
+ .runWith(TestSink.probe[Seq[String]])
+ .request(6)
+ .expectNext(Seq("Hello"))
+ .expectNext(Seq("HiHi"))
+ .expectNext(Seq("Hi", "Hi"))
+ .expectNext(Seq("Greetings"))
+ .expectNext(Seq("Hey"))
+ .expectComplete()
+ }
+
+ }
+
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 00278f7a41..cd01348ce3 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -47,6 +47,7 @@ import pekko.stream.Attributes._
val mapWithResource = name("mapWithResource") and IODispatcher
val ask = name("ask")
val grouped = name("grouped")
+ val groupedAdjacentByWeighted = name("groupedAdjacentByWeighted")
val groupedWithin = name("groupedWithin")
val groupedWeighted = name("groupedWeighted")
val groupedWeightedWithin = name("groupedWeightedWithin")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala
new file mode 100644
index 0000000000..adb395b141
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
+import pekko.util.OptionVal
+
+import scala.collection.immutable
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final case class GroupedAdjacentByWeighted[T, R](
+ f: T => R,
+ maxWeight: Long,
+ costFn: T => Long)
+ extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
+
+ require(f != null, "f must not be null")
+ require(maxWeight > 0, "maxWeight must be greater than 0")
+ require(costFn != null, "costFn must not be null")
+
+ private val in = Inlet[T]("GroupedAdjacentByWeighted.in")
+ private val out = Outlet[immutable.Seq[T]]("GroupedAdjacentByWeighted.out")
+
+ override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out)
+ override def initialAttributes: Attributes = DefaultAttributes.groupedAdjacentByWeighted
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with InHandler with OutHandler {
+ private var builder = Vector.newBuilder[T]
+ private var currentWeight: Long = 0L
+ // used to track if elements has been added to the current group, zero weight is allowed
+ private var hasElements: Boolean = false
+ private var currentKey: OptionVal[R] = OptionVal.none
+ private var pendingGroup: OptionVal[immutable.Seq[T]] = OptionVal.none
+
+ override def onPush(): Unit = {
+ val elem = grab(in)
+ val cost = costFn(elem)
+
+ if (cost < 0L) {
+ failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed"))
+ return
+ }
+
+ val elemKey = f(elem)
+ require(elemKey != null, "Element key must not be null")
+
+ if (shouldPushDirectly(cost)) {
+ push(out, Vector(elem))
+ } else if (shouldStartNewGroup(elemKey, cost)) {
+ emitCurrentGroup()
+ handleNewElement(elem, cost, elemKey)
+ } else {
+ addToCurrentGroup(elem, cost, elemKey)
+ tryPullIfNeeded()
+ }
+ }
+
+ private def shouldPushDirectly(cost: Long): Boolean = {
+ cost >= maxWeight && !hasElements
+ }
+
+ private def shouldStartNewGroup(elemKey: R, cost: Long): Boolean = currentKey match {
+ case OptionVal.Some(key) if (elemKey != key) || (currentWeight + cost > maxWeight) => true
+ case OptionVal.None if cost > maxWeight => true
+ case _ => false
+ }
+
+ private def emitCurrentGroup(): Unit = if (hasElements) {
+ val group = builder.result()
+ resetGroup()
+ pushOrQueue(group)
+ }
+
+ private def handleNewElement(elem: T, cost: Long, key: R): Unit = {
+ if (cost > maxWeight) {
+ pushOrQueue(Vector(elem))
+ } else {
+ addToCurrentGroup(elem, cost, key)
+ }
+ tryPullIfNeeded()
+ }
+
+ private def addToCurrentGroup(elem: T, cost: Long, key: R): Unit = {
+ builder += elem
+ hasElements = true
+ currentWeight += cost
+ currentKey = OptionVal.Some(key)
+ }
+
+ private def resetGroup(): Unit = {
+ builder.clear()
+ hasElements = false
+ currentWeight = 0L
+ currentKey = OptionVal.none
+ }
+
+ private def pushOrQueue(group: immutable.Seq[T]): Unit = pendingGroup match {
+ case OptionVal.Some(pending) =>
+ push(out, pending)
+ pendingGroup = OptionVal.Some(group)
+ case OptionVal.None =>
+ if (isAvailable(out)) {
+ push(out, group)
+ } else {
+ pendingGroup = OptionVal.Some(group)
+ }
+ }
+
+ private def tryPullIfNeeded(): Unit = pendingGroup match {
+ case OptionVal.None if !hasBeenPulled(in) && isAvailable(out) => pull(in)
+ case _ =>
+ }
+
+ override def onPull(): Unit = {
+ pendingGroup match {
+ case OptionVal.Some(group) =>
+ push(out, group)
+ pendingGroup = OptionVal.none
+ case _ => if (!hasBeenPulled(in)) pull(in)
+ }
+ }
+
+ override def onUpstreamFinish(): Unit = {
+ val finalGroup = builder.result()
+ builder = null
+
+ pendingGroup match {
+ case OptionVal.Some(group) =>
+ if (finalGroup.nonEmpty) {
+ emitMultiple(out, List(group, finalGroup).iterator, () => completeStage())
+ } else {
+ emit(out, group, () => completeStage())
+ }
+ case OptionVal.None =>
+ if (finalGroup.nonEmpty) {
+ emit(out, finalGroup, () => completeStage())
+ } else {
+ completeStage()
+ }
+ }
+ }
+
+ setHandlers(in, out, this)
+ }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index fb0696ad55..53697ed7e3 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -1320,6 +1320,50 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
costFn: java.util.function.Function[Out, java.lang.Long]): javadsl.Flow[In, java.util.List[Out], Mat] =
new Flow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step
+ /**
+ * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
+ * when the result of the function is not the same as the previous element's result, a chunk is emitted.
+ *
+ * The `f` function must return a non-null value for all elements, otherwise the stage will fail.
+ *
+ * '''Emits when''' the delimiter function returns a different value than the previous element's result
+ *
+ * '''Backpressures when''' a chunk has been assembled and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.2.0
+ */
+ def groupedAdjacentBy[R](
+ f: function.Function[Out, R]): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] =
+ new Flow(delegate.groupedAdjacentBy(f.apply).map(_.asJava))
+
+ /**
+ * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
+ * when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds
+ * the `maxWeight`, a chunk is emitted.
+ *
+ * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
+ * otherwise the stage will fail.
+ *
+ * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
+ *
+ * '''Backpressures when''' a chunk has been assembled and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.2.0
+ */
+ def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
+ maxWeight: Long,
+ costFn: java.util.function.Function[Out, java.lang.Long])
+ : javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] =
+ new Flow(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava))
+
/**
* Ensure stream boundedness by limiting the number of elements from upstream.
* If the number of incoming elements exceeds max, it will signal
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index fe3e6c694b..8ce6c92889 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -3103,6 +3103,49 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
: javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
new Source(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava))
+ /**
+ * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
+ * when the result of the function is not the same as the previous element's result, a chunk is emitted.
+ *
+ * The `f` function must return a non-null value for all elements, otherwise the stage will fail.
+ *
+ * '''Emits when''' the delimiter function returns a different value than the previous element's result
+ *
+ * '''Backpressures when''' a chunk has been assembled and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.2.0
+ */
+ def groupedAdjacentBy[R](f: function.Function[Out, R]): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
+ new Source(delegate.groupedAdjacentBy(f.apply).map(_.asJava))
+
+ /**
+ * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
+ * when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds
+ * the `maxWeight`, a chunk is emitted.
+ *
+ * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
+ * otherwise the stage will fail.
+ *
+ * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
+ *
+ * '''Backpressures when''' a chunk has been assembled and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.2.0
+ */
+ def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
+ maxWeight: Long,
+ costFn: java.util.function.Function[Out, java.lang.Long])
+ : javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
+ new Source(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava))
+
/**
* Ensure stream boundedness by limiting the number of elements from upstream.
* If the number of incoming elements exceeds max, it will signal
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 5358de307a..edb3b907b8 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -679,6 +679,49 @@ class SubFlow[In, Out, Mat](
costFn: function.Function[Out, java.lang.Long]): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
new SubFlow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step
+ /**
+ * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
+ * when the result of the function is not the same as the previous element's result, a chunk is emitted.
+ *
+ * The `f` function must return a non-null value for all elements, otherwise the stage will fail.
+ *
+ * '''Emits when''' the delimiter function returns a different value than the previous element's result
+ *
+ * '''Backpressures when''' a chunk has been assembled and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.2.0
+ */
+ def groupedAdjacentBy[R](f: function.Function[Out, R]): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
+ new SubFlow(delegate.groupedAdjacentBy(f.apply).map(_.asJava))
+
+ /**
+ * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
+ * when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds
+ * the `maxWeight`, a chunk is emitted.
+ *
+ * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
+ * otherwise the stage will fail.
+ *
+ * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
+ *
+ * '''Backpressures when''' a chunk has been assembled and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.2.0
+ */
+ def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
+ maxWeight: Long,
+ costFn: java.util.function.Function[Out, java.lang.Long])
+ : SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
+ new SubFlow(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava))
+
/**
* Ensure stream boundedness by limiting the number of elements from upstream.
* If the number of incoming elements exceeds max, it will signal
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 3ab439ca9f..bc77cb5bee 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -669,6 +669,49 @@ class SubSource[Out, Mat](
costFn: function.Function[Out, java.lang.Long]): SubSource[java.util.List[Out @uncheckedVariance], Mat] =
new SubSource(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step
+ /**
+ * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
+ * when the result of the function is not the same as the previous element's result, a chunk is emitted.
+ *
+ * The `f` function must return a non-null value for all elements, otherwise the stage will fail.
+ *
+ * '''Emits when''' the delimiter function returns a different value than the previous element's result
+ *
+ * '''Backpressures when''' a chunk has been assembled and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.2.0
+ */
+ def groupedAdjacentBy[R](f: function.Function[Out, R]): SubSource[java.util.List[Out @uncheckedVariance], Mat] =
+ new SubSource(delegate.groupedAdjacentBy(f.apply).map(_.asJava))
+
+ /**
+ * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
+ * when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds
+ * the `maxWeight`, a chunk is emitted.
+ *
+ * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
+ * otherwise the stage will fail.
+ *
+ * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
+ *
+ * '''Backpressures when''' a chunk has been assembled and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.2.0
+ */
+ def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
+ maxWeight: Long,
+ costFn: java.util.function.Function[Out, java.lang.Long])
+ : SubSource[java.util.List[Out @uncheckedVariance], Mat] =
+ new SubSource(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava))
+
/**
* Apply a sliding window over the stream and return the windows as groups of elements, with the last group
* possibly smaller than requested due to end-of-stream.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index b1670f0e22..451b43ca0d 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1779,6 +1779,46 @@ trait FlowOps[+Out, +Mat] {
def groupedWeighted(minWeight: Long)(costFn: Out => Long): Repr[immutable.Seq[Out]] =
via(GroupedWeighted[Out](minWeight, costFn))
+ /**
+ * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
+ * when the result of the function is not the same as the previous element's result, a chunk is emitted.
+ *
+ * The `f` function must return a non-null value for all elements, otherwise the stage will fail.
+ *
+ * '''Emits when''' the delimiter function returns a different value than the previous element's result
+ *
+ * '''Backpressures when''' a chunk has been assembled and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.2.0
+ */
+ def groupedAdjacentBy[T](f: Out => T): Repr[immutable.Seq[Out]] =
+ via(GroupedAdjacentByWeighted(f, Long.MaxValue, ConstantFun.oneLong))
+
+ /**
+ * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
+ * when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds
+ * the `maxWeight`, a chunk is emitted.
+ *
+ * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
+ * otherwise the stage will fail.
+ *
+ * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
+ *
+ * '''Backpressures when''' a chunk has been assembled and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.2.0
+ */
+ def groupedAdjacentByWeighted[T](f: Out => T, maxWeight: Long)(costFn: Out => Long): Repr[immutable.Seq[Out]] =
+ via(GroupedAdjacentByWeighted(f, maxWeight, costFn))
+
/**
* Ensure stream boundedness by limiting the number of elements from upstream.
* If the number of incoming elements exceeds max, it will signal