From 7856916e4d291aa604b57d02fd8ee8875b515858 Mon Sep 17 00:00:00 2001 From: Tapio Rautonen Date: Fri, 26 Feb 2016 02:51:07 +0200 Subject: [PATCH] Replaced PushStage based Drop with GraphStage #19834 --- .../impl/fusing/InterpreterStressSpec.scala | 5 +++- .../main/scala/akka/stream/impl/Stages.scala | 4 --- .../scala/akka/stream/impl/fusing/Ops.scala | 25 +++++++++++++------ .../scala/akka/stream/scaladsl/Flow.scala | 3 ++- 4 files changed, 24 insertions(+), 13 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala index 930bf4d92f..99f2a20b33 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala @@ -16,6 +16,9 @@ class InterpreterStressSpec extends AkkaSpec with GraphInterpreterSpecKit { val map = Map((x: Int) ⇒ x + 1, stoppingDecider).toGS + // GraphStage can be reused + val dropOne = Drop(1) + "Interpreter" must { "work with a massive chain of maps" in new OneBoundedSetup[Int](Vector.fill(chainLength)(map): _*) { @@ -80,7 +83,7 @@ class InterpreterStressSpec extends AkkaSpec with GraphInterpreterSpecKit { } - "work with a massive chain of drops" in new OneBoundedSetup[Int](Vector.fill(chainLength / 1000)(Drop(1))) { + "work with a massive chain of drops" in new OneBoundedSetup[Int](Vector.fill(chainLength / 1000)(dropOne): _*) { lastEvents() should be(Set.empty) downstream.requestOne() diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 89f8241096..ac9e344abd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -183,10 +183,6 @@ private[stream] object Stages { override def create(attr: Attributes): Stage[T, T] = fusing.Take(n) } - final case class Drop[T](n: Long, attributes: Attributes = drop) extends SymbolicStage[T, T] { - override def create(attr: Attributes): Stage[T, T] = fusing.Drop(n) - } - final case class TakeWhile[T](p: T ⇒ Boolean, attributes: Attributes = takeWhile) extends SymbolicStage[T, T] { override def create(attr: Attributes): Stage[T, T] = fusing.TakeWhile(p, supervision(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 4dd67c4754..01fbc59a9a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -142,14 +142,25 @@ private[akka] final case class Take[T](count: Long) extends PushPullStage[T, T] /** * INTERNAL API */ -private[akka] final case class Drop[T](count: Long) extends PushStage[T, T] { - private var left: Long = count +private[akka] final case class Drop[T](count: Long) extends SimpleLinearGraphStage[T] { + override def initialAttributes: Attributes = DefaultAttributes.drop - override def onPush(elem: T, ctx: Context[T]): SyncDirective = - if (left > 0) { - left -= 1 - ctx.pull() - } else ctx.push(elem) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + private var left: Long = count + + override def onPush(): Unit = { + if (left > 0) { + left -= 1 + pull(in) + } else push(out, grab(in)) + } + + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } + + override def toString: String = "Drop" } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index e0e21b73de..707d92cf4f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -887,7 +887,8 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def drop(n: Long): Repr[Out] = andThen(Drop(n)) + def drop(n: Long): Repr[Out] = + via(Drop[Out](n)) /** * Discard the elements received within the given duration at beginning of the stream.