diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala index a0ff76e665..ec86f8e6bc 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala @@ -11,37 +11,15 @@ import akka.stream.stage.Stage import akka.stream.stage.SyncDirective import akka.testkit.AkkaSpec -object InterpreterSupervisionSpec { - val TE = new Exception("TEST") with NoStackTrace { - override def toString = "TE" - } - - class RestartTestStage extends PushPullStage[Int, Int] { - var sum = 0 - def onPush(elem: Int, ctx: Context[Int]): SyncDirective = { - sum += elem - ctx.push(sum) - } - - override def onPull(ctx: Context[Int]): SyncDirective = { - ctx.pull() - } - - override def decide(t: Throwable): Supervision.Directive = Supervision.Restart - - override def restart(): Stage[Int, Int] = { - sum = 0 - this - } - } -} - class InterpreterSupervisionSpec extends AkkaSpec with GraphInterpreterSpecKit { - import InterpreterSupervisionSpec._ import Supervision.stoppingDecider import Supervision.resumingDecider import Supervision.restartingDecider + val TE = new Exception("TEST") with NoStackTrace { + override def toString = "TE" + } + "Interpreter error handling" must { "handle external failure" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ x + 1, stoppingDecider))) { @@ -49,7 +27,6 @@ class InterpreterSupervisionSpec extends AkkaSpec with GraphInterpreterSpecKit { upstream.onError(TE) lastEvents() should be(Set(OnError(TE))) - } "emit failure when op throws" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x, stoppingDecider))) { @@ -163,95 +140,6 @@ class InterpreterSupervisionSpec extends AkkaSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(OnNext(Vector(13, 14)), OnComplete)) } - "restart when onPush throws" in { - val stage = new RestartTestStage { - override def onPush(elem: Int, ctx: Context[Int]): SyncDirective = { - if (elem <= 0) throw TE - else super.onPush(elem, ctx) - } - } - - new OneBoundedSetup[Int](Seq( - Map((x: Int) ⇒ x + 1, restartingDecider), - stage, - Map((x: Int) ⇒ x + 100, restartingDecider))) { - - downstream.requestOne() - lastEvents() should be(Set(RequestOne)) - upstream.onNext(2) - lastEvents() should be(Set(OnNext(103))) - - downstream.requestOne() - lastEvents() should be(Set(RequestOne)) - upstream.onNext(-1) // boom - lastEvents() should be(Set(RequestOne)) - - upstream.onNext(3) - lastEvents() should be(Set(OnNext(104))) - } - } - - "restart when onPush throws after ctx.push" in { - val stage = new RestartTestStage { - override def onPush(elem: Int, ctx: Context[Int]): SyncDirective = { - val ret = ctx.push(elem) - if (elem <= 0) throw TE - ret - } - } - - new OneBoundedSetup[Int](Seq( - Map((x: Int) ⇒ x + 1, restartingDecider), - stage, - Map((x: Int) ⇒ x + 100, restartingDecider))) { - - downstream.requestOne() - lastEvents() should be(Set(RequestOne)) - upstream.onNext(2) - lastEvents() should be(Set(OnNext(103))) - - downstream.requestOne() - lastEvents() should be(Set(RequestOne)) - upstream.onNext(-1) // boom - // The element has been pushed before the exception, there is no way back - lastEvents() should be(Set(OnNext(100))) - - downstream.requestOne() - lastEvents() should be(Set(RequestOne)) - - upstream.onNext(3) - lastEvents() should be(Set(OnNext(104))) - } - } - - "fail when onPull throws" in { - val stage = new RestartTestStage { - override def onPull(ctx: Context[Int]): SyncDirective = { - if (sum < 0) throw TE - super.onPull(ctx) - } - } - - new OneBoundedSetup[Int](Seq( - Map((x: Int) ⇒ x + 1, restartingDecider), - stage, - Map((x: Int) ⇒ x + 100, restartingDecider))) { - - downstream.requestOne() - lastEvents() should be(Set(RequestOne)) - upstream.onNext(2) - lastEvents() should be(Set(OnNext(103))) - - downstream.requestOne() - lastEvents() should be(Set(RequestOne)) - upstream.onNext(-5) // this will trigger failure of next requestOne (pull) - lastEvents() should be(Set(OnNext(99))) - - downstream.requestOne() // boom - lastEvents() should be(Set(OnError(TE), Cancel)) - } - } - "fail when Expand `seed` throws" in new OneBoundedSetup[Int]( new Expand((in: Int) ⇒ if (in == 2) throw TE else Iterator(in) ++ Iterator.continually(-math.abs(in)))) {