Replaced PushStage based TakeWhile with GraphStage #19834

This commit is contained in:
Evgeny Vanslov 2016-02-29 13:20:00 +03:00
parent 53eb0c847a
commit f42ab58b05
4 changed files with 33 additions and 42 deletions

View file

@ -43,17 +43,39 @@ private[akka] final case class Filter[T](p: T ⇒ Boolean, decider: Supervision.
}
/**
* INTERNAL API
*/
private[akka] final case class TakeWhile[T](p: T Boolean, decider: Supervision.Decider) extends PushStage[T, T] {
* INTERNAL API
*/
private[akka] final case class TakeWhile[T](p: T Boolean) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.takeWhile
override def onPush(elem: T, ctx: Context[T]): SyncDirective =
if (p(elem))
ctx.push(elem)
else
ctx.finish()
override def toString: String = "TakeWhile"
override def decide(t: Throwable): Supervision.Directive = decider(t)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler with InHandler {
override def toString = "TakeWhileLogic"
def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
override def onPush(): Unit = {
try {
val elem = grab(in)
if (p(elem)) {
push(out, elem)
} else {
completeStage()
}
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case _ pull(in)
}
}
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
/**