Improved API of OneForOneStrategy and AllForOneStrategy. See #1714

* withinTimeRange: Duration
* Removed need for Options in API
This commit is contained in:
Patrik Nordwall 2012-01-24 08:37:01 +01:00
parent 4c6f9b86d1
commit abc072ef0a
20 changed files with 122 additions and 98 deletions

View file

@ -81,7 +81,8 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach w
"should shutdown crashed temporary actor" in { "should shutdown crashed temporary actor" in {
filterEvents(EventFilter[Exception]("Expected exception")) { filterEvents(EventFilter[Exception]("Expected exception")) {
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(0))))) val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]),
maxNrOfRetries = 0))))
val actor = Await.result((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration) val actor = Await.result((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration)
actor.isTerminated must be(false) actor.isTerminated must be(false)
actor ! "Die" actor ! "Die"

View file

@ -36,7 +36,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
"invoke preRestart, preStart, postRestart when using OneForOneStrategy" in { "invoke preRestart, preStart, postRestart when using OneForOneStrategy" in {
filterException[ActorKilledException] { filterException[ActorKilledException] {
val id = newUuid().toString val id = newUuid().toString
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3))))) val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 3))))
val gen = new AtomicInteger(0) val gen = new AtomicInteger(0)
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) { val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) {
override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") } override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") }
@ -70,7 +70,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
"default for preRestart and postRestart is to call postStop and preStart respectively" in { "default for preRestart and postRestart is to call postStop and preStart respectively" in {
filterException[ActorKilledException] { filterException[ActorKilledException] {
val id = newUuid().toString val id = newUuid().toString
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3))))) val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 3))))
val gen = new AtomicInteger(0) val gen = new AtomicInteger(0)
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen)) val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen))
val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration) val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration)
@ -100,7 +100,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
"not invoke preRestart and postRestart when never restarted using OneForOneStrategy" in { "not invoke preRestart and postRestart when never restarted using OneForOneStrategy" in {
val id = newUuid().toString val id = newUuid().toString
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3))))) val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 3))))
val gen = new AtomicInteger(0) val gen = new AtomicInteger(0)
val props = Props(new LifeCycleTestActor(testActor, id, gen)) val props = Props(new LifeCycleTestActor(testActor, id, gen))
val a = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration) val a = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)

View file

@ -376,7 +376,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
val boss = system.actorOf(Props(new Actor { val boss = system.actorOf(Props(new Actor {
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 2, 1000) override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), maxNrOfRetries = 2, withinTimeRange = 1 second)
val ref = context.actorOf( val ref = context.actorOf(
Props(new Actor { Props(new Actor {

View file

@ -95,7 +95,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
"notify with a Terminated message once when an Actor is stopped but not when restarted" in { "notify with a Terminated message once when an Actor is stopped but not when restarted" in {
filterException[ActorKilledException] { filterException[ActorKilledException] {
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(2))))) val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 2))))
val terminalProps = Props(context { case x context.sender ! x }) val terminalProps = Props(context { case x context.sender ! x })
val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration) val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration)
@ -116,7 +116,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
"fail a monitor which does not handle Terminated()" in { "fail a monitor which does not handle Terminated()" in {
filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) { filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) {
case class FF(fail: Failed) case class FF(fail: Failed)
val strategy = new OneForOneStrategy(SupervisorStrategy.makeDecider(List(classOf[Exception])), Some(0)) { val strategy = new OneForOneStrategy(SupervisorStrategy.makeDecider(List(classOf[Exception])), maxNrOfRetries = 0) {
override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
testActor.tell(FF(Failed(cause)), child) testActor.tell(FF(Failed(cause)), child)
super.handleFailure(context, child, cause, stats, children) super.handleFailure(context, child, cause, stats, children)

View file

@ -5,8 +5,8 @@ package akka.actor
import akka.testkit._ import akka.testkit._
import akka.util.duration._ import akka.util.duration._
import FSM._ import FSM._
import akka.util.Duration
object FSMTransitionSpec { object FSMTransitionSpec {
@ -72,7 +72,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
val fsm = system.actorOf(Props(new MyFSM(testActor))) val fsm = system.actorOf(Props(new MyFSM(testActor)))
val sup = system.actorOf(Props(new Actor { val sup = system.actorOf(Props(new Actor {
context.watch(fsm) context.watch(fsm)
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), None, None) override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), Duration.Inf)
def receive = { case _ } def receive = { case _ }
})) }))

View file

@ -14,6 +14,7 @@ import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.testkit.TestLatch import akka.testkit.TestLatch
import akka.util.duration._ import akka.util.duration._
import akka.util.Duration
import akka.pattern.ask import akka.pattern.ask
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -29,7 +30,8 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
"A RestartStrategy" must { "A RestartStrategy" must {
"ensure that slave stays dead after max restarts within time range" in { "ensure that slave stays dead after max restarts within time range" in {
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), 2, 1000)))) val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]),
maxNrOfRetries = 2, withinTimeRange = 1 second))))
val restartLatch = new TestLatch val restartLatch = new TestLatch
val secondRestartLatch = new TestLatch val secondRestartLatch = new TestLatch
@ -75,7 +77,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
} }
"ensure that slave is immortal without max restarts and time range" in { "ensure that slave is immortal without max restarts and time range" in {
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), None, None)))) val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), Duration.Inf))))
val countDownLatch = new TestLatch(100) val countDownLatch = new TestLatch(100)
@ -97,7 +99,8 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
} }
"ensure that slave restarts after number of crashes not within time range" in { "ensure that slave restarts after number of crashes not within time range" in {
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), 2, 500)))) val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]),
maxNrOfRetries = 2, withinTimeRange = 500 millis))))
val restartLatch = new TestLatch val restartLatch = new TestLatch
val secondRestartLatch = new TestLatch val secondRestartLatch = new TestLatch
@ -154,7 +157,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
} }
"ensure that slave is not restarted after max retries" in { "ensure that slave is not restarted after max retries" in {
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), Some(2), None)))) val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), maxNrOfRetries = 2))))
val restartLatch = new TestLatch val restartLatch = new TestLatch
val secondRestartLatch = new TestLatch val secondRestartLatch = new TestLatch
@ -209,7 +212,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
val countDownLatch = new TestLatch(2) val countDownLatch = new TestLatch(2)
val boss = system.actorOf(Props(new Actor { val boss = system.actorOf(Props(new Actor {
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), None, Some(1000)) override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), withinTimeRange = 1 second)
def receive = { def receive = {
case p: Props sender ! context.watch(context.actorOf(p)) case p: Props sender ! context.watch(context.actorOf(p))
case t: Terminated maxNoOfRestartsLatch.open() case t: Terminated maxNoOfRestartsLatch.open()

View file

@ -134,7 +134,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
val restartLatch = new TestLatch val restartLatch = new TestLatch
val pingLatch = new TestLatch(6) val pingLatch = new TestLatch(6)
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, 1000)))) val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, 1 second))))
val props = Props(new Actor { val props = Props(new Actor {
def receive = { def receive = {
case Ping pingLatch.countDown() case Ping pingLatch.countDown()
@ -165,8 +165,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
def receive = { def receive = {
case Msg(ts) case Msg(ts)
val now = System.nanoTime val now = System.nanoTime
// Make sure that no message has been dispatched before the scheduled time (10ms = 10000000ns) has occurred // Make sure that no message has been dispatched before the scheduled time (10ms) has occurred
if (now - ts < 10000000) throw new RuntimeException("Interval is too small: " + (now - ts)) if (now - ts < 10.millis.toNanos) throw new RuntimeException("Interval is too small: " + (now - ts))
ticks.countDown() ticks.countDown()
} }
})) }))

View file

