diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala index 3e3ac2e5b7..3fe49fa454 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala @@ -453,7 +453,6 @@ class TlsSpec extends StreamSpec("akka.loglevel=INFO\nakka.actor.debug.receive=o .via(new Timeout(6.seconds)) .dropWhile(_.size < scenario.output.size) - val f = Source(scenario.inputs) .via(outFlow) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala index 4c0bac1783..dd6e765da1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala @@ -8,9 +8,9 @@ import akka.stream.testkit.StreamSpec import scala.concurrent.Await import scala.util.control.NoStackTrace - -import akka.stream.ActorMaterializer +import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision } import akka.stream.testkit.Utils._ + import scala.concurrent.duration._ class FlowFoldSpec extends StreamSpec { @@ -50,12 +50,28 @@ class FlowFoldSpec extends StreamSpec { the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } - "complete future with failure when folding function throws" in assertAllStagesStopped { + "complete future with failure when the folding function throws and the supervisor strategy decides to stop" in assertAllStagesStopped { val error = new Exception with NoStackTrace val future = inputSource.runFold(0)((x, y) ⇒ if (x > 50) throw error else x + y) the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } + "resume with the accumulated state when the folding function throws and the supervisor strategy decides to resume" in assertAllStagesStopped { + val error = new Exception with NoStackTrace + val fold = Sink.fold[Int, Int](0)((x, y) ⇒ if (y == 50) throw error else x + y) + val future = inputSource.runWith(fold.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))) + + Await.result(future, 3.seconds) should be(expected - 50) + } + + "resume and reset the state when the folding function throws when the supervisor strategy decides to restart" in assertAllStagesStopped { + val error = new Exception with NoStackTrace + val fold = Sink.fold[Int, Int](0)((x, y) ⇒ if (y == 50) throw error else x + y) + val future = inputSource.runWith(fold.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))) + + Await.result(future, 3.seconds) should be((51 to 100).sum) + } + "complete future and return zero given an empty stream" in assertAllStagesStopped { val futureValue = Source.fromIterator[Int](() ⇒ Iterator.empty) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala index ea163ab68e..f105b137f7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala @@ -7,9 +7,9 @@ import akka.stream.testkit.StreamSpec import scala.concurrent.Await import scala.util.control.NoStackTrace - -import akka.stream.ActorMaterializer +import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision } import akka.stream.testkit.Utils._ + import scala.concurrent.duration._ class FlowReduceSpec extends StreamSpec { @@ -49,12 +49,26 @@ class FlowReduceSpec extends StreamSpec { the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } - "complete future with failure when reducing function throws" in assertAllStagesStopped { + "complete future with failure when reducing function throws and the supervisor strategy decides to stop" in assertAllStagesStopped { val error = new Exception with NoStackTrace val future = inputSource.runReduce[Int]((x, y) ⇒ if (x > 50) throw error else x + y) the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } + "resume with the accumulated state when the folding function throws and the supervisor strategy decides to resume" in assertAllStagesStopped { + val error = new Exception with NoStackTrace + val reduce = Sink.reduce[Int]((x, y) ⇒ if (y == 50) throw error else x + y) + val future = inputSource.runWith(reduce.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))) + Await.result(future, 3.seconds) should be(expected - 50) + } + + "resume and reset the state when the folding function throws when the supervisor strategy decides to restart" in assertAllStagesStopped { + val error = new Exception with NoStackTrace + val reduce = Sink.reduce[Int]((x, y) ⇒ if (y == 50) throw error else x + y) + val future = inputSource.runWith(reduce.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))) + Await.result(future, 3.seconds) should be((51 to 100).sum) + } + "fail on empty stream using Source.runReduce" in assertAllStagesStopped { val result = Source.empty[Int].runReduce(_ + _) val ex = intercept[NoSuchElementException] { Await.result(result, 3.seconds) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 381ebbc7cb..54465d4ab1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -522,8 +522,9 @@ final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta pull(in) } catch { case NonFatal(ex) ⇒ decider(ex) match { - case Supervision.Stop ⇒ failStage(ex) - case _ ⇒ + case Supervision.Stop ⇒ failStage(ex) + case Supervision.Resume ⇒ pull(in) + case Supervision.Restart ⇒ aggregator = zero pull(in) } @@ -1623,21 +1624,37 @@ final class Reduce[T](val f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] { var aggregator: T = _ - // Initial input handler - setHandler(in, new InHandler { - override def onPush(): Unit = { - aggregator = grab(in) - pull(in) - setHandler(in, self) - } + private def decider = + inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) - override def onUpstreamFinish(): Unit = - failStage(new NoSuchElementException("reduce over empty stream")) - }) + def setInitialInHandler(): Unit = { + // Initial input handler + setHandler(in, new InHandler { + override def onPush(): Unit = { + aggregator = grab(in) + pull(in) + setHandler(in, self) + } + + override def onUpstreamFinish(): Unit = + failStage(new NoSuchElementException("reduce over empty stream")) + }) + } override def onPush(): Unit = { - aggregator = f(aggregator, grab(in)) - pull(in) + try { + aggregator = f(aggregator, grab(in)) + pull(in) + } catch { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ failStage(ex) + case Supervision.Resume ⇒ pull(in) + case Supervision.Restart ⇒ + aggregator = _: T + setInitialInHandler() + pull(in) + } + } } override def onPull(): Unit = pull(in) @@ -1647,6 +1664,7 @@ final class Reduce[T](val f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] { completeStage() } + setInitialInHandler() setHandler(out, self) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala index c7932eddc5..918b57b129 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala @@ -316,7 +316,7 @@ class TLSActor( if (tracing) log.debug(s"SSLException during doUnwrap: $ex") fail(ex, closeTransport = false) engine.closeInbound() // we don't need to add lastHandshakeStatus check here because - completeOrFlush() // it doesn't make any sense to write anything to the network anymore + completeOrFlush() // it doesn't make any sense to write anything to the network anymore false } } else true