Fixed error handling behaviour of Fold and Reduce

This commit is contained in:
Andy Chung 2016-11-04 10:38:35 +00:00
parent 20942b3126
commit a6a4d1ac7f
5 changed files with 69 additions and 22 deletions

View file

@ -453,7 +453,6 @@ class TlsSpec extends StreamSpec("akka.loglevel=INFO\nakka.actor.debug.receive=o
.via(new Timeout(6.seconds)) .via(new Timeout(6.seconds))
.dropWhile(_.size < scenario.output.size) .dropWhile(_.size < scenario.output.size)
val f = val f =
Source(scenario.inputs) Source(scenario.inputs)
.via(outFlow) .via(outFlow)

View file

@ -8,9 +8,9 @@ import akka.stream.testkit.StreamSpec
import scala.concurrent.Await import scala.concurrent.Await
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision }
import akka.stream.ActorMaterializer
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import scala.concurrent.duration._ import scala.concurrent.duration._
class FlowFoldSpec extends StreamSpec { class FlowFoldSpec extends StreamSpec {
@ -50,12 +50,28 @@ class FlowFoldSpec extends StreamSpec {
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
} }
"complete future with failure when folding function throws" in assertAllStagesStopped { "complete future with failure when the folding function throws and the supervisor strategy decides to stop" in assertAllStagesStopped {
val error = new Exception with NoStackTrace val error = new Exception with NoStackTrace
val future = inputSource.runFold(0)((x, y) if (x > 50) throw error else x + y) val future = inputSource.runFold(0)((x, y) if (x > 50) throw error else x + y)
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
} }
"resume with the accumulated state when the folding function throws and the supervisor strategy decides to resume" in assertAllStagesStopped {
val error = new Exception with NoStackTrace
val fold = Sink.fold[Int, Int](0)((x, y) if (y == 50) throw error else x + y)
val future = inputSource.runWith(fold.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)))
Await.result(future, 3.seconds) should be(expected - 50)
}
"resume and reset the state when the folding function throws when the supervisor strategy decides to restart" in assertAllStagesStopped {
val error = new Exception with NoStackTrace
val fold = Sink.fold[Int, Int](0)((x, y) if (y == 50) throw error else x + y)
val future = inputSource.runWith(fold.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)))
Await.result(future, 3.seconds) should be((51 to 100).sum)
}
"complete future and return zero given an empty stream" in assertAllStagesStopped { "complete future and return zero given an empty stream" in assertAllStagesStopped {
val futureValue = val futureValue =
Source.fromIterator[Int](() Iterator.empty) Source.fromIterator[Int](() Iterator.empty)

View file

@ -7,9 +7,9 @@ import akka.stream.testkit.StreamSpec
import scala.concurrent.Await import scala.concurrent.Await
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision }
import akka.stream.ActorMaterializer
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import scala.concurrent.duration._ import scala.concurrent.duration._
class FlowReduceSpec extends StreamSpec { class FlowReduceSpec extends StreamSpec {
@ -49,12 +49,26 @@ class FlowReduceSpec extends StreamSpec {
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
} }
"complete future with failure when reducing function throws" in assertAllStagesStopped { "complete future with failure when reducing function throws and the supervisor strategy decides to stop" in assertAllStagesStopped {
val error = new Exception with NoStackTrace val error = new Exception with NoStackTrace
val future = inputSource.runReduce[Int]((x, y) if (x > 50) throw error else x + y) val future = inputSource.runReduce[Int]((x, y) if (x > 50) throw error else x + y)
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
} }
"resume with the accumulated state when the folding function throws and the supervisor strategy decides to resume" in assertAllStagesStopped {
val error = new Exception with NoStackTrace
val reduce = Sink.reduce[Int]((x, y) if (y == 50) throw error else x + y)
val future = inputSource.runWith(reduce.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)))
Await.result(future, 3.seconds) should be(expected - 50)
}
"resume and reset the state when the folding function throws when the supervisor strategy decides to restart" in assertAllStagesStopped {
val error = new Exception with NoStackTrace
val reduce = Sink.reduce[Int]((x, y) if (y == 50) throw error else x + y)
val future = inputSource.runWith(reduce.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)))
Await.result(future, 3.seconds) should be((51 to 100).sum)
}
"fail on empty stream using Source.runReduce" in assertAllStagesStopped { "fail on empty stream using Source.runReduce" in assertAllStagesStopped {
val result = Source.empty[Int].runReduce(_ + _) val result = Source.empty[Int].runReduce(_ + _)
val ex = intercept[NoSuchElementException] { Await.result(result, 3.seconds) } val ex = intercept[NoSuchElementException] { Await.result(result, 3.seconds) }

View file

@ -522,8 +522,9 @@ final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta
pull(in) pull(in)
} catch { } catch {
case NonFatal(ex) decider(ex) match { case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex) case Supervision.Stop failStage(ex)
case _ case Supervision.Resume pull(in)
case Supervision.Restart
aggregator = zero aggregator = zero
pull(in) pull(in)
} }
@ -1623,21 +1624,37 @@ final class Reduce[T](val f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] {
var aggregator: T = _ var aggregator: T = _
// Initial input handler private def decider =
setHandler(in, new InHandler { inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
override def onPush(): Unit = {
aggregator = grab(in)
pull(in)
setHandler(in, self)
}
override def onUpstreamFinish(): Unit = def setInitialInHandler(): Unit = {
failStage(new NoSuchElementException("reduce over empty stream")) // Initial input handler
}) setHandler(in, new InHandler {
override def onPush(): Unit = {
aggregator = grab(in)
pull(in)
setHandler(in, self)
}
override def onUpstreamFinish(): Unit =
failStage(new NoSuchElementException("reduce over empty stream"))
})
}
override def onPush(): Unit = { override def onPush(): Unit = {
aggregator = f(aggregator, grab(in)) try {
pull(in) aggregator = f(aggregator, grab(in))
pull(in)
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case Supervision.Resume pull(in)
case Supervision.Restart
aggregator = _: T
setInitialInHandler()
pull(in)
}
}
} }
override def onPull(): Unit = pull(in) override def onPull(): Unit = pull(in)
@ -1647,6 +1664,7 @@ final class Reduce[T](val f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] {
completeStage() completeStage()
} }
setInitialInHandler()
setHandler(out, self) setHandler(out, self)
} }

View file

@ -316,7 +316,7 @@ class TLSActor(
if (tracing) log.debug(s"SSLException during doUnwrap: $ex") if (tracing) log.debug(s"SSLException during doUnwrap: $ex")
fail(ex, closeTransport = false) fail(ex, closeTransport = false)
engine.closeInbound() // we don't need to add lastHandshakeStatus check here because engine.closeInbound() // we don't need to add lastHandshakeStatus check here because
completeOrFlush() // it doesn't make any sense to write anything to the network anymore completeOrFlush() // it doesn't make any sense to write anything to the network anymore
false false
} }
} else true } else true