@ -5,10 +5,11 @@
package akka.actor package akka.actor
import akka.testkit._ import akka.testkit._
import java.util.concurrent.{ TimeUnit, CountDownLatch } import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.dispatch.Await import akka.dispatch.Await
import akka.pattern.ask import akka.pattern.ask
import akka.util.Duration
import akka.util.duration._
object SupervisorHierarchySpec { object SupervisorHierarchySpec {
class FireWorkerException(msg: String) extends Exception(msg) class FireWorkerException(msg: String) extends Exception(msg)
@ -39,9 +40,9 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
"restart manager and workers in AllForOne" in { "restart manager and workers in AllForOne" in {
val countDown = new CountDownLatch(4) val countDown = new CountDownLatch(4)
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), None, None)))) val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Duration.Inf))))
val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy(List(), None, None))) val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy(List(), Duration.Inf)))
val manager = Await.result((boss ? managerProps).mapTo[ActorRef], timeout.duration) val manager = Await.result((boss ? managerProps).mapTo[ActorRef], timeout.duration)
val workerProps = Props(new CountDownActor(countDown, SupervisorStrategy.defaultStrategy)) val workerProps = Props(new CountDownActor(countDown, SupervisorStrategy.defaultStrategy))
@ -61,7 +62,7 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
val countDownMessages = new CountDownLatch(1) val countDownMessages = new CountDownLatch(1)
val countDownMax = new CountDownLatch(1) val countDownMax = new CountDownLatch(1)
val boss = system.actorOf(Props(new Actor { val boss = system.actorOf(Props(new Actor {
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 1, 5000) override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), maxNrOfRetries = 1, withinTimeRange = 5 seconds)
val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages, SupervisorStrategy.defaultStrategy)))) val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages, SupervisorStrategy.defaultStrategy))))

View file

