=str refine built-in Take and TakeWhile stages (#21871)

* refine build-in stages:

* change InHandler to avoid unnecessary condition check
* remove unnecessary local variable
* fix typo

* revert the drop stage change and remove timer
This commit is contained in:
Hawstein 2017-02-14 19:24:28 +08:00 committed by Konrad `ktoso` Malawski
parent 70f2beaf0b
commit 873e87fb33

View file

@ -301,12 +301,11 @@ final case class Take[T](count: Long) extends SimpleLinearGraphStage[T] {
private var left: Long = count
override def onPush(): Unit = {
val leftBefore = left
if (leftBefore >= 1) {
left = leftBefore - 1
if (left > 0) {
push(out, grab(in))
left -= 1
}
if (leftBefore <= 1) completeStage()
if (left <= 0) completeStage()
}
override def onPull(): Unit = {
@ -765,8 +764,8 @@ final case class LimitWeighted[T](val n: Long, val costFn: T ⇒ Long) extends S
override def onPush(): Unit = {
val elem = grab(in)
withSupervision(() costFn(elem)) match {
case Some(wight)
left -= wight
case Some(weight)
left -= weight
if (left >= 0) push(out, elem) else failStage(new StreamLimitReachedException(n))
case None //do nothing
}
@ -1595,11 +1594,9 @@ final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraph
def onPull(): Unit = pull(in)
setHandler(in, this)
setHandler(out, this)
setHandlers(in, out, this)
final override protected def onTimer(key: Any): Unit =
completeStage()
final override protected def onTimer(key: Any): Unit = completeStage()
override def preStart(): Unit = scheduleOnce("TakeWithinTimer", timeout)
}
@ -1608,23 +1605,27 @@ final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraph
}
final class DropWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private var allow = false
private val startNanoTime = System.nanoTime()
private val timeoutInNano = timeout.toNanos
def onPush(): Unit = {
if (allow) push(out, grab(in))
else pull(in)
if (System.nanoTime() - startNanoTime <= timeoutInNano) {
pull(in)
} else {
push(out, grab(in))
// change the in handler to avoid System.nanoTime call after timeout
setHandler(in, new InHandler {
def onPush() = push(out, grab(in))
})
}
}
def onPull(): Unit = pull(in)
setHandler(in, this)
setHandler(out, this)
setHandlers(in, out, this)
final override protected def onTimer(key: Any): Unit = allow = true
override def preStart(): Unit = scheduleOnce("DropWithinTimer", timeout)
}
override def toString = "DropWithin"