diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 6336133c6d..3103712952 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -379,19 +379,24 @@ final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta override val initialAttributes = DefaultAttributes.fold override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler { + new GraphStageLogic(shape) with InHandler with OutHandler { private var aggregator: Out = zero - override def onResume(t: Throwable): Unit = { - aggregator = zero - } + private def decider = + inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) - override def onPush(): Unit = withSupervision(() ⇒ grab(in)) match { - case Some(elem) ⇒ { - aggregator = f(aggregator, elem) + override def onPush(): Unit = { + try { + aggregator = f(aggregator, grab(in)) pull(in) + } catch { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ failStage(ex) + case _ ⇒ + aggregator = zero + pull(in) + } } - case None ⇒ pull(in) } override def onPull(): Unit = {