diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 54465d4ab1..734b1fdd22 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -517,17 +517,17 @@ final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) override def onPush(): Unit = { + val elem = grab(in) try { - aggregator = f(aggregator, grab(in)) - pull(in) + aggregator = f(aggregator, elem) } catch { case NonFatal(ex) ⇒ decider(ex) match { - case Supervision.Stop ⇒ failStage(ex) - case Supervision.Resume ⇒ pull(in) - case Supervision.Restart ⇒ - aggregator = zero - pull(in) + case Supervision.Stop ⇒ failStage(ex) + case Supervision.Restart ⇒ aggregator = zero + case _ ⇒ () } + } finally { + if (!isClosed(in)) pull(in) } } @@ -1642,18 +1642,20 @@ final class Reduce[T](val f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] { } override def onPush(): Unit = { + val elem = grab(in) try { - aggregator = f(aggregator, grab(in)) - pull(in) + aggregator = f(aggregator, elem) } catch { case NonFatal(ex) ⇒ decider(ex) match { - case Supervision.Stop ⇒ failStage(ex) - case Supervision.Resume ⇒ pull(in) + case Supervision.Stop ⇒ failStage(ex) case Supervision.Restart ⇒ aggregator = _: T setInitialInHandler() - pull(in) + case _ ⇒ () + } + } finally { + if (!isClosed(in)) pull(in) } }