Replaced PushStage based Drop with GraphStage #19834

This commit is contained in:
Tapio Rautonen 2016-02-26 02:51:07 +02:00
parent 2fcc1a3a45
commit 7856916e4d
4 changed files with 24 additions and 13 deletions

View file

@ -142,14 +142,25 @@ private[akka] final case class Take[T](count: Long) extends PushPullStage[T, T]
/**
* INTERNAL API
*/
private[akka] final case class Drop[T](count: Long) extends PushStage[T, T] {
private var left: Long = count
private[akka] final case class Drop[T](count: Long) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.drop
override def onPush(elem: T, ctx: Context[T]): SyncDirective =
if (left > 0) {
left -= 1
ctx.pull()
} else ctx.push(elem)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private var left: Long = count
override def onPush(): Unit = {
if (left > 0) {
left -= 1
pull(in)
} else push(out, grab(in))
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
override def toString: String = "Drop"
}
/**