+str #17226 add dropWhile and takeWhile

This commit is contained in:
Alexander Golubev 2015-06-12 23:22:36 -04:00
parent 632868b868
commit 6f9438a2b0
11 changed files with 326 additions and 0 deletions

View file

@ -36,6 +36,37 @@ private[akka] final case class Filter[T](p: T ⇒ Boolean, decider: Supervision.
override def decide(t: Throwable): Supervision.Directive = decider(t)
}
/**
* INTERNAL API
*/
private[akka] final case class TakeWhile[T](p: T Boolean, decider: Supervision.Decider) extends PushStage[T, T] {
override def onPush(elem: T, ctx: Context[T]): SyncDirective =
if (p(elem))
ctx.push(elem)
else
ctx.finish()
override def decide(t: Throwable): Supervision.Directive = decider(t)
}
/**
* INTERNAL API
*/
private[akka] final case class DropWhile[T](p: T Boolean, decider: Supervision.Decider) extends PushStage[T, T] {
var taking = false
override def onPush(elem: T, ctx: Context[T]): SyncDirective =
if (taking || !p(elem)) {
taking = true
ctx.push(elem)
} else {
ctx.pull
}
override def decide(t: Throwable): Supervision.Directive = decider(t)
}
private[akka] final object Collect {
// Cached function that can be used with PartialFunction.applyOrElse to ensure that A) the guard is only applied once,
// and the caller can check the returned value with Collect.notApplied to query whether the PF was applied or not.