Merge pull request #19886 from trautonen/drop-as-graphstage-trautonen

Replaced PushStage based Drop with GraphStage #19834
This commit is contained in:
Johan Andrén 2016-02-28 17:55:18 +01:00
commit 1c40d64d62
4 changed files with 24 additions and 13 deletions

View file

@ -16,6 +16,9 @@ class InterpreterStressSpec extends AkkaSpec with GraphInterpreterSpecKit {
val map = Map((x: Int) x + 1, stoppingDecider).toGS
// GraphStage can be reused
val dropOne = Drop(1)
"Interpreter" must {
"work with a massive chain of maps" in new OneBoundedSetup[Int](Vector.fill(chainLength)(map): _*) {
@ -80,7 +83,7 @@ class InterpreterStressSpec extends AkkaSpec with GraphInterpreterSpecKit {
}
"work with a massive chain of drops" in new OneBoundedSetup[Int](Vector.fill(chainLength / 1000)(Drop(1))) {
"work with a massive chain of drops" in new OneBoundedSetup[Int](Vector.fill(chainLength / 1000)(dropOne): _*) {
lastEvents() should be(Set.empty)
downstream.requestOne()

View file

@ -183,10 +183,6 @@ private[stream] object Stages {
override def create(attr: Attributes): Stage[T, T] = fusing.Take(n)
}
final case class Drop[T](n: Long, attributes: Attributes = drop) extends SymbolicStage[T, T] {
override def create(attr: Attributes): Stage[T, T] = fusing.Drop(n)
}
final case class TakeWhile[T](p: T Boolean, attributes: Attributes = takeWhile) extends SymbolicStage[T, T] {
override def create(attr: Attributes): Stage[T, T] = fusing.TakeWhile(p, supervision(attr))
}

View file

@ -142,14 +142,25 @@ private[akka] final case class Take[T](count: Long) extends PushPullStage[T, T]
/**
* INTERNAL API
*/
private[akka] final case class Drop[T](count: Long) extends PushStage[T, T] {
private var left: Long = count
private[akka] final case class Drop[T](count: Long) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.drop
override def onPush(elem: T, ctx: Context[T]): SyncDirective =
if (left > 0) {
left -= 1
ctx.pull()
} else ctx.push(elem)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private var left: Long = count
override def onPush(): Unit = {
if (left > 0) {
left -= 1
pull(in)
} else push(out, grab(in))
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
override def toString: String = "Drop"
}
/**

View file

@ -887,7 +887,8 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*/
def drop(n: Long): Repr[Out] = andThen(Drop(n))
def drop(n: Long): Repr[Out] =
via(Drop[Out](n))
/**
* Discard the elements received within the given duration at beginning of the stream.