From 12b84fb2c7d43ab7a45d30062e413102d0d36f4a Mon Sep 17 00:00:00 2001 From: yongjiaw Date: Tue, 11 Jan 2022 07:15:07 -0800 Subject: [PATCH] New stream operator aggregateWithBoundary #30797 --- .../Source-or-Flow/aggregateWithBoundary.md | 31 ++ .../main/paradox/stream/operators/index.md | 2 + .../scaladsl/AggregateWithBoundarySpec.scala | 274 ++++++++++++++++++ .../impl/fusing/AggregateWithBoundary.scala | 80 +++++ .../main/scala/akka/stream/javadsl/Flow.scala | 34 ++- .../scala/akka/stream/javadsl/Source.scala | 34 ++- .../scala/akka/stream/javadsl/SubFlow.scala | 32 ++ .../scala/akka/stream/javadsl/SubSource.scala | 31 ++ .../scala/akka/stream/scaladsl/Flow.scala | 28 +- .../ExplicitlyTriggeredScheduler.scala | 5 + 10 files changed, 547 insertions(+), 4 deletions(-) create mode 100644 akka-docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/AggregateWithBoundarySpec.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/fusing/AggregateWithBoundary.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md new file mode 100644 index 0000000000..d2d91614df --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md @@ -0,0 +1,31 @@ +# aggregateWithBoundary + +Aggregate and emit until custom boundary condition met. 
+ +@ref[Backpressure aware operators](../index.md#backpressure-aware-operators) + +@ref[Timer driven operators](../index.md#timer-driven-operators) + +## Signature + +@apidoc[Source.aggregateWithBoundary](Source) { scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]" java="#aggregateWithBoundary(java.util.function.Supplier,akka.japi.function.Function2,akka.japi.function.Function,akka.japi.Pair)"} +@apidoc[Flow.aggregateWithBoundary](Flow) { scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]" java="#aggregateWithBoundary(java.util.function.Supplier,akka.japi.function.Function2,akka.japi.function.Function,akka.japi.Pair)" } + + +## Description + +This operator can be customized into a broad class of aggregate/group/fold operators, based on custom state or timer conditions. 
+ +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the aggregation function decides the aggregate is complete or the timer function returns true + +**backpressures** when downstream backpressures and the aggregate is complete + +**completes** when upstream completes and the last aggregate has been emitted downstream + +**cancels** when downstream cancels + +@@@ \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index abc0fa1510..ffc086b25e 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -215,6 +215,7 @@ These operators are aware of the backpressure provided by their downstreams and | |Operator|Description| |--|--|--| +|Source/Flow|@ref[aggregateWithBoundary](Source-or-Flow/aggregateWithBoundary.md)|Aggregate and emit until custom boundary condition met.| |Source/Flow|@ref[batch](Source-or-Flow/batch.md)|Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure and a maximum number of batched elements is not yet reached.| |Source/Flow|@ref[batchWeighted](Source-or-Flow/batchWeighted.md)|Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure and a maximum weight batched elements is not yet reached.| |Source/Flow|@ref[buffer](Source-or-Flow/buffer.md)|Allow for a temporarily faster upstream events by buffering `size` elements.| @@ -366,6 +367,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [actorRefWithBackpressure](Sink/actorRefWithBackpressure.md) * [actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md) * [actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md) +* [aggregateWithBoundary](Source-or-Flow/aggregateWithBoundary.md) * 
[alsoTo](Source-or-Flow/alsoTo.md) * [asFlowWithContext](Flow/asFlowWithContext.md) * [asInputStream](StreamConverters/asInputStream.md) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AggregateWithBoundarySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AggregateWithBoundarySpec.scala new file mode 100644 index 0000000000..2c51bbc629 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AggregateWithBoundarySpec.scala @@ -0,0 +1,274 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.actor.ActorSystem +import akka.stream.OverflowStrategy +import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } +import akka.testkit.{ AkkaSpec, ExplicitlyTriggeredScheduler } +import com.typesafe.config.{ ConfigFactory, ConfigValueFactory } +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpecLike + +import scala.collection.mutable.ListBuffer +import scala.concurrent.Await +import scala.concurrent.duration._ + +class AggregateWithBoundarySpec extends StreamSpec { + + "split aggregator by size" in { + + val stream = collection.immutable.Seq(1, 2, 3, 4, 5, 6, 7) + val groupSize = 3 + val result = Source(stream) + .aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(aggregate = (buffer, i) => { + buffer.addOne(i) + (buffer, buffer.size >= groupSize) + }, harvest = buffer => buffer.toSeq, emitOnTimer = None) + .runWith(Sink.collection) + + Await.result(result, 10.seconds) should be(stream.grouped(groupSize).toSeq) + + } + + "split aggregator by size and harvest" in { + val stream = collection.immutable.Seq(1, 2, 3, 4, 5, 6, 7) + val groupSize = 3 + val result = Source(stream) + .aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])( + aggregate = (buffer, i) => { + buffer.addOne(i) + (buffer, buffer.size >= groupSize) + }, + harvest = buffer => buffer.toSeq :+ -1, // append -1 to output to demonstrate the effect of 
harvest + emitOnTimer = None) + .runWith(Sink.collection) + + Await.result(result, 10.seconds) should be(stream.grouped(groupSize).toSeq.map(seq => seq :+ -1)) + + } + + "split aggregator by custom weight condition" in { + val stream = collection.immutable.Seq(1, 2, 3, 4, 5, 6, 7) + val weight = 10 + + val result = Source(stream) + .aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(aggregate = (buffer, i) => { + buffer.addOne(i) + (buffer, buffer.sum >= weight) + }, harvest = buffer => buffer.toSeq, emitOnTimer = None) + .runWith(Sink.collection) + + Await.result(result, 10.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6), Seq(7))) + } + +} + +// To run multiple tests in parallel using simulated timer, +// the tests must be in separate Specs with different instances of the ActorSystem +class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with Matchers { + + private def createActorSystem(id: String) = + ActorSystem( + s"ActorSystemWithExplicitlyTriggeredScheduler-$id", + ConfigFactory.load( + AkkaSpec.testConf.withValue( + "akka.scheduler.implementation", + ConfigValueFactory.fromAnyRef("akka.testkit.ExplicitlyTriggeredScheduler")))) + + private def getEts(actor: ActorSystem): ExplicitlyTriggeredScheduler = { + actor.scheduler match { + case ets: ExplicitlyTriggeredScheduler => ets + case other => throw new Exception(s"expecting ${classOf[ExplicitlyTriggeredScheduler]} but got ${other.getClass}") + } + } + + private def timePasses(amount: FiniteDuration)(implicit actorSystem: ActorSystem): Unit = + getEts(actorSystem).timePasses(amount) + + private def schedulerTimeMs(implicit actorSystem: ActorSystem): Long = getEts(actorSystem).currentTimeMs + + implicit class SourceWrapper[+Out, +Mat](val source: Source[Out, Mat]) { + + /** + * This is a convenient wrapper of [[aggregateWithBoundary]] to handle additional time constraints + * @param maxGap the gap allowed between consecutive aggregate operations + * @param maxDuration the 
duration of the sequence of aggregate operations from initial seed until emit is triggered + * @param interval interval of the timer to check the maxGap and maxDuration condition + * @param currentTimeMs source of the system time, can be simulated time in testing + */ + def aggregateWithTimeBoundary[Agg, Emit](allocate: => Agg)( + aggregate: (Agg, Out) => (Agg, Boolean), + harvest: Agg => Emit, + maxGap: Option[FiniteDuration], + maxDuration: Option[FiniteDuration], + interval: FiniteDuration, + currentTimeMs: => Long): source.Repr[Emit] = { + require( + maxDuration.nonEmpty || maxGap.nonEmpty, + s"required timing condition maxGap and maxDuration are both missing, use aggregateWithBoundary if it's intended") + + class ValueTimeWrapper[T](var value: T) { + var firstTime: Long = -1 + var lastTime: Long = -1 + def updateTime(time: Long): Unit = { + if (firstTime == -1) firstTime = time + lastTime = time + } + } + + source.aggregateWithBoundary(allocate = () => new ValueTimeWrapper(value = allocate))(aggregate = (agg, in) => { + agg.updateTime(currentTimeMs) + // user provided Agg type must be mutable + val (updated, result) = aggregate(agg.value, in) + agg.value = updated + (agg, result) + }, harvest = agg => harvest(agg.value), emitOnTimer = Some((agg => { + val currentTime = currentTimeMs + maxDuration.exists(md => currentTime - agg.firstTime >= md.toMillis) || + maxGap.exists(mg => currentTime - agg.lastTime >= mg.toMillis) + }, interval))) + } + } + + "split aggregator by gap for slow upstream" in { + + implicit val actorSystem = createActorSystem("1") + + val maxGap = 20.seconds + + val p = TestPublisher.probe[Int]() + + val result = Source + .fromPublisher(p) + .aggregateWithTimeBoundary(allocate = ListBuffer.empty[Int])( + aggregate = (buffer, i) => { + buffer.addOne(i) + (buffer, false) + }, + harvest = seq => seq.toSeq, + maxGap = Some(maxGap), // elements with longer gap will be put into the next aggregator + maxDuration = None, + currentTimeMs = schedulerTimeMs, 
+ interval = 1.milli) + .buffer(1, OverflowStrategy.backpressure) + .runWith(Sink.collection) + + p.sendNext(1) + timePasses(maxGap / 2) // less than maxGap should not cause emit + p.sendNext(2) + timePasses(maxGap) + + p.sendNext(3) + timePasses(maxGap / 2) // less than maxGap should not cause emit + p.sendNext(4) + timePasses(maxGap) + + p.sendNext(5) + timePasses(maxGap / 2) // less than maxGap should not cause emit + p.sendNext(6) + timePasses(maxGap / 2) // less than maxGap should not cause emit and it does not accumulate + p.sendNext(7) + p.sendComplete() + + Await.result(result, 10.seconds) should be(Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6, 7))) + + } + + "split aggregator by total duration" in { + + implicit val actorSystem = createActorSystem("2") + + val maxDuration = 400.seconds + + val p = TestPublisher.probe[Int]() + + val result = Source + .fromPublisher(p) + .aggregateWithTimeBoundary(allocate = ListBuffer.empty[Int])( + aggregate = (buffer, i) => { + buffer.addOne(i) + (buffer, false) + }, + harvest = seq => seq.toSeq, + maxGap = None, + maxDuration = Some(maxDuration), // elements arriving after maxDuration has elapsed will be put into the next aggregator + currentTimeMs = schedulerTimeMs, + interval = 1.milli) + .buffer(1, OverflowStrategy.backpressure) + .runWith(Sink.collection) + + p.sendNext(1) + timePasses(maxDuration / 4) + p.sendNext(2) + timePasses(maxDuration / 4) + p.sendNext(3) + timePasses(maxDuration / 4) + p.sendNext(4) + timePasses(maxDuration / 4) // maxDuration will accumulate and reach threshold here + + p.sendNext(5) + p.sendNext(6) + p.sendNext(7) + p.sendComplete() + + Await.result(result, 10.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7))) + + } + + "down stream back pressure should not miss data on completion with pull on start" in { + + implicit val actorSystem = createActorSystem("3") + + val maxGap = 1.second + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Seq[Int]]() + + Source + .fromPublisher(upstream) + 
.aggregateWithTimeBoundary(allocate = ListBuffer.empty[Int])( + aggregate = (buffer, i) => { + buffer.addOne(i) + (buffer, false) + }, + harvest = buffer => buffer.toSeq, + maxGap = Some(maxGap), + maxDuration = None, + currentTimeMs = schedulerTimeMs, + interval = 1.milli) + .buffer(1, OverflowStrategy.backpressure) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.ensureSubscription() + upstream.sendNext(1) // onPush(1) -> aggregator=Seq(1), due to the preStart pull, will pull upstream again since queue is empty + timePasses(maxGap) // harvest onTimer, queue=Queue(Seq(1)), aggregator=null + upstream.sendNext(2) // onPush(2) -> aggregator=Seq(2), due to the previous pull, even the queue is already full at this point due to timer, but it won't pull upstream again + timePasses(maxGap) // harvest onTimer, queue=(Seq(1), Seq(2)), aggregator=null, note queue size can be 1 more than the threshold + upstream.sendNext(3) // 3 will not be pushed to the stage until the stage pull upstream + timePasses(maxGap) // since 3 stayed outside of the stage, this gap will not cause 3 to be emitted + downstream.request(1).expectNext(Seq(1)) // onPull emit Seq(1), queue=(Seq(2)) + timePasses(maxGap) // since 3 stayed outside of the stage, this gap will not cause 3 to be emitted + downstream + .request(1) + .expectNext(Seq(2)) // onPull emit Seq(2). 
queue is empty now, pull upstream and 3 will be pushed into the stage + // onPush(3) -> aggregator=Seq(3) pull upstream since queue is empty + downstream + .request(1) + .expectNoMessage() // onPull, no data to emit, won't pull upstream again since it's already pulled + timePasses(maxGap) // emit Seq(3) onTimer + downstream.expectNext(Seq(3)) + upstream.sendNext(4) // onPush(4) -> aggregator=Seq(4) will follow, and pull upstream again + upstream.sendNext(5) // onPush(5) -> aggregator=Seq(4,5) will happen right after due to the previous pull from onPush(4), eagerly pull even out is not available + upstream.sendNext(6) // onPush(6) -> aggregator=Seq(4,5,6) will happen right after due to the previous pull from onPush(5), even the queue is full at this point + timePasses(maxGap) // harvest queue=(Seq(4,5,6)) + upstream.sendNext(7) // onPush(7), aggregator=Seq(7), queue=(Seq(4,5,6) no pulling upstream due to queue is full + // if sending another message it will stay in upstream, prevent the upstream completion from happening + upstream.sendComplete() // emit remaining queue=Queue(Seq(4,5,6)) + harvest and emit aggregator=Seq(7) + // since there is no pending push from upstream, onUpstreamFinish will be triggered to emit the queue and pending aggregator + downstream.request(2).expectNext(Seq(4, 5, 6), Seq(7)) // clean up the emit queue and complete downstream + downstream.expectComplete() + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/AggregateWithBoundary.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/AggregateWithBoundary.scala new file mode 100644 index 0000000000..a80bd59efe --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/AggregateWithBoundary.scala @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2021 Lightbend Inc. 
+ */ + +package akka.stream.impl.fusing + +import akka.annotation.InternalApi +import akka.stream.{ Attributes, FlowShape, Inlet, Outlet } +import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic } + +import scala.concurrent.duration._ + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final case class AggregateWithBoundary[In, Agg, Out]( + allocate: () => Agg, + aggregate: (Agg, In) => (Agg, Boolean), + harvest: Agg => Out, + emitOnTimer: Option[(Agg => Boolean, FiniteDuration)]) + extends GraphStage[FlowShape[In, Out]] { + + emitOnTimer.foreach { + case (_, interval) => require(interval.gteq(1.milli), s"timer(${interval.toCoarsest}) must not be smaller than 1ms") + } + + val in: Inlet[In] = Inlet[In](s"${this.getClass.getName}.in") + val out: Outlet[Out] = Outlet[Out](s"${this.getClass.getName}.out") + override val shape: FlowShape[In, Out] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) with InHandler with OutHandler { + + private[this] var aggregated: Agg = null.asInstanceOf[Agg] + + override def preStart(): Unit = { + emitOnTimer.foreach { + case (_, interval) => scheduleWithFixedDelay(s"${this.getClass.getSimpleName}Timer", interval, interval) + } + } + + override protected def onTimer(timerKey: Any): Unit = { + emitOnTimer.foreach { + case (isReadyOnTimer, _) => if (aggregated != null && isReadyOnTimer(aggregated)) harvestAndEmit() + } + } + + // at onPush, isAvailable(in)=true hasBeenPulled(in)=false, isAvailable(out) could be true or false due to timer triggered emit + override def onPush(): Unit = { + if (aggregated == null) aggregated = allocate() + val (updated, result) = aggregate(aggregated, grab(in)) + aggregated = updated + if (result) harvestAndEmit() + // the decision to pull entirely depend on isAvailable(out)=true, regardless of result of aggregate + // 1. 
aggregate=true: isAvailable(out) will be false + // 2. aggregate=false: if isAvailable(out)=false, this means timer has caused emit, cannot pull or it could emit indefinitely bypassing back pressure + if (isAvailable(out)) pull(in) + } + + override def onUpstreamFinish(): Unit = { + // Note that emit is asynchronous, it will keep the stage alive until downstream actually take the element + if (aggregated != null) emit(out, harvest(aggregated)) + completeStage() + } + + // at onPull, isAvailable(out) is always true indicating downstream is waiting + // isAvailable(in) and hasBeenPulled(in) can be (true, false) (false, true) or (false, false) + override def onPull(): Unit = if (!hasBeenPulled(in)) pull(in) + + setHandlers(in, out, this) + + private def harvestAndEmit(): Unit = { + emit(out, harvest(aggregated)) + aggregated = null.asInstanceOf[Agg] + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index b00d260724..087fbf0cfc 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -14,14 +14,13 @@ import scala.annotation.unchecked.uncheckedVariance import scala.compat.java8.FutureConverters._ import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag - import scala.annotation.nowarn import org.reactivestreams.Processor - import akka.Done import akka.NotUsed import akka.actor.ActorRef import akka.actor.ClassicActorSystemProvider +import akka.annotation.ApiMayChange import akka.dispatch.ExecutionContexts import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.japi.Pair @@ -3978,6 +3977,37 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr extractContext: function.Function[Out, CtxOut]): FlowWithContext[U, CtxU, Out, CtxOut, Mat] = this.asScala.asFlowWithContext((x: U, c: CtxU) => collapseContext.apply(x, c))(x => 
extractContext.apply(x)).asJava + /** + * Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream + * when custom condition is met which can be triggered by aggregate or timer. + * It can be thought of as a more general [[groupedWeightedWithin]]. + * + * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true + * + * '''Backpressures when''' downstream backpressures and the aggregate is complete + * + * '''Completes when''' upstream completes and the last aggregate has been emitted downstream + * + * '''Cancels when''' downstream cancels + * + * @param allocate allocate the initial data structure for aggregated elements + * @param aggregate update the aggregated elements, return true if ready to emit after update. + * @param harvest this is invoked before emit within the current stage/operator + * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval; may be `null`, in which case no timer is used + */ + @ApiMayChange + def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])( + aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], + harvest: function.Function[Agg, Emit], + emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Flow[In, Emit, Mat] = + asScala + .aggregateWithBoundary(() => allocate.get())( + aggregate = (agg, out) => aggregate.apply(agg, out).toScala, + harvest = agg => harvest.apply(agg), + emitOnTimer = Option(emitOnTimer).map { + case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala) + }) + .asJava } object RunnableGraph { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 47d3219fac..21da0cef80 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -16,11 +16,11 
@@ import scala.compat.java8.OptionConverters._ import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag - import scala.annotation.nowarn import org.reactivestreams.{ Publisher, Subscriber } import akka.{ Done, NotUsed } import akka.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider } +import akka.annotation.ApiMayChange import akka.dispatch.ExecutionContexts import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.japi.{ function, JavaPartialFunction, Pair, Util } @@ -4522,4 +4522,36 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ **/ def asSourceWithContext[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Out, Ctx, Mat] = new scaladsl.SourceWithContext(this.asScala.map(x => (x, extractContext.apply(x)))).asJava + + /** + * Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream + * when custom condition is met which can be triggered by aggregate or timer. + * It can be thought of as a more general [[groupedWeightedWithin]]. + * + * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true + * + * '''Backpressures when''' downstream backpressures and the aggregate is complete + * + * '''Completes when''' upstream completes and the last aggregate has been emitted downstream + * + * '''Cancels when''' downstream cancels + * + * @param allocate allocate the initial data structure for aggregated elements + * @param aggregate update the aggregated elements, return true if ready to emit after update. 
+ * @param harvest this is invoked before emit within the current stage/operator + * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval; may be `null`, in which case no timer is used + */ + @ApiMayChange + def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])( + aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], + harvest: function.Function[Agg, Emit], + emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] = + asScala + .aggregateWithBoundary(() => allocate.get())( + aggregate = (agg, out) => aggregate.apply(agg, out).toScala, + harvest = agg => harvest.apply(agg), + emitOnTimer = Option(emitOnTimer).map { + case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala) + }) + .asJava } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 3ccf927d90..4cfe4744a2 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -16,6 +16,7 @@ import scala.reflect.ClassTag import scala.annotation.nowarn import akka.NotUsed +import akka.annotation.ApiMayChange import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.japi.{ function, Pair, Util } import akka.stream._ @@ -2619,4 +2620,35 @@ class SubFlow[In, Out, Mat]( def logWithMarker(name: String, marker: function.Function[Out, LogMarker]): SubFlow[In, Out, Mat] = this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null) + /** + * Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream + * when custom condition is met which can be triggered by aggregate or timer. + * It can be thought of as a more general [[groupedWeightedWithin]]. 
+ * + * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true + * + * '''Backpressures when''' downstream backpressures and the aggregate is complete + * + * '''Completes when''' upstream completes and the last aggregate has been emitted downstream + * + * '''Cancels when''' downstream cancels + * + * @param allocate allocate the initial data structure for aggregated elements + * @param aggregate update the aggregated elements, return true if ready to emit after update. + * @param harvest this is invoked before emit within the current stage/operator + * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval; may be `null`, in which case no timer is used + */ + @ApiMayChange + def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])( + aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], + harvest: function.Function[Agg, Emit], + emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubFlow[In, Emit, Mat] = + new SubFlow( + asScala.aggregateWithBoundary(() => allocate.get())( + aggregate = (agg, out) => aggregate.apply(agg, out).toScala, + harvest = agg => harvest.apply(agg), + emitOnTimer = Option(emitOnTimer).map { + case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala) + })) + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index c8743974cc..f5a99adcf8 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -16,6 +16,7 @@ import scala.reflect.ClassTag import scala.annotation.nowarn import akka.NotUsed +import akka.annotation.ApiMayChange import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.japi.{ function, Pair, Util } import akka.stream._ @@ -2592,4 +2593,34 @@ class 
SubSource[Out, Mat]( def logWithMarker(name: String, marker: function.Function[Out, LogMarker]): SubSource[Out, Mat] = this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null) + /** + * Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream + * when custom condition is met which can be triggered by aggregate or timer. + * It can be thought of as a more general [[groupedWeightedWithin]]. + * + * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true + * + * '''Backpressures when''' downstream backpressures and the aggregate is complete + * + * '''Completes when''' upstream completes and the last aggregate has been emitted downstream + * + * '''Cancels when''' downstream cancels + * + * @param allocate allocate the initial data structure for aggregated elements + * @param aggregate update the aggregated elements, return true if ready to emit after update. + * @param harvest this is invoked before emit within the current stage/operator + * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval; may be `null`, in which case no timer is used + */ + @ApiMayChange + def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])( + aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], + harvest: function.Function[Agg, Emit], + emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubSource[Emit, Mat] = + new SubSource( + asScala.aggregateWithBoundary(() => allocate.get())( + aggregate = (agg, out) => aggregate.apply(agg, out).toScala, + harvest = agg => harvest.apply(agg), + emitOnTimer = Option(emitOnTimer).map { + case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala) + })) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 875040a75a..c91c82a192 100755 
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -14,7 +14,7 @@ import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import akka.Done import akka.NotUsed import akka.actor.ActorRef -import akka.annotation.DoNotInherit +import akka.annotation.{ ApiMayChange, DoNotInherit } import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.stream.Attributes.SourceLocation import akka.stream._ @@ -3272,6 +3272,32 @@ trait FlowOps[+Out, +Mat] { * asynchronously. */ def async: Repr[Out] + + /** + * Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream + * when custom condition is met which can be triggered by aggregate or timer. + * It can be thought of as a more general [[groupedWeightedWithin]]. + * + * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true + * + * '''Backpressures when''' downstream backpressures and the aggregate is complete + * + * '''Completes when''' upstream completes and the last aggregate has been emitted downstream + * + * '''Cancels when''' downstream cancels + * + * @param allocate allocate the initial data structure for aggregated elements + * @param aggregate update the aggregated elements, return true if ready to emit after update. 
+ * @param harvest this is invoked before emit within the current stage/operator + * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval + */ + @ApiMayChange + def aggregateWithBoundary[Agg, Emit](allocate: () => Agg)( + aggregate: (Agg, Out) => (Agg, Boolean), + harvest: Agg => Emit, + emitOnTimer: Option[(Agg => Boolean, FiniteDuration)]): Repr[Emit] = + via(AggregateWithBoundary(allocate, aggregate, harvest, emitOnTimer)) + } /** diff --git a/akka-testkit/src/main/scala/akka/testkit/ExplicitlyTriggeredScheduler.scala b/akka-testkit/src/main/scala/akka/testkit/ExplicitlyTriggeredScheduler.scala index 8e0f880472..fe12052ff7 100644 --- a/akka-testkit/src/main/scala/akka/testkit/ExplicitlyTriggeredScheduler.scala +++ b/akka-testkit/src/main/scala/akka/testkit/ExplicitlyTriggeredScheduler.scala @@ -123,4 +123,9 @@ class ExplicitlyTriggeredScheduler(@unused config: Config, log: LoggingAdapter, } override def maxFrequency: Double = 42 + + /** + * The scheduler needs to expose its internal time for testing. + */ + def currentTimeMs: Long = currentTime.get() }