WIP - Base intersperse on GraphStage instead of StatefulStage. TODO: handle logic to add last element
This commit is contained in:
parent
108688de99
commit
0d04d3bf5c
3 changed files with 27 additions and 39 deletions
|
|
@ -188,10 +188,6 @@ private[stream] object Stages {
|
|||
override def create(attr: Attributes): Stage[In, Out] = fusing.Scan(zero, f, supervision(attr))
|
||||
}
|
||||
|
||||
final case class Intersperse[T](start: Option[T], inject: T, end: Option[T], attributes: Attributes = intersperse) extends SymbolicStage[T, T] {
|
||||
override def create(attr: Attributes) = fusing.Intersperse(start, inject, end)
|
||||
}
|
||||
|
||||
final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, attributes: Attributes = fold) extends SymbolicStage[In, Out] {
|
||||
override def create(attr: Attributes): Stage[In, Out] = fusing.Fold(zero, f, supervision(attr))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -239,43 +239,35 @@ private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, de
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends StatefulStage[T, T] {
|
||||
private var needsToEmitStart = start.isDefined
|
||||
private[akka] final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends GraphStage[FlowShape[T, T]] {
|
||||
|
||||
override def initial: StageState[T, T] =
|
||||
start match {
|
||||
case Some(initial) ⇒ firstWithInitial(initial)
|
||||
case _ ⇒ first
|
||||
}
|
||||
private val in = Inlet[T]("in")
|
||||
private val out = Outlet[T]("out")
|
||||
|
||||
def firstWithInitial(initial: T) = new StageState[T, T] {
|
||||
override def onPush(elem: T, ctx: Context[T]) = {
|
||||
needsToEmitStart = false
|
||||
emit(Iterator(initial, elem), ctx, running)
|
||||
}
|
||||
}
|
||||
override val shape = FlowShape(in, out)
|
||||
|
||||
def first = new StageState[T, T] {
|
||||
override def onPush(elem: T, ctx: Context[T]) = {
|
||||
become(running)
|
||||
ctx.push(elem)
|
||||
}
|
||||
}
|
||||
override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
|
||||
var s_ = start.isDefined
|
||||
var m_ = false
|
||||
var e_ = end.isDefined
|
||||
|
||||
def running = new StageState[T, T] {
|
||||
override def onPush(elem: T, ctx: Context[T]): SyncDirective =
|
||||
emit(Iterator(inject, elem), ctx)
|
||||
}
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = push(out, grab(in))
|
||||
})
|
||||
|
||||
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
|
||||
end match {
|
||||
case Some(e) if needsToEmitStart ⇒
|
||||
terminationEmit(Iterator(start.get, end.get), ctx)
|
||||
case Some(e) ⇒
|
||||
terminationEmit(Iterator(end.get), ctx)
|
||||
case _ ⇒
|
||||
terminationEmit(Iterator(), ctx)
|
||||
}
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
if (s_) { // emit start
|
||||
push(out, start.get)
|
||||
s_ = false
|
||||
} else { // emit inject
|
||||
if (m_) push(out, inject)
|
||||
else pull(in)
|
||||
|
||||
m_ = !m_
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import akka.stream._
|
|||
import akka.stream.impl.Stages.{ DirectProcessor, StageModule, SymbolicGraphStage }
|
||||
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
|
||||
import akka.stream.impl._
|
||||
import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, MapAsync, MapAsyncUnordered, TakeWithin }
|
||||
import akka.stream.impl.fusing._
|
||||
import akka.stream.stage.AbstractStage.{ PushPullGraphStage, PushPullGraphStageWithMaterializedValue }
|
||||
import akka.stream.stage._
|
||||
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
|
||||
|
|
@ -669,7 +669,7 @@ trait FlowOps[+Out, +Mat] {
|
|||
ReactiveStreamsCompliance.requireNonNullElement(start)
|
||||
ReactiveStreamsCompliance.requireNonNullElement(inject)
|
||||
ReactiveStreamsCompliance.requireNonNullElement(end)
|
||||
andThen(Intersperse(Some(start), inject, Some(end)))
|
||||
via(Intersperse(Some(start), inject, Some(end)))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -696,7 +696,7 @@ trait FlowOps[+Out, +Mat] {
|
|||
*/
|
||||
def intersperse[T >: Out](inject: T): Repr[T, Mat] = {
|
||||
ReactiveStreamsCompliance.requireNonNullElement(inject)
|
||||
andThen(Intersperse(None, inject, None))
|
||||
via(Intersperse(None, inject, None))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue