Optimize Fold supervision
This commit is contained in:
parent
5382014133
commit
a6cf6c646e
1 changed files with 13 additions and 8 deletions
|
|
@ -379,19 +379,24 @@ final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta
|
||||||
override val initialAttributes = DefaultAttributes.fold
|
override val initialAttributes = DefaultAttributes.fold
|
||||||
|
|
||||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||||
new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
|
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||||
private var aggregator: Out = zero
|
private var aggregator: Out = zero
|
||||||
|
|
||||||
override def onResume(t: Throwable): Unit = {
|
private def decider =
|
||||||
aggregator = zero
|
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
||||||
}
|
|
||||||
|
|
||||||
override def onPush(): Unit = withSupervision(() ⇒ grab(in)) match {
|
override def onPush(): Unit = {
|
||||||
case Some(elem) ⇒ {
|
try {
|
||||||
aggregator = f(aggregator, elem)
|
aggregator = f(aggregator, grab(in))
|
||||||
pull(in)
|
pull(in)
|
||||||
|
} catch {
|
||||||
|
case NonFatal(ex) ⇒ decider(ex) match {
|
||||||
|
case Supervision.Stop ⇒ failStage(ex)
|
||||||
|
case _ ⇒
|
||||||
|
aggregator = zero
|
||||||
|
pull(in)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case None ⇒ pull(in)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override def onPull(): Unit = {
|
override def onPull(): Unit = {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue