code restructure to enhance readability
This commit is contained in:
parent
e253593535
commit
fca478c763
1 changed files with 21 additions and 27 deletions
|
|
@ -247,41 +247,35 @@ final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) ext
|
|||
override val shape = FlowShape(in, out)
|
||||
|
||||
override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
|
||||
var first = true // is this the first element?
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
val startInHandler = new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
if (first) {
|
||||
first = false
|
||||
// emit: start if defined, elem
|
||||
if (start.isDefined) emitMultipe(out, Iterator(start.get, grab(in)))
|
||||
else emit(out, grab(in))
|
||||
} else {
|
||||
// emit: inject, elem
|
||||
emitMultipe(out, Iterator(inject, grab(in)))
|
||||
}
|
||||
// if else (to avoid using Iterator[T].flatten in hot code)
|
||||
if (start.isDefined) emitMultipe(out, Iterator(start.get, grab(in)))
|
||||
else emit(out, grab(in))
|
||||
setHandler(in, restInHandler) // switch handler
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
if (first) {
|
||||
// this branch is only called if upstream is an empty one
|
||||
// emit: start if defined, end if defined, otherwise emit nothing
|
||||
if (start.isDefined && end.isDefined) emitMultipe(out, Iterator(start.get, end.get))
|
||||
else if (start.isDefined && !end.isDefined) emit(out, start.get)
|
||||
else if (!start.isDefined && end.isDefined) emit(out, end.get)
|
||||
else { /* emit nothing */ }
|
||||
} else {
|
||||
// emit: end if defined, otherwise emit nothing
|
||||
if (end.isDefined) emit(out, end.get)
|
||||
else { /* emit nothing */ }
|
||||
}
|
||||
emitMultipe(out, Iterator(start, end).flatten)
|
||||
completeStage()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
val restInHandler = new InHandler {
|
||||
override def onPush(): Unit = emitMultipe(out, Iterator(inject, grab(in)))
|
||||
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
if (end.isDefined) emit(out, end.get)
|
||||
completeStage()
|
||||
}
|
||||
}
|
||||
|
||||
val outHandler = new OutHandler {
|
||||
override def onPull(): Unit = pull(in)
|
||||
})
|
||||
}
|
||||
|
||||
setHandler(in, startInHandler)
|
||||
setHandler(out, outHandler)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue