fix: Change aggregateWithBoundary operator in javadsl to use Optional. (#1876)

This commit is contained in:
He-Pin(kerr) 2025-05-30 03:20:51 +08:00 committed by GitHub
parent 572bebd619
commit 197fb6e60e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 179 additions and 12 deletions

View file

@ -824,6 +824,37 @@ public class SourceTest extends StreamTest {
Assert.assertEquals("123415", result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@Test
@SuppressWarnings("unchecked")
public void mustBeAbleToUseAggregateWithBoundary() {
// Nine elements; pairwise aggregation yields four pairs plus a trailing singleton.
final java.lang.Iterable<Integer> input = Arrays.asList(1, 1, 2, 3, 3, 4, 5, 5, 6);
// Emulates grouped(2): emit the buffer as soon as it holds two elements.
Source.from(input)
.aggregateWithBoundary(
() -> (List<Integer>) new ArrayList<Integer>(2),
(buffer, elem) -> {
// Decide before inserting: a one-element buffer becomes full after this add.
final boolean full = buffer.size() == 1;
buffer.add(elem);
return Pair.create(buffer, full);
},
Function.identity(),
Optional.empty())
.runWith(TestSink.create(system), system)
.ensureSubscription()
.request(6)
.expectNext(
Arrays.asList(1, 1),
Arrays.asList(2, 3),
Arrays.asList(3, 4),
Arrays.asList(5, 5),
Arrays.asList(6))
.expectComplete();
}
@Test
public void mustBeAbleToUseStatefulMapAsDropRepeated() throws Exception {
final java.lang.Iterable<Integer> input = Arrays.asList(1, 1, 1, 2, 3, 3, 3, 4, 5, 5, 5);

View file

@ -30,7 +30,6 @@ import pekko.Done
import pekko.NotUsed
import pekko.actor.ActorRef
import pekko.actor.ClassicActorSystemProvider
import pekko.annotation.ApiMayChange
import pekko.dispatch.ExecutionContexts
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.Pair
@ -4624,11 +4623,12 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* @param harvest this is invoked before emit within the current stage/operator
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@ApiMayChange
@deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0")
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Flow[In, Emit, Mat] =
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration])
: javadsl.Flow[In, Emit, Mat] = {
asScala
.aggregateWithBoundary(() => allocate.get())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
@ -4637,6 +4637,41 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
})
.asJava
}
/**
 * Collect incoming elements into a user-defined accumulator and push the result
 * downstream whenever a boundary condition fires, triggered either by the
 * aggregation itself or by a periodic timer check.
 * This can be seen as a generalization of [[groupedWeightedWithin]].
 *
 * '''Emits when''' the aggregate function signals completion or the timer predicate yields true
 *
 * '''Backpressures when''' downstream backpressures while a completed aggregate is pending
 *
 * '''Completes when''' upstream completes and the final aggregate has been emitted downstream
 *
 * '''Cancels when''' downstream cancels
 *
 * @param allocate creates a fresh, empty accumulator for the next batch of elements
 * @param aggregate folds one element into the accumulator; the boolean signals readiness to emit
 * @param harvest converts the accumulator into the element emitted downstream; invoked before emit
 * @param emitOnTimer optional periodic check: a predicate deciding whether to emit, and the check interval
 */
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
    aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
    harvest: function.Function[Agg, Emit],
    emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]])
    : javadsl.Flow[In, Emit, Mat] = {
  import org.apache.pekko.util.OptionConverters._
  // Translate the optional (predicate, interval) pair into the scaladsl shape up front.
  val timerCheck = emitOnTimer.toScala.map { pair =>
    ((acc: Agg) => pair.first.test(acc), pair.second.asScala)
  }
  asScala
    .aggregateWithBoundary(() => allocate.get())(
      aggregate = (acc, elem) => aggregate.apply(acc, elem).toScala,
      harvest = acc => harvest.apply(acc),
      emitOnTimer = timerCheck)
    .asJava
}
override def getAttributes: Attributes = delegate.getAttributes

View file

@ -28,7 +28,6 @@ import scala.reflect.ClassTag
import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
import pekko.annotation.ApiMayChange
import pekko.dispatch.ExecutionContexts
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.{ function, JavaPartialFunction, Pair }
@ -5170,11 +5169,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* @param harvest this is invoked before emit within the current stage/operator
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@ApiMayChange
@deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0")
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] =
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] = {
asScala
.aggregateWithBoundary(() => allocate.get())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
@ -5183,6 +5182,40 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
})
.asJava
}
/**
 * Collect incoming elements into a user-defined accumulator and push the result
 * downstream whenever a boundary condition fires, triggered either by the
 * aggregation itself or by a periodic timer check.
 * This can be seen as a generalization of [[groupedWeightedWithin]].
 *
 * '''Emits when''' the aggregate function signals completion or the timer predicate yields true
 *
 * '''Backpressures when''' downstream backpressures while a completed aggregate is pending
 *
 * '''Completes when''' upstream completes and the final aggregate has been emitted downstream
 *
 * '''Cancels when''' downstream cancels
 *
 * @param allocate creates a fresh, empty accumulator for the next batch of elements
 * @param aggregate folds one element into the accumulator; the boolean signals readiness to emit
 * @param harvest converts the accumulator into the element emitted downstream; invoked before emit
 * @param emitOnTimer optional periodic check: a predicate deciding whether to emit, and the check interval
 */
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
    aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
    harvest: function.Function[Agg, Emit],
    emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]])
    : javadsl.Source[Emit, Mat] = {
  import org.apache.pekko.util.OptionConverters._
  // Translate the optional (predicate, interval) pair into the scaladsl shape up front.
  val timerCheck = emitOnTimer.toScala.map { pair =>
    ((acc: Agg) => pair.first.test(acc), pair.second.asScala)
  }
  asScala
    .aggregateWithBoundary(() => allocate.get())(
      aggregate = (acc, elem) => aggregate.apply(acc, elem).toScala,
      harvest = acc => harvest.apply(acc),
      emitOnTimer = timerCheck)
    .asJava
}
override def getAttributes: Attributes = delegate.getAttributes

View file

@ -25,7 +25,6 @@ import scala.reflect.ClassTag
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.ApiMayChange
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.{ function, Pair }
import pekko.stream._
@ -3129,11 +3128,12 @@ class SubFlow[In, Out, Mat](
* @param harvest this is invoked before emit within the current stage/operator
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@ApiMayChange
@deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0")
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubFlow[In, Emit, Mat] =
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration])
: javadsl.SubFlow[In, Emit, Mat] = {
new SubFlow(
asScala.aggregateWithBoundary(() => allocate.get())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
@ -3141,5 +3141,39 @@ class SubFlow[In, Out, Mat](
emitOnTimer = Option(emitOnTimer).map {
case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
}))
}
/**
 * Collect incoming elements into a user-defined accumulator and push the result
 * downstream whenever a boundary condition fires, triggered either by the
 * aggregation itself or by a periodic timer check.
 * This can be seen as a generalization of [[groupedWeightedWithin]].
 *
 * '''Emits when''' the aggregate function signals completion or the timer predicate yields true
 *
 * '''Backpressures when''' downstream backpressures while a completed aggregate is pending
 *
 * '''Completes when''' upstream completes and the final aggregate has been emitted downstream
 *
 * '''Cancels when''' downstream cancels
 *
 * @param allocate creates a fresh, empty accumulator for the next batch of elements
 * @param aggregate folds one element into the accumulator; the boolean signals readiness to emit
 * @param harvest converts the accumulator into the element emitted downstream; invoked before emit
 * @param emitOnTimer optional periodic check: a predicate deciding whether to emit, and the check interval
 */
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
    aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
    harvest: function.Function[Agg, Emit],
    emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]])
    : javadsl.SubFlow[In, Emit, Mat] = {
  import org.apache.pekko.util.OptionConverters._
  // Translate the optional (predicate, interval) pair into the scaladsl shape up front.
  val timerCheck = emitOnTimer.toScala.map { pair =>
    ((acc: Agg) => pair.first.test(acc), pair.second.asScala)
  }
  new SubFlow(
    asScala.aggregateWithBoundary(() => allocate.get())(
      aggregate = (acc, elem) => aggregate.apply(acc, elem).toScala,
      harvest = acc => harvest.apply(acc),
      emitOnTimer = timerCheck))
}
}

View file

@ -25,7 +25,6 @@ import scala.reflect.ClassTag
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.ApiMayChange
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.{ function, Pair }
import pekko.stream._
@ -3097,11 +3096,12 @@ class SubSource[Out, Mat](
* @param harvest this is invoked before emit within the current stage/operator
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@ApiMayChange
@deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0")
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubSource[Emit, Mat] =
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration])
: javadsl.SubSource[Emit, Mat] = {
new SubSource(
asScala.aggregateWithBoundary(() => allocate.get())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
@ -3109,4 +3109,38 @@ class SubSource[Out, Mat](
emitOnTimer = Option(emitOnTimer).map {
case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
}))
}
/**
 * Collect incoming elements into a user-defined accumulator and push the result
 * downstream whenever a boundary condition fires, triggered either by the
 * aggregation itself or by a periodic timer check.
 * This can be seen as a generalization of [[groupedWeightedWithin]].
 *
 * '''Emits when''' the aggregate function signals completion or the timer predicate yields true
 *
 * '''Backpressures when''' downstream backpressures while a completed aggregate is pending
 *
 * '''Completes when''' upstream completes and the final aggregate has been emitted downstream
 *
 * '''Cancels when''' downstream cancels
 *
 * @param allocate creates a fresh, empty accumulator for the next batch of elements
 * @param aggregate folds one element into the accumulator; the boolean signals readiness to emit
 * @param harvest converts the accumulator into the element emitted downstream; invoked before emit
 * @param emitOnTimer optional periodic check: a predicate deciding whether to emit, and the check interval
 */
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
    aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
    harvest: function.Function[Agg, Emit],
    emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]])
    : javadsl.SubSource[Emit, Mat] = {
  import org.apache.pekko.util.OptionConverters._
  // Translate the optional (predicate, interval) pair into the scaladsl shape up front.
  val timerCheck = emitOnTimer.toScala.map { pair =>
    ((acc: Agg) => pair.first.test(acc), pair.second.asScala)
  }
  new SubSource(
    asScala.aggregateWithBoundary(() => allocate.get())(
      aggregate = (acc, elem) => aggregate.apply(acc, elem).toScala,
      harvest = acc => harvest.apply(acc),
      emitOnTimer = timerCheck))
}
}