diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md
new file mode 100644
index 0000000000..d2d91614df
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md
@@ -0,0 +1,31 @@
+# aggregateWithBoundary
+
+Aggregate and emit until custom boundary condition met.
+
+@ref[Backpressure aware operators](../index.md#backpressure-aware-operators)
+
+@ref[Timer driven operators](../index.md#timer-driven-operators)
+
+## Signature
+
+@apidoc[Source.aggregateWithBoundary](Source) { scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]" java="#aggregateWithBoundary(java.util.function.Supplier,akka.japi.function.Function2,akka.japi.function.Function,akka.japi.Pair)"}
+@apidoc[Flow.aggregateWithBoundary](Flow) { scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]" java="#aggregateWithBoundary(java.util.function.Supplier,akka.japi.function.Function2,akka.japi.function.Function,akka.japi.Pair)" }
+
+
+## Description
+
+This operator can be customized into a broad class of aggregate/group/fold operators, based on custom state or timer conditions.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the aggregation function decides the aggregate is complete or the timer function returns true
+
+**backpressures** when downstream backpressures and the aggregate is complete
+
+**completes** when upstream completes and the last aggregate has been emitted downstream
+
+**cancels** when downstream cancels
+
+@@@
\ No newline at end of file
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index abc0fa1510..ffc086b25e 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -215,6 +215,7 @@ These operators are aware of the backpressure provided by their downstreams and
| |Operator|Description|
|--|--|--|
+|Source/Flow|@ref[aggregateWithBoundary](Source-or-Flow/aggregateWithBoundary.md)|Aggregate and emit until custom boundary condition met.|
|Source/Flow|@ref[batch](Source-or-Flow/batch.md)|Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure and a maximum number of batched elements is not yet reached.|
|Source/Flow|@ref[batchWeighted](Source-or-Flow/batchWeighted.md)|Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure and a maximum weight batched elements is not yet reached.|
|Source/Flow|@ref[buffer](Source-or-Flow/buffer.md)|Allow for a temporarily faster upstream events by buffering `size` elements.|
@@ -366,6 +367,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [actorRefWithBackpressure](Sink/actorRefWithBackpressure.md)
* [actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)
* [actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)
+* [aggregateWithBoundary](Source-or-Flow/aggregateWithBoundary.md)
* [alsoTo](Source-or-Flow/alsoTo.md)
* [asFlowWithContext](Flow/asFlowWithContext.md)
* [asInputStream](StreamConverters/asInputStream.md)
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AggregateWithBoundarySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AggregateWithBoundarySpec.scala
new file mode 100644
index 0000000000..2c51bbc629
--- /dev/null
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AggregateWithBoundarySpec.scala
@@ -0,0 +1,274 @@
+/*
+ * Copyright (C) 2021 Lightbend Inc.
+ */
+
+package akka.stream.scaladsl
+
+import akka.actor.ActorSystem
+import akka.stream.OverflowStrategy
+import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
+import akka.testkit.{ AkkaSpec, ExplicitlyTriggeredScheduler }
+import com.typesafe.config.{ ConfigFactory, ConfigValueFactory }
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class AggregateWithBoundarySpec extends StreamSpec {
+
+ "split aggregator by size" in {
+
+ val stream = collection.immutable.Seq(1, 2, 3, 4, 5, 6, 7)
+ val groupSize = 3
+ val result = Source(stream)
+ .aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(aggregate = (buffer, i) => {
+ buffer.addOne(i)
+ (buffer, buffer.size >= groupSize)
+ }, harvest = buffer => buffer.toSeq, emitOnTimer = None)
+ .runWith(Sink.collection)
+
+ Await.result(result, 10.seconds) should be(stream.grouped(groupSize).toSeq)
+
+ }
+
+ "split aggregator by size and harvest" in {
+ val stream = collection.immutable.Seq(1, 2, 3, 4, 5, 6, 7)
+ val groupSize = 3
+ val result = Source(stream)
+ .aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(
+ aggregate = (buffer, i) => {
+ buffer.addOne(i)
+ (buffer, buffer.size >= groupSize)
+ },
+ harvest = buffer => buffer.toSeq :+ -1, // append -1 to output to demonstrate the effect of harvest
+ emitOnTimer = None)
+ .runWith(Sink.collection)
+
+ Await.result(result, 10.seconds) should be(stream.grouped(groupSize).toSeq.map(seq => seq :+ -1))
+
+ }
+
+ "split aggregator by custom weight condition" in {
+ val stream = collection.immutable.Seq(1, 2, 3, 4, 5, 6, 7)
+ val weight = 10
+
+ val result = Source(stream)
+ .aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(aggregate = (buffer, i) => {
+ buffer.addOne(i)
+ (buffer, buffer.sum >= weight)
+ }, harvest = buffer => buffer.toSeq, emitOnTimer = None)
+ .runWith(Sink.collection)
+
+ Await.result(result, 10.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6), Seq(7)))
+ }
+
+}
+
+// To run multiple tests in parallel using simulated timer,
+// the tests must be in separate Specs with different instances of the ActorSystem
+class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with Matchers {
+
+ private def createActorSystem(id: String) =
+ ActorSystem(
+ s"ActorSystemWithExplicitlyTriggeredScheduler-$id",
+ ConfigFactory.load(
+ AkkaSpec.testConf.withValue(
+ "akka.scheduler.implementation",
+ ConfigValueFactory.fromAnyRef("akka.testkit.ExplicitlyTriggeredScheduler"))))
+
+ private def getEts(actor: ActorSystem): ExplicitlyTriggeredScheduler = {
+ actor.scheduler match {
+ case ets: ExplicitlyTriggeredScheduler => ets
+ case other => throw new Exception(s"expecting ${classOf[ExplicitlyTriggeredScheduler]} but got ${other.getClass}")
+ }
+ }
+
+ private def timePasses(amount: FiniteDuration)(implicit actorSystem: ActorSystem): Unit =
+ getEts(actorSystem).timePasses(amount)
+
+ private def schedulerTimeMs(implicit actorSystem: ActorSystem): Long = getEts(actorSystem).currentTimeMs
+
+ implicit class SourceWrapper[+Out, +Mat](val source: Source[Out, Mat]) {
+
+ /**
+ * This is a convenient wrapper of [[aggregateWithBoundary]] to handle additional time constraints
+ * @param maxGap the gap allowed between consecutive aggregate operations
+ * @param maxDuration the duration of the sequence of aggregate operations from initial seed until emit is triggered
+ * @param interval interval of the timer to check the maxGap and maxDuration condition
+ * @param currentTimeMs source of the system time, can be simulated time in testing
+ */
+ def aggregateWithTimeBoundary[Agg, Emit](allocate: => Agg)(
+ aggregate: (Agg, Out) => (Agg, Boolean),
+ harvest: Agg => Emit,
+ maxGap: Option[FiniteDuration],
+ maxDuration: Option[FiniteDuration],
+ interval: FiniteDuration,
+ currentTimeMs: => Long): source.Repr[Emit] = {
+ require(
+ maxDuration.nonEmpty || maxGap.nonEmpty,
+ s"required timing condition maxGap and maxDuration are both missing, use aggregateWithBoundary if it's intended")
+
+ class ValueTimeWrapper[T](var value: T) {
+ var firstTime: Long = -1
+ var lastTime: Long = -1
+ def updateTime(time: Long): Unit = {
+ if (firstTime == -1) firstTime = time
+ lastTime = time
+ }
+ }
+
+ source.aggregateWithBoundary(allocate = () => new ValueTimeWrapper(value = allocate))(aggregate = (agg, in) => {
+ agg.updateTime(currentTimeMs)
+ // user provided Agg type must be mutable
+ val (updated, result) = aggregate(agg.value, in)
+ agg.value = updated
+ (agg, result)
+ }, harvest = agg => harvest(agg.value), emitOnTimer = Some((agg => {
+ val currentTime = currentTimeMs
+ maxDuration.exists(md => currentTime - agg.firstTime >= md.toMillis) ||
+ maxGap.exists(mg => currentTime - agg.lastTime >= mg.toMillis)
+ }, interval)))
+ }
+ }
+
+ "split aggregator by gap for slow upstream" in {
+
+ implicit val actorSystem = createActorSystem("1")
+
+ val maxGap = 20.seconds
+
+ val p = TestPublisher.probe[Int]()
+
+ val result = Source
+ .fromPublisher(p)
+ .aggregateWithTimeBoundary(allocate = ListBuffer.empty[Int])(
+ aggregate = (buffer, i) => {
+ buffer.addOne(i)
+ (buffer, false)
+ },
+ harvest = seq => seq.toSeq,
+ maxGap = Some(maxGap), // elements with longer gap will put put to next aggregator
+ maxDuration = None,
+ currentTimeMs = schedulerTimeMs,
+ interval = 1.milli)
+ .buffer(1, OverflowStrategy.backpressure)
+ .runWith(Sink.collection)
+
+ p.sendNext(1)
+ timePasses(maxGap / 2) // less than maxGap should not cause emit
+ p.sendNext(2)
+ timePasses(maxGap)
+
+ p.sendNext(3)
+ timePasses(maxGap / 2) // less than maxGap should not cause emit
+ p.sendNext(4)
+ timePasses(maxGap)
+
+ p.sendNext(5)
+ timePasses(maxGap / 2) // less than maxGap should not cause emit
+ p.sendNext(6)
+ timePasses(maxGap / 2) // less than maxGap should not cause emit and it does not accumulate
+ p.sendNext(7)
+ p.sendComplete()
+
+ Await.result(result, 10.seconds) should be(Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6, 7)))
+
+ }
+
+ "split aggregator by total duration" in {
+
+ implicit val actorSystem = createActorSystem("2")
+
+ val maxDuration = 400.seconds
+
+ val p = TestPublisher.probe[Int]()
+
+ val result = Source
+ .fromPublisher(p)
+ .aggregateWithTimeBoundary(allocate = ListBuffer.empty[Int])(
+ aggregate = (buffer, i) => {
+ buffer.addOne(i)
+ (buffer, false)
+ },
+ harvest = seq => seq.toSeq,
+ maxGap = None,
+ maxDuration = Some(maxDuration), // elements with longer gap will put put to next aggregator
+ currentTimeMs = schedulerTimeMs,
+ interval = 1.milli)
+ .buffer(1, OverflowStrategy.backpressure)
+ .runWith(Sink.collection)
+
+ p.sendNext(1)
+ timePasses(maxDuration / 4)
+ p.sendNext(2)
+ timePasses(maxDuration / 4)
+ p.sendNext(3)
+ timePasses(maxDuration / 4)
+ p.sendNext(4)
+ timePasses(maxDuration / 4) // maxDuration will accumulate and reach threshold here
+
+ p.sendNext(5)
+ p.sendNext(6)
+ p.sendNext(7)
+ p.sendComplete()
+
+ Await.result(result, 10.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7)))
+
+ }
+
+ "down stream back pressure should not miss data on completion with pull on start" in {
+
+ implicit val actorSystem = createActorSystem("3")
+
+ val maxGap = 1.second
+ val upstream = TestPublisher.probe[Int]()
+ val downstream = TestSubscriber.probe[Seq[Int]]()
+
+ Source
+ .fromPublisher(upstream)
+ .aggregateWithTimeBoundary(allocate = ListBuffer.empty[Int])(
+ aggregate = (buffer, i) => {
+ buffer.addOne(i)
+ (buffer, false)
+ },
+ harvest = buffer => buffer.toSeq,
+ maxGap = Some(maxGap),
+ maxDuration = None,
+ currentTimeMs = schedulerTimeMs,
+ interval = 1.milli)
+ .buffer(1, OverflowStrategy.backpressure)
+ .to(Sink.fromSubscriber(downstream))
+ .run()
+
+ downstream.ensureSubscription()
+ upstream.sendNext(1) // onPush(1) -> aggregator=Seq(1), due to the preStart pull, will pull upstream again since queue is empty
+ timePasses(maxGap) // harvest onTimer, queue=Queue(Seq(1)), aggregator=null
+ upstream.sendNext(2) // onPush(2) -> aggregator=Seq(2), due to the previous pull, even the queue is already full at this point due to timer, but it won't pull upstream again
+ timePasses(maxGap) // harvest onTimer, queue=(Seq(1), Seq(2)), aggregator=null, note queue size can be 1 more than the threshold
+ upstream.sendNext(3) // 3 will not be pushed to the stage until the stage pull upstream
+ timePasses(maxGap) // since 3 stayed outside of the stage, this gap will not cause 3 to be emitted
+ downstream.request(1).expectNext(Seq(1)) // onPull emit Seq(1), queue=(Seq(2))
+ timePasses(maxGap) // since 3 stayed outside of the stage, this gap will not cause 3 to be emitted
+ downstream
+ .request(1)
+ .expectNext(Seq(2)) // onPull emit Seq(2). queue is empty now, pull upstream and 3 will be pushed into the stage
+ // onPush(3) -> aggregator=Seq(3) pull upstream since queue is empty
+ downstream
+ .request(1)
+ .expectNoMessage() // onPull, no data to emit, won't pull upstream again since it's already pulled
+ timePasses(maxGap) // emit Seq(3) onTimer
+ downstream.expectNext(Seq(3))
+ upstream.sendNext(4) // onPush(4) -> aggregator=Seq(4) will follow, and pull upstream again
+ upstream.sendNext(5) // onPush(5) -> aggregator=Seq(4,5) will happen right after due to the previous pull from onPush(4), eagerly pull even out is not available
+ upstream.sendNext(6) // onPush(6) -> aggregator=Seq(4,5,6) will happen right after due to the previous pull from onPush(5), even the queue is full at this point
+ timePasses(maxGap) // harvest queue=(Seq(4,5,6))
+ upstream.sendNext(7) // onPush(7), aggregator=Seq(7), queue=(Seq(4,5,6) no pulling upstream due to queue is full
+ // if sending another message it will stay in upstream, prevent the upstream completion from happening
+ upstream.sendComplete() // emit remaining queue=Queue(Seq(4,5,6)) + harvest and emit aggregator=Seq(7)
+ // since there is no pending push from upstream, onUpstreamFinish will be triggered to emit the queue and pending aggregator
+ downstream.request(2).expectNext(Seq(4, 5, 6), Seq(7)) // clean up the emit queue and complete downstream
+ downstream.expectComplete()
+ }
+}
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/AggregateWithBoundary.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/AggregateWithBoundary.scala
new file mode 100644
index 0000000000..a80bd59efe
--- /dev/null
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/AggregateWithBoundary.scala
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2021 Lightbend Inc.
+ */
+
+package akka.stream.impl.fusing
+
+import akka.annotation.InternalApi
+import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic }
+
+import scala.concurrent.duration._
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] final case class AggregateWithBoundary[In, Agg, Out](
+ allocate: () => Agg,
+ aggregate: (Agg, In) => (Agg, Boolean),
+ harvest: Agg => Out,
+ emitOnTimer: Option[(Agg => Boolean, FiniteDuration)])
+ extends GraphStage[FlowShape[In, Out]] {
+
+ emitOnTimer.foreach {
+ case (_, interval) => require(interval.gteq(1.milli), s"timer(${interval.toCoarsest}) must not be smaller than 1ms")
+ }
+
+ val in: Inlet[In] = Inlet[In](s"${this.getClass.getName}.in")
+ val out: Outlet[Out] = Outlet[Out](s"${this.getClass.getName}.out")
+ override val shape: FlowShape[In, Out] = FlowShape(in, out)
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new TimerGraphStageLogic(shape) with InHandler with OutHandler {
+
+ private[this] var aggregated: Agg = null.asInstanceOf[Agg]
+
+ override def preStart(): Unit = {
+ emitOnTimer.foreach {
+ case (_, interval) => scheduleWithFixedDelay(s"${this.getClass.getSimpleName}Timer", interval, interval)
+ }
+ }
+
+ override protected def onTimer(timerKey: Any): Unit = {
+ emitOnTimer.foreach {
+ case (isReadyOnTimer, _) => if (aggregated != null && isReadyOnTimer(aggregated)) harvestAndEmit()
+ }
+ }
+
+ // at onPush, isAvailable(in)=true hasBeenPulled(in)=false, isAvailable(out) could be true or false due to timer triggered emit
+ override def onPush(): Unit = {
+ if (aggregated == null) aggregated = allocate()
+ val (updated, result) = aggregate(aggregated, grab(in))
+ aggregated = updated
+ if (result) harvestAndEmit()
+ // the decision to pull entirely depend on isAvailable(out)=true, regardless of result of aggregate
+ // 1. aggregate=true: isAvailable(out) will be false
+ // 2. aggregate=false: if isAvailable(out)=false, this means timer has caused emit, cannot pull or it could emit indefinitely bypassing back pressure
+ if (isAvailable(out)) pull(in)
+ }
+
+ override def onUpstreamFinish(): Unit = {
+ // Note that emit is asynchronous, it will keep the stage alive until downstream actually take the element
+ if (aggregated != null) emit(out, harvest(aggregated))
+ completeStage()
+ }
+
+ // at onPull, isAvailable(out) is always true indicating downstream is waiting
+ // isAvailable(in) and hasBeenPulled(in) can be (true, false) (false, true) or (false, false)
+ override def onPull(): Unit = if (!hasBeenPulled(in)) pull(in)
+
+ setHandlers(in, out, this)
+
+ private def harvestAndEmit(): Unit = {
+ emit(out, harvest(aggregated))
+ aggregated = null.asInstanceOf[Agg]
+ }
+
+ }
+
+}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index b00d260724..087fbf0cfc 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -14,14 +14,13 @@ import scala.annotation.unchecked.uncheckedVariance
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
-
import scala.annotation.nowarn
import org.reactivestreams.Processor
-
import akka.Done
import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.ClassicActorSystemProvider
+import akka.annotation.ApiMayChange
import akka.dispatch.ExecutionContexts
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.japi.Pair
@@ -3978,6 +3977,37 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
extractContext: function.Function[Out, CtxOut]): FlowWithContext[U, CtxU, Out, CtxOut, Mat] =
this.asScala.asFlowWithContext((x: U, c: CtxU) => collapseContext.apply(x, c))(x => extractContext.apply(x)).asJava
+ /**
+ * Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream
+ * when custom condition is met which can be triggered by aggregate or timer.
+ * It can be thought of a more general [[groupedWeightedWithin]].
+ *
+ * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
+ *
+ * '''Backpressures when''' downstream backpressures and the aggregate is complete
+ *
+ * '''Completes when''' upstream completes and the last aggregate has been emitted downstream
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param allocate allocate the initial data structure for aggregated elements
+ * @param aggregate update the aggregated elements, return true if ready to emit after update.
+ * @param harvest this is invoked before emit within the current stage/operator
+ * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
+ */
+ @ApiMayChange
+ def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
+ aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
+ harvest: function.Function[Agg, Emit],
+ emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Flow[In, Emit, Mat] =
+ asScala
+ .aggregateWithBoundary(() => allocate.get())(
+ aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
+ harvest = agg => harvest.apply(agg),
+ emitOnTimer = Option(emitOnTimer).map {
+ case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
+ })
+ .asJava
}
object RunnableGraph {
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index 47d3219fac..21da0cef80 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -16,11 +16,11 @@ import scala.compat.java8.OptionConverters._
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
-
import scala.annotation.nowarn
import org.reactivestreams.{ Publisher, Subscriber }
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
+import akka.annotation.ApiMayChange
import akka.dispatch.ExecutionContexts
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.japi.{ function, JavaPartialFunction, Pair, Util }
@@ -4522,4 +4522,36 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
**/
def asSourceWithContext[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Out, Ctx, Mat] =
new scaladsl.SourceWithContext(this.asScala.map(x => (x, extractContext.apply(x)))).asJava
+
+ /**
+ * Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream
+ * when custom condition is met which can be triggered by aggregate or timer.
+ * It can be thought of a more general [[groupedWeightedWithin]].
+ *
+ * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
+ *
+ * '''Backpressures when''' downstream backpressures and the aggregate is complete
+ *
+ * '''Completes when''' upstream completes and the last aggregate has been emitted downstream
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param allocate allocate the initial data structure for aggregated elements
+ * @param aggregate update the aggregated elements, return true if ready to emit after update.
+ * @param harvest this is invoked before emit within the current stage/operator
+ * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
+ */
+ @ApiMayChange
+ def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
+ aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
+ harvest: function.Function[Agg, Emit],
+ emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] =
+ asScala
+ .aggregateWithBoundary(() => allocate.get())(
+ aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
+ harvest = agg => harvest.apply(agg),
+ emitOnTimer = Option(emitOnTimer).map {
+ case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
+ })
+ .asJava
}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
index 3ccf927d90..4cfe4744a2 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
@@ -16,6 +16,7 @@ import scala.reflect.ClassTag
import scala.annotation.nowarn
import akka.NotUsed
+import akka.annotation.ApiMayChange
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.japi.{ function, Pair, Util }
import akka.stream._
@@ -2619,4 +2620,35 @@ class SubFlow[In, Out, Mat](
def logWithMarker(name: String, marker: function.Function[Out, LogMarker]): SubFlow[In, Out, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null)
+ /**
+ * Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream
+ * when custom condition is met which can be triggered by aggregate or timer.
+ * It can be thought of a more general [[groupedWeightedWithin]].
+ *
+ * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
+ *
+ * '''Backpressures when''' downstream backpressures and the aggregate is complete
+ *
+ * '''Completes when''' upstream completes and the last aggregate has been emitted downstream
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param allocate allocate the initial data structure for aggregated elements
+ * @param aggregate update the aggregated elements, return true if ready to emit after update.
+ * @param harvest this is invoked before emit within the current stage/operator
+ * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
+ */
+ @ApiMayChange
+ def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
+ aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
+ harvest: function.Function[Agg, Emit],
+ emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubFlow[In, Emit, Mat] =
+ new SubFlow(
+ asScala.aggregateWithBoundary(() => allocate.get())(
+ aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
+ harvest = agg => harvest.apply(agg),
+ emitOnTimer = Option(emitOnTimer).map {
+ case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
+ }))
+
}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
index c8743974cc..f5a99adcf8 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
@@ -16,6 +16,7 @@ import scala.reflect.ClassTag
import scala.annotation.nowarn
import akka.NotUsed
+import akka.annotation.ApiMayChange
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.japi.{ function, Pair, Util }
import akka.stream._
@@ -2592,4 +2593,34 @@ class SubSource[Out, Mat](
def logWithMarker(name: String, marker: function.Function[Out, LogMarker]): SubSource[Out, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null)
+ /**
+ * Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream
+ * when custom condition is met which can be triggered by aggregate or timer.
+ * It can be thought of a more general [[groupedWeightedWithin]].
+ *
+ * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
+ *
+ * '''Backpressures when''' downstream backpressures and the aggregate is complete
+ *
+ * '''Completes when''' upstream completes and the last aggregate has been emitted downstream
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param allocate allocate the initial data structure for aggregated elements
+ * @param aggregate update the aggregated elements, return true if ready to emit after update.
+ * @param harvest this is invoked before emit within the current stage/operator
+ * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
+ */
+ @ApiMayChange
+ def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
+ aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
+ harvest: function.Function[Agg, Emit],
+ emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubSource[Emit, Mat] =
+ new SubSource(
+ asScala.aggregateWithBoundary(() => allocate.get())(
+ aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
+ harvest = agg => harvest.apply(agg),
+ emitOnTimer = Option(emitOnTimer).map {
+ case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
+ }))
}
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index 875040a75a..c91c82a192 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -14,7 +14,7 @@ import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import akka.Done
import akka.NotUsed
import akka.actor.ActorRef
-import akka.annotation.DoNotInherit
+import akka.annotation.{ ApiMayChange, DoNotInherit }
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.stream.Attributes.SourceLocation
import akka.stream._
@@ -3272,6 +3272,32 @@ trait FlowOps[+Out, +Mat] {
* asynchronously.
*/
def async: Repr[Out]
+
+ /**
+ * Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream
+ * when custom condition is met which can be triggered by aggregate or timer.
+ * It can be thought of a more general [[groupedWeightedWithin]].
+ *
+ * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
+ *
+ * '''Backpressures when''' downstream backpressures and the aggregate is complete
+ *
+ * '''Completes when''' upstream completes and the last aggregate has been emitted downstream
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param allocate allocate the initial data structure for aggregated elements
+ * @param aggregate update the aggregated elements, return true if ready to emit after update.
+ * @param harvest this is invoked before emit within the current stage/operator
+ * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
+ */
+ @ApiMayChange
+ def aggregateWithBoundary[Agg, Emit](allocate: () => Agg)(
+ aggregate: (Agg, Out) => (Agg, Boolean),
+ harvest: Agg => Emit,
+ emitOnTimer: Option[(Agg => Boolean, FiniteDuration)]): Repr[Emit] =
+ via(AggregateWithBoundary(allocate, aggregate, harvest, emitOnTimer))
+
}
/**
diff --git a/akka-testkit/src/main/scala/akka/testkit/ExplicitlyTriggeredScheduler.scala b/akka-testkit/src/main/scala/akka/testkit/ExplicitlyTriggeredScheduler.scala
index 8e0f880472..fe12052ff7 100644
--- a/akka-testkit/src/main/scala/akka/testkit/ExplicitlyTriggeredScheduler.scala
+++ b/akka-testkit/src/main/scala/akka/testkit/ExplicitlyTriggeredScheduler.scala
@@ -123,4 +123,9 @@ class ExplicitlyTriggeredScheduler(@unused config: Config, log: LoggingAdapter,
}
override def maxFrequency: Double = 42
+
+ /**
+ * The scheduler need to expose its internal time for testing.
+ */
+ def currentTimeMs: Long = currentTime.get()
}