Merge pull request #250 from jboner/wip-1714-withinTimeRange-patriknw
Improved API of OneForOneStrategy and AllForOneStrategy. See #1714
This commit is contained in:
commit
dcdbca1167
23 changed files with 192 additions and 162 deletions
|
|
@ -81,7 +81,8 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach w
|
||||||
|
|
||||||
"should shutdown crashed temporary actor" in {
|
"should shutdown crashed temporary actor" in {
|
||||||
filterEvents(EventFilter[Exception]("Expected exception")) {
|
filterEvents(EventFilter[Exception]("Expected exception")) {
|
||||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(0)))))
|
val supervisor = system.actorOf(Props(new Supervisor(
|
||||||
|
OneForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception])))))
|
||||||
val actor = Await.result((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration)
|
val actor = Await.result((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration)
|
||||||
actor.isTerminated must be(false)
|
actor.isTerminated must be(false)
|
||||||
actor ! "Die"
|
actor ! "Die"
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,8 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
|
||||||
"invoke preRestart, preStart, postRestart when using OneForOneStrategy" in {
|
"invoke preRestart, preStart, postRestart when using OneForOneStrategy" in {
|
||||||
filterException[ActorKilledException] {
|
filterException[ActorKilledException] {
|
||||||
val id = newUuid().toString
|
val id = newUuid().toString
|
||||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3)))))
|
val supervisor = system.actorOf(Props(new Supervisor(
|
||||||
|
OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])))))
|
||||||
val gen = new AtomicInteger(0)
|
val gen = new AtomicInteger(0)
|
||||||
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) {
|
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) {
|
||||||
override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") }
|
override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") }
|
||||||
|
|
@ -70,7 +71,8 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
|
||||||
"default for preRestart and postRestart is to call postStop and preStart respectively" in {
|
"default for preRestart and postRestart is to call postStop and preStart respectively" in {
|
||||||
filterException[ActorKilledException] {
|
filterException[ActorKilledException] {
|
||||||
val id = newUuid().toString
|
val id = newUuid().toString
|
||||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3)))))
|
val supervisor = system.actorOf(Props(new Supervisor(
|
||||||
|
OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])))))
|
||||||
val gen = new AtomicInteger(0)
|
val gen = new AtomicInteger(0)
|
||||||
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen))
|
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen))
|
||||||
val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration)
|
val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration)
|
||||||
|
|
@ -100,7 +102,8 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
|
||||||
|
|
||||||
"not invoke preRestart and postRestart when never restarted using OneForOneStrategy" in {
|
"not invoke preRestart and postRestart when never restarted using OneForOneStrategy" in {
|
||||||
val id = newUuid().toString
|
val id = newUuid().toString
|
||||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3)))))
|
val supervisor = system.actorOf(Props(new Supervisor(
|
||||||
|
OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])))))
|
||||||
val gen = new AtomicInteger(0)
|
val gen = new AtomicInteger(0)
|
||||||
val props = Props(new LifeCycleTestActor(testActor, id, gen))
|
val props = Props(new LifeCycleTestActor(testActor, id, gen))
|
||||||
val a = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
|
val a = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
|
||||||
|
|
|
||||||
|
|
@ -376,7 +376,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
||||||
|
|
||||||
val boss = system.actorOf(Props(new Actor {
|
val boss = system.actorOf(Props(new Actor {
|
||||||
|
|
||||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 2, 1000)
|
override val supervisorStrategy =
|
||||||
|
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable]))
|
||||||
|
|
||||||
val ref = context.actorOf(
|
val ref = context.actorOf(
|
||||||
Props(new Actor {
|
Props(new Actor {
|
||||||
|
|
|
||||||
|
|
@ -95,7 +95,8 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
||||||
|
|
||||||
"notify with a Terminated message once when an Actor is stopped but not when restarted" in {
|
"notify with a Terminated message once when an Actor is stopped but not when restarted" in {
|
||||||
filterException[ActorKilledException] {
|
filterException[ActorKilledException] {
|
||||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(2)))))
|
val supervisor = system.actorOf(Props(new Supervisor(
|
||||||
|
OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Exception])))))
|
||||||
val terminalProps = Props(context ⇒ { case x ⇒ context.sender ! x })
|
val terminalProps = Props(context ⇒ { case x ⇒ context.sender ! x })
|
||||||
val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration)
|
val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration)
|
||||||
|
|
||||||
|
|
@ -116,7 +117,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
||||||
"fail a monitor which does not handle Terminated()" in {
|
"fail a monitor which does not handle Terminated()" in {
|
||||||
filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) {
|
filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) {
|
||||||
case class FF(fail: Failed)
|
case class FF(fail: Failed)
|
||||||
val strategy = new OneForOneStrategy(SupervisorStrategy.makeDecider(List(classOf[Exception])), Some(0)) {
|
val strategy = new OneForOneStrategy(maxNrOfRetries = 0)(SupervisorStrategy.makeDecider(List(classOf[Exception]))) {
|
||||||
override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
|
override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
|
||||||
testActor.tell(FF(Failed(cause)), child)
|
testActor.tell(FF(Failed(cause)), child)
|
||||||
super.handleFailure(context, child, cause, stats, children)
|
super.handleFailure(context, child, cause, stats, children)
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,8 @@ package akka.actor
|
||||||
|
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
|
|
||||||
import FSM._
|
import FSM._
|
||||||
|
import akka.util.Duration
|
||||||
|
|
||||||
object FSMTransitionSpec {
|
object FSMTransitionSpec {
|
||||||
|
|
||||||
|
|
@ -72,7 +72,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
|
||||||
val fsm = system.actorOf(Props(new MyFSM(testActor)))
|
val fsm = system.actorOf(Props(new MyFSM(testActor)))
|
||||||
val sup = system.actorOf(Props(new Actor {
|
val sup = system.actorOf(Props(new Actor {
|
||||||
context.watch(fsm)
|
context.watch(fsm)
|
||||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), None, None)
|
override val supervisorStrategy = OneForOneStrategy(withinTimeRange = Duration.Inf)(List(classOf[Throwable]))
|
||||||
def receive = { case _ ⇒ }
|
def receive = { case _ ⇒ }
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ import akka.testkit.AkkaSpec
|
||||||
import akka.testkit.DefaultTimeout
|
import akka.testkit.DefaultTimeout
|
||||||
import akka.testkit.TestLatch
|
import akka.testkit.TestLatch
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
|
import akka.util.Duration
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
|
|
@ -29,7 +30,8 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
||||||
"A RestartStrategy" must {
|
"A RestartStrategy" must {
|
||||||
|
|
||||||
"ensure that slave stays dead after max restarts within time range" in {
|
"ensure that slave stays dead after max restarts within time range" in {
|
||||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), 2, 1000))))
|
val boss = system.actorOf(Props(new Supervisor(
|
||||||
|
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable])))))
|
||||||
|
|
||||||
val restartLatch = new TestLatch
|
val restartLatch = new TestLatch
|
||||||
val secondRestartLatch = new TestLatch
|
val secondRestartLatch = new TestLatch
|
||||||
|
|
@ -75,7 +77,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
||||||
}
|
}
|
||||||
|
|
||||||
"ensure that slave is immortal without max restarts and time range" in {
|
"ensure that slave is immortal without max restarts and time range" in {
|
||||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), None, None))))
|
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Throwable])))))
|
||||||
|
|
||||||
val countDownLatch = new TestLatch(100)
|
val countDownLatch = new TestLatch(100)
|
||||||
|
|
||||||
|
|
@ -97,7 +99,8 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
||||||
}
|
}
|
||||||
|
|
||||||
"ensure that slave restarts after number of crashes not within time range" in {
|
"ensure that slave restarts after number of crashes not within time range" in {
|
||||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), 2, 500))))
|
val boss = system.actorOf(Props(new Supervisor(
|
||||||
|
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 500 millis)(List(classOf[Throwable])))))
|
||||||
|
|
||||||
val restartLatch = new TestLatch
|
val restartLatch = new TestLatch
|
||||||
val secondRestartLatch = new TestLatch
|
val secondRestartLatch = new TestLatch
|
||||||
|
|
@ -154,7 +157,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
||||||
}
|
}
|
||||||
|
|
||||||
"ensure that slave is not restarted after max retries" in {
|
"ensure that slave is not restarted after max retries" in {
|
||||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), Some(2), None))))
|
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Throwable])))))
|
||||||
|
|
||||||
val restartLatch = new TestLatch
|
val restartLatch = new TestLatch
|
||||||
val secondRestartLatch = new TestLatch
|
val secondRestartLatch = new TestLatch
|
||||||
|
|
@ -209,7 +212,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
||||||
val countDownLatch = new TestLatch(2)
|
val countDownLatch = new TestLatch(2)
|
||||||
|
|
||||||
val boss = system.actorOf(Props(new Actor {
|
val boss = system.actorOf(Props(new Actor {
|
||||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), None, Some(1000))
|
override val supervisorStrategy = OneForOneStrategy(withinTimeRange = 1 second)(List(classOf[Throwable]))
|
||||||
def receive = {
|
def receive = {
|
||||||
case p: Props ⇒ sender ! context.watch(context.actorOf(p))
|
case p: Props ⇒ sender ! context.watch(context.actorOf(p))
|
||||||
case t: Terminated ⇒ maxNoOfRestartsLatch.open()
|
case t: Terminated ⇒ maxNoOfRestartsLatch.open()
|
||||||
|
|
|
||||||
|
|
@ -134,7 +134,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
|
||||||
val restartLatch = new TestLatch
|
val restartLatch = new TestLatch
|
||||||
val pingLatch = new TestLatch(6)
|
val pingLatch = new TestLatch(6)
|
||||||
|
|
||||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, 1000))))
|
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(3, 1 second)(List(classOf[Exception])))))
|
||||||
val props = Props(new Actor {
|
val props = Props(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case Ping ⇒ pingLatch.countDown()
|
case Ping ⇒ pingLatch.countDown()
|
||||||
|
|
@ -165,8 +165,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
|
||||||
def receive = {
|
def receive = {
|
||||||
case Msg(ts) ⇒
|
case Msg(ts) ⇒
|
||||||
val now = System.nanoTime
|
val now = System.nanoTime
|
||||||
// Make sure that no message has been dispatched before the scheduled time (10ms = 10000000ns) has occurred
|
// Make sure that no message has been dispatched before the scheduled time (10ms) has occurred
|
||||||
if (now - ts < 10000000) throw new RuntimeException("Interval is too small: " + (now - ts))
|
if (now - ts < 10.millis.toNanos) throw new RuntimeException("Interval is too small: " + (now - ts))
|
||||||
ticks.countDown()
|
ticks.countDown()
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
|
|
|
||||||
|
|
@ -5,10 +5,11 @@
|
||||||
package akka.actor
|
package akka.actor
|
||||||
|
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
|
|
||||||
import java.util.concurrent.{ TimeUnit, CountDownLatch }
|
import java.util.concurrent.{ TimeUnit, CountDownLatch }
|
||||||
import akka.dispatch.Await
|
import akka.dispatch.Await
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
|
import akka.util.Duration
|
||||||
|
import akka.util.duration._
|
||||||
|
|
||||||
object SupervisorHierarchySpec {
|
object SupervisorHierarchySpec {
|
||||||
class FireWorkerException(msg: String) extends Exception(msg)
|
class FireWorkerException(msg: String) extends Exception(msg)
|
||||||
|
|
@ -39,9 +40,9 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
|
||||||
"restart manager and workers in AllForOne" in {
|
"restart manager and workers in AllForOne" in {
|
||||||
val countDown = new CountDownLatch(4)
|
val countDown = new CountDownLatch(4)
|
||||||
|
|
||||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), None, None))))
|
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Exception])))))
|
||||||
|
|
||||||
val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy(List(), None, None)))
|
val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy()(List())))
|
||||||
val manager = Await.result((boss ? managerProps).mapTo[ActorRef], timeout.duration)
|
val manager = Await.result((boss ? managerProps).mapTo[ActorRef], timeout.duration)
|
||||||
|
|
||||||
val workerProps = Props(new CountDownActor(countDown, SupervisorStrategy.defaultStrategy))
|
val workerProps = Props(new CountDownActor(countDown, SupervisorStrategy.defaultStrategy))
|
||||||
|
|
@ -61,7 +62,8 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
|
||||||
val countDownMessages = new CountDownLatch(1)
|
val countDownMessages = new CountDownLatch(1)
|
||||||
val countDownMax = new CountDownLatch(1)
|
val countDownMax = new CountDownLatch(1)
|
||||||
val boss = system.actorOf(Props(new Actor {
|
val boss = system.actorOf(Props(new Actor {
|
||||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 1, 5000)
|
override val supervisorStrategy =
|
||||||
|
OneForOneStrategy(maxNrOfRetries = 1, withinTimeRange = 5 seconds)(List(classOf[Throwable]))
|
||||||
|
|
||||||
val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages, SupervisorStrategy.defaultStrategy))))
|
val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages, SupervisorStrategy.defaultStrategy))))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ import java.util.concurrent.{ TimeUnit, CountDownLatch }
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
import akka.testkit.DefaultTimeout
|
import akka.testkit.DefaultTimeout
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
|
import akka.util.duration._
|
||||||
|
|
||||||
object SupervisorMiscSpec {
|
object SupervisorMiscSpec {
|
||||||
val config = """
|
val config = """
|
||||||
|
|
@ -29,7 +30,8 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul
|
||||||
filterEvents(EventFilter[Exception]("Kill")) {
|
filterEvents(EventFilter[Exception]("Kill")) {
|
||||||
val countDownLatch = new CountDownLatch(4)
|
val countDownLatch = new CountDownLatch(4)
|
||||||
|
|
||||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, 5000))))
|
val supervisor = system.actorOf(Props(new Supervisor(
|
||||||
|
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds)(List(classOf[Exception])))))
|
||||||
|
|
||||||
val workerProps = Props(new Actor {
|
val workerProps = Props(new Actor {
|
||||||
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
|
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ import akka.dispatch.Await
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
|
|
||||||
object SupervisorSpec {
|
object SupervisorSpec {
|
||||||
val Timeout = 5 seconds
|
val Timeout = 5.seconds
|
||||||
|
|
||||||
case object DieReply
|
case object DieReply
|
||||||
|
|
||||||
|
|
@ -54,7 +54,7 @@ object SupervisorSpec {
|
||||||
|
|
||||||
var s: ActorRef = _
|
var s: ActorRef = _
|
||||||
|
|
||||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), Some(0))
|
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception]))
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case Die ⇒ temp forward Die
|
case Die ⇒ temp forward Die
|
||||||
|
|
@ -69,7 +69,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
||||||
|
|
||||||
import SupervisorSpec._
|
import SupervisorSpec._
|
||||||
|
|
||||||
val TimeoutMillis = Timeout.dilated.toMillis.toInt
|
val DilatedTimeout = Timeout.dilated
|
||||||
|
|
||||||
// =====================================================
|
// =====================================================
|
||||||
// Creating actors and supervisors
|
// Creating actors and supervisors
|
||||||
|
|
@ -78,45 +78,51 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
||||||
private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
|
private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
|
||||||
|
|
||||||
def temporaryActorAllForOne = {
|
def temporaryActorAllForOne = {
|
||||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), Some(0)))))
|
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception])))))
|
||||||
val temporaryActor = child(supervisor, Props(new PingPongActor(testActor)))
|
val temporaryActor = child(supervisor, Props(new PingPongActor(testActor)))
|
||||||
|
|
||||||
(temporaryActor, supervisor)
|
(temporaryActor, supervisor)
|
||||||
}
|
}
|
||||||
|
|
||||||
def singleActorAllForOne = {
|
def singleActorAllForOne = {
|
||||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))))
|
val supervisor = system.actorOf(Props(new Supervisor(
|
||||||
|
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
|
||||||
val pingpong = child(supervisor, Props(new PingPongActor(testActor)))
|
val pingpong = child(supervisor, Props(new PingPongActor(testActor)))
|
||||||
|
|
||||||
(pingpong, supervisor)
|
(pingpong, supervisor)
|
||||||
}
|
}
|
||||||
|
|
||||||
def singleActorOneForOne = {
|
def singleActorOneForOne = {
|
||||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))))
|
val supervisor = system.actorOf(Props(new Supervisor(
|
||||||
|
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
|
||||||
val pingpong = child(supervisor, Props(new PingPongActor(testActor)))
|
val pingpong = child(supervisor, Props(new PingPongActor(testActor)))
|
||||||
|
|
||||||
(pingpong, supervisor)
|
(pingpong, supervisor)
|
||||||
}
|
}
|
||||||
|
|
||||||
def multipleActorsAllForOne = {
|
def multipleActorsAllForOne = {
|
||||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))))
|
val supervisor = system.actorOf(Props(new Supervisor(
|
||||||
|
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
|
||||||
val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor)))
|
val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor)))
|
||||||
|
|
||||||
(pingpong1, pingpong2, pingpong3, supervisor)
|
(pingpong1, pingpong2, pingpong3, supervisor)
|
||||||
}
|
}
|
||||||
|
|
||||||
def multipleActorsOneForOne = {
|
def multipleActorsOneForOne = {
|
||||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))))
|
val supervisor = system.actorOf(Props(new Supervisor(
|
||||||
|
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
|
||||||
val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor)))
|
val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor)))
|
||||||
|
|
||||||
(pingpong1, pingpong2, pingpong3, supervisor)
|
(pingpong1, pingpong2, pingpong3, supervisor)
|
||||||
}
|
}
|
||||||
|
|
||||||
def nestedSupervisorsAllForOne = {
|
def nestedSupervisorsAllForOne = {
|
||||||
val topSupervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))))
|
val topSupervisor = system.actorOf(Props(new Supervisor(
|
||||||
|
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
|
||||||
val pingpong1 = child(topSupervisor, Props(new PingPongActor(testActor)))
|
val pingpong1 = child(topSupervisor, Props(new PingPongActor(testActor)))
|
||||||
|
|
||||||
val middleSupervisor = child(topSupervisor, Props(new Supervisor(AllForOneStrategy(Nil, 3, TimeoutMillis))))
|
val middleSupervisor = child(topSupervisor, Props(new Supervisor(
|
||||||
|
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(Nil))))
|
||||||
val pingpong2, pingpong3 = child(middleSupervisor, Props(new PingPongActor(testActor)))
|
val pingpong2, pingpong3 = child(middleSupervisor, Props(new PingPongActor(testActor)))
|
||||||
|
|
||||||
(pingpong1, pingpong2, pingpong3, topSupervisor)
|
(pingpong1, pingpong2, pingpong3, topSupervisor)
|
||||||
|
|
@ -131,14 +137,14 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
||||||
}
|
}
|
||||||
|
|
||||||
def ping(pingPongActor: ActorRef) = {
|
def ping(pingPongActor: ActorRef) = {
|
||||||
Await.result(pingPongActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage
|
Await.result(pingPongActor.?(Ping)(DilatedTimeout), DilatedTimeout) must be === PongMessage
|
||||||
expectMsg(Timeout, PingMessage)
|
expectMsg(Timeout, PingMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
def kill(pingPongActor: ActorRef) = {
|
def kill(pingPongActor: ActorRef) = {
|
||||||
val result = (pingPongActor.?(DieReply)(TimeoutMillis))
|
val result = (pingPongActor.?(DieReply)(DilatedTimeout))
|
||||||
expectMsg(Timeout, ExceptionMessage)
|
expectMsg(Timeout, ExceptionMessage)
|
||||||
intercept[RuntimeException] { Await.result(result, TimeoutMillis millis) }
|
intercept[RuntimeException] { Await.result(result, DilatedTimeout) }
|
||||||
}
|
}
|
||||||
|
|
||||||
"A supervisor" must {
|
"A supervisor" must {
|
||||||
|
|
@ -154,7 +160,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
||||||
"not restart temporary actor" in {
|
"not restart temporary actor" in {
|
||||||
val (temporaryActor, _) = temporaryActorAllForOne
|
val (temporaryActor, _) = temporaryActorAllForOne
|
||||||
|
|
||||||
intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis) }
|
intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply)(DilatedTimeout), DilatedTimeout) }
|
||||||
|
|
||||||
expectNoMsg(1 second)
|
expectNoMsg(1 second)
|
||||||
}
|
}
|
||||||
|
|
@ -280,7 +286,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
||||||
|
|
||||||
"must attempt restart when exception during restart" in {
|
"must attempt restart when exception during restart" in {
|
||||||
val inits = new AtomicInteger(0)
|
val inits = new AtomicInteger(0)
|
||||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000))))
|
val supervisor = system.actorOf(Props(new Supervisor(
|
||||||
|
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 10 seconds)(classOf[Exception] :: Nil))))
|
||||||
|
|
||||||
val dyingProps = Props(new Actor {
|
val dyingProps = Props(new Actor {
|
||||||
inits.incrementAndGet
|
inits.incrementAndGet
|
||||||
|
|
@ -300,11 +307,11 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
||||||
filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1),
|
filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1),
|
||||||
EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) {
|
EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) {
|
||||||
intercept[RuntimeException] {
|
intercept[RuntimeException] {
|
||||||
Await.result(dyingActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis)
|
Await.result(dyingActor.?(DieReply)(DilatedTimeout), DilatedTimeout)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Await.result(dyingActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage
|
Await.result(dyingActor.?(Ping)(DilatedTimeout), DilatedTimeout) must be === PongMessage
|
||||||
|
|
||||||
inits.get must be(3)
|
inits.get must be(3)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeou
|
||||||
EventFilter[ActorKilledException](occurrences = 1) intercept {
|
EventFilter[ActorKilledException](occurrences = 1) intercept {
|
||||||
within(5 seconds) {
|
within(5 seconds) {
|
||||||
val p = Props(new Actor {
|
val p = Props(new Actor {
|
||||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), 3, 1000)
|
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 second)(List(classOf[Exception]))
|
||||||
def receive = {
|
def receive = {
|
||||||
case p: Props ⇒ sender ! context.actorOf(p)
|
case p: Props ⇒ sender ! context.actorOf(p)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@ import akka.testkit.ImplicitSender
|
||||||
import akka.testkit.DefaultTimeout
|
import akka.testkit.DefaultTimeout
|
||||||
import akka.dispatch.Await
|
import akka.dispatch.Await
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
|
import akka.util.duration._
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender with DefaultTimeout {
|
class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender with DefaultTimeout {
|
||||||
|
|
@ -25,7 +26,8 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
|
||||||
"A supervised actor with lifecycle PERMANENT" should {
|
"A supervised actor with lifecycle PERMANENT" should {
|
||||||
"be able to reply on failure during preRestart" in {
|
"be able to reply on failure during preRestart" in {
|
||||||
filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
|
filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
|
||||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 5, 10000))))
|
val supervisor = system.actorOf(Props(new Supervisor(
|
||||||
|
AllForOneStrategy(5, 10 seconds)(List(classOf[Exception])))))
|
||||||
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
|
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
|
||||||
|
|
||||||
supervised.!("test")(testActor)
|
supervised.!("test")(testActor)
|
||||||
|
|
@ -36,7 +38,8 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
|
||||||
|
|
||||||
"be able to reply on failure during postStop" in {
|
"be able to reply on failure during postStop" in {
|
||||||
filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
|
filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
|
||||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), Some(0), None))))
|
val supervisor = system.actorOf(Props(new Supervisor(
|
||||||
|
AllForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception])))))
|
||||||
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
|
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
|
||||||
|
|
||||||
supervised.!("test")(testActor)
|
supervised.!("test")(testActor)
|
||||||
|
|
|
||||||
|
|
@ -300,7 +300,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
|
||||||
"be able to handle exceptions when calling methods" in {
|
"be able to handle exceptions when calling methods" in {
|
||||||
filterEvents(EventFilter[IllegalStateException]("expected")) {
|
filterEvents(EventFilter[IllegalStateException]("expected")) {
|
||||||
val boss = system.actorOf(Props(new Actor {
|
val boss = system.actorOf(Props(new Actor {
|
||||||
override val supervisorStrategy = OneForOneStrategy {
|
override val supervisorStrategy = OneForOneStrategy() {
|
||||||
case e: IllegalStateException if e.getMessage == "expected" ⇒ SupervisorStrategy.Resume
|
case e: IllegalStateException if e.getMessage == "expected" ⇒ SupervisorStrategy.Resume
|
||||||
}
|
}
|
||||||
def receive = {
|
def receive = {
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,8 @@ import akka.actor._
|
||||||
|
|
||||||
object LoggingReceiveSpec {
|
object LoggingReceiveSpec {
|
||||||
class TestLogActor extends Actor {
|
class TestLogActor extends Actor {
|
||||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 5, 5000)
|
override val supervisorStrategy =
|
||||||
|
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(List(classOf[Throwable]))
|
||||||
def receive = { case _ ⇒ }
|
def receive = { case _ ⇒ }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -142,12 +142,12 @@ object Actor {
|
||||||
* {{{
|
* {{{
|
||||||
* class ExampleActor extends Actor {
|
* class ExampleActor extends Actor {
|
||||||
*
|
*
|
||||||
* override val supervisorStrategy = OneForOneStrategy({
|
* override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
|
||||||
* case _: ArithmeticException ⇒ Resume
|
* case _: ArithmeticException ⇒ Resume
|
||||||
* case _: NullPointerException ⇒ Restart
|
* case _: NullPointerException ⇒ Restart
|
||||||
* case _: IllegalArgumentException ⇒ Stop
|
* case _: IllegalArgumentException ⇒ Stop
|
||||||
* case _: Exception ⇒ Escalate
|
* case _: Exception ⇒ Escalate
|
||||||
* }: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000))
|
* }
|
||||||
*
|
*
|
||||||
* def receive = {
|
* def receive = {
|
||||||
* // directly calculated reply
|
* // directly calculated reply
|
||||||
|
|
|
||||||
|
|
@ -355,7 +355,7 @@ class LocalActorRefProvider(
|
||||||
|
|
||||||
override val supervisorStrategy = {
|
override val supervisorStrategy = {
|
||||||
import akka.actor.SupervisorStrategy._
|
import akka.actor.SupervisorStrategy._
|
||||||
OneForOneStrategy {
|
OneForOneStrategy() {
|
||||||
case _: ActorKilledException ⇒ Stop
|
case _: ActorKilledException ⇒ Stop
|
||||||
case _: ActorInitializationException ⇒ Stop
|
case _: ActorInitializationException ⇒ Stop
|
||||||
case _: Exception ⇒ Restart
|
case _: Exception ⇒ Restart
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import scala.annotation.tailrec
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.mutable.ArrayBuffer
|
||||||
import scala.collection.JavaConversions._
|
import scala.collection.JavaConversions._
|
||||||
import java.lang.{ Iterable ⇒ JIterable }
|
import java.lang.{ Iterable ⇒ JIterable }
|
||||||
|
import akka.util.Duration
|
||||||
|
|
||||||
case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) {
|
case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) {
|
||||||
|
|
||||||
|
|
@ -44,7 +45,16 @@ case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int =
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
object SupervisorStrategy {
|
trait SupervisorStrategyLowPriorityImplicits { this: SupervisorStrategy.type ⇒
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implicit conversion from `Seq` of Cause-Action pairs to a `Decider`. See makeDecider(causeAction).
|
||||||
|
*/
|
||||||
|
implicit def seqCauseAction2Decider(trapExit: Iterable[CauseAction]): Decider = makeDecider(trapExit)
|
||||||
|
// the above would clash with seqThrowable2Decider for empty lists
|
||||||
|
}
|
||||||
|
|
||||||
|
object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
|
||||||
sealed trait Action
|
sealed trait Action
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -95,6 +105,13 @@ object SupervisorStrategy {
|
||||||
*/
|
*/
|
||||||
def escalate = Escalate
|
def escalate = Escalate
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When supervisorStrategy is not specified for an actor this
|
||||||
|
* is used by default. The child will be stopped when
|
||||||
|
* [[akka.ActorInitializationException]] or [[akka.ActorKilledException]]
|
||||||
|
* is thrown. It will be restarted for other `Exception` types.
|
||||||
|
* The error is escalated if it's a `Throwable`, i.e. `Error`.
|
||||||
|
*/
|
||||||
final val defaultStrategy: SupervisorStrategy = {
|
final val defaultStrategy: SupervisorStrategy = {
|
||||||
def defaultDecider: Decider = {
|
def defaultDecider: Decider = {
|
||||||
case _: ActorInitializationException ⇒ Stop
|
case _: ActorInitializationException ⇒ Stop
|
||||||
|
|
@ -102,32 +119,38 @@ object SupervisorStrategy {
|
||||||
case _: Exception ⇒ Restart
|
case _: Exception ⇒ Restart
|
||||||
case _ ⇒ Escalate
|
case _ ⇒ Escalate
|
||||||
}
|
}
|
||||||
OneForOneStrategy(defaultDecider, None, None)
|
OneForOneStrategy()(defaultDecider)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implicit conversion from `Seq` of Throwables to a `Decider`.
|
||||||
|
* This maps the given Throwables to restarts, otherwise escalates.
|
||||||
|
*/
|
||||||
|
implicit def seqThrowable2Decider(trapExit: Seq[Class[_ <: Throwable]]): Decider = makeDecider(trapExit)
|
||||||
|
|
||||||
type Decider = PartialFunction[Throwable, Action]
|
type Decider = PartialFunction[Throwable, Action]
|
||||||
type JDecider = akka.japi.Function[Throwable, Action]
|
type JDecider = akka.japi.Function[Throwable, Action]
|
||||||
type CauseAction = (Class[_ <: Throwable], Action)
|
type CauseAction = (Class[_ <: Throwable], Action)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Backwards compatible Decider builder which just checks whether one of
|
* Decider builder which just checks whether one of
|
||||||
* the given Throwables matches the cause and restarts, otherwise escalates.
|
* the given Throwables matches the cause and restarts, otherwise escalates.
|
||||||
*/
|
*/
|
||||||
def makeDecider(trapExit: Array[Class[_ <: Throwable]]): Decider =
|
def makeDecider(trapExit: Array[Class[_ <: Throwable]]): Decider =
|
||||||
{ case x ⇒ if (trapExit exists (_ isInstance x)) Restart else Escalate }
|
{ case x ⇒ if (trapExit exists (_ isInstance x)) Restart else Escalate }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Backwards compatible Decider builder which just checks whether one of
|
* Decider builder which just checks whether one of
|
||||||
* the given Throwables matches the cause and restarts, otherwise escalates.
|
* the given Throwables matches the cause and restarts, otherwise escalates.
|
||||||
*/
|
*/
|
||||||
def makeDecider(trapExit: List[Class[_ <: Throwable]]): Decider =
|
def makeDecider(trapExit: Seq[Class[_ <: Throwable]]): Decider =
|
||||||
{ case x ⇒ if (trapExit exists (_ isInstance x)) Restart else Escalate }
|
{ case x ⇒ if (trapExit exists (_ isInstance x)) Restart else Escalate }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Backwards compatible Decider builder which just checks whether one of
|
* Decider builder which just checks whether one of
|
||||||
* the given Throwables matches the cause and restarts, otherwise escalates.
|
* the given Throwables matches the cause and restarts, otherwise escalates.
|
||||||
*/
|
*/
|
||||||
def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toList)
|
def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toSeq)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decider builder for Iterables of cause-action pairs, e.g. a map obtained
|
* Decider builder for Iterables of cause-action pairs, e.g. a map obtained
|
||||||
|
|
@ -156,6 +179,11 @@ object SupervisorStrategy {
|
||||||
}
|
}
|
||||||
buf
|
buf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private[akka] def withinTimeRangeOption(withinTimeRange: Duration): Option[Duration] =
|
||||||
|
if (withinTimeRange.isFinite && withinTimeRange >= Duration.Zero) Some(withinTimeRange) else None
|
||||||
|
private[akka] def maxNrOfRetriesOption(maxNrOfRetries: Int): Option[Int] =
|
||||||
|
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries)
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract class SupervisorStrategy {
|
abstract class SupervisorStrategy {
|
||||||
|
|
@ -196,49 +224,36 @@ abstract class SupervisorStrategy {
|
||||||
case Escalate ⇒ false
|
case Escalate ⇒ false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
object AllForOneStrategy {
|
|
||||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy =
|
|
||||||
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit),
|
|
||||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
|
||||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): AllForOneStrategy =
|
|
||||||
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
|
||||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int]): AllForOneStrategy =
|
|
||||||
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, None)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restart all actors linked to the same supervisor when one fails,
|
* Restart all actors linked to the same supervisor when one fails,
|
||||||
* trapExit = which Throwables should be intercepted
|
* @param maxNrOfRetries the number of times an actor is allowed to be restarted
|
||||||
* maxNrOfRetries = the number of times an actor is allowed to be restarted
|
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
||||||
* withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window
|
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
||||||
|
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
||||||
*/
|
*/
|
||||||
case class AllForOneStrategy(decider: SupervisorStrategy.Decider,
|
case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider)
|
||||||
maxNrOfRetries: Option[Int] = None,
|
extends SupervisorStrategy {
|
||||||
withinTimeRange: Option[Int] = None) extends SupervisorStrategy {
|
|
||||||
|
|
||||||
def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) =
|
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) =
|
||||||
this(SupervisorStrategy.makeDecider(decider),
|
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider))
|
||||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
|
||||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
|
||||||
|
|
||||||
def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
|
||||||
this(SupervisorStrategy.makeDecider(trapExit),
|
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
|
||||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
|
||||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
|
||||||
|
|
||||||
def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_ <: Throwable]]) =
|
||||||
this(SupervisorStrategy.makeDecider(trapExit),
|
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
|
||||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
|
||||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* this is a performance optimization to avoid re-allocating the pairs upon
|
* this is a performance optimization to avoid re-allocating the pairs upon
|
||||||
* every call to requestRestartPermission, assuming that strategies are shared
|
* every call to requestRestartPermission, assuming that strategies are shared
|
||||||
* across actors and thus this field does not take up much space
|
* across actors and thus this field does not take up much space
|
||||||
*/
|
*/
|
||||||
val retriesWindow = (maxNrOfRetries, withinTimeRange)
|
private val retriesWindow = (
|
||||||
|
SupervisorStrategy.maxNrOfRetriesOption(maxNrOfRetries),
|
||||||
|
SupervisorStrategy.withinTimeRangeOption(withinTimeRange).map(_.toMillis.toInt))
|
||||||
|
|
||||||
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {
|
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {
|
||||||
children foreach (context.stop(_))
|
children foreach (context.stop(_))
|
||||||
|
|
@ -255,47 +270,33 @@ case class AllForOneStrategy(decider: SupervisorStrategy.Decider,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
object OneForOneStrategy {
|
|
||||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy =
|
|
||||||
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit),
|
|
||||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
|
||||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): OneForOneStrategy =
|
|
||||||
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
|
||||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int]): OneForOneStrategy =
|
|
||||||
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, None)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restart an actor when it fails
|
* Restart an actor when it fails
|
||||||
* trapExit = which Throwables should be intercepted
|
* @param maxNrOfRetries the number of times an actor is allowed to be restarted
|
||||||
* maxNrOfRetries = the number of times an actor is allowed to be restarted
|
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
||||||
* withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window
|
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
||||||
|
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
||||||
*/
|
*/
|
||||||
case class OneForOneStrategy(decider: SupervisorStrategy.Decider,
|
case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider)
|
||||||
maxNrOfRetries: Option[Int] = None,
|
extends SupervisorStrategy {
|
||||||
withinTimeRange: Option[Int] = None) extends SupervisorStrategy {
|
|
||||||
|
|
||||||
def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) =
|
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) =
|
||||||
this(SupervisorStrategy.makeDecider(decider),
|
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider))
|
||||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
|
||||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
|
||||||
|
|
||||||
def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
|
||||||
this(SupervisorStrategy.makeDecider(trapExit),
|
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
|
||||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
|
||||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
|
||||||
|
|
||||||
def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_ <: Throwable]]) =
|
||||||
this(SupervisorStrategy.makeDecider(trapExit),
|
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
|
||||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
|
||||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* this is a performance optimization to avoid re-allocating the pairs upon
|
* this is a performance optimization to avoid re-allocating the pairs upon
|
||||||
* every call to requestRestartPermission, assuming that strategies are shared
|
* every call to requestRestartPermission, assuming that strategies are shared
|
||||||
* across actors and thus this field does not take up much space
|
* across actors and thus this field does not take up much space
|
||||||
*/
|
*/
|
||||||
val retriesWindow = (maxNrOfRetries, withinTimeRange)
|
private val retriesWindow = (
|
||||||
|
SupervisorStrategy.maxNrOfRetriesOption(maxNrOfRetries),
|
||||||
|
SupervisorStrategy.withinTimeRangeOption(withinTimeRange).map(_.toMillis.toInt))
|
||||||
|
|
||||||
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {}
|
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -37,20 +37,21 @@ import akka.dispatch.{ MessageDispatcher, Promise }
|
||||||
* }
|
* }
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
* private static SupervisorStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
* private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
|
||||||
* @Override
|
* new Function<Throwable, Action>() {
|
||||||
* public Action apply(Throwable t) {
|
* @Override
|
||||||
* if (t instanceof ArithmeticException) {
|
* public Action apply(Throwable t) {
|
||||||
* return resume();
|
* if (t instanceof ArithmeticException) {
|
||||||
* } else if (t instanceof NullPointerException) {
|
* return resume();
|
||||||
* return restart();
|
* } else if (t instanceof NullPointerException) {
|
||||||
* } else if (t instanceof IllegalArgumentException) {
|
* return restart();
|
||||||
* return stop();
|
* } else if (t instanceof IllegalArgumentException) {
|
||||||
* } else {
|
* return stop();
|
||||||
* return escalate();
|
* } else {
|
||||||
|
* return escalate();
|
||||||
|
* }
|
||||||
* }
|
* }
|
||||||
* }
|
* });
|
||||||
* }, 10, 60000);
|
|
||||||
*
|
*
|
||||||
* @Override
|
* @Override
|
||||||
* public SupervisorStrategy supervisorStrategy() {
|
* public SupervisorStrategy supervisorStrategy() {
|
||||||
|
|
|
||||||
|
|
@ -39,20 +39,21 @@ public class FaultHandlingTestBase {
|
||||||
static public class Supervisor extends UntypedActor {
|
static public class Supervisor extends UntypedActor {
|
||||||
|
|
||||||
//#strategy
|
//#strategy
|
||||||
private static SupervisorStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
|
||||||
@Override
|
new Function<Throwable, Action>() {
|
||||||
public Action apply(Throwable t) {
|
@Override
|
||||||
if (t instanceof ArithmeticException) {
|
public Action apply(Throwable t) {
|
||||||
return resume();
|
if (t instanceof ArithmeticException) {
|
||||||
} else if (t instanceof NullPointerException) {
|
return resume();
|
||||||
return restart();
|
} else if (t instanceof NullPointerException) {
|
||||||
} else if (t instanceof IllegalArgumentException) {
|
return restart();
|
||||||
return stop();
|
} else if (t instanceof IllegalArgumentException) {
|
||||||
} else {
|
return stop();
|
||||||
return escalate();
|
} else {
|
||||||
}
|
return escalate();
|
||||||
}
|
}
|
||||||
}, 10, 60000);
|
}
|
||||||
|
});
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SupervisorStrategy supervisorStrategy() {
|
public SupervisorStrategy supervisorStrategy() {
|
||||||
|
|
@ -76,20 +77,21 @@ public class FaultHandlingTestBase {
|
||||||
static public class Supervisor2 extends UntypedActor {
|
static public class Supervisor2 extends UntypedActor {
|
||||||
|
|
||||||
//#strategy2
|
//#strategy2
|
||||||
private static SupervisorStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
|
||||||
@Override
|
new Function<Throwable, Action>() {
|
||||||
public Action apply(Throwable t) {
|
@Override
|
||||||
if (t instanceof ArithmeticException) {
|
public Action apply(Throwable t) {
|
||||||
return resume();
|
if (t instanceof ArithmeticException) {
|
||||||
} else if (t instanceof NullPointerException) {
|
return resume();
|
||||||
return restart();
|
} else if (t instanceof NullPointerException) {
|
||||||
} else if (t instanceof IllegalArgumentException) {
|
return restart();
|
||||||
return stop();
|
} else if (t instanceof IllegalArgumentException) {
|
||||||
} else {
|
return stop();
|
||||||
return escalate();
|
} else {
|
||||||
}
|
return escalate();
|
||||||
}
|
}
|
||||||
}, 10, 60000);
|
}
|
||||||
|
});
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SupervisorStrategy supervisorStrategy() {
|
public SupervisorStrategy supervisorStrategy() {
|
||||||
|
|
|
||||||
|
|
@ -467,7 +467,7 @@ v1.3::
|
||||||
|
|
||||||
val supervisor = Supervisor(
|
val supervisor = Supervisor(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
AllForOneStrategy(List(classOf[Exception]), 3, 1000),
|
OneForOneStrategy(List(classOf[Exception]), 3, 1000),
|
||||||
Supervise(
|
Supervise(
|
||||||
actorOf[MyActor1],
|
actorOf[MyActor1],
|
||||||
Permanent) ::
|
Permanent) ::
|
||||||
|
|
@ -479,12 +479,12 @@ v1.3::
|
||||||
v2.0::
|
v2.0::
|
||||||
|
|
||||||
class MyActor extends Actor {
|
class MyActor extends Actor {
|
||||||
override val supervisorStrategy = OneForOneStrategy({
|
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
|
||||||
case _: ArithmeticException ⇒ Resume
|
case _: ArithmeticException ⇒ Resume
|
||||||
case _: NullPointerException ⇒ Restart
|
case _: NullPointerException ⇒ Restart
|
||||||
case _: IllegalArgumentException ⇒ Stop
|
case _: IllegalArgumentException ⇒ Stop
|
||||||
case _: Exception ⇒ Escalate
|
case _: Exception ⇒ Escalate
|
||||||
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000))
|
}
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case x =>
|
case x =>
|
||||||
|
|
|
||||||
|
|
@ -20,13 +20,14 @@ object FaultHandlingDocSpec {
|
||||||
//#strategy
|
//#strategy
|
||||||
import akka.actor.OneForOneStrategy
|
import akka.actor.OneForOneStrategy
|
||||||
import akka.actor.SupervisorStrategy._
|
import akka.actor.SupervisorStrategy._
|
||||||
|
import akka.util.duration._
|
||||||
|
|
||||||
override val supervisorStrategy = OneForOneStrategy({
|
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
|
||||||
case _: ArithmeticException ⇒ Resume
|
case _: ArithmeticException ⇒ Resume
|
||||||
case _: NullPointerException ⇒ Restart
|
case _: NullPointerException ⇒ Restart
|
||||||
case _: IllegalArgumentException ⇒ Stop
|
case _: IllegalArgumentException ⇒ Stop
|
||||||
case _: Exception ⇒ Escalate
|
case _: Exception ⇒ Escalate
|
||||||
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000))
|
}
|
||||||
//#strategy
|
//#strategy
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
|
|
@ -40,13 +41,14 @@ object FaultHandlingDocSpec {
|
||||||
//#strategy2
|
//#strategy2
|
||||||
import akka.actor.OneForOneStrategy
|
import akka.actor.OneForOneStrategy
|
||||||
import akka.actor.SupervisorStrategy._
|
import akka.actor.SupervisorStrategy._
|
||||||
|
import akka.util.duration._
|
||||||
|
|
||||||
override val supervisorStrategy = OneForOneStrategy({
|
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
|
||||||
case _: ArithmeticException ⇒ Resume
|
case _: ArithmeticException ⇒ Resume
|
||||||
case _: NullPointerException ⇒ Restart
|
case _: NullPointerException ⇒ Restart
|
||||||
case _: IllegalArgumentException ⇒ Stop
|
case _: IllegalArgumentException ⇒ Stop
|
||||||
case _: Exception ⇒ Escalate
|
case _: Exception ⇒ Escalate
|
||||||
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000))
|
}
|
||||||
//#strategy2
|
//#strategy2
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
|
|
|
||||||
|
|
@ -31,8 +31,7 @@ that the respective limit does not apply, leaving the possibility to specify an
|
||||||
absolute upper limit on the restarts or to make the restarts work infinitely.
|
absolute upper limit on the restarts or to make the restarts work infinitely.
|
||||||
|
|
||||||
The match statement which forms the bulk of the body is of type ``Decider``,
|
The match statement which forms the bulk of the body is of type ``Decider``,
|
||||||
which is a ``PartialFunction[Throwable, Action]``, and we need to help out the
|
which is a ``PartialFunction[Throwable, Action]``. This
|
||||||
type inferencer a bit here by ascribing that type after the closing brace. This
|
|
||||||
is the piece which maps child failure types to their corresponding actions.
|
is the piece which maps child failure types to their corresponding actions.
|
||||||
|
|
||||||
Practical Application
|
Practical Application
|
||||||
|
|
|
||||||
|
|
@ -182,7 +182,8 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA
|
||||||
override def postRestart(reason: Throwable) { counter -= 1 }
|
override def postRestart(reason: Throwable) { counter -= 1 }
|
||||||
}), self, "child")
|
}), self, "child")
|
||||||
|
|
||||||
override def supervisorStrategy = OneForOneStrategy(List(classOf[ActorKilledException]), 5, 1000)
|
override def supervisorStrategy =
|
||||||
|
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 second)(List(classOf[ActorKilledException]))
|
||||||
|
|
||||||
def receiveT = { case "sendKill" ⇒ ref ! Kill }
|
def receiveT = { case "sendKill" ⇒ ref ! Kill }
|
||||||
}))
|
}))
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue