+str #16610 adding aggregate & aggregateWeighted ops
This commit is contained in:
parent
b1351b36ed
commit
d690067fc9
9 changed files with 492 additions and 1 deletions
|
|
@ -478,6 +478,86 @@ private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (O
|
|||
override def restart(): Conflate[In, Out] = copy()
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final case class AggregateWeighted[In, Out](max: Long, costFn: In ⇒ Long, seed: In ⇒ Out,
|
||||
aggregate: (Out, In) ⇒ Out,
|
||||
decider: Supervision.Decider) extends DetachedStage[In, Out] {
|
||||
private var agg: Any = null
|
||||
private var left: Long = max
|
||||
private var pending: Any = null
|
||||
|
||||
private[this] def flush(ctx: DetachedContext[Out]) = {
|
||||
val result = agg.asInstanceOf[Out]
|
||||
agg = null
|
||||
left = max
|
||||
if (pending != null) {
|
||||
val elem = pending.asInstanceOf[In]
|
||||
agg = seed(elem)
|
||||
left -= costFn(elem)
|
||||
pending = null
|
||||
}
|
||||
ctx.pushAndPull(result)
|
||||
}
|
||||
|
||||
override def onPush(elem: In, ctx: DetachedContext[Out]): UpstreamDirective = {
|
||||
val cost = costFn(elem)
|
||||
if (agg == null) {
|
||||
left -= cost
|
||||
agg = seed(elem)
|
||||
} else if (left <= 0 || left - cost < 0) {
|
||||
pending = elem
|
||||
} else {
|
||||
left -= cost
|
||||
agg = aggregate(agg.asInstanceOf[Out], elem)
|
||||
}
|
||||
|
||||
if (!ctx.isHoldingDownstream && pending == null) ctx.pull()
|
||||
else if (!ctx.isHoldingDownstream) ctx.holdUpstream()
|
||||
else flush(ctx)
|
||||
}
|
||||
|
||||
override def onPull(ctx: DetachedContext[Out]): DownstreamDirective = {
|
||||
//if ctx.isFinishing, we still might emit up to 2 more elements (whatever agg is + possibly a pending heavy element)
|
||||
if (ctx.isFinishing) {
|
||||
//agg != null since we already checked it in onUpstreamFinish
|
||||
val result = agg.asInstanceOf[Out]
|
||||
if (pending == null) ctx.pushAndFinish(result)
|
||||
else {
|
||||
val elem = pending.asInstanceOf[In]
|
||||
agg = seed(elem)
|
||||
pending = null
|
||||
ctx.push(result)
|
||||
}
|
||||
} else if (ctx.isHoldingBoth) flush(ctx)
|
||||
else if (agg == null) ctx.holdDownstream()
|
||||
else {
|
||||
val result = agg.asInstanceOf[Out]
|
||||
left = max
|
||||
if (pending != null) {
|
||||
val elem = pending.asInstanceOf[In]
|
||||
agg = seed(elem)
|
||||
left -= costFn(elem)
|
||||
pending = null
|
||||
} else {
|
||||
agg = null
|
||||
}
|
||||
if (ctx.isHoldingUpstream) ctx.pushAndPull(result)
|
||||
else ctx.push(result)
|
||||
}
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(ctx: DetachedContext[Out]): TerminationDirective = {
|
||||
if (agg == null) ctx.finish()
|
||||
else ctx.absorbTermination()
|
||||
}
|
||||
|
||||
override def decide(t: Throwable): Supervision.Directive = decider(t)
|
||||
|
||||
override def restart(): AggregateWeighted[In, Out] = copy()
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
@ -993,4 +1073,4 @@ private[stream] final class DropWithin[T](timeout: FiniteDuration) extends Simpl
|
|||
}
|
||||
|
||||
override def toString = "DropWithin"
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue