diff --git a/akka-actor-tests/src/test/java/akka/config/SupervisionConfig.java b/akka-actor-tests/src/test/java/akka/config/SupervisionConfig.java index 97605a4a79..47daf4e506 100644 --- a/akka-actor-tests/src/test/java/akka/config/SupervisionConfig.java +++ b/akka-actor-tests/src/test/java/akka/config/SupervisionConfig.java @@ -18,6 +18,6 @@ public class SupervisionConfig { } - return new SupervisorConfig(new AllForOneStrategy(new Class[] { Exception.class }, 50, 1000), targets.toArray(new Server[targets.size()])); + return new SupervisorConfig(new AllForOnePermanentStrategy(new Class[] { Exception.class }, 50, 1000), targets.toArray(new Server[targets.size()])); } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala index 8d4d4e94e6..4f5dded329 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala @@ -84,12 +84,14 @@ class ActorFireForgetRequestReplySpec extends WordSpec with MustMatchers with Be "should shutdown crashed temporary actor" in { filterEvents(EventFilter[Exception]("Expected")) { - val actor = actorOf(Props[CrashingActor].withLifeCycle(Temporary)) + val supervisor = actorOf(Props(self ⇒ { case _ ⇒ }).withFaultHandler(OneForOneTemporaryStrategy(List(classOf[Exception])))) + val actor = actorOf(Props[CrashingActor].withSupervisor(supervisor)) actor.isRunning must be(true) actor ! "Die" state.finished.await sleepFor(1 second) actor.isShutdown must be(true) + supervisor.stop() } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 6a5d79a891..b77283cefe 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -10,7 +10,7 @@ import org.scalatest.matchers.MustMatchers import akka.testkit._ import akka.util.duration._ import akka.testkit.Testing.sleepFor -import akka.config.Supervision.{ OneForOneStrategy } +import akka.config.Supervision.{ OneForOnePermanentStrategy } import akka.dispatch.Future import java.util.concurrent.{ TimeUnit, CountDownLatch } import java.lang.IllegalStateException @@ -338,7 +338,7 @@ class ActorRefSpec extends WordSpec with MustMatchers { }).withSupervisor(self)) protected def receive = { case "sendKill" ⇒ ref ! Kill } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000))) + }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 2, 1000))) boss ! "sendKill" latch.await(5, TimeUnit.SECONDS) must be === true diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRestartSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRestartSpec.scala index acc280708a..ce94adff72 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRestartSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRestartSpec.scala @@ -10,7 +10,7 @@ import org.scalatest.matchers.MustMatchers import Actor.actorOf import akka.testkit._ import akka.util.duration._ -import akka.config.Supervision.OneForOneStrategy +import akka.config.Supervision.OneForOnePermanentStrategy import java.util.concurrent.atomic._ @@ -85,7 +85,7 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo } private def createSupervisor = - actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000))) + actorOf(Props[Supervisor].withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 5000))) val expectedEvents = Seq(EventFilter[ActorKilledException], EventFilter[IllegalActorStateException]("expected")) diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala index 3d1fb82615..7222e1b073 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala @@ -62,7 +62,7 @@ class FSMTransitionSpec extends WordSpec with MustMatchers with TestKit { "not fail when listener goes away" in { val forward = Actor.actorOf(new Forwarder(testActor)) val fsm = Actor.actorOf(new MyFSM(testActor)) - val sup = Actor.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None))) + val sup = Actor.actorOf(Props[Supervisor].withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), None, None))) sup link fsm within(300 millis) { fsm ! SubscribeTransitionCallBack(forward) diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala index 00f9ed3115..22b7e43075 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala @@ -133,7 +133,7 @@ class LoggingReceiveSpec within(2 seconds) { val supervisor = TestActorRef(Props(new Actor { def receive = { case _ ⇒ } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000))) + }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 5000))) val f = Actor.getClass.getDeclaredField("debugLifecycle") f.setAccessible(true) f.setBoolean(Actor, true) diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index add6aab744..0493ef2cbf 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -15,7 +15,7 @@ import akka.testkit.EventFilter import Actor._ import java.util.concurrent.{ TimeUnit, CountDownLatch } -import akka.config.Supervision.{ Permanent, LifeCycle, OneForOneStrategy } +import akka.config.Supervision.{ Permanent, LifeCycle, OneForOnePermanentStrategy } import org.multiverse.api.latches.StandardLatch class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll { @@ -36,7 +36,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll { val boss = actorOf(Props(new Actor { protected def receive = { case _ ⇒ () } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000))) + }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 2, 1000))) val restartLatch = new StandardLatch val secondRestartLatch = new StandardLatch @@ -88,7 +88,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll { val boss = actorOf(Props(new Actor { def receive = { case _ ⇒ () } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None))) + }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), None, None))) val countDownLatch = new CountDownLatch(100) @@ -113,7 +113,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll { val boss = actorOf(Props(new Actor { def receive = { case _ ⇒ () } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 500))) + }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 2, 500))) val restartLatch = new StandardLatch val secondRestartLatch = new StandardLatch @@ -172,7 +172,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll { def slaveShouldNotRestartAfterMaxRetries = { val boss = actorOf(Props(new Actor { def receive = { case _ ⇒ () } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), Some(2), None))) + }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), Some(2), None))) val restartLatch = new StandardLatch val secondRestartLatch = new StandardLatch @@ -229,7 +229,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll { val boss = actorOf(Props(new Actor { def receive = { case m: MaximumNumberOfRestartsWithinTimeRangeReached ⇒ maxNoOfRestartsLatch.open } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, Some(1000)))) + }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), None, Some(1000)))) val slave = actorOf(Props(new Actor { diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index ff3e18b768..3413575e95 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -129,7 +129,7 @@ class SchedulerSpec extends JUnitSuite { Supervisor( SupervisorConfig( - AllForOneStrategy(List(classOf[Exception]), 3, 1000), + AllForOnePermanentStrategy(List(classOf[Exception]), 3, 1000), Supervise( actor, Permanent) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index df40fdddc0..3d276d400e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -8,7 +8,7 @@ import org.scalatest.junit.JUnitSuite import org.junit.Test import Actor._ -import akka.config.Supervision.OneForOneStrategy +import akka.config.Supervision.OneForOnePermanentStrategy import java.util.concurrent.{ TimeUnit, CountDownLatch } @@ -28,15 +28,13 @@ class SupervisorHierarchySpec extends JUnitSuite { def killWorkerShouldRestartMangerAndOtherWorkers = { val countDown = new CountDownLatch(4) - val boss = actorOf(Props(new Actor { - protected def receive = { case _ ⇒ () } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 1000))) + val boss = actorOf(Props(self ⇒ { case _ ⇒ }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 1000))) - val manager = actorOf(Props(new CountDownActor(countDown)).withSupervisor(boss)) + val manager = actorOf(Props(new CountDownActor(countDown)).withFaultHandler(OneForOnePermanentStrategy(List(), None, None)).withSupervisor(boss)) val workerOne, workerTwo, workerThree = actorOf(Props(new CountDownActor(countDown)).withSupervisor(manager)) - workerOne ! Death(workerOne, new FireWorkerException("Fire the worker!"), true) + manager ! Death(workerOne, new FireWorkerException("Fire the worker!"), true) // manager + all workers should be restarted by only killing a worker // manager doesn't trap exits, so boss will restart manager @@ -52,7 +50,7 @@ class SupervisorHierarchySpec extends JUnitSuite { protected def receive = { case MaximumNumberOfRestartsWithinTimeRangeReached(_, _, _, _) ⇒ countDownMax.countDown() } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 1, 5000))) + }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 1, 5000))) val crasher = actorOf(Props(new CountDownActor(countDownMessages)).withSupervisor(boss)) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index 0c8e606247..5bbdc5f06f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -5,7 +5,7 @@ package akka.actor import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -import akka.config.Supervision.{ SupervisorConfig, OneForOneStrategy, Supervise, Permanent } +import akka.config.Supervision.{ SupervisorConfig, OneForOnePermanentStrategy, Supervise, Permanent } import java.util.concurrent.CountDownLatch import akka.testkit.{ filterEvents, EventFilter } import akka.dispatch.{ PinnedDispatcher, Dispatchers } @@ -55,7 +55,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers { val sup = Supervisor( SupervisorConfig( - OneForOneStrategy(List(classOf[Exception]), 3, 5000), + OneForOnePermanentStrategy(List(classOf[Exception]), 3, 5000), Supervise(actor1, Permanent) :: Supervise(actor2, Permanent) :: Supervise(actor3, Permanent) :: diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index be142bcf7f..2e61d7cf4a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -57,7 +57,7 @@ object SupervisorSpec { class Master extends Actor { - val temp = actorOf(Props[PingPongActor].withLifeCycle(Temporary).withSupervisor(self)) + val temp = actorOf(Props[PingPongActor].withSupervisor(self)) override def receive = { case Die ⇒ (temp.?(Die, TimeoutMillis)).get @@ -70,11 +70,11 @@ object SupervisorSpec { // ===================================================== def temporaryActorAllForOne = { - val temporaryActor = actorOf(Props[PingPongActor].withLifeCycle(Temporary)) + val temporaryActor = actorOf(Props[PingPongActor]) val supervisor = Supervisor( SupervisorConfig( - AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis), + AllForOneTemporaryStrategy(List(classOf[Exception])), Supervise( temporaryActor, Temporary) @@ -88,7 +88,7 @@ object SupervisorSpec { val supervisor = Supervisor( SupervisorConfig( - AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis), + AllForOnePermanentStrategy(List(classOf[Exception]), 3, TimeoutMillis), Supervise( pingpong, Permanent) @@ -102,7 +102,7 @@ object SupervisorSpec { val supervisor = Supervisor( SupervisorConfig( - OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis), + OneForOnePermanentStrategy(List(classOf[Exception]), 3, TimeoutMillis), Supervise( pingpong, Permanent) @@ -118,7 +118,7 @@ object SupervisorSpec { val supervisor = Supervisor( SupervisorConfig( - AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis), + AllForOnePermanentStrategy(List(classOf[Exception]), 3, TimeoutMillis), Supervise( pingpong1, Permanent) @@ -142,7 +142,7 @@ object SupervisorSpec { val supervisor = Supervisor( SupervisorConfig( - OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis), + OneForOnePermanentStrategy(List(classOf[Exception]), 3, TimeoutMillis), Supervise( pingpong1, Permanent) @@ -166,13 +166,13 @@ object SupervisorSpec { val supervisor = Supervisor( SupervisorConfig( - AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis), + AllForOnePermanentStrategy(List(classOf[Exception]), 3, TimeoutMillis), Supervise( pingpong1, Permanent) :: SupervisorConfig( - AllForOneStrategy(Nil, 3, TimeoutMillis), + AllForOnePermanentStrategy(Nil, 3, TimeoutMillis), Supervise( pingpong2, Permanent) @@ -217,7 +217,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach "A supervisor" must { "not restart programmatically linked temporary actor" in { - val master = actorOf(Props[Master].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, (1 second).dilated.toMillis.toInt))) + val master = actorOf(Props[Master].withFaultHandler(OneForOneTemporaryStrategy(List(classOf[Exception])))) intercept[RuntimeException] { (master.?(Die, TimeoutMillis)).get @@ -374,7 +374,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach val supervisor = Supervisor( SupervisorConfig( - OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000), + OneForOnePermanentStrategy(classOf[Exception] :: Nil, 3, 10000), Supervise(dyingActor, Permanent) :: Nil)) intercept[RuntimeException] { diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala index ab90c2b524..f5fa94ca42 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala @@ -10,7 +10,7 @@ import akka.util.duration._ import akka.testkit.Testing.sleepFor import akka.testkit.{ EventFilter, filterEvents, filterException } import akka.dispatch.Dispatchers -import akka.config.Supervision.{ SupervisorConfig, OneForOneStrategy, Supervise, Permanent } +import akka.config.Supervision.{ SupervisorConfig, OneForOnePermanentStrategy, Supervise, Permanent } import Actor._ class SupervisorTreeSpec extends WordSpec with MustMatchers { @@ -35,7 +35,7 @@ class SupervisorTreeSpec extends WordSpec with MustMatchers { filterException[Exception] { log = "INIT" - val p = Props.default.withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 1000)) + val p = Props.default.withFaultHandler(OneForOnePermanentStrategy(List(classOf[Exception]), 3, 1000)) val lastActor = actorOf(p.withCreator(new Chainer(None)), "lastActor") val middleActor = actorOf(p.withCreator(new Chainer(Some(lastActor))), "middleActor") val headActor = actorOf(p.withCreator(new Chainer(Some(middleActor))), "headActor") diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index f4727cf364..4d9f6cc546 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -29,7 +29,7 @@ class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll { val supervised = Actor.actorOf[Supervised] val supervisor = Supervisor(SupervisorConfig( - AllForOneStrategy(List(classOf[Exception]), 5, 10000), + AllForOnePermanentStrategy(List(classOf[Exception]), 5, 10000), Supervise(supervised, Permanent) :: Nil)) supervised.!("test")(Some(sender)) @@ -44,7 +44,7 @@ class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll { val supervised = Actor.actorOf[Supervised] val supervisor = Supervisor(SupervisorConfig( - AllForOneStrategy(List(classOf[Exception]), 5, 10000), + AllForOnePermanentStrategy(List(classOf[Exception]), 5, 10000), Supervise(supervised, Temporary) :: Nil)) supervised.!("test")(Some(sender)) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index 9dc51b2bc5..9fdbbb2c70 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -3,13 +3,13 @@ package akka.routing import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import akka.dispatch.{ KeptPromise, Future } -import java.util.concurrent.atomic.AtomicInteger import akka.actor.Actor._ import akka.testkit.Testing._ import akka.actor.{ TypedActor, Actor, Props } import akka.testkit.{ TestLatch, filterEvents, EventFilter, filterException } import akka.util.duration._ -import akka.config.Supervision.OneForOneStrategy +import akka.config.Supervision.OneForOnePermanentStrategy +import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } object ActorPoolSpec { @@ -24,7 +24,7 @@ object ActorPoolSpec { } } - val faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 1000) + val faultHandler = OneForOnePermanentStrategy(List(classOf[Exception]), 5, 1000) } class ActorPoolSpec extends WordSpec with MustMatchers { @@ -364,58 +364,10 @@ class ActorPoolSpec extends WordSpec with MustMatchers { import akka.config.Supervision._ val pingCount = new AtomicInteger(0) val deathCount = new AtomicInteger(0) - var keepDying = false + val keepDying = new AtomicBoolean(false) val pool1 = actorOf( Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { - def lowerBound = 2 - def upperBound = 5 - def rampupRate = 0.1 - def backoffRate = 0.1 - def backoffThreshold = 0.5 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - def pressureThreshold = 1 - def factory = actorOf(new Actor { - if (deathCount.get > 5) deathCount.set(0) - if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } - def receive = { - case akka.Die ⇒ - if (keepDying) deathCount.incrementAndGet - throw new RuntimeException - case _ ⇒ pingCount.incrementAndGet - } - }) - }).withFaultHandler(faultHandler)) - - val pool2 = actorOf( - Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { - def lowerBound = 2 - def upperBound = 5 - def rampupRate = 0.1 - def backoffRate = 0.1 - def backoffThreshold = 0.5 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - def pressureThreshold = 1 - def factory = actorOf(new Actor { - if (deathCount.get > 5) deathCount.set(0) - if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } - def receive = { - case akka.Die ⇒ - if (keepDying) deathCount.incrementAndGet - throw new RuntimeException - case _ ⇒ pingCount.incrementAndGet - } - }) - }).withFaultHandler(faultHandler)) - - val pool3 = actorOf( - Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter { def lowerBound = 2 def upperBound = 5 def rampupRate = 0.1 @@ -431,17 +383,74 @@ class ActorPoolSpec extends WordSpec with MustMatchers { if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } def receive = { case akka.Die ⇒ - if (keepDying) deathCount.incrementAndGet + if (keepDying.get) deathCount.incrementAndGet throw new RuntimeException case _ ⇒ pingCount.incrementAndGet } - }).withLifeCycle(Temporary)) + })) }).withFaultHandler(faultHandler)) + val pool2 = actorOf( + Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { + def lowerBound = 2 + def upperBound = 5 + def rampupRate = 0.1 + def backoffRate = 0.1 + def backoffThreshold = 0.5 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + def pressureThreshold = 1 + def factory = actorOf(Props(new Actor { + if (deathCount.get > 5) deathCount.set(0) + if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } + def receive = { + case akka.Die ⇒ + if (keepDying.get) deathCount.incrementAndGet + throw new RuntimeException + case _ ⇒ pingCount.incrementAndGet + } + })) + }).withFaultHandler(faultHandler)) + + val pool3 = actorOf( + Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter { + def lowerBound = 2 + def upperBound = 5 + def rampupRate = 0.1 + def backoffRate = 0.1 + def backoffThreshold = 0.5 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + def pressureThreshold = 1 + def factory = actorOf(Props(new Actor { + + System.err.println("Going up: " + self) + + if (deathCount.get > 5) deathCount.set(0) + if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } + def receive = { + case akka.Die ⇒ + if (keepDying.get) + deathCount.incrementAndGet + + throw new RuntimeException + case _ ⇒ pingCount.incrementAndGet + } + + override def postStop() { + System.err.println("Going down: " + self) + } + })) + }).withFaultHandler(OneForOneTemporaryStrategy(List(classOf[Exception])))) + // default lifecycle // actor comes back right away pingCount.set(0) - keepDying = false + keepDying.set(false) pool1 ! "ping" (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) pool1 ! akka.Die @@ -452,7 +461,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { // default lifecycle // actor dies completely pingCount.set(0) - keepDying = true + keepDying.set(true) pool1 ! "ping" (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) pool1 ! akka.Die @@ -465,7 +474,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { // permanent lifecycle // actor comes back right away pingCount.set(0) - keepDying = false + keepDying.set(false) pool2 ! "ping" (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) pool2 ! akka.Die @@ -476,7 +485,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { // permanent lifecycle // actor dies completely pingCount.set(0) - keepDying = true + keepDying.set(true) pool2 ! "ping" (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) pool2 ! akka.Die @@ -488,7 +497,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { // temporary lifecycle pingCount.set(0) - keepDying = false + keepDying.set(false) pool3 ! "ping" (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) pool3 ! akka.Die @@ -507,7 +516,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { import akka.config.Supervision._ val pingCount = new AtomicInteger(0) val deathCount = new AtomicInteger(0) - var keepDying = false + var keepDying = new AtomicBoolean(false) object BadState @@ -528,18 +537,18 @@ class ActorPoolSpec extends WordSpec with MustMatchers { if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } def receive = { case BadState ⇒ - if (keepDying) deathCount.incrementAndGet + if (keepDying.get) deathCount.incrementAndGet throw new IllegalStateException case akka.Die ⇒ throw new RuntimeException case _ ⇒ pingCount.incrementAndGet } }) - }).withFaultHandler(OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000))) + }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[IllegalStateException]), 5, 1000))) // actor comes back right away pingCount.set(0) - keepDying = false + keepDying.set(false) pool1 ! "ping" (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) pool1 ! BadState @@ -549,7 +558,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { // actor dies completely pingCount.set(0) - keepDying = true + keepDying.set(true) pool1 ! "ping" (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) pool1 ! BadState diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala index f024546e15..a24c98a7b6 100644 --- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala @@ -5,7 +5,7 @@ import akka.actor._ import akka.routing._ import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -import akka.config.Supervision.OneForOneStrategy +import akka.config.Supervision.OneForOnePermanentStrategy class Ticket703Spec extends WordSpec with MustMatchers { @@ -28,7 +28,7 @@ class Ticket703Spec extends WordSpec with MustMatchers { self.tryReply("Response") } }) - }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, 1000))) + }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Exception]), 5, 1000))) (actorPool.?("Ping", 7000)).await.result must be === Some("Response") } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index d610ca6102..dca202fb71 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -591,10 +591,11 @@ trait Actor { * User overridable callback. *
* Is called when a message isn't handled by the current behavior of the actor - * by default it throws an UnhandledMessageException + * by default it does: EventHandler.warning(self, message) */ - def unhandled(msg: Any) { - throw new UnhandledMessageException(msg, self) + def unhandled(message: Any) { + //EventHandler.warning(self, message) + throw new UnhandledMessageException(message, self) } /** diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 5062d32dcc..4d008f3796 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -50,8 +50,7 @@ object Props { final val defaultDeployId: String = "" final val defaultDispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher final val defaultTimeout: Timeout = Timeout(Duration(Actor.TIMEOUT, "millis")) - final val defaultLifeCycle: LifeCycle = Permanent - final val defaultFaultHandler: FaultHandlingStrategy = NoFaultHandlingStrategy + final val defaultFaultHandler: FaultHandlingStrategy = AllForOnePermanentStrategy(classOf[Exception] :: Nil, None, None) final val defaultSupervisor: Option[ActorRef] = None /** @@ -90,8 +89,7 @@ object Props { */ def apply(creator: Creator[_ <: Actor]): Props = default.withCreator(creator.create) - def apply(behavior: (ScalaActorRef with SelfActorRef) ⇒ Actor.Receive): Props = - apply(new Actor { def receive = behavior(self) }) + def apply(behavior: (ScalaActorRef with SelfActorRef) ⇒ Actor.Receive): Props = apply(new Actor { def receive = behavior(self) }) } /** @@ -101,7 +99,6 @@ case class Props(creator: () ⇒ Actor = Props.defaultCreator, deployId: String = Props.defaultDeployId, @transient dispatcher: MessageDispatcher = Props.defaultDispatcher, timeout: Timeout = Props.defaultTimeout, - lifeCycle: LifeCycle = Props.defaultLifeCycle, faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler, supervisor: Option[ActorRef] = Props.defaultSupervisor) { /** @@ -113,7 +110,6 @@ case class Props(creator: () ⇒ Actor = Props.defaultCreator, deployId = Props.defaultDeployId, dispatcher = Props.defaultDispatcher, timeout = Props.defaultTimeout, - lifeCycle = Props.defaultLifeCycle, faultHandler = Props.defaultFaultHandler, supervisor = Props.defaultSupervisor) @@ -147,12 +143,6 @@ case class Props(creator: () ⇒ Actor = Props.defaultCreator, */ def withTimeout(t: Timeout) = copy(timeout = t) - /** - * Returns a new Props with the specified lifecycle set - * Java API - */ - def withLifeCycle(l: LifeCycle) = copy(lifeCycle = l) - /** * Returns a new Props with the specified faulthandler set * Java API @@ -232,11 +222,6 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha protected[akka] def timeout: Long = Props.defaultTimeout.duration.toMillis //TODO Remove me if possible - /** - * Defines the life-cycle for a supervised actor. - */ - protected[akka] def lifeCycle: LifeCycle = UndefinedLifeCycle //TODO Remove me if possible - /** * Akka Java API. * @see ask(message: AnyRef, sender: ActorRef): Future[_] @@ -523,7 +508,6 @@ class LocalActorRef private[akka] ( } //TODO Why is the guard needed here? protected[akka] override def timeout: Long = props.timeout.duration.toMillis //TODO remove this if possible - protected[akka] override def lifeCycle: LifeCycle = props.lifeCycle //TODO remove this if possible private def serializer: Serializer = //TODO Is this used or needed? try { Serialization.serializerFor(this.getClass) } catch { @@ -754,22 +738,14 @@ class LocalActorRef private[akka] ( currentMessage = null // reset current message after successful invocation } catch { case e ⇒ - { - EventHandler.error(e, this, e.getMessage) + EventHandler.error(e, this, e.getMessage) - //Prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) + //Prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) - channel.sendException(e) + channel.sendException(e) - if (supervisor.isDefined) notifySupervisorWithMessage(Death(this, e, true)) - else { - lifeCycle match { - case Temporary ⇒ shutDownTemporaryActor(this, e) - case _ ⇒ dispatcher.resume(this) //Resume processing for this actor - } - } - } + if (supervisor.isDefined) supervisor.get ! Death(this, e, true) else dispatcher.resume(this) if (e.isInstanceOf[InterruptedException]) throw e //Re-throw InterruptedExceptions as expected } finally { checkReceiveTimeout // Reschedule receive timeout @@ -790,12 +766,22 @@ class LocalActorRef private[akka] ( protected[akka] def handleDeath(death: Death) { props.faultHandler match { - case AllForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ + case AllForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ restartLinkedActors(death.cause, maxRetries, within) - case OneForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ + case AllForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ + restartLinkedActors(death.cause, None, None) + + case OneForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ death.deceased.restart(death.cause, maxRetries, within) + case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ + unlink(death.deceased) + death.deceased.stop() + System.err.println("Do not restart: " + death.deceased) + System.err.println("Notifying Supervisor: " + death.deceased + " of MaximumNORWTRR") + this ! MaximumNumberOfRestartsWithinTimeRangeReached(death.deceased, None, None, death.cause) + case _ ⇒ if (_supervisor.isDefined) throw death.cause else death.deceased.stop() //Escalate problem if not handled here } @@ -848,46 +834,36 @@ class LocalActorRef private[akka] ( if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted") } - def tooManyRestarts() { - notifySupervisorWithMessage( - MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)) - stop() - } - @tailrec def attemptRestart() { val success = if (requestRestartPermission(maxNrOfRetries, withinTimeRange)) { guard.withGuard[Boolean] { _status = ActorRefInternals.BEING_RESTARTED - lifeCycle match { - case Temporary ⇒ - shutDownTemporaryActor(this, reason) + val success = + try { + performRestart() true + } catch { + case e ⇒ + EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString)) + false // an error or exception here should trigger a retry + } finally { + currentMessage = null + } - case _ ⇒ // either permanent or none where default is permanent - val success = - try { - performRestart() - true - } catch { - case e ⇒ - EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString)) - false // an error or exception here should trigger a retry - } finally { - currentMessage = null - } - - if (success) { - _status = ActorRefInternals.RUNNING - dispatcher.resume(this) - restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) - } - success + if (success) { + _status = ActorRefInternals.RUNNING + dispatcher.resume(this) + restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) } + success } } else { - tooManyRestarts() + // tooManyRestarts + if (supervisor.isDefined) + supervisor.get ! MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) + stop() true // done } @@ -898,17 +874,30 @@ class LocalActorRef private[akka] ( attemptRestart() // recur } - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = { - val i = _linkedActors.values.iterator - while (i.hasNext) { - val actorRef = i.next - actorRef.lifeCycle match { - // either permanent or none where default is permanent - case Temporary ⇒ shutDownTemporaryActor(actorRef, reason) - case _ ⇒ actorRef.restart(reason, maxNrOfRetries, withinTimeRange) - } + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = + props.faultHandler.lifeCycle match { + case Temporary ⇒ + val i = _linkedActors.values.iterator + while (i.hasNext) { + val actorRef = i.next() + + i.remove() + + actorRef.stop() + // when this comes down through the handleDeath path, we get here when the temp actor is restarted + if (supervisor.isDefined) { + supervisor.get ! MaximumNumberOfRestartsWithinTimeRangeReached(actorRef, Some(0), None, reason) + + //FIXME if last temporary actor is gone, then unlink me from supervisor <-- should this exist? + if (!i.hasNext) + supervisor.get ! UnlinkAndStop(this) + } + } + + case Permanent ⇒ + val i = _linkedActors.values.iterator + while (i.hasNext) i.next().restart(reason, maxNrOfRetries, withinTimeRange) } - } def linkedActors: JCollection[ActorRef] = java.util.Collections.unmodifiableCollection(_linkedActors.values) @@ -943,21 +932,6 @@ class LocalActorRef private[akka] ( case valid ⇒ valid } - private def shutDownTemporaryActor(temporaryActor: ActorRef, reason: Throwable) { - temporaryActor.stop() - _linkedActors.remove(temporaryActor.uuid) // remove the temporary actor - // when this comes down through the handleDeath path, we get here when the temp actor is restarted - notifySupervisorWithMessage(MaximumNumberOfRestartsWithinTimeRangeReached(temporaryActor, Some(0), None, reason)) - // if last temporary actor is gone, then unlink me from supervisor - if (_linkedActors.isEmpty) notifySupervisorWithMessage(UnlinkAndStop(this)) - true - } - - private def notifySupervisorWithMessage(notification: LifeCycleMessage) { - val sup = _supervisor - if (sup.isDefined) sup.get ! notification - } - private def setActorSelfFields(actor: Actor, value: ActorRef) { @tailrec diff --git a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala index 5bcbae8b10..1494c1b0d9 100644 --- a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala +++ b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala @@ -20,7 +20,7 @@ object Supervision { abstract class Server extends ConfigElement sealed abstract class LifeCycle extends ConfigElement - sealed abstract class FaultHandlingStrategy(val trapExit: List[Class[_ <: Throwable]]) extends ConfigElement + sealed abstract class FaultHandlingStrategy(val trapExit: List[Class[_ <: Throwable]], val lifeCycle: LifeCycle) extends ConfigElement case class SupervisorConfig(restartStrategy: FaultHandlingStrategy, worker: List[Server], maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) ⇒ Unit = { (aRef, max) ⇒ () }) extends Server { //Java API @@ -40,9 +40,9 @@ object Supervision { def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.registerAsRemoteService)) } - object AllForOneStrategy { - def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy = - new AllForOneStrategy(trapExit, + object AllForOnePermanentStrategy { + def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): AllForOnePermanentStrategy = + new AllForOnePermanentStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) } @@ -52,9 +52,9 @@ object Supervision { * maxNrOfRetries = the number of times an actor is allowed to be restarted * withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window */ - case class AllForOneStrategy(override val trapExit: List[Class[_ <: Throwable]], - maxNrOfRetries: Option[Int] = None, - withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy(trapExit) { + case class AllForOnePermanentStrategy(override val trapExit: List[Class[_ <: Throwable]], + maxNrOfRetries: Option[Int] = None, + withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy(trapExit, Permanent) { def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = this(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) @@ -68,9 +68,11 @@ object Supervision { if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) } - object OneForOneStrategy { - def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy = - new OneForOneStrategy(trapExit, + case class AllForOneTemporaryStrategy(override val trapExit: List[Class[_ <: Throwable]]) extends FaultHandlingStrategy(trapExit, Temporary) + + object OneForOnePermanentStrategy { + def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): OneForOnePermanentStrategy = + new OneForOnePermanentStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) } @@ -80,9 +82,9 @@ object Supervision { * maxNrOfRetries = the number of times an actor is allowed to be restarted * withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window */ - case class OneForOneStrategy(override val trapExit: List[Class[_ <: Throwable]], - maxNrOfRetries: Option[Int] = None, - withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy(trapExit) { + case class OneForOnePermanentStrategy(override val trapExit: List[Class[_ <: Throwable]], + maxNrOfRetries: Option[Int] = None, + withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy(trapExit, Permanent) { def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = this(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) @@ -96,20 +98,15 @@ object Supervision { if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) } - case object NoFaultHandlingStrategy extends FaultHandlingStrategy(Nil) + case class OneForOneTemporaryStrategy(override val trapExit: List[Class[_ <: Throwable]]) extends FaultHandlingStrategy(trapExit, Temporary) //Scala API case object Permanent extends LifeCycle case object Temporary extends LifeCycle - case object UndefinedLifeCycle extends LifeCycle //Java API (& Scala if you fancy) def permanent(): LifeCycle = Permanent def temporary(): LifeCycle = Temporary - def undefinedLifeCycle(): LifeCycle = UndefinedLifeCycle - - //Java API - def noFaultHandlingStrategy = NoFaultHandlingStrategy case class SuperviseTypedActor(_intf: Class[_], val target: Class[_], diff --git a/akka-actor/src/main/scala/akka/event/DeathWatch.scala b/akka-actor/src/main/scala/akka/event/DeathWatch.scala index 7ba9392ff6..2fe131299b 100644 --- a/akka-actor/src/main/scala/akka/event/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/event/DeathWatch.scala @@ -2,21 +2,99 @@ * Copyright (C) 2009-2011 Typesafe Inc.