Merge pull request #247 from jboner/wip-1711-supervisorStrategy-patriknw
FaultHandler as method in Actor instead of in Props. See #1711
This commit is contained in:
commit
d21d03207f
34 changed files with 471 additions and 345 deletions
|
|
@ -80,7 +80,7 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach w
|
|||
|
||||
"should shutdown crashed temporary actor" in {
|
||||
filterEvents(EventFilter[Exception]("Expected exception")) {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(0)))))
|
||||
val actor = Await.result((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration)
|
||||
actor.isTerminated must be(false)
|
||||
actor ! "Die"
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
|
|||
"invoke preRestart, preStart, postRestart when using OneForOneStrategy" in {
|
||||
filterException[ActorKilledException] {
|
||||
val id = newUuid().toString
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3)))))
|
||||
val gen = new AtomicInteger(0)
|
||||
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) {
|
||||
override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") }
|
||||
|
|
@ -69,7 +69,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
|
|||
"default for preRestart and postRestart is to call postStop and preStart respectively" in {
|
||||
filterException[ActorKilledException] {
|
||||
val id = newUuid().toString
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3)))))
|
||||
val gen = new AtomicInteger(0)
|
||||
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen))
|
||||
val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration)
|
||||
|
|
@ -99,7 +99,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
|
|||
|
||||
"not invoke preRestart and postRestart when never restarted using OneForOneStrategy" in {
|
||||
val id = newUuid().toString
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3)))))
|
||||
val gen = new AtomicInteger(0)
|
||||
val props = Props(new LifeCycleTestActor(testActor, id, gen))
|
||||
val a = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
|
||||
|
|
|
|||
|
|
@ -374,6 +374,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
|
||||
val boss = system.actorOf(Props(new Actor {
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 2, 1000)
|
||||
|
||||
val ref = context.actorOf(
|
||||
Props(new Actor {
|
||||
def receive = { case _ ⇒ }
|
||||
|
|
@ -382,7 +384,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
}))
|
||||
|
||||
protected def receive = { case "sendKill" ⇒ ref ! Kill }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000)))
|
||||
}))
|
||||
|
||||
boss ! "sendKill"
|
||||
Await.ready(latch, 5 seconds)
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
|
||||
import DeathWatchSpec._
|
||||
|
||||
lazy val supervisor = system.actorOf(Props[Supervisor], "watchers")
|
||||
lazy val supervisor = system.actorOf(Props(new Supervisor(SupervisorStrategy.defaultStrategy)), "watchers")
|
||||
|
||||
def startWatching(target: ActorRef) = Await.result((supervisor ? props(target, testActor)).mapTo[ActorRef], 3 seconds)
|
||||
|
||||
|
|
@ -94,7 +94,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
|
||||
"notify with a Terminated message once when an Actor is stopped but not when restarted" in {
|
||||
filterException[ActorKilledException] {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(2))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(2)))))
|
||||
val terminalProps = Props(context ⇒ { case x ⇒ context.sender ! x })
|
||||
val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
|
|
@ -115,13 +115,13 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
"fail a monitor which does not handle Terminated()" in {
|
||||
filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) {
|
||||
case class FF(fail: Failed)
|
||||
val supervisor = system.actorOf(Props[Supervisor]
|
||||
.withFaultHandler(new OneForOneStrategy(FaultHandlingStrategy.makeDecider(List(classOf[Exception])), Some(0)) {
|
||||
val strategy = new OneForOneStrategy(SupervisorStrategy.makeDecider(List(classOf[Exception])), Some(0)) {
|
||||
override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
|
||||
testActor.tell(FF(Failed(cause)), child)
|
||||
super.handleFailure(context, child, cause, stats, children)
|
||||
}
|
||||
}))
|
||||
}
|
||||
val supervisor = system.actorOf(Props(new Supervisor(strategy)))
|
||||
|
||||
val failed = Await.result((supervisor ? Props.empty).mapTo[ActorRef], timeout.duration)
|
||||
val brother = Await.result((supervisor ? Props(new Actor {
|
||||
|
|
|
|||
|
|
@ -72,8 +72,9 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
|
|||
val fsm = system.actorOf(Props(new MyFSM(testActor)))
|
||||
val sup = system.actorOf(Props(new Actor {
|
||||
context.watch(fsm)
|
||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), None, None)
|
||||
def receive = { case _ ⇒ }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None)))
|
||||
}))
|
||||
|
||||
within(300 millis) {
|
||||
fsm ! SubscribeTransitionCallBack(forward)
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
|||
"A RestartStrategy" must {
|
||||
|
||||
"ensure that slave stays dead after max restarts within time range" in {
|
||||
val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000)))
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), 2, 1000))))
|
||||
|
||||
val restartLatch = new TestLatch
|
||||
val secondRestartLatch = new TestLatch
|
||||
|
|
@ -74,7 +74,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
|||
}
|
||||
|
||||
"ensure that slave is immortal without max restarts and time range" in {
|
||||
val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None)))
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), None, None))))
|
||||
|
||||
val countDownLatch = new TestLatch(100)
|
||||
|
||||
|
|
@ -96,7 +96,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
|||
}
|
||||
|
||||
"ensure that slave restarts after number of crashes not within time range" in {
|
||||
val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 500)))
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), 2, 500))))
|
||||
|
||||
val restartLatch = new TestLatch
|
||||
val secondRestartLatch = new TestLatch
|
||||
|
|
@ -153,7 +153,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
|||
}
|
||||
|
||||
"ensure that slave is not restarted after max retries" in {
|
||||
val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), Some(2), None)))
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), Some(2), None))))
|
||||
|
||||
val restartLatch = new TestLatch
|
||||
val secondRestartLatch = new TestLatch
|
||||
|
|
@ -208,11 +208,12 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
|||
val countDownLatch = new TestLatch(2)
|
||||
|
||||
val boss = system.actorOf(Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), None, Some(1000))
|
||||
def receive = {
|
||||
case p: Props ⇒ sender ! context.watch(context.actorOf(p))
|
||||
case t: Terminated ⇒ maxNoOfRestartsLatch.open()
|
||||
}
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, Some(1000))))
|
||||
}))
|
||||
|
||||
val slaveProps = Props(new Actor {
|
||||
|
||||
|
|
|
|||
|
|
@ -133,7 +133,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
|
|||
val restartLatch = new TestLatch
|
||||
val pingLatch = new TestLatch(6)
|
||||
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, 1000)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, 1000))))
|
||||
val props = Props(new Actor {
|
||||
def receive = {
|
||||
case Ping ⇒ pingLatch.countDown()
|
||||
|
|
|
|||
|
|
@ -3,7 +3,12 @@
|
|||
*/
|
||||
package akka.actor
|
||||
|
||||
class Supervisor extends Actor {
|
||||
/**
|
||||
* For testing Supervisor behavior, normally you don't supply the strategy
|
||||
* from the outside like this.
|
||||
*/
|
||||
class Supervisor(override val supervisorStrategy: SupervisorStrategy) extends Actor {
|
||||
|
||||
def receive = {
|
||||
case x: Props ⇒ sender ! context.actorOf(x)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,7 +12,12 @@ import akka.dispatch.Await
|
|||
object SupervisorHierarchySpec {
|
||||
class FireWorkerException(msg: String) extends Exception(msg)
|
||||
|
||||
class CountDownActor(countDown: CountDownLatch) extends Actor {
|
||||
/**
|
||||
* For testing Supervisor behavior, normally you don't supply the strategy
|
||||
* from the outside like this.
|
||||
*/
|
||||
class CountDownActor(countDown: CountDownLatch, override val supervisorStrategy: SupervisorStrategy) extends Actor {
|
||||
|
||||
protected def receive = {
|
||||
case p: Props ⇒ sender ! context.actorOf(p)
|
||||
}
|
||||
|
|
@ -33,12 +38,12 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
|
|||
"restart manager and workers in AllForOne" in {
|
||||
val countDown = new CountDownLatch(4)
|
||||
|
||||
val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), None, None)))
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), None, None))))
|
||||
|
||||
val managerProps = Props(new CountDownActor(countDown)).withFaultHandler(AllForOneStrategy(List(), None, None))
|
||||
val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy(List(), None, None)))
|
||||
val manager = Await.result((boss ? managerProps).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
val workerProps = Props(new CountDownActor(countDown))
|
||||
val workerProps = Props(new CountDownActor(countDown, SupervisorStrategy.defaultStrategy))
|
||||
val workerOne, workerTwo, workerThree = Await.result((manager ? workerProps).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
filterException[ActorKilledException] {
|
||||
|
|
@ -55,13 +60,15 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
|
|||
val countDownMessages = new CountDownLatch(1)
|
||||
val countDownMax = new CountDownLatch(1)
|
||||
val boss = system.actorOf(Props(new Actor {
|
||||
val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages))))
|
||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 1, 5000)
|
||||
|
||||
val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages, SupervisorStrategy.defaultStrategy))))
|
||||
|
||||
protected def receive = {
|
||||
case "killCrasher" ⇒ crasher ! Kill
|
||||
case Terminated(_) ⇒ countDownMax.countDown()
|
||||
}
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 1, 5000)))
|
||||
}))
|
||||
|
||||
filterException[ActorKilledException] {
|
||||
boss ! "killCrasher"
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul
|
|||
filterEvents(EventFilter[Exception]("Kill")) {
|
||||
val countDownLatch = new CountDownLatch(4)
|
||||
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 5000)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, 5000))))
|
||||
|
||||
val workerProps = Props(new Actor {
|
||||
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
|
||||
|
|
|
|||
|
|
@ -53,6 +53,8 @@ object SupervisorSpec {
|
|||
|
||||
var s: ActorRef = _
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), Some(0))
|
||||
|
||||
def receive = {
|
||||
case Die ⇒ temp forward Die
|
||||
case Terminated(`temp`) ⇒ sendTo ! "terminated"
|
||||
|
|
@ -75,45 +77,45 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
def temporaryActorAllForOne = {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), Some(0)))))
|
||||
val temporaryActor = child(supervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(temporaryActor, supervisor)
|
||||
}
|
||||
|
||||
def singleActorAllForOne = {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))))
|
||||
val pingpong = child(supervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(pingpong, supervisor)
|
||||
}
|
||||
|
||||
def singleActorOneForOne = {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))))
|
||||
val pingpong = child(supervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(pingpong, supervisor)
|
||||
}
|
||||
|
||||
def multipleActorsAllForOne = {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))))
|
||||
val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(pingpong1, pingpong2, pingpong3, supervisor)
|
||||
}
|
||||
|
||||
def multipleActorsOneForOne = {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))))
|
||||
val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(pingpong1, pingpong2, pingpong3, supervisor)
|
||||
}
|
||||
|
||||
def nestedSupervisorsAllForOne = {
|
||||
val topSupervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))
|
||||
val topSupervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))))
|
||||
val pingpong1 = child(topSupervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
val middleSupervisor = child(topSupervisor, Props[Supervisor].withFaultHandler(AllForOneStrategy(Nil, 3, TimeoutMillis)))
|
||||
val middleSupervisor = child(topSupervisor, Props(new Supervisor(AllForOneStrategy(Nil, 3, TimeoutMillis))))
|
||||
val pingpong2, pingpong3 = child(middleSupervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(pingpong1, pingpong2, pingpong3, topSupervisor)
|
||||
|
|
@ -141,7 +143,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
"A supervisor" must {
|
||||
|
||||
"not restart child more times than permitted" in {
|
||||
val master = system.actorOf(Props(new Master(testActor)).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0))))
|
||||
val master = system.actorOf(Props(new Master(testActor)))
|
||||
|
||||
master ! Die
|
||||
expectMsg(3 seconds, "terminated")
|
||||
|
|
@ -277,7 +279,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
|
||||
"must attempt restart when exception during restart" in {
|
||||
val inits = new AtomicInteger(0)
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000))))
|
||||
|
||||
val dyingProps = Props(new Actor {
|
||||
inits.incrementAndGet
|
||||
|
|
|
|||
|
|
@ -22,11 +22,12 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeou
|
|||
EventFilter[ActorKilledException](occurrences = 1) intercept {
|
||||
within(5 seconds) {
|
||||
val p = Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), 3, 1000)
|
||||
def receive = {
|
||||
case p: Props ⇒ sender ! context.actorOf(p)
|
||||
}
|
||||
override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! self.path }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 1000))
|
||||
})
|
||||
val headActor = system.actorOf(p)
|
||||
val middleActor = Await.result((headActor ? p).mapTo[ActorRef], timeout.duration)
|
||||
val lastActor = Await.result((middleActor ? p).mapTo[ActorRef], timeout.duration)
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
|
|||
"A supervised actor with lifecycle PERMANENT" should {
|
||||
"be able to reply on failure during preRestart" in {
|
||||
filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 5, 10000)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 5, 10000))))
|
||||
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
supervised.!("test")(testActor)
|
||||
|
|
@ -35,7 +35,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
|
|||
|
||||
"be able to reply on failure during postStop" in {
|
||||
filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0), None)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), Some(0), None))))
|
||||
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
supervised.!("test")(testActor)
|
||||
|
|
|
|||
|
|
@ -298,10 +298,13 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
|
|||
|
||||
"be able to handle exceptions when calling methods" in {
|
||||
filterEvents(EventFilter[IllegalStateException]("expected")) {
|
||||
val boss = system.actorOf(Props(context ⇒ {
|
||||
val boss = system.actorOf(Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy {
|
||||
case e: IllegalStateException if e.getMessage == "expected" ⇒ SupervisorStrategy.Resume
|
||||
}
|
||||
def receive = {
|
||||
case p: TypedProps[_] ⇒ context.sender ! TypedActor(context).typedActorOf(p)
|
||||
}).withFaultHandler(OneForOneStrategy {
|
||||
case e: IllegalStateException if e.getMessage == "expected" ⇒ FaultHandlingStrategy.Resume
|
||||
}
|
||||
}))
|
||||
val t = Await.result((boss ? TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(2 seconds)).mapTo[Foo], timeout.duration)
|
||||
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import akka.actor._
|
|||
|
||||
object LoggingReceiveSpec {
|
||||
class TestLogActor extends Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 5, 5000)
|
||||
def receive = { case _ ⇒ }
|
||||
}
|
||||
}
|
||||
|
|
@ -149,7 +150,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
|
|||
within(3 seconds) {
|
||||
val lifecycleGuardian = appLifecycle.asInstanceOf[ActorSystemImpl].guardian
|
||||
val lname = lifecycleGuardian.path.toString
|
||||
val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000)))
|
||||
val supervisor = TestActorRef[TestLogActor](Props[TestLogActor])
|
||||
val sname = supervisor.path.toString
|
||||
val sclass = classOf[TestLogActor]
|
||||
|
||||
|
|
|
|||
|
|
@ -141,6 +141,14 @@ object Actor {
|
|||
*
|
||||
* {{{
|
||||
* class ExampleActor extends Actor {
|
||||
*
|
||||
* override val supervisorStrategy = OneForOneStrategy({
|
||||
* case _: ArithmeticException ⇒ Resume
|
||||
* case _: NullPointerException ⇒ Restart
|
||||
* case _: IllegalArgumentException ⇒ Stop
|
||||
* case _: Exception ⇒ Escalate
|
||||
* }: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000))
|
||||
*
|
||||
* def receive = {
|
||||
* // directly calculated reply
|
||||
* case Request(r) => sender ! calculate(r)
|
||||
|
|
@ -224,6 +232,12 @@ trait Actor {
|
|||
*/
|
||||
protected def receive: Receive
|
||||
|
||||
/**
|
||||
* User overridable definition the strategy to use for supervising
|
||||
* child actors.
|
||||
*/
|
||||
def supervisorStrategy(): SupervisorStrategy = SupervisorStrategy.defaultStrategy
|
||||
|
||||
/**
|
||||
* User overridable callback.
|
||||
* <p/>
|
||||
|
|
|
|||
|
|
@ -395,7 +395,7 @@ private[akka] class ActorCell(
|
|||
|
||||
dispatcher.resume(this) //FIXME should this be moved down?
|
||||
|
||||
props.faultHandler.handleSupervisorRestarted(cause, self, children)
|
||||
actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children)
|
||||
} catch {
|
||||
// TODO catching all and continue isn't good for OOME, ticket #1418
|
||||
case e ⇒ try {
|
||||
|
|
@ -491,11 +491,11 @@ private[akka] class ActorCell(
|
|||
// make sure that InterruptedException does not leave this thread
|
||||
if (e.isInstanceOf[InterruptedException]) {
|
||||
val ex = ActorInterruptedException(e)
|
||||
props.faultHandler.handleSupervisorFailing(self, children)
|
||||
actor.supervisorStrategy.handleSupervisorFailing(self, children)
|
||||
parent.tell(Failed(ex), self)
|
||||
throw e //Re-throw InterruptedExceptions as expected
|
||||
} else {
|
||||
props.faultHandler.handleSupervisorFailing(self, children)
|
||||
actor.supervisorStrategy.handleSupervisorFailing(self, children)
|
||||
parent.tell(Failed(e), self)
|
||||
}
|
||||
} finally {
|
||||
|
|
@ -569,7 +569,7 @@ private[akka] class ActorCell(
|
|||
}
|
||||
|
||||
final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match {
|
||||
case Some(stats) if stats.child == child ⇒ if (!props.faultHandler.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause
|
||||
case Some(stats) if stats.child == child ⇒ if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause
|
||||
case Some(stats) ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child))
|
||||
case None ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
|
||||
}
|
||||
|
|
@ -577,7 +577,7 @@ private[akka] class ActorCell(
|
|||
final def handleChildTerminated(child: ActorRef): Unit = {
|
||||
if (childrenRefs contains child.path.name) {
|
||||
childrenRefs -= child.path.name
|
||||
props.faultHandler.handleChildTerminated(this, child, children)
|
||||
actor.supervisorStrategy.handleChildTerminated(this, child, children)
|
||||
if (stopping && childrenRefs.isEmpty) doTerminate()
|
||||
} else system.locker ! ChildTerminated(child)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -334,6 +334,16 @@ class LocalActorRefProvider(
|
|||
* exceptions which might have occurred.
|
||||
*/
|
||||
private class Guardian extends Actor {
|
||||
|
||||
override val supervisorStrategy = {
|
||||
import akka.actor.SupervisorStrategy._
|
||||
OneForOneStrategy {
|
||||
case _: ActorKilledException ⇒ Stop
|
||||
case _: ActorInitializationException ⇒ Stop
|
||||
case _: Exception ⇒ Restart
|
||||
}
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case Terminated(_) ⇒ context.stop(self)
|
||||
case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case e: Exception ⇒ e })
|
||||
|
|
@ -366,16 +376,6 @@ class LocalActorRefProvider(
|
|||
override def preRestart(cause: Throwable, msg: Option[Any]) {}
|
||||
}
|
||||
|
||||
private val guardianFaultHandlingStrategy = {
|
||||
import akka.actor.FaultHandlingStrategy._
|
||||
OneForOneStrategy {
|
||||
case _: ActorKilledException ⇒ Stop
|
||||
case _: ActorInitializationException ⇒ Stop
|
||||
case _: Exception ⇒ Restart
|
||||
}
|
||||
}
|
||||
private val guardianProps = Props(new Guardian).withFaultHandler(guardianFaultHandlingStrategy)
|
||||
|
||||
/*
|
||||
* The problem is that ActorRefs need a reference to the ActorSystem to
|
||||
* provide their service. Hence they cannot be created while the
|
||||
|
|
@ -401,6 +401,8 @@ class LocalActorRefProvider(
|
|||
*/
|
||||
def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras
|
||||
|
||||
private val guardianProps = Props(new Guardian)
|
||||
|
||||
lazy val rootGuardian: InternalActorRef =
|
||||
new LocalActorRef(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true) {
|
||||
object Extra {
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int =
|
|||
}
|
||||
}
|
||||
|
||||
object FaultHandlingStrategy {
|
||||
object SupervisorStrategy {
|
||||
sealed trait Action
|
||||
|
||||
/**
|
||||
|
|
@ -95,6 +95,16 @@ object FaultHandlingStrategy {
|
|||
*/
|
||||
def escalate = Escalate
|
||||
|
||||
final val defaultStrategy: SupervisorStrategy = {
|
||||
def defaultDecider: Decider = {
|
||||
case _: ActorInitializationException ⇒ Stop
|
||||
case _: ActorKilledException ⇒ Stop
|
||||
case _: Exception ⇒ Restart
|
||||
case _ ⇒ Escalate
|
||||
}
|
||||
OneForOneStrategy(defaultDecider, None, None)
|
||||
}
|
||||
|
||||
type Decider = PartialFunction[Throwable, Action]
|
||||
type JDecider = akka.japi.Function[Throwable, Action]
|
||||
type CauseAction = (Class[_ <: Throwable], Action)
|
||||
|
|
@ -148,9 +158,9 @@ object FaultHandlingStrategy {
|
|||
}
|
||||
}
|
||||
|
||||
abstract class FaultHandlingStrategy {
|
||||
abstract class SupervisorStrategy {
|
||||
|
||||
import FaultHandlingStrategy._
|
||||
import SupervisorStrategy._
|
||||
|
||||
def decider: Decider
|
||||
|
||||
|
|
@ -190,12 +200,12 @@ abstract class FaultHandlingStrategy {
|
|||
|
||||
object AllForOneStrategy {
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy =
|
||||
new AllForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit),
|
||||
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): AllForOneStrategy =
|
||||
new AllForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
||||
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int]): AllForOneStrategy =
|
||||
new AllForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit), maxNrOfRetries, None)
|
||||
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, None)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -204,22 +214,22 @@ object AllForOneStrategy {
|
|||
* maxNrOfRetries = the number of times an actor is allowed to be restarted
|
||||
* withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window
|
||||
*/
|
||||
case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider,
|
||||
case class AllForOneStrategy(decider: SupervisorStrategy.Decider,
|
||||
maxNrOfRetries: Option[Int] = None,
|
||||
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy {
|
||||
withinTimeRange: Option[Int] = None) extends SupervisorStrategy {
|
||||
|
||||
def this(decider: FaultHandlingStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(FaultHandlingStrategy.makeDecider(decider),
|
||||
def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(SupervisorStrategy.makeDecider(decider),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
|
||||
def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(FaultHandlingStrategy.makeDecider(trapExit),
|
||||
this(SupervisorStrategy.makeDecider(trapExit),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
|
||||
def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(FaultHandlingStrategy.makeDecider(trapExit),
|
||||
this(SupervisorStrategy.makeDecider(trapExit),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
|
||||
|
|
@ -247,12 +257,12 @@ case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider,
|
|||
|
||||
object OneForOneStrategy {
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy =
|
||||
new OneForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit),
|
||||
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): OneForOneStrategy =
|
||||
new OneForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
||||
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int]): OneForOneStrategy =
|
||||
new OneForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit), maxNrOfRetries, None)
|
||||
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, None)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -261,22 +271,22 @@ object OneForOneStrategy {
|
|||
* maxNrOfRetries = the number of times an actor is allowed to be restarted
|
||||
* withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window
|
||||
*/
|
||||
case class OneForOneStrategy(decider: FaultHandlingStrategy.Decider,
|
||||
case class OneForOneStrategy(decider: SupervisorStrategy.Decider,
|
||||
maxNrOfRetries: Option[Int] = None,
|
||||
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy {
|
||||
withinTimeRange: Option[Int] = None) extends SupervisorStrategy {
|
||||
|
||||
def this(decider: FaultHandlingStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(FaultHandlingStrategy.makeDecider(decider),
|
||||
def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(SupervisorStrategy.makeDecider(decider),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
|
||||
def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(FaultHandlingStrategy.makeDecider(trapExit),
|
||||
this(SupervisorStrategy.makeDecider(trapExit),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
|
||||
def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(FaultHandlingStrategy.makeDecider(trapExit),
|
||||
this(SupervisorStrategy.makeDecider(trapExit),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
|
||||
|
|
|
|||
|
|
@ -18,19 +18,11 @@ import akka.routing._
|
|||
* Used when creating new actors through; <code>ActorSystem.actorOf</code> and <code>ActorContext.actorOf</code>.
|
||||
*/
|
||||
object Props {
|
||||
import FaultHandlingStrategy._
|
||||
|
||||
final val defaultCreator: () ⇒ Actor = () ⇒ throw new UnsupportedOperationException("No actor creator specified!")
|
||||
final val defaultDecider: Decider = {
|
||||
case _: ActorInitializationException ⇒ Stop
|
||||
case _: ActorKilledException ⇒ Stop
|
||||
case _: Exception ⇒ Restart
|
||||
case _ ⇒ Escalate
|
||||
}
|
||||
|
||||
final val defaultRoutedProps: RouterConfig = NoRouter
|
||||
|
||||
final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(defaultDecider, None, None)
|
||||
final val noHotSwap: Stack[Actor.Receive] = Stack.empty
|
||||
final val empty = new Props(() ⇒ new Actor { def receive = Actor.emptyBehavior })
|
||||
|
||||
|
|
@ -79,8 +71,6 @@ object Props {
|
|||
def apply(behavior: ActorContext ⇒ Actor.Receive): Props =
|
||||
apply(new Actor { def receive = behavior(context) })
|
||||
|
||||
def apply(faultHandler: FaultHandlingStrategy): Props =
|
||||
apply(new Actor { def receive = { case _ ⇒ } }).withFaultHandler(faultHandler)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -94,14 +84,10 @@ object Props {
|
|||
* val props = Props(
|
||||
* creator = ..,
|
||||
* dispatcher = ..,
|
||||
* faultHandler = ..,
|
||||
* routerConfig = ..
|
||||
* )
|
||||
* val props = Props().withCreator(new MyActor)
|
||||
* val props = Props[MyActor].withRouter(RoundRobinRouter(..))
|
||||
* val props = Props[MyActor].withFaultHandler(OneForOneStrategy {
|
||||
* case e: IllegalStateException ⇒ Resume
|
||||
* })
|
||||
* }}}
|
||||
*
|
||||
* Examples on Java API:
|
||||
|
|
@ -114,14 +100,12 @@ object Props {
|
|||
* }
|
||||
* });
|
||||
* Props props = new Props().withCreator(new UntypedActorFactory() { ... });
|
||||
* Props props = new Props(MyActor.class).withFaultHandler(new OneForOneStrategy(...));
|
||||
* Props props = new Props(MyActor.class).withRouter(new RoundRobinRouter(..));
|
||||
* }}}
|
||||
*/
|
||||
case class Props(
|
||||
creator: () ⇒ Actor = Props.defaultCreator,
|
||||
dispatcher: String = Dispatchers.DefaultDispatcherId,
|
||||
faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler,
|
||||
routerConfig: RouterConfig = Props.defaultRoutedProps) {
|
||||
|
||||
/**
|
||||
|
|
@ -129,16 +113,14 @@ case class Props(
|
|||
*/
|
||||
def this() = this(
|
||||
creator = Props.defaultCreator,
|
||||
dispatcher = Dispatchers.DefaultDispatcherId,
|
||||
faultHandler = Props.defaultFaultHandler)
|
||||
dispatcher = Dispatchers.DefaultDispatcherId)
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
*/
|
||||
def this(factory: UntypedActorFactory) = this(
|
||||
creator = () ⇒ factory.create(),
|
||||
dispatcher = Dispatchers.DefaultDispatcherId,
|
||||
faultHandler = Props.defaultFaultHandler)
|
||||
dispatcher = Dispatchers.DefaultDispatcherId)
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
|
|
@ -146,7 +128,6 @@ case class Props(
|
|||
def this(actorClass: Class[_ <: Actor]) = this(
|
||||
creator = () ⇒ actorClass.newInstance,
|
||||
dispatcher = Dispatchers.DefaultDispatcherId,
|
||||
faultHandler = Props.defaultFaultHandler,
|
||||
routerConfig = Props.defaultRoutedProps)
|
||||
|
||||
/**
|
||||
|
|
@ -175,11 +156,6 @@ case class Props(
|
|||
*/
|
||||
def withDispatcher(d: String) = copy(dispatcher = d)
|
||||
|
||||
/**
|
||||
* Returns a new Props with the specified faulthandler set.
|
||||
*/
|
||||
def withFaultHandler(f: FaultHandlingStrategy) = copy(faultHandler = f)
|
||||
|
||||
/**
|
||||
* Returns a new Props with the specified router config set.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -218,6 +218,11 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
TypedActor.currentContext set null
|
||||
}
|
||||
|
||||
override def supervisorStrategy(): SupervisorStrategy = me match {
|
||||
case l: Supervisor ⇒ l.supervisorStrategy
|
||||
case _ ⇒ super.supervisorStrategy
|
||||
}
|
||||
|
||||
override def preStart(): Unit = me match {
|
||||
case l: PreStart ⇒ l.preStart()
|
||||
case _ ⇒ super.preStart()
|
||||
|
|
@ -275,6 +280,17 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mix this into your TypedActor to be able to define supervisor strategy
|
||||
*/
|
||||
trait Supervisor {
|
||||
/**
|
||||
* User overridable definition the strategy to use for supervising
|
||||
* child actors.
|
||||
*/
|
||||
def supervisorStrategy(): SupervisorStrategy = SupervisorStrategy.defaultStrategy
|
||||
}
|
||||
|
||||
/**
|
||||
* Mix this into your TypedActor to be able to hook into its lifecycle
|
||||
*/
|
||||
|
|
@ -355,7 +371,6 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
object TypedProps {
|
||||
|
||||
val defaultDispatcherId: String = Dispatchers.DefaultDispatcherId
|
||||
val defaultFaultHandler: FaultHandlingStrategy = akka.actor.Props.defaultFaultHandler
|
||||
val defaultTimeout: Option[Timeout] = None
|
||||
val defaultLoader: Option[ClassLoader] = None
|
||||
|
||||
|
|
@ -415,7 +430,6 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] (
|
|||
interfaces: Seq[Class[_]],
|
||||
creator: () ⇒ T,
|
||||
dispatcher: String = TypedProps.defaultDispatcherId,
|
||||
faultHandler: FaultHandlingStrategy = TypedProps.defaultFaultHandler,
|
||||
timeout: Option[Timeout] = TypedProps.defaultTimeout,
|
||||
loader: Option[ClassLoader] = TypedProps.defaultLoader) {
|
||||
|
||||
|
|
@ -458,11 +472,6 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] (
|
|||
*/
|
||||
def withDispatcher(d: String) = copy(dispatcher = d)
|
||||
|
||||
/**
|
||||
* Returns a new Props with the specified faulthandler set.
|
||||
*/
|
||||
def withFaultHandler(f: FaultHandlingStrategy) = copy(faultHandler = f)
|
||||
|
||||
/**
|
||||
* @returns a new Props that will use the specified ClassLoader to create its proxy class in
|
||||
* If loader is null, it will use the bootstrap classloader.
|
||||
|
|
@ -512,8 +521,8 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] (
|
|||
|
||||
import akka.actor.{ Props ⇒ ActorProps }
|
||||
def actorProps(): ActorProps =
|
||||
if (dispatcher == ActorProps().dispatcher && faultHandler == ActorProps().faultHandler) ActorProps()
|
||||
else ActorProps(dispatcher = dispatcher, faultHandler = faultHandler)
|
||||
if (dispatcher == ActorProps().dispatcher) ActorProps()
|
||||
else ActorProps(dispatcher = dispatcher)
|
||||
}
|
||||
|
||||
case class ContextualTypedActorFactory(typedActor: TypedActorExtension, actorFactory: ActorContext) extends TypedActorFactory {
|
||||
|
|
|
|||
|
|
@ -37,6 +37,26 @@ import akka.dispatch.{ MessageDispatcher, Promise }
|
|||
* }
|
||||
* }
|
||||
*
|
||||
* private static SupervisorStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
* @Override
|
||||
* public Action apply(Throwable t) {
|
||||
* if (t instanceof ArithmeticException) {
|
||||
* return resume();
|
||||
* } else if (t instanceof NullPointerException) {
|
||||
* return restart();
|
||||
* } else if (t instanceof IllegalArgumentException) {
|
||||
* return stop();
|
||||
* } else {
|
||||
* return escalate();
|
||||
* }
|
||||
* }
|
||||
* }, 10, 60000);
|
||||
*
|
||||
* @Override
|
||||
* public SupervisorStrategy supervisorStrategy() {
|
||||
* return strategy;
|
||||
* }
|
||||
*
|
||||
* public void onReceive(Object message) throws Exception {
|
||||
* if (message instanceof String) {
|
||||
* String msg = (String)message;
|
||||
|
|
@ -92,6 +112,12 @@ abstract class UntypedActor extends Actor {
|
|||
*/
|
||||
def getSender(): ActorRef = sender
|
||||
|
||||
/**
|
||||
* User overridable definition the strategy to use for supervising
|
||||
* child actors.
|
||||
*/
|
||||
override def supervisorStrategy(): SupervisorStrategy = super.supervisorStrategy()
|
||||
|
||||
/**
|
||||
* User overridable callback.
|
||||
* <p/>
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ guide lines which might be helpful:
|
|||
- If one actor depends on another actor for carrying out its duty, it should
|
||||
watch that other actor’s liveness and act upon receiving a termination
|
||||
notice. This is different from supervision, as the watching party has no
|
||||
influence on the supervision strategy, and it should be noted that a
|
||||
influence on the supervisor strategy, and it should be noted that a
|
||||
functional dependency alone is not a criterion for deciding where to place a
|
||||
certain child actor in the hierarchy.
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ encounter while implementing it. For more an in depth reference with all the
|
|||
details please refer to :ref:`actors-scala` and :ref:`untyped-actors-java`.
|
||||
|
||||
An actor is a container for `State`_, `Behavior`_, a `Mailbox`_, `Children`_
|
||||
and a `Fault Handling Strategy`_. All of this is encapsulated behind an `Actor
|
||||
and a `Supervisor Strategy`_. All of this is encapsulated behind an `Actor
|
||||
Reference`_. Finally, this happens `When an Actor Terminates`_.
|
||||
|
||||
Actor Reference
|
||||
|
|
@ -105,16 +105,14 @@ stopping (``context.stop(child)``) children and these actions are reflected
|
|||
immediately. The actual creation and termination actions happen behind the
|
||||
scenes in an asynchronous way, so they do not “block” their supervisor.
|
||||
|
||||
Fault Handling Strategy
|
||||
-----------------------
|
||||
Supervisor Strategy
|
||||
-------------------
|
||||
|
||||
The final piece of an actor is its strategy for handling faults of its
|
||||
children. To keep it simple and robust, this is declared outside of the actor’s
|
||||
code and has no access to the actor’s state. Fault handling is then done
|
||||
transparently by Akka, applying one of the strategies described in
|
||||
:ref:`supervision` for each incoming failure. As this strategy is fundamental
|
||||
to how an actor system is structured, it cannot be changed once an actor has
|
||||
been created.
|
||||
children. Fault handling is then done transparently by Akka, applying one
|
||||
of the strategies described in :ref:`supervision` for each incoming failure.
|
||||
As this strategy is fundamental to how an actor system is structured, it
|
||||
cannot be changed once an actor has been created.
|
||||
|
||||
Considering that there is only one such strategy for each actor, this means
|
||||
that if different strategies apply to the various children of an actor, the
|
||||
|
|
|
|||
|
|
@ -6,8 +6,8 @@ package akka.docs.actor;
|
|||
//#testkit
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.FaultHandlingStrategy;
|
||||
import static akka.actor.FaultHandlingStrategy.*;
|
||||
import akka.actor.SupervisorStrategy;
|
||||
import static akka.actor.SupervisorStrategy.*;
|
||||
import akka.actor.OneForOneStrategy;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.Terminated;
|
||||
|
|
@ -36,6 +36,30 @@ public class FaultHandlingTestBase {
|
|||
//#testkit
|
||||
//#supervisor
|
||||
static public class Supervisor extends UntypedActor {
|
||||
|
||||
//#strategy
|
||||
private static SupervisorStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
if (t instanceof ArithmeticException) {
|
||||
return resume();
|
||||
} else if (t instanceof NullPointerException) {
|
||||
return restart();
|
||||
} else if (t instanceof IllegalArgumentException) {
|
||||
return stop();
|
||||
} else {
|
||||
return escalate();
|
||||
}
|
||||
}
|
||||
}, 10, 60000);
|
||||
|
||||
@Override
|
||||
public SupervisorStrategy supervisorStrategy() {
|
||||
return strategy;
|
||||
}
|
||||
|
||||
//#strategy
|
||||
|
||||
public void onReceive(Object o) {
|
||||
if (o instanceof Props) {
|
||||
getSender().tell(getContext().actorOf((Props) o));
|
||||
|
|
@ -44,10 +68,35 @@ public class FaultHandlingTestBase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
//#supervisor
|
||||
|
||||
//#supervisor2
|
||||
static public class Supervisor2 extends UntypedActor {
|
||||
|
||||
//#strategy2
|
||||
private static SupervisorStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
if (t instanceof ArithmeticException) {
|
||||
return resume();
|
||||
} else if (t instanceof NullPointerException) {
|
||||
return restart();
|
||||
} else if (t instanceof IllegalArgumentException) {
|
||||
return stop();
|
||||
} else {
|
||||
return escalate();
|
||||
}
|
||||
}
|
||||
}, 10, 60000);
|
||||
|
||||
@Override
|
||||
public SupervisorStrategy supervisorStrategy() {
|
||||
return strategy;
|
||||
}
|
||||
|
||||
//#strategy2
|
||||
|
||||
public void onReceive(Object o) {
|
||||
if (o instanceof Props) {
|
||||
getSender().tell(getContext().actorOf((Props) o));
|
||||
|
|
@ -61,6 +110,7 @@ public class FaultHandlingTestBase {
|
|||
// do not kill all children, which is the default here
|
||||
}
|
||||
}
|
||||
|
||||
//#supervisor2
|
||||
|
||||
//#child
|
||||
|
|
@ -79,24 +129,8 @@ public class FaultHandlingTestBase {
|
|||
}
|
||||
}
|
||||
}
|
||||
//#child
|
||||
|
||||
//#strategy
|
||||
static FaultHandlingStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
if (t instanceof ArithmeticException) {
|
||||
return resume();
|
||||
} else if (t instanceof NullPointerException) {
|
||||
return restart();
|
||||
} else if (t instanceof IllegalArgumentException) {
|
||||
return stop();
|
||||
} else {
|
||||
return escalate();
|
||||
}
|
||||
}
|
||||
}, 10, 60000);
|
||||
//#strategy
|
||||
//#child
|
||||
|
||||
//#testkit
|
||||
static ActorSystem system;
|
||||
|
|
@ -113,7 +147,7 @@ public class FaultHandlingTestBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void mustEmployFaultHandler() {
|
||||
public void mustEmploySupervisorStrategy() {
|
||||
// code here
|
||||
//#testkit
|
||||
EventFilter ex1 = (EventFilter) new ErrorFilter(ArithmeticException.class);
|
||||
|
|
@ -124,7 +158,7 @@ public class FaultHandlingTestBase {
|
|||
system.eventStream().publish(new TestEvent.Mute(ignoreExceptions));
|
||||
|
||||
//#create
|
||||
Props superprops = new Props(Supervisor.class).withFaultHandler(strategy);
|
||||
Props superprops = new Props(Supervisor.class);
|
||||
ActorRef supervisor = system.actorOf(superprops, "supervisor");
|
||||
ActorRef child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
|
||||
//#create
|
||||
|
|
@ -157,7 +191,7 @@ public class FaultHandlingTestBase {
|
|||
//#escalate-kill
|
||||
|
||||
//#escalate-restart
|
||||
superprops = new Props(Supervisor2.class).withFaultHandler(strategy);
|
||||
superprops = new Props(Supervisor2.class);
|
||||
supervisor = system.actorOf(superprops, "supervisor2");
|
||||
child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
|
||||
child.tell(23);
|
||||
|
|
@ -167,10 +201,11 @@ public class FaultHandlingTestBase {
|
|||
//#escalate-restart
|
||||
//#testkit
|
||||
}
|
||||
//#testkit
|
||||
|
||||
//#testkit
|
||||
public <A> Seq<A> seq(A... args) {
|
||||
return JavaConverters.collectionAsScalaIterableConverter(java.util.Arrays.asList(args)).asScala().toSeq();
|
||||
}
|
||||
//#testkit
|
||||
//#testkit
|
||||
}
|
||||
//#testkit
|
||||
|
|
@ -1,19 +1,19 @@
|
|||
.. _fault-tolerance-java:
|
||||
|
||||
Fault Handling Strategies (Java)
|
||||
=================================
|
||||
Fault Tolerance (Java)
|
||||
======================
|
||||
|
||||
.. sidebar:: Contents
|
||||
|
||||
.. contents:: :local:
|
||||
|
||||
As explained in :ref:`actor-systems` each actor is the supervisor of its
|
||||
children, and as such each actor is given a fault handling strategy when it is
|
||||
created. This strategy cannot be changed afterwards as it is an integral part
|
||||
of the actor system’s structure.
|
||||
children, and as such each actor defines fault handling supervisor strategy.
|
||||
This strategy cannot be changed afterwards as it is an integral part of the
|
||||
actor system’s structure.
|
||||
|
||||
Creating a Fault Handling Strategy
|
||||
----------------------------------
|
||||
Creating a Supervisor Strategy
|
||||
------------------------------
|
||||
|
||||
For the sake of demonstration let us consider the following strategy:
|
||||
|
||||
|
|
@ -26,7 +26,7 @@ First off, it is a one-for-one strategy, meaning that each child is treated
|
|||
separately (an all-for-one strategy works very similarly, the only difference
|
||||
is that any decision is applied to all children of the supervisor, not only the
|
||||
failing one). There are limits set on the restart frequency, namely maximum 10
|
||||
restarts per minute; each of these settings defaults to could be left out, which means
|
||||
restarts per minute; each of these settings could be left out, which means
|
||||
that the respective limit does not apply, leaving the possibility to specify an
|
||||
absolute upper limit on the restarts or to make the restarts work infinitely.
|
||||
|
||||
|
|
@ -50,7 +50,7 @@ where ``TestProbe`` provides an actor ref useful for receiving and inspecting re
|
|||
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||
:include: testkit
|
||||
|
||||
Using the strategy shown above let us create actors:
|
||||
Let us create actors:
|
||||
|
||||
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||
:include: create
|
||||
|
|
|
|||
|
|
@ -153,6 +153,13 @@ you can create child Typed Actors by invoking ``typedActorOf(..)`` on that.
|
|||
|
||||
This also works for creating child Typed Actors in regular Akka Actors.
|
||||
|
||||
Supervisor Strategy
|
||||
-------------------
|
||||
|
||||
By having your Typed Actor implementation class implement ``TypedActor.Supervisor``
|
||||
you can define the strategy to use for supervising child actors, as described in
|
||||
:ref:`supervision` and :ref:`fault-tolerance-java`.
|
||||
|
||||
Lifecycle callbacks
|
||||
-------------------
|
||||
|
||||
|
|
|
|||
|
|
@ -129,6 +129,7 @@ In addition, it offers:
|
|||
|
||||
* :obj:`getSelf()` reference to the :class:`ActorRef` of the actor
|
||||
* :obj:`getSender()` reference sender Actor of the last received message, typically used as described in :ref:`UntypedActor.Reply`
|
||||
* :obj:`supervisorStrategy()` user overridable definition the strategy to use for supervising child actors
|
||||
* :obj:`getContext()` exposes contextual information for the actor and the current message, such as:
|
||||
|
||||
* factory methods to create child actors (:meth:`actorOf`)
|
||||
|
|
|
|||
|
|
@ -403,7 +403,7 @@ v2.0::
|
|||
|
||||
context.parent
|
||||
|
||||
*Fault handling strategy*
|
||||
*Supervisor Strategy*
|
||||
|
||||
v1.3::
|
||||
|
||||
|
|
@ -420,14 +420,18 @@ v1.3::
|
|||
|
||||
v2.0::
|
||||
|
||||
val strategy = OneForOneStrategy({
|
||||
class MyActor extends Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy({
|
||||
case _: ArithmeticException ⇒ Resume
|
||||
case _: NullPointerException ⇒ Restart
|
||||
case _: IllegalArgumentException ⇒ Stop
|
||||
case _: Exception ⇒ Escalate
|
||||
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000))
|
||||
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(strategy), "supervisor")
|
||||
def receive = {
|
||||
case x =>
|
||||
}
|
||||
}
|
||||
|
||||
Documentation:
|
||||
|
||||
|
|
|
|||
|
|
@ -154,6 +154,7 @@ In addition, it offers:
|
|||
|
||||
* :obj:`self` reference to the :class:`ActorRef` of the actor
|
||||
* :obj:`sender` reference sender Actor of the last received message, typically used as described in :ref:`Actor.Reply`
|
||||
* :obj:`supervisorStrategy` user overridable definition the strategy to use for supervising child actors
|
||||
* :obj:`context` exposes contextual information for the actor and the current message, such as:
|
||||
|
||||
* factory methods to create child actors (:meth:`actorOf`)
|
||||
|
|
|
|||
|
|
@ -17,6 +17,18 @@ object FaultHandlingDocSpec {
|
|||
//#supervisor
|
||||
//#supervisor
|
||||
class Supervisor extends Actor {
|
||||
//#strategy
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.SupervisorStrategy._
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy({
|
||||
case _: ArithmeticException ⇒ Resume
|
||||
case _: NullPointerException ⇒ Restart
|
||||
case _: IllegalArgumentException ⇒ Stop
|
||||
case _: Exception ⇒ Escalate
|
||||
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000))
|
||||
//#strategy
|
||||
|
||||
def receive = {
|
||||
case p: Props ⇒ sender ! context.actorOf(p)
|
||||
}
|
||||
|
|
@ -25,6 +37,18 @@ object FaultHandlingDocSpec {
|
|||
|
||||
//#supervisor2
|
||||
class Supervisor2 extends Actor {
|
||||
//#strategy2
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.SupervisorStrategy._
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy({
|
||||
case _: ArithmeticException ⇒ Resume
|
||||
case _: NullPointerException ⇒ Restart
|
||||
case _: IllegalArgumentException ⇒ Stop
|
||||
case _: Exception ⇒ Escalate
|
||||
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000))
|
||||
//#strategy2
|
||||
|
||||
def receive = {
|
||||
case p: Props ⇒ sender ! context.actorOf(p)
|
||||
}
|
||||
|
|
@ -56,21 +80,9 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender {
|
|||
|
||||
"apply the chosen strategy for its child" in {
|
||||
//#testkit
|
||||
//#strategy
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.FaultHandlingStrategy._
|
||||
|
||||
val strategy = OneForOneStrategy({
|
||||
case _: ArithmeticException ⇒ Resume
|
||||
case _: NullPointerException ⇒ Restart
|
||||
case _: IllegalArgumentException ⇒ Stop
|
||||
case _: Exception ⇒ Escalate
|
||||
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000))
|
||||
|
||||
//#strategy
|
||||
//#create
|
||||
val superprops = Props[Supervisor].withFaultHandler(strategy)
|
||||
val supervisor = system.actorOf(superprops, "supervisor")
|
||||
val supervisor = system.actorOf(Props[Supervisor], "supervisor")
|
||||
|
||||
supervisor ! Props[Child]
|
||||
val child = expectMsgType[ActorRef] // retrieve answer from TestKit’s testActor
|
||||
|
|
@ -114,8 +126,7 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender {
|
|||
expectMsg(Terminated(child2))
|
||||
//#escalate-kill
|
||||
//#escalate-restart
|
||||
val superprops2 = Props[Supervisor2].withFaultHandler(strategy)
|
||||
val supervisor2 = system.actorOf(superprops2, "supervisor2")
|
||||
val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2")
|
||||
|
||||
supervisor2 ! Props[Child]
|
||||
val child3 = expectMsgType[ActorRef]
|
||||
|
|
|
|||
|
|
@ -1,19 +1,19 @@
|
|||
.. _fault-tolerance-scala:
|
||||
|
||||
Fault Handling Strategies (Scala)
|
||||
=================================
|
||||
Fault Tolerance (Scala)
|
||||
=======================
|
||||
|
||||
.. sidebar:: Contents
|
||||
|
||||
.. contents:: :local:
|
||||
|
||||
As explained in :ref:`actor-systems` each actor is the supervisor of its
|
||||
children, and as such each actor is given a fault handling strategy when it is
|
||||
created. This strategy cannot be changed afterwards as it is an integral part
|
||||
of the actor system’s structure.
|
||||
children, and as such each actor defines fault handling supervisor strategy.
|
||||
This strategy cannot be changed afterwards as it is an integral part of the
|
||||
actor system’s structure.
|
||||
|
||||
Creating a Fault Handling Strategy
|
||||
----------------------------------
|
||||
Creating a Supervisor Strategy
|
||||
------------------------------
|
||||
|
||||
For the sake of demonstration let us consider the following strategy:
|
||||
|
||||
|
|
@ -56,7 +56,7 @@ MustMatchers``
|
|||
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
|
||||
:include: testkit
|
||||
|
||||
Using the strategy shown above let us create actors:
|
||||
Let us create actors:
|
||||
|
||||
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
|
||||
:include: create
|
||||
|
|
|
|||
|
|
@ -153,6 +153,13 @@ you can create child Typed Actors by invoking ``typedActorOf(..)`` on that.
|
|||
|
||||
This also works for creating child Typed Actors in regular Akka Actors.
|
||||
|
||||
Supervisor Strategy
|
||||
-------------------
|
||||
|
||||
By having your Typed Actor implementation class implement ``TypedActor.Supervisor``
|
||||
you can define the strategy to use for supervising child actors, as described in
|
||||
:ref:`supervision` and :ref:`fault-tolerance-scala`.
|
||||
|
||||
Lifecycle callbacks
|
||||
-------------------
|
||||
|
||||
|
|
|
|||
|
|
@ -181,8 +181,10 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA
|
|||
override def postRestart(reason: Throwable) { counter -= 1 }
|
||||
}), self, "child")
|
||||
|
||||
override def supervisorStrategy = OneForOneStrategy(List(classOf[ActorKilledException]), 5, 1000)
|
||||
|
||||
def receiveT = { case "sendKill" ⇒ ref ! Kill }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[ActorKilledException]), 5, 1000)))
|
||||
}))
|
||||
|
||||
boss ! "sendKill"
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue