#19444 simplify signature of FlowOps.expand

Any previous usage can be represented in this fashion while typically
saving one function allocation and also saving the function literal’s
syntax overhead (for the seed). Plus a new feature: the provided
iterator does not have to be infinite, limiting how far expand is
allowed to go.
This commit is contained in:
Roland Kuhn 2016-01-18 11:29:14 +01:00
parent a12451e007
commit 7463c50fc9
16 changed files with 127 additions and 151 deletions

View file

@ -20,6 +20,7 @@ import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
import akka.stream.ActorAttributes.SupervisionStrategy
import scala.concurrent.duration.{ FiniteDuration, _ }
import akka.stream.impl.Stages.DefaultAttributes
/**
* INTERNAL API
@ -480,47 +481,54 @@ private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (O
/**
* INTERNAL API
*/
private[akka] final case class Expand[In, Out, Seed](seed: In Seed, extrapolate: Seed (Out, Seed)) extends DetachedStage[In, Out] {
private var s: Seed = _
private var started: Boolean = false
private var expanded: Boolean = false
private[akka] final class Expand[In, Out](extrapolate: In Iterator[Out]) extends GraphStage[FlowShape[In, Out]] {
private val in = Inlet[In]("expand.in")
private val out = Outlet[Out]("expand.out")
override def onPush(elem: In, ctx: DetachedContext[Out]): UpstreamDirective = {
s = seed(elem)
started = true
expanded = false
if (ctx.isHoldingDownstream) {
val (emit, newS) = extrapolate(s)
s = newS
expanded = true
ctx.pushAndPull(emit)
} else ctx.holdUpstream()
override def initialAttributes = DefaultAttributes.expand
override val shape = FlowShape(in, out)
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) {
private var iterator: Iterator[Out] = Iterator.empty
private var expanded = false
override def preStart(): Unit = pull(in)
setHandler(in, new InHandler {
override def onPush(): Unit = {
iterator = extrapolate(grab(in))
if (iterator.hasNext) {
if (isAvailable(out)) {
expanded = true
pull(in)
push(out, iterator.next())
} else expanded = false
} else pull(in)
}
override def onUpstreamFinish(): Unit = {
if (iterator.hasNext && !expanded) () // need to wait
else completeStage()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (iterator.hasNext) {
if (expanded == false) {
expanded = true
if (isClosed(in)) {
push(out, iterator.next())
completeStage()
} else {
// expand needs to pull first to be fair when upstream is not actually slow
pull(in)
push(out, iterator.next())
}
} else push(out, iterator.next())
}
}
})
}
override def onPull(ctx: DetachedContext[Out]): DownstreamDirective = {
if (ctx.isFinishing) {
if (!started) ctx.finish()
else ctx.pushAndFinish(extrapolate(s)._1)
} else if (!started) ctx.holdDownstream()
else {
val (emit, newS) = extrapolate(s)
s = newS
expanded = true
if (ctx.isHoldingUpstream) ctx.pushAndPull(emit)
else ctx.push(emit)
}
}
override def onUpstreamFinish(ctx: DetachedContext[Out]): TerminationDirective = {
if (expanded) ctx.finish()
else ctx.absorbTermination()
}
override def decide(t: Throwable): Supervision.Directive = Supervision.Stop
override def restart(): Expand[In, Out, Seed] =
throw new UnsupportedOperationException("Expand doesn't support restart")
}
/**