convert Recover to GraphStage

This commit is contained in:
zhxiaog 2016-04-18 15:20:32 +08:00
parent 2418e610ab
commit 7207189d6a
3 changed files with 37 additions and 22 deletions

View file

@ -194,29 +194,48 @@ private[stream] final case class Collect[In, Out](pf: PartialFunction[In, Out])
/**
* INTERNAL API
*/
private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) extends PushPullStage[T, T] {
import Collect.NotApplied
var recovered: Option[T] = None
private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("Recover.in")
val out = Outlet[T]("Recover.out")
override val shape: FlowShape[T, T] = FlowShape(in, out)
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
ctx.push(elem)
}
override protected val initialAttributes: Attributes = DefaultAttributes.recover
override def onPull(ctx: Context[T]): SyncDirective =
recovered match {
case Some(value) ctx.pushAndFinish(value)
case None ctx.pull()
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
import Collect.NotApplied
var recovered: Option[T] = None
override def onPush(): Unit = {
push(out, grab(in))
}
override def onUpstreamFailure(t: Throwable, ctx: Context[T]): TerminationDirective = {
pf.applyOrElse(t, NotApplied) match {
case NotApplied ctx.fail(t)
case result: T @unchecked
recovered = Some(result)
ctx.absorbTermination()
override def onPull(): Unit = {
recovered match {
case Some(elem) {
push(out, elem)
completeStage()
}
case None pull(in)
}
}
}
override def onUpstreamFailure(ex: Throwable): Unit = {
pf.applyOrElse(ex, NotApplied) match {
case NotApplied failStage(ex)
case result: T @unchecked {
if (isAvailable(out)) {
push(out, result)
completeStage()
} else {
recovered = Some(result)
}
}
}
}
setHandlers(in, out, this)
}
}
/**