New stream operator aggregateWithBoundary #30797

yongjiaw 2022-01-11 07:15:07 -08:00 committed by GitHub
parent 04e22c3687
commit 12b84fb2c7
10 changed files with 547 additions and 4 deletions


@@ -0,0 +1,31 @@
# aggregateWithBoundary
Aggregate and emit until a custom boundary condition is met.
@ref[Backpressure aware operators](../index.md#backpressure-aware-operators)
@ref[Timer driven operators](../index.md#timer-driven-operators)
## Signature
@apidoc[Source.aggregateWithBoundary](Source) { scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]" java="#aggregateWithBoundary(java.util.function.Supplier,akka.japi.function.Function2,akka.japi.function.Function,akka.japi.Pair)"}
@apidoc[Flow.aggregateWithBoundary](Flow) { scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]" java="#aggregateWithBoundary(java.util.function.Supplier,akka.japi.function.Function2,akka.japi.function.Function,akka.japi.Pair)" }
## Description
This operator can be customized into a broad class of aggregate/group/fold operators, based on custom state or timer conditions.
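As a sketch of the boundary-driven case (assuming an implicit `ActorSystem` is in scope to materialize the stream), grouping integers into batches of three behaves like `grouped(3)`:

```scala
import scala.collection.mutable.ListBuffer
import akka.stream.scaladsl.{ Sink, Source }

// emit a batch whenever three elements have accumulated
Source(1 to 7)
  .aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(
    aggregate = (buffer, i) => {
      buffer += i
      (buffer, buffer.size >= 3) // true marks the boundary, triggering harvest and emit
    },
    harvest = buffer => buffer.toSeq, // invoked once before each emit
    emitOnTimer = None)
  .runWith(Sink.seq) // completes with Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7))
```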
## Reactive Streams semantics
@@@div { .callout }
**emits** when the aggregation function decides the aggregate is complete or the timer function returns true
**backpressures** when downstream backpressures and the aggregate is complete
**completes** when upstream completes and the last aggregate has been emitted downstream
**cancels** when downstream cancels
@@@
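A sketch of the timer-driven emit mentioned in the callout above (the tick source, predicate, and 500ms check interval are illustrative choices, not part of the API):

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
import akka.stream.scaladsl.{ Sink, Source }

// periodically flush whatever has accumulated, similar in spirit to groupedWithin
Source
  .tick(0.seconds, 100.millis, "tick")
  .aggregateWithBoundary(allocate = () => ListBuffer.empty[String])(
    aggregate = (buffer, elem) => {
      buffer += elem
      (buffer, false) // never emit from the aggregate function itself
    },
    harvest = buffer => buffer.toSeq,
    // the predicate is evaluated every 500ms; emit whenever something has accumulated
    emitOnTimer = Some((buffer => buffer.nonEmpty, 500.millis)))
  .runWith(Sink.foreach(println))
```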


@@ -215,6 +215,7 @@ These operators are aware of the backpressure provided by their downstreams and
| |Operator|Description|
|--|--|--|
|Source/Flow|<a name="aggregatewithboundary"></a>@ref[aggregateWithBoundary](Source-or-Flow/aggregateWithBoundary.md)|Aggregate and emit until a custom boundary condition is met.|
|Source/Flow|<a name="batch"></a>@ref[batch](Source-or-Flow/batch.md)|Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure and a maximum number of batched elements is not yet reached.|
|Source/Flow|<a name="batchweighted"></a>@ref[batchWeighted](Source-or-Flow/batchWeighted.md)|Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure and a maximum weight of batched elements is not yet reached.|
|Source/Flow|<a name="buffer"></a>@ref[buffer](Source-or-Flow/buffer.md)|Allow for temporarily faster upstream events by buffering `size` elements.|
@@ -366,6 +367,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [actorRefWithBackpressure](Sink/actorRefWithBackpressure.md)
* [actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)
* [actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)
* [aggregateWithBoundary](Source-or-Flow/aggregateWithBoundary.md)
* [alsoTo](Source-or-Flow/alsoTo.md)
* [asFlowWithContext](Flow/asFlowWithContext.md)
* [asInputStream](StreamConverters/asInputStream.md)


@@ -0,0 +1,274 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.testkit.{ AkkaSpec, ExplicitlyTriggeredScheduler }
import com.typesafe.config.{ ConfigFactory, ConfigValueFactory }
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._
class AggregateWithBoundarySpec extends StreamSpec {
"split aggregator by size" in {
val stream = collection.immutable.Seq(1, 2, 3, 4, 5, 6, 7)
val groupSize = 3
val result = Source(stream)
.aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(aggregate = (buffer, i) => {
buffer.addOne(i)
(buffer, buffer.size >= groupSize)
}, harvest = buffer => buffer.toSeq, emitOnTimer = None)
.runWith(Sink.collection)
Await.result(result, 10.seconds) should be(stream.grouped(groupSize).toSeq)
}
"split aggregator by size and harvest" in {
val stream = collection.immutable.Seq(1, 2, 3, 4, 5, 6, 7)
val groupSize = 3
val result = Source(stream)
.aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(
aggregate = (buffer, i) => {
buffer.addOne(i)
(buffer, buffer.size >= groupSize)
},
harvest = buffer => buffer.toSeq :+ -1, // append -1 to output to demonstrate the effect of harvest
emitOnTimer = None)
.runWith(Sink.collection)
Await.result(result, 10.seconds) should be(stream.grouped(groupSize).toSeq.map(seq => seq :+ -1))
}
"split aggregator by custom weight condition" in {
val stream = collection.immutable.Seq(1, 2, 3, 4, 5, 6, 7)
val weight = 10
val result = Source(stream)
.aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(aggregate = (buffer, i) => {
buffer.addOne(i)
(buffer, buffer.sum >= weight)
}, harvest = buffer => buffer.toSeq, emitOnTimer = None)
.runWith(Sink.collection)
Await.result(result, 10.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6), Seq(7)))
}
}
// To run multiple tests in parallel using a simulated timer,
// the tests must be in separate Specs with different instances of the ActorSystem
class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with Matchers {
private def createActorSystem(id: String) =
ActorSystem(
s"ActorSystemWithExplicitlyTriggeredScheduler-$id",
ConfigFactory.load(
AkkaSpec.testConf.withValue(
"akka.scheduler.implementation",
ConfigValueFactory.fromAnyRef("akka.testkit.ExplicitlyTriggeredScheduler"))))
private def getEts(actor: ActorSystem): ExplicitlyTriggeredScheduler = {
actor.scheduler match {
case ets: ExplicitlyTriggeredScheduler => ets
case other => throw new Exception(s"expecting ${classOf[ExplicitlyTriggeredScheduler]} but got ${other.getClass}")
}
}
private def timePasses(amount: FiniteDuration)(implicit actorSystem: ActorSystem): Unit =
getEts(actorSystem).timePasses(amount)
private def schedulerTimeMs(implicit actorSystem: ActorSystem): Long = getEts(actorSystem).currentTimeMs
implicit class SourceWrapper[+Out, +Mat](val source: Source[Out, Mat]) {
/**
* This is a convenience wrapper of [[aggregateWithBoundary]] that handles additional time constraints
* @param maxGap the maximum gap allowed between consecutive aggregate operations
* @param maxDuration the maximum duration of the aggregate, from the first aggregated element until emit is triggered
* @param interval interval of the timer that checks the maxGap and maxDuration conditions
* @param currentTimeMs source of the system time, can be simulated time in testing
*/
def aggregateWithTimeBoundary[Agg, Emit](allocate: => Agg)(
aggregate: (Agg, Out) => (Agg, Boolean),
harvest: Agg => Emit,
maxGap: Option[FiniteDuration],
maxDuration: Option[FiniteDuration],
interval: FiniteDuration,
currentTimeMs: => Long): source.Repr[Emit] = {
require(
maxDuration.nonEmpty || maxGap.nonEmpty,
s"required timing condition maxGap and maxDuration are both missing, use aggregateWithBoundary if it's intended")
class ValueTimeWrapper[T](var value: T) {
var firstTime: Long = -1
var lastTime: Long = -1
def updateTime(time: Long): Unit = {
if (firstTime == -1) firstTime = time
lastTime = time
}
}
source.aggregateWithBoundary(allocate = () => new ValueTimeWrapper(value = allocate))(aggregate = (agg, in) => {
agg.updateTime(currentTimeMs)
// the user-provided aggregate may mutate Agg in place or return a new instance; store the result back either way
val (updated, result) = aggregate(agg.value, in)
agg.value = updated
(agg, result)
}, harvest = agg => harvest(agg.value), emitOnTimer = Some((agg => {
val currentTime = currentTimeMs
maxDuration.exists(md => currentTime - agg.firstTime >= md.toMillis) ||
maxGap.exists(mg => currentTime - agg.lastTime >= mg.toMillis)
}, interval)))
}
}
"split aggregator by gap for slow upstream" in {
implicit val actorSystem = createActorSystem("1")
val maxGap = 20.seconds
val p = TestPublisher.probe[Int]()
val result = Source
.fromPublisher(p)
.aggregateWithTimeBoundary(allocate = ListBuffer.empty[Int])(
aggregate = (buffer, i) => {
buffer.addOne(i)
(buffer, false)
},
harvest = seq => seq.toSeq,
maxGap = Some(maxGap), // elements separated by more than maxGap go into the next aggregate
maxDuration = None,
currentTimeMs = schedulerTimeMs,
interval = 1.milli)
.buffer(1, OverflowStrategy.backpressure)
.runWith(Sink.collection)
p.sendNext(1)
timePasses(maxGap / 2) // less than maxGap should not cause emit
p.sendNext(2)
timePasses(maxGap)
p.sendNext(3)
timePasses(maxGap / 2) // less than maxGap should not cause emit
p.sendNext(4)
timePasses(maxGap)
p.sendNext(5)
timePasses(maxGap / 2) // less than maxGap should not cause emit
p.sendNext(6)
timePasses(maxGap / 2) // less than maxGap should not cause emit and it does not accumulate
p.sendNext(7)
p.sendComplete()
Await.result(result, 10.seconds) should be(Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6, 7)))
}
"split aggregator by total duration" in {
implicit val actorSystem = createActorSystem("2")
val maxDuration = 400.seconds
val p = TestPublisher.probe[Int]()
val result = Source
.fromPublisher(p)
.aggregateWithTimeBoundary(allocate = ListBuffer.empty[Int])(
aggregate = (buffer, i) => {
buffer.addOne(i)
(buffer, false)
},
harvest = seq => seq.toSeq,
maxGap = None,
maxDuration = Some(maxDuration), // an aggregate lasting longer than maxDuration will be emitted
currentTimeMs = schedulerTimeMs,
interval = 1.milli)
.buffer(1, OverflowStrategy.backpressure)
.runWith(Sink.collection)
p.sendNext(1)
timePasses(maxDuration / 4)
p.sendNext(2)
timePasses(maxDuration / 4)
p.sendNext(3)
timePasses(maxDuration / 4)
p.sendNext(4)
timePasses(maxDuration / 4) // maxDuration will accumulate and reach threshold here
p.sendNext(5)
p.sendNext(6)
p.sendNext(7)
p.sendComplete()
Await.result(result, 10.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7)))
}
"down stream back pressure should not miss data on completion with pull on start" in {
implicit val actorSystem = createActorSystem("3")
val maxGap = 1.second
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Seq[Int]]()
Source
.fromPublisher(upstream)
.aggregateWithTimeBoundary(allocate = ListBuffer.empty[Int])(
aggregate = (buffer, i) => {
buffer.addOne(i)
(buffer, false)
},
harvest = buffer => buffer.toSeq,
maxGap = Some(maxGap),
maxDuration = None,
currentTimeMs = schedulerTimeMs,
interval = 1.milli)
.buffer(1, OverflowStrategy.backpressure)
.to(Sink.fromSubscriber(downstream))
.run()
downstream.ensureSubscription()
upstream.sendNext(1) // onPush(1) -> aggregator=Seq(1), due to the preStart pull, will pull upstream again since queue is empty
timePasses(maxGap) // harvest onTimer, queue=Queue(Seq(1)), aggregator=null
upstream.sendNext(2) // onPush(2) -> aggregator=Seq(2), due to the previous pull; even though the queue is already full at this point due to the timer, it won't pull upstream again
timePasses(maxGap) // harvest onTimer, queue=(Seq(1), Seq(2)), aggregator=null, note queue size can be 1 more than the threshold
upstream.sendNext(3) // 3 will not be pushed to the stage until the stage pulls upstream
timePasses(maxGap) // since 3 stayed outside of the stage, this gap will not cause 3 to be emitted
downstream.request(1).expectNext(Seq(1)) // onPull emit Seq(1), queue=(Seq(2))
timePasses(maxGap) // since 3 stayed outside of the stage, this gap will not cause 3 to be emitted
downstream
.request(1)
.expectNext(Seq(2)) // onPull emit Seq(2). queue is empty now, pull upstream and 3 will be pushed into the stage
// onPush(3) -> aggregator=Seq(3) pull upstream since queue is empty
downstream
.request(1)
.expectNoMessage() // onPull, no data to emit, won't pull upstream again since it's already pulled
timePasses(maxGap) // emit Seq(3) onTimer
downstream.expectNext(Seq(3))
upstream.sendNext(4) // onPush(4) -> aggregator=Seq(4), and pulls upstream again
upstream.sendNext(5) // onPush(5) -> aggregator=Seq(4,5) happens right after, due to the previous pull from onPush(4); eagerly pulls even though out is not available
upstream.sendNext(6) // onPush(6) -> aggregator=Seq(4,5,6) happens right after, due to the previous pull from onPush(5), even though the queue is full at this point
timePasses(maxGap) // harvest queue=(Seq(4,5,6))
upstream.sendNext(7) // onPush(7), aggregator=Seq(7), queue=(Seq(4,5,6)), no pull on upstream since the queue is full
// sending another message here would leave it pending in the upstream and prevent the completion from reaching the stage
upstream.sendComplete() // emit remaining queue=Queue(Seq(4,5,6)) + harvest and emit aggregator=Seq(7)
// since there is no pending push from upstream, onUpstreamFinish will be triggered to emit the queue and pending aggregator
downstream.request(2).expectNext(Seq(4, 5, 6), Seq(7)) // clean up the emit queue and complete downstream
downstream.expectComplete()
}
}


@@ -0,0 +1,80 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl.fusing
import akka.annotation.InternalApi
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic }
import scala.concurrent.duration._
/**
* INTERNAL API
*/
@InternalApi
private[akka] final case class AggregateWithBoundary[In, Agg, Out](
allocate: () => Agg,
aggregate: (Agg, In) => (Agg, Boolean),
harvest: Agg => Out,
emitOnTimer: Option[(Agg => Boolean, FiniteDuration)])
extends GraphStage[FlowShape[In, Out]] {
emitOnTimer.foreach {
case (_, interval) => require(interval.gteq(1.milli), s"timer(${interval.toCoarsest}) must not be smaller than 1ms")
}
val in: Inlet[In] = Inlet[In](s"${this.getClass.getName}.in")
val out: Outlet[Out] = Outlet[Out](s"${this.getClass.getName}.out")
override val shape: FlowShape[In, Out] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
private[this] var aggregated: Agg = null.asInstanceOf[Agg]
override def preStart(): Unit = {
emitOnTimer.foreach {
case (_, interval) => scheduleWithFixedDelay(s"${this.getClass.getSimpleName}Timer", interval, interval)
}
}
override protected def onTimer(timerKey: Any): Unit = {
emitOnTimer.foreach {
case (isReadyOnTimer, _) => if (aggregated != null && isReadyOnTimer(aggregated)) harvestAndEmit()
}
}
// at onPush, isAvailable(in)=true hasBeenPulled(in)=false, isAvailable(out) could be true or false due to timer triggered emit
override def onPush(): Unit = {
if (aggregated == null) aggregated = allocate()
val (updated, result) = aggregate(aggregated, grab(in))
aggregated = updated
if (result) harvestAndEmit()
// the decision to pull depends entirely on isAvailable(out)=true, regardless of the result of aggregate
// 1. aggregate=true: isAvailable(out) will be false after the emit
// 2. aggregate=false: if isAvailable(out)=false, the timer has caused an emit; do not pull, or the stage could emit indefinitely, bypassing backpressure
if (isAvailable(out)) pull(in)
}
override def onUpstreamFinish(): Unit = {
// Note that emit is asynchronous; it will keep the stage alive until downstream actually takes the element
if (aggregated != null) emit(out, harvest(aggregated))
completeStage()
}
// at onPull, isAvailable(out) is always true indicating downstream is waiting
// isAvailable(in) and hasBeenPulled(in) can be (true, false) (false, true) or (false, false)
override def onPull(): Unit = if (!hasBeenPulled(in)) pull(in)
setHandlers(in, out, this)
private def harvestAndEmit(): Unit = {
emit(out, harvest(aggregated))
aggregated = null.asInstanceOf[Agg]
}
}
}


@@ -14,14 +14,13 @@ import scala.annotation.unchecked.uncheckedVariance
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
import scala.annotation.nowarn
import org.reactivestreams.Processor
import akka.Done
import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.ClassicActorSystemProvider
import akka.annotation.ApiMayChange
import akka.dispatch.ExecutionContexts
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.japi.Pair
@@ -3978,6 +3977,37 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
extractContext: function.Function[Out, CtxOut]): FlowWithContext[U, CtxU, Out, CtxOut, Mat] =
this.asScala.asFlowWithContext((x: U, c: CtxU) => collapseContext.apply(x, c))(x => extractContext.apply(x)).asJava
/**
* Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream
* when a custom condition is met, which can be triggered by the aggregate function or a timer.
* It can be thought of as a more general [[groupedWeightedWithin]].
*
* '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
*
* '''Backpressures when''' downstream backpressures and the aggregate is complete
*
* '''Completes when''' upstream completes and the last aggregate has been emitted downstream
*
* '''Cancels when''' downstream cancels
*
* @param allocate allocate the initial data structure for the aggregated elements
* @param aggregate update the aggregated elements and return true if ready to emit after the update
* @param harvest invoked before each emit, within the current stage/operator, to produce the output element
* @param emitOnTimer decide whether the current aggregated elements can be emitted; the custom function is invoked on every interval
*/
@ApiMayChange
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Flow[In, Emit, Mat] =
asScala
.aggregateWithBoundary(() => allocate.get())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = Option(emitOnTimer).map {
case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
})
.asJava
}
object RunnableGraph {


@@ -16,11 +16,11 @@ import scala.compat.java8.OptionConverters._
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
import scala.annotation.nowarn
import org.reactivestreams.{ Publisher, Subscriber }
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
import akka.annotation.ApiMayChange
import akka.dispatch.ExecutionContexts
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.japi.{ function, JavaPartialFunction, Pair, Util }
@@ -4522,4 +4522,36 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
**/
def asSourceWithContext[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Out, Ctx, Mat] =
new scaladsl.SourceWithContext(this.asScala.map(x => (x, extractContext.apply(x)))).asJava
/**
* Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream
* when a custom condition is met, which can be triggered by the aggregate function or a timer.
* It can be thought of as a more general [[groupedWeightedWithin]].
*
* '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
*
* '''Backpressures when''' downstream backpressures and the aggregate is complete
*
* '''Completes when''' upstream completes and the last aggregate has been emitted downstream
*
* '''Cancels when''' downstream cancels
*
* @param allocate allocate the initial data structure for the aggregated elements
* @param aggregate update the aggregated elements and return true if ready to emit after the update
* @param harvest invoked before each emit, within the current stage/operator, to produce the output element
* @param emitOnTimer decide whether the current aggregated elements can be emitted; the custom function is invoked on every interval
*/
@ApiMayChange
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] =
asScala
.aggregateWithBoundary(() => allocate.get())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = Option(emitOnTimer).map {
case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
})
.asJava
}


@@ -16,6 +16,7 @@ import scala.reflect.ClassTag
import scala.annotation.nowarn
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.japi.{ function, Pair, Util }
import akka.stream._
@@ -2619,4 +2620,35 @@ class SubFlow[In, Out, Mat](
def logWithMarker(name: String, marker: function.Function[Out, LogMarker]): SubFlow[In, Out, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null)
/**
* Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream
* when a custom condition is met, which can be triggered by the aggregate function or a timer.
* It can be thought of as a more general [[groupedWeightedWithin]].
*
* '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
*
* '''Backpressures when''' downstream backpressures and the aggregate is complete
*
* '''Completes when''' upstream completes and the last aggregate has been emitted downstream
*
* '''Cancels when''' downstream cancels
*
* @param allocate allocate the initial data structure for the aggregated elements
* @param aggregate update the aggregated elements and return true if ready to emit after the update
* @param harvest invoked before each emit, within the current stage/operator, to produce the output element
* @param emitOnTimer decide whether the current aggregated elements can be emitted; the custom function is invoked on every interval
*/
@ApiMayChange
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubFlow[In, Emit, Mat] =
new SubFlow(
asScala.aggregateWithBoundary(() => allocate.get())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = Option(emitOnTimer).map {
case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
}))
}


@@ -16,6 +16,7 @@ import scala.reflect.ClassTag
import scala.annotation.nowarn
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.japi.{ function, Pair, Util }
import akka.stream._
@@ -2592,4 +2593,34 @@ class SubSource[Out, Mat](
def logWithMarker(name: String, marker: function.Function[Out, LogMarker]): SubSource[Out, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null)
/**
* Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream
* when a custom condition is met, which can be triggered by the aggregate function or a timer.
* It can be thought of as a more general [[groupedWeightedWithin]].
*
* '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
*
* '''Backpressures when''' downstream backpressures and the aggregate is complete
*
* '''Completes when''' upstream completes and the last aggregate has been emitted downstream
*
* '''Cancels when''' downstream cancels
*
* @param allocate allocate the initial data structure for the aggregated elements
* @param aggregate update the aggregated elements and return true if ready to emit after the update
* @param harvest invoked before each emit, within the current stage/operator, to produce the output element
* @param emitOnTimer decide whether the current aggregated elements can be emitted; the custom function is invoked on every interval
*/
@ApiMayChange
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubSource[Emit, Mat] =
new SubSource(
asScala.aggregateWithBoundary(() => allocate.get())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = Option(emitOnTimer).map {
case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
}))
}


@@ -14,7 +14,7 @@ import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import akka.Done
import akka.NotUsed
import akka.actor.ActorRef
import akka.annotation.{ ApiMayChange, DoNotInherit }
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.stream.Attributes.SourceLocation
import akka.stream._
@@ -3272,6 +3272,32 @@ trait FlowOps[+Out, +Mat] {
* asynchronously.
*/
def async: Repr[Out]
/**
* Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream
* when a custom condition is met, which can be triggered by the aggregate function or a timer.
* It can be thought of as a more general [[groupedWeightedWithin]].
*
* '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
*
* '''Backpressures when''' downstream backpressures and the aggregate is complete
*
* '''Completes when''' upstream completes and the last aggregate has been emitted downstream
*
* '''Cancels when''' downstream cancels
*
* @param allocate allocate the initial data structure for the aggregated elements
* @param aggregate update the aggregated elements and return true if ready to emit after the update
* @param harvest invoked before each emit, within the current stage/operator, to produce the output element
* @param emitOnTimer decide whether the current aggregated elements can be emitted; the custom function is invoked on every interval
*/
@ApiMayChange
def aggregateWithBoundary[Agg, Emit](allocate: () => Agg)(
aggregate: (Agg, Out) => (Agg, Boolean),
harvest: Agg => Emit,
emitOnTimer: Option[(Agg => Boolean, FiniteDuration)]): Repr[Emit] =
via(AggregateWithBoundary(allocate, aggregate, harvest, emitOnTimer))
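A usage sketch against this scaladsl signature (the `rechunk` name and the 1024-byte threshold are illustrative, not part of the API): a `Flow` that batches `ByteString`s until a minimum size is reached.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.util.ByteString

// emit once at least 1024 bytes have accumulated; a timer condition could be added via emitOnTimer
val rechunk: Flow[ByteString, ByteString, NotUsed] =
  Flow[ByteString].aggregateWithBoundary(allocate = () => ByteString.newBuilder)(
    aggregate = (builder, bytes) => {
      builder ++= bytes // ByteStringBuilder is mutable, so the same instance is returned
      (builder, builder.length >= 1024)
    },
    harvest = _.result(), // turn the builder into the outgoing ByteString
    emitOnTimer = None)
```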
}
/**


@@ -123,4 +123,9 @@ class ExplicitlyTriggeredScheduler(@unused config: Config, log: LoggingAdapter,
}
override def maxFrequency: Double = 42
/**
* The scheduler needs to expose its internal time for testing.
*/
def currentTimeMs: Long = currentTime.get()
}