diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala index 37f76dfa04..f4daee0ddd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala @@ -128,6 +128,17 @@ class InterpreterSupervisionSpec extends InterpreterSpecKit { lastEvents() should be(Set(RequestOne)) upstream.onNext(3) lastEvents() should be(Set(OnNext(3))) + + // try one more time + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(0) // boom + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(4) + lastEvents() should be(Set(OnNext(4))) } "resume when Map throws in middle of the chain" in new TestSetup(Seq( diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala index b8af388255..b6f253d764 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala @@ -23,7 +23,7 @@ class FlowSupervisionSpec extends AkkaSpec { val failingMap = Flow[Int].map(n ⇒ if (n == 3) throw exc else n) def run(f: Flow[Int, Int, Unit]): immutable.Seq[Int] = - Await.result(Source(1 to 5).via(f).grouped(1000).runWith(Sink.head()), 3.seconds) + Await.result(Source((1 to 5).toSeq ++ (1 to 5)).via(f).grouped(1000).runWith(Sink.head()), 3.seconds) "Stream supervision" must { @@ -35,7 +35,12 @@ class FlowSupervisionSpec extends AkkaSpec { "support resume " in { val result = run(failingMap.withAttributes(supervisionStrategy(Supervision.resumingDecider))) - result should be(List(1, 2, 4, 5)) + result should be(List(1, 2, 4, 5, 1, 2, 4, 5)) + } + + "support restart " in { + val result = run(failingMap.withAttributes(supervisionStrategy(Supervision.restartingDecider))) + result should be(List(1, 2, 4, 5, 1, 2, 4, 5)) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala index 6c635a7d45..8e0e8fbe0f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala @@ -418,9 +418,14 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: case NonFatal(e) if lastOpFailing != activeOpIndex ⇒ lastOpFailing = activeOpIndex decide(e) match { - case Supervision.Stop ⇒ state.fail(e) - case Supervision.Resume ⇒ state.pull() + case Supervision.Stop ⇒ state.fail(e) + case Supervision.Resume ⇒ + // reset, purpose of lastOpFailing is to avoid infinite loops when fail fails -- double fault + lastOpFailing = -1 + state.pull() case Supervision.Restart ⇒ + // reset, purpose of lastOpFailing is to avoid infinite loops when fail fails -- double fault + lastOpFailing = -1 pipeline(activeOpIndex) = pipeline(activeOpIndex).restart().asInstanceOf[UntypedOp] state.pull() }