diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala index 13ccddefe2..9fa3ef2709 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala @@ -80,7 +80,7 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach w "should shutdown crashed temporary actor" in { filterEvents(EventFilter[Exception]("Expected exception")) { - val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0)))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(0))))) val actor = Await.result((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration) actor.isTerminated must be(false) actor ! "Die" diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala index b203ff256f..5fb9187cf7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala @@ -35,7 +35,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS "invoke preRestart, preStart, postRestart when using OneForOneStrategy" in { filterException[ActorKilledException] { val id = newUuid().toString - val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3))))) val gen = new AtomicInteger(0) val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) { override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") } @@ -69,7 +69,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS "default for preRestart and postRestart is to call postStop and preStart respectively" in { filterException[ActorKilledException] { val id = newUuid().toString - val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3))))) val gen = new AtomicInteger(0) val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen)) val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration) @@ -99,7 +99,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS "not invoke preRestart and postRestart when never restarted using OneForOneStrategy" in { val id = newUuid().toString - val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3))))) val gen = new AtomicInteger(0) val props = Props(new LifeCycleTestActor(testActor, id, gen)) val a = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 63f8ca242e..b4ab7f066d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -374,6 +374,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { val boss = system.actorOf(Props(new Actor { + override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 2, 1000) + val ref = context.actorOf( Props(new Actor { def receive = { case _ ⇒ } @@ -382,7 +384,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { })) protected def receive = { case "sendKill" ⇒ ref ! Kill } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000))) + })) boss ! "sendKill" Await.ready(latch, 5 seconds) diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index cd6dc58129..78fd2a6780 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -23,7 +23,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout import DeathWatchSpec._ - lazy val supervisor = system.actorOf(Props[Supervisor], "watchers") + lazy val supervisor = system.actorOf(Props(new Supervisor(FaultHandlingStrategy.defaultFaultHandler)), "watchers") def startWatching(target: ActorRef) = Await.result((supervisor ? props(target, testActor)).mapTo[ActorRef], 3 seconds) @@ -94,7 +94,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout "notify with a Terminated message once when an Actor is stopped but not when restarted" in { filterException[ActorKilledException] { - val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(2)))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(2))))) val terminalProps = Props(context ⇒ { case x ⇒ context.sender ! x }) val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration) @@ -115,13 +115,13 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout "fail a monitor which does not handle Terminated()" in { filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) { case class FF(fail: Failed) - val supervisor = system.actorOf(Props[Supervisor] - .withFaultHandler(new OneForOneStrategy(FaultHandlingStrategy.makeDecider(List(classOf[Exception])), Some(0)) { - override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { - testActor.tell(FF(Failed(cause)), child) - super.handleFailure(context, child, cause, stats, children) - } - })) + val strategy = new OneForOneStrategy(FaultHandlingStrategy.makeDecider(List(classOf[Exception])), Some(0)) { + override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { + testActor.tell(FF(Failed(cause)), child) + super.handleFailure(context, child, cause, stats, children) + } + } + val supervisor = system.actorOf(Props(new Supervisor(strategy))) val failed = Await.result((supervisor ? Props.empty).mapTo[ActorRef], timeout.duration) val brother = Await.result((supervisor ? Props(new Actor { diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala index cc63b76704..36258883e9 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala @@ -72,8 +72,9 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender { val fsm = system.actorOf(Props(new MyFSM(testActor))) val sup = system.actorOf(Props(new Actor { context.watch(fsm) + override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), None, None) def receive = { case _ ⇒ } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None))) + })) within(300 millis) { fsm ! SubscribeTransitionCallBack(forward) diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index e2fd22a030..4b7853af95 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -28,7 +28,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { "A RestartStrategy" must { "ensure that slave stays dead after max restarts within time range" in { - val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000))) + val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), 2, 1000)))) val restartLatch = new TestLatch val secondRestartLatch = new TestLatch @@ -74,7 +74,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { } "ensure that slave is immortal without max restarts and time range" in { - val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None))) + val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), None, None)))) val countDownLatch = new TestLatch(100) @@ -96,7 +96,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { } "ensure that slave restarts after number of crashes not within time range" in { - val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 500))) + val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), 2, 500)))) val restartLatch = new TestLatch val secondRestartLatch = new TestLatch @@ -153,7 +153,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { } "ensure that slave is not restarted after max retries" in { - val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), Some(2), None))) + val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), Some(2), None)))) val restartLatch = new TestLatch val secondRestartLatch = new TestLatch @@ -208,11 +208,12 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { val countDownLatch = new TestLatch(2) val boss = system.actorOf(Props(new Actor { + override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), None, Some(1000)) def receive = { case p: Props ⇒ sender ! context.watch(context.actorOf(p)) case t: Terminated ⇒ maxNoOfRestartsLatch.open() } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, Some(1000)))) + })) val slaveProps = Props(new Actor { diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 6126911162..3a7e91a47d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -133,7 +133,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout val restartLatch = new TestLatch val pingLatch = new TestLatch(6) - val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, 1000))) + val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, 1000)))) val props = Props(new Actor { def receive = { case Ping ⇒ pingLatch.countDown() diff --git a/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala b/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala index 0bf8183137..d04990c375 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala @@ -3,7 +3,12 @@ */ package akka.actor -class Supervisor extends Actor { +/** + * For testing Supervisor behavior, normally you don't supply the strategy + * from the outside like this. + */ +class Supervisor(override val supervisorStrategy: FaultHandlingStrategy) extends Actor { + def receive = { case x: Props ⇒ sender ! context.actorOf(x) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 576c328bc7..2fc5996b54 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -12,7 +12,12 @@ import akka.dispatch.Await object SupervisorHierarchySpec { class FireWorkerException(msg: String) extends Exception(msg) - class CountDownActor(countDown: CountDownLatch) extends Actor { + /** + * For testing Supervisor behavior, normally you don't supply the strategy + * from the outside like this. + */ + class CountDownActor(countDown: CountDownLatch, override val supervisorStrategy: FaultHandlingStrategy) extends Actor { + protected def receive = { case p: Props ⇒ sender ! context.actorOf(p) } @@ -33,12 +38,12 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout { "restart manager and workers in AllForOne" in { val countDown = new CountDownLatch(4) - val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), None, None))) + val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), None, None)))) - val managerProps = Props(new CountDownActor(countDown)).withFaultHandler(AllForOneStrategy(List(), None, None)) + val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy(List(), None, None))) val manager = Await.result((boss ? managerProps).mapTo[ActorRef], timeout.duration) - val workerProps = Props(new CountDownActor(countDown)) + val workerProps = Props(new CountDownActor(countDown, FaultHandlingStrategy.defaultFaultHandler)) val workerOne, workerTwo, workerThree = Await.result((manager ? workerProps).mapTo[ActorRef], timeout.duration) filterException[ActorKilledException] { @@ -55,13 +60,15 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout { val countDownMessages = new CountDownLatch(1) val countDownMax = new CountDownLatch(1) val boss = system.actorOf(Props(new Actor { - val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages)))) + override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 1, 5000) + + val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages, FaultHandlingStrategy.defaultFaultHandler)))) protected def receive = { case "killCrasher" ⇒ crasher ! Kill case Terminated(_) ⇒ countDownMax.countDown() } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 1, 5000))) + })) filterException[ActorKilledException] { boss ! "killCrasher" diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index e269456e9b..6e63a31d9a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -28,7 +28,7 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul filterEvents(EventFilter[Exception]("Kill")) { val countDownLatch = new CountDownLatch(4) - val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 5000))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, 5000)))) val workerProps = Props(new Actor { override def postRestart(cause: Throwable) { countDownLatch.countDown() } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 30b1a24493..4a0f8e997f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -53,6 +53,8 @@ object SupervisorSpec { var s: ActorRef = _ + override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), Some(0)) + def receive = { case Die ⇒ temp forward Die case Terminated(`temp`) ⇒ sendTo ! "terminated" @@ -75,45 +77,45 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration) def temporaryActorAllForOne = { - val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0)))) + val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), Some(0))))) val temporaryActor = child(supervisor, Props(new PingPongActor(testActor))) (temporaryActor, supervisor) } def singleActorAllForOne = { - val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))) + val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) val pingpong = child(supervisor, Props(new PingPongActor(testActor))) (pingpong, supervisor) } def singleActorOneForOne = { - val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) val pingpong = child(supervisor, Props(new PingPongActor(testActor))) (pingpong, supervisor) } def multipleActorsAllForOne = { - val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))) + val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor))) (pingpong1, pingpong2, pingpong3, supervisor) } def multipleActorsOneForOne = { - val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor))) (pingpong1, pingpong2, pingpong3, supervisor) } def nestedSupervisorsAllForOne = { - val topSupervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))) + val topSupervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) val pingpong1 = child(topSupervisor, Props(new PingPongActor(testActor))) - val middleSupervisor = child(topSupervisor, Props[Supervisor].withFaultHandler(AllForOneStrategy(Nil, 3, TimeoutMillis))) + val middleSupervisor = child(topSupervisor, Props(new Supervisor(AllForOneStrategy(Nil, 3, TimeoutMillis)))) val pingpong2, pingpong3 = child(middleSupervisor, Props(new PingPongActor(testActor))) (pingpong1, pingpong2, pingpong3, topSupervisor) @@ -141,7 +143,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "A supervisor" must { "not restart child more times than permitted" in { - val master = system.actorOf(Props(new Master(testActor)).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0)))) + val master = system.actorOf(Props(new Master(testActor))) master ! Die expectMsg(3 seconds, "terminated") @@ -277,7 +279,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "must attempt restart when exception during restart" in { val inits = new AtomicInteger(0) - val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000)))) val dyingProps = Props(new Actor { inits.incrementAndGet diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala index 27ecec4863..ea97415e72 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala @@ -22,11 +22,12 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeou EventFilter[ActorKilledException](occurrences = 1) intercept { within(5 seconds) { val p = Props(new Actor { + override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), 3, 1000) def receive = { case p: Props ⇒ sender ! context.actorOf(p) } override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! self.path } - }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 1000)) + }) val headActor = system.actorOf(p) val middleActor = Await.result((headActor ? p).mapTo[ActorRef], timeout.duration) val lastActor = Await.result((middleActor ? p).mapTo[ActorRef], timeout.duration) diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index 73f3b416e9..cc4e7b2f74 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -24,7 +24,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender "A supervised actor with lifecycle PERMANENT" should { "be able to reply on failure during preRestart" in { filterEvents(EventFilter[Exception]("test", occurrences = 1)) { - val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 5, 10000))) + val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 5, 10000)))) val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration) supervised.!("test")(testActor) @@ -35,7 +35,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender "be able to reply on failure during postStop" in { filterEvents(EventFilter[Exception]("test", occurrences = 1)) { - val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0), None))) + val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), Some(0), None)))) val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration) supervised.!("test")(testActor) diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 17bbd948c3..6ef7608ce2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -298,10 +298,13 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) "be able to handle exceptions when calling methods" in { filterEvents(EventFilter[IllegalStateException]("expected")) { - val boss = system.actorOf(Props(context ⇒ { - case p: TypedProps[_] ⇒ context.sender ! TypedActor(context).typedActorOf(p) - }).withFaultHandler(OneForOneStrategy { - case e: IllegalStateException if e.getMessage == "expected" ⇒ FaultHandlingStrategy.Resume + val boss = system.actorOf(Props(new Actor { + override val supervisorStrategy = OneForOneStrategy { + case e: IllegalStateException if e.getMessage == "expected" ⇒ FaultHandlingStrategy.Resume + } + def receive = { + case p: TypedProps[_] ⇒ context.sender ! TypedActor(context).typedActorOf(p) + } })) val t = Await.result((boss ? TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(2 seconds)).mapTo[Foo], timeout.duration) diff --git a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala index 7e756657f5..cf620bf0fc 100644 --- a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala @@ -15,6 +15,7 @@ import akka.actor._ object LoggingReceiveSpec { class TestLogActor extends Actor { + override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 5, 5000) def receive = { case _ ⇒ } } } @@ -149,7 +150,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd within(3 seconds) { val lifecycleGuardian = appLifecycle.asInstanceOf[ActorSystemImpl].guardian val lname = lifecycleGuardian.path.toString - val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000))) + val supervisor = TestActorRef[TestLogActor](Props[TestLogActor]) val sname = supervisor.path.toString val sclass = classOf[TestLogActor] diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 2cdd41c36e..f48b367e56 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -141,6 +141,14 @@ object Actor { * * {{{ * class ExampleActor extends Actor { + * + * override val supervisorStrategy = OneForOneStrategy({ + * case _: ArithmeticException ⇒ Resume + * case _: NullPointerException ⇒ Restart + * case _: IllegalArgumentException ⇒ Stop + * case _: Exception ⇒ Escalate + * }: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) + * * def receive = { * // directly calculated reply * case Request(r) => sender ! calculate(r) @@ -224,6 +232,12 @@ trait Actor { */ protected def receive: Receive + /** + * User overridable definition the strategy to use for supervising + * child actors. + */ + def supervisorStrategy(): FaultHandlingStrategy = FaultHandlingStrategy.defaultFaultHandler + /** * User overridable callback. *

diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 0e813e5cd2..5aaf4ae8d5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -395,7 +395,7 @@ private[akka] class ActorCell( dispatcher.resume(this) //FIXME should this be moved down? - props.faultHandler.handleSupervisorRestarted(cause, self, children) + actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children) } catch { // TODO catching all and continue isn't good for OOME, ticket #1418 case e ⇒ try { @@ -491,11 +491,11 @@ private[akka] class ActorCell( // make sure that InterruptedException does not leave this thread if (e.isInstanceOf[InterruptedException]) { val ex = ActorInterruptedException(e) - props.faultHandler.handleSupervisorFailing(self, children) + actor.supervisorStrategy.handleSupervisorFailing(self, children) parent.tell(Failed(ex), self) throw e //Re-throw InterruptedExceptions as expected } else { - props.faultHandler.handleSupervisorFailing(self, children) + actor.supervisorStrategy.handleSupervisorFailing(self, children) parent.tell(Failed(e), self) } } finally { @@ -569,7 +569,7 @@ private[akka] class ActorCell( } final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match { - case Some(stats) if stats.child == child ⇒ if (!props.faultHandler.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause + case Some(stats) if stats.child == child ⇒ if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause case Some(stats) ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child)) case None ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) } @@ -577,7 +577,7 @@ private[akka] class ActorCell( final def handleChildTerminated(child: ActorRef): Unit = { if (childrenRefs contains child.path.name) { childrenRefs -= child.path.name - props.faultHandler.handleChildTerminated(this, child, children) + actor.supervisorStrategy.handleChildTerminated(this, child, children) if (stopping && childrenRefs.isEmpty) doTerminate() } else system.locker ! ChildTerminated(child) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 5073725fa6..0da5c13e1d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -334,6 +334,16 @@ class LocalActorRefProvider( * exceptions which might have occurred. */ private class Guardian extends Actor { + + override val supervisorStrategy = { + import akka.actor.FaultHandlingStrategy._ + OneForOneStrategy { + case _: ActorKilledException ⇒ Stop + case _: ActorInitializationException ⇒ Stop + case _: Exception ⇒ Restart + } + } + def receive = { case Terminated(_) ⇒ context.stop(self) case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case e: Exception ⇒ e }) @@ -366,15 +376,7 @@ class LocalActorRefProvider( override def preRestart(cause: Throwable, msg: Option[Any]) {} } - private val guardianFaultHandlingStrategy = { - import akka.actor.FaultHandlingStrategy._ - OneForOneStrategy { - case _: ActorKilledException ⇒ Stop - case _: ActorInitializationException ⇒ Stop - case _: Exception ⇒ Restart - } - } - private val guardianProps = Props(new Guardian).withFaultHandler(guardianFaultHandlingStrategy) + private val guardianProps = Props(new Guardian) /* * The problem is that ActorRefs need a reference to the ActorSystem to diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 8dc816f8ad..02c94a34bc 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -95,6 +95,16 @@ object FaultHandlingStrategy { */ def escalate = Escalate + final val defaultFaultHandler: FaultHandlingStrategy = { + def defaultDecider: Decider = { + case _: ActorInitializationException ⇒ Stop + case _: ActorKilledException ⇒ Stop + case _: Exception ⇒ Restart + case _ ⇒ Escalate + } + OneForOneStrategy(defaultDecider, None, None) + } + type Decider = PartialFunction[Throwable, Action] type JDecider = akka.japi.Function[Throwable, Action] type CauseAction = (Class[_ <: Throwable], Action) diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index 8cce3f35e5..c0b84d0017 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -18,19 +18,11 @@ import akka.routing._ * Used when creating new actors through; ActorSystem.actorOf and ActorContext.actorOf. */ object Props { - import FaultHandlingStrategy._ final val defaultCreator: () ⇒ Actor = () ⇒ throw new UnsupportedOperationException("No actor creator specified!") - final val defaultDecider: Decider = { - case _: ActorInitializationException ⇒ Stop - case _: ActorKilledException ⇒ Stop - case _: Exception ⇒ Restart - case _ ⇒ Escalate - } final val defaultRoutedProps: RouterConfig = NoRouter - final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(defaultDecider, None, None) final val noHotSwap: Stack[Actor.Receive] = Stack.empty final val empty = new Props(() ⇒ new Actor { def receive = Actor.emptyBehavior }) @@ -79,8 +71,6 @@ object Props { def apply(behavior: ActorContext ⇒ Actor.Receive): Props = apply(new Actor { def receive = behavior(context) }) - def apply(faultHandler: FaultHandlingStrategy): Props = - apply(new Actor { def receive = { case _ ⇒ } }).withFaultHandler(faultHandler) } /** @@ -94,14 +84,10 @@ object Props { * val props = Props( * creator = .., * dispatcher = .., - * faultHandler = .., * routerConfig = .. * ) * val props = Props().withCreator(new MyActor) * val props = Props[MyActor].withRouter(RoundRobinRouter(..)) - * val props = Props[MyActor].withFaultHandler(OneForOneStrategy { - * case e: IllegalStateException ⇒ Resume - * }) * }}} * * Examples on Java API: @@ -114,14 +100,12 @@ object Props { * } * }); * Props props = new Props().withCreator(new UntypedActorFactory() { ... }); - * Props props = new Props(MyActor.class).withFaultHandler(new OneForOneStrategy(...)); * Props props = new Props(MyActor.class).withRouter(new RoundRobinRouter(..)); * }}} */ case class Props( creator: () ⇒ Actor = Props.defaultCreator, dispatcher: String = Dispatchers.DefaultDispatcherId, - faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler, routerConfig: RouterConfig = Props.defaultRoutedProps) { /** @@ -129,16 +113,14 @@ case class Props( */ def this() = this( creator = Props.defaultCreator, - dispatcher = Dispatchers.DefaultDispatcherId, - faultHandler = Props.defaultFaultHandler) + dispatcher = Dispatchers.DefaultDispatcherId) /** * Java API. */ def this(factory: UntypedActorFactory) = this( creator = () ⇒ factory.create(), - dispatcher = Dispatchers.DefaultDispatcherId, - faultHandler = Props.defaultFaultHandler) + dispatcher = Dispatchers.DefaultDispatcherId) /** * Java API. @@ -146,7 +128,6 @@ case class Props( def this(actorClass: Class[_ <: Actor]) = this( creator = () ⇒ actorClass.newInstance, dispatcher = Dispatchers.DefaultDispatcherId, - faultHandler = Props.defaultFaultHandler, routerConfig = Props.defaultRoutedProps) /** @@ -175,11 +156,6 @@ case class Props( */ def withDispatcher(d: String) = copy(dispatcher = d) - /** - * Returns a new Props with the specified faulthandler set. - */ - def withFaultHandler(f: FaultHandlingStrategy) = copy(faultHandler = f) - /** * Returns a new Props with the specified router config set. */ diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 15397ab966..9f81f4c754 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -218,6 +218,11 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi TypedActor.currentContext set null } + override def supervisorStrategy: FaultHandlingStrategy = me match { + case l: SupervisorStrategy ⇒ l.supervisorStrategy + case _ ⇒ super.supervisorStrategy + } + override def preStart(): Unit = me match { case l: PreStart ⇒ l.preStart() case _ ⇒ super.preStart() @@ -275,6 +280,17 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi } } + /** + * Mix this into your TypedActor to be able to define supervisor strategy + */ + trait SupervisorStrategy { + /** + * User overridable definition the strategy to use for supervising + * child actors. + */ + def supervisorStrategy: FaultHandlingStrategy = FaultHandlingStrategy.defaultFaultHandler + } + /** * Mix this into your TypedActor to be able to hook into its lifecycle */ @@ -355,7 +371,6 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi object TypedProps { val defaultDispatcherId: String = Dispatchers.DefaultDispatcherId - val defaultFaultHandler: FaultHandlingStrategy = akka.actor.Props.defaultFaultHandler val defaultTimeout: Option[Timeout] = None val defaultLoader: Option[ClassLoader] = None @@ -415,7 +430,6 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] ( interfaces: Seq[Class[_]], creator: () ⇒ T, dispatcher: String = TypedProps.defaultDispatcherId, - faultHandler: FaultHandlingStrategy = TypedProps.defaultFaultHandler, timeout: Option[Timeout] = TypedProps.defaultTimeout, loader: Option[ClassLoader] = TypedProps.defaultLoader) { @@ -458,11 +472,6 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] ( */ def withDispatcher(d: String) = copy(dispatcher = d) - /** - * Returns a new Props with the specified faulthandler set. - */ - def withFaultHandler(f: FaultHandlingStrategy) = copy(faultHandler = f) - /** * @returns a new Props that will use the specified ClassLoader to create its proxy class in * If loader is null, it will use the bootstrap classloader. @@ -512,8 +521,8 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] ( import akka.actor.{ Props ⇒ ActorProps } def actorProps(): ActorProps = - if (dispatcher == ActorProps().dispatcher && faultHandler == ActorProps().faultHandler) ActorProps() - else ActorProps(dispatcher = dispatcher, faultHandler = faultHandler) + if (dispatcher == ActorProps().dispatcher) ActorProps() + else ActorProps(dispatcher = dispatcher) } case class ContextualTypedActorFactory(typedActor: TypedActorExtension, actorFactory: ActorContext) extends TypedActorFactory { diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala index 76af2cfb4e..69cb096e26 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala @@ -37,6 +37,26 @@ import akka.dispatch.{ MessageDispatcher, Promise } * } * } * + * private static FaultHandlingStrategy strategy = new OneForOneStrategy(new Function() { + * @Override + * public Action apply(Throwable t) { + * if (t instanceof ArithmeticException) { + * return resume(); + * } else if (t instanceof NullPointerException) { + * return restart(); + * } else if (t instanceof IllegalArgumentException) { + * return stop(); + * } else { + * return escalate(); + * } + * } + * }, 10, 60000); + * + * @Override + * public FaultHandlingStrategy supervisorStrategy() { + * return strategy; + * } + * * public void onReceive(Object message) throws Exception { * if (message instanceof String) { * String msg = (String)message; @@ -92,6 +112,12 @@ abstract class UntypedActor extends Actor { */ def getSender(): ActorRef = sender + /** + * User overridable definition the strategy to use for supervising + * child actors. + */ + override def supervisorStrategy(): FaultHandlingStrategy = super.supervisorStrategy() + /** * User overridable callback. *

diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 7af41035e5..654b561b94 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -272,7 +272,7 @@ object Future { * in blocking calls after the call to this method, giving the system a * chance to spawn new threads, reuse old threads or otherwise, to prevent * starvation and/or unfairness. - * + * * Assures that any Future tasks initiated in the current thread will be * executed asynchronously, including any tasks currently queued to be * executed in the current thread. This is needed if the current task may diff --git a/akka-docs/general/actor-systems.rst b/akka-docs/general/actor-systems.rst index 6dfb5cf731..33b5a95fb8 100644 --- a/akka-docs/general/actor-systems.rst +++ b/akka-docs/general/actor-systems.rst @@ -3,90 +3,90 @@ Actor Systems ============= -Actors are objects which encapsulate state and behavior, they communicate -exclusively by exchanging messages which are placed into the recipient’s -mailbox. In a sense, actors are the most strigent form of object-oriented -programming, but it serves better to view them as persons: while modeling a -solution with actors, envision a group of people and assign sub-tasks to them, -arrange their functions into an organizational structure and think about how to -escalate failure (all with the benefit of not actually dealing with people, -which means that we need not concern ourselves with their emotional state or -moral issues). The result can then serve as a mental scaffolding for building +Actors are objects which encapsulate state and behavior, they communicate +exclusively by exchanging messages which are placed into the recipient’s +mailbox. In a sense, actors are the most strigent form of object-oriented +programming, but it serves better to view them as persons: while modeling a +solution with actors, envision a group of people and assign sub-tasks to them, +arrange their functions into an organizational structure and think about how to +escalate failure (all with the benefit of not actually dealing with people, +which means that we need not concern ourselves with their emotional state or +moral issues). The result can then serve as a mental scaffolding for building the software implementation. Hierarchical Structure ---------------------- -Like in an economic organization, actors naturally form hierarchies. One actor, -which is to oversee a certain function in the program might want to split up -its task into smaller, more manageable pieces. For this purpose it starts child -actors which it supervises. While the details of supervision are explained -:ref:`here `, we shall concentrate on the underlying concepts in -this section. The only prerequisite is to know that each actor has exactly one +Like in an economic organization, actors naturally form hierarchies. One actor, +which is to oversee a certain function in the program might want to split up +its task into smaller, more manageable pieces. For this purpose it starts child +actors which it supervises. While the details of supervision are explained +:ref:`here `, we shall concentrate on the underlying concepts in +this section. The only prerequisite is to know that each actor has exactly one supervisor, which is the actor that created it. -The quintessential feature of actor systems is that tasks are split up and -delegated until they become small enough to be handled in one piece. In doing -so, not only is the task itself clearly structured, but the resulting actors -can be reasoned about in terms of which messages they should process, how they -should react nominally and how failure should be handled. If one actor does not -have the means for dealing with a certain situation, it sends a corresponding -failure message to its supervisor, asking for help. The recursive structure -then allows to handle failure at the right level. +The quintessential feature of actor systems is that tasks are split up and +delegated until they become small enough to be handled in one piece. In doing +so, not only is the task itself clearly structured, but the resulting actors +can be reasoned about in terms of which messages they should process, how they +should react nominally and how failure should be handled. If one actor does not +have the means for dealing with a certain situation, it sends a corresponding +failure message to its supervisor, asking for help. The recursive structure +then allows to handle failure at the right level. -Compare this to layered software design which easily devolves into defensive -programming with the aim of not leaking any failure out: if the problem is -communicated to the right person, a better solution can be found than if +Compare this to layered software design which easily devolves into defensive +programming with the aim of not leaking any failure out: if the problem is +communicated to the right person, a better solution can be found than if trying to keep everything “under the carpet”. -Now, the difficulty in designing such a system is how to decide who should -supervise what. There is of course no single best solution, but there are a few +Now, the difficulty in designing such a system is how to decide who should +supervise what. There is of course no single best solution, but there are a few guide lines which might be helpful: - - If one actor manages the work another actor is doing, e.g. by passing on - sub-tasks, then the manager should supervise the child. The reason is that - the manager knows which kind of failures are expected and how to handle + - If one actor manages the work another actor is doing, e.g. by passing on + sub-tasks, then the manager should supervise the child. The reason is that + the manager knows which kind of failures are expected and how to handle them. - - If one actor carries very important data (i.e. its state shall not be lost - if avoidable), this actor should source out any possibly dangerous sub-tasks - to children it supervises and handle failures of these children as - appropriate. Depending on the nature of the requests, it may be best to - create a new child for each request, which simplifies state management for - collecting the replies. This is known as the “Error Kernel Pattern” from + - If one actor carries very important data (i.e. its state shall not be lost + if avoidable), this actor should source out any possibly dangerous sub-tasks + to children it supervises and handle failures of these children as + appropriate. Depending on the nature of the requests, it may be best to + create a new child for each request, which simplifies state management for + collecting the replies. This is known as the “Error Kernel Pattern” from Erlang. - - If one actor depends on another actor for carrying out its duty, it should - watch that other actor’s liveness and act upon receiving a termination - notice. This is different from supervision, as the watching party has no - influence on the supervision strategy, and it should be noted that a - functional dependency alone is not a criterion for deciding where to place a + - If one actor depends on another actor for carrying out its duty, it should + watch that other actor’s liveness and act upon receiving a termination + notice. This is different from supervision, as the watching party has no + influence on the supervisor strategy, and it should be noted that a + functional dependency alone is not a criterion for deciding where to place a certain child actor in the hierarchy. -There are of course always exceptions to these rules, but no matter whether you +There are of course always exceptions to these rules, but no matter whether you follow the rules or break them, you should always have a reason. Configuration Container ----------------------- -The actor system as a collaborating ensemble of actors is the natural unit for -managing shared facilities like scheduling services, configuration, logging, -etc. Several actor systems with different configuration may co-exist within the -same JVM without problems, there is no global shared state within Akka itself. -Couple this with the transparent communication between actor systems—within one -node or across a network connection—to see that actor systems themselves can be +The actor system as a collaborating ensemble of actors is the natural unit for +managing shared facilities like scheduling services, configuration, logging, +etc. Several actor systems with different configuration may co-exist within the +same JVM without problems, there is no global shared state within Akka itself. +Couple this with the transparent communication between actor systems—within one +node or across a network connection—to see that actor systems themselves can be used as building blocks in a functional hierarchy. Actor Best Practices -------------------- -#. Actors should be like nice co-workers: do their job efficiently without - bothering everyone else needlessly and avoid hogging resources. Translated - to programming this means to process events and generate responses (or more - requests) in an event-driven manner. Actors should not block (i.e. passively - wait while occupying a Thread) on some external entity, which might be a - lock, a network socket, etc. The blocking operations should be done in some - special-cased thread which sends messages to the actors which shall act on +#. Actors should be like nice co-workers: do their job efficiently without + bothering everyone else needlessly and avoid hogging resources. Translated + to programming this means to process events and generate responses (or more + requests) in an event-driven manner. Actors should not block (i.e. passively + wait while occupying a Thread) on some external entity, which might be a + lock, a network socket, etc. The blocking operations should be done in some + special-cased thread which sends messages to the actors which shall act on them. #. Do not pass mutable objects between actors. In order to ensure that, prefer @@ -94,21 +94,21 @@ Actor Best Practices their mutable state to the outside, you are back in normal Java concurrency land with all the drawbacks. -#. Actors are made to be containers for behavior and state, embracing this - means to not routinely send behavior within messages (which may be tempting - using Scala closures). One of the risks is to accidentally share mutable - state between actors, and this violation of the actor model unfortunately - breaks all the properties which make programming in actors such a nice +#. Actors are made to be containers for behavior and state, embracing this + means to not routinely send behavior within messages (which may be tempting + using Scala closures). One of the risks is to accidentally share mutable + state between actors, and this violation of the actor model unfortunately + breaks all the properties which make programming in actors such a nice experience. What you should not concern yourself with ----------------------------------------- -An actor system manages the resources it is configured to use in order to run -the actors which it contains. There may be millions of actors within one such -system, after all the mantra is to view them as abundant and they weigh in at -an overhead of only roughly 300 bytes per instance. Naturally, the exact order -in which messages are processed in large systems is not controllable by the -application author, but this is also not intended. Take a step back and relax +An actor system manages the resources it is configured to use in order to run +the actors which it contains. There may be millions of actors within one such +system, after all the mantra is to view them as abundant and they weigh in at +an overhead of only roughly 300 bytes per instance. Naturally, the exact order +in which messages are processed in large systems is not controllable by the +application author, but this is also not intended. Take a step back and relax while Akka does the heavy lifting under the hood. diff --git a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java index e402d44520..87610a2aef 100644 --- a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java @@ -36,6 +36,30 @@ public class FaultHandlingTestBase { //#testkit //#supervisor static public class Supervisor extends UntypedActor { + + //#strategy + private static FaultHandlingStrategy strategy = new OneForOneStrategy(new Function() { + @Override + public Action apply(Throwable t) { + if (t instanceof ArithmeticException) { + return resume(); + } else if (t instanceof NullPointerException) { + return restart(); + } else if (t instanceof IllegalArgumentException) { + return stop(); + } else { + return escalate(); + } + } + }, 10, 60000); + + @Override + public FaultHandlingStrategy supervisorStrategy() { + return strategy; + } + + //#strategy + public void onReceive(Object o) { if (o instanceof Props) { getSender().tell(getContext().actorOf((Props) o)); @@ -44,10 +68,35 @@ public class FaultHandlingTestBase { } } } + //#supervisor - + //#supervisor2 static public class Supervisor2 extends UntypedActor { + + //#strategy2 + private static FaultHandlingStrategy strategy = new OneForOneStrategy(new Function() { + @Override + public Action apply(Throwable t) { + if (t instanceof ArithmeticException) { + return resume(); + } else if (t instanceof NullPointerException) { + return restart(); + } else if (t instanceof IllegalArgumentException) { + return stop(); + } else { + return escalate(); + } + } + }, 10, 60000); + + @Override + public FaultHandlingStrategy supervisorStrategy() { + return strategy; + } + + //#strategy2 + public void onReceive(Object o) { if (o instanceof Props) { getSender().tell(getContext().actorOf((Props) o)); @@ -55,18 +104,19 @@ public class FaultHandlingTestBase { unhandled(o); } } - + @Override public void preRestart(Throwable cause, Option msg) { // do not kill all children, which is the default here } } + //#supervisor2 - + //#child static public class Child extends UntypedActor { int state = 0; - + public void onReceive(Object o) throws Exception { if (o instanceof Exception) { throw (Exception) o; @@ -79,39 +129,23 @@ public class FaultHandlingTestBase { } } } + //#child - - //#strategy - static FaultHandlingStrategy strategy = new OneForOneStrategy(new Function() { - @Override - public Action apply(Throwable t) { - if (t instanceof ArithmeticException) { - return resume(); - } else if (t instanceof NullPointerException) { - return restart(); - } else if (t instanceof IllegalArgumentException) { - return stop(); - } else { - return escalate(); - } - } - }, 10, 60000); - //#strategy - + //#testkit static ActorSystem system; Duration timeout = Duration.create(5, SECONDS); - + @BeforeClass public static void start() { system = ActorSystem.create("test", AkkaSpec.testConf()); } - + @AfterClass public static void cleanup() { system.shutdown(); } - + @Test public void mustEmployFaultHandler() { // code here @@ -122,32 +156,32 @@ public class FaultHandlingTestBase { EventFilter ex4 = (EventFilter) new ErrorFilter(Exception.class); Seq ignoreExceptions = seq(ex1, ex2, ex3, ex4); system.eventStream().publish(new TestEvent.Mute(ignoreExceptions)); - + //#create - Props superprops = new Props(Supervisor.class).withFaultHandler(strategy); + Props superprops = new Props(Supervisor.class); ActorRef supervisor = system.actorOf(superprops, "supervisor"); ActorRef child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout); //#create - + //#resume child.tell(42); assert Await.result(child.ask("get", 5000), timeout).equals(42); child.tell(new ArithmeticException()); assert Await.result(child.ask("get", 5000), timeout).equals(42); //#resume - + //#restart child.tell(new NullPointerException()); assert Await.result(child.ask("get", 5000), timeout).equals(0); //#restart - + //#stop final TestProbe probe = new TestProbe(system); probe.watch(child); child.tell(new IllegalArgumentException()); probe.expectMsg(new Terminated(child)); //#stop - + //#escalate-kill child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout); probe.watch(child); @@ -155,9 +189,9 @@ public class FaultHandlingTestBase { child.tell(new Exception()); probe.expectMsg(new Terminated(child)); //#escalate-kill - + //#escalate-restart - superprops = new Props(Supervisor2.class).withFaultHandler(strategy); + superprops = new Props(Supervisor2.class); supervisor = system.actorOf(superprops, "supervisor2"); child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout); child.tell(23); @@ -167,10 +201,11 @@ public class FaultHandlingTestBase { //#escalate-restart //#testkit } -//#testkit + + //#testkit public Seq seq(A... args) { return JavaConverters.collectionAsScalaIterableConverter(java.util.Arrays.asList(args)).asScala().toSeq(); } -//#testkit + //#testkit } //#testkit \ No newline at end of file diff --git a/akka-docs/java/fault-tolerance.rst b/akka-docs/java/fault-tolerance.rst index ca6481bb77..4d0b1abe8c 100644 --- a/akka-docs/java/fault-tolerance.rst +++ b/akka-docs/java/fault-tolerance.rst @@ -8,9 +8,9 @@ Fault Handling Strategies (Java) .. contents:: :local: As explained in :ref:`actor-systems` each actor is the supervisor of its -children, and as such each actor is given a fault handling strategy when it is -created. This strategy cannot be changed afterwards as it is an integral part -of the actor system’s structure. +children, and as such each actor defines fault handling supervisor strategy. +This strategy cannot be changed afterwards as it is an integral part of the +actor system’s structure. Creating a Fault Handling Strategy ---------------------------------- @@ -26,7 +26,7 @@ First off, it is a one-for-one strategy, meaning that each child is treated separately (an all-for-one strategy works very similarly, the only difference is that any decision is applied to all children of the supervisor, not only the failing one). There are limits set on the restart frequency, namely maximum 10 -restarts per minute; each of these settings defaults to could be left out, which means +restarts per minute; each of these settings could be left out, which means that the respective limit does not apply, leaving the possibility to specify an absolute upper limit on the restarts or to make the restarts work infinitely. @@ -50,7 +50,7 @@ where ``TestProbe`` provides an actor ref useful for receiving and inspecting re .. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java :include: testkit -Using the strategy shown above let us create actors: +Let us create actors: .. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java :include: create diff --git a/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSpec.scala index 6fb4d3c36b..dc9159de43 100644 --- a/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSpec.scala @@ -17,6 +17,18 @@ object FaultHandlingDocSpec { //#supervisor //#supervisor class Supervisor extends Actor { + //#strategy + import akka.actor.OneForOneStrategy + import akka.actor.FaultHandlingStrategy._ + + override val supervisorStrategy = OneForOneStrategy({ + case _: ArithmeticException ⇒ Resume + case _: NullPointerException ⇒ Restart + case _: IllegalArgumentException ⇒ Stop + case _: Exception ⇒ Escalate + }: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) + //#strategy + def receive = { case p: Props ⇒ sender ! context.actorOf(p) } @@ -25,6 +37,18 @@ object FaultHandlingDocSpec { //#supervisor2 class Supervisor2 extends Actor { + //#strategy2 + import akka.actor.OneForOneStrategy + import akka.actor.FaultHandlingStrategy._ + + override val supervisorStrategy = OneForOneStrategy({ + case _: ArithmeticException ⇒ Resume + case _: NullPointerException ⇒ Restart + case _: IllegalArgumentException ⇒ Stop + case _: Exception ⇒ Escalate + }: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) + //#strategy2 + def receive = { case p: Props ⇒ sender ! context.actorOf(p) } @@ -56,21 +80,9 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender { "apply the chosen strategy for its child" in { //#testkit - //#strategy - import akka.actor.OneForOneStrategy - import akka.actor.FaultHandlingStrategy._ - val strategy = OneForOneStrategy({ - case _: ArithmeticException ⇒ Resume - case _: NullPointerException ⇒ Restart - case _: IllegalArgumentException ⇒ Stop - case _: Exception ⇒ Escalate - }: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) - - //#strategy //#create - val superprops = Props[Supervisor].withFaultHandler(strategy) - val supervisor = system.actorOf(superprops, "supervisor") + val supervisor = system.actorOf(Props[Supervisor], "supervisor") supervisor ! Props[Child] val child = expectMsgType[ActorRef] // retrieve answer from TestKit’s testActor @@ -114,8 +126,7 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender { expectMsg(Terminated(child2)) //#escalate-kill //#escalate-restart - val superprops2 = Props[Supervisor2].withFaultHandler(strategy) - val supervisor2 = system.actorOf(superprops2, "supervisor2") + val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2") supervisor2 ! Props[Child] val child3 = expectMsgType[ActorRef] diff --git a/akka-docs/scala/fault-tolerance.rst b/akka-docs/scala/fault-tolerance.rst index c2ffa10c79..a4cdd584f0 100644 --- a/akka-docs/scala/fault-tolerance.rst +++ b/akka-docs/scala/fault-tolerance.rst @@ -8,9 +8,9 @@ Fault Handling Strategies (Scala) .. contents:: :local: As explained in :ref:`actor-systems` each actor is the supervisor of its -children, and as such each actor is given a fault handling strategy when it is -created. This strategy cannot be changed afterwards as it is an integral part -of the actor system’s structure. +children, and as such each actor defines fault handling supervisor strategy. +This strategy cannot be changed afterwards as it is an integral part of the +actor system’s structure. Creating a Fault Handling Strategy ---------------------------------- @@ -56,7 +56,7 @@ MustMatchers`` .. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala :include: testkit -Using the strategy shown above let us create actors: +Let us create actors: .. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala :include: create diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index ad5b572ec3..97df807119 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -181,8 +181,10 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA override def postRestart(reason: Throwable) { counter -= 1 } }), self, "child") + override def supervisorStrategy = OneForOneStrategy(List(classOf[ActorKilledException]), 5, 1000) + def receiveT = { case "sendKill" ⇒ ref ! Kill } - }).withFaultHandler(OneForOneStrategy(List(classOf[ActorKilledException]), 5, 1000))) + })) boss ! "sendKill"