@ -9,6 +9,7 @@ import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.pattern.ask import akka.pattern.ask
import akka.util.duration._
object SupervisorMiscSpec { object SupervisorMiscSpec {
val config = """ val config = """
@ -29,7 +30,8 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul
filterEvents(EventFilter[Exception]("Kill")) { filterEvents(EventFilter[Exception]("Kill")) {
val countDownLatch = new CountDownLatch(4) val countDownLatch = new CountDownLatch(4)
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, 5000)))) val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]),
maxNrOfRetries = 3, withinTimeRange = 5 seconds))))
val workerProps = Props(new Actor { val workerProps = Props(new Actor {
override def postRestart(cause: Throwable) { countDownLatch.countDown() } override def postRestart(cause: Throwable) { countDownLatch.countDown() }

View file

@ -14,7 +14,7 @@ import akka.dispatch.Await
import akka.pattern.ask import akka.pattern.ask
object SupervisorSpec { object SupervisorSpec {
val Timeout = 5 seconds val Timeout = 5.seconds
case object DieReply case object DieReply
@ -54,7 +54,7 @@ object SupervisorSpec {
var s: ActorRef = _ var s: ActorRef = _
override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), Some(0)) override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 0)
def receive = { def receive = {
case Die temp forward Die case Die temp forward Die
@ -69,7 +69,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
import SupervisorSpec._ import SupervisorSpec._
val TimeoutMillis = Timeout.dilated.toMillis.toInt val DilatedTimeout = Timeout.dilated
// ===================================================== // =====================================================
// Creating actors and supervisors // Creating actors and supervisors
@ -78,45 +78,51 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration) private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
def temporaryActorAllForOne = { def temporaryActorAllForOne = {
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), Some(0))))) val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 0))))
val temporaryActor = child(supervisor, Props(new PingPongActor(testActor))) val temporaryActor = child(supervisor, Props(new PingPongActor(testActor)))
(temporaryActor, supervisor) (temporaryActor, supervisor)
} }
def singleActorAllForOne = { def singleActorAllForOne = {
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]),
maxNrOfRetries = 3, withinTimeRange = DilatedTimeout))))
val pingpong = child(supervisor, Props(new PingPongActor(testActor))) val pingpong = child(supervisor, Props(new PingPongActor(testActor)))
(pingpong, supervisor) (pingpong, supervisor)
} }
def singleActorOneForOne = { def singleActorOneForOne = {
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]),
maxNrOfRetries = 3, withinTimeRange = DilatedTimeout))))
val pingpong = child(supervisor, Props(new PingPongActor(testActor))) val pingpong = child(supervisor, Props(new PingPongActor(testActor)))
(pingpong, supervisor) (pingpong, supervisor)
} }
def multipleActorsAllForOne = { def multipleActorsAllForOne = {
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]),
maxNrOfRetries = 3, withinTimeRange = DilatedTimeout))))
val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor))) val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor)))
(pingpong1, pingpong2, pingpong3, supervisor) (pingpong1, pingpong2, pingpong3, supervisor)
} }
def multipleActorsOneForOne = { def multipleActorsOneForOne = {
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]),
maxNrOfRetries = 3, withinTimeRange = DilatedTimeout))))
val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor))) val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor)))
(pingpong1, pingpong2, pingpong3, supervisor) (pingpong1, pingpong2, pingpong3, supervisor)
} }
def nestedSupervisorsAllForOne = { def nestedSupervisorsAllForOne = {
val topSupervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) val topSupervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]),
maxNrOfRetries = 3, withinTimeRange = DilatedTimeout))))
val pingpong1 = child(topSupervisor, Props(new PingPongActor(testActor))) val pingpong1 = child(topSupervisor, Props(new PingPongActor(testActor)))
val middleSupervisor = child(topSupervisor, Props(new Supervisor(AllForOneStrategy(Nil, 3, TimeoutMillis)))) val middleSupervisor = child(topSupervisor, Props(new Supervisor(AllForOneStrategy(Nil,
maxNrOfRetries = 3, withinTimeRange = DilatedTimeout))))
val pingpong2, pingpong3 = child(middleSupervisor, Props(new PingPongActor(testActor))) val pingpong2, pingpong3 = child(middleSupervisor, Props(new PingPongActor(testActor)))
(pingpong1, pingpong2, pingpong3, topSupervisor) (pingpong1, pingpong2, pingpong3, topSupervisor)
@ -131,14 +137,14 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
} }
def ping(pingPongActor: ActorRef) = { def ping(pingPongActor: ActorRef) = {
Await.result(pingPongActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage Await.result(pingPongActor.?(Ping)(DilatedTimeout), DilatedTimeout) must be === PongMessage
expectMsg(Timeout, PingMessage) expectMsg(Timeout, PingMessage)
} }
def kill(pingPongActor: ActorRef) = { def kill(pingPongActor: ActorRef) = {
val result = (pingPongActor.?(DieReply)(TimeoutMillis)) val result = (pingPongActor.?(DieReply)(DilatedTimeout))
expectMsg(Timeout, ExceptionMessage) expectMsg(Timeout, ExceptionMessage)
intercept[RuntimeException] { Await.result(result, TimeoutMillis millis) } intercept[RuntimeException] { Await.result(result, DilatedTimeout) }
} }
"A supervisor" must { "A supervisor" must {
@ -154,7 +160,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
"not restart temporary actor" in { "not restart temporary actor" in {
val (temporaryActor, _) = temporaryActorAllForOne val (temporaryActor, _) = temporaryActorAllForOne
intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis) } intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply)(DilatedTimeout), DilatedTimeout) }
expectNoMsg(1 second) expectNoMsg(1 second)
} }
@ -280,7 +286,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
"must attempt restart when exception during restart" in { "must attempt restart when exception during restart" in {
val inits = new AtomicInteger(0) val inits = new AtomicInteger(0)
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000)))) val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(classOf[Exception] :: Nil,
maxNrOfRetries = 3, withinTimeRange = 10 seconds))))
val dyingProps = Props(new Actor { val dyingProps = Props(new Actor {
inits.incrementAndGet inits.incrementAndGet
@ -300,11 +307,11 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1), filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1),
EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) { EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) {
intercept[RuntimeException] { intercept[RuntimeException] {
Await.result(dyingActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis) Await.result(dyingActor.?(DieReply)(DilatedTimeout), DilatedTimeout)
} }
} }
Await.result(dyingActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage Await.result(dyingActor.?(Ping)(DilatedTimeout), DilatedTimeout) must be === PongMessage
inits.get must be(3) inits.get must be(3)

View file

@ -23,7 +23,8 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeou
EventFilter[ActorKilledException](occurrences = 1) intercept { EventFilter[ActorKilledException](occurrences = 1) intercept {
within(5 seconds) { within(5 seconds) {
val p = Props(new Actor { val p = Props(new Actor {
override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), 3, 1000) override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]),
maxNrOfRetries = 3, withinTimeRange = 1 second)
def receive = { def receive = {
case p: Props sender ! context.actorOf(p) case p: Props sender ! context.actorOf(p)
} }

View file

@ -12,6 +12,7 @@ import akka.testkit.ImplicitSender
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.dispatch.Await import akka.dispatch.Await
import akka.pattern.ask import akka.pattern.ask
import akka.util.duration._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender with DefaultTimeout { class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender with DefaultTimeout {
@ -25,7 +26,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
"A supervised actor with lifecycle PERMANENT" should { "A supervised actor with lifecycle PERMANENT" should {
"be able to reply on failure during preRestart" in { "be able to reply on failure during preRestart" in {
filterEvents(EventFilter[Exception]("test", occurrences = 1)) { filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 5, 10000)))) val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 5, 10 seconds))))
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration) val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
supervised.!("test")(testActor) supervised.!("test")(testActor)
@ -36,7 +37,8 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
"be able to reply on failure during postStop" in { "be able to reply on failure during postStop" in {
filterEvents(EventFilter[Exception]("test", occurrences = 1)) { filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), Some(0), None)))) val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]),
maxNrOfRetries = 0))))
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration) val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
supervised.!("test")(testActor) supervised.!("test")(testActor)

View file

@ -15,7 +15,8 @@ import akka.actor._
object LoggingReceiveSpec { object LoggingReceiveSpec {
class TestLogActor extends Actor { class TestLogActor extends Actor {
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 5, 5000) override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]),
maxNrOfRetries = 5, withinTimeRange = 5 seconds)
def receive = { case _ } def receive = { case _ }
} }
} }

View file

@ -147,7 +147,7 @@ object Actor {
* case _: NullPointerException Restart * case _: NullPointerException Restart
* case _: IllegalArgumentException Stop * case _: IllegalArgumentException Stop
* case _: Exception Escalate * case _: Exception Escalate
* }: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) * }: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute))
* *
* def receive = { * def receive = {
* // directly calculated reply * // directly calculated reply

View file

@ -8,6 +8,7 @@ import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import java.lang.{ Iterable JIterable } import java.lang.{ Iterable JIterable }
import akka.util.Duration
case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) { case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) {
@ -95,6 +96,13 @@ object SupervisorStrategy {
*/ */
def escalate = Escalate def escalate = Escalate
/**
* When supervisorStrategy is not specified for an actor this
* is used by default. The child will be stopped when
* [[akka.ActorInitializationException]] or [[akka.ActorKilledException]]
* is thrown. It will be restarted for other `Exception` types.
* The error is escalated if it's a `Throwable`, i.e. `Error`.
*/
final val defaultStrategy: SupervisorStrategy = { final val defaultStrategy: SupervisorStrategy = {
def defaultDecider: Decider = { def defaultDecider: Decider = {
case _: ActorInitializationException Stop case _: ActorInitializationException Stop
@ -102,7 +110,7 @@ object SupervisorStrategy {
case _: Exception Restart case _: Exception Restart
case _ Escalate case _ Escalate
} }
OneForOneStrategy(defaultDecider, None, None) OneForOneStrategy(defaultDecider)
} }
type Decider = PartialFunction[Throwable, Action] type Decider = PartialFunction[Throwable, Action]
@ -120,14 +128,14 @@ object SupervisorStrategy {
* Backwards compatible Decider builder which just checks whether one of * Backwards compatible Decider builder which just checks whether one of
* the given Throwables matches the cause and restarts, otherwise escalates. * the given Throwables matches the cause and restarts, otherwise escalates.
*/ */
def makeDecider(trapExit: List[Class[_ <: Throwable]]): Decider = def makeDecider(trapExit: Seq[Class[_ <: Throwable]]): Decider =
{ case x if (trapExit exists (_ isInstance x)) Restart else Escalate } { case x if (trapExit exists (_ isInstance x)) Restart else Escalate }
/** /**
* Backwards compatible Decider builder which just checks whether one of * Backwards compatible Decider builder which just checks whether one of
* the given Throwables matches the cause and restarts, otherwise escalates. * the given Throwables matches the cause and restarts, otherwise escalates.
*/ */
def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toList) def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toSeq)
/** /**
* Decider builder for Iterables of cause-action pairs, e.g. a map obtained * Decider builder for Iterables of cause-action pairs, e.g. a map obtained
@ -156,6 +164,11 @@ object SupervisorStrategy {
} }
buf buf
} }
private[akka] def withinTimeRangeOption(withinTimeRange: Duration): Option[Duration] =
if (withinTimeRange.isFinite && withinTimeRange >= Duration.Zero) Some(withinTimeRange) else None
private[akka] def maxNrOfRetriesOption(maxNrOfRetries: Int): Option[Int] =
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries)
} }
abstract class SupervisorStrategy { abstract class SupervisorStrategy {
@ -199,46 +212,41 @@ abstract class SupervisorStrategy {
} }
object AllForOneStrategy { object AllForOneStrategy {
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy = def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration): AllForOneStrategy =
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit),
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): AllForOneStrategy =
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int]): AllForOneStrategy = def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int): AllForOneStrategy =
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, None) apply(trapExit, maxNrOfRetries, Duration.Inf)
def apply(trapExit: Seq[Class[_ <: Throwable]], withinTimeRange: Duration): AllForOneStrategy =
apply(trapExit, -1, withinTimeRange)
} }
/** /**
* Restart all actors linked to the same supervisor when one fails, * Restart all actors linked to the same supervisor when one fails,
* trapExit = which Throwables should be intercepted * trapExit = which Throwables should be intercepted
* maxNrOfRetries = the number of times an actor is allowed to be restarted * maxNrOfRetries = the number of times an actor is allowed to be restarted
* withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window * withinTimeRange = duration of the time window for maxNrOfRetries, Duration.Inf means no window
*/ */
case class AllForOneStrategy(decider: SupervisorStrategy.Decider, case class AllForOneStrategy(decider: SupervisorStrategy.Decider,
maxNrOfRetries: Option[Int] = None, maxNrOfRetries: Int = -1,
withinTimeRange: Option[Int] = None) extends SupervisorStrategy { withinTimeRange: Duration = Duration.Inf) extends SupervisorStrategy {
def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) = def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Duration) =
this(SupervisorStrategy.makeDecider(decider), this(SupervisorStrategy.makeDecider(decider), maxNrOfRetries, withinTimeRange)
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) =
this(SupervisorStrategy.makeDecider(trapExit), this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) =
this(SupervisorStrategy.makeDecider(trapExit), this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
/* /*
* this is a performance optimization to avoid re-allocating the pairs upon * this is a performance optimization to avoid re-allocating the pairs upon
* every call to requestRestartPermission, assuming that strategies are shared * every call to requestRestartPermission, assuming that strategies are shared
* across actors and thus this field does not take up much space * across actors and thus this field does not take up much space
*/ */
val retriesWindow = (maxNrOfRetries, withinTimeRange) private val retriesWindow = (
SupervisorStrategy.maxNrOfRetriesOption(maxNrOfRetries),
SupervisorStrategy.withinTimeRangeOption(withinTimeRange).map(_.toMillis.toInt))
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = { def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {
children foreach (context.stop(_)) children foreach (context.stop(_))
@ -256,46 +264,41 @@ case class AllForOneStrategy(decider: SupervisorStrategy.Decider,
} }
object OneForOneStrategy { object OneForOneStrategy {
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy = def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration): OneForOneStrategy =
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit),
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): OneForOneStrategy =
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int]): OneForOneStrategy = def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int): OneForOneStrategy =
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, None) apply(trapExit, maxNrOfRetries, Duration.Inf)
def apply(trapExit: Seq[Class[_ <: Throwable]], withinTimeRange: Duration): OneForOneStrategy =
apply(trapExit, -1, withinTimeRange)
} }
/** /**
* Restart an actor when it fails * Restart an actor when it fails
* trapExit = which Throwables should be intercepted * trapExit = which Throwables should be intercepted
* maxNrOfRetries = the number of times an actor is allowed to be restarted * maxNrOfRetries = the number of times an actor is allowed to be restarted
* withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window * withinTimeRange = duration of the time window for maxNrOfRetries, Duration.Inf means no window
*/ */
case class OneForOneStrategy(decider: SupervisorStrategy.Decider, case class OneForOneStrategy(decider: SupervisorStrategy.Decider,
maxNrOfRetries: Option[Int] = None, maxNrOfRetries: Int = -1,
withinTimeRange: Option[Int] = None) extends SupervisorStrategy { withinTimeRange: Duration = Duration.Inf) extends SupervisorStrategy {
def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) = def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Duration) =
this(SupervisorStrategy.makeDecider(decider), this(SupervisorStrategy.makeDecider(decider), maxNrOfRetries, withinTimeRange)
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) =
this(SupervisorStrategy.makeDecider(trapExit), this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) =
this(SupervisorStrategy.makeDecider(trapExit), this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
/* /*
* this is a performance optimization to avoid re-allocating the pairs upon * this is a performance optimization to avoid re-allocating the pairs upon
* every call to requestRestartPermission, assuming that strategies are shared * every call to requestRestartPermission, assuming that strategies are shared
* across actors and thus this field does not take up much space * across actors and thus this field does not take up much space
*/ */
val retriesWindow = (maxNrOfRetries, withinTimeRange) private val retriesWindow = (
SupervisorStrategy.maxNrOfRetriesOption(maxNrOfRetries),
SupervisorStrategy.withinTimeRangeOption(withinTimeRange).map(_.toMillis.toInt))
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {} def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {}

View file

@ -50,7 +50,7 @@ import akka.dispatch.{ MessageDispatcher, Promise }
* return escalate(); * return escalate();
* } * }
* } * }
* }, 10, 60000); * }, 10, Duration.parse("1 minute");
* *
* @Override * @Override
* public SupervisorStrategy supervisorStrategy() { * public SupervisorStrategy supervisorStrategy() {

View file

@ -52,7 +52,7 @@ public class FaultHandlingTestBase {
return escalate(); return escalate();
} }
} }
}, 10, 60000); }, 10, Duration.parse("1 minute"));
@Override @Override
public SupervisorStrategy supervisorStrategy() { public SupervisorStrategy supervisorStrategy() {
@ -89,7 +89,7 @@ public class FaultHandlingTestBase {
return escalate(); return escalate();
} }
} }
}, 10, 60000); }, 10, Duration.parse("1 minute"));
@Override @Override
public SupervisorStrategy supervisorStrategy() { public SupervisorStrategy supervisorStrategy() {

View file

@ -484,7 +484,7 @@ v2.0::
case _: NullPointerException ⇒ Restart case _: NullPointerException ⇒ Restart
case _: IllegalArgumentException ⇒ Stop case _: IllegalArgumentException ⇒ Stop
case _: Exception ⇒ Escalate case _: Exception ⇒ Escalate
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) }: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute)
def receive = { def receive = {
case x => case x =>

View file

@ -20,13 +20,14 @@ object FaultHandlingDocSpec {
//#strategy //#strategy
import akka.actor.OneForOneStrategy import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._ import akka.actor.SupervisorStrategy._
import akka.util.duration._
override val supervisorStrategy = OneForOneStrategy({ override val supervisorStrategy = OneForOneStrategy({
case _: ArithmeticException Resume case _: ArithmeticException Resume
case _: NullPointerException Restart case _: NullPointerException Restart
case _: IllegalArgumentException Stop case _: IllegalArgumentException Stop
case _: Exception Escalate case _: Exception Escalate
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) }: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute)
//#strategy //#strategy
def receive = { def receive = {
@ -40,13 +41,14 @@ object FaultHandlingDocSpec {
//#strategy2 //#strategy2
import akka.actor.OneForOneStrategy import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._ import akka.actor.SupervisorStrategy._
import akka.util.duration._
override val supervisorStrategy = OneForOneStrategy({ override val supervisorStrategy = OneForOneStrategy({
case _: ArithmeticException Resume case _: ArithmeticException Resume
case _: NullPointerException Restart case _: NullPointerException Restart
case _: IllegalArgumentException Stop case _: IllegalArgumentException Stop
case _: Exception Escalate case _: Exception Escalate
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) }: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute)
//#strategy2 //#strategy2
def receive = { def receive = {

View file

@ -182,7 +182,8 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA
override def postRestart(reason: Throwable) { counter -= 1 } override def postRestart(reason: Throwable) { counter -= 1 }
}), self, "child") }), self, "child")
override def supervisorStrategy = OneForOneStrategy(List(classOf[ActorKilledException]), 5, 1000) override def supervisorStrategy = OneForOneStrategy(List(classOf[ActorKilledException]),
maxNrOfRetries = 5, withinTimeRange = 1 second)
def receiveT = { case "sendKill" ref ! Kill } def receiveT = { case "sendKill" ref ! Kill }
})) }))