=str #16982 Fix supervision bug when more than one resume/restart

This commit is contained in:
Patrik Nordwall 2015-03-05 14:45:34 +01:00
parent e3e01d2c9b
commit 75087a19f2
3 changed files with 25 additions and 4 deletions

View file

@ -128,6 +128,17 @@ class InterpreterSupervisionSpec extends InterpreterSpecKit {
lastEvents() should be(Set(RequestOne))
upstream.onNext(3)
lastEvents() should be(Set(OnNext(3)))
// try one more time
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0) // boom
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(4)
lastEvents() should be(Set(OnNext(4)))
}
"resume when Map throws in middle of the chain" in new TestSetup(Seq(

View file

@ -23,7 +23,7 @@ class FlowSupervisionSpec extends AkkaSpec {
val failingMap = Flow[Int].map(n if (n == 3) throw exc else n)
def run(f: Flow[Int, Int, Unit]): immutable.Seq[Int] =
Await.result(Source(1 to 5).via(f).grouped(1000).runWith(Sink.head()), 3.seconds)
Await.result(Source((1 to 5).toSeq ++ (1 to 5)).via(f).grouped(1000).runWith(Sink.head()), 3.seconds)
"Stream supervision" must {
@ -35,7 +35,12 @@ class FlowSupervisionSpec extends AkkaSpec {
"support resume " in {
val result = run(failingMap.withAttributes(supervisionStrategy(Supervision.resumingDecider)))
result should be(List(1, 2, 4, 5))
result should be(List(1, 2, 4, 5, 1, 2, 4, 5))
}
"support restart " in {
val result = run(failingMap.withAttributes(supervisionStrategy(Supervision.restartingDecider)))
result should be(List(1, 2, 4, 5, 1, 2, 4, 5))
}
}

View file

@ -419,8 +419,13 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
lastOpFailing = activeOpIndex
decide(e) match {
case Supervision.Stop state.fail(e)
case Supervision.Resume state.pull()
case Supervision.Resume
// reset, purpose of lastOpFailing is to avoid infinite loops when fail fails -- double fault
lastOpFailing = -1
state.pull()
case Supervision.Restart
// reset, purpose of lastOpFailing is to avoid infinite loops when fail fails -- double fault
lastOpFailing = -1
pipeline(activeOpIndex) = pipeline(activeOpIndex).restart().asInstanceOf[UntypedOp]
state.pull()
}