diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 6181c75f8a..6028a1c91c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -247,41 +247,35 @@ final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) ext override val shape = FlowShape(in, out) override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - var first = true // is this the first element? - - setHandler(in, new InHandler { + val startInHandler = new InHandler { override def onPush(): Unit = { - if (first) { - first = false - // emit: start if defined, elem - if (start.isDefined) emitMultipe(out, Iterator(start.get, grab(in))) - else emit(out, grab(in)) - } else { - // emit: inject, elem - emitMultipe(out, Iterator(inject, grab(in))) - } + // if else (to avoid using Iterator[T].flatten in hot code) + if (start.isDefined) emitMultipe(out, Iterator(start.get, grab(in))) + else emit(out, grab(in)) + setHandler(in, restInHandler) // switch handler } override def onUpstreamFinish(): Unit = { - if (first) { - // this branch is only called if upstream is an empty one - // emit: start if defined, end if defined, otherwise emit nothing - if (start.isDefined && end.isDefined) emitMultipe(out, Iterator(start.get, end.get)) - else if (start.isDefined && !end.isDefined) emit(out, start.get) - else if (!start.isDefined && end.isDefined) emit(out, end.get) - else { /* emit nothing */ } - } else { - // emit: end if defined, otherwise emit nothing - if (end.isDefined) emit(out, end.get) - else { /* emit nothing */ } - } + emitMultipe(out, Iterator(start, end).flatten) completeStage() } - }) + } - setHandler(out, new OutHandler { + val restInHandler = new InHandler { + override def onPush(): Unit = emitMultipe(out, Iterator(inject, grab(in))) + + override def onUpstreamFinish(): Unit = { + if (end.isDefined) emit(out, end.get) + completeStage() + } + } + + val outHandler = new OutHandler { override def onPull(): Unit = pull(in) - }) + } + + setHandler(in, startInHandler) + setHandler(out, outHandler) } }