diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 2fd92863a2..f3ffc0fd45 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -824,6 +824,37 @@ public class SourceTest extends StreamTest {
     Assert.assertEquals("123415", result.toCompletableFuture().get(3, TimeUnit.SECONDS));
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void mustBeAbleToUseAggregateWithBoundary() {
+    final java.lang.Iterable input = Arrays.asList(1, 1, 2, 3, 3, 4, 5, 5, 6);
+    // used to implement grouped(2)
+    Source.from(input)
+        .aggregateWithBoundary(
+            () -> (List) new ArrayList(2),
+            (agg, elem) -> {
+              if (agg.size() == 1) {
+                agg.add(elem);
+                return Pair.create(agg, true);
+              } else {
+                agg.add(elem);
+                return Pair.create(agg, false);
+              }
+            },
+            Function.identity(),
+            Optional.empty())
+        .runWith(TestSink.create(system), system)
+        .ensureSubscription()
+        .request(6)
+        .expectNext(
+            Arrays.asList(1, 1),
+            Arrays.asList(2, 3),
+            Arrays.asList(3, 4),
+            Arrays.asList(5, 5),
+            Arrays.asList(6))
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToUseStatefulMapAsDropRepeated() throws Exception {
     final java.lang.Iterable input = Arrays.asList(1, 1, 1, 2, 3, 3, 3, 4, 5, 5, 5);
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index cba5959a4e..fb0696ad55 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -30,7 +30,6 @@ import pekko.Done
 import pekko.NotUsed
 import pekko.actor.ActorRef
 import pekko.actor.ClassicActorSystemProvider
-import pekko.annotation.ApiMayChange
 import pekko.dispatch.ExecutionContexts
 import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
 import pekko.japi.Pair
@@ -4624,11 +4623,12 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
    * @param harvest this is invoked before emit within the current stage/operator
    * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
    */
-  @ApiMayChange
+  @deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0")
   def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
-      emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Flow[In, Emit, Mat] =
+      emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration])
+      : javadsl.Flow[In, Emit, Mat] = {
     asScala
       .aggregateWithBoundary(() => allocate.get())(
         aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
@@ -4637,6 +4637,41 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
         case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
       })
       .asJava
+  }
+
+  /**
+   * Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream
+   * when custom condition is met which can be triggered by aggregate or timer.
+   * It can be thought of as a more general [[groupedWeightedWithin]].
+   *
+   * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
+   *
+   * '''Backpressures when''' downstream backpressures and the aggregate is complete
+   *
+   * '''Completes when''' upstream completes and the last aggregate has been emitted downstream
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param allocate allocate the initial data structure for aggregated elements
+   * @param aggregate update the aggregated elements, return true if ready to emit after update.
+   * @param harvest this is invoked before emit within the current stage/operator
+   * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
+   */
+  def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
+      aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
+      harvest: function.Function[Agg, Emit],
+      emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]])
+      : javadsl.Flow[In, Emit, Mat] = {
+    import org.apache.pekko.util.OptionConverters._
+    asScala
+      .aggregateWithBoundary(() => allocate.get())(
+        aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
+        harvest = agg => harvest.apply(agg),
+        emitOnTimer = emitOnTimer.toScala.map {
+          case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
+        })
+      .asJava
+  }
 
   override def getAttributes: Attributes = delegate.getAttributes
 
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 15afdc250c..7be481b4c5 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -28,7 +28,6 @@ import scala.reflect.ClassTag
 import org.apache.pekko
 import pekko.{ Done, NotUsed }
 import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
-import pekko.annotation.ApiMayChange
 import pekko.dispatch.ExecutionContexts
 import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
 import pekko.japi.{ function, JavaPartialFunction, Pair }
@@ -5170,11 +5169,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
    * @param harvest this is invoked before emit within the current stage/operator
    * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
    */
-  @ApiMayChange
+  @deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0")
   def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
-      emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] =
+      emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] = {
     asScala
       .aggregateWithBoundary(() => allocate.get())(
         aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
@@ -5183,6 +5182,40 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
         case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
       })
       .asJava
+  }
+
+  /**
+   * Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream
+   * when custom condition is met which can be triggered by aggregate or timer.
+   * It can be thought of as a more general [[groupedWeightedWithin]].
+   *
+   * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
+   *
+   * '''Backpressures when''' downstream backpressures and the aggregate is complete
+   *
+   * '''Completes when''' upstream completes and the last aggregate has been emitted downstream
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param allocate allocate the initial data structure for aggregated elements
+   * @param aggregate update the aggregated elements, return true if ready to emit after update.
+   * @param harvest this is invoked before emit within the current stage/operator
+   * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
+   */
+  def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
+      aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
+      harvest: function.Function[Agg, Emit],
+      emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]]): javadsl.Source[Emit, Mat] = {
+    import org.apache.pekko.util.OptionConverters._
+    asScala
+      .aggregateWithBoundary(() => allocate.get())(
+        aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
+        harvest = agg => harvest.apply(agg),
+        emitOnTimer = emitOnTimer.toScala.map {
+          case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
+        })
+      .asJava
+  }
 
   override def getAttributes: Attributes = delegate.getAttributes
 
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 04d251fb1d..5358de307a 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -25,7 +25,6 @@ import scala.reflect.ClassTag
 
 import org.apache.pekko
 import pekko.NotUsed
-import pekko.annotation.ApiMayChange
 import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
 import pekko.japi.{ function, Pair }
 import pekko.stream._
@@ -3129,11 +3128,12 @@ class SubFlow[In, Out, Mat](
    * @param harvest this is invoked before emit within the current stage/operator
    * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
    */
-  @ApiMayChange
+  @deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0")
   def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
-      emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubFlow[In, Emit, Mat] =
+      emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration])
+      : javadsl.SubFlow[In, Emit, Mat] = {
     new SubFlow(
       asScala.aggregateWithBoundary(() => allocate.get())(
         aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
@@ -3141,5 +3141,39 @@ class SubFlow[In, Out, Mat](
         emitOnTimer = Option(emitOnTimer).map {
           case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
         }))
+  }
+
+  /**
+   * Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream
+   * when custom condition is met which can be triggered by aggregate or timer.
+   * It can be thought of as a more general [[groupedWeightedWithin]].
+   *
+   * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
+   *
+   * '''Backpressures when''' downstream backpressures and the aggregate is complete
+   *
+   * '''Completes when''' upstream completes and the last aggregate has been emitted downstream
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param allocate allocate the initial data structure for aggregated elements
+   * @param aggregate update the aggregated elements, return true if ready to emit after update.
+   * @param harvest this is invoked before emit within the current stage/operator
+   * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
+   */
+  def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
+      aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
+      harvest: function.Function[Agg, Emit],
+      emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]])
+      : javadsl.SubFlow[In, Emit, Mat] = {
+    import org.apache.pekko.util.OptionConverters._
+    new SubFlow(
+      asScala.aggregateWithBoundary(() => allocate.get())(
+        aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
+        harvest = agg => harvest.apply(agg),
+        emitOnTimer = emitOnTimer.toScala.map {
+          case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
+        }))
+  }
 
 }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index bea6c8c0db..3ab439ca9f 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -25,7 +25,6 @@ import scala.reflect.ClassTag
 
 import org.apache.pekko
 import pekko.NotUsed
-import pekko.annotation.ApiMayChange
 import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
 import pekko.japi.{ function, Pair }
 import pekko.stream._
@@ -3097,11 +3096,12 @@ class SubSource[Out, Mat](
    * @param harvest this is invoked before emit within the current stage/operator
    * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
    */
-  @ApiMayChange
+  @deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0")
   def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
       aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
       harvest: function.Function[Agg, Emit],
-      emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubSource[Emit, Mat] =
+      emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration])
+      : javadsl.SubSource[Emit, Mat] = {
     new SubSource(
       asScala.aggregateWithBoundary(() => allocate.get())(
         aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
@@ -3109,4 +3109,38 @@ class SubSource[Out, Mat](
         emitOnTimer = Option(emitOnTimer).map {
           case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
         }))
+  }
+
+  /**
+   * Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream
+   * when custom condition is met which can be triggered by aggregate or timer.
+   * It can be thought of as a more general [[groupedWeightedWithin]].
+   *
+   * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
+   *
+   * '''Backpressures when''' downstream backpressures and the aggregate is complete
+   *
+   * '''Completes when''' upstream completes and the last aggregate has been emitted downstream
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param allocate allocate the initial data structure for aggregated elements
+   * @param aggregate update the aggregated elements, return true if ready to emit after update.
+   * @param harvest this is invoked before emit within the current stage/operator
+   * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
+   */
+  def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
+      aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
+      harvest: function.Function[Agg, Emit],
+      emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]])
+      : javadsl.SubSource[Emit, Mat] = {
+    import org.apache.pekko.util.OptionConverters._
+    new SubSource(
+      asScala.aggregateWithBoundary(() => allocate.get())(
+        aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
+        harvest = agg => harvest.apply(agg),
+        emitOnTimer = emitOnTimer.toScala.map {
+          case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
+        }))
+  }
 }