rewrite Buffer as a GraphStage #21528

This commit is contained in:
Hawstein 2016-11-16 01:48:33 +08:00 committed by Johan Andrén
parent 88a6bdb059
commit 96a1d2a081
4 changed files with 66 additions and 60 deletions

View file

@ -843,61 +843,74 @@ final case class Sliding[T](val n: Int, val step: Int) extends GraphStage[FlowSh
/**
* INTERNAL API
*/
final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedStage[T, T] {
final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends SimpleLinearGraphStage[T] {
private var buffer: BufferImpl[T] = _
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
override def preStart(ctx: LifecycleContext): Unit = {
buffer = BufferImpl(size, ctx.materializer)
}
private var buffer: BufferImpl[T] = _
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective =
if (ctx.isHoldingDownstream) ctx.pushAndPull(elem)
else enqueueAction(ctx, elem)
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
if (ctx.isFinishing) {
val elem = buffer.dequeue()
if (buffer.isEmpty) ctx.pushAndFinish(elem)
else ctx.push(elem)
} else if (ctx.isHoldingUpstream) ctx.pushAndPull(buffer.dequeue())
else if (buffer.isEmpty) ctx.holdDownstream()
else ctx.push(buffer.dequeue())
}
override def onUpstreamFinish(ctx: DetachedContext[T]): TerminationDirective =
if (buffer.isEmpty) ctx.finish()
else ctx.absorbTermination()
val enqueueAction: (DetachedContext[T], T) UpstreamDirective =
overflowStrategy match {
case DropHead (ctx, elem)
if (buffer.isFull) buffer.dropHead()
buffer.enqueue(elem)
ctx.pull()
case DropTail (ctx, elem)
if (buffer.isFull) buffer.dropTail()
buffer.enqueue(elem)
ctx.pull()
case DropBuffer (ctx, elem)
if (buffer.isFull) buffer.clear()
buffer.enqueue(elem)
ctx.pull()
case DropNew (ctx, elem)
if (!buffer.isFull) buffer.enqueue(elem)
ctx.pull()
case Backpressure (ctx, elem)
buffer.enqueue(elem)
if (buffer.isFull) ctx.holdUpstream()
else ctx.pull()
case Fail (ctx, elem)
if (buffer.isFull) ctx.fail(new BufferOverflowException(s"Buffer overflow (max capacity was: $size)!"))
else {
val enqueueAction: T Unit =
overflowStrategy match {
case DropHead elem
if (buffer.isFull) buffer.dropHead()
buffer.enqueue(elem)
ctx.pull()
}
pull(in)
case DropTail elem
if (buffer.isFull) buffer.dropTail()
buffer.enqueue(elem)
pull(in)
case DropBuffer elem
if (buffer.isFull) buffer.clear()
buffer.enqueue(elem)
pull(in)
case DropNew elem
if (!buffer.isFull) buffer.enqueue(elem)
pull(in)
case Backpressure elem
buffer.enqueue(elem)
if (!buffer.isFull) pull(in)
case Fail elem
if (buffer.isFull) failStage(new BufferOverflowException(s"Buffer overflow (max capacity was: $size)!"))
else {
buffer.enqueue(elem)
pull(in)
}
}
override def preStart(): Unit = {
buffer = BufferImpl(size, materializer)
pull(in)
}
override def onPush(): Unit = {
val elem = grab(in)
// If out is available, then it has been pulled but no dequeued element has been delivered.
// It means the buffer at this moment is definitely empty,
// so we just push the current element to out, then pull.
if (isAvailable(out)) {
push(out, elem)
pull(in)
} else {
enqueueAction(elem)
}
}
override def onPull(): Unit = {
if (buffer.nonEmpty) push(out, buffer.dequeue())
if (isClosed(in)) {
if (buffer.isEmpty) completeStage()
} else if (!hasBeenPulled(in)) {
pull(in)
}
}
override def onUpstreamFinish(): Unit = {
if (buffer.isEmpty) completeStage()
}
setHandlers(in, out, this)
}
}
/**