From 873e87fb33f0a5a93f0441e60738725b3ce90319 Mon Sep 17 00:00:00 2001 From: Hawstein Date: Tue, 14 Feb 2017 19:24:28 +0800 Subject: [PATCH] =str refine built-in Take and TakeWhile stages (#21871) * refine build-in stages: * change InHandler to avoid unnecessary condition check * remove unnecessary local variable * fix typo * revert the drop stage change and remove timer --- .../scala/akka/stream/impl/fusing/Ops.scala | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index ad13157596..3ebb32aea1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -301,12 +301,11 @@ final case class Take[T](count: Long) extends SimpleLinearGraphStage[T] { private var left: Long = count override def onPush(): Unit = { - val leftBefore = left - if (leftBefore >= 1) { - left = leftBefore - 1 + if (left > 0) { push(out, grab(in)) + left -= 1 } - if (leftBefore <= 1) completeStage() + if (left <= 0) completeStage() } override def onPull(): Unit = { @@ -765,8 +764,8 @@ final case class LimitWeighted[T](val n: Long, val costFn: T ⇒ Long) extends S override def onPush(): Unit = { val elem = grab(in) withSupervision(() ⇒ costFn(elem)) match { - case Some(wight) ⇒ - left -= wight + case Some(weight) ⇒ + left -= weight if (left >= 0) push(out, elem) else failStage(new StreamLimitReachedException(n)) case None ⇒ //do nothing } @@ -1595,11 +1594,9 @@ final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraph def onPull(): Unit = pull(in) - setHandler(in, this) - setHandler(out, this) + setHandlers(in, out, this) - final override protected def onTimer(key: Any): Unit = - completeStage() + final override protected def onTimer(key: Any): Unit = completeStage() override def preStart(): Unit = scheduleOnce("TakeWithinTimer", timeout) } @@ -1608,23 +1605,27 @@ final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraph } final class DropWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { - private var allow = false + private val startNanoTime = System.nanoTime() + private val timeoutInNano = timeout.toNanos def onPush(): Unit = { - if (allow) push(out, grab(in)) - else pull(in) + if (System.nanoTime() - startNanoTime <= timeoutInNano) { + pull(in) + } else { + push(out, grab(in)) + // change the in handler to avoid System.nanoTime call after timeout + setHandler(in, new InHandler { + def onPush() = push(out, grab(in)) + }) + } } def onPull(): Unit = pull(in) - setHandler(in, this) - setHandler(out, this) + setHandlers(in, out, this) - final override protected def onTimer(key: Any): Unit = allow = true - - override def preStart(): Unit = scheduleOnce("DropWithinTimer", timeout) } override def toString = "DropWithin"