Moved decider to separate parameter list, and implicit conversion from Seq[Throwable]. See #1714
This commit is contained in:
parent
abc072ef0a
commit
2a6b7f9b03
22 changed files with 142 additions and 144 deletions
|
|
@ -81,8 +81,8 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach w
|
|||
|
||||
"should shutdown crashed temporary actor" in {
|
||||
filterEvents(EventFilter[Exception]("Expected exception")) {
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]),
|
||||
maxNrOfRetries = 0))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception])))))
|
||||
val actor = Await.result((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration)
|
||||
actor.isTerminated must be(false)
|
||||
actor ! "Die"
|
||||
|
|
|
|||
|
|
@ -36,7 +36,8 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
|
|||
"invoke preRestart, preStart, postRestart when using OneForOneStrategy" in {
|
||||
filterException[ActorKilledException] {
|
||||
val id = newUuid().toString
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 3))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])))))
|
||||
val gen = new AtomicInteger(0)
|
||||
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) {
|
||||
override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") }
|
||||
|
|
@ -70,7 +71,8 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
|
|||
"default for preRestart and postRestart is to call postStop and preStart respectively" in {
|
||||
filterException[ActorKilledException] {
|
||||
val id = newUuid().toString
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 3))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])))))
|
||||
val gen = new AtomicInteger(0)
|
||||
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen))
|
||||
val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration)
|
||||
|
|
@ -100,7 +102,8 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
|
|||
|
||||
"not invoke preRestart and postRestart when never restarted using OneForOneStrategy" in {
|
||||
val id = newUuid().toString
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 3))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])))))
|
||||
val gen = new AtomicInteger(0)
|
||||
val props = Props(new LifeCycleTestActor(testActor, id, gen))
|
||||
val a = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
|
||||
|
|
|
|||
|
|
@ -376,7 +376,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
|
||||
val boss = system.actorOf(Props(new Actor {
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), maxNrOfRetries = 2, withinTimeRange = 1 second)
|
||||
override val supervisorStrategy =
|
||||
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable]))
|
||||
|
||||
val ref = context.actorOf(
|
||||
Props(new Actor {
|
||||
|
|
|
|||
|
|
@ -95,7 +95,8 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
|
||||
"notify with a Terminated message once when an Actor is stopped but not when restarted" in {
|
||||
filterException[ActorKilledException] {
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 2))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Exception])))))
|
||||
val terminalProps = Props(context ⇒ { case x ⇒ context.sender ! x })
|
||||
val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
|
|
@ -116,7 +117,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
"fail a monitor which does not handle Terminated()" in {
|
||||
filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) {
|
||||
case class FF(fail: Failed)
|
||||
val strategy = new OneForOneStrategy(SupervisorStrategy.makeDecider(List(classOf[Exception])), maxNrOfRetries = 0) {
|
||||
val strategy = new OneForOneStrategy(maxNrOfRetries = 0)(SupervisorStrategy.makeDecider(List(classOf[Exception]))) {
|
||||
override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
|
||||
testActor.tell(FF(Failed(cause)), child)
|
||||
super.handleFailure(context, child, cause, stats, children)
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
|
|||
val fsm = system.actorOf(Props(new MyFSM(testActor)))
|
||||
val sup = system.actorOf(Props(new Actor {
|
||||
context.watch(fsm)
|
||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), Duration.Inf)
|
||||
override val supervisorStrategy = OneForOneStrategy(withinTimeRange = Duration.Inf)(List(classOf[Throwable]))
|
||||
def receive = { case _ ⇒ }
|
||||
}))
|
||||
|
||||
|
|
|
|||
|
|
@ -30,8 +30,8 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
|||
"A RestartStrategy" must {
|
||||
|
||||
"ensure that slave stays dead after max restarts within time range" in {
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]),
|
||||
maxNrOfRetries = 2, withinTimeRange = 1 second))))
|
||||
val boss = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable])))))
|
||||
|
||||
val restartLatch = new TestLatch
|
||||
val secondRestartLatch = new TestLatch
|
||||
|
|
@ -77,7 +77,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
|||
}
|
||||
|
||||
"ensure that slave is immortal without max restarts and time range" in {
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), Duration.Inf))))
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Throwable])))))
|
||||
|
||||
val countDownLatch = new TestLatch(100)
|
||||
|
||||
|
|
@ -99,8 +99,8 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
|||
}
|
||||
|
||||
"ensure that slave restarts after number of crashes not within time range" in {
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]),
|
||||
maxNrOfRetries = 2, withinTimeRange = 500 millis))))
|
||||
val boss = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 500 millis)(List(classOf[Throwable])))))
|
||||
|
||||
val restartLatch = new TestLatch
|
||||
val secondRestartLatch = new TestLatch
|
||||
|
|
@ -157,7 +157,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
|||
}
|
||||
|
||||
"ensure that slave is not restarted after max retries" in {
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), maxNrOfRetries = 2))))
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Throwable])))))
|
||||
|
||||
val restartLatch = new TestLatch
|
||||
val secondRestartLatch = new TestLatch
|
||||
|
|
@ -212,7 +212,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
|||
val countDownLatch = new TestLatch(2)
|
||||
|
||||
val boss = system.actorOf(Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), withinTimeRange = 1 second)
|
||||
override val supervisorStrategy = OneForOneStrategy(withinTimeRange = 1 second)(List(classOf[Throwable]))
|
||||
def receive = {
|
||||
case p: Props ⇒ sender ! context.watch(context.actorOf(p))
|
||||
case t: Terminated ⇒ maxNoOfRestartsLatch.open()
|
||||
|
|
|
|||
|
|
@ -134,7 +134,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
|
|||
val restartLatch = new TestLatch
|
||||
val pingLatch = new TestLatch(6)
|
||||
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, 1 second))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(3, 1 second)(List(classOf[Exception])))))
|
||||
val props = Props(new Actor {
|
||||
def receive = {
|
||||
case Ping ⇒ pingLatch.countDown()
|
||||
|
|
|
|||
|
|
@ -40,9 +40,9 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
|
|||
"restart manager and workers in AllForOne" in {
|
||||
val countDown = new CountDownLatch(4)
|
||||
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Duration.Inf))))
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Exception])))))
|
||||
|
||||
val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy(List(), Duration.Inf)))
|
||||
val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy()(List())))
|
||||
val manager = Await.result((boss ? managerProps).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
val workerProps = Props(new CountDownActor(countDown, SupervisorStrategy.defaultStrategy))
|
||||
|
|
@ -62,7 +62,8 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
|
|||
val countDownMessages = new CountDownLatch(1)
|
||||
val countDownMax = new CountDownLatch(1)
|
||||
val boss = system.actorOf(Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), maxNrOfRetries = 1, withinTimeRange = 5 seconds)
|
||||
override val supervisorStrategy =
|
||||
OneForOneStrategy(maxNrOfRetries = 1, withinTimeRange = 5 seconds)(List(classOf[Throwable]))
|
||||
|
||||
val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages, SupervisorStrategy.defaultStrategy))))
|
||||
|
||||
|
|
|
|||
|
|
@ -30,8 +30,8 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul
|
|||
filterEvents(EventFilter[Exception]("Kill")) {
|
||||
val countDownLatch = new CountDownLatch(4)
|
||||
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]),
|
||||
maxNrOfRetries = 3, withinTimeRange = 5 seconds))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds)(List(classOf[Exception])))))
|
||||
|
||||
val workerProps = Props(new Actor {
|
||||
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ object SupervisorSpec {
|
|||
|
||||
var s: ActorRef = _
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 0)
|
||||
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception]))
|
||||
|
||||
def receive = {
|
||||
case Die ⇒ temp forward Die
|
||||
|
|
@ -78,51 +78,51 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
def temporaryActorAllForOne = {
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 0))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception])))))
|
||||
val temporaryActor = child(supervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(temporaryActor, supervisor)
|
||||
}
|
||||
|
||||
def singleActorAllForOne = {
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]),
|
||||
maxNrOfRetries = 3, withinTimeRange = DilatedTimeout))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
|
||||
val pingpong = child(supervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(pingpong, supervisor)
|
||||
}
|
||||
|
||||
def singleActorOneForOne = {
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]),
|
||||
maxNrOfRetries = 3, withinTimeRange = DilatedTimeout))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
|
||||
val pingpong = child(supervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(pingpong, supervisor)
|
||||
}
|
||||
|
||||
def multipleActorsAllForOne = {
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]),
|
||||
maxNrOfRetries = 3, withinTimeRange = DilatedTimeout))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
|
||||
val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(pingpong1, pingpong2, pingpong3, supervisor)
|
||||
}
|
||||
|
||||
def multipleActorsOneForOne = {
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]),
|
||||
maxNrOfRetries = 3, withinTimeRange = DilatedTimeout))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
|
||||
val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(pingpong1, pingpong2, pingpong3, supervisor)
|
||||
}
|
||||
|
||||
def nestedSupervisorsAllForOne = {
|
||||
val topSupervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]),
|
||||
maxNrOfRetries = 3, withinTimeRange = DilatedTimeout))))
|
||||
val topSupervisor = system.actorOf(Props(new Supervisor(
|
||||
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
|
||||
val pingpong1 = child(topSupervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
val middleSupervisor = child(topSupervisor, Props(new Supervisor(AllForOneStrategy(Nil,
|
||||
maxNrOfRetries = 3, withinTimeRange = DilatedTimeout))))
|
||||
val middleSupervisor = child(topSupervisor, Props(new Supervisor(
|
||||
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(Nil))))
|
||||
val pingpong2, pingpong3 = child(middleSupervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(pingpong1, pingpong2, pingpong3, topSupervisor)
|
||||
|
|
@ -286,8 +286,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
|
||||
"must attempt restart when exception during restart" in {
|
||||
val inits = new AtomicInteger(0)
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(classOf[Exception] :: Nil,
|
||||
maxNrOfRetries = 3, withinTimeRange = 10 seconds))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 10 seconds)(classOf[Exception] :: Nil))))
|
||||
|
||||
val dyingProps = Props(new Actor {
|
||||
inits.incrementAndGet
|
||||
|
|
|
|||
|
|
@ -23,8 +23,7 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeou
|
|||
EventFilter[ActorKilledException](occurrences = 1) intercept {
|
||||
within(5 seconds) {
|
||||
val p = Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]),
|
||||
maxNrOfRetries = 3, withinTimeRange = 1 second)
|
||||
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 second)(List(classOf[Exception]))
|
||||
def receive = {
|
||||
case p: Props ⇒ sender ! context.actorOf(p)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,7 +26,8 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
|
|||
"A supervised actor with lifecycle PERMANENT" should {
|
||||
"be able to reply on failure during preRestart" in {
|
||||
filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 5, 10 seconds))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
AllForOneStrategy(5, 10 seconds)(List(classOf[Exception])))))
|
||||
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
supervised.!("test")(testActor)
|
||||
|
|
@ -37,8 +38,8 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
|
|||
|
||||
"be able to reply on failure during postStop" in {
|
||||
filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]),
|
||||
maxNrOfRetries = 0))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
AllForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception])))))
|
||||
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
supervised.!("test")(testActor)
|
||||
|
|
|
|||
|
|
@ -300,7 +300,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
|
|||
"be able to handle exceptions when calling methods" in {
|
||||
filterEvents(EventFilter[IllegalStateException]("expected")) {
|
||||
val boss = system.actorOf(Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy {
|
||||
override val supervisorStrategy = OneForOneStrategy() {
|
||||
case e: IllegalStateException if e.getMessage == "expected" ⇒ SupervisorStrategy.Resume
|
||||
}
|
||||
def receive = {
|
||||
|
|
|
|||
|
|
@ -15,8 +15,8 @@ import akka.actor._
|
|||
|
||||
object LoggingReceiveSpec {
|
||||
class TestLogActor extends Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]),
|
||||
maxNrOfRetries = 5, withinTimeRange = 5 seconds)
|
||||
override val supervisorStrategy =
|
||||
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(List(classOf[Throwable]))
|
||||
def receive = { case _ ⇒ }
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -142,12 +142,12 @@ object Actor {
|
|||
* {{{
|
||||
* class ExampleActor extends Actor {
|
||||
*
|
||||
* override val supervisorStrategy = OneForOneStrategy({
|
||||
* override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
|
||||
* case _: ArithmeticException ⇒ Resume
|
||||
* case _: NullPointerException ⇒ Restart
|
||||
* case _: IllegalArgumentException ⇒ Stop
|
||||
* case _: Exception ⇒ Escalate
|
||||
* }: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute))
|
||||
* }
|
||||
*
|
||||
* def receive = {
|
||||
* // directly calculated reply
|
||||
|
|
|
|||
|
|
@ -355,7 +355,7 @@ class LocalActorRefProvider(
|
|||
|
||||
override val supervisorStrategy = {
|
||||
import akka.actor.SupervisorStrategy._
|
||||
OneForOneStrategy {
|
||||
OneForOneStrategy() {
|
||||
case _: ActorKilledException ⇒ Stop
|
||||
case _: ActorInitializationException ⇒ Stop
|
||||
case _: Exception ⇒ Restart
|
||||
|
|
|
|||
|
|
@ -110,29 +110,35 @@ object SupervisorStrategy {
|
|||
case _: Exception ⇒ Restart
|
||||
case _ ⇒ Escalate
|
||||
}
|
||||
OneForOneStrategy(defaultDecider)
|
||||
OneForOneStrategy()(defaultDecider)
|
||||
}
|
||||
|
||||
/**
|
||||
* Implicit conversion from `Seq` of Throwables to a `Decider`.
|
||||
* This maps the given Throwables to restarts, otherwise escalates.
|
||||
*/
|
||||
implicit def seqThrowable2Decider(trapExit: Seq[Class[_ <: Throwable]]): Decider = makeDecider(trapExit)
|
||||
|
||||
type Decider = PartialFunction[Throwable, Action]
|
||||
type JDecider = akka.japi.Function[Throwable, Action]
|
||||
type CauseAction = (Class[_ <: Throwable], Action)
|
||||
|
||||
/**
|
||||
* Backwards compatible Decider builder which just checks whether one of
|
||||
* Decider builder which just checks whether one of
|
||||
* the given Throwables matches the cause and restarts, otherwise escalates.
|
||||
*/
|
||||
def makeDecider(trapExit: Array[Class[_ <: Throwable]]): Decider =
|
||||
{ case x ⇒ if (trapExit exists (_ isInstance x)) Restart else Escalate }
|
||||
|
||||
/**
|
||||
* Backwards compatible Decider builder which just checks whether one of
|
||||
* Decider builder which just checks whether one of
|
||||
* the given Throwables matches the cause and restarts, otherwise escalates.
|
||||
*/
|
||||
def makeDecider(trapExit: Seq[Class[_ <: Throwable]]): Decider =
|
||||
{ case x ⇒ if (trapExit exists (_ isInstance x)) Restart else Escalate }
|
||||
|
||||
/**
|
||||
* Backwards compatible Decider builder which just checks whether one of
|
||||
* Decider builder which just checks whether one of
|
||||
* the given Throwables matches the cause and restarts, otherwise escalates.
|
||||
*/
|
||||
def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toSeq)
|
||||
|
|
@ -209,35 +215,27 @@ abstract class SupervisorStrategy {
|
|||
case Escalate ⇒ false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object AllForOneStrategy {
|
||||
def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration): AllForOneStrategy =
|
||||
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
||||
def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int): AllForOneStrategy =
|
||||
apply(trapExit, maxNrOfRetries, Duration.Inf)
|
||||
def apply(trapExit: Seq[Class[_ <: Throwable]], withinTimeRange: Duration): AllForOneStrategy =
|
||||
apply(trapExit, -1, withinTimeRange)
|
||||
}
|
||||
|
||||
/**
|
||||
* Restart all actors linked to the same supervisor when one fails,
|
||||
* trapExit = which Throwables should be intercepted
|
||||
* maxNrOfRetries = the number of times an actor is allowed to be restarted
|
||||
* withinTimeRange = duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
||||
* @param maxNrOfRetries the number of times an actor is allowed to be restarted
|
||||
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
||||
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
||||
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
||||
*/
|
||||
case class AllForOneStrategy(decider: SupervisorStrategy.Decider,
|
||||
maxNrOfRetries: Int = -1,
|
||||
withinTimeRange: Duration = Duration.Inf) extends SupervisorStrategy {
|
||||
case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider)
|
||||
extends SupervisorStrategy {
|
||||
|
||||
def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Duration) =
|
||||
this(SupervisorStrategy.makeDecider(decider), maxNrOfRetries, withinTimeRange)
|
||||
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) =
|
||||
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider))
|
||||
|
||||
def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) =
|
||||
this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
||||
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
|
||||
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
|
||||
|
||||
def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) =
|
||||
this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
||||
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_ <: Throwable]]) =
|
||||
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
|
||||
|
||||
/*
|
||||
* this is a performance optimization to avoid re-allocating the pairs upon
|
||||
|
|
@ -263,33 +261,24 @@ case class AllForOneStrategy(decider: SupervisorStrategy.Decider,
|
|||
}
|
||||
}
|
||||
|
||||
object OneForOneStrategy {
|
||||
def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration): OneForOneStrategy =
|
||||
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
||||
def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int): OneForOneStrategy =
|
||||
apply(trapExit, maxNrOfRetries, Duration.Inf)
|
||||
def apply(trapExit: Seq[Class[_ <: Throwable]], withinTimeRange: Duration): OneForOneStrategy =
|
||||
apply(trapExit, -1, withinTimeRange)
|
||||
}
|
||||
|
||||
/**
|
||||
* Restart an actor when it fails
|
||||
* trapExit = which Throwables should be intercepted
|
||||
* maxNrOfRetries = the number of times an actor is allowed to be restarted
|
||||
* withinTimeRange = duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
||||
* @param maxNrOfRetries the number of times an actor is allowed to be restarted
|
||||
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
||||
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
||||
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
||||
*/
|
||||
case class OneForOneStrategy(decider: SupervisorStrategy.Decider,
|
||||
maxNrOfRetries: Int = -1,
|
||||
withinTimeRange: Duration = Duration.Inf) extends SupervisorStrategy {
|
||||
case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider)
|
||||
extends SupervisorStrategy {
|
||||
|
||||
def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Duration) =
|
||||
this(SupervisorStrategy.makeDecider(decider), maxNrOfRetries, withinTimeRange)
|
||||
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) =
|
||||
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider))
|
||||
|
||||
def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) =
|
||||
this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
||||
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
|
||||
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
|
||||
|
||||
def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) =
|
||||
this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
||||
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_ <: Throwable]]) =
|
||||
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
|
||||
|
||||
/*
|
||||
* this is a performance optimization to avoid re-allocating the pairs upon
|
||||
|
|
|
|||
|
|
@ -37,20 +37,21 @@ import akka.dispatch.{ MessageDispatcher, Promise }
|
|||
* }
|
||||
* }
|
||||
*
|
||||
* private static SupervisorStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
* @Override
|
||||
* public Action apply(Throwable t) {
|
||||
* if (t instanceof ArithmeticException) {
|
||||
* return resume();
|
||||
* } else if (t instanceof NullPointerException) {
|
||||
* return restart();
|
||||
* } else if (t instanceof IllegalArgumentException) {
|
||||
* return stop();
|
||||
* } else {
|
||||
* return escalate();
|
||||
* private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
|
||||
* new Function<Throwable, Action>() {
|
||||
* @Override
|
||||
* public Action apply(Throwable t) {
|
||||
* if (t instanceof ArithmeticException) {
|
||||
* return resume();
|
||||
* } else if (t instanceof NullPointerException) {
|
||||
* return restart();
|
||||
* } else if (t instanceof IllegalArgumentException) {
|
||||
* return stop();
|
||||
* } else {
|
||||
* return escalate();
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* }, 10, Duration.parse("1 minute");
|
||||
* });
|
||||
*
|
||||
* @Override
|
||||
* public SupervisorStrategy supervisorStrategy() {
|
||||
|
|
|
|||
|
|
@ -39,20 +39,21 @@ public class FaultHandlingTestBase {
|
|||
static public class Supervisor extends UntypedActor {
|
||||
|
||||
//#strategy
|
||||
private static SupervisorStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
if (t instanceof ArithmeticException) {
|
||||
return resume();
|
||||
} else if (t instanceof NullPointerException) {
|
||||
return restart();
|
||||
} else if (t instanceof IllegalArgumentException) {
|
||||
return stop();
|
||||
} else {
|
||||
return escalate();
|
||||
}
|
||||
}
|
||||
}, 10, Duration.parse("1 minute"));
|
||||
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
|
||||
new Function<Throwable, Action>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
if (t instanceof ArithmeticException) {
|
||||
return resume();
|
||||
} else if (t instanceof NullPointerException) {
|
||||
return restart();
|
||||
} else if (t instanceof IllegalArgumentException) {
|
||||
return stop();
|
||||
} else {
|
||||
return escalate();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@Override
|
||||
public SupervisorStrategy supervisorStrategy() {
|
||||
|
|
@ -76,20 +77,21 @@ public class FaultHandlingTestBase {
|
|||
static public class Supervisor2 extends UntypedActor {
|
||||
|
||||
//#strategy2
|
||||
private static SupervisorStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
if (t instanceof ArithmeticException) {
|
||||
return resume();
|
||||
} else if (t instanceof NullPointerException) {
|
||||
return restart();
|
||||
} else if (t instanceof IllegalArgumentException) {
|
||||
return stop();
|
||||
} else {
|
||||
return escalate();
|
||||
}
|
||||
}
|
||||
}, 10, Duration.parse("1 minute"));
|
||||
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
|
||||
new Function<Throwable, Action>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
if (t instanceof ArithmeticException) {
|
||||
return resume();
|
||||
} else if (t instanceof NullPointerException) {
|
||||
return restart();
|
||||
} else if (t instanceof IllegalArgumentException) {
|
||||
return stop();
|
||||
} else {
|
||||
return escalate();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@Override
|
||||
public SupervisorStrategy supervisorStrategy() {
|
||||
|
|
|
|||
|
|
@ -467,7 +467,7 @@ v1.3::
|
|||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, 1000),
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, 1000),
|
||||
Supervise(
|
||||
actorOf[MyActor1],
|
||||
Permanent) ::
|
||||
|
|
@ -479,12 +479,12 @@ v1.3::
|
|||
v2.0::
|
||||
|
||||
class MyActor extends Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy({
|
||||
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
|
||||
case _: ArithmeticException ⇒ Resume
|
||||
case _: NullPointerException ⇒ Restart
|
||||
case _: IllegalArgumentException ⇒ Stop
|
||||
case _: Exception ⇒ Escalate
|
||||
}: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute)
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case x =>
|
||||
|
|
|
|||
|
|
@ -22,12 +22,12 @@ object FaultHandlingDocSpec {
|
|||
import akka.actor.SupervisorStrategy._
|
||||
import akka.util.duration._
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy({
|
||||
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
|
||||
case _: ArithmeticException ⇒ Resume
|
||||
case _: NullPointerException ⇒ Restart
|
||||
case _: IllegalArgumentException ⇒ Stop
|
||||
case _: Exception ⇒ Escalate
|
||||
}: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute)
|
||||
}
|
||||
//#strategy
|
||||
|
||||
def receive = {
|
||||
|
|
@ -43,12 +43,12 @@ object FaultHandlingDocSpec {
|
|||
import akka.actor.SupervisorStrategy._
|
||||
import akka.util.duration._
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy({
|
||||
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
|
||||
case _: ArithmeticException ⇒ Resume
|
||||
case _: NullPointerException ⇒ Restart
|
||||
case _: IllegalArgumentException ⇒ Stop
|
||||
case _: Exception ⇒ Escalate
|
||||
}: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute)
|
||||
}
|
||||
//#strategy2
|
||||
|
||||
def receive = {
|
||||
|
|
|
|||
|
|
@ -182,8 +182,8 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA
|
|||
override def postRestart(reason: Throwable) { counter -= 1 }
|
||||
}), self, "child")
|
||||
|
||||
override def supervisorStrategy = OneForOneStrategy(List(classOf[ActorKilledException]),
|
||||
maxNrOfRetries = 5, withinTimeRange = 1 second)
|
||||
override def supervisorStrategy =
|
||||
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 second)(List(classOf[ActorKilledException]))
|
||||
|
||||
def receiveT = { case "sendKill" ⇒ ref ! Kill }
|
||||
}))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue