Took grab and pull out of try

This commit is contained in:
Andy Chung 2016-11-04 16:13:11 +00:00
parent a6a4d1ac7f
commit fff707781e

View file

@ -517,17 +517,17 @@ final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
override def onPush(): Unit = { override def onPush(): Unit = {
val elem = grab(in)
try { try {
aggregator = f(aggregator, grab(in)) aggregator = f(aggregator, elem)
pull(in)
} catch { } catch {
case NonFatal(ex) decider(ex) match { case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex) case Supervision.Stop failStage(ex)
case Supervision.Resume pull(in) case Supervision.Restart aggregator = zero
case Supervision.Restart case _ ()
aggregator = zero
pull(in)
} }
} finally {
if (!isClosed(in)) pull(in)
} }
} }
@ -1642,18 +1642,20 @@ final class Reduce[T](val f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] {
} }
override def onPush(): Unit = { override def onPush(): Unit = {
val elem = grab(in)
try { try {
aggregator = f(aggregator, grab(in)) aggregator = f(aggregator, elem)
pull(in)
} catch { } catch {
case NonFatal(ex) decider(ex) match { case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex) case Supervision.Stop failStage(ex)
case Supervision.Resume pull(in)
case Supervision.Restart case Supervision.Restart
aggregator = _: T aggregator = _: T
setInitialInHandler() setInitialInHandler()
pull(in) case _ ()
} }
} finally {
if (!isClosed(in)) pull(in)
} }
} }