From a6f53d860599d909bca5b4f8d13341360542777e Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 28 Sep 2011 14:08:04 +0200 Subject: [PATCH] Major rework of supervision and death watch, still not fully functioning --- .../java/akka/config/SupervisionConfig.java | 2 +- .../ActorFireForgetRequestReplySpec.scala | 2 +- .../test/scala/akka/actor/ActorRefSpec.scala | 3 +- .../scala/akka/actor/ActorRestartSpec.scala | 3 +- .../scala/akka/actor/FSMTransitionSpec.scala | 2 +- .../scala/akka/actor/LoggingReceiveSpec.scala | 2 +- .../akka/actor/RestartStrategySpec.scala | 11 +- .../test/scala/akka/actor/SchedulerSpec.scala | 2 +- .../akka/actor/SupervisorHierarchySpec.scala | 7 +- .../scala/akka/actor/SupervisorMiscSpec.scala | 72 +-- .../scala/akka/actor/SupervisorSpec.scala | 18 +- .../scala/akka/actor/SupervisorTreeSpec.scala | 4 +- .../test/scala/akka/actor/Ticket669Spec.scala | 4 +- .../scala/akka/routing/ActorPoolSpec.scala | 8 +- .../scala/akka/ticket/Ticket703Spec.scala | 5 +- .../src/main/scala/akka/actor/Actor.scala | 9 +- .../src/main/scala/akka/actor/ActorCell.scala | 466 +++++++++--------- .../src/main/scala/akka/actor/ActorRef.scala | 17 +- .../src/main/scala/akka/actor/Props.scala | 3 +- .../main/scala/akka/config/Configurator.scala | 3 +- .../scala/akka/config/SupervisionConfig.scala | 62 +-- .../main/scala/akka/event/DeathWatch.scala | 42 +- .../scala/akka/camel/ConsumerScalaTest.scala | 4 +- .../src/main/scala/akka/cluster/Cluster.scala | 2 +- akka-docs/java/fault-tolerance.rst | 28 +- akka-docs/java/guice-integration.rst | 2 +- .../project/migration-guide-0.10.x-1.0.x.rst | 10 +- akka-docs/scala/fault-tolerance.rst | 30 +- akka-docs/scala/http.rst | 2 +- akka-docs/scala/tutorial-chat-server.rst | 2 +- .../main/scala/akka/remote/RemoteDaemon.scala | 2 +- .../src/main/scala/ChatServer.scala | 4 +- .../src/main/scala/sample/hello/Boot.scala | 2 +- .../SupervisionBeanDefinitionParser.scala | 6 +- .../SupervisionBeanDefinitionParserTest.scala | 8 +- .../scala/SupervisionFactoryBeanTest.scala | 2 +- .../scala/akka/testkit/TestActorRefSpec.scala | 3 +- .../scala/akka/testkit/TestProbeSpec.scala | 1 - 38 files changed, 367 insertions(+), 488 deletions(-) diff --git a/akka-actor-tests/src/test/java/akka/config/SupervisionConfig.java b/akka-actor-tests/src/test/java/akka/config/SupervisionConfig.java index 47daf4e506..97605a4a79 100644 --- a/akka-actor-tests/src/test/java/akka/config/SupervisionConfig.java +++ b/akka-actor-tests/src/test/java/akka/config/SupervisionConfig.java @@ -18,6 +18,6 @@ public class SupervisionConfig { } - return new SupervisorConfig(new AllForOnePermanentStrategy(new Class[] { Exception.class }, 50, 1000), targets.toArray(new Server[targets.size()])); + return new SupervisorConfig(new AllForOneStrategy(new Class[] { Exception.class }, 50, 1000), targets.toArray(new Server[targets.size()])); } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala index 1b9a3e1d81..f27c2c32b1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala @@ -84,7 +84,7 @@ class ActorFireForgetRequestReplySpec extends WordSpec with MustMatchers with Be "should shutdown crashed temporary actor" in { filterEvents(EventFilter[Exception]("Expected")) { - val supervisor = actorOf(Props(self ⇒ { case _ ⇒ }).withFaultHandler(OneForOneTemporaryStrategy(List(classOf[Exception])))) + val supervisor = actorOf(Props(self ⇒ { case _ ⇒ }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0)))) val actor = actorOf(Props[CrashingActor].withSupervisor(supervisor)) actor.isRunning must be(true) actor ! "Die" diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 3a59541bd2..db43560c18 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -10,7 +10,6 @@ import org.scalatest.matchers.MustMatchers import akka.testkit._ import akka.util.duration._ import akka.testkit.Testing.sleepFor -import akka.config.Supervision.{ OneForOnePermanentStrategy } import java.lang.IllegalStateException import akka.util.ReflectiveAccess import akka.actor.Actor.actorOf @@ -379,7 +378,7 @@ class ActorRefSpec extends WordSpec with MustMatchers with TestKit { }).withSupervisor(self)) protected def receive = { case "sendKill" ⇒ ref ! Kill } - }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 2, 1000))) + }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000))) boss ! "sendKill" latch.await(5, TimeUnit.SECONDS) must be === true diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRestartSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRestartSpec.scala index bb06d6939a..b25f065f8d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRestartSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRestartSpec.scala @@ -10,7 +10,6 @@ import org.scalatest.matchers.MustMatchers import Actor.actorOf import akka.testkit._ import akka.util.duration._ -import akka.config.Supervision.OneForOnePermanentStrategy import java.util.concurrent.atomic._ @@ -67,7 +66,7 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo } private def createSupervisor = - actorOf(Props[Supervisor].withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 5000))) + actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000))) val expectedEvents = Seq(EventFilter[ActorKilledException], EventFilter[IllegalActorStateException]("expected")) diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala index 7222e1b073..3d1fb82615 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala @@ -62,7 +62,7 @@ class FSMTransitionSpec extends WordSpec with MustMatchers with TestKit { "not fail when listener goes away" in { val forward = Actor.actorOf(new Forwarder(testActor)) val fsm = Actor.actorOf(new MyFSM(testActor)) - val sup = Actor.actorOf(Props[Supervisor].withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), None, None))) + val sup = Actor.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None))) sup link fsm within(300 millis) { fsm ! SubscribeTransitionCallBack(forward) diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala index 70ead0027d..f073c84aa2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala @@ -139,7 +139,7 @@ class LoggingReceiveSpec "log LifeCycle changes if requested" in { within(2 seconds) { - val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 5000))) + val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000))) val f = Actor.getClass.getDeclaredField("debugLifecycle") f.setAccessible(true) diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index f786b54163..b12c8c10d6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -15,7 +15,6 @@ import akka.testkit.EventFilter import Actor._ import java.util.concurrent.{ TimeUnit, CountDownLatch } -import akka.config.Supervision.{ Permanent, LifeCycle, OneForOnePermanentStrategy } import org.multiverse.api.latches.StandardLatch class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll { @@ -36,7 +35,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll { val boss = actorOf(Props(new Actor { protected def receive = { case _ ⇒ () } - }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 2, 1000))) + }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000))) val restartLatch = new StandardLatch val secondRestartLatch = new StandardLatch @@ -84,7 +83,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll { val boss = actorOf(Props(new Actor { def receive = { case _ ⇒ () } - }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), None, None))) + }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None))) val countDownLatch = new CountDownLatch(100) @@ -109,7 +108,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll { val boss = actorOf(Props(new Actor { def receive = { case _ ⇒ () } - }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 2, 500))) + }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 500))) val restartLatch = new StandardLatch val secondRestartLatch = new StandardLatch @@ -168,7 +167,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll { def slaveShouldNotRestartAfterMaxRetries = { val boss = actorOf(Props(new Actor { def receive = { case _ ⇒ () } - }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), Some(2), None))) + }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), Some(2), None))) val restartLatch = new StandardLatch val secondRestartLatch = new StandardLatch @@ -225,7 +224,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll { val boss = actorOf(Props(new Actor { def receive = { case t: Terminated ⇒ maxNoOfRestartsLatch.open } - }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), None, Some(1000)))) + }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, Some(1000)))) val slave = actorOf(Props(new Actor { diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 3413575e95..ff3e18b768 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -129,7 +129,7 @@ class SchedulerSpec extends JUnitSuite { Supervisor( SupervisorConfig( - AllForOnePermanentStrategy(List(classOf[Exception]), 3, 1000), + AllForOneStrategy(List(classOf[Exception]), 3, 1000), Supervise( actor, Permanent) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 71c57a8ad2..8bbdb06813 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -8,7 +8,6 @@ import org.scalatest.junit.JUnitSuite import org.junit.Test import Actor._ -import akka.config.Supervision.OneForOnePermanentStrategy import akka.testkit._ import java.util.concurrent.{ TimeUnit, CountDownLatch } @@ -29,9 +28,9 @@ class SupervisorHierarchySpec extends JUnitSuite { def killWorkerShouldRestartMangerAndOtherWorkers = { val countDown = new CountDownLatch(4) - val boss = actorOf(Props(self ⇒ { case _ ⇒ }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 1000))) + val boss = actorOf(Props(self ⇒ { case _ ⇒ }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 1000))) - val manager = actorOf(Props(new CountDownActor(countDown)).withFaultHandler(OneForOnePermanentStrategy(List(), None, None)).withSupervisor(boss)) + val manager = actorOf(Props(new CountDownActor(countDown)).withFaultHandler(OneForOneStrategy(List(), None, None)).withSupervisor(boss)) val workerOne, workerTwo, workerThree = actorOf(Props(new CountDownActor(countDown)).withSupervisor(manager)) @@ -53,7 +52,7 @@ class SupervisorHierarchySpec extends JUnitSuite { protected def receive = { case Terminated(_, _) ⇒ countDownMax.countDown() } - }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 1, 5000))) + }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 1, 5000))) val crasher = actorOf(Props(new CountDownActor(countDownMessages)).withSupervisor(boss)) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index 5bbdc5f06f..5631b84a58 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -5,73 +5,49 @@ package akka.actor import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -import akka.config.Supervision.{ SupervisorConfig, OneForOnePermanentStrategy, Supervise, Permanent } -import java.util.concurrent.CountDownLatch +import akka.config.Supervision.{ SupervisorConfig, Supervise, Permanent } import akka.testkit.{ filterEvents, EventFilter } import akka.dispatch.{ PinnedDispatcher, Dispatchers } +import java.util.concurrent.{ TimeUnit, CountDownLatch } class SupervisorMiscSpec extends WordSpec with MustMatchers { "A Supervisor" should { "restart a crashing actor and its dispatcher for any dispatcher" in { - filterEvents(EventFilter[Exception]("killed")) { + filterEvents(EventFilter[Exception]("Kill")) { val countDownLatch = new CountDownLatch(4) - val actor1 = Actor.actorOf(Props(new Actor { + val supervisor = Actor.actorOf(Props(new Actor { + def receive = { case _ ⇒ } + }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 5000))) + + val workerProps = Props(new Actor { override def postRestart(cause: Throwable) { countDownLatch.countDown() } protected def receive = { - case "kill" ⇒ throw new Exception("killed") - case _ ⇒ println("received unknown message") + case "status" ⇒ this.reply("OK") + case _ ⇒ this.self.stop() } - }).withDispatcher(new PinnedDispatcher())) + }).withSupervisor(supervisor) - val actor2 = Actor.actorOf(Props(new Actor { - override def postRestart(cause: Throwable) { countDownLatch.countDown() } + val actor1 = Actor.actorOf(workerProps.withDispatcher(new PinnedDispatcher())) - protected def receive = { - case "kill" ⇒ throw new Exception("killed") - case _ ⇒ println("received unknown message") - } - }).withDispatcher(new PinnedDispatcher())) + val actor2 = Actor.actorOf(workerProps.withDispatcher(new PinnedDispatcher())) - val actor3 = Actor.actorOf(Props(new Actor { - override def postRestart(cause: Throwable) { countDownLatch.countDown() } + val actor3 = Actor.actorOf(workerProps.withDispatcher(Dispatchers.newDispatcher("test").build)) - protected def receive = { - case "kill" ⇒ throw new Exception("killed") - case _ ⇒ println("received unknown message") - } - }).withDispatcher(Dispatchers.newDispatcher("test").build)) + val actor4 = Actor.actorOf(workerProps.withDispatcher(new PinnedDispatcher())) - val actor4 = Actor.actorOf(Props(new Actor { - override def postRestart(cause: Throwable) { countDownLatch.countDown() } + actor1 ! Kill + actor2 ! Kill + actor3 ! Kill + actor4 ! Kill - protected def receive = { - case "kill" ⇒ throw new Exception("killed") - case _ ⇒ println("received unknown message") - } - }).withDispatcher(new PinnedDispatcher())) - - val sup = Supervisor( - SupervisorConfig( - OneForOnePermanentStrategy(List(classOf[Exception]), 3, 5000), - Supervise(actor1, Permanent) :: - Supervise(actor2, Permanent) :: - Supervise(actor3, Permanent) :: - Supervise(actor4, Permanent) :: - Nil)) - - actor1 ! "kill" - actor2 ! "kill" - actor3 ! "kill" - actor4 ! "kill" - - countDownLatch.await() - assert(!actor1.isShutdown, "actor1 is shutdown") - assert(!actor2.isShutdown, "actor2 is shutdown") - assert(!actor3.isShutdown, "actor3 is shutdown") - assert(!actor4.isShutdown, "actor4 is shutdown") + countDownLatch.await(10, TimeUnit.SECONDS) + assert((actor1 ? "status").as[String].get == "OK", "actor1 is shutdown") + assert((actor2 ? "status").as[String].get == "OK", "actor2 is shutdown") + assert((actor3 ? "status").as[String].get == "OK", "actor3 is shutdown") + assert((actor4 ? "status").as[String].get == "OK", "actor4 is shutdown") } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index d45b13e70b..6b434ad21e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -74,7 +74,7 @@ object SupervisorSpec { val supervisor = Supervisor( SupervisorConfig( - AllForOneTemporaryStrategy(List(classOf[Exception])), + AllForOneStrategy(List(classOf[Exception]), Some(0)), Supervise( temporaryActor, Temporary) @@ -88,7 +88,7 @@ object SupervisorSpec { val supervisor = Supervisor( SupervisorConfig( - AllForOnePermanentStrategy(List(classOf[Exception]), 3, TimeoutMillis), + AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis), Supervise( pingpong, Permanent) @@ -102,7 +102,7 @@ object SupervisorSpec { val supervisor = Supervisor( SupervisorConfig( - OneForOnePermanentStrategy(List(classOf[Exception]), 3, TimeoutMillis), + OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis), Supervise( pingpong, Permanent) @@ -118,7 +118,7 @@ object SupervisorSpec { val supervisor = Supervisor( SupervisorConfig( - AllForOnePermanentStrategy(List(classOf[Exception]), 3, TimeoutMillis), + AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis), Supervise( pingpong1, Permanent) @@ -142,7 +142,7 @@ object SupervisorSpec { val supervisor = Supervisor( SupervisorConfig( - OneForOnePermanentStrategy(List(classOf[Exception]), 3, TimeoutMillis), + OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis), Supervise( pingpong1, Permanent) @@ -166,13 +166,13 @@ object SupervisorSpec { val supervisor = Supervisor( SupervisorConfig( - AllForOnePermanentStrategy(List(classOf[Exception]), 3, TimeoutMillis), + AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis), Supervise( pingpong1, Permanent) :: SupervisorConfig( - AllForOnePermanentStrategy(Nil, 3, TimeoutMillis), + AllForOneStrategy(Nil, 3, TimeoutMillis), Supervise( pingpong2, Permanent) @@ -217,7 +217,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach "A supervisor" must { "not restart programmatically linked temporary actor" in { - val master = actorOf(Props[Master].withFaultHandler(OneForOneTemporaryStrategy(List(classOf[Exception])))) + val master = actorOf(Props[Master].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0)))) intercept[RuntimeException] { (master.?(Die, TimeoutMillis)).get @@ -374,7 +374,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach val supervisor = Supervisor( SupervisorConfig( - OneForOnePermanentStrategy(classOf[Exception] :: Nil, 3, 10000), + OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000), Supervise(dyingActor, Permanent) :: Nil)) intercept[RuntimeException] { diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala index f5fa94ca42..cfcb82c171 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala @@ -10,7 +10,7 @@ import akka.util.duration._ import akka.testkit.Testing.sleepFor import akka.testkit.{ EventFilter, filterEvents, filterException } import akka.dispatch.Dispatchers -import akka.config.Supervision.{ SupervisorConfig, OneForOnePermanentStrategy, Supervise, Permanent } +import akka.config.Supervision.{ SupervisorConfig, Supervise, Permanent } import Actor._ class SupervisorTreeSpec extends WordSpec with MustMatchers { @@ -35,7 +35,7 @@ class SupervisorTreeSpec extends WordSpec with MustMatchers { filterException[Exception] { log = "INIT" - val p = Props.default.withFaultHandler(OneForOnePermanentStrategy(List(classOf[Exception]), 3, 1000)) + val p = Props.default.withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 1000)) val lastActor = actorOf(p.withCreator(new Chainer(None)), "lastActor") val middleActor = actorOf(p.withCreator(new Chainer(Some(lastActor))), "middleActor") val headActor = actorOf(p.withCreator(new Chainer(Some(middleActor))), "headActor") diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index 40f29f6939..0368947647 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -29,7 +29,7 @@ class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll { val supervised = Actor.actorOf[Supervised] val supervisor = Supervisor(SupervisorConfig( - AllForOnePermanentStrategy(List(classOf[Exception]), 5, 10000), + AllForOneStrategy(List(classOf[Exception]), 5, 10000), Supervise(supervised, Permanent) :: Nil)) supervised.!("test")(Some(sender)) @@ -44,7 +44,7 @@ class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll { val supervised = Actor.actorOf[Supervised] val supervisor = Supervisor(SupervisorConfig( - AllForOnePermanentStrategy(List(classOf[Exception]), 5, 10000), + AllForOneStrategy(List(classOf[Exception]), 5, 10000), Supervise(supervised, Temporary) :: Nil)) supervised.!("test")(Some(sender)) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index b1951d8f6d..e8c7de8643 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -3,12 +3,12 @@ package akka.routing import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import akka.dispatch.{ KeptPromise, Future } +import akka.actor._ import akka.actor.Actor._ import akka.testkit.Testing._ import akka.actor.{ TypedActor, Actor, Props } import akka.testkit.{ TestLatch, filterEvents, EventFilter, filterException } import akka.util.duration._ -import akka.config.Supervision.OneForOnePermanentStrategy import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } object ActorPoolSpec { @@ -24,7 +24,7 @@ object ActorPoolSpec { } } - val faultHandler = OneForOnePermanentStrategy(List(classOf[Exception]), 5, 1000) + val faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 1000) } class ActorPoolSpec extends WordSpec with MustMatchers { @@ -439,7 +439,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { case _ ⇒ pingCount.incrementAndGet } }).withSupervisor(self)) - }).withFaultHandler(OneForOneTemporaryStrategy(List(classOf[Exception])))) + }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0)))) // default lifecycle // actor comes back right away @@ -538,7 +538,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { case _ ⇒ pingCount.incrementAndGet } }) - }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[IllegalStateException]), 5, 1000))) + }).withFaultHandler(OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000))) // actor comes back right away pingCount.set(0) diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala index 88ce2d3146..ad45a317e4 100644 --- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala @@ -1,11 +1,10 @@ package akka.ticket -import akka.actor.Actor._ import akka.actor._ +import akka.actor.Actor._ import akka.routing._ import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -import akka.config.Supervision.OneForOnePermanentStrategy class Ticket703Spec extends WordSpec with MustMatchers { @@ -28,7 +27,7 @@ class Ticket703Spec extends WordSpec with MustMatchers { tryReply("Response") } }) - }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Exception]), 5, 1000))) + }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, 1000))) (actorPool.?("Ping", 7000)).await.result must be === Some("Response") } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 1b680ba02c..e9df81174b 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -54,12 +54,9 @@ case class HotSwap(code: ActorRef ⇒ Actor.Receive, discardOld: Boolean = true) } case class Failed(@BeanProperty actor: ActorRef, - @BeanProperty cause: Throwable, - @BeanProperty recoverable: Boolean, - @BeanProperty timesRestarted: Int, - @BeanProperty restartTimeWindowStartMs: Long) extends AutoReceivedMessage with PossiblyHarmful + @BeanProperty cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful -case class ChildTerminated(child: ActorRef, cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful +case class ChildTerminated(@BeanProperty child: ActorRef, @BeanProperty cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful case object RevertHotSwap extends AutoReceivedMessage with PossiblyHarmful @@ -603,7 +600,7 @@ trait Actor { case HotSwap(code, discardOld) ⇒ become(code(self), discardOld) case RevertHotSwap ⇒ unbecome() case f: Failed ⇒ context.handleFailure(f) - case ct: ChildTerminated ⇒ context.handleChildTerminated(ct) + case ct: ChildTerminated ⇒ context.handleChildTerminated(ct.child) case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ val ch = channel diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 4a0cb6819a..03a90c9791 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -4,23 +4,14 @@ package akka.actor -import akka.config.Supervision.{ - AllForOnePermanentStrategy, - AllForOneTemporaryStrategy, - FaultHandlingStrategy, - OneForOnePermanentStrategy, - OneForOneTemporaryStrategy, - Temporary, - Permanent -} import akka.dispatch._ import akka.util._ -import java.util.{ Collection ⇒ JCollection } -import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import scala.collection.immutable.Stack +import scala.collection.JavaConverters import akka.event.{ DumbMonitoring, EventHandler } -import java.util.concurrent.{ ConcurrentLinkedQueue, ScheduledFuture, ConcurrentHashMap, TimeUnit } +import java.util.concurrent.{ ScheduledFuture, TimeUnit } +import java.util.{ Collection ⇒ JCollection, Collections ⇒ JCollections } /** * The actor context - the view of the actor cell from the actor. @@ -55,7 +46,145 @@ private[akka] trait ActorContext { def handleFailure(fail: Failed): Unit - def handleChildTerminated(childtermination: ChildTerminated): Unit + def handleChildTerminated(child: ActorRef): Unit +} + +case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) { + def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = { + val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) + false // Never deny an immortal + else if (maxNrOfRetries.nonEmpty && maxNrOfRetries.get < 1) + true //Always deny if no chance of restarting + else if (withinTimeRange.isEmpty) { + // restrict number of restarts + val retries = maxNrOfRetriesCount + 1 + maxNrOfRetriesCount = retries //Increment number of retries + retries > maxNrOfRetries.get + } else { + // cannot restart more than N within M timerange + val retries = maxNrOfRetriesCount + 1 + + val windowStart = restartTimeWindowStartNanos + val now = System.nanoTime + // we are within the time window if it isn't the first restart, or if the window hasn't closed + val insideWindow = if (windowStart == 0) true else (now - windowStart) <= TimeUnit.MILLISECONDS.toNanos(withinTimeRange.get) + + if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window + restartTimeWindowStartNanos = now + + // reset number of restarts if window has expired, otherwise, increment it + maxNrOfRetriesCount = if (windowStart != 0 && !insideWindow) 1 else retries // increment number of retries + + val restartCountLimit = if (maxNrOfRetries.isDefined) maxNrOfRetries.get else 1 + + // the actor is dead if it dies X times within the window of restart + insideWindow && retries > restartCountLimit + } + + denied == false // if we weren't denied, we have a go + } +} + +sealed abstract class FaultHandlingStrategy { + + def trapExit: List[Class[_ <: Throwable]] + + def handleChildTerminated(child: ActorRef, linkedActors: List[ChildRestartStats]): List[ChildRestartStats] + + def processFailure(fail: Failed, linkedActors: List[ChildRestartStats]): Unit + + /** + * Returns whether it processed the failure or not + */ + final def handleFailure(fail: Failed, linkedActors: List[ChildRestartStats]): Boolean = { + if (trapExit.exists(_.isAssignableFrom(fail.cause.getClass))) { + processFailure(fail, linkedActors) + true + } else false + } +} + +object AllForOneStrategy { + def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy = + new AllForOneStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) +} + +/** + * Restart all actors linked to the same supervisor when one fails, + * trapExit = which Throwables should be intercepted + * maxNrOfRetries = the number of times an actor is allowed to be restarted + * withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window + */ +case class AllForOneStrategy(trapExit: List[Class[_ <: Throwable]], + maxNrOfRetries: Option[Int] = None, + withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy { + def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = + this(trapExit, + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + + def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = + this(trapExit.toList, + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + + def this(trapExit: java.util.List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = + this(trapExit.toArray.toList.asInstanceOf[List[Class[_ <: Throwable]]], + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + + def handleChildTerminated(child: ActorRef, linkedActors: List[ChildRestartStats]): List[ChildRestartStats] = { + linkedActors collect { + case stats if stats.child != child ⇒ stats.child.stop(); stats //2 birds with one stone: remove the child + stop the other children + } //TODO optimization to drop all children here already? + } + + def processFailure(fail: Failed, linkedActors: List[ChildRestartStats]): Unit = { + if (linkedActors.nonEmpty) { + if (linkedActors.forall(_.requestRestartPermission(maxNrOfRetries, withinTimeRange))) + linkedActors.foreach(_.child.restart()) + else + linkedActors.foreach(_.child.stop()) + } + } +} + +object OneForOneStrategy { + def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy = + new OneForOneStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) +} + +/** + * Restart an actor when it fails + * trapExit = which Throwables should be intercepted + * maxNrOfRetries = the number of times an actor is allowed to be restarted + * withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window + */ +case class OneForOneStrategy(trapExit: List[Class[_ <: Throwable]], + maxNrOfRetries: Option[Int] = None, + withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy { + def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = + this(trapExit, + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + + def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = + this(trapExit.toList, + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + + def this(trapExit: java.util.List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = + this(trapExit.toArray.toList.asInstanceOf[List[Class[_ <: Throwable]]], + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + + def handleChildTerminated(child: ActorRef, linkedActors: List[ChildRestartStats]): List[ChildRestartStats] = + linkedActors.filterNot(_.child == child) + + def processFailure(fail: Failed, linkedActors: List[ChildRestartStats]): Unit = { + linkedActors.find(_.child == fail.actor) match { + case Some(stats) ⇒ + if (stats.requestRestartPermission(maxNrOfRetries, withinTimeRange)) + fail.actor.restart() + else + fail.actor.stop() //TODO optimization to drop child here already? + case None ⇒ EventHandler.warning(this, "Got Failure from non-child: " + fail) + } + } } private[akka] object ActorCell { @@ -66,49 +195,37 @@ private[akka] object ActorCell { private[akka] class ActorCell( val self: ActorRef with ScalaActorRef, - props: Props, - _receiveTimeout: Option[Long], - _hotswap: Stack[PartialFunction[Any, Unit]]) - extends ActorContext { + val props: Props, + @volatile var receiveTimeout: Option[Long], + @volatile var hotswap: Stack[PartialFunction[Any, Unit]]) extends ActorContext { import ActorCell._ - val guard = new ReentrantGuard // TODO: remove this last synchronization point - @volatile - var futureTimeout: Option[ScheduledFuture[AnyRef]] = None + var futureTimeout: Option[ScheduledFuture[AnyRef]] = None //FIXME TODO Doesn't need to be volatile either, since it will only ever be accessed when a message is processed - @volatile //FIXME doesn't need to be volatile - var maxNrOfRetriesCount: Int = 0 + @volatile //FIXME TODO doesn't need to be volatile if we remove the def linkedActors: JCollection[ActorRef] + var _linkedActors: List[ChildRestartStats] = Nil - @volatile //FIXME doesn't need to be volatile - var restartTimeWindowStartNanos: Long = 0L - - val _linkedActors = new ConcurrentLinkedQueue[ActorRef] - - @volatile //FIXME doesn't need to be volatile - var hotswap: Stack[PartialFunction[Any, Unit]] = _hotswap // TODO: currently settable from outside for compatibility - - @volatile - var receiveTimeout: Option[Long] = _receiveTimeout // TODO: currently settable from outside for compatibility - - @volatile + @volatile //TODO FIXME Might be able to make this non-volatile since it should be guarded by a mailbox.isShutdown test (which will force volatile piggyback read) var currentMessage: Envelope = null - val actor: AtomicReference[Actor] = new AtomicReference[Actor]() //FIXME We can most probably make this just a regular reference to Actor + @volatile //TODO FIXME Might be able to make this non-volatile since it should be guarded by a mailbox.isShutdown test (which will force volatile piggyback read) + var actor: Actor = _ //FIXME We can most probably make this just a regular reference to Actor def ref: ActorRef with ScalaActorRef = self def uuid: Uuid = self.uuid - def actorClass: Class[_] = actor.get.getClass + //FIXME TODO REMOVE THIS + def actorClass: Class[_] = actor.getClass def dispatcher: MessageDispatcher = props.dispatcher def isRunning: Boolean = !isShutdown def isShutdown: Boolean = mailbox.isClosed - @volatile + @volatile //This must be volatile var mailbox: Mailbox = _ def start(): Unit = { @@ -126,30 +243,12 @@ private[akka] class ActorCell( dispatcher.attach(this) } - def newActor(restart: Boolean): Actor = { - val stackBefore = contextStack.get - contextStack.set(stackBefore.push(this)) - try { - val instance = props.creator() - - if (instance eq null) - throw new ActorInitializationException("Actor instance passed to actorOf can't be 'null'") - - instance - } finally { - val stackAfter = contextStack.get - if (stackAfter.nonEmpty) - contextStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) // pop null marker plus our context - } - } - def suspend(): Unit = dispatcher.systemDispatch(SystemEnvelope(this, Suspend, NullChannel)) def resume(): Unit = dispatcher.systemDispatch(SystemEnvelope(this, Resume, NullChannel)) private[akka] def stop(): Unit = - if (isRunning) - dispatcher.systemDispatch(SystemEnvelope(this, Terminate, NullChannel)) + dispatcher.systemDispatch(SystemEnvelope(this, Terminate, NullChannel)) def link(subject: ActorRef): ActorRef = { dispatcher.systemDispatch(SystemEnvelope(this, Link(subject), NullChannel)) @@ -161,7 +260,11 @@ private[akka] class ActorCell( subject } - def linkedActors: JCollection[ActorRef] = java.util.Collections.unmodifiableCollection(_linkedActors) + @deprecated("Dog slow and racy") + def linkedActors: JCollection[ActorRef] = _linkedActors match { + case Nil ⇒ JCollections.emptyList[ActorRef]() + case some ⇒ JCollections.unmodifiableCollection(JavaConverters.asJavaCollectionConverter(some.map(_.child)).asJavaCollection) + } //TODO FIXME remove this method def supervisor: Option[ActorRef] = props.supervisor @@ -199,25 +302,65 @@ private[akka] class ActorCell( def systemInvoke(envelope: SystemEnvelope) { def create(recreation: Boolean): Unit = try { - actor.get() match { + + //This method is in charge of setting up the contextStack and create a new instance of the Actor + def newActor(): Actor = { + val stackBefore = contextStack.get + contextStack.set(stackBefore.push(this)) + try { + val instance = props.creator() + + if (instance eq null) + throw new ActorInitializationException("Actor instance passed to actorOf can't be 'null'") + + instance + } finally { + val stackAfter = contextStack.get + if (stackAfter.nonEmpty) + contextStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) // pop null marker plus our context + } + } + + actor match { case null ⇒ - val created = newActor(restart = false) //TODO !!!! Notify supervisor on failure to create! - actor.set(created) + val created = newActor() //TODO !!!! Notify supervisor on failure to create! + actor = created created.preStart() checkReceiveTimeout if (Actor.debugLifecycle) EventHandler.debug(created, "started") + case instance if recreation ⇒ - restart(new Exception("Restart commanded"), None, None) + val reason = new Exception("CRASHED") //FIXME TODO stash away the exception that caused the failure and reuse that? <------- !!!!!!!!!! RED RED RED + try { + val failedActor = actor + if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting") + if (failedActor ne null) { + val c = currentMessage //One read only plz + failedActor.preRestart(reason, if (c ne null) Some(c.message) else None) + } + val freshActor = newActor() + clearActorContext() + actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call + freshActor.postRestart(reason) + if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted") + } catch { + case e ⇒ + EventHandler.error(e, self, "Exception in restart of Actor [%s]".format(toString)) + throw e + } finally { + currentMessage = null + } + + dispatcher.resume(this) //FIXME should this be moved down? + + //FIXME TODO How should we handle restarting of children? <----- !!!!!!!!!!!!! RED RED RED case _ ⇒ } } catch { case e ⇒ envelope.channel.sendException(e) - if (supervisor.isDefined) - DumbMonitoring.signal(Failed(self, e, false, maxNrOfRetriesCount, restartTimeWindowStartNanos), supervisor.get) - else - throw e + if (supervisor.isDefined) supervisor.get ! Failed(self, e) else throw e } def suspend(): Unit = dispatcher suspend this @@ -232,21 +375,18 @@ private[akka] class ActorCell( dispatcher.detach(this) try { - val a = actor.get + val a = actor if (Actor.debugLifecycle) EventHandler.debug(a, "stopping") if (a ne null) a.postStop() - { //Stop supervised actors - var a = _linkedActors.poll() - while (a ne null) { - a.stop() - a = _linkedActors.poll() - } - } + //Stop supervised actors + _linkedActors.foreach(_.child.stop()) + _linkedActors = Nil } finally { val cause = new ActorKilledException("Stopped") //FIXME make this an object, can be reused everywhere - if (supervisor.isDefined) supervisor ! ChildTerminated(self, cause) + if (supervisor.isDefined) supervisor.get ! ChildTerminated(self, cause) + DumbMonitoring.signal(Terminated(self, cause)) currentMessage = null @@ -254,32 +394,42 @@ private[akka] class ActorCell( } } - guard.lock.lock() + def supervise(child: ActorRef): Unit = { + val links = _linkedActors + if (!links.contains(child)) { + _linkedActors = new ChildRestartStats(child) :: links + if (Actor.debugLifecycle) EventHandler.debug(actor, "now supervising " + child) + } else EventHandler.warning(actor, "Already supervising " + child) + } + try { - if (!mailbox.isClosed) { + val isClosed = mailbox.isClosed //Fence plus volatile read + if (!isClosed) { envelope.message match { - case Create ⇒ create(recreation = false) - case Recreate ⇒ create(recreation = true) - case Link(subject) ⇒ akka.event.DumbMonitoring.link(self, subject); if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "now monitoring " + subject) - case Unlink(subject) ⇒ akka.event.DumbMonitoring.unlink(self, subject); if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "stopped monitoring " + subject) + case Create ⇒ create(recreation = false) + case Recreate ⇒ create(recreation = true) + case Link(subject) ⇒ + akka.event.DumbMonitoring.link(self, subject) + if (Actor.debugLifecycle) EventHandler.debug(actor, "now monitoring " + subject) + case Unlink(subject) ⇒ + akka.event.DumbMonitoring.unlink(self, subject) + if (Actor.debugLifecycle) EventHandler.debug(actor, "stopped monitoring " + subject) case Suspend ⇒ suspend() case Resume ⇒ resume() case Terminate ⇒ terminate() - case Supervise(child) ⇒ if (!_linkedActors.contains(child)) { _linkedActors.offer(child); if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "now supervising " + child) } + case Supervise(child) ⇒ supervise(child) } } } catch { case e ⇒ //Should we really catch everything here? - EventHandler.error(e, actor.get(), "error while processing " + envelope.message) + EventHandler.error(e, actor, "error while processing " + envelope.message) throw e } finally { - mailbox.acknowledgeStatus() - guard.lock.unlock() + mailbox.acknowledgeStatus() //Volatile write } } def invoke(messageHandle: Envelope) { - guard.lock.lock() try { if (!mailbox.isClosed) { currentMessage = messageHandle @@ -287,7 +437,7 @@ private[akka] class ActorCell( try { cancelReceiveTimeout() // FIXME: leave this here? - actor.get().apply(messageHandle.message) + actor(messageHandle.message) currentMessage = null // reset current message after successful invocation } catch { case e ⇒ @@ -299,7 +449,7 @@ private[akka] class ActorCell( channel.sendException(e) if (supervisor.isDefined) - DumbMonitoring.signal(Failed(self, e, true, maxNrOfRetriesCount, restartTimeWindowStartNanos), supervisor.get) + supervisor.get ! Failed(self, e) else dispatcher.resume(this) @@ -309,7 +459,7 @@ private[akka] class ActorCell( } } catch { case e ⇒ - EventHandler.error(e, actor.get(), e.getMessage) + EventHandler.error(e, actor, e.getMessage) throw e } } else { @@ -318,146 +468,14 @@ private[akka] class ActorCell( } } finally { mailbox.acknowledgeStatus() - guard.lock.unlock() } } - def handleFailure(fail: Failed) { - props.faultHandler match { - case AllForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(fail.cause.getClass)) ⇒ - restartLinkedActors(fail.cause, maxRetries, within) + def handleFailure(fail: Failed): Unit = props.faultHandler.handleFailure(fail, _linkedActors) - case AllForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(fail.cause.getClass)) ⇒ - restartLinkedActors(fail.cause, None, None) + def handleChildTerminated(child: ActorRef): Unit = _linkedActors = props.faultHandler.handleChildTerminated(child, _linkedActors) - case OneForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(fail.cause.getClass)) ⇒ - fail.actor.restart(fail.cause, maxRetries, within) - - case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(fail.cause.getClass)) ⇒ - fail.actor.stop() - - case _ ⇒ - if (supervisor.isDefined) throw fail.cause else fail.actor.stop() //Escalate problem if not handled here - } - } - - def handleChildTerminated(ct: ChildTerminated): Unit = { - props.faultHandler match { - case AllForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(ct.cause.getClass)) ⇒ - //STOP ALL AND ESCALATE - - case AllForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(ct.cause.getClass)) ⇒ - //STOP ALL? - - case OneForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(ct.cause.getClass)) ⇒ - //ESCALATE? - - case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(ct.cause.getClass)) ⇒ - _linkedActors.remove(ct.child) - - case _ ⇒ throw ct.cause //Escalate problem if not handled here - } - } - - def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { - def performRestart() { - val failedActor = actor.get - if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting") - if (failedActor ne null) { - val c = currentMessage //One read only plz - failedActor.preRestart(reason, if (c ne null) Some(c.message) else None) - } - val freshActor = newActor(restart = true) - clearActorContext() - actor.set(freshActor) // assign it here so if preStart fails, we can null out the sef-refs next call - freshActor.postRestart(reason) - if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted") - } - - @tailrec - def attemptRestart() { - val success = if (requestRestartPermission(maxNrOfRetries, withinTimeRange)) { - guard.withGuard[Boolean] { - val success = - try { - performRestart() - true - } catch { - case e ⇒ - EventHandler.error(e, self, "Exception in restart of Actor [%s]".format(toString)) - false // an error or exception here should trigger a retry - } finally { - currentMessage = null - } - - if (success) { - dispatcher.resume(this) - restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) - } - - success - } - } else { - stop() - true // done - } - - if (success) () // alles gut - else attemptRestart() - } - - attemptRestart() // recur - } - - def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = { - val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { - // immortal - false - } else if (withinTimeRange.isEmpty) { - // restrict number of restarts - val retries = maxNrOfRetriesCount + 1 - maxNrOfRetriesCount = retries //Increment number of retries - retries > maxNrOfRetries.get - } else { - // cannot restart more than N within M timerange - val retries = maxNrOfRetriesCount + 1 - - val windowStart = restartTimeWindowStartNanos - val now = System.nanoTime - // we are within the time window if it isn't the first restart, or if the window hasn't closed - val insideWindow = if (windowStart == 0) true else (now - windowStart) <= TimeUnit.MILLISECONDS.toNanos(withinTimeRange.get) - - if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window - restartTimeWindowStartNanos = now - - // reset number of restarts if window has expired, otherwise, increment it - maxNrOfRetriesCount = if (windowStart != 0 && !insideWindow) 1 else retries // increment number of retries - - val restartCountLimit = if (maxNrOfRetries.isDefined) maxNrOfRetries.get else 1 - - // the actor is dead if it dies X times within the window of restart - insideWindow && retries > restartCountLimit - } - - denied == false // if we weren't denied, we have a go - } - - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { - props.faultHandler.lifeCycle match { - case Temporary ⇒ - { //Stop supervised actors - var a = _linkedActors.poll() - while (a ne null) { - a.stop() - a = _linkedActors.poll() - } - } - - case Permanent ⇒ - val i = _linkedActors.iterator - while (i.hasNext) i.next().restart(reason, maxNrOfRetries, withinTimeRange) - } - } + def restart(): Unit = dispatcher.systemDispatch(SystemEnvelope(this, Recreate, NullChannel)) def checkReceiveTimeout() { cancelReceiveTimeout() @@ -497,7 +515,7 @@ private[akka] class ActorCell( lookupAndSetSelfFields(parent, actor, newContext) } } - val a = actor.get() + val a = actor if (a ne null) lookupAndSetSelfFields(a.getClass, a, newContext) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 161fdf5cc9..44cf26e1ab 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -161,7 +161,7 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha timeout: Timeout, channel: UntypedChannel): Future[Any] - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) + protected[akka] def restart(): Unit override def hashCode: Int = HashCode.hash(HashCode.SEED, address) @@ -271,10 +271,10 @@ class LocalActorRef private[akka] ( //FIXME TODO REMOVE THIS @deprecated("This method does a spin-lock to block for the actor, which might never be there, do not use this") protected[akka] def underlyingActorInstance: Actor = { - var instance = actorCell.actor.get + var instance = actorCell.actor while ((instance eq null) && actorCell.isRunning) { try { Thread.sleep(1) } catch { case i: InterruptedException ⇒ } - instance = actorCell.actor.get + instance = actorCell.actor } instance } @@ -293,8 +293,7 @@ class LocalActorRef private[akka] ( protected[akka] def handleFailure(fail: Failed): Unit = actorCell.handleFailure(fail) - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = - actorCell.restart(reason, maxNrOfRetries, withinTimeRange) + protected[akka] def restart(): Unit = actorCell.restart() // ========= PRIVATE FUNCTIONS ========= @@ -382,9 +381,7 @@ private[akka] case class RemoteActorRef private[akka] ( def supervisor: Option[ActorRef] = unsupported - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { - unsupported - } + protected[akka] def restart(): Unit = unsupported private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") } @@ -482,9 +479,7 @@ trait UnsupportedActorRef extends ActorRef with ScalaActorRef { def resume(): Unit = unsupported - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { - unsupported - } + protected[akka] def restart(): Unit = unsupported private def unsupported = throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName)) } diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index cbaf77fae1..3f39206d94 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -4,7 +4,6 @@ package akka.actor -import akka.config.Supervision._ import akka.dispatch._ import akka.japi.Creator import akka.util._ @@ -20,7 +19,7 @@ object Props { final val defaultDeployId: String = "" final val defaultDispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher final val defaultTimeout: Timeout = Timeout(Duration(Actor.TIMEOUT, "millis")) - final val defaultFaultHandler: FaultHandlingStrategy = AllForOnePermanentStrategy(classOf[Exception] :: Nil, None, None) + final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(classOf[Exception] :: Nil, None, None) final val defaultSupervisor: Option[ActorRef] = None /** diff --git a/akka-actor/src/main/scala/akka/config/Configurator.scala b/akka-actor/src/main/scala/akka/config/Configurator.scala index 4f7ed90a26..f0d067c5bc 100644 --- a/akka-actor/src/main/scala/akka/config/Configurator.scala +++ b/akka-actor/src/main/scala/akka/config/Configurator.scala @@ -4,7 +4,8 @@ package akka.config -import akka.config.Supervision.{ SuperviseTypedActor, FaultHandlingStrategy } +import akka.actor.FaultHandlingStrategy +import akka.config.Supervision.SuperviseTypedActor private[akka] trait TypedActorConfiguratorBase { def getExternalDependency[T](clazz: Class[T]): T diff --git a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala index a81b888d84..3be551248a 100644 --- a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala +++ b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala @@ -4,6 +4,7 @@ package akka.config +import akka.actor.FaultHandlingStrategy import akka.dispatch.MessageDispatcher import akka.actor.{ Terminated, ActorRef } import akka.japi.{ Procedure2 } @@ -20,7 +21,6 @@ object Supervision { abstract class Server extends ConfigElement sealed abstract class LifeCycle extends ConfigElement - sealed abstract class FaultHandlingStrategy(val trapExit: List[Class[_ <: Throwable]], val lifeCycle: LifeCycle) extends ConfigElement case class SupervisorConfig(restartStrategy: FaultHandlingStrategy, worker: List[Server], maxRestartsHandler: (ActorRef, Terminated) ⇒ Unit = { (aRef, max) ⇒ () }) extends Server { //Java API @@ -40,66 +40,6 @@ object Supervision { def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.registerAsRemoteService)) } - object AllForOnePermanentStrategy { - def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): AllForOnePermanentStrategy = - new AllForOnePermanentStrategy(trapExit, - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) - } - - /** - * Restart all actors linked to the same supervisor when one fails, - * trapExit = which Throwables should be intercepted - * maxNrOfRetries = the number of times an actor is allowed to be restarted - * withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window - */ - case class AllForOnePermanentStrategy(override val trapExit: List[Class[_ <: Throwable]], - maxNrOfRetries: Option[Int] = None, - withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy(trapExit, Permanent) { - def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit, - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) - - def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit.toList, - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) - - def this(trapExit: java.util.List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit.toArray.toList.asInstanceOf[List[Class[_ <: Throwable]]], - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) - } - - case class AllForOneTemporaryStrategy(override val trapExit: List[Class[_ <: Throwable]]) extends FaultHandlingStrategy(trapExit, Temporary) - - object OneForOnePermanentStrategy { - def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): OneForOnePermanentStrategy = - new OneForOnePermanentStrategy(trapExit, - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) - } - - /** - * Restart an actor when it fails - * trapExit = which Throwables should be intercepted - * maxNrOfRetries = the number of times an actor is allowed to be restarted - * withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window - */ - case class OneForOnePermanentStrategy(override val trapExit: List[Class[_ <: Throwable]], - maxNrOfRetries: Option[Int] = None, - withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy(trapExit, Permanent) { - def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit, - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) - - def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit.toList, - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) - - def this(trapExit: java.util.List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit.toArray.toList.asInstanceOf[List[Class[_ <: Throwable]]], - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) - } - - case class OneForOneTemporaryStrategy(override val trapExit: List[Class[_ <: Throwable]]) extends FaultHandlingStrategy(trapExit, Temporary) - //Scala API case object Permanent extends LifeCycle case object Temporary extends LifeCycle diff --git a/akka-actor/src/main/scala/akka/event/DeathWatch.scala b/akka-actor/src/main/scala/akka/event/DeathWatch.scala index 5e3ea08226..8a0d7f958c 100644 --- a/akka-actor/src/main/scala/akka/event/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/event/DeathWatch.scala @@ -4,14 +4,9 @@ package akka.event -import akka.config.Supervision.{ FaultHandlingStrategy } -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.locks.ReentrantReadWriteLock import akka.actor._ -import akka.dispatch.SystemEnvelope trait DeathWatch { - def signal(fail: Failed, supervisor: ActorRef): Unit def signal(terminated: Terminated): Unit } @@ -26,12 +21,6 @@ object DumbMonitoring extends DeathWatch with Monitoring { val monitoring = new akka.util.Index[ActorRef, ActorRef] //Key == monitored, Values == monitors - def signal(fail: Failed, supervisor: ActorRef): Unit = - supervisor match { - case l: LocalActorRef ⇒ l ! fail //FIXME, should Failed be a system message ? => l.underlying.dispatcher.systemDispatch(SystemEnvelope(l.underlying, fail, NullChannel)) - case other ⇒ throw new IllegalStateException("Supervision only works for local actors currently") - } - def signal(terminated: Terminated): Unit = { val monitors = monitoring.remove(terminated.actor) if (monitors.isDefined) @@ -48,33 +37,4 @@ object DumbMonitoring extends DeathWatch with Monitoring { def unlink(monitor: ActorRef, monitored: ActorRef): Unit = { monitoring.remove(monitored, monitor) } -} - -/* -* Scenarios that can occur: -* -* Child dies without supervisor (will perhaps not be possible) -* Child dies whose supervisor is dead (race) -* Child dies, supervisor cannot deal with the problem and has no supervisor (will perhaps not be possible) -* Child dies, supervisor cannot deal with the problem and its supervisor is dead (race) -* Child dies, supervisor can deal with it: AllForOnePermanentStrategy -* Child dies, supervisor can deal with it: AllForOnePermanentStrategy but has reached max restart quota for child -* Child dies, supervisor can deal with it: AllForOneTemporaryStrategy -* Multiple children dies, supervisor can deal with it: AllForOnePermanentStrategy -* Multiple children dies, supervisor can deal with it: AllForOneTemporaryStrategy -* Child dies, supervisor can deal with it: OneForOnePermanentStrategy -* Child dies, supervisor can deal with it: OneForOneTemporaryStrategy -* -* Things that should be cleared after restart -* - monitored children (not supervised) -* -* Things that should be cleared after resume -* - nothing -* -* Things that should be cleared after death -* - everything -* -* Default implementation of preRestart == postStop -* Default implementation of postRestart == preStart -* -* */ \ No newline at end of file +} \ No newline at end of file diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala index 465e713b0e..8f847b9169 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala @@ -178,7 +178,7 @@ class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatcher val consumer = Actor.actorOf(new SupervisedConsumer("reply-channel-test-2")) val supervisor = Supervisor( SupervisorConfig( - OneForOnePermanentStrategy(List(classOf[Exception]), 2, 10000), + OneForOneStrategy(List(classOf[Exception]), 2, 10000), Supervise(consumer, Permanent) :: Nil)) val latch = new CountDownLatch(1) @@ -192,7 +192,7 @@ class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatcher val consumer = Actor.actorOf(Props(new SupervisedConsumer("reply-channel-test-3"))) val supervisor = Supervisor( SupervisorConfig( - OneForOneTemporaryStrategy(List(classOf[Exception])), + OneForOneStrategy(List(classOf[Exception]), Some(0)), Supervise(consumer, Temporary) :: Nil)) val latch = new CountDownLatch(1) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 66fafde7e7..b80f056ae0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -291,7 +291,7 @@ class DefaultClusterNode private[akka] ( private[cluster] lazy val remoteDaemonSupervisor = Supervisor( SupervisorConfig( - OneForOnePermanentStrategy(List(classOf[Exception]), Int.MaxValue, Int.MaxValue), // is infinite restart what we want? + OneForOneStrategy(List(classOf[Exception]), Int.MaxValue, Int.MaxValue), // is infinite restart what we want? Supervise( remoteDaemon, Permanent) diff --git a/akka-docs/java/fault-tolerance.rst b/akka-docs/java/fault-tolerance.rst index 7f9543428f..23a3b39bb2 100644 --- a/akka-docs/java/fault-tolerance.rst +++ b/akka-docs/java/fault-tolerance.rst @@ -81,7 +81,7 @@ Here is an example of how to define a restart strategy: .. code-block:: java - new AllForOnePermanentStrategy( //Or OneForOnePermanentStrategy + new AllForOneStrategy( //Or OneForOneStrategy new Class[]{ Exception.class }, //List of Exceptions/Throwables to handle 3, // maximum number of restart retries 5000 // within time in millis @@ -119,7 +119,7 @@ The Actor’s supervision can be declaratively defined by creating a ‘Supervis Supervisor supervisor = Supervisor.apply( new SupervisorConfig( - new AllForOnePermanentStrategy(new Class[]{Exception.class}, 3, 5000), + new AllForOneStrategy(new Class[]{Exception.class}, 3, 5000), new Supervise[] { new Supervise( actorOf(MyActor1.class), @@ -151,7 +151,7 @@ MaximumNumberOfRestartsWithinTimeRangeReached message. Supervisor supervisor = Supervisor.apply( new SupervisorConfig( - new AllForOnePermanentStrategy(new Class[]{Exception.class}, 3, 5000), + new AllForOneStrategy(new Class[]{Exception.class}, 3, 5000), new Supervise[] { new Supervise( actorOf(MyActor1.class), @@ -180,7 +180,7 @@ Example usage: SupervisorFactory factory = new SupervisorFactory( new SupervisorConfig( - new OneForOnePermanentStrategy(new Class[]{Exception.class}, 3, 5000), + new OneForOneStrategy(new Class[]{Exception.class}, 3, 5000), new Supervise[] { new Supervise( actorOf(MyActor1.class), @@ -211,7 +211,7 @@ Here is an example: Supervisor supervisor = Supervisor.apply( new SupervisorConfig( - new AllForOnePermanentStrategy(new Class[]{Exception.class}, 3, 5000), + new AllForOneStrategy(new Class[]{Exception.class}, 3, 5000), new Supervise[] { new Supervise( actorOf(MyActor1.class), @@ -257,11 +257,11 @@ The supervising Actor also needs to define a fault handler that defines the rest The different options are: -- AllForOnePermanentStrategy(trapExit, maxNrOfRetries, withinTimeRange) +- AllForOneStrategy(trapExit, maxNrOfRetries, withinTimeRange) - trapExit is an Array of classes inheriting from Throwable, they signal which types of exceptions this actor will handle -- OneForOnePermanentStrategy(trapExit, maxNrOfRetries, withinTimeRange) +- OneForOneStrategy(trapExit, maxNrOfRetries, withinTimeRange) - trapExit is an Array of classes inheriting from Throwable, they signal which types of exceptions this actor will handle @@ -269,7 +269,7 @@ Here is an example: .. code-block:: java - getContext().setFaultHandler(new AllForOnePermanentStrategy(new Class[]{MyException.class, IOException.class}, 3, 1000)); + getContext().setFaultHandler(new AllForOneStrategy(new Class[]{MyException.class, IOException.class}, 3, 1000)); Putting all this together it can look something like this: @@ -277,7 +277,7 @@ Putting all this together it can look something like this: class MySupervisor extends UntypedActor { public MySupervisor() { - getContext().setFaultHandler(new AllForOnePermanentStrategy(new Class[]{MyException.class, IOException.class}, 3, 1000)); + getContext().setFaultHandler(new AllForOneStrategy(new Class[]{MyException.class, IOException.class}, 3, 1000)); } public void onReceive(Object message) throws Exception { @@ -362,7 +362,7 @@ If you remember, when you define the 'RestartStrategy' you also defined maximum .. code-block:: java - new AllForOnePermanentStrategy( // FaultHandlingStrategy policy (AllForOnePermanentStrategy or OneForOnePermanentStrategy) + new AllForOneStrategy( // FaultHandlingStrategy policy (AllForOneStrategy or OneForOneStrategy) new Class[]{MyException.class, IOException.class}, //What types of errors will be handled 3, // maximum number of restart retries 5000 // within time in millis @@ -425,7 +425,7 @@ Here is an example: TypedActorConfigurator manager = new TypedActorConfigurator(); manager.configure( - new AllForOnePermanentStrategy(new Class[]{Exception.class}, 3, 1000), + new AllForOneStrategy(new Class[]{Exception.class}, 3, 1000), new SuperviseTypedActor[] { new SuperviseTypedActor( Foo.class, @@ -480,7 +480,7 @@ If the parent TypedActor (supervisor) wants to be able to do handle failing chil .. code-block:: java - TypedActor.faultHandler(supervisor, new AllForOnePermanentStrategy(new Class[]{IOException.class}, 3, 2000)); + TypedActor.faultHandler(supervisor, new AllForOneStrategy(new Class[]{IOException.class}, 3, 2000)); For convenience there is an overloaded link that takes trapExit and faultHandler for the supervisor as arguments. Here is an example: @@ -492,9 +492,9 @@ For convenience there is an overloaded link that takes trapExit and faultHandler foo = newInstance(Foo.class, FooImpl.class, 1000); bar = newInstance(Bar.class, BarImpl.class, 1000); - link(foo, bar, new AllForOnePermanentStrategy(new Class[]{IOException.class}, 3, 2000)); + link(foo, bar, new AllForOneStrategy(new Class[]{IOException.class}, 3, 2000)); // alternative: chaining - bar = faultHandler(foo, new AllForOnePermanentStrategy(new Class[]{IOException.class}, 3, 2000)).newInstance(Bar.class, 1000); + bar = faultHandler(foo, new AllForOneStrategy(new Class[]{IOException.class}, 3, 2000)).newInstance(Bar.class, 1000); link(foo, bar); diff --git a/akka-docs/java/guice-integration.rst b/akka-docs/java/guice-integration.rst index e6671d1fa3..de00b701cb 100644 --- a/akka-docs/java/guice-integration.rst +++ b/akka-docs/java/guice-integration.rst @@ -20,7 +20,7 @@ Here is an example: TypedActorConfigurator manager = new TypedActorConfigurator(); manager.configure( - new AllForOnePermanentStrategy(new Class[]{Exception.class}, 3, 1000), + new AllForOneStrategy(new Class[]{Exception.class}, 3, 1000), new SuperviseTypedActor[] { new SuperviseTypedActor( Foo.class, diff --git a/akka-docs/project/migration-guide-0.10.x-1.0.x.rst b/akka-docs/project/migration-guide-0.10.x-1.0.x.rst index f0950ad057..fbc951229b 100644 --- a/akka-docs/project/migration-guide-0.10.x-1.0.x.rst +++ b/akka-docs/project/migration-guide-0.10.x-1.0.x.rst @@ -177,7 +177,7 @@ to FaultHandlingStrategy: import akka.config.Supervision._ - self.faultHandler = OneForOnePermanentStrategy(List(classOf[Exception]), 3, 5000) + self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 3, 5000) **Java** @@ -185,9 +185,9 @@ to FaultHandlingStrategy: import static akka.Supervision.*; - getContext().setFaultHandler(new OneForOnePermanentStrategy(new Class[] { Exception.class },50,1000)) + getContext().setFaultHandler(new OneForOneStrategy(new Class[] { Exception.class },50,1000)) -**RestartStrategy, AllForOne, OneForOne** have been replaced with **AllForOnePermanentStrategy** and **OneForOnePermanentStrategy** in **se.scalablesolutions.akka.config.Supervision** +**RestartStrategy, AllForOne, OneForOne** have been replaced with **AllForOneStrategy** and **OneForOneStrategy** in **se.scalablesolutions.akka.config.Supervision** **Scala** @@ -195,7 +195,7 @@ to FaultHandlingStrategy: import akka.config.Supervision._ SupervisorConfig( - OneForOnePermanentStrategy(List(classOf[Exception]), 3, 5000), + OneForOneStrategy(List(classOf[Exception]), 3, 5000), Supervise(pingpong1,Permanent) :: Nil ) @@ -206,7 +206,7 @@ to FaultHandlingStrategy: import static akka.Supervision.*; new SupervisorConfig( - new OneForOnePermanentStrategy(new Class[] { Exception.class },50,1000), + new OneForOneStrategy(new Class[] { Exception.class },50,1000), new Server[] { new Supervise(pingpong1, permanent()) } ) diff --git a/akka-docs/scala/fault-tolerance.rst b/akka-docs/scala/fault-tolerance.rst index 74c1a2b7e8..dfdd968c79 100644 --- a/akka-docs/scala/fault-tolerance.rst +++ b/akka-docs/scala/fault-tolerance.rst @@ -82,7 +82,7 @@ Here is an example of how to define a restart strategy: .. code-block:: scala - AllForOnePermanentStrategy( //FaultHandlingStrategy; AllForOnePermanentStrategy or OneForOnePermanentStrategy + AllForOneStrategy( //FaultHandlingStrategy; AllForOneStrategy or OneForOneStrategy List(classOf[Exception]), //What exceptions will be handled 3, // maximum number of restart retries 5000 // within time in millis @@ -116,7 +116,7 @@ The Actor's supervision can be declaratively defined by creating a "Supervisor' val supervisor = Supervisor( SupervisorConfig( - AllForOnePermanentStrategy(List(classOf[Exception]), 3, 1000), + AllForOneStrategy(List(classOf[Exception]), 3, 1000), Supervise( actorOf[MyActor1], Permanent) :: @@ -140,7 +140,7 @@ MaximumNumberOfRestartsWithinTimeRangeReached message. val supervisor = Supervisor( SupervisorConfig( - AllForOnePermanentStrategy(List(classOf[Exception]), 3, 1000), + AllForOneStrategy(List(classOf[Exception]), 3, 1000), Supervise( actorOf[MyActor1], Permanent) :: @@ -166,7 +166,7 @@ Example usage: val factory = SupervisorFactory( SupervisorConfig( - OneForOnePermanentStrategy(List(classOf[Exception]), 3, 10), + OneForOneStrategy(List(classOf[Exception]), 3, 10), Supervise( myFirstActor, Permanent) :: @@ -193,7 +193,7 @@ Here is an example: val supervisor = Supervisor( SupervisorConfig( - AllForOnePermanentStrategy(List(classOf[Exception]), 3, 1000), + AllForOneStrategy(List(classOf[Exception]), 3, 1000), Supervise( actorOf[MyActor1], Permanent, @@ -247,11 +247,11 @@ The supervising Actor also needs to define a fault handler that defines the rest The different options are: -- AllForOnePermanentStrategy(trapExit, maxNrOfRetries, withinTimeRange) +- AllForOneStrategy(trapExit, maxNrOfRetries, withinTimeRange) - trapExit is a List or Array of classes inheriting from Throwable, they signal which types of exceptions this actor will handle -- OneForOnePermanentStrategy(trapExit, maxNrOfRetries, withinTimeRange) +- OneForOneStrategy(trapExit, maxNrOfRetries, withinTimeRange) - trapExit is a List or Array of classes inheriting from Throwable, they signal which types of exceptions this actor will handle @@ -259,14 +259,14 @@ Here is an example: .. code-block:: scala - self.faultHandler = AllForOnePermanentStrategy(List(classOf[Throwable]), 3, 1000) + self.faultHandler = AllForOneStrategy(List(classOf[Throwable]), 3, 1000) Putting all this together it can look something like this: .. code-block:: scala class MySupervisor extends Actor { - self.faultHandler = OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 5000) + self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 5, 5000) def receive = { case Register(actor) => @@ -340,7 +340,7 @@ If you remember, when you define the 'RestartStrategy' you also defined maximum .. code-block:: scala - AllForOnePermanentStrategy( //Restart policy, AllForOnePermanentStrategy or OneForOnePermanentStrategy + AllForOneStrategy( //Restart policy, AllForOneStrategy or OneForOneStrategy List(classOf[Exception]), //What kinds of exception it will handle 3, // maximum number of restart retries 5000 // within time in millis @@ -362,7 +362,7 @@ Here is an example: .. code-block:: scala val supervisor = actorOf(new Actor{ - self.faultHandler = OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 5000) + self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 5, 5000) protected def receive = { case MaximumNumberOfRestartsWithinTimeRangeReached( victimActorRef, maxNrOfRetries, withinTimeRange, lastExceptionCausingRestart) => @@ -397,7 +397,7 @@ Here is an example: val manager = new TypedActorConfigurator manager.configure( - AllForOnePermanentStrategy(List(classOf[Exception]), 3, 1000), + AllForOneStrategy(List(classOf[Exception]), 3, 1000), List( SuperviseTypedActor( Foo.class, @@ -435,7 +435,7 @@ If the parent TypedActor (supervisor) wants to be able to do handle failing chil .. code-block:: scala - TypedActor.faultHandler(supervisor, AllForOnePermanentStrategy(Array(classOf[IOException]), 3, 2000)) + TypedActor.faultHandler(supervisor, AllForOneStrategy(Array(classOf[IOException]), 3, 2000)) For convenience there is an overloaded link that takes trapExit and faultHandler for the supervisor as arguments. Here is an example: @@ -446,10 +446,10 @@ For convenience there is an overloaded link that takes trapExit and faultHandler val foo = newInstance(classOf[Foo], 1000) val bar = newInstance(classOf[Bar], 1000) - link(foo, bar, new AllForOnePermanentStrategy(Array(classOf[IOException]), 3, 2000)) + link(foo, bar, new AllForOneStrategy(Array(classOf[IOException]), 3, 2000)) // alternative: chaining - bar = faultHandler(foo, new AllForOnePermanentStrategy(Array(classOf[IOException]), 3, 2000)) + bar = faultHandler(foo, new AllForOneStrategy(Array(classOf[IOException]), 3, 2000)) .newInstance(Bar.class, 1000) link(foo, bar diff --git a/akka-docs/scala/http.rst b/akka-docs/scala/http.rst index 2414b0bc8c..835408ae9b 100644 --- a/akka-docs/scala/http.rst +++ b/akka-docs/scala/http.rst @@ -160,7 +160,7 @@ In this example, we'll use the built-in *RootEndpoint* class and implement our o class Boot { val factory = SupervisorFactory( SupervisorConfig( - OneForOnePermanentStrategy(List(classOf[Exception]), 3, 100), + OneForOneStrategy(List(classOf[Exception]), 3, 100), // // in this particular case, just boot the built-in default root endpoint // diff --git a/akka-docs/scala/tutorial-chat-server.rst b/akka-docs/scala/tutorial-chat-server.rst index 26280aa54f..a256984b73 100644 --- a/akka-docs/scala/tutorial-chat-server.rst +++ b/akka-docs/scala/tutorial-chat-server.rst @@ -270,7 +270,7 @@ I'll try to show you how we can make use Scala's mixins to decouple the Actor im * Chat server. Manages sessions and redirects all other messages to the Session for the client. */ trait ChatServer extends Actor { - self.faultHandler = OneForOnePermanentStrategy(List(classOf[Exception]),5, 5000) + self.faultHandler = OneForOneStrategy(List(classOf[Exception]),5, 5000) val storage: ActorRef EventHandler.info(this, "Chat server is starting up...") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index cb083d6597..9c06af9545 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -48,7 +48,7 @@ object Remote extends RemoteService { private[remote] lazy val remoteDaemonSupervisor = Supervisor( SupervisorConfig( - OneForOnePermanentStrategy(List(classOf[Exception]), Int.MaxValue, Int.MaxValue), // is infinite restart what we want? + OneForOneStrategy(List(classOf[Exception]), Int.MaxValue, Int.MaxValue), // is infinite restart what we want? Supervise( remoteDaemon, Permanent) diff --git a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala index bc9de6bd2e..64944760ba 100644 --- a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala +++ b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala @@ -8,7 +8,7 @@ import akka.actor.{Actor, ActorRef, Props} import akka.stm._ - import akka.config.Supervision.{OneForOnePermanentStrategy,Permanent} + import akka.config.Supervision.{OneForOneStrategy,Permanent} import Actor._ import akka.event.EventHandler @@ -174,7 +174,7 @@ * Chat server. Manages sessions and redirects all other messages to the Session for the client. */ trait ChatServer extends Actor { - //faultHandler = OneForOnePermanentStrategy(List(classOf[Exception]),5, 5000) + //faultHandler = OneForOneStrategy(List(classOf[Exception]),5, 5000) val storage: ActorRef EventHandler.info(this, "Chat server is starting up...") diff --git a/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Boot.scala b/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Boot.scala index 5c2c24f53c..823ae024ee 100644 --- a/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Boot.scala +++ b/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Boot.scala @@ -12,7 +12,7 @@ class Boot { val factory = SupervisorFactory( SupervisorConfig( - OneForOnePermanentStrategy(List(classOf[Exception]), 3, 100), + OneForOneStrategy(List(classOf[Exception]), 3, 100), Supervise(Actor.actorOf[RootEndpoint], Permanent) :: Supervise(Actor.actorOf[HelloEndpoint], Permanent) :: Nil)) diff --git a/akka-spring/src/main/scala/akka/spring/SupervisionBeanDefinitionParser.scala b/akka-spring/src/main/scala/akka/spring/SupervisionBeanDefinitionParser.scala index 087db8e0ed..3979efab60 100644 --- a/akka-spring/src/main/scala/akka/spring/SupervisionBeanDefinitionParser.scala +++ b/akka-spring/src/main/scala/akka/spring/SupervisionBeanDefinitionParser.scala @@ -51,9 +51,9 @@ class SupervisionBeanDefinitionParser extends AbstractSingleBeanDefinitionParser val trapExceptions = parseTrapExits(trapExitsElement) val restartStrategy = failover match { - case "AllForOne" ⇒ new AllForOnePermanentStrategy(trapExceptions, retries, timeRange) - case "OneForOne" ⇒ new OneForOnePermanentStrategy(trapExceptions, retries, timeRange) - case _ ⇒ new OneForOnePermanentStrategy(trapExceptions, retries, timeRange) //Default to OneForOne + case "AllForOne" ⇒ new AllForOneStrategy(trapExceptions, retries, timeRange) + case "OneForOne" ⇒ new OneForOneStrategy(trapExceptions, retries, timeRange) + case _ ⇒ new OneForOneStrategy(trapExceptions, retries, timeRange) //Default to OneForOne } builder.addPropertyValue("restartStrategy", restartStrategy) } diff --git a/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala b/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala index c74731dbaf..2dc0445005 100644 --- a/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala +++ b/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala @@ -11,7 +11,7 @@ import ScalaDom._ import org.w3c.dom.Element import org.springframework.beans.factory.support.BeanDefinitionBuilder -import akka.config.Supervision.{ FaultHandlingStrategy, AllForOnePermanentStrategy } +import akka.config.Supervision.{ FaultHandlingStrategy, AllForOneStrategy } /** * Test for SupervisionBeanDefinitionParser @@ -36,9 +36,9 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers { parser.parseSupervisor(createSupervisorElement, builder); val strategy = builder.getBeanDefinition.getPropertyValues.getPropertyValue("restartStrategy").getValue.asInstanceOf[FaultHandlingStrategy] assert(strategy ne null) - assert(strategy.isInstanceOf[AllForOnePermanentStrategy]) - expect(3) { strategy.asInstanceOf[AllForOnePermanentStrategy].maxNrOfRetries.get } - expect(1000) { strategy.asInstanceOf[AllForOnePermanentStrategy].withinTimeRange.get } + assert(strategy.isInstanceOf[AllForOneStrategy]) + expect(3) { strategy.asInstanceOf[AllForOneStrategy].maxNrOfRetries.get } + expect(1000) { strategy.asInstanceOf[AllForOneStrategy].withinTimeRange.get } } it("should parse the supervised typed actors") { diff --git a/akka-spring/src/test/scala/SupervisionFactoryBeanTest.scala b/akka-spring/src/test/scala/SupervisionFactoryBeanTest.scala index 760b660d2f..f17e2cc92c 100644 --- a/akka-spring/src/test/scala/SupervisionFactoryBeanTest.scala +++ b/akka-spring/src/test/scala/SupervisionFactoryBeanTest.scala @@ -15,7 +15,7 @@ private[akka] class Foo @RunWith(classOf[JUnitRunner]) class SupervisionFactoryBeanTest extends Spec with ShouldMatchers { - val faultHandlingStrategy = new AllForOnePermanentStrategy(List(classOf[Exception]), 3, 1000) + val faultHandlingStrategy = new AllForOneStrategy(List(classOf[Exception]), 3, 1000) val typedActors = List(createTypedActorProperties("akka.spring.Foo", "1000")) private def createTypedActorProperties(target: String, timeout: String): ActorProperties = { diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index ad826bce09..4176937159 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -6,7 +6,6 @@ package akka.testkit import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterEach, WordSpec } import akka.actor._ -import akka.config.Supervision.OneForOnePermanentStrategy import akka.event.EventHandler import akka.dispatch.{ Future, Promise } @@ -182,7 +181,7 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac }).withSupervisor(self)) def receiveT = { case "sendKill" ⇒ ref ! Kill } - }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[ActorKilledException]), 5, 1000))) + }).withFaultHandler(OneForOneStrategy(List(classOf[ActorKilledException]), 5, 1000))) boss ! "sendKill" diff --git a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala index 5a64d16be7..54a186781f 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala @@ -4,7 +4,6 @@ import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterEach, WordSpec } import akka.actor._ -import akka.config.Supervision.OneForOnePermanentStrategy import akka.event.EventHandler import akka.dispatch.Future import akka.util.duration._