parent
c15e04e051
commit
5afb68cd59
1 changed files with 4 additions and 116 deletions
|
|
@ -11,37 +11,15 @@ import akka.stream.stage.Stage
|
||||||
import akka.stream.stage.SyncDirective
|
import akka.stream.stage.SyncDirective
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
|
|
||||||
object InterpreterSupervisionSpec {
|
|
||||||
val TE = new Exception("TEST") with NoStackTrace {
|
|
||||||
override def toString = "TE"
|
|
||||||
}
|
|
||||||
|
|
||||||
class RestartTestStage extends PushPullStage[Int, Int] {
|
|
||||||
var sum = 0
|
|
||||||
def onPush(elem: Int, ctx: Context[Int]): SyncDirective = {
|
|
||||||
sum += elem
|
|
||||||
ctx.push(sum)
|
|
||||||
}
|
|
||||||
|
|
||||||
override def onPull(ctx: Context[Int]): SyncDirective = {
|
|
||||||
ctx.pull()
|
|
||||||
}
|
|
||||||
|
|
||||||
override def decide(t: Throwable): Supervision.Directive = Supervision.Restart
|
|
||||||
|
|
||||||
override def restart(): Stage[Int, Int] = {
|
|
||||||
sum = 0
|
|
||||||
this
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class InterpreterSupervisionSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
class InterpreterSupervisionSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
||||||
import InterpreterSupervisionSpec._
|
|
||||||
import Supervision.stoppingDecider
|
import Supervision.stoppingDecider
|
||||||
import Supervision.resumingDecider
|
import Supervision.resumingDecider
|
||||||
import Supervision.restartingDecider
|
import Supervision.restartingDecider
|
||||||
|
|
||||||
|
val TE = new Exception("TEST") with NoStackTrace {
|
||||||
|
override def toString = "TE"
|
||||||
|
}
|
||||||
|
|
||||||
"Interpreter error handling" must {
|
"Interpreter error handling" must {
|
||||||
|
|
||||||
"handle external failure" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ x + 1, stoppingDecider))) {
|
"handle external failure" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ x + 1, stoppingDecider))) {
|
||||||
|
|
@ -49,7 +27,6 @@ class InterpreterSupervisionSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
||||||
|
|
||||||
upstream.onError(TE)
|
upstream.onError(TE)
|
||||||
lastEvents() should be(Set(OnError(TE)))
|
lastEvents() should be(Set(OnError(TE)))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"emit failure when op throws" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x, stoppingDecider))) {
|
"emit failure when op throws" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x, stoppingDecider))) {
|
||||||
|
|
@ -163,95 +140,6 @@ class InterpreterSupervisionSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
||||||
lastEvents() should be(Set(OnNext(Vector(13, 14)), OnComplete))
|
lastEvents() should be(Set(OnNext(Vector(13, 14)), OnComplete))
|
||||||
}
|
}
|
||||||
|
|
||||||
"restart when onPush throws" in {
|
|
||||||
val stage = new RestartTestStage {
|
|
||||||
override def onPush(elem: Int, ctx: Context[Int]): SyncDirective = {
|
|
||||||
if (elem <= 0) throw TE
|
|
||||||
else super.onPush(elem, ctx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
new OneBoundedSetup[Int](Seq(
|
|
||||||
Map((x: Int) ⇒ x + 1, restartingDecider),
|
|
||||||
stage,
|
|
||||||
Map((x: Int) ⇒ x + 100, restartingDecider))) {
|
|
||||||
|
|
||||||
downstream.requestOne()
|
|
||||||
lastEvents() should be(Set(RequestOne))
|
|
||||||
upstream.onNext(2)
|
|
||||||
lastEvents() should be(Set(OnNext(103)))
|
|
||||||
|
|
||||||
downstream.requestOne()
|
|
||||||
lastEvents() should be(Set(RequestOne))
|
|
||||||
upstream.onNext(-1) // boom
|
|
||||||
lastEvents() should be(Set(RequestOne))
|
|
||||||
|
|
||||||
upstream.onNext(3)
|
|
||||||
lastEvents() should be(Set(OnNext(104)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"restart when onPush throws after ctx.push" in {
|
|
||||||
val stage = new RestartTestStage {
|
|
||||||
override def onPush(elem: Int, ctx: Context[Int]): SyncDirective = {
|
|
||||||
val ret = ctx.push(elem)
|
|
||||||
if (elem <= 0) throw TE
|
|
||||||
ret
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
new OneBoundedSetup[Int](Seq(
|
|
||||||
Map((x: Int) ⇒ x + 1, restartingDecider),
|
|
||||||
stage,
|
|
||||||
Map((x: Int) ⇒ x + 100, restartingDecider))) {
|
|
||||||
|
|
||||||
downstream.requestOne()
|
|
||||||
lastEvents() should be(Set(RequestOne))
|
|
||||||
upstream.onNext(2)
|
|
||||||
lastEvents() should be(Set(OnNext(103)))
|
|
||||||
|
|
||||||
downstream.requestOne()
|
|
||||||
lastEvents() should be(Set(RequestOne))
|
|
||||||
upstream.onNext(-1) // boom
|
|
||||||
// The element has been pushed before the exception, there is no way back
|
|
||||||
lastEvents() should be(Set(OnNext(100)))
|
|
||||||
|
|
||||||
downstream.requestOne()
|
|
||||||
lastEvents() should be(Set(RequestOne))
|
|
||||||
|
|
||||||
upstream.onNext(3)
|
|
||||||
lastEvents() should be(Set(OnNext(104)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"fail when onPull throws" in {
|
|
||||||
val stage = new RestartTestStage {
|
|
||||||
override def onPull(ctx: Context[Int]): SyncDirective = {
|
|
||||||
if (sum < 0) throw TE
|
|
||||||
super.onPull(ctx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
new OneBoundedSetup[Int](Seq(
|
|
||||||
Map((x: Int) ⇒ x + 1, restartingDecider),
|
|
||||||
stage,
|
|
||||||
Map((x: Int) ⇒ x + 100, restartingDecider))) {
|
|
||||||
|
|
||||||
downstream.requestOne()
|
|
||||||
lastEvents() should be(Set(RequestOne))
|
|
||||||
upstream.onNext(2)
|
|
||||||
lastEvents() should be(Set(OnNext(103)))
|
|
||||||
|
|
||||||
downstream.requestOne()
|
|
||||||
lastEvents() should be(Set(RequestOne))
|
|
||||||
upstream.onNext(-5) // this will trigger failure of next requestOne (pull)
|
|
||||||
lastEvents() should be(Set(OnNext(99)))
|
|
||||||
|
|
||||||
downstream.requestOne() // boom
|
|
||||||
lastEvents() should be(Set(OnError(TE), Cancel))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"fail when Expand `seed` throws" in new OneBoundedSetup[Int](
|
"fail when Expand `seed` throws" in new OneBoundedSetup[Int](
|
||||||
new Expand((in: Int) ⇒ if (in == 2) throw TE else Iterator(in) ++ Iterator.continually(-math.abs(in)))) {
|
new Expand((in: Int) ⇒ if (in == 2) throw TE else Iterator(in) ++ Iterator.continually(-math.abs(in)))) {
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue