=str: #16285: Expand should not drop elements on completion

This commit is contained in:
Endre Sándor Varga 2014-11-25 12:26:24 +01:00
parent cac9137aa9
commit 325e1b6915
4 changed files with 94 additions and 40 deletions

View file

@ -267,25 +267,39 @@ private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (O
* INTERNAL API
*/
private[akka] final case class Expand[In, Out, Seed](seed: In Seed, extrapolate: Seed (Out, Seed)) extends DetachedStage[In, Out] {
private var s: Any = null
private var s: Seed = _
private var started: Boolean = false
private var expanded: Boolean = false
override def onPush(elem: In, ctx: DetachedContext[Out]): UpstreamDirective = {
s = seed(elem)
started = true
expanded = false
if (ctx.isHolding) {
val (emit, newS) = extrapolate(s.asInstanceOf[Seed])
val (emit, newS) = extrapolate(s)
s = newS
expanded = true
ctx.pushAndPull(emit)
} else ctx.hold()
}
override def onPull(ctx: DetachedContext[Out]): DownstreamDirective = {
if (s == null) ctx.hold()
if (ctx.isFinishing) {
if (!started) ctx.finish()
else ctx.pushAndFinish(extrapolate(s)._1)
} else if (!started) ctx.hold()
else {
val (emit, newS) = extrapolate(s.asInstanceOf[Seed])
val (emit, newS) = extrapolate(s)
s = newS
expanded = true
if (ctx.isHolding) ctx.pushAndPull(emit)
else ctx.push(emit)
}
}
override def onUpstreamFinish(ctx: DetachedContext[Out]): TerminationDirective = {
if (expanded) ctx.finish()
else ctx.absorbTermination()
}
}