feat: Add groupedAdjacentBy and GroupedAdjacentByWeighted operators. (#1937)

This commit is contained in:
He-Pin(kerr) 2025-07-10 10:54:46 +08:00 committed by GitHub
parent c6af89b083
commit ec7fdc7d0f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 677 additions and 0 deletions

View file

@ -0,0 +1,43 @@
# groupedAdjacentBy
Partitions this stream into chunks by a delimiter function.
@ref[Simple operators](../index.md#simple-operators)
## Signature
@apidoc[Source.groupedAdjacentBy](Source) { scala="#groupedAdjacentBy(f:Out=>T):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#groupedAdjacentBy(org.apache.pekko.japi.function.Function)" }
@apidoc[Flow.groupedAdjacentBy](Flow) { scala="#groupedAdjacentBy(f:Out=>T):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#groupedAdjacentBy(org.apache.pekko.japi.function.Function)" }
## Description
Partitions this stream into chunks by a delimiter function.
See also:
* @ref[groupedAdjacentByWeighted](groupedAdjacentByWeighted.md) for a variant that groups with weight limit too.
## Examples
The example below demonstrates how `groupedAdjacentBy` partitions the elements into @scala[`Seq`] @java[`List`].
Scala
: @@snip [GroupedAdjacentBy.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala) { #groupedAdjacentBy }
Java
: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #groupedAdjacentBy }
## Reactive Streams semantics
@@@div { .callout }
**emits** when the delimiter function returns a different value than the previous element's result
**backpressures** when a chunk has been assembled and downstream backpressures
**completes** when upstream completes
@@@

View file

@ -0,0 +1,42 @@
# groupedAdjacentByWeighted
Partitions this stream into chunks by a delimiter function and a weight limit.
@ref[Simple operators](../index.md#simple-operators)
## Signature
@apidoc[Source.groupedAdjacentByWeighted](Source) { scala="#groupedAdjacentByWeighted(f:Out=>T,maxWeight:Long)(costFn:Out=>Long):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#groupedAdjacentBy(org.apache.pekko.japi.function.Function,long,org.apache.pekko.japi.function.Function)" }
@apidoc[Flow.groupedAdjacentByWeighted](Flow) { scala="#groupedAdjacentByWeighted(f:Out=>T,maxWeight:Long)(costFn:Out=>Long):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#groupedAdjacentBy(org.apache.pekko.japi.function.Function,long,org.apache.pekko.japi.function.Function)" }
## Description
Partitions this stream into chunks by a delimiter function.
See also:
* @ref[groupedAdjacentBy](groupedAdjacentBy.md) for a simpler variant.
## Examples
The example below demonstrates how `groupedAdjacentByWeighted` partitions the elements into @scala[`Seq`] @java[`List`].
Scala
: @@snip [GroupedAdjacentBy.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala) { #groupedAdjacentByWeighted }
Java
: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #groupedAdjacentByWeighted }
## Reactive Streams semantics
@@@div { .callout }
**emits** when the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
**backpressures** when a chunk has been assembled and downstream backpressures
**completes** when upstream completes
@@@

View file

@ -169,6 +169,8 @@ depending on being backpressured by downstream or not.
|Source/Flow|<a name="frommaterializer"></a>@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`|
|Flow|<a name="futureflow"></a>@ref[futureFlow](Flow/futureFlow.md)|Streams the elements through the given future flow once it successfully completes.|
|Source/Flow|<a name="grouped"></a>@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.|
|Source/Flow|<a name="groupedadjacentby"></a>@ref[groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md)|Partitions this stream into chunks by a delimiter function.|
|Source/Flow|<a name="groupedadjacentbyweighted"></a>@ref[groupedAdjacentByWeighted](Source-or-Flow/groupedAdjacentByWeighted.md)|Partitions this stream into chunks by a delimiter function and a weight limit.|
|Source/Flow|<a name="groupedweighted"></a>@ref[groupedWeighted](Source-or-Flow/groupedWeighted.md)|Accumulate incoming events until the combined weight of elements is greater than or equal to the minimum weight and then pass the collection of elements downstream.|
|Source/Flow|<a name="intersperse"></a>@ref[intersperse](Source-or-Flow/intersperse.md)|Intersperse stream with provided element similar to `List.mkString`.|
|Flow|<a name="lazycompletionstageflow"></a>@ref[lazyCompletionStageFlow](Flow/lazyCompletionStageFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.|
@ -499,6 +501,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [futureSource](Source/futureSource.md)
* [groupBy](Source-or-Flow/groupBy.md)
* [grouped](Source-or-Flow/grouped.md)
* [groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md)
* [groupedAdjacentByWeighted](Source-or-Flow/groupedAdjacentByWeighted.md)
* [groupedWeighted](Source-or-Flow/groupedWeighted.md)
* [groupedWeightedWithin](Source-or-Flow/groupedWeightedWithin.md)
* [groupedWithin](Source-or-Flow/groupedWithin.md)

View file

@ -467,6 +467,32 @@ class SourceOrFlow {
// #groupedWeighted
}
void groupedAdjacentByExample() {
// #groupedAdjacentBy
Source.from(Arrays.asList("Hello", "Hi", "Greetings", "Hey"))
.groupedAdjacentBy(str -> str.charAt(0))
.runForeach(System.out::println, system);
// prints:
// [Hello, Hi]
// [Greetings]
// [Hey]
// #groupedAdjacentBy
}
void groupedAdjacentByWeightedExample() {
// #groupedAdjacentByWeighted
Source.from(Arrays.asList("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey"))
.groupedAdjacentByWeighted(str -> str.charAt(0), 4, str -> (long) str.length())
.runForeach(System.out::println, system);
// prints:
// [Hello]
// [HiHi]
// [Hi, Hi]
// [Greetings]
// [Hey]
// #groupedAdjacentByWeighted
}
static
// #fold // #foldAsync
class Histogram {

View file

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2019-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sourceorflow
import org.apache.pekko.stream.scaladsl.Source
import scala.collection.immutable
object GroupedAdjacentBy {
def groupedAdjacentByExample(): Unit = {
import org.apache.pekko.actor.ActorSystem
implicit val system: ActorSystem = ActorSystem()
// #groupedAdjacentBy
Source(List("Hello", "Hi", "Greetings", "Hey"))
.groupedAdjacentBy(_.head)
.runForeach(println)
// prints:
// Vector(Hello, Hi)
// Vector(Greetings)
// Vector(Hey)
// #groupedAdjacentBy
}
def groupedAdjacentByWeightedExample(): Unit = {
import org.apache.pekko.actor.ActorSystem
implicit val system: ActorSystem = ActorSystem()
// #groupedAdjacentByWeighted
Source(List("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey"))
.groupedAdjacentByWeighted(_.head, 4)(_.length)
.runForeach(println)
// prints:
// Vector(Hello)
// Vector(HiHi)
// Vector(Hi, Hi)
// Vector(Greetings)
// Vector(Hey)
// #groupedAdjacentByWeighted
}
}

View file

@ -13,6 +13,7 @@
package org.apache.pekko.stream.javadsl;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
@ -122,6 +123,32 @@ public class FlowTest extends StreamTest {
probe.expectMsgEquals("de");
}
@Test
public void mustBeAbleToUseGroupedAdjacentBy() {
Source.from(Arrays.asList("Hello", "Hi", "Greetings", "Hey"))
.groupedAdjacentBy(str -> str.charAt(0))
.runWith(TestSink.probe(system), system)
.request(4)
.expectNext(Lists.newArrayList("Hello", "Hi"))
.expectNext(Lists.newArrayList("Greetings"))
.expectNext(Lists.newArrayList("Hey"))
.expectComplete();
}
@Test
public void mustBeAbleToUseGroupedAdjacentByWeighted() {
Source.from(Arrays.asList("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey"))
.groupedAdjacentByWeighted(str -> str.charAt(0), 4, str -> (long) str.length())
.runWith(TestSink.probe(system), system)
.request(6)
.expectNext(Lists.newArrayList("Hello"))
.expectNext(Lists.newArrayList("HiHi"))
.expectNext(Lists.newArrayList("Hi", "Hi"))
.expectNext(Lists.newArrayList("Greetings"))
.expectNext(Lists.newArrayList("Hey"))
.expectComplete();
}
@Test
public void mustBeAbleToUseContraMap() {
final Source<String, NotUsed> source = Source.from(Arrays.asList("1", "2", "3"));

View file

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pekko.stream.scaladsl
import org.apache.pekko
import pekko.stream.testkit.scaladsl.TestSink
import pekko.stream.testkit.{ ScriptedTest, StreamSpec }
class FlowGroupedAdjacentByWeightedSpec extends StreamSpec("""
pekko.stream.materializer.initial-input-buffer-size = 2
""") with ScriptedTest {
"A GroupedAdjacentByWeighted" must {
"produce no group when source is empty" in {
Source.empty[String]
.groupedAdjacentBy(identity(_))
.runWith(TestSink.probe[Seq[String]])
.request(1)
.expectComplete()
}
"group adjacent elements by predicate" in {
val input = List("a", "a", "b", "b", "c", "c")
Source(input)
.groupedAdjacentBy(identity(_))
.runWith(TestSink.probe[Seq[String]])
.request(6)
.expectNext(Seq("a", "a"))
.expectNext(Seq("b", "b"))
.expectNext(Seq("c", "c"))
.expectComplete()
}
"group adjust elements by leading char" in {
val input = List("Hello", "Hi", "Greetings", "Hey")
Source(input)
.groupedAdjacentBy(_.head)
.runWith(TestSink.probe[Seq[String]])
.request(4)
.expectNext(Seq("Hello", "Hi"))
.expectNext(Seq("Greetings"))
.expectNext(Seq("Hey"))
.expectComplete()
}
"be able to act like bufferUntilChanged" in {
Source(List(1, 1, 2, 2, 3, 3, 1))
.groupedAdjacentBy(identity(_))
.runWith(TestSink.probe[Seq[Int]])
.request(7)
.expectNext(Seq(1, 1))
.expectNext(Seq(2, 2))
.expectNext(Seq(3, 3))
.expectNext(Seq(1))
.expectComplete()
}
"Be able to limit the chunk size" in {
Source(List("Hello", "Hi", "Hey", "Greetings", "Hey"))
.groupedAdjacentByWeighted(_.head, 2)(_ => 1L)
.runWith(TestSink.probe[Seq[String]])
.request(5)
.expectNext(Seq("Hello", "Hi"))
.expectNext(Seq("Hey"))
.expectNext(Seq("Greetings"))
.expectNext(Seq("Hey"))
.expectComplete()
}
"Be able to handle single heavy weighted element" in {
Source(List("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey"))
.groupedAdjacentByWeighted(_.head, 4)(_.length)
.runWith(TestSink.probe[Seq[String]])
.request(6)
.expectNext(Seq("Hello"))
.expectNext(Seq("HiHi"))
.expectNext(Seq("Hi", "Hi"))
.expectNext(Seq("Greetings"))
.expectNext(Seq("Hey"))
.expectComplete()
}
}
}

View file

@ -47,6 +47,7 @@ import pekko.stream.Attributes._
val mapWithResource = name("mapWithResource") and IODispatcher
val ask = name("ask")
val grouped = name("grouped")
val groupedAdjacentByWeighted = name("groupedAdjacentByWeighted")
val groupedWithin = name("groupedWithin")
val groupedWeighted = name("groupedWeighted")
val groupedWeightedWithin = name("groupedWeightedWithin")

View file

@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pekko.stream.impl.fusing
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import pekko.util.OptionVal
import scala.collection.immutable
/**
* INTERNAL API
*/
@InternalApi
private[pekko] final case class GroupedAdjacentByWeighted[T, R](
f: T => R,
maxWeight: Long,
costFn: T => Long)
extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
require(f != null, "f must not be null")
require(maxWeight > 0, "maxWeight must be greater than 0")
require(costFn != null, "costFn must not be null")
private val in = Inlet[T]("GroupedAdjacentByWeighted.in")
private val out = Outlet[immutable.Seq[T]]("GroupedAdjacentByWeighted.out")
override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out)
override def initialAttributes: Attributes = DefaultAttributes.groupedAdjacentByWeighted
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private var builder = Vector.newBuilder[T]
private var currentWeight: Long = 0L
// used to track if elements has been added to the current group, zero weight is allowed
private var hasElements: Boolean = false
private var currentKey: OptionVal[R] = OptionVal.none
private var pendingGroup: OptionVal[immutable.Seq[T]] = OptionVal.none
override def onPush(): Unit = {
val elem = grab(in)
val cost = costFn(elem)
if (cost < 0L) {
failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed"))
return
}
val elemKey = f(elem)
require(elemKey != null, "Element key must not be null")
if (shouldPushDirectly(cost)) {
push(out, Vector(elem))
} else if (shouldStartNewGroup(elemKey, cost)) {
emitCurrentGroup()
handleNewElement(elem, cost, elemKey)
} else {
addToCurrentGroup(elem, cost, elemKey)
tryPullIfNeeded()
}
}
private def shouldPushDirectly(cost: Long): Boolean = {
cost >= maxWeight && !hasElements
}
private def shouldStartNewGroup(elemKey: R, cost: Long): Boolean = currentKey match {
case OptionVal.Some(key) if (elemKey != key) || (currentWeight + cost > maxWeight) => true
case OptionVal.None if cost > maxWeight => true
case _ => false
}
private def emitCurrentGroup(): Unit = if (hasElements) {
val group = builder.result()
resetGroup()
pushOrQueue(group)
}
private def handleNewElement(elem: T, cost: Long, key: R): Unit = {
if (cost > maxWeight) {
pushOrQueue(Vector(elem))
} else {
addToCurrentGroup(elem, cost, key)
}
tryPullIfNeeded()
}
private def addToCurrentGroup(elem: T, cost: Long, key: R): Unit = {
builder += elem
hasElements = true
currentWeight += cost
currentKey = OptionVal.Some(key)
}
private def resetGroup(): Unit = {
builder.clear()
hasElements = false
currentWeight = 0L
currentKey = OptionVal.none
}
private def pushOrQueue(group: immutable.Seq[T]): Unit = pendingGroup match {
case OptionVal.Some(pending) =>
push(out, pending)
pendingGroup = OptionVal.Some(group)
case OptionVal.None =>
if (isAvailable(out)) {
push(out, group)
} else {
pendingGroup = OptionVal.Some(group)
}
}
private def tryPullIfNeeded(): Unit = pendingGroup match {
case OptionVal.None if !hasBeenPulled(in) && isAvailable(out) => pull(in)
case _ =>
}
override def onPull(): Unit = {
pendingGroup match {
case OptionVal.Some(group) =>
push(out, group)
pendingGroup = OptionVal.none
case _ => if (!hasBeenPulled(in)) pull(in)
}
}
override def onUpstreamFinish(): Unit = {
val finalGroup = builder.result()
builder = null
pendingGroup match {
case OptionVal.Some(group) =>
if (finalGroup.nonEmpty) {
emitMultiple(out, List(group, finalGroup).iterator, () => completeStage())
} else {
emit(out, group, () => completeStage())
}
case OptionVal.None =>
if (finalGroup.nonEmpty) {
emit(out, finalGroup, () => completeStage())
} else {
completeStage()
}
}
}
setHandlers(in, out, this)
}
}

View file

@ -1320,6 +1320,50 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
costFn: java.util.function.Function[Out, java.lang.Long]): javadsl.Flow[In, java.util.List[Out], Mat] =
new Flow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step
/**
* Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
* when the result of the function is not the same as the previous element's result, a chunk is emitted.
*
* The `f` function must return a non-null value for all elements, otherwise the stage will fail.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @since 1.2.0
*/
def groupedAdjacentBy[R](
f: function.Function[Out, R]): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] =
new Flow(delegate.groupedAdjacentBy(f.apply).map(_.asJava))
/**
* Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
* when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds
* the `maxWeight`, a chunk is emitted.
*
* The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
* otherwise the stage will fail.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @since 1.2.0
*/
def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
maxWeight: Long,
costFn: java.util.function.Function[Out, java.lang.Long])
: javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] =
new Flow(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava))
/**
* Ensure stream boundedness by limiting the number of elements from upstream.
* If the number of incoming elements exceeds max, it will signal

View file

@ -3103,6 +3103,49 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
: javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
new Source(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava))
/**
* Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
* when the result of the function is not the same as the previous element's result, a chunk is emitted.
*
* The `f` function must return a non-null value for all elements, otherwise the stage will fail.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @since 1.2.0
*/
def groupedAdjacentBy[R](f: function.Function[Out, R]): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
new Source(delegate.groupedAdjacentBy(f.apply).map(_.asJava))
/**
* Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
* when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds
* the `maxWeight`, a chunk is emitted.
*
* The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
* otherwise the stage will fail.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @since 1.2.0
*/
def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
maxWeight: Long,
costFn: java.util.function.Function[Out, java.lang.Long])
: javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
new Source(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava))
/**
* Ensure stream boundedness by limiting the number of elements from upstream.
* If the number of incoming elements exceeds max, it will signal

View file

@ -679,6 +679,49 @@ class SubFlow[In, Out, Mat](
costFn: function.Function[Out, java.lang.Long]): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
new SubFlow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step
/**
* Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
* when the result of the function is not the same as the previous element's result, a chunk is emitted.
*
* The `f` function must return a non-null value for all elements, otherwise the stage will fail.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @since 1.2.0
*/
def groupedAdjacentBy[R](f: function.Function[Out, R]): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
new SubFlow(delegate.groupedAdjacentBy(f.apply).map(_.asJava))
/**
* Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
* when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds
* the `maxWeight`, a chunk is emitted.
*
* The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
* otherwise the stage will fail.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @since 1.2.0
*/
def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
maxWeight: Long,
costFn: java.util.function.Function[Out, java.lang.Long])
: SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
new SubFlow(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava))
/**
* Ensure stream boundedness by limiting the number of elements from upstream.
* If the number of incoming elements exceeds max, it will signal

View file

@ -669,6 +669,49 @@ class SubSource[Out, Mat](
costFn: function.Function[Out, java.lang.Long]): SubSource[java.util.List[Out @uncheckedVariance], Mat] =
new SubSource(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step
/**
* Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
* when the result of the function is not the same as the previous element's result, a chunk is emitted.
*
* The `f` function must return a non-null value for all elements, otherwise the stage will fail.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @since 1.2.0
*/
def groupedAdjacentBy[R](f: function.Function[Out, R]): SubSource[java.util.List[Out @uncheckedVariance], Mat] =
new SubSource(delegate.groupedAdjacentBy(f.apply).map(_.asJava))
/**
* Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
* when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds
* the `maxWeight`, a chunk is emitted.
*
* The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
* otherwise the stage will fail.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @since 1.2.0
*/
def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
maxWeight: Long,
costFn: java.util.function.Function[Out, java.lang.Long])
: SubSource[java.util.List[Out @uncheckedVariance], Mat] =
new SubSource(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava))
/**
* Apply a sliding window over the stream and return the windows as groups of elements, with the last group
* possibly smaller than requested due to end-of-stream.

View file

@ -1779,6 +1779,46 @@ trait FlowOps[+Out, +Mat] {
def groupedWeighted(minWeight: Long)(costFn: Out => Long): Repr[immutable.Seq[Out]] =
via(GroupedWeighted[Out](minWeight, costFn))
/**
* Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
* when the result of the function is not the same as the previous element's result, a chunk is emitted.
*
* The `f` function must return a non-null value for all elements, otherwise the stage will fail.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @since 1.2.0
*/
def groupedAdjacentBy[T](f: Out => T): Repr[immutable.Seq[Out]] =
via(GroupedAdjacentByWeighted(f, Long.MaxValue, ConstantFun.oneLong))
/**
* Partitions this stream into chunks by a delimiter function, which is applied to each incoming element,
* when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds
* the `maxWeight`, a chunk is emitted.
*
* The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
* otherwise the stage will fail.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @since 1.2.0
*/
def groupedAdjacentByWeighted[T](f: Out => T, maxWeight: Long)(costFn: Out => Long): Repr[immutable.Seq[Out]] =
via(GroupedAdjacentByWeighted(f, maxWeight, costFn))
/**
* Ensure stream boundedness by limiting the number of elements from upstream.
* If the number of incoming elements exceeds max, it will signal