From abc072ef0a05703b1fc6a9a00c7d1c92aa1d8311 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 24 Jan 2012 08:37:01 +0100 Subject: [PATCH 1/7] Improved API of OneForOneStrategy and AllForOneStrategy. See #1714 * withinTimeRange: Duration * Removed need for Options in API --- .../ActorFireForgetRequestReplySpec.scala | 3 +- .../scala/akka/actor/ActorLifeCycleSpec.scala | 6 +- .../test/scala/akka/actor/ActorRefSpec.scala | 2 +- .../scala/akka/actor/DeathWatchSpec.scala | 4 +- .../scala/akka/actor/FSMTransitionSpec.scala | 4 +- .../akka/actor/RestartStrategySpec.scala | 13 ++- .../test/scala/akka/actor/SchedulerSpec.scala | 6 +- .../akka/actor/SupervisorHierarchySpec.scala | 9 +- .../scala/akka/actor/SupervisorMiscSpec.scala | 4 +- .../scala/akka/actor/SupervisorSpec.scala | 41 ++++---- .../scala/akka/actor/SupervisorTreeSpec.scala | 3 +- .../test/scala/akka/actor/Ticket669Spec.scala | 6 +- .../scala/akka/event/LoggingReceiveSpec.scala | 3 +- .../src/main/scala/akka/actor/Actor.scala | 2 +- .../main/scala/akka/actor/FaultHandling.scala | 97 ++++++++++--------- .../main/scala/akka/actor/UntypedActor.scala | 2 +- .../docs/actor/FaultHandlingTestBase.java | 4 +- .../project/migration-guide-1.3.x-2.0.x.rst | 2 +- .../docs/actor/FaultHandlingDocSpec.scala | 6 +- .../scala/akka/testkit/TestActorRefSpec.scala | 3 +- 20 files changed, 122 insertions(+), 98 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala index c9f1725692..97ebdbaa1e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala @@ -81,7 +81,8 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach w "should shutdown crashed temporary actor" in { filterEvents(EventFilter[Exception]("Expected exception")) { - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(0))))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), + maxNrOfRetries = 0)))) val actor = Await.result((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration) actor.isTerminated must be(false) actor ! "Die" diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala index d2c8a4bd47..fe30c2b39b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala @@ -36,7 +36,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS "invoke preRestart, preStart, postRestart when using OneForOneStrategy" in { filterException[ActorKilledException] { val id = newUuid().toString - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3))))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 3)))) val gen = new AtomicInteger(0) val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) { override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") } @@ -70,7 +70,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS "default for preRestart and postRestart is to call postStop and preStart respectively" in { filterException[ActorKilledException] { val id = newUuid().toString - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3))))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 3)))) val gen = new AtomicInteger(0) val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen)) val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration) @@ -100,7 +100,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS "not invoke preRestart and postRestart when never restarted using OneForOneStrategy" in { val id = newUuid().toString - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3))))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 3)))) val gen = new AtomicInteger(0) val props = Props(new LifeCycleTestActor(testActor, id, gen)) val a = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 1f5120decd..5158de2262 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -376,7 +376,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { val boss = system.actorOf(Props(new Actor { - override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 2, 1000) + override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), maxNrOfRetries = 2, withinTimeRange = 1 second) val ref = context.actorOf( Props(new Actor { diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 44d3daa9e2..921e57d2a4 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -95,7 +95,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout "notify with a Terminated message once when an Actor is stopped but not when restarted" in { filterException[ActorKilledException] { - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(2))))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 2)))) val terminalProps = Props(context ⇒ { case x ⇒ context.sender ! x }) val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration) @@ -116,7 +116,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout "fail a monitor which does not handle Terminated()" in { filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) { case class FF(fail: Failed) - val strategy = new OneForOneStrategy(SupervisorStrategy.makeDecider(List(classOf[Exception])), Some(0)) { + val strategy = new OneForOneStrategy(SupervisorStrategy.makeDecider(List(classOf[Exception])), maxNrOfRetries = 0) { override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { testActor.tell(FF(Failed(cause)), child) super.handleFailure(context, child, cause, stats, children) diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala index 36258883e9..d0c270070e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala @@ -5,8 +5,8 @@ package akka.actor import akka.testkit._ import akka.util.duration._ - import FSM._ +import akka.util.Duration object FSMTransitionSpec { @@ -72,7 +72,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender { val fsm = system.actorOf(Props(new MyFSM(testActor))) val sup = system.actorOf(Props(new Actor { context.watch(fsm) - override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), None, None) + override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), Duration.Inf) def receive = { case _ ⇒ } })) diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index beeb243ce8..17750dd693 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -14,6 +14,7 @@ import akka.testkit.AkkaSpec import akka.testkit.DefaultTimeout import akka.testkit.TestLatch import akka.util.duration._ +import akka.util.Duration import akka.pattern.ask @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -29,7 +30,8 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { "A RestartStrategy" must { "ensure that slave stays dead after max restarts within time range" in { - val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), 2, 1000)))) + val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), + maxNrOfRetries = 2, withinTimeRange = 1 second)))) val restartLatch = new TestLatch val secondRestartLatch = new TestLatch @@ -75,7 +77,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { } "ensure that slave is immortal without max restarts and time range" in { - val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), None, None)))) + val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), Duration.Inf)))) val countDownLatch = new TestLatch(100) @@ -97,7 +99,8 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { } "ensure that slave restarts after number of crashes not within time range" in { - val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), 2, 500)))) + val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), + maxNrOfRetries = 2, withinTimeRange = 500 millis)))) val restartLatch = new TestLatch val secondRestartLatch = new TestLatch @@ -154,7 +157,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { } "ensure that slave is not restarted after max retries" in { - val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), Some(2), None)))) + val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), maxNrOfRetries = 2)))) val restartLatch = new TestLatch val secondRestartLatch = new TestLatch @@ -209,7 +212,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { val countDownLatch = new TestLatch(2) val boss = system.actorOf(Props(new Actor { - override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), None, Some(1000)) + override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), withinTimeRange = 1 second) def receive = { case p: Props ⇒ sender ! context.watch(context.actorOf(p)) case t: Terminated ⇒ maxNoOfRestartsLatch.open() diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index a07af9d2eb..ea7522f30d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -134,7 +134,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout val restartLatch = new TestLatch val pingLatch = new TestLatch(6) - val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, 1000)))) + val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, 1 second)))) val props = Props(new Actor { def receive = { case Ping ⇒ pingLatch.countDown() @@ -165,8 +165,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout def receive = { case Msg(ts) ⇒ val now = System.nanoTime - // Make sure that no message has been dispatched before the scheduled time (10ms = 10000000ns) has occurred - if (now - ts < 10000000) throw new RuntimeException("Interval is too small: " + (now - ts)) + // Make sure that no message has been dispatched before the scheduled time (10ms) has occurred + if (now - ts < 10.millis.toNanos) throw new RuntimeException("Interval is too small: " + (now - ts)) ticks.countDown() } })) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 60107b9754..185651b278 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -5,10 +5,11 @@ package akka.actor import akka.testkit._ - import java.util.concurrent.{ TimeUnit, CountDownLatch } import akka.dispatch.Await import akka.pattern.ask +import akka.util.Duration +import akka.util.duration._ object SupervisorHierarchySpec { class FireWorkerException(msg: String) extends Exception(msg) @@ -39,9 +40,9 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout { "restart manager and workers in AllForOne" in { val countDown = new CountDownLatch(4) - val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), None, None)))) + val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Duration.Inf)))) - val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy(List(), None, None))) + val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy(List(), Duration.Inf))) val manager = Await.result((boss ? managerProps).mapTo[ActorRef], timeout.duration) val workerProps = Props(new CountDownActor(countDown, SupervisorStrategy.defaultStrategy)) @@ -61,7 +62,7 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout { val countDownMessages = new CountDownLatch(1) val countDownMax = new CountDownLatch(1) val boss = system.actorOf(Props(new Actor { - override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 1, 5000) + override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), maxNrOfRetries = 1, withinTimeRange = 5 seconds) val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages, SupervisorStrategy.defaultStrategy)))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index de617c8db2..7eb5007152 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -9,6 +9,7 @@ import java.util.concurrent.{ TimeUnit, CountDownLatch } import akka.testkit.AkkaSpec import akka.testkit.DefaultTimeout import akka.pattern.ask +import akka.util.duration._ object SupervisorMiscSpec { val config = """ @@ -29,7 +30,8 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul filterEvents(EventFilter[Exception]("Kill")) { val countDownLatch = new CountDownLatch(4) - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, 5000)))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), + maxNrOfRetries = 3, withinTimeRange = 5 seconds)))) val workerProps = Props(new Actor { override def postRestart(cause: Throwable) { countDownLatch.countDown() } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 6b96038000..630da50f6d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -14,7 +14,7 @@ import akka.dispatch.Await import akka.pattern.ask object SupervisorSpec { - val Timeout = 5 seconds + val Timeout = 5.seconds case object DieReply @@ -54,7 +54,7 @@ object SupervisorSpec { var s: ActorRef = _ - override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), Some(0)) + override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 0) def receive = { case Die ⇒ temp forward Die @@ -69,7 +69,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende import SupervisorSpec._ - val TimeoutMillis = Timeout.dilated.toMillis.toInt + val DilatedTimeout = Timeout.dilated // ===================================================== // Creating actors and supervisors @@ -78,45 +78,51 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration) def temporaryActorAllForOne = { - val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), Some(0))))) + val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 0)))) val temporaryActor = child(supervisor, Props(new PingPongActor(testActor))) (temporaryActor, supervisor) } def singleActorAllForOne = { - val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) + val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), + maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)))) val pingpong = child(supervisor, Props(new PingPongActor(testActor))) (pingpong, supervisor) } def singleActorOneForOne = { - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), + maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)))) val pingpong = child(supervisor, Props(new PingPongActor(testActor))) (pingpong, supervisor) } def multipleActorsAllForOne = { - val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) + val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), + maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)))) val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor))) (pingpong1, pingpong2, pingpong3, supervisor) } def multipleActorsOneForOne = { - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), + maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)))) val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor))) (pingpong1, pingpong2, pingpong3, supervisor) } def nestedSupervisorsAllForOne = { - val topSupervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) + val topSupervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), + maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)))) val pingpong1 = child(topSupervisor, Props(new PingPongActor(testActor))) - val middleSupervisor = child(topSupervisor, Props(new Supervisor(AllForOneStrategy(Nil, 3, TimeoutMillis)))) + val middleSupervisor = child(topSupervisor, Props(new Supervisor(AllForOneStrategy(Nil, + maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)))) val pingpong2, pingpong3 = child(middleSupervisor, Props(new PingPongActor(testActor))) (pingpong1, pingpong2, pingpong3, topSupervisor) @@ -131,14 +137,14 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende } def ping(pingPongActor: ActorRef) = { - Await.result(pingPongActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage + Await.result(pingPongActor.?(Ping)(DilatedTimeout), DilatedTimeout) must be === PongMessage expectMsg(Timeout, PingMessage) } def kill(pingPongActor: ActorRef) = { - val result = (pingPongActor.?(DieReply)(TimeoutMillis)) + val result = (pingPongActor.?(DieReply)(DilatedTimeout)) expectMsg(Timeout, ExceptionMessage) - intercept[RuntimeException] { Await.result(result, TimeoutMillis millis) } + intercept[RuntimeException] { Await.result(result, DilatedTimeout) } } "A supervisor" must { @@ -154,7 +160,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "not restart temporary actor" in { val (temporaryActor, _) = temporaryActorAllForOne - intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis) } + intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply)(DilatedTimeout), DilatedTimeout) } expectNoMsg(1 second) } @@ -280,7 +286,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "must attempt restart when exception during restart" in { val inits = new AtomicInteger(0) - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000)))) + val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(classOf[Exception] :: Nil, + maxNrOfRetries = 3, withinTimeRange = 10 seconds)))) val dyingProps = Props(new Actor { inits.incrementAndGet @@ -300,11 +307,11 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1), EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) { intercept[RuntimeException] { - Await.result(dyingActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis) + Await.result(dyingActor.?(DieReply)(DilatedTimeout), DilatedTimeout) } } - Await.result(dyingActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage + Await.result(dyingActor.?(Ping)(DilatedTimeout), DilatedTimeout) must be === PongMessage inits.get must be(3) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala index 04c1292d15..b9e3ff4a20 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala @@ -23,7 +23,8 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeou EventFilter[ActorKilledException](occurrences = 1) intercept { within(5 seconds) { val p = Props(new Actor { - override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), 3, 1000) + override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), + maxNrOfRetries = 3, withinTimeRange = 1 second) def receive = { case p: Props ⇒ sender ! context.actorOf(p) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index 38292decde..b43ffc6d0c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -12,6 +12,7 @@ import akka.testkit.ImplicitSender import akka.testkit.DefaultTimeout import akka.dispatch.Await import akka.pattern.ask +import akka.util.duration._ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender with DefaultTimeout { @@ -25,7 +26,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender "A supervised actor with lifecycle PERMANENT" should { "be able to reply on failure during preRestart" in { filterEvents(EventFilter[Exception]("test", occurrences = 1)) { - val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 5, 10000)))) + val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 5, 10 seconds)))) val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration) supervised.!("test")(testActor) @@ -36,7 +37,8 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender "be able to reply on failure during postStop" in { filterEvents(EventFilter[Exception]("test", occurrences = 1)) { - val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), Some(0), None)))) + val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), + maxNrOfRetries = 0)))) val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration) supervised.!("test")(testActor) diff --git a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala index cf620bf0fc..1857c0938b 100644 --- a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala @@ -15,7 +15,8 @@ import akka.actor._ object LoggingReceiveSpec { class TestLogActor extends Actor { - override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 5, 5000) + override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), + maxNrOfRetries = 5, withinTimeRange = 5 seconds) def receive = { case _ ⇒ } } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 40b590aa51..86e9cb816d 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -147,7 +147,7 @@ object Actor { * case _: NullPointerException ⇒ Restart * case _: IllegalArgumentException ⇒ Stop * case _: Exception ⇒ Escalate - * }: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) + * }: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute)) * * def receive = { * // directly calculated reply diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index e7ae22f57f..79166a01eb 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -8,6 +8,7 @@ import scala.annotation.tailrec import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import java.lang.{ Iterable ⇒ JIterable } +import akka.util.Duration case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) { @@ -95,6 +96,13 @@ object SupervisorStrategy { */ def escalate = Escalate + /** + * When supervisorStrategy is not specified for an actor this + * is used by default. The child will be stopped when + * [[akka.ActorInitializationException]] or [[akka.ActorKilledException]] + * is thrown. It will be restarted for other `Exception` types. + * The error is escalated if it's a `Throwable`, i.e. `Error`. + */ final val defaultStrategy: SupervisorStrategy = { def defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop @@ -102,7 +110,7 @@ object SupervisorStrategy { case _: Exception ⇒ Restart case _ ⇒ Escalate } - OneForOneStrategy(defaultDecider, None, None) + OneForOneStrategy(defaultDecider) } type Decider = PartialFunction[Throwable, Action] @@ -120,14 +128,14 @@ object SupervisorStrategy { * Backwards compatible Decider builder which just checks whether one of * the given Throwables matches the cause and restarts, otherwise escalates. */ - def makeDecider(trapExit: List[Class[_ <: Throwable]]): Decider = + def makeDecider(trapExit: Seq[Class[_ <: Throwable]]): Decider = { case x ⇒ if (trapExit exists (_ isInstance x)) Restart else Escalate } /** * Backwards compatible Decider builder which just checks whether one of * the given Throwables matches the cause and restarts, otherwise escalates. */ - def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toList) + def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toSeq) /** * Decider builder for Iterables of cause-action pairs, e.g. a map obtained @@ -156,6 +164,11 @@ object SupervisorStrategy { } buf } + + private[akka] def withinTimeRangeOption(withinTimeRange: Duration): Option[Duration] = + if (withinTimeRange.isFinite && withinTimeRange >= Duration.Zero) Some(withinTimeRange) else None + private[akka] def maxNrOfRetriesOption(maxNrOfRetries: Int): Option[Int] = + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries) } abstract class SupervisorStrategy { @@ -199,46 +212,41 @@ abstract class SupervisorStrategy { } object AllForOneStrategy { - def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy = - new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) - def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): AllForOneStrategy = + def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration): AllForOneStrategy = new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) - def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int]): AllForOneStrategy = - new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, None) + def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int): AllForOneStrategy = + apply(trapExit, maxNrOfRetries, Duration.Inf) + def apply(trapExit: Seq[Class[_ <: Throwable]], withinTimeRange: Duration): AllForOneStrategy = + apply(trapExit, -1, withinTimeRange) } /** * Restart all actors linked to the same supervisor when one fails, * trapExit = which Throwables should be intercepted * maxNrOfRetries = the number of times an actor is allowed to be restarted - * withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window + * withinTimeRange = duration of the time window for maxNrOfRetries, Duration.Inf means no window */ case class AllForOneStrategy(decider: SupervisorStrategy.Decider, - maxNrOfRetries: Option[Int] = None, - withinTimeRange: Option[Int] = None) extends SupervisorStrategy { + maxNrOfRetries: Int = -1, + withinTimeRange: Duration = Duration.Inf) extends SupervisorStrategy { - def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) = - this(SupervisorStrategy.makeDecider(decider), - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), - if (withinTimeRange < 0) None else Some(withinTimeRange)) + def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Duration) = + this(SupervisorStrategy.makeDecider(decider), maxNrOfRetries, withinTimeRange) - def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(SupervisorStrategy.makeDecider(trapExit), - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), - if (withinTimeRange < 0) None else Some(withinTimeRange)) + def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) = + this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) - def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(SupervisorStrategy.makeDecider(trapExit), - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), - if (withinTimeRange < 0) None else Some(withinTimeRange)) + def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) = + this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) /* * this is a performance optimization to avoid re-allocating the pairs upon * every call to requestRestartPermission, assuming that strategies are shared * across actors and thus this field does not take up much space */ - val retriesWindow = (maxNrOfRetries, withinTimeRange) + private val retriesWindow = ( + SupervisorStrategy.maxNrOfRetriesOption(maxNrOfRetries), + SupervisorStrategy.withinTimeRangeOption(withinTimeRange).map(_.toMillis.toInt)) def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = { children foreach (context.stop(_)) @@ -256,46 +264,41 @@ case class AllForOneStrategy(decider: SupervisorStrategy.Decider, } object OneForOneStrategy { - def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy = - new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) - def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): OneForOneStrategy = + def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration): OneForOneStrategy = new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) - def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int]): OneForOneStrategy = - new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, None) + def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int): OneForOneStrategy = + apply(trapExit, maxNrOfRetries, Duration.Inf) + def apply(trapExit: Seq[Class[_ <: Throwable]], withinTimeRange: Duration): OneForOneStrategy = + apply(trapExit, -1, withinTimeRange) } /** * Restart an actor when it fails * trapExit = which Throwables should be intercepted * maxNrOfRetries = the number of times an actor is allowed to be restarted - * withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window + * withinTimeRange = duration of the time window for maxNrOfRetries, Duration.Inf means no window */ case class OneForOneStrategy(decider: SupervisorStrategy.Decider, - maxNrOfRetries: Option[Int] = None, - withinTimeRange: Option[Int] = None) extends SupervisorStrategy { + maxNrOfRetries: Int = -1, + withinTimeRange: Duration = Duration.Inf) extends SupervisorStrategy { - def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) = - this(SupervisorStrategy.makeDecider(decider), - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), - if (withinTimeRange < 0) None else Some(withinTimeRange)) + def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Duration) = + this(SupervisorStrategy.makeDecider(decider), maxNrOfRetries, withinTimeRange) - def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(SupervisorStrategy.makeDecider(trapExit), - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), - if (withinTimeRange < 0) None else Some(withinTimeRange)) + def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) = + this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) - def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(SupervisorStrategy.makeDecider(trapExit), - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), - if (withinTimeRange < 0) None else Some(withinTimeRange)) + def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) = + this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) /* * this is a performance optimization to avoid re-allocating the pairs upon * every call to requestRestartPermission, assuming that strategies are shared * across actors and thus this field does not take up much space */ - val retriesWindow = (maxNrOfRetries, withinTimeRange) + private val retriesWindow = ( + SupervisorStrategy.maxNrOfRetriesOption(maxNrOfRetries), + SupervisorStrategy.withinTimeRangeOption(withinTimeRange).map(_.toMillis.toInt)) def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {} diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala index ba677b750b..46afd447d6 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala @@ -50,7 +50,7 @@ import akka.dispatch.{ MessageDispatcher, Promise } * return escalate(); * } * } - * }, 10, 60000); + * }, 10, Duration.parse("1 minute"); * * @Override * public SupervisorStrategy supervisorStrategy() { diff --git a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java index e33644409d..09be1aaaca 100644 --- a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java @@ -52,7 +52,7 @@ public class FaultHandlingTestBase { return escalate(); } } - }, 10, 60000); + }, 10, Duration.parse("1 minute")); @Override public SupervisorStrategy supervisorStrategy() { @@ -89,7 +89,7 @@ public class FaultHandlingTestBase { return escalate(); } } - }, 10, 60000); + }, 10, Duration.parse("1 minute")); @Override public SupervisorStrategy supervisorStrategy() { diff --git a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst index 57a19fb393..a8b17fa2de 100644 --- a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst +++ b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst @@ -484,7 +484,7 @@ v2.0:: case _: NullPointerException ⇒ Restart case _: IllegalArgumentException ⇒ Stop case _: Exception ⇒ Escalate - }: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) + }: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute) def receive = { case x => diff --git a/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSpec.scala index 22790d2110..5e6b15078c 100644 --- a/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSpec.scala @@ -20,13 +20,14 @@ object FaultHandlingDocSpec { //#strategy import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy._ + import akka.util.duration._ override val supervisorStrategy = OneForOneStrategy({ case _: ArithmeticException ⇒ Resume case _: NullPointerException ⇒ Restart case _: IllegalArgumentException ⇒ Stop case _: Exception ⇒ Escalate - }: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) + }: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute) //#strategy def receive = { @@ -40,13 +41,14 @@ object FaultHandlingDocSpec { //#strategy2 import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy._ + import akka.util.duration._ override val supervisorStrategy = OneForOneStrategy({ case _: ArithmeticException ⇒ Resume case _: NullPointerException ⇒ Restart case _: IllegalArgumentException ⇒ Stop case _: Exception ⇒ Escalate - }: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) + }: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute) //#strategy2 def receive = { diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index eee9318c02..48a62b21eb 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -182,7 +182,8 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA override def postRestart(reason: Throwable) { counter -= 1 } }), self, "child") - override def supervisorStrategy = OneForOneStrategy(List(classOf[ActorKilledException]), 5, 1000) + override def supervisorStrategy = OneForOneStrategy(List(classOf[ActorKilledException]), + maxNrOfRetries = 5, withinTimeRange = 1 second) def receiveT = { case "sendKill" ⇒ ref ! Kill } })) From 2a6b7f9b03d0e9db879c99e4bc6beccbf019680e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 24 Jan 2012 10:35:09 +0100 Subject: [PATCH 2/7] Moved decider to separate parameter list, and implicit conversion from Seq[Throwable]. See #1714 --- .../ActorFireForgetRequestReplySpec.scala | 4 +- .../scala/akka/actor/ActorLifeCycleSpec.scala | 9 ++- .../test/scala/akka/actor/ActorRefSpec.scala | 3 +- .../scala/akka/actor/DeathWatchSpec.scala | 5 +- .../scala/akka/actor/FSMTransitionSpec.scala | 2 +- .../akka/actor/RestartStrategySpec.scala | 14 ++-- .../test/scala/akka/actor/SchedulerSpec.scala | 2 +- .../akka/actor/SupervisorHierarchySpec.scala | 7 +- .../scala/akka/actor/SupervisorMiscSpec.scala | 4 +- .../scala/akka/actor/SupervisorSpec.scala | 32 ++++---- .../scala/akka/actor/SupervisorTreeSpec.scala | 3 +- .../test/scala/akka/actor/Ticket669Spec.scala | 7 +- .../scala/akka/actor/TypedActorSpec.scala | 2 +- .../scala/akka/event/LoggingReceiveSpec.scala | 4 +- .../src/main/scala/akka/actor/Actor.scala | 4 +- .../scala/akka/actor/ActorRefProvider.scala | 2 +- .../main/scala/akka/actor/FaultHandling.scala | 79 ++++++++----------- .../main/scala/akka/actor/UntypedActor.scala | 27 ++++--- .../docs/actor/FaultHandlingTestBase.java | 58 +++++++------- .../project/migration-guide-1.3.x-2.0.x.rst | 6 +- .../docs/actor/FaultHandlingDocSpec.scala | 8 +- .../scala/akka/testkit/TestActorRefSpec.scala | 4 +- 22 files changed, 142 insertions(+), 144 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala index 97ebdbaa1e..69cf463276 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala @@ -81,8 +81,8 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach w "should shutdown crashed temporary actor" in { filterEvents(EventFilter[Exception]("Expected exception")) { - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), - maxNrOfRetries = 0)))) + val supervisor = system.actorOf(Props(new Supervisor( + OneForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception]))))) val actor = Await.result((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration) actor.isTerminated must be(false) actor ! "Die" diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala index fe30c2b39b..781b578d93 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala @@ -36,7 +36,8 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS "invoke preRestart, preStart, postRestart when using OneForOneStrategy" in { filterException[ActorKilledException] { val id = newUuid().toString - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 3)))) + val supervisor = system.actorOf(Props(new Supervisor( + OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception]))))) val gen = new AtomicInteger(0) val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) { override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") } @@ -70,7 +71,8 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS "default for preRestart and postRestart is to call postStop and preStart respectively" in { filterException[ActorKilledException] { val id = newUuid().toString - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 3)))) + val supervisor = system.actorOf(Props(new Supervisor( + OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception]))))) val gen = new AtomicInteger(0) val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen)) val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration) @@ -100,7 +102,8 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS "not invoke preRestart and postRestart when never restarted using OneForOneStrategy" in { val id = newUuid().toString - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 3)))) + val supervisor = system.actorOf(Props(new Supervisor( + OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception]))))) val gen = new AtomicInteger(0) val props = Props(new LifeCycleTestActor(testActor, id, gen)) val a = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 5158de2262..a4dbb4d1cb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -376,7 +376,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { val boss = system.actorOf(Props(new Actor { - override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), maxNrOfRetries = 2, withinTimeRange = 1 second) + override val supervisorStrategy = + OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable])) val ref = context.actorOf( Props(new Actor { diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 921e57d2a4..3e8ff57de5 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -95,7 +95,8 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout "notify with a Terminated message once when an Actor is stopped but not when restarted" in { filterException[ActorKilledException] { - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 2)))) + val supervisor = system.actorOf(Props(new Supervisor( + OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Exception]))))) val terminalProps = Props(context ⇒ { case x ⇒ context.sender ! x }) val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration) @@ -116,7 +117,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout "fail a monitor which does not handle Terminated()" in { filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) { case class FF(fail: Failed) - val strategy = new OneForOneStrategy(SupervisorStrategy.makeDecider(List(classOf[Exception])), maxNrOfRetries = 0) { + val strategy = new OneForOneStrategy(maxNrOfRetries = 0)(SupervisorStrategy.makeDecider(List(classOf[Exception]))) { override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { testActor.tell(FF(Failed(cause)), child) super.handleFailure(context, child, cause, stats, children) diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala index d0c270070e..8d8fc5e725 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala @@ -72,7 +72,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender { val fsm = system.actorOf(Props(new MyFSM(testActor))) val sup = system.actorOf(Props(new Actor { context.watch(fsm) - override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), Duration.Inf) + override val supervisorStrategy = OneForOneStrategy(withinTimeRange = Duration.Inf)(List(classOf[Throwable])) def receive = { case _ ⇒ } })) diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index 17750dd693..829ab081e0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -30,8 +30,8 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { "A RestartStrategy" must { "ensure that slave stays dead after max restarts within time range" in { - val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), - maxNrOfRetries = 2, withinTimeRange = 1 second)))) + val boss = system.actorOf(Props(new Supervisor( + OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable]))))) val restartLatch = new TestLatch val secondRestartLatch = new TestLatch @@ -77,7 +77,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { } "ensure that slave is immortal without max restarts and time range" in { - val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), Duration.Inf)))) + val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Throwable]))))) val countDownLatch = new TestLatch(100) @@ -99,8 +99,8 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { } "ensure that slave restarts after number of crashes not within time range" in { - val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), - maxNrOfRetries = 2, withinTimeRange = 500 millis)))) + val boss = system.actorOf(Props(new Supervisor( + OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 500 millis)(List(classOf[Throwable]))))) val restartLatch = new TestLatch val secondRestartLatch = new TestLatch @@ -157,7 +157,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { } "ensure that slave is not restarted after max retries" in { - val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), maxNrOfRetries = 2)))) + val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Throwable]))))) val restartLatch = new TestLatch val secondRestartLatch = new TestLatch @@ -212,7 +212,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { val countDownLatch = new TestLatch(2) val boss = system.actorOf(Props(new Actor { - override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), withinTimeRange = 1 second) + override val supervisorStrategy = OneForOneStrategy(withinTimeRange = 1 second)(List(classOf[Throwable])) def receive = { case p: Props ⇒ sender ! context.watch(context.actorOf(p)) case t: Terminated ⇒ maxNoOfRestartsLatch.open() diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index ea7522f30d..d205d1a1ad 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -134,7 +134,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout val restartLatch = new TestLatch val pingLatch = new TestLatch(6) - val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, 1 second)))) + val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(3, 1 second)(List(classOf[Exception]))))) val props = Props(new Actor { def receive = { case Ping ⇒ pingLatch.countDown() diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 185651b278..a04e83f39b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -40,9 +40,9 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout { "restart manager and workers in AllForOne" in { val countDown = new CountDownLatch(4) - val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Duration.Inf)))) + val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Exception]))))) - val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy(List(), Duration.Inf))) + val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy()(List()))) val manager = Await.result((boss ? managerProps).mapTo[ActorRef], timeout.duration) val workerProps = Props(new CountDownActor(countDown, SupervisorStrategy.defaultStrategy)) @@ -62,7 +62,8 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout { val countDownMessages = new CountDownLatch(1) val countDownMax = new CountDownLatch(1) val boss = system.actorOf(Props(new Actor { - override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), maxNrOfRetries = 1, withinTimeRange = 5 seconds) + override val supervisorStrategy = + OneForOneStrategy(maxNrOfRetries = 1, withinTimeRange = 5 seconds)(List(classOf[Throwable])) val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages, SupervisorStrategy.defaultStrategy)))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index 7eb5007152..fccfc75d98 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -30,8 +30,8 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul filterEvents(EventFilter[Exception]("Kill")) { val countDownLatch = new CountDownLatch(4) - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), - maxNrOfRetries = 3, withinTimeRange = 5 seconds)))) + val supervisor = system.actorOf(Props(new Supervisor( + OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds)(List(classOf[Exception]))))) val workerProps = Props(new Actor { override def postRestart(cause: Throwable) { countDownLatch.countDown() } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 630da50f6d..91701596a6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -54,7 +54,7 @@ object SupervisorSpec { var s: ActorRef = _ - override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 0) + override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception])) def receive = { case Die ⇒ temp forward Die @@ -78,51 +78,51 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration) def temporaryActorAllForOne = { - val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), maxNrOfRetries = 0)))) + val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception]))))) val temporaryActor = child(supervisor, Props(new PingPongActor(testActor))) (temporaryActor, supervisor) } def singleActorAllForOne = { - val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), - maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)))) + val supervisor = system.actorOf(Props(new Supervisor( + AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception]))))) val pingpong = child(supervisor, Props(new PingPongActor(testActor))) (pingpong, supervisor) } def singleActorOneForOne = { - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), - maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)))) + val supervisor = system.actorOf(Props(new Supervisor( + OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception]))))) val pingpong = child(supervisor, Props(new PingPongActor(testActor))) (pingpong, supervisor) } def multipleActorsAllForOne = { - val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), - maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)))) + val supervisor = system.actorOf(Props(new Supervisor( + AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception]))))) val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor))) (pingpong1, pingpong2, pingpong3, supervisor) } def multipleActorsOneForOne = { - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), - maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)))) + val supervisor = system.actorOf(Props(new Supervisor( + OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception]))))) val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor))) (pingpong1, pingpong2, pingpong3, supervisor) } def nestedSupervisorsAllForOne = { - val topSupervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), - maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)))) + val topSupervisor = system.actorOf(Props(new Supervisor( + AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception]))))) val pingpong1 = child(topSupervisor, Props(new PingPongActor(testActor))) - val middleSupervisor = child(topSupervisor, Props(new Supervisor(AllForOneStrategy(Nil, - maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)))) + val middleSupervisor = child(topSupervisor, Props(new Supervisor( + AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(Nil)))) val pingpong2, pingpong3 = child(middleSupervisor, Props(new PingPongActor(testActor))) (pingpong1, pingpong2, pingpong3, topSupervisor) @@ -286,8 +286,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "must attempt restart when exception during restart" in { val inits = new AtomicInteger(0) - val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(classOf[Exception] :: Nil, - maxNrOfRetries = 3, withinTimeRange = 10 seconds)))) + val supervisor = system.actorOf(Props(new Supervisor( + OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 10 seconds)(classOf[Exception] :: Nil)))) val dyingProps = Props(new Actor { inits.incrementAndGet diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala index b9e3ff4a20..b84cce002c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala @@ -23,8 +23,7 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeou EventFilter[ActorKilledException](occurrences = 1) intercept { within(5 seconds) { val p = Props(new Actor { - override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), - maxNrOfRetries = 3, withinTimeRange = 1 second) + override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 second)(List(classOf[Exception])) def receive = { case p: Props ⇒ sender ! context.actorOf(p) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index b43ffc6d0c..285b63c2c7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -26,7 +26,8 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender "A supervised actor with lifecycle PERMANENT" should { "be able to reply on failure during preRestart" in { filterEvents(EventFilter[Exception]("test", occurrences = 1)) { - val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 5, 10 seconds)))) + val supervisor = system.actorOf(Props(new Supervisor( + AllForOneStrategy(5, 10 seconds)(List(classOf[Exception]))))) val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration) supervised.!("test")(testActor) @@ -37,8 +38,8 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender "be able to reply on failure during postStop" in { filterEvents(EventFilter[Exception]("test", occurrences = 1)) { - val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), - maxNrOfRetries = 0)))) + val supervisor = system.actorOf(Props(new Supervisor( + AllForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception]))))) val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration) supervised.!("test")(testActor) diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index ced257dfc8..49b37cc506 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -300,7 +300,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) "be able to handle exceptions when calling methods" in { filterEvents(EventFilter[IllegalStateException]("expected")) { val boss = system.actorOf(Props(new Actor { - override val supervisorStrategy = OneForOneStrategy { + override val supervisorStrategy = OneForOneStrategy() { case e: IllegalStateException if e.getMessage == "expected" ⇒ SupervisorStrategy.Resume } def receive = { diff --git a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala index 1857c0938b..b632bd3c40 100644 --- a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala @@ -15,8 +15,8 @@ import akka.actor._ object LoggingReceiveSpec { class TestLogActor extends Actor { - override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), - maxNrOfRetries = 5, withinTimeRange = 5 seconds) + override val supervisorStrategy = + OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(List(classOf[Throwable])) def receive = { case _ ⇒ } } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 86e9cb816d..7a1f640160 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -142,12 +142,12 @@ object Actor { * {{{ * class ExampleActor extends Actor { * - * override val supervisorStrategy = OneForOneStrategy({ + * override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { * case _: ArithmeticException ⇒ Resume * case _: NullPointerException ⇒ Restart * case _: IllegalArgumentException ⇒ Stop * case _: Exception ⇒ Escalate - * }: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute)) + * } * * def receive = { * // directly calculated reply diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index db7ca09664..2ac45e90a9 100755 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -355,7 +355,7 @@ class LocalActorRefProvider( override val supervisorStrategy = { import akka.actor.SupervisorStrategy._ - OneForOneStrategy { + OneForOneStrategy() { case _: ActorKilledException ⇒ Stop case _: ActorInitializationException ⇒ Stop case _: Exception ⇒ Restart diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 79166a01eb..4c7f3611a2 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -110,29 +110,35 @@ object SupervisorStrategy { case _: Exception ⇒ Restart case _ ⇒ Escalate } - OneForOneStrategy(defaultDecider) + OneForOneStrategy()(defaultDecider) } + /** + * Implicit conversion from `Seq` of Throwables to a `Decider`. + * This maps the given Throwables to restarts, otherwise escalates. + */ + implicit def seqThrowable2Decider(trapExit: Seq[Class[_ <: Throwable]]): Decider = makeDecider(trapExit) + type Decider = PartialFunction[Throwable, Action] type JDecider = akka.japi.Function[Throwable, Action] type CauseAction = (Class[_ <: Throwable], Action) /** - * Backwards compatible Decider builder which just checks whether one of + * Decider builder which just checks whether one of * the given Throwables matches the cause and restarts, otherwise escalates. */ def makeDecider(trapExit: Array[Class[_ <: Throwable]]): Decider = { case x ⇒ if (trapExit exists (_ isInstance x)) Restart else Escalate } /** - * Backwards compatible Decider builder which just checks whether one of + * Decider builder which just checks whether one of * the given Throwables matches the cause and restarts, otherwise escalates. */ def makeDecider(trapExit: Seq[Class[_ <: Throwable]]): Decider = { case x ⇒ if (trapExit exists (_ isInstance x)) Restart else Escalate } /** - * Backwards compatible Decider builder which just checks whether one of + * Decider builder which just checks whether one of * the given Throwables matches the cause and restarts, otherwise escalates. */ def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toSeq) @@ -209,35 +215,27 @@ abstract class SupervisorStrategy { case Escalate ⇒ false } } -} -object AllForOneStrategy { - def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration): AllForOneStrategy = - new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) - def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int): AllForOneStrategy = - apply(trapExit, maxNrOfRetries, Duration.Inf) - def apply(trapExit: Seq[Class[_ <: Throwable]], withinTimeRange: Duration): AllForOneStrategy = - apply(trapExit, -1, withinTimeRange) } /** * Restart all actors linked to the same supervisor when one fails, - * trapExit = which Throwables should be intercepted - * maxNrOfRetries = the number of times an actor is allowed to be restarted - * withinTimeRange = duration of the time window for maxNrOfRetries, Duration.Inf means no window + * @param maxNrOfRetries the number of times an actor is allowed to be restarted + * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window + * @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a + * `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates. */ -case class AllForOneStrategy(decider: SupervisorStrategy.Decider, - maxNrOfRetries: Int = -1, - withinTimeRange: Duration = Duration.Inf) extends SupervisorStrategy { +case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider) + extends SupervisorStrategy { - def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Duration) = - this(SupervisorStrategy.makeDecider(decider), maxNrOfRetries, withinTimeRange) + def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) = + this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider)) - def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) = - this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) + def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) = + this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) - def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) = - this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) + def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_ <: Throwable]]) = + this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) /* * this is a performance optimization to avoid re-allocating the pairs upon @@ -263,33 +261,24 @@ case class AllForOneStrategy(decider: SupervisorStrategy.Decider, } } -object OneForOneStrategy { - def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration): OneForOneStrategy = - new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) - def apply(trapExit: Seq[Class[_ <: Throwable]], maxNrOfRetries: Int): OneForOneStrategy = - apply(trapExit, maxNrOfRetries, Duration.Inf) - def apply(trapExit: Seq[Class[_ <: Throwable]], withinTimeRange: Duration): OneForOneStrategy = - apply(trapExit, -1, withinTimeRange) -} - /** * Restart an actor when it fails - * trapExit = which Throwables should be intercepted - * maxNrOfRetries = the number of times an actor is allowed to be restarted - * withinTimeRange = duration of the time window for maxNrOfRetries, Duration.Inf means no window + * @param maxNrOfRetries the number of times an actor is allowed to be restarted + * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window + * @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a + * `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates. */ -case class OneForOneStrategy(decider: SupervisorStrategy.Decider, - maxNrOfRetries: Int = -1, - withinTimeRange: Duration = Duration.Inf) extends SupervisorStrategy { +case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider) + extends SupervisorStrategy { - def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Duration) = - this(SupervisorStrategy.makeDecider(decider), maxNrOfRetries, withinTimeRange) + def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) = + this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider)) - def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) = - this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) + def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) = + this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) - def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Duration) = - this(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) + def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_ <: Throwable]]) = + this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) /* * this is a performance optimization to avoid re-allocating the pairs upon diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala index 46afd447d6..d06896a10d 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala @@ -37,20 +37,21 @@ import akka.dispatch.{ MessageDispatcher, Promise } * } * } * - * private static SupervisorStrategy strategy = new OneForOneStrategy(new Function() { - * @Override - * public Action apply(Throwable t) { - * if (t instanceof ArithmeticException) { - * return resume(); - * } else if (t instanceof NullPointerException) { - * return restart(); - * } else if (t instanceof IllegalArgumentException) { - * return stop(); - * } else { - * return escalate(); + * private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"), + * new Function() { + * @Override + * public Action apply(Throwable t) { + * if (t instanceof ArithmeticException) { + * return resume(); + * } else if (t instanceof NullPointerException) { + * return restart(); + * } else if (t instanceof IllegalArgumentException) { + * return stop(); + * } else { + * return escalate(); + * } * } - * } - * }, 10, Duration.parse("1 minute"); + * }); * * @Override * public SupervisorStrategy supervisorStrategy() { diff --git a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java index 09be1aaaca..abf2207a1d 100644 --- a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java @@ -39,20 +39,21 @@ public class FaultHandlingTestBase { static public class Supervisor extends UntypedActor { //#strategy - private static SupervisorStrategy strategy = new OneForOneStrategy(new Function() { - @Override - public Action apply(Throwable t) { - if (t instanceof ArithmeticException) { - return resume(); - } else if (t instanceof NullPointerException) { - return restart(); - } else if (t instanceof IllegalArgumentException) { - return stop(); - } else { - return escalate(); - } - } - }, 10, Duration.parse("1 minute")); + private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"), + new Function() { + @Override + public Action apply(Throwable t) { + if (t instanceof ArithmeticException) { + return resume(); + } else if (t instanceof NullPointerException) { + return restart(); + } else if (t instanceof IllegalArgumentException) { + return stop(); + } else { + return escalate(); + } + } + }); @Override public SupervisorStrategy supervisorStrategy() { @@ -76,20 +77,21 @@ public class FaultHandlingTestBase { static public class Supervisor2 extends UntypedActor { //#strategy2 - private static SupervisorStrategy strategy = new OneForOneStrategy(new Function() { - @Override - public Action apply(Throwable t) { - if (t instanceof ArithmeticException) { - return resume(); - } else if (t instanceof NullPointerException) { - return restart(); - } else if (t instanceof IllegalArgumentException) { - return stop(); - } else { - return escalate(); - } - } - }, 10, Duration.parse("1 minute")); + private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"), + new Function() { + @Override + public Action apply(Throwable t) { + if (t instanceof ArithmeticException) { + return resume(); + } else if (t instanceof NullPointerException) { + return restart(); + } else if (t instanceof IllegalArgumentException) { + return stop(); + } else { + return escalate(); + } + } + }); @Override public SupervisorStrategy supervisorStrategy() { diff --git a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst index a8b17fa2de..d17070aeff 100644 --- a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst +++ b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst @@ -467,7 +467,7 @@ v1.3:: val supervisor = Supervisor( SupervisorConfig( - AllForOneStrategy(List(classOf[Exception]), 3, 1000), + OneForOneStrategy(List(classOf[Exception]), 3, 1000), Supervise( actorOf[MyActor1], Permanent) :: @@ -479,12 +479,12 @@ v1.3:: v2.0:: class MyActor extends Actor { - override val supervisorStrategy = OneForOneStrategy({ + override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: ArithmeticException ⇒ Resume case _: NullPointerException ⇒ Restart case _: IllegalArgumentException ⇒ Stop case _: Exception ⇒ Escalate - }: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute) + } def receive = { case x => diff --git a/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSpec.scala index 5e6b15078c..ca1eccb73a 100644 --- a/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSpec.scala @@ -22,12 +22,12 @@ object FaultHandlingDocSpec { import akka.actor.SupervisorStrategy._ import akka.util.duration._ - override val supervisorStrategy = OneForOneStrategy({ + override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: ArithmeticException ⇒ Resume case _: NullPointerException ⇒ Restart case _: IllegalArgumentException ⇒ Stop case _: Exception ⇒ Escalate - }: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute) + } //#strategy def receive = { @@ -43,12 +43,12 @@ object FaultHandlingDocSpec { import akka.actor.SupervisorStrategy._ import akka.util.duration._ - override val supervisorStrategy = OneForOneStrategy({ + override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: ArithmeticException ⇒ Resume case _: NullPointerException ⇒ Restart case _: IllegalArgumentException ⇒ Stop case _: Exception ⇒ Escalate - }: Decider, maxNrOfRetries = 10, withinTimeRange = 1 minute) + } //#strategy2 def receive = { diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 48a62b21eb..92476a4249 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -182,8 +182,8 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA override def postRestart(reason: Throwable) { counter -= 1 } }), self, "child") - override def supervisorStrategy = OneForOneStrategy(List(classOf[ActorKilledException]), - maxNrOfRetries = 5, withinTimeRange = 1 second) + override def supervisorStrategy = + OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 second)(List(classOf[ActorKilledException])) def receiveT = { case "sendKill" ⇒ ref ! Kill } })) From 2ed56dea7c5aa77439bba607f5552887c544ed56 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 24 Jan 2012 10:40:24 +0100 Subject: [PATCH 3/7] Type inferer help not needed --- akka-docs/scala/fault-tolerance.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/akka-docs/scala/fault-tolerance.rst b/akka-docs/scala/fault-tolerance.rst index f17cab2bb3..4c9925c947 100644 --- a/akka-docs/scala/fault-tolerance.rst +++ b/akka-docs/scala/fault-tolerance.rst @@ -31,8 +31,7 @@ that the respective limit does not apply, leaving the possibility to specify an absolute upper limit on the restarts or to make the restarts work infinitely. The match statement which forms the bulk of the body is of type ``Decider``, -which is a ``PartialFunction[Throwable, Action]``, and we need to help out the -type inferencer a bit here by ascribing that type after the closing brace. This +which is a ``PartialFunction[Throwable, Action]``. This is the piece which maps child failure types to their corresponding actions. Practical Application From d5c23bcb761d17352619dabd79e21b5140f620c4 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 24 Jan 2012 11:33:40 +0100 Subject: [PATCH 4/7] add ExtendedActorSystem to shield ActorSystemImpl - add provider, guardian, systemGuardian and deathWatch to it - make ActorSystemImpl extend ExtendedActorSystem - use ExtendedActorSystem for creating extensions, thereby limiting the access extensions get to just those four published methods. --- .../test/java/akka/actor/JavaExtension.java | 6 +-- .../scala/akka/actor/ActorSystemSpec.scala | 4 +- .../trading/domain/TradeObserver.scala | 9 ++-- .../main/scala/akka/actor/ActorSystem.scala | 42 ++++++++++++++++++- .../src/main/scala/akka/actor/Extension.scala | 10 ++--- akka-actor/src/main/scala/akka/actor/IO.scala | 2 +- .../main/scala/akka/actor/TypedActor.scala | 4 +- .../akka/serialization/Serialization.scala | 4 +- .../SerializationExtension.scala | 4 +- .../docs/extension/ExtensionDocTestBase.java | 2 +- .../SettingsExtensionDocTestBase.java | 4 +- .../docs/extension/ExtensionDocSpec.scala | 4 +- .../extension/SettingsExtensionDocSpec.scala | 4 +- .../BeanstalkBasedMailboxExtension.scala | 2 +- .../mailbox/FileBasedMailboxExtension.scala | 2 +- .../mailbox/MongoBasedMailboxExtension.scala | 2 +- .../mailbox/RedisBasedMailboxExtension.scala | 2 +- .../ZooKeeperBasedMailboxExtension.scala | 2 +- .../testkit/CallingThreadDispatcher.scala | 26 +++++------- .../scala/akka/testkit/TestKitExtension.scala | 4 +- .../scala/akka/zeromq/ZeroMQExtension.scala | 2 +- 21 files changed, 85 insertions(+), 56 deletions(-) diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java index 812e79e287..28b87bb5db 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java @@ -23,15 +23,15 @@ public class JavaExtension { return TestExtensionId.TestExtensionProvider; } - public TestExtension createExtension(ActorSystemImpl i) { + public TestExtension createExtension(ExtendedActorSystem i) { return new TestExtension(i); } } static class TestExtension implements Extension { - public final ActorSystemImpl system; + public final ExtendedActorSystem system; - public TestExtension(ActorSystemImpl i) { + public TestExtension(ExtendedActorSystem i) { system = i; } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 9391ad43d1..ea8ce4c7d0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -15,11 +15,11 @@ class JavaExtensionSpec extends JavaExtension with JUnitSuite object TestExtension extends ExtensionId[TestExtension] with ExtensionIdProvider { def lookup = this - def createExtension(s: ActorSystemImpl) = new TestExtension(s) + def createExtension(s: ExtendedActorSystem) = new TestExtension(s) } // Dont't place inside ActorSystemSpec object, since it will not be garbage collected and reference to system remains -class TestExtension(val system: ActorSystemImpl) extends Extension +class TestExtension(val system: ExtendedActorSystem) extends Extension @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") { diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/domain/TradeObserver.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/domain/TradeObserver.scala index be867121f5..870a2df79d 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/domain/TradeObserver.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/domain/TradeObserver.scala @@ -1,11 +1,8 @@ package akka.performance.trading.domain import java.util.concurrent.atomic.AtomicInteger -import akka.actor.Extension -import akka.actor.ExtensionId -import akka.actor.ExtensionIdProvider -import akka.actor.ActorSystemImpl -import akka.actor.ActorSystem + +import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorSystem } abstract trait TradeObserver { def trade(bid: Bid, ask: Ask) @@ -38,5 +35,5 @@ object TotalTradeCounterExtension extends ExtensionId[TotalTradeCounter] with ExtensionIdProvider { override def lookup = TotalTradeCounterExtension - override def createExtension(system: ActorSystemImpl) = new TotalTradeCounter + override def createExtension(system: ExtendedActorSystem) = new TotalTradeCounter } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index b77aac491f..c6842d0dc2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -130,6 +130,13 @@ object ActorSystem { * }}} * * Where no name is given explicitly, one will be automatically generated. + * + * Important Notice: + * + * This class is not meant to be extended by user code. If you want to + * actually roll your own Akka, it will probably be better to look into + * extending [[akka.actor.ExtendedActorSystem]] instead, but beware that you + * are completely on your own in that case! */ abstract class ActorSystem extends ActorRefFactory { import ActorSystem._ @@ -286,7 +293,40 @@ abstract class ActorSystem extends ActorRefFactory { def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean } -class ActorSystemImpl(val name: String, applicationConfig: Config) extends ActorSystem { +/** + * More powerful interface to the actor system’s implementation which is presented to extensions (see [[akka.actor.Extension]]). + * + * Important Notice: + * + * This class is not meant to be extended by user code. If you want to + * actually roll your own Akka, beware that you are completely on your own in + * that case! + */ +abstract class ExtendedActorSystem extends ActorSystem { + + /** + * The ActorRefProvider is the only entity which creates all actor references within this actor system. + */ + def provider: ActorRefProvider + + /** + * The top-level supervisor of all actors created using system.actorOf(...). + */ + def guardian: InternalActorRef + + /** + * The top-level supervisor of all system-internal services like logging. + */ + def systemGuardian: InternalActorRef + + /** + * Implementation of the mechanism which is used for watch()/unwatch(). + */ + def deathWatch: DeathWatch + +} + +class ActorSystemImpl(val name: String, applicationConfig: Config) extends ExtendedActorSystem { if (!name.matches("""^\w+$""")) throw new IllegalArgumentException("invalid ActorSystem name [" + name + "], must contain only word characters (i.e. [a-zA-Z_0-9])") diff --git a/akka-actor/src/main/scala/akka/actor/Extension.scala b/akka-actor/src/main/scala/akka/actor/Extension.scala index 1850661417..f60d6afb22 100644 --- a/akka-actor/src/main/scala/akka/actor/Extension.scala +++ b/akka-actor/src/main/scala/akka/actor/Extension.scala @@ -18,10 +18,8 @@ import akka.util.ReflectiveAccess * The extension itself can be created in any way desired and has full access * to the ActorSystem implementation. * - */ - -/** - * Marker interface to signify an Akka Extension + * This trait is only a marker interface to signify an Akka Extension, see + * [[akka.actor.ExtensionKey]] for a concise way of formulating extensions. */ trait Extension @@ -47,7 +45,7 @@ trait ExtensionId[T <: Extension] { * Is used by Akka to instantiate the Extension identified by this ExtensionId, * internal use only. */ - def createExtension(system: ActorSystemImpl): T + def createExtension(system: ExtendedActorSystem): T } /** @@ -94,7 +92,7 @@ abstract class ExtensionKey[T <: Extension](implicit m: ClassManifest[T]) extend def this(clazz: Class[T]) = this()(ClassManifest.fromClass(clazz)) override def lookup(): ExtensionId[T] = this - def createExtension(system: ActorSystemImpl): T = + def createExtension(system: ExtendedActorSystem): T = ReflectiveAccess.createInstance[T](m.erasure, Array[Class[_]](classOf[ActorSystemImpl]), Array[AnyRef](system)) match { case Left(ex) ⇒ throw ex case Right(r) ⇒ r diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index b7942975d9..1ee38da4b9 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -744,7 +744,7 @@ final class IOManager private (system: ActorSystem) extends Extension { object IOManager extends ExtensionId[IOManager] with ExtensionIdProvider { override def lookup = this - override def createExtension(system: ActorSystemImpl) = new IOManager(system) + override def createExtension(system: ExtendedActorSystem) = new IOManager(system) } /** diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 3293add151..ce487ec2ef 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -80,7 +80,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi override def get(system: ActorSystem): TypedActorExtension = super.get(system) def lookup() = this - def createExtension(system: ActorSystemImpl): TypedActorExtension = new TypedActorExtension(system) + def createExtension(system: ExtendedActorSystem): TypedActorExtension = new TypedActorExtension(system) /** * Returns a contextual TypedActorFactory of this extension, this means that any TypedActors created by this TypedActorExtension @@ -531,7 +531,7 @@ case class ContextualTypedActorFactory(typedActor: TypedActorExtension, actorFac override def isTypedActor(proxyOrNot: AnyRef): Boolean = typedActor.isTypedActor(proxyOrNot) } -class TypedActorExtension(system: ActorSystemImpl) extends TypedActorFactory with Extension { +class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory with Extension { import TypedActor._ //Import the goodies from the companion object protected def actorFactory: ActorRefFactory = system protected def typedActor = this diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index eaa0ce0925..78cb370b68 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -9,7 +9,7 @@ import akka.util.ReflectiveAccess import scala.util.DynamicVariable import com.typesafe.config.Config import akka.config.ConfigurationException -import akka.actor.{ Extension, ActorSystem, ActorSystemImpl } +import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem } case class NoSerializerFoundException(m: String) extends AkkaException(m) @@ -55,7 +55,7 @@ object Serialization { * Serialization module. Contains methods for serialization and deserialization as well as * locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file. */ -class Serialization(val system: ActorSystemImpl) extends Extension { +class Serialization(val system: ExtendedActorSystem) extends Extension { import Serialization._ val settings = new Settings(system.settings.config) diff --git a/akka-actor/src/main/scala/akka/serialization/SerializationExtension.scala b/akka-actor/src/main/scala/akka/serialization/SerializationExtension.scala index 9d9815f412..f96aa26e0c 100644 --- a/akka-actor/src/main/scala/akka/serialization/SerializationExtension.scala +++ b/akka-actor/src/main/scala/akka/serialization/SerializationExtension.scala @@ -3,7 +3,7 @@ */ package akka.serialization -import akka.actor.{ ActorSystem, ExtensionId, ExtensionIdProvider, ActorSystemImpl } +import akka.actor.{ ActorSystem, ExtensionId, ExtensionIdProvider, ExtendedActorSystem } /** * SerializationExtension is an Akka Extension to interact with the Serialization @@ -12,5 +12,5 @@ import akka.actor.{ ActorSystem, ExtensionId, ExtensionIdProvider, ActorSystemIm object SerializationExtension extends ExtensionId[Serialization] with ExtensionIdProvider { override def get(system: ActorSystem): Serialization = super.get(system) override def lookup = SerializationExtension - override def createExtension(system: ActorSystemImpl): Serialization = new Serialization(system) + override def createExtension(system: ExtendedActorSystem): Serialization = new Serialization(system) } \ No newline at end of file diff --git a/akka-docs/java/code/akka/docs/extension/ExtensionDocTestBase.java b/akka-docs/java/code/akka/docs/extension/ExtensionDocTestBase.java index 6d62eb5bb8..11dfe4c198 100644 --- a/akka-docs/java/code/akka/docs/extension/ExtensionDocTestBase.java +++ b/akka-docs/java/code/akka/docs/extension/ExtensionDocTestBase.java @@ -42,7 +42,7 @@ public class ExtensionDocTestBase { //This method will be called by Akka // to instantiate our Extension - public CountExtensionImpl createExtension(ActorSystemImpl system) { + public CountExtensionImpl createExtension(ExtendedActorSystem system) { return new CountExtensionImpl(); } } diff --git a/akka-docs/java/code/akka/docs/extension/SettingsExtensionDocTestBase.java b/akka-docs/java/code/akka/docs/extension/SettingsExtensionDocTestBase.java index 6ca1c371d9..1bf6ce7c36 100644 --- a/akka-docs/java/code/akka/docs/extension/SettingsExtensionDocTestBase.java +++ b/akka-docs/java/code/akka/docs/extension/SettingsExtensionDocTestBase.java @@ -8,7 +8,7 @@ import akka.actor.Extension; import akka.actor.AbstractExtensionId; import akka.actor.ExtensionIdProvider; import akka.actor.ActorSystem; -import akka.actor.ActorSystemImpl; +import akka.actor.ExtendedActorSystem; import akka.util.Duration; import com.typesafe.config.Config; import java.util.concurrent.TimeUnit; @@ -44,7 +44,7 @@ public class SettingsExtensionDocTestBase { return Settings.SettingsProvider; } - public SettingsImpl createExtension(ActorSystemImpl system) { + public SettingsImpl createExtension(ExtendedActorSystem system) { return new SettingsImpl(system.settings().config()); } } diff --git a/akka-docs/scala/code/akka/docs/extension/ExtensionDocSpec.scala b/akka-docs/scala/code/akka/docs/extension/ExtensionDocSpec.scala index 1f4d777ad9..0c778a4812 100644 --- a/akka-docs/scala/code/akka/docs/extension/ExtensionDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/extension/ExtensionDocSpec.scala @@ -23,7 +23,7 @@ class CountExtensionImpl extends Extension { //#extensionid import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider -import akka.actor.ActorSystemImpl +import akka.actor.ExtendedActorSystem object CountExtension extends ExtensionId[CountExtensionImpl] @@ -36,7 +36,7 @@ object CountExtension //This method will be called by Akka // to instantiate our Extension - override def createExtension(system: ActorSystemImpl) = new CountExtensionImpl + override def createExtension(system: ExtendedActorSystem) = new CountExtensionImpl } //#extensionid diff --git a/akka-docs/scala/code/akka/docs/extension/SettingsExtensionDocSpec.scala b/akka-docs/scala/code/akka/docs/extension/SettingsExtensionDocSpec.scala index ed5ec66517..05765d27a5 100644 --- a/akka-docs/scala/code/akka/docs/extension/SettingsExtensionDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/extension/SettingsExtensionDocSpec.scala @@ -7,7 +7,7 @@ package akka.docs.extension import akka.actor.Extension import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider -import akka.actor.ActorSystemImpl +import akka.actor.ExtendedActorSystem import akka.util.Duration import com.typesafe.config.Config import java.util.concurrent.TimeUnit @@ -29,7 +29,7 @@ object Settings extends ExtensionId[SettingsImpl] with ExtensionIdProvider { override def lookup = Settings - override def createExtension(system: ActorSystemImpl) = new SettingsImpl(system.settings.config) + override def createExtension(system: ExtendedActorSystem) = new SettingsImpl(system.settings.config) } //#extensionid diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala index 91e8085778..36ab10393a 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala @@ -11,7 +11,7 @@ import akka.actor._ object BeanstalkBasedMailboxExtension extends ExtensionId[BeanstalkMailboxSettings] with ExtensionIdProvider { override def get(system: ActorSystem): BeanstalkMailboxSettings = super.get(system) def lookup() = this - def createExtension(system: ActorSystemImpl) = new BeanstalkMailboxSettings(system.settings.config) + def createExtension(system: ExtendedActorSystem) = new BeanstalkMailboxSettings(system.settings.config) } class BeanstalkMailboxSettings(val config: Config) extends Extension { diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala index c09bfc9cb3..f7e6527499 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala @@ -11,7 +11,7 @@ import akka.actor._ object FileBasedMailboxExtension extends ExtensionId[FileBasedMailboxSettings] with ExtensionIdProvider { override def get(system: ActorSystem): FileBasedMailboxSettings = super.get(system) def lookup() = this - def createExtension(system: ActorSystemImpl) = new FileBasedMailboxSettings(system.settings.config) + def createExtension(system: ExtendedActorSystem) = new FileBasedMailboxSettings(system.settings.config) } class FileBasedMailboxSettings(val config: Config) extends Extension { diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala index 0176fc09f3..fac0ad9050 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala @@ -11,7 +11,7 @@ import akka.actor._ object MongoBasedMailboxExtension extends ExtensionId[MongoBasedMailboxSettings] with ExtensionIdProvider { override def get(system: ActorSystem): MongoBasedMailboxSettings = super.get(system) def lookup() = this - def createExtension(system: ActorSystemImpl) = new MongoBasedMailboxSettings(system.settings.config) + def createExtension(system: ExtendedActorSystem) = new MongoBasedMailboxSettings(system.settings.config) } class MongoBasedMailboxSettings(val config: Config) extends Extension { diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala index 17ce479244..629f08b145 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala @@ -9,7 +9,7 @@ import akka.actor._ object RedisBasedMailboxExtension extends ExtensionId[RedisBasedMailboxSettings] with ExtensionIdProvider { override def get(system: ActorSystem): RedisBasedMailboxSettings = super.get(system) def lookup() = this - def createExtension(system: ActorSystemImpl) = new RedisBasedMailboxSettings(system.settings.config) + def createExtension(system: ExtendedActorSystem) = new RedisBasedMailboxSettings(system.settings.config) } class RedisBasedMailboxSettings(val config: Config) extends Extension { diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala index de2f2d586f..4f3dcfb42f 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala @@ -11,7 +11,7 @@ import akka.actor._ object ZooKeeperBasedMailboxExtension extends ExtensionId[ZooKeeperBasedMailboxSettings] with ExtensionIdProvider { override def get(system: ActorSystem): ZooKeeperBasedMailboxSettings = super.get(system) def lookup() = this - def createExtension(system: ActorSystemImpl) = new ZooKeeperBasedMailboxSettings(system.settings.config) + def createExtension(system: ExtendedActorSystem) = new ZooKeeperBasedMailboxSettings(system.settings.config) } class ZooKeeperBasedMailboxSettings(val config: Config) extends Extension { diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index a3a1982e9f..96d9f8241a 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -3,26 +3,20 @@ */ package akka.testkit -import akka.event.Logging.{ Warning, Error } +import java.lang.ref.WeakReference import java.util.concurrent.locks.ReentrantLock import java.util.LinkedList -import java.util.concurrent.RejectedExecutionException -import akka.util.Switch -import java.lang.ref.WeakReference + import scala.annotation.tailrec -import akka.actor.{ ActorCell, ActorRef, ActorSystem } -import akka.dispatch._ -import akka.actor.Scheduler -import akka.event.EventStream -import akka.util.Duration -import akka.util.duration._ -import java.util.concurrent.TimeUnit -import akka.actor.ExtensionId -import akka.actor.ExtensionIdProvider -import akka.actor.ActorSystemImpl -import akka.actor.Extension + import com.typesafe.config.Config +import CallingThreadDispatcher.Id +import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell } +import akka.dispatch.{ TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue } +import akka.util.duration.intToDurationInt +import akka.util.{ Switch, Duration } + /* * Locking rules: * @@ -42,7 +36,7 @@ import com.typesafe.config.Config private[testkit] object CallingThreadDispatcherQueues extends ExtensionId[CallingThreadDispatcherQueues] with ExtensionIdProvider { override def lookup = CallingThreadDispatcherQueues - override def createExtension(system: ActorSystemImpl): CallingThreadDispatcherQueues = new CallingThreadDispatcherQueues + override def createExtension(system: ExtendedActorSystem): CallingThreadDispatcherQueues = new CallingThreadDispatcherQueues } private[testkit] class CallingThreadDispatcherQueues extends Extension { diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala b/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala index 89c40f48f4..ada5a4fd30 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala @@ -6,11 +6,11 @@ package akka.testkit import com.typesafe.config.Config import akka.util.Duration import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor.{ ExtensionId, ActorSystem, Extension, ActorSystemImpl } +import akka.actor.{ ExtensionId, ActorSystem, Extension, ExtendedActorSystem } object TestKitExtension extends ExtensionId[TestKitSettings] { override def get(system: ActorSystem): TestKitSettings = super.get(system) - def createExtension(system: ActorSystemImpl): TestKitSettings = new TestKitSettings(system.settings.config) + def createExtension(system: ExtendedActorSystem): TestKitSettings = new TestKitSettings(system.settings.config) } class TestKitSettings(val config: Config) extends Extension { diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala index f0dee326b2..c0486fa3ab 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala @@ -24,7 +24,7 @@ case class ZeroMQVersion(major: Int, minor: Int, patch: Int) { */ object ZeroMQExtension extends ExtensionId[ZeroMQExtension] with ExtensionIdProvider { def lookup() = this - def createExtension(system: ActorSystemImpl) = new ZeroMQExtension(system) + def createExtension(system: ExtendedActorSystem) = new ZeroMQExtension(system) private val minVersionString = "2.1.0" private val minVersion = JZMQ.makeVersion(2, 1, 0) From f4ee0969b072063a5bf7247cc2fd49726dcfea85 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 24 Jan 2012 12:14:05 +0100 Subject: [PATCH 5/7] add implicit Decider conv. for Iterable[CauseAction] --- .../src/main/scala/akka/actor/FaultHandling.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 4c7f3611a2..8d26567c37 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -45,7 +45,16 @@ case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = } } -object SupervisorStrategy { +trait SupervisorStrategyLowPriorityImplicits { this: SupervisorStrategy.type ⇒ + + /** + * Implicit conversion from `Seq` of Cause-Action pairs to a `Decider`. See makeDecider(causeAction). + */ + implicit def seqCauseAction2Decider(trapExit: Iterable[CauseAction]): Decider = makeDecider(trapExit) + // the above would clash with seqThrowable2Decider for empty lists +} + +object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { sealed trait Action /** From 6afed30d430e29f99b7f34040e6e6c8c0cccdcc6 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 24 Jan 2012 10:45:18 +0100 Subject: [PATCH 6/7] add Java FSM example and reST, see #1428 --- akka-docs/general/actors.rst | 14 +- .../code/akka/docs/actor/FSMDocTest.scala | 8 + .../code/akka/docs/actor/FSMDocTestBase.java | 174 ++++++++++++++++++ akka-docs/java/fsm.rst | 79 ++++++++ akka-docs/java/index.rst | 1 + akka-docs/scala/fsm.rst | 5 +- 6 files changed, 272 insertions(+), 9 deletions(-) create mode 100644 akka-docs/java/code/akka/docs/actor/FSMDocTest.scala create mode 100644 akka-docs/java/code/akka/docs/actor/FSMDocTestBase.java create mode 100644 akka-docs/java/fsm.rst diff --git a/akka-docs/general/actors.rst b/akka-docs/general/actors.rst index 952b1b08e4..4d8f2c8096 100644 --- a/akka-docs/general/actors.rst +++ b/akka-docs/general/actors.rst @@ -32,13 +32,13 @@ State Actor objects will typically contain some variables which reflect possible states the actor may be in. This can be an explicit state machine (e.g. using -the :ref:`fsm` module), or it could be a counter, set of listeners, pending -requests, etc. These data are what make an actor valuable, and they must be -protected from corruption by other actors. The good news is that Akka actors -conceptually each have their own light-weight thread, which is completely -shielded from the rest of the system. This means that instead of having to -synchronize access using locks you can just write your actor code without -worrying about concurrency at all. +the :ref:`fsm-scala` module), or it could be a counter, set of listeners, +pending requests, etc. These data are what make an actor valuable, and they +must be protected from corruption by other actors. The good news is that Akka +actors conceptually each have their own light-weight thread, which is +completely shielded from the rest of the system. This means that instead of +having to synchronize access using locks you can just write your actor code +without worrying about concurrency at all. Behind the scenes Akka will run sets of actors on sets of real threads, where typically many actors share one thread, and subsequent invocations of one actor diff --git a/akka-docs/java/code/akka/docs/actor/FSMDocTest.scala b/akka-docs/java/code/akka/docs/actor/FSMDocTest.scala new file mode 100644 index 0000000000..11bb542808 --- /dev/null +++ b/akka-docs/java/code/akka/docs/actor/FSMDocTest.scala @@ -0,0 +1,8 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.docs.actor + +import org.scalatest.junit.JUnitSuite + +class FSMDocTest extends FSMDocTestBase with JUnitSuite \ No newline at end of file diff --git a/akka-docs/java/code/akka/docs/actor/FSMDocTestBase.java b/akka-docs/java/code/akka/docs/actor/FSMDocTestBase.java new file mode 100644 index 0000000000..981cac15b1 --- /dev/null +++ b/akka-docs/java/code/akka/docs/actor/FSMDocTestBase.java @@ -0,0 +1,174 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.docs.actor; + +//#imports-data +import java.util.ArrayList; +import java.util.List; +import akka.actor.ActorRef; +//#imports-data + +//#imports-actor +import akka.event.LoggingAdapter; +import akka.event.Logging; +import akka.actor.UntypedActor; +//#imports-actor + +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.testkit.TestProbe; + +public class FSMDocTestBase { + + //#data + public static final class SetTarget { + final ActorRef ref; + public SetTarget(ActorRef ref) { + this.ref = ref; + } + } + + public static final class Queue { + final Object o; + public Queue(Object o) { + this.o = o; + } + } + + public static final Object flush = new Object(); + + public static final class Batch { + final List objects; + public Batch(List objects) { + this.objects = objects; + } + } + //#data + + //#base + static abstract class MyFSMBase extends UntypedActor { + + /* + * This is the mutable state of this state machine. + */ + protected enum State { IDLE, ACTIVE; } + private State state = State.IDLE; + private ActorRef target; + private List queue; + + /* + * Then come all the mutator methods: + */ + protected void init(ActorRef target) { + this.target = target; + queue = new ArrayList(); + } + + protected void setState(State s) { + if (state != s) { + transition(state, s); + state = s; + } + } + + protected void enqueue(Object o) { + if (queue != null) queue.add(o); + } + + protected List drainQueue() { + final List q = queue; + if (q == null) throw new IllegalStateException("drainQueue(): not yet initialized"); + queue = new ArrayList(); + return q; + } + + /* + * Here are the interrogation methods: + */ + protected boolean isInitialized() { + return target != null; + } + + protected State getState() { + return state; + } + + protected ActorRef getTarget() { + if (target == null) throw new IllegalStateException("getTarget(): not yet initialized"); + return target; + } + + /* + * And finally the callbacks (only one in this example: react to state change) + */ + abstract protected void transition(State old, State next); + } + //#base + + //#actor + static public class MyFSM extends MyFSMBase { + + private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + + @Override + public void onReceive(Object o) { + + if (getState() == State.IDLE) { + + if (o instanceof SetTarget) + init(((SetTarget) o).ref); + + else whenUnhandled(o); + + } else if (getState() == State.ACTIVE) { + + if (o == flush) + setState(State.IDLE); + + else whenUnhandled(o); + } + } + + @Override + public void transition(State old, State next) { + if (old == State.ACTIVE) { + getTarget().tell(new Batch(drainQueue())); + } + } + + private void whenUnhandled(Object o) { + if (o instanceof Queue && isInitialized()) { + enqueue(((Queue) o).o); + setState(State.ACTIVE); + + } else { + log.warning("received unknown message {} in state {}", o, getState()); + } + } + } + //#actor + + ActorSystem system = ActorSystem.create(); + + @org.junit.Test + public void mustBunch() { + final ActorRef buncher = system.actorOf(new Props(MyFSM.class)); + final TestProbe probe = new TestProbe(system); + buncher.tell(new SetTarget(probe.ref())); + buncher.tell(new Queue(1)); + buncher.tell(new Queue(2)); + buncher.tell(flush); + buncher.tell(new Queue(3)); + final Batch b = probe.expectMsgClass(Batch.class); + assert b.objects.size() == 2; + assert b.objects.contains(1); + assert b.objects.contains(2); + } + + @org.junit.After + public void cleanup() { + system.shutdown(); + } + +} diff --git a/akka-docs/java/fsm.rst b/akka-docs/java/fsm.rst new file mode 100644 index 0000000000..d66627d416 --- /dev/null +++ b/akka-docs/java/fsm.rst @@ -0,0 +1,79 @@ +.. _fsm-java: + +########################################### +Building Finite State Machine Actors (Java) +########################################### + +.. sidebar:: Contents + + .. contents:: :local: + +Overview +======== + +The FSM (Finite State Machine) pattern is best described in the `Erlang design +principles +`_. +In short, it can be seen as a set of relations of the form: + + **State(S) x Event(E) -> Actions (A), State(S')** + +These relations are interpreted as meaning: + + *If we are in state S and the event E occurs, we should perform the actions A + and make a transition to the state S'.* + +While the Scala programming language enables the formulation of a nice internal +DSL (domain specific language) for formulating finite state machines (see +:ref:`fsm-scala`), Java’s verbosity does not lend itself well to the same +approach. This chapter describes ways to effectively achieve the same +separation of concerns through self-discipline. + +How State should be Handled +=========================== + +All mutable fields (or transitively mutable data structures) referenced by the +FSM actor’s implementation should be collected in one place and only mutated +using a small well-defined set of methods. One way to achieve this is to +assemble all mutable state in a superclass which keeps it private and offers +protected methods for mutating it. + +.. includecode:: code/akka/docs/actor/FSMDocTestBase.java#imports-data + +.. includecode:: code/akka/docs/actor/FSMDocTestBase.java#base + +The benefit of this approach is that state changes can be acted upon in one +central place, which makes it impossible to forget inserting code for reacting +to state transitions when adding to the FSM’s machinery. + +Message Buncher Example +======================= + +The base class shown above is designed to support a similar example as for the +Scala FSM documentation: an actor which receives and queues messages, to be +delivered in batches to a configurable target actor. The messages involved are: + +.. includecode:: code/akka/docs/actor/FSMDocTestBase.java#data + +This actor has only the two states ``IDLE`` and ``ACTIVE``, making their +handling quite straight-forward in the concrete actor derived from the base +class: + +.. includecode:: code/akka/docs/actor/FSMDocTestBase.java#imports-actor + +.. includecode:: code/akka/docs/actor/FSMDocTestBase.java#actor + +The trick here is to factor out common functionality like :meth:`whenUnhandled` +and :meth:`transition` in order to obtain a few well-defined points for +reacting to change or insert logging. + +State-Centric vs. Event-Centric +=============================== + +In the example above, the subjective complexity of state and events was roughly +equal, making it a matter of taste whether to choose primary dispatch on +either; in the example a state-based dispatch was chosen. Depending on how +evenly the matrix of possible states and events is populated, it may be more +practical to handle different events first and distinguish the states in the +second tier. An example would be a state machine which has a multitude of +internal states but handles only very few distinct events. diff --git a/akka-docs/java/index.rst b/akka-docs/java/index.rst index 4b0226fc35..319dbab302 100644 --- a/akka-docs/java/index.rst +++ b/akka-docs/java/index.rst @@ -21,4 +21,5 @@ Java API stm agents transactors + fsm extending-akka diff --git a/akka-docs/scala/fsm.rst b/akka-docs/scala/fsm.rst index 7b3d136ae4..618381901c 100644 --- a/akka-docs/scala/fsm.rst +++ b/akka-docs/scala/fsm.rst @@ -1,4 +1,4 @@ -.. _fsm: +.. _fsm-scala: ### FSM @@ -21,7 +21,8 @@ A FSM can be described as a set of relations of the form: These relations are interpreted as meaning: - *If we are in state S and the event E occurs, we should perform the actions A and make a transition to the state S'.* + *If we are in state S and the event E occurs, we should perform the actions A + and make a transition to the state S'.* A Simple Example ================ From cd608301f8785ff9f742984e4f975449e1336027 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 24 Jan 2012 16:02:21 +0100 Subject: [PATCH 7/7] Commented out Schoir plugin. It has bad dependency to log4j 1.2.15. See #1721 --- project/AkkaBuild.scala | 4 ++-- project/plugins.sbt | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 9e5e773911..f32987fb45 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -8,7 +8,7 @@ import sbt._ import sbt.Keys._ import com.typesafe.sbtmultijvm.MultiJvmPlugin import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions, jvmOptions, scalatestOptions } -import com.typesafe.schoir.SchoirPlugin.schoirSettings +//import com.typesafe.schoir.SchoirPlugin.schoirSettings import com.typesafe.sbtscalariform.ScalariformPlugin import com.typesafe.sbtscalariform.ScalariformPlugin.ScalariformKeys import java.lang.Boolean.getBoolean @@ -71,7 +71,7 @@ object AkkaBuild extends Build { id = "akka-remote", base = file("akka-remote"), dependencies = Seq(actor, actorTests % "test->test", testkit % "test->test"), - settings = defaultSettings ++ multiJvmSettings ++ schoirSettings ++ Seq( + settings = defaultSettings ++ multiJvmSettings ++ /*schoirSettings ++*/ Seq( libraryDependencies ++= Dependencies.remote, // disable parallel tests parallelExecution in Test := false, diff --git a/project/plugins.sbt b/project/plugins.sbt index 024f70877c..899db6307f 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -3,7 +3,7 @@ resolvers += Classpaths.typesafeResolver addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.1.9") -addSbtPlugin("com.typesafe.schoir" % "schoir" % "0.1.1") +//addSbtPlugin("com.typesafe.schoir" % "schoir" % "0.1.1") addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse" % "1.5.0")