From ec7fdc7d0ffda7f8cdcaeb5b7517a62f4979bab5 Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Thu, 10 Jul 2025 10:54:46 +0800 Subject: [PATCH] feat: Add groupedAdjacentBy and GroupedAdjacentByWeighted operators. (#1937) --- .../Source-or-Flow/groupedAdjacentBy.md | 43 +++++ .../groupedAdjacentByWeighted.md | 42 +++++ .../main/paradox/stream/operators/index.md | 4 + .../jdocs/stream/operators/SourceOrFlow.java | 26 +++ .../sourceorflow/GroupedAdjacentBy.scala | 53 ++++++ .../apache/pekko/stream/javadsl/FlowTest.java | 27 +++ .../FlowGroupedAdjacentByWeightedSpec.scala | 100 +++++++++++ .../org/apache/pekko/stream/impl/Stages.scala | 1 + .../fusing/GroupedAdjacentByWeighted.scala | 168 ++++++++++++++++++ .../apache/pekko/stream/javadsl/Flow.scala | 44 +++++ .../apache/pekko/stream/javadsl/Source.scala | 43 +++++ .../apache/pekko/stream/javadsl/SubFlow.scala | 43 +++++ .../pekko/stream/javadsl/SubSource.scala | 43 +++++ .../apache/pekko/stream/scaladsl/Flow.scala | 40 +++++ 14 files changed, 677 insertions(+) create mode 100644 docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md create mode 100644 docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md create mode 100644 docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala create mode 100644 stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala create mode 100644 stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md new file mode 100644 index 0000000000..0d970777c2 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md @@ -0,0 +1,43 @@ +# groupedAdjacentBy + +Partitions this stream into chunks by a delimiter function. 
+ +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Source.groupedAdjacentBy](Source) { scala="#groupedAdjacentBy(f:Out=>T):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#groupedAdjacentBy(org.apache.pekko.japi.function.Function)" } +@apidoc[Flow.groupedAdjacentBy](Flow) { scala="#groupedAdjacentBy(f:Out=>T):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#groupedAdjacentBy(org.apache.pekko.japi.function.Function)" } + + +## Description + +Partitions this stream into chunks by a delimiter function: the function is applied to each incoming element, and the accumulated chunk is emitted whenever its result differs from the previous element's result. + +See also: + +* @ref[groupedAdjacentByWeighted](groupedAdjacentByWeighted.md) for a variant that also limits the total weight of each chunk. + +## Examples + +The example below demonstrates how `groupedAdjacentBy` partitions the elements into @scala[`Seq`] @java[`List`]. + +Scala +: @@snip [GroupedAdjacentBy.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala) { #groupedAdjacentBy } + +Java +: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #groupedAdjacentBy } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the delimiter function returns a different value than the previous element's result + +**backpressures** when a chunk has been assembled and downstream backpressures + +**completes** when upstream completes + +@@@ + + diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md new file mode 100644 index 0000000000..4aaa68020c --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md @@ -0,0 +1,42 @@ +# groupedAdjacentByWeighted + +Partitions this stream into chunks by a delimiter function and a weight limit. 
+ +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Source.groupedAdjacentByWeighted](Source) { scala="#groupedAdjacentByWeighted(f:Out=>T,maxWeight:Long)(costFn:Out=>Long):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#groupedAdjacentByWeighted(org.apache.pekko.japi.function.Function,long,java.util.function.Function)" } +@apidoc[Flow.groupedAdjacentByWeighted](Flow) { scala="#groupedAdjacentByWeighted(f:Out=>T,maxWeight:Long)(costFn:Out=>Long):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#groupedAdjacentByWeighted(org.apache.pekko.japi.function.Function,long,java.util.function.Function)" } + +## Description + +Partitions this stream into chunks by a delimiter function and a weight limit: the function is applied to each incoming element, and the accumulated chunk is emitted whenever its result differs from the previous element's result or adding the element would make the chunk's accumulated weight exceed `maxWeight`. + +See also: + +* @ref[groupedAdjacentBy](groupedAdjacentBy.md) for a simpler variant. + +## Examples + +The example below demonstrates how `groupedAdjacentByWeighted` partitions the elements into @scala[`Seq`] @java[`List`]. + +Scala +: @@snip [GroupedAdjacentBy.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala) { #groupedAdjacentByWeighted } + +Java +: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #groupedAdjacentByWeighted } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the delimiter function returns a different value than the previous element's result, or when the accumulated weight would exceed the `maxWeight`. + +**backpressures** when a chunk has been assembled and downstream backpressures + +**completes** when upstream completes + +@@@ + + diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 37e2d6fdcc..8d0dfabcba 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -169,6 +169,8 @@ depending on being backpressured by downstream or not. 
|Source/Flow|@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`| |Flow|@ref[futureFlow](Flow/futureFlow.md)|Streams the elements through the given future flow once it successfully completes.| |Source/Flow|@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.| +|Source/Flow|@ref[groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md)|Partitions this stream into chunks by a delimiter function.| +|Source/Flow|@ref[groupedAdjacentByWeighted](Source-or-Flow/groupedAdjacentByWeighted.md)|Partitions this stream into chunks by a delimiter function and a weight limit.| |Source/Flow|@ref[groupedWeighted](Source-or-Flow/groupedWeighted.md)|Accumulate incoming events until the combined weight of elements is greater than or equal to the minimum weight and then pass the collection of elements downstream.| |Source/Flow|@ref[intersperse](Source-or-Flow/intersperse.md)|Intersperse stream with provided element similar to `List.mkString`.| |Flow|@ref[lazyCompletionStageFlow](Flow/lazyCompletionStageFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.| @@ -499,6 +501,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [futureSource](Source/futureSource.md) * [groupBy](Source-or-Flow/groupBy.md) * [grouped](Source-or-Flow/grouped.md) +* [groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md) +* [groupedAdjacentByWeighted](Source-or-Flow/groupedAdjacentByWeighted.md) * [groupedWeighted](Source-or-Flow/groupedWeighted.md) * [groupedWeightedWithin](Source-or-Flow/groupedWeightedWithin.md) * [groupedWithin](Source-or-Flow/groupedWithin.md) diff --git a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 
4a9d058d42..65c985d849 100644 --- a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -467,6 +467,32 @@ class SourceOrFlow { // #groupedWeighted } + void groupedAdjacentByExample() { + // #groupedAdjacentBy + Source.from(Arrays.asList("Hello", "Hi", "Greetings", "Hey")) + .groupedAdjacentBy(str -> str.charAt(0)) + .runForeach(System.out::println, system); + // prints: + // [Hello, Hi] + // [Greetings] + // [Hey] + // #groupedAdjacentBy + } + + void groupedAdjacentByWeightedExample() { + // #groupedAdjacentByWeighted + Source.from(Arrays.asList("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey")) + .groupedAdjacentByWeighted(str -> str.charAt(0), 4, str -> (long) str.length()) + .runForeach(System.out::println, system); + // prints: + // [Hello] + // [HiHi] + // [Hi, Hi] + // [Greetings] + // [Hey] + // #groupedAdjacentByWeighted + } + static // #fold // #foldAsync class Histogram { diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala b/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala new file mode 100644 index 0000000000..a8f1f51f51 --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/GroupedAdjacentBy.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2019-2022 Lightbend Inc. 
+ */ + +package docs.stream.operators.sourceorflow + +import org.apache.pekko.stream.scaladsl.Source + +import scala.collection.immutable + +object GroupedAdjacentBy { + def groupedAdjacentByExample(): Unit = { + import org.apache.pekko.actor.ActorSystem + implicit val system: ActorSystem = ActorSystem() + + // #groupedAdjacentBy + Source(List("Hello", "Hi", "Greetings", "Hey")) + .groupedAdjacentBy(_.head) + .runForeach(println) + // prints: + // Vector(Hello, Hi) + // Vector(Greetings) + // Vector(Hey) + // #groupedAdjacentBy + } + + def groupedAdjacentByWeightedExample(): Unit = { + import org.apache.pekko.actor.ActorSystem + implicit val system: ActorSystem = ActorSystem() + + // #groupedAdjacentByWeighted + Source(List("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey")) + .groupedAdjacentByWeighted(_.head, 4)(_.length) + .runForeach(println) + // prints: + // Vector(Hello) + // Vector(HiHi) + // Vector(Hi, Hi) + // Vector(Greetings) + // Vector(Hey) + // #groupedAdjacentByWeighted + } + +} diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index 1b6b400f85..0a45acc2bd 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -13,6 +13,7 @@ package org.apache.pekko.stream.javadsl; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.pekko.Done; import org.apache.pekko.NotUsed; @@ -122,6 +123,32 @@ public class FlowTest extends StreamTest { probe.expectMsgEquals("de"); } + @Test + public void mustBeAbleToUseGroupedAdjacentBy() { + Source.from(Arrays.asList("Hello", "Hi", "Greetings", "Hey")) + .groupedAdjacentBy(str -> str.charAt(0)) + .runWith(TestSink.probe(system), system) + .request(4) + .expectNext(Lists.newArrayList("Hello", "Hi")) + .expectNext(Lists.newArrayList("Greetings")) + 
.expectNext(Lists.newArrayList("Hey")) + .expectComplete(); + } + + @Test + public void mustBeAbleToUseGroupedAdjacentByWeighted() { + Source.from(Arrays.asList("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey")) + .groupedAdjacentByWeighted(str -> str.charAt(0), 4, str -> (long) str.length()) + .runWith(TestSink.probe(system), system) + .request(6) + .expectNext(Lists.newArrayList("Hello")) + .expectNext(Lists.newArrayList("HiHi")) + .expectNext(Lists.newArrayList("Hi", "Hi")) + .expectNext(Lists.newArrayList("Greetings")) + .expectNext(Lists.newArrayList("Hey")) + .expectComplete(); + } + @Test public void mustBeAbleToUseContraMap() { final Source source = Source.from(Arrays.asList("1", "2", "3")); diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala new file mode 100644 index 0000000000..b6b53fdf18 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.stream.scaladsl + +import org.apache.pekko +import pekko.stream.testkit.scaladsl.TestSink +import pekko.stream.testkit.{ ScriptedTest, StreamSpec } + +class FlowGroupedAdjacentByWeightedSpec extends StreamSpec(""" + pekko.stream.materializer.initial-input-buffer-size = 2 + """) with ScriptedTest { + + "A GroupedAdjacentByWeighted" must { + "produce no group when source is empty" in { + Source.empty[String] + .groupedAdjacentBy(identity(_)) + .runWith(TestSink.probe[Seq[String]]) + .request(1) + .expectComplete() + } + + "group adjacent elements by predicate" in { + val input = List("a", "a", "b", "b", "c", "c") + Source(input) + .groupedAdjacentBy(identity(_)) + .runWith(TestSink.probe[Seq[String]]) + .request(6) + .expectNext(Seq("a", "a")) + .expectNext(Seq("b", "b")) + .expectNext(Seq("c", "c")) + .expectComplete() + } + + "group adjust elements by leading char" in { + val input = List("Hello", "Hi", "Greetings", "Hey") + Source(input) + .groupedAdjacentBy(_.head) + .runWith(TestSink.probe[Seq[String]]) + .request(4) + .expectNext(Seq("Hello", "Hi")) + .expectNext(Seq("Greetings")) + .expectNext(Seq("Hey")) + .expectComplete() + } + + "be able to act like bufferUntilChanged" in { + Source(List(1, 1, 2, 2, 3, 3, 1)) + .groupedAdjacentBy(identity(_)) + .runWith(TestSink.probe[Seq[Int]]) + .request(7) + .expectNext(Seq(1, 1)) + .expectNext(Seq(2, 2)) + .expectNext(Seq(3, 3)) + .expectNext(Seq(1)) + .expectComplete() + } + + "Be able to limit the chunk size" in { + Source(List("Hello", "Hi", "Hey", "Greetings", "Hey")) + .groupedAdjacentByWeighted(_.head, 2)(_ => 1L) + .runWith(TestSink.probe[Seq[String]]) + .request(5) + .expectNext(Seq("Hello", "Hi")) + .expectNext(Seq("Hey")) + .expectNext(Seq("Greetings")) + .expectNext(Seq("Hey")) + .expectComplete() + } + + "Be able to handle single heavy weighted element" in { + Source(List("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey")) + .groupedAdjacentByWeighted(_.head, 4)(_.length) + 
.runWith(TestSink.probe[Seq[String]]) + .request(6) + .expectNext(Seq("Hello")) + .expectNext(Seq("HiHi")) + .expectNext(Seq("Hi", "Hi")) + .expectNext(Seq("Greetings")) + .expectNext(Seq("Hey")) + .expectComplete() + } + + } + +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 00278f7a41..cd01348ce3 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -47,6 +47,7 @@ import pekko.stream.Attributes._ val mapWithResource = name("mapWithResource") and IODispatcher val ask = name("ask") val grouped = name("grouped") + val groupedAdjacentByWeighted = name("groupedAdjacentByWeighted") val groupedWithin = name("groupedWithin") val groupedWeighted = name("groupedWeighted") val groupedWeightedWithin = name("groupedWeightedWithin") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala new file mode 100644 index 0000000000..adb395b141 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.fusing + +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet } +import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } +import pekko.util.OptionVal + +import scala.collection.immutable + +/** + * INTERNAL API + */ +@InternalApi +private[pekko] final case class GroupedAdjacentByWeighted[T, R]( + f: T => R, + maxWeight: Long, + costFn: T => Long) + extends GraphStage[FlowShape[T, immutable.Seq[T]]] { + + require(f != null, "f must not be null") + require(maxWeight > 0, "maxWeight must be greater than 0") + require(costFn != null, "costFn must not be null") + + private val in = Inlet[T]("GroupedAdjacentByWeighted.in") + private val out = Outlet[immutable.Seq[T]]("GroupedAdjacentByWeighted.out") + + override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out) + override def initialAttributes: Attributes = DefaultAttributes.groupedAdjacentByWeighted + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private var builder = Vector.newBuilder[T] + private var currentWeight: Long = 0L + // used to track if elements has been added to the current group, zero weight is allowed + private var hasElements: Boolean = false + private var currentKey: OptionVal[R] = OptionVal.none + private var pendingGroup: OptionVal[immutable.Seq[T]] = OptionVal.none + + override def onPush(): 
Unit = { + val elem = grab(in) + val cost = costFn(elem) + + if (cost < 0L) { + failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed")) + return + } + + val elemKey = f(elem) + require(elemKey != null, "Element key must not be null") + + if (shouldPushDirectly(cost)) { + push(out, Vector(elem)) + } else if (shouldStartNewGroup(elemKey, cost)) { + emitCurrentGroup() + handleNewElement(elem, cost, elemKey) + } else { + addToCurrentGroup(elem, cost, elemKey) + tryPullIfNeeded() + } + } + + private def shouldPushDirectly(cost: Long): Boolean = { + cost >= maxWeight && !hasElements + } + + private def shouldStartNewGroup(elemKey: R, cost: Long): Boolean = currentKey match { + case OptionVal.Some(key) if (elemKey != key) || (currentWeight + cost > maxWeight) => true + case OptionVal.None if cost > maxWeight => true + case _ => false + } + + private def emitCurrentGroup(): Unit = if (hasElements) { + val group = builder.result() + resetGroup() + pushOrQueue(group) + } + + private def handleNewElement(elem: T, cost: Long, key: R): Unit = { + if (cost > maxWeight) { + pushOrQueue(Vector(elem)) + } else { + addToCurrentGroup(elem, cost, key) + } + tryPullIfNeeded() + } + + private def addToCurrentGroup(elem: T, cost: Long, key: R): Unit = { + builder += elem + hasElements = true + currentWeight += cost + currentKey = OptionVal.Some(key) + } + + private def resetGroup(): Unit = { + builder.clear() + hasElements = false + currentWeight = 0L + currentKey = OptionVal.none + } + + private def pushOrQueue(group: immutable.Seq[T]): Unit = pendingGroup match { + case OptionVal.Some(pending) => + push(out, pending) + pendingGroup = OptionVal.Some(group) + case OptionVal.None => + if (isAvailable(out)) { + push(out, group) + } else { + pendingGroup = OptionVal.Some(group) + } + } + + private def tryPullIfNeeded(): Unit = pendingGroup match { + case OptionVal.None if !hasBeenPulled(in) && isAvailable(out) => pull(in) + case _ => + } + + 
override def onPull(): Unit = { + pendingGroup match { + case OptionVal.Some(group) => + push(out, group) + pendingGroup = OptionVal.none + case _ => if (!hasBeenPulled(in)) pull(in) + } + } + + override def onUpstreamFinish(): Unit = { + val finalGroup = builder.result() + builder = null + + pendingGroup match { + case OptionVal.Some(group) => + if (finalGroup.nonEmpty) { + emitMultiple(out, List(group, finalGroup).iterator, () => completeStage()) + } else { + emit(out, group, () => completeStage()) + } + case OptionVal.None => + if (finalGroup.nonEmpty) { + emit(out, finalGroup, () => completeStage()) + } else { + completeStage() + } + } + } + + setHandlers(in, out, this) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index fb0696ad55..53697ed7e3 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1320,6 +1320,50 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr costFn: java.util.function.Function[Out, java.lang.Long]): javadsl.Flow[In, java.util.List[Out], Mat] = new Flow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step + /** + * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element, + * when the result of the function is not the same as the previous element's result, a chunk is emitted. + * + * The `f` function must return a non-null value for all elements, otherwise the stage will fail. 
+ * + * '''Emits when''' the delimiter function returns a different value than the previous element's result + * + * '''Backpressures when''' a chunk has been assembled and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def groupedAdjacentBy[R]( + f: function.Function[Out, R]): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = + new Flow(delegate.groupedAdjacentBy(f.apply).map(_.asJava)) + + /** + * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element, + * when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds + * the `maxWeight`, a chunk is emitted. + * + * The `f` function must return a non-null value, and the `costFn` must return a non-negative result for all inputs, + * otherwise the stage will fail. + * + * '''Emits when''' the delimiter function returns a different value than the previous element's result, or the accumulated weight exceeds the `maxWeight`. + * + * '''Backpressures when''' a chunk has been assembled and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def groupedAdjacentByWeighted[R](f: function.Function[Out, R], + maxWeight: Long, + costFn: java.util.function.Function[Out, java.lang.Long]) + : javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = + new Flow(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava)) + /** * Ensure stream boundedness by limiting the number of elements from upstream. 
* If the number of incoming elements exceeds max, it will signal diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index fe3e6c694b..8ce6c92889 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3103,6 +3103,49 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ : javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = new Source(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) + /** + * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element, + * when the result of the function is not the same as the previous element's result, a chunk is emitted. + * + * The `f` function must return a non-null value for all elements, otherwise the stage will fail. + * + * '''Emits when''' the delimiter function returns a different value than the previous element's result + * + * '''Backpressures when''' a chunk has been assembled and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def groupedAdjacentBy[R](f: function.Function[Out, R]): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = + new Source(delegate.groupedAdjacentBy(f.apply).map(_.asJava)) + + /** + * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element, + * when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds + * the `maxWeight`, a chunk is emitted. + * + * The `f` function must return a non-null value, and the `costFn` must return a non-negative result for all inputs, + * otherwise the stage will fail. 
+ * + * '''Emits when''' the delimiter function returns a different value than the previous element's result, or the accumulated weight exceeds the `maxWeight`. + * + * '''Backpressures when''' a chunk has been assembled and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def groupedAdjacentByWeighted[R](f: function.Function[Out, R], + maxWeight: Long, + costFn: java.util.function.Function[Out, java.lang.Long]) + : javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = + new Source(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava)) + /** * Ensure stream boundedness by limiting the number of elements from upstream. * If the number of incoming elements exceeds max, it will signal diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 5358de307a..edb3b907b8 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -679,6 +679,49 @@ class SubFlow[In, Out, Mat]( costFn: function.Function[Out, java.lang.Long]): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = new SubFlow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step + /** + * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element, + * when the result of the function is not the same as the previous element's result, a chunk is emitted. + * + * The `f` function must return a non-null value for all elements, otherwise the stage will fail. 
+ * + * '''Emits when''' the delimiter function returns a different value than the previous element's result + * + * '''Backpressures when''' a chunk has been assembled and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def groupedAdjacentBy[R](f: function.Function[Out, R]): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = + new SubFlow(delegate.groupedAdjacentBy(f.apply).map(_.asJava)) + + /** + * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element, + * when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds + * the `maxWeight`, a chunk is emitted. + * + * The `f` function must return a non-null value, and the `costFn` must return a non-negative result for all inputs, + * otherwise the stage will fail. + * + * '''Emits when''' the delimiter function returns a different value than the previous element's result, or the accumulated weight exceeds the `maxWeight`. + * + * '''Backpressures when''' a chunk has been assembled and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def groupedAdjacentByWeighted[R](f: function.Function[Out, R], + maxWeight: Long, + costFn: java.util.function.Function[Out, java.lang.Long]) + : SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = + new SubFlow(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava)) + /** * Ensure stream boundedness by limiting the number of elements from upstream. 
* If the number of incoming elements exceeds max, it will signal diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 3ab439ca9f..bc77cb5bee 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -669,6 +669,49 @@ class SubSource[Out, Mat]( costFn: function.Function[Out, java.lang.Long]): SubSource[java.util.List[Out @uncheckedVariance], Mat] = new SubSource(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step + /** + * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element, + * when the result of the function is not the same as the previous element's result, a chunk is emitted. + * + * The `f` function must return a non-null value for all elements, otherwise the stage will fail. + * + * '''Emits when''' the delimiter function returns a different value than the previous element's result + * + * '''Backpressures when''' a chunk has been assembled and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def groupedAdjacentBy[R](f: function.Function[Out, R]): SubSource[java.util.List[Out @uncheckedVariance], Mat] = + new SubSource(delegate.groupedAdjacentBy(f.apply).map(_.asJava)) + + /** + * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element, + * when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds + * the `maxWeight`, a chunk is emitted. + * + * The `f` function must return a non-null value, and the `costFn` must return a non-negative result for all inputs, + * otherwise the stage will fail. 
+ * + * '''Emits when''' the delimiter function returns a different value than the previous element's result, or the accumulated weight exceeds the `maxWeight`. + * + * '''Backpressures when''' a chunk has been assembled and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def groupedAdjacentByWeighted[R](f: function.Function[Out, R], + maxWeight: Long, + costFn: java.util.function.Function[Out, java.lang.Long]) + : SubSource[java.util.List[Out @uncheckedVariance], Mat] = + new SubSource(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava)) + /** * Apply a sliding window over the stream and return the windows as groups of elements, with the last group * possibly smaller than requested due to end-of-stream. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index b1670f0e22..451b43ca0d 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1779,6 +1779,46 @@ trait FlowOps[+Out, +Mat] { def groupedWeighted(minWeight: Long)(costFn: Out => Long): Repr[immutable.Seq[Out]] = via(GroupedWeighted[Out](minWeight, costFn)) + /** + * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element, + * when the result of the function is not the same as the previous element's result, a chunk is emitted. + * + * The `f` function must return a non-null value for all elements, otherwise the stage will fail. 
+ * + * '''Emits when''' the delimiter function returns a different value than the previous element's result + * + * '''Backpressures when''' a chunk has been assembled and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def groupedAdjacentBy[T](f: Out => T): Repr[immutable.Seq[Out]] = + via(GroupedAdjacentByWeighted(f, Long.MaxValue, ConstantFun.oneLong)) + + /** + * Partitions this stream into chunks by a delimiter function, which is applied to each incoming element, + * when the result of the function is not the same as the previous element's result, or the accumulated weight exceeds + * the `maxWeight`, a chunk is emitted. + * + * The `f` function must return a non-null value, and the `costFn` must return a non-negative result for all inputs, + * otherwise the stage will fail. + * + * '''Emits when''' the delimiter function returns a different value than the previous element's result, or the accumulated weight exceeds the `maxWeight`. + * + * '''Backpressures when''' a chunk has been assembled and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def groupedAdjacentByWeighted[T](f: Out => T, maxWeight: Long)(costFn: Out => Long): Repr[immutable.Seq[Out]] = + via(GroupedAdjacentByWeighted(f, maxWeight, costFn)) + /** * Ensure stream boundedness by limiting the number of elements from upstream. * If the number of incoming elements exceeds max, it will signal