Merge branch 'release-2.3-dev' of github.com:akka/akka into release-2.3-dev
This commit is contained in:
commit
8e62368afe
9 changed files with 309 additions and 50 deletions
|
|
@ -240,43 +240,43 @@ private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, de
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends StatefulStage[T, T] {
|
||||
private var needsToEmitStart = start.isDefined
|
||||
final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends GraphStage[FlowShape[T, T]] {
|
||||
|
||||
override def initial: StageState[T, T] =
|
||||
start match {
|
||||
case Some(initial) ⇒ firstWithInitial(initial)
|
||||
case _ ⇒ first
|
||||
private val in = Inlet[T]("in")
|
||||
private val out = Outlet[T]("out")
|
||||
|
||||
override val shape = FlowShape(in, out)
|
||||
|
||||
override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
|
||||
val startInHandler = new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
// if else (to avoid using Iterator[T].flatten in hot code)
|
||||
if (start.isDefined) emitMultiple(out, Iterator(start.get, grab(in)))
|
||||
else emit(out, grab(in))
|
||||
setHandler(in, restInHandler) // switch handler
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
emitMultiple(out, Iterator(start, end).flatten)
|
||||
completeStage()
|
||||
}
|
||||
}
|
||||
|
||||
def firstWithInitial(initial: T) = new StageState[T, T] {
|
||||
override def onPush(elem: T, ctx: Context[T]) = {
|
||||
needsToEmitStart = false
|
||||
emit(Iterator(initial, elem), ctx, running)
|
||||
}
|
||||
}
|
||||
val restInHandler = new InHandler {
|
||||
override def onPush(): Unit = emitMultiple(out, Iterator(inject, grab(in)))
|
||||
|
||||
def first = new StageState[T, T] {
|
||||
override def onPush(elem: T, ctx: Context[T]) = {
|
||||
become(running)
|
||||
ctx.push(elem)
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
if (end.isDefined) emit(out, end.get)
|
||||
completeStage()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def running = new StageState[T, T] {
|
||||
override def onPush(elem: T, ctx: Context[T]): SyncDirective =
|
||||
emit(Iterator(inject, elem), ctx)
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
|
||||
end match {
|
||||
case Some(e) if needsToEmitStart ⇒
|
||||
terminationEmit(Iterator(start.get, end.get), ctx)
|
||||
case Some(e) ⇒
|
||||
terminationEmit(Iterator(end.get), ctx)
|
||||
case _ ⇒
|
||||
terminationEmit(Iterator(), ctx)
|
||||
val outHandler = new OutHandler {
|
||||
override def onPull(): Unit = pull(in)
|
||||
}
|
||||
|
||||
setHandler(in, startInHandler)
|
||||
setHandler(out, outHandler)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue