Removing LifeCycle from Props, it's now a part of AllForOnePermanent, OneForOnePermanent etc
This commit is contained in:
parent
7e2af6a9b9
commit
4a0358ad7d
35 changed files with 342 additions and 286 deletions
|
|
@ -18,6 +18,6 @@ public class SupervisionConfig {
|
|||
}
|
||||
|
||||
|
||||
return new SupervisorConfig(new AllForOneStrategy(new Class[] { Exception.class }, 50, 1000), targets.toArray(new Server[targets.size()]));
|
||||
return new SupervisorConfig(new AllForOnePermanentStrategy(new Class[] { Exception.class }, 50, 1000), targets.toArray(new Server[targets.size()]));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -84,12 +84,14 @@ class ActorFireForgetRequestReplySpec extends WordSpec with MustMatchers with Be
|
|||
|
||||
"should shutdown crashed temporary actor" in {
|
||||
filterEvents(EventFilter[Exception]("Expected")) {
|
||||
val actor = actorOf(Props[CrashingActor].withLifeCycle(Temporary))
|
||||
val supervisor = actorOf(Props(self ⇒ { case _ ⇒ }).withFaultHandler(OneForOneTemporaryStrategy(List(classOf[Exception]))))
|
||||
val actor = actorOf(Props[CrashingActor].withSupervisor(supervisor))
|
||||
actor.isRunning must be(true)
|
||||
actor ! "Die"
|
||||
state.finished.await
|
||||
sleepFor(1 second)
|
||||
actor.isShutdown must be(true)
|
||||
supervisor.stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import org.scalatest.matchers.MustMatchers
|
|||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
import akka.testkit.Testing.sleepFor
|
||||
import akka.config.Supervision.{ OneForOneStrategy }
|
||||
import akka.config.Supervision.{ OneForOnePermanentStrategy }
|
||||
import akka.dispatch.Future
|
||||
import java.util.concurrent.{ TimeUnit, CountDownLatch }
|
||||
import java.lang.IllegalStateException
|
||||
|
|
@ -338,7 +338,7 @@ class ActorRefSpec extends WordSpec with MustMatchers {
|
|||
}).withSupervisor(self))
|
||||
|
||||
protected def receive = { case "sendKill" ⇒ ref ! Kill }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000)))
|
||||
}).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 2, 1000)))
|
||||
|
||||
boss ! "sendKill"
|
||||
latch.await(5, TimeUnit.SECONDS) must be === true
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import org.scalatest.matchers.MustMatchers
|
|||
import Actor.actorOf
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
import akka.config.Supervision.OneForOneStrategy
|
||||
import akka.config.Supervision.OneForOnePermanentStrategy
|
||||
|
||||
import java.util.concurrent.atomic._
|
||||
|
||||
|
|
@ -85,7 +85,7 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo
|
|||
}
|
||||
|
||||
private def createSupervisor =
|
||||
actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000)))
|
||||
actorOf(Props[Supervisor].withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 5000)))
|
||||
|
||||
val expectedEvents = Seq(EventFilter[ActorKilledException], EventFilter[IllegalActorStateException]("expected"))
|
||||
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ class FSMTransitionSpec extends WordSpec with MustMatchers with TestKit {
|
|||
"not fail when listener goes away" in {
|
||||
val forward = Actor.actorOf(new Forwarder(testActor))
|
||||
val fsm = Actor.actorOf(new MyFSM(testActor))
|
||||
val sup = Actor.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None)))
|
||||
val sup = Actor.actorOf(Props[Supervisor].withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), None, None)))
|
||||
sup link fsm
|
||||
within(300 millis) {
|
||||
fsm ! SubscribeTransitionCallBack(forward)
|
||||
|
|
|
|||
|
|
@ -133,7 +133,7 @@ class LoggingReceiveSpec
|
|||
within(2 seconds) {
|
||||
val supervisor = TestActorRef(Props(new Actor {
|
||||
def receive = { case _ ⇒ }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000)))
|
||||
}).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 5000)))
|
||||
val f = Actor.getClass.getDeclaredField("debugLifecycle")
|
||||
f.setAccessible(true)
|
||||
f.setBoolean(Actor, true)
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import akka.testkit.EventFilter
|
|||
|
||||
import Actor._
|
||||
import java.util.concurrent.{ TimeUnit, CountDownLatch }
|
||||
import akka.config.Supervision.{ Permanent, LifeCycle, OneForOneStrategy }
|
||||
import akka.config.Supervision.{ Permanent, LifeCycle, OneForOnePermanentStrategy }
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
|
||||
class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll {
|
||||
|
|
@ -36,7 +36,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll {
|
|||
|
||||
val boss = actorOf(Props(new Actor {
|
||||
protected def receive = { case _ ⇒ () }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000)))
|
||||
}).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 2, 1000)))
|
||||
|
||||
val restartLatch = new StandardLatch
|
||||
val secondRestartLatch = new StandardLatch
|
||||
|
|
@ -88,7 +88,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll {
|
|||
|
||||
val boss = actorOf(Props(new Actor {
|
||||
def receive = { case _ ⇒ () }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None)))
|
||||
}).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), None, None)))
|
||||
|
||||
val countDownLatch = new CountDownLatch(100)
|
||||
|
||||
|
|
@ -113,7 +113,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll {
|
|||
|
||||
val boss = actorOf(Props(new Actor {
|
||||
def receive = { case _ ⇒ () }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 500)))
|
||||
}).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 2, 500)))
|
||||
|
||||
val restartLatch = new StandardLatch
|
||||
val secondRestartLatch = new StandardLatch
|
||||
|
|
@ -172,7 +172,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll {
|
|||
def slaveShouldNotRestartAfterMaxRetries = {
|
||||
val boss = actorOf(Props(new Actor {
|
||||
def receive = { case _ ⇒ () }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), Some(2), None)))
|
||||
}).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), Some(2), None)))
|
||||
|
||||
val restartLatch = new StandardLatch
|
||||
val secondRestartLatch = new StandardLatch
|
||||
|
|
@ -229,7 +229,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll {
|
|||
|
||||
val boss = actorOf(Props(new Actor {
|
||||
def receive = { case m: MaximumNumberOfRestartsWithinTimeRangeReached ⇒ maxNoOfRestartsLatch.open }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, Some(1000))))
|
||||
}).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), None, Some(1000))))
|
||||
|
||||
val slave = actorOf(Props(new Actor {
|
||||
|
||||
|
|
|
|||
|
|
@ -129,7 +129,7 @@ class SchedulerSpec extends JUnitSuite {
|
|||
|
||||
Supervisor(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, 1000),
|
||||
AllForOnePermanentStrategy(List(classOf[Exception]), 3, 1000),
|
||||
Supervise(
|
||||
actor,
|
||||
Permanent)
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import org.scalatest.junit.JUnitSuite
|
|||
import org.junit.Test
|
||||
|
||||
import Actor._
|
||||
import akka.config.Supervision.OneForOneStrategy
|
||||
import akka.config.Supervision.OneForOnePermanentStrategy
|
||||
|
||||
import java.util.concurrent.{ TimeUnit, CountDownLatch }
|
||||
|
||||
|
|
@ -28,15 +28,13 @@ class SupervisorHierarchySpec extends JUnitSuite {
|
|||
def killWorkerShouldRestartMangerAndOtherWorkers = {
|
||||
val countDown = new CountDownLatch(4)
|
||||
|
||||
val boss = actorOf(Props(new Actor {
|
||||
protected def receive = { case _ ⇒ () }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 1000)))
|
||||
val boss = actorOf(Props(self ⇒ { case _ ⇒ }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 1000)))
|
||||
|
||||
val manager = actorOf(Props(new CountDownActor(countDown)).withSupervisor(boss))
|
||||
val manager = actorOf(Props(new CountDownActor(countDown)).withFaultHandler(OneForOnePermanentStrategy(List(), None, None)).withSupervisor(boss))
|
||||
|
||||
val workerOne, workerTwo, workerThree = actorOf(Props(new CountDownActor(countDown)).withSupervisor(manager))
|
||||
|
||||
workerOne ! Death(workerOne, new FireWorkerException("Fire the worker!"), true)
|
||||
manager ! Death(workerOne, new FireWorkerException("Fire the worker!"), true)
|
||||
|
||||
// manager + all workers should be restarted by only killing a worker
|
||||
// manager doesn't trap exits, so boss will restart manager
|
||||
|
|
@ -52,7 +50,7 @@ class SupervisorHierarchySpec extends JUnitSuite {
|
|||
protected def receive = {
|
||||
case MaximumNumberOfRestartsWithinTimeRangeReached(_, _, _, _) ⇒ countDownMax.countDown()
|
||||
}
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 1, 5000)))
|
||||
}).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 1, 5000)))
|
||||
|
||||
val crasher = actorOf(Props(new CountDownActor(countDownMessages)).withSupervisor(boss))
|
||||
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ package akka.actor
|
|||
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.config.Supervision.{ SupervisorConfig, OneForOneStrategy, Supervise, Permanent }
|
||||
import akka.config.Supervision.{ SupervisorConfig, OneForOnePermanentStrategy, Supervise, Permanent }
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import akka.testkit.{ filterEvents, EventFilter }
|
||||
import akka.dispatch.{ PinnedDispatcher, Dispatchers }
|
||||
|
|
@ -55,7 +55,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
|
|||
|
||||
val sup = Supervisor(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, 5000),
|
||||
OneForOnePermanentStrategy(List(classOf[Exception]), 3, 5000),
|
||||
Supervise(actor1, Permanent) ::
|
||||
Supervise(actor2, Permanent) ::
|
||||
Supervise(actor3, Permanent) ::
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ object SupervisorSpec {
|
|||
|
||||
class Master extends Actor {
|
||||
|
||||
val temp = actorOf(Props[PingPongActor].withLifeCycle(Temporary).withSupervisor(self))
|
||||
val temp = actorOf(Props[PingPongActor].withSupervisor(self))
|
||||
|
||||
override def receive = {
|
||||
case Die ⇒ (temp.?(Die, TimeoutMillis)).get
|
||||
|
|
@ -70,11 +70,11 @@ object SupervisorSpec {
|
|||
// =====================================================
|
||||
|
||||
def temporaryActorAllForOne = {
|
||||
val temporaryActor = actorOf(Props[PingPongActor].withLifeCycle(Temporary))
|
||||
val temporaryActor = actorOf(Props[PingPongActor])
|
||||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
AllForOneTemporaryStrategy(List(classOf[Exception])),
|
||||
Supervise(
|
||||
temporaryActor,
|
||||
Temporary)
|
||||
|
|
@ -88,7 +88,7 @@ object SupervisorSpec {
|
|||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
AllForOnePermanentStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
Supervise(
|
||||
pingpong,
|
||||
Permanent)
|
||||
|
|
@ -102,7 +102,7 @@ object SupervisorSpec {
|
|||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
OneForOnePermanentStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
Supervise(
|
||||
pingpong,
|
||||
Permanent)
|
||||
|
|
@ -118,7 +118,7 @@ object SupervisorSpec {
|
|||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
AllForOnePermanentStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
Supervise(
|
||||
pingpong1,
|
||||
Permanent)
|
||||
|
|
@ -142,7 +142,7 @@ object SupervisorSpec {
|
|||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
OneForOnePermanentStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
Supervise(
|
||||
pingpong1,
|
||||
Permanent)
|
||||
|
|
@ -166,13 +166,13 @@ object SupervisorSpec {
|
|||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
AllForOnePermanentStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||
Supervise(
|
||||
pingpong1,
|
||||
Permanent)
|
||||
::
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(Nil, 3, TimeoutMillis),
|
||||
AllForOnePermanentStrategy(Nil, 3, TimeoutMillis),
|
||||
Supervise(
|
||||
pingpong2,
|
||||
Permanent)
|
||||
|
|
@ -217,7 +217,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
|
|||
"A supervisor" must {
|
||||
|
||||
"not restart programmatically linked temporary actor" in {
|
||||
val master = actorOf(Props[Master].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, (1 second).dilated.toMillis.toInt)))
|
||||
val master = actorOf(Props[Master].withFaultHandler(OneForOneTemporaryStrategy(List(classOf[Exception]))))
|
||||
|
||||
intercept[RuntimeException] {
|
||||
(master.?(Die, TimeoutMillis)).get
|
||||
|
|
@ -374,7 +374,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
|
|||
val supervisor =
|
||||
Supervisor(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000),
|
||||
OneForOnePermanentStrategy(classOf[Exception] :: Nil, 3, 10000),
|
||||
Supervise(dyingActor, Permanent) :: Nil))
|
||||
|
||||
intercept[RuntimeException] {
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import akka.util.duration._
|
|||
import akka.testkit.Testing.sleepFor
|
||||
import akka.testkit.{ EventFilter, filterEvents, filterException }
|
||||
import akka.dispatch.Dispatchers
|
||||
import akka.config.Supervision.{ SupervisorConfig, OneForOneStrategy, Supervise, Permanent }
|
||||
import akka.config.Supervision.{ SupervisorConfig, OneForOnePermanentStrategy, Supervise, Permanent }
|
||||
import Actor._
|
||||
|
||||
class SupervisorTreeSpec extends WordSpec with MustMatchers {
|
||||
|
|
@ -35,7 +35,7 @@ class SupervisorTreeSpec extends WordSpec with MustMatchers {
|
|||
filterException[Exception] {
|
||||
log = "INIT"
|
||||
|
||||
val p = Props.default.withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 1000))
|
||||
val p = Props.default.withFaultHandler(OneForOnePermanentStrategy(List(classOf[Exception]), 3, 1000))
|
||||
val lastActor = actorOf(p.withCreator(new Chainer(None)), "lastActor")
|
||||
val middleActor = actorOf(p.withCreator(new Chainer(Some(lastActor))), "middleActor")
|
||||
val headActor = actorOf(p.withCreator(new Chainer(Some(middleActor))), "headActor")
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll {
|
|||
|
||||
val supervised = Actor.actorOf[Supervised]
|
||||
val supervisor = Supervisor(SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 5, 10000),
|
||||
AllForOnePermanentStrategy(List(classOf[Exception]), 5, 10000),
|
||||
Supervise(supervised, Permanent) :: Nil))
|
||||
|
||||
supervised.!("test")(Some(sender))
|
||||
|
|
@ -44,7 +44,7 @@ class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll {
|
|||
|
||||
val supervised = Actor.actorOf[Supervised]
|
||||
val supervisor = Supervisor(SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 5, 10000),
|
||||
AllForOnePermanentStrategy(List(classOf[Exception]), 5, 10000),
|
||||
Supervise(supervised, Temporary) :: Nil))
|
||||
|
||||
supervised.!("test")(Some(sender))
|
||||
|
|
|
|||
|
|
@ -3,13 +3,13 @@ package akka.routing
|
|||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.dispatch.{ KeptPromise, Future }
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import akka.actor.Actor._
|
||||
import akka.testkit.Testing._
|
||||
import akka.actor.{ TypedActor, Actor, Props }
|
||||
import akka.testkit.{ TestLatch, filterEvents, EventFilter, filterException }
|
||||
import akka.util.duration._
|
||||
import akka.config.Supervision.OneForOneStrategy
|
||||
import akka.config.Supervision.OneForOnePermanentStrategy
|
||||
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
|
||||
|
||||
object ActorPoolSpec {
|
||||
|
||||
|
|
@ -24,7 +24,7 @@ object ActorPoolSpec {
|
|||
}
|
||||
}
|
||||
|
||||
val faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 1000)
|
||||
val faultHandler = OneForOnePermanentStrategy(List(classOf[Exception]), 5, 1000)
|
||||
}
|
||||
|
||||
class ActorPoolSpec extends WordSpec with MustMatchers {
|
||||
|
|
@ -364,58 +364,10 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
|
|||
import akka.config.Supervision._
|
||||
val pingCount = new AtomicInteger(0)
|
||||
val deathCount = new AtomicInteger(0)
|
||||
var keepDying = false
|
||||
val keepDying = new AtomicBoolean(false)
|
||||
|
||||
val pool1 = actorOf(
|
||||
Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
|
||||
def lowerBound = 2
|
||||
def upperBound = 5
|
||||
def rampupRate = 0.1
|
||||
def backoffRate = 0.1
|
||||
def backoffThreshold = 0.5
|
||||
def partialFill = true
|
||||
def selectionCount = 1
|
||||
def instance = factory
|
||||
def receive = _route
|
||||
def pressureThreshold = 1
|
||||
def factory = actorOf(new Actor {
|
||||
if (deathCount.get > 5) deathCount.set(0)
|
||||
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
|
||||
def receive = {
|
||||
case akka.Die ⇒
|
||||
if (keepDying) deathCount.incrementAndGet
|
||||
throw new RuntimeException
|
||||
case _ ⇒ pingCount.incrementAndGet
|
||||
}
|
||||
})
|
||||
}).withFaultHandler(faultHandler))
|
||||
|
||||
val pool2 = actorOf(
|
||||
Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
|
||||
def lowerBound = 2
|
||||
def upperBound = 5
|
||||
def rampupRate = 0.1
|
||||
def backoffRate = 0.1
|
||||
def backoffThreshold = 0.5
|
||||
def partialFill = true
|
||||
def selectionCount = 1
|
||||
def instance = factory
|
||||
def receive = _route
|
||||
def pressureThreshold = 1
|
||||
def factory = actorOf(new Actor {
|
||||
if (deathCount.get > 5) deathCount.set(0)
|
||||
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
|
||||
def receive = {
|
||||
case akka.Die ⇒
|
||||
if (keepDying) deathCount.incrementAndGet
|
||||
throw new RuntimeException
|
||||
case _ ⇒ pingCount.incrementAndGet
|
||||
}
|
||||
})
|
||||
}).withFaultHandler(faultHandler))
|
||||
|
||||
val pool3 = actorOf(
|
||||
Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter {
|
||||
def lowerBound = 2
|
||||
def upperBound = 5
|
||||
def rampupRate = 0.1
|
||||
|
|
@ -431,17 +383,74 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
|
|||
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
|
||||
def receive = {
|
||||
case akka.Die ⇒
|
||||
if (keepDying) deathCount.incrementAndGet
|
||||
if (keepDying.get) deathCount.incrementAndGet
|
||||
throw new RuntimeException
|
||||
case _ ⇒ pingCount.incrementAndGet
|
||||
}
|
||||
}).withLifeCycle(Temporary))
|
||||
}))
|
||||
}).withFaultHandler(faultHandler))
|
||||
|
||||
val pool2 = actorOf(
|
||||
Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
|
||||
def lowerBound = 2
|
||||
def upperBound = 5
|
||||
def rampupRate = 0.1
|
||||
def backoffRate = 0.1
|
||||
def backoffThreshold = 0.5
|
||||
def partialFill = true
|
||||
def selectionCount = 1
|
||||
def instance = factory
|
||||
def receive = _route
|
||||
def pressureThreshold = 1
|
||||
def factory = actorOf(Props(new Actor {
|
||||
if (deathCount.get > 5) deathCount.set(0)
|
||||
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
|
||||
def receive = {
|
||||
case akka.Die ⇒
|
||||
if (keepDying.get) deathCount.incrementAndGet
|
||||
throw new RuntimeException
|
||||
case _ ⇒ pingCount.incrementAndGet
|
||||
}
|
||||
}))
|
||||
}).withFaultHandler(faultHandler))
|
||||
|
||||
val pool3 = actorOf(
|
||||
Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter {
|
||||
def lowerBound = 2
|
||||
def upperBound = 5
|
||||
def rampupRate = 0.1
|
||||
def backoffRate = 0.1
|
||||
def backoffThreshold = 0.5
|
||||
def partialFill = true
|
||||
def selectionCount = 1
|
||||
def instance = factory
|
||||
def receive = _route
|
||||
def pressureThreshold = 1
|
||||
def factory = actorOf(Props(new Actor {
|
||||
|
||||
System.err.println("Going up: " + self)
|
||||
|
||||
if (deathCount.get > 5) deathCount.set(0)
|
||||
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
|
||||
def receive = {
|
||||
case akka.Die ⇒
|
||||
if (keepDying.get)
|
||||
deathCount.incrementAndGet
|
||||
|
||||
throw new RuntimeException
|
||||
case _ ⇒ pingCount.incrementAndGet
|
||||
}
|
||||
|
||||
override def postStop() {
|
||||
System.err.println("Going down: " + self)
|
||||
}
|
||||
}))
|
||||
}).withFaultHandler(OneForOneTemporaryStrategy(List(classOf[Exception]))))
|
||||
|
||||
// default lifecycle
|
||||
// actor comes back right away
|
||||
pingCount.set(0)
|
||||
keepDying = false
|
||||
keepDying.set(false)
|
||||
pool1 ! "ping"
|
||||
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pool1 ! akka.Die
|
||||
|
|
@ -452,7 +461,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
|
|||
// default lifecycle
|
||||
// actor dies completely
|
||||
pingCount.set(0)
|
||||
keepDying = true
|
||||
keepDying.set(true)
|
||||
pool1 ! "ping"
|
||||
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pool1 ! akka.Die
|
||||
|
|
@ -465,7 +474,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
|
|||
// permanent lifecycle
|
||||
// actor comes back right away
|
||||
pingCount.set(0)
|
||||
keepDying = false
|
||||
keepDying.set(false)
|
||||
pool2 ! "ping"
|
||||
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pool2 ! akka.Die
|
||||
|
|
@ -476,7 +485,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
|
|||
// permanent lifecycle
|
||||
// actor dies completely
|
||||
pingCount.set(0)
|
||||
keepDying = true
|
||||
keepDying.set(true)
|
||||
pool2 ! "ping"
|
||||
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pool2 ! akka.Die
|
||||
|
|
@ -488,7 +497,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
|
|||
|
||||
// temporary lifecycle
|
||||
pingCount.set(0)
|
||||
keepDying = false
|
||||
keepDying.set(false)
|
||||
pool3 ! "ping"
|
||||
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pool3 ! akka.Die
|
||||
|
|
@ -507,7 +516,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
|
|||
import akka.config.Supervision._
|
||||
val pingCount = new AtomicInteger(0)
|
||||
val deathCount = new AtomicInteger(0)
|
||||
var keepDying = false
|
||||
var keepDying = new AtomicBoolean(false)
|
||||
|
||||
object BadState
|
||||
|
||||
|
|
@ -528,18 +537,18 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
|
|||
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
|
||||
def receive = {
|
||||
case BadState ⇒
|
||||
if (keepDying) deathCount.incrementAndGet
|
||||
if (keepDying.get) deathCount.incrementAndGet
|
||||
throw new IllegalStateException
|
||||
case akka.Die ⇒
|
||||
throw new RuntimeException
|
||||
case _ ⇒ pingCount.incrementAndGet
|
||||
}
|
||||
})
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000)))
|
||||
}).withFaultHandler(OneForOnePermanentStrategy(List(classOf[IllegalStateException]), 5, 1000)))
|
||||
|
||||
// actor comes back right away
|
||||
pingCount.set(0)
|
||||
keepDying = false
|
||||
keepDying.set(false)
|
||||
pool1 ! "ping"
|
||||
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pool1 ! BadState
|
||||
|
|
@ -549,7 +558,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
|
|||
|
||||
// actor dies completely
|
||||
pingCount.set(0)
|
||||
keepDying = true
|
||||
keepDying.set(true)
|
||||
pool1 ! "ping"
|
||||
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
pool1 ! BadState
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import akka.actor._
|
|||
import akka.routing._
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.config.Supervision.OneForOneStrategy
|
||||
import akka.config.Supervision.OneForOnePermanentStrategy
|
||||
|
||||
class Ticket703Spec extends WordSpec with MustMatchers {
|
||||
|
||||
|
|
@ -28,7 +28,7 @@ class Ticket703Spec extends WordSpec with MustMatchers {
|
|||
self.tryReply("Response")
|
||||
}
|
||||
})
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, 1000)))
|
||||
}).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Exception]), 5, 1000)))
|
||||
(actorPool.?("Ping", 7000)).await.result must be === Some("Response")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -591,10 +591,11 @@ trait Actor {
|
|||
* User overridable callback.
|
||||
* <p/>
|
||||
* Is called when a message isn't handled by the current behavior of the actor
|
||||
* by default it throws an UnhandledMessageException
|
||||
* by default it does: EventHandler.warning(self, message)
|
||||
*/
|
||||
def unhandled(msg: Any) {
|
||||
throw new UnhandledMessageException(msg, self)
|
||||
def unhandled(message: Any) {
|
||||
//EventHandler.warning(self, message)
|
||||
throw new UnhandledMessageException(message, self)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -50,8 +50,7 @@ object Props {
|
|||
final val defaultDeployId: String = ""
|
||||
final val defaultDispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
|
||||
final val defaultTimeout: Timeout = Timeout(Duration(Actor.TIMEOUT, "millis"))
|
||||
final val defaultLifeCycle: LifeCycle = Permanent
|
||||
final val defaultFaultHandler: FaultHandlingStrategy = NoFaultHandlingStrategy
|
||||
final val defaultFaultHandler: FaultHandlingStrategy = AllForOnePermanentStrategy(classOf[Exception] :: Nil, None, None)
|
||||
final val defaultSupervisor: Option[ActorRef] = None
|
||||
|
||||
/**
|
||||
|
|
@ -90,8 +89,7 @@ object Props {
|
|||
*/
|
||||
def apply(creator: Creator[_ <: Actor]): Props = default.withCreator(creator.create)
|
||||
|
||||
def apply(behavior: (ScalaActorRef with SelfActorRef) ⇒ Actor.Receive): Props =
|
||||
apply(new Actor { def receive = behavior(self) })
|
||||
def apply(behavior: (ScalaActorRef with SelfActorRef) ⇒ Actor.Receive): Props = apply(new Actor { def receive = behavior(self) })
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -101,7 +99,6 @@ case class Props(creator: () ⇒ Actor = Props.defaultCreator,
|
|||
deployId: String = Props.defaultDeployId,
|
||||
@transient dispatcher: MessageDispatcher = Props.defaultDispatcher,
|
||||
timeout: Timeout = Props.defaultTimeout,
|
||||
lifeCycle: LifeCycle = Props.defaultLifeCycle,
|
||||
faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler,
|
||||
supervisor: Option[ActorRef] = Props.defaultSupervisor) {
|
||||
/**
|
||||
|
|
@ -113,7 +110,6 @@ case class Props(creator: () ⇒ Actor = Props.defaultCreator,
|
|||
deployId = Props.defaultDeployId,
|
||||
dispatcher = Props.defaultDispatcher,
|
||||
timeout = Props.defaultTimeout,
|
||||
lifeCycle = Props.defaultLifeCycle,
|
||||
faultHandler = Props.defaultFaultHandler,
|
||||
supervisor = Props.defaultSupervisor)
|
||||
|
||||
|
|
@ -147,12 +143,6 @@ case class Props(creator: () ⇒ Actor = Props.defaultCreator,
|
|||
*/
|
||||
def withTimeout(t: Timeout) = copy(timeout = t)
|
||||
|
||||
/**
|
||||
* Returns a new Props with the specified lifecycle set
|
||||
* Java API
|
||||
*/
|
||||
def withLifeCycle(l: LifeCycle) = copy(lifeCycle = l)
|
||||
|
||||
/**
|
||||
* Returns a new Props with the specified faulthandler set
|
||||
* Java API
|
||||
|
|
@ -232,11 +222,6 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
|
|||
|
||||
protected[akka] def timeout: Long = Props.defaultTimeout.duration.toMillis //TODO Remove me if possible
|
||||
|
||||
/**
|
||||
* Defines the life-cycle for a supervised actor.
|
||||
*/
|
||||
protected[akka] def lifeCycle: LifeCycle = UndefinedLifeCycle //TODO Remove me if possible
|
||||
|
||||
/**
|
||||
* Akka Java API. <p/>
|
||||
* @see ask(message: AnyRef, sender: ActorRef): Future[_]
|
||||
|
|
@ -523,7 +508,6 @@ class LocalActorRef private[akka] (
|
|||
} //TODO Why is the guard needed here?
|
||||
|
||||
protected[akka] override def timeout: Long = props.timeout.duration.toMillis //TODO remove this if possible
|
||||
protected[akka] override def lifeCycle: LifeCycle = props.lifeCycle //TODO remove this if possible
|
||||
|
||||
private def serializer: Serializer = //TODO Is this used or needed?
|
||||
try { Serialization.serializerFor(this.getClass) } catch {
|
||||
|
|
@ -754,7 +738,6 @@ class LocalActorRef private[akka] (
|
|||
currentMessage = null // reset current message after successful invocation
|
||||
} catch {
|
||||
case e ⇒
|
||||
{
|
||||
EventHandler.error(e, this, e.getMessage)
|
||||
|
||||
//Prevent any further messages to be processed until the actor has been restarted
|
||||
|
|
@ -762,14 +745,7 @@ class LocalActorRef private[akka] (
|
|||
|
||||
channel.sendException(e)
|
||||
|
||||
if (supervisor.isDefined) notifySupervisorWithMessage(Death(this, e, true))
|
||||
else {
|
||||
lifeCycle match {
|
||||
case Temporary ⇒ shutDownTemporaryActor(this, e)
|
||||
case _ ⇒ dispatcher.resume(this) //Resume processing for this actor
|
||||
}
|
||||
}
|
||||
}
|
||||
if (supervisor.isDefined) supervisor.get ! Death(this, e, true) else dispatcher.resume(this)
|
||||
if (e.isInstanceOf[InterruptedException]) throw e //Re-throw InterruptedExceptions as expected
|
||||
} finally {
|
||||
checkReceiveTimeout // Reschedule receive timeout
|
||||
|
|
@ -790,12 +766,22 @@ class LocalActorRef private[akka] (
|
|||
|
||||
protected[akka] def handleDeath(death: Death) {
|
||||
props.faultHandler match {
|
||||
case AllForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒
|
||||
case AllForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒
|
||||
restartLinkedActors(death.cause, maxRetries, within)
|
||||
|
||||
case OneForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒
|
||||
case AllForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒
|
||||
restartLinkedActors(death.cause, None, None)
|
||||
|
||||
case OneForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒
|
||||
death.deceased.restart(death.cause, maxRetries, within)
|
||||
|
||||
case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒
|
||||
unlink(death.deceased)
|
||||
death.deceased.stop()
|
||||
System.err.println("Do not restart: " + death.deceased)
|
||||
System.err.println("Notifying Supervisor: " + death.deceased + " of MaximumNORWTRR")
|
||||
this ! MaximumNumberOfRestartsWithinTimeRangeReached(death.deceased, None, None, death.cause)
|
||||
|
||||
case _ ⇒
|
||||
if (_supervisor.isDefined) throw death.cause else death.deceased.stop() //Escalate problem if not handled here
|
||||
}
|
||||
|
|
@ -848,24 +834,12 @@ class LocalActorRef private[akka] (
|
|||
if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted")
|
||||
}
|
||||
|
||||
def tooManyRestarts() {
|
||||
notifySupervisorWithMessage(
|
||||
MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason))
|
||||
stop()
|
||||
}
|
||||
|
||||
@tailrec
|
||||
def attemptRestart() {
|
||||
val success = if (requestRestartPermission(maxNrOfRetries, withinTimeRange)) {
|
||||
guard.withGuard[Boolean] {
|
||||
_status = ActorRefInternals.BEING_RESTARTED
|
||||
|
||||
lifeCycle match {
|
||||
case Temporary ⇒
|
||||
shutDownTemporaryActor(this, reason)
|
||||
true
|
||||
|
||||
case _ ⇒ // either permanent or none where default is permanent
|
||||
val success =
|
||||
try {
|
||||
performRestart()
|
||||
|
|
@ -885,9 +859,11 @@ class LocalActorRef private[akka] (
|
|||
}
|
||||
success
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tooManyRestarts()
|
||||
// tooManyRestarts
|
||||
if (supervisor.isDefined)
|
||||
supervisor.get ! MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
|
||||
stop()
|
||||
true // done
|
||||
}
|
||||
|
||||
|
|
@ -898,16 +874,29 @@ class LocalActorRef private[akka] (
|
|||
attemptRestart() // recur
|
||||
}
|
||||
|
||||
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = {
|
||||
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) =
|
||||
props.faultHandler.lifeCycle match {
|
||||
case Temporary ⇒
|
||||
val i = _linkedActors.values.iterator
|
||||
while (i.hasNext) {
|
||||
val actorRef = i.next
|
||||
actorRef.lifeCycle match {
|
||||
// either permanent or none where default is permanent
|
||||
case Temporary ⇒ shutDownTemporaryActor(actorRef, reason)
|
||||
case _ ⇒ actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
|
||||
val actorRef = i.next()
|
||||
|
||||
i.remove()
|
||||
|
||||
actorRef.stop()
|
||||
// when this comes down through the handleDeath path, we get here when the temp actor is restarted
|
||||
if (supervisor.isDefined) {
|
||||
supervisor.get ! MaximumNumberOfRestartsWithinTimeRangeReached(actorRef, Some(0), None, reason)
|
||||
|
||||
//FIXME if last temporary actor is gone, then unlink me from supervisor <-- should this exist?
|
||||
if (!i.hasNext)
|
||||
supervisor.get ! UnlinkAndStop(this)
|
||||
}
|
||||
}
|
||||
|
||||
case Permanent ⇒
|
||||
val i = _linkedActors.values.iterator
|
||||
while (i.hasNext) i.next().restart(reason, maxNrOfRetries, withinTimeRange)
|
||||
}
|
||||
|
||||
def linkedActors: JCollection[ActorRef] = java.util.Collections.unmodifiableCollection(_linkedActors.values)
|
||||
|
|
@ -943,21 +932,6 @@ class LocalActorRef private[akka] (
|
|||
case valid ⇒ valid
|
||||
}
|
||||
|
||||
private def shutDownTemporaryActor(temporaryActor: ActorRef, reason: Throwable) {
|
||||
temporaryActor.stop()
|
||||
_linkedActors.remove(temporaryActor.uuid) // remove the temporary actor
|
||||
// when this comes down through the handleDeath path, we get here when the temp actor is restarted
|
||||
notifySupervisorWithMessage(MaximumNumberOfRestartsWithinTimeRangeReached(temporaryActor, Some(0), None, reason))
|
||||
// if last temporary actor is gone, then unlink me from supervisor
|
||||
if (_linkedActors.isEmpty) notifySupervisorWithMessage(UnlinkAndStop(this))
|
||||
true
|
||||
}
|
||||
|
||||
private def notifySupervisorWithMessage(notification: LifeCycleMessage) {
|
||||
val sup = _supervisor
|
||||
if (sup.isDefined) sup.get ! notification
|
||||
}
|
||||
|
||||
private def setActorSelfFields(actor: Actor, value: ActorRef) {
|
||||
|
||||
@tailrec
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ object Supervision {
|
|||
|
||||
abstract class Server extends ConfigElement
|
||||
sealed abstract class LifeCycle extends ConfigElement
|
||||
sealed abstract class FaultHandlingStrategy(val trapExit: List[Class[_ <: Throwable]]) extends ConfigElement
|
||||
sealed abstract class FaultHandlingStrategy(val trapExit: List[Class[_ <: Throwable]], val lifeCycle: LifeCycle) extends ConfigElement
|
||||
|
||||
case class SupervisorConfig(restartStrategy: FaultHandlingStrategy, worker: List[Server], maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) ⇒ Unit = { (aRef, max) ⇒ () }) extends Server {
|
||||
//Java API
|
||||
|
|
@ -40,9 +40,9 @@ object Supervision {
|
|||
def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.registerAsRemoteService))
|
||||
}
|
||||
|
||||
object AllForOneStrategy {
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy =
|
||||
new AllForOneStrategy(trapExit,
|
||||
object AllForOnePermanentStrategy {
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): AllForOnePermanentStrategy =
|
||||
new AllForOnePermanentStrategy(trapExit,
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
}
|
||||
|
||||
|
|
@ -52,9 +52,9 @@ object Supervision {
|
|||
* maxNrOfRetries = the number of times an actor is allowed to be restarted
|
||||
* withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window
|
||||
*/
|
||||
case class AllForOneStrategy(override val trapExit: List[Class[_ <: Throwable]],
|
||||
case class AllForOnePermanentStrategy(override val trapExit: List[Class[_ <: Throwable]],
|
||||
maxNrOfRetries: Option[Int] = None,
|
||||
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy(trapExit) {
|
||||
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy(trapExit, Permanent) {
|
||||
def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(trapExit,
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
|
|
@ -68,9 +68,11 @@ object Supervision {
|
|||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
}
|
||||
|
||||
object OneForOneStrategy {
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy =
|
||||
new OneForOneStrategy(trapExit,
|
||||
case class AllForOneTemporaryStrategy(override val trapExit: List[Class[_ <: Throwable]]) extends FaultHandlingStrategy(trapExit, Temporary)
|
||||
|
||||
object OneForOnePermanentStrategy {
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): OneForOnePermanentStrategy =
|
||||
new OneForOnePermanentStrategy(trapExit,
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
}
|
||||
|
||||
|
|
@ -80,9 +82,9 @@ object Supervision {
|
|||
* maxNrOfRetries = the number of times an actor is allowed to be restarted
|
||||
* withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window
|
||||
*/
|
||||
case class OneForOneStrategy(override val trapExit: List[Class[_ <: Throwable]],
|
||||
case class OneForOnePermanentStrategy(override val trapExit: List[Class[_ <: Throwable]],
|
||||
maxNrOfRetries: Option[Int] = None,
|
||||
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy(trapExit) {
|
||||
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy(trapExit, Permanent) {
|
||||
def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(trapExit,
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
|
|
@ -96,20 +98,15 @@ object Supervision {
|
|||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
}
|
||||
|
||||
case object NoFaultHandlingStrategy extends FaultHandlingStrategy(Nil)
|
||||
case class OneForOneTemporaryStrategy(override val trapExit: List[Class[_ <: Throwable]]) extends FaultHandlingStrategy(trapExit, Temporary)
|
||||
|
||||
//Scala API
|
||||
case object Permanent extends LifeCycle
|
||||
case object Temporary extends LifeCycle
|
||||
case object UndefinedLifeCycle extends LifeCycle
|
||||
|
||||
//Java API (& Scala if you fancy)
|
||||
def permanent(): LifeCycle = Permanent
|
||||
def temporary(): LifeCycle = Temporary
|
||||
def undefinedLifeCycle(): LifeCycle = UndefinedLifeCycle
|
||||
|
||||
//Java API
|
||||
def noFaultHandlingStrategy = NoFaultHandlingStrategy
|
||||
|
||||
case class SuperviseTypedActor(_intf: Class[_],
|
||||
val target: Class[_],
|
||||
|
|
|
|||
|
|
@ -2,21 +2,99 @@
|
|||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.event
|
||||
/*package akka.event
|
||||
|
||||
import akka.actor.{ Death, LocalActorRef, ActorRef }
|
||||
import akka.actor.{ Death, ActorRef }
|
||||
import akka.config.Supervision.{ FaultHandlingStrategy }
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock
|
||||
|
||||
trait DeathWatch {
|
||||
def signal(death: Death): Unit
|
||||
}
|
||||
|
||||
class StupidInVMDeathWatchImpl extends DeathWatch {
|
||||
def signal(death: Death) {
|
||||
death match {
|
||||
case c @ Death(victim: LocalActorRef, _, _) if victim.supervisor.isDefined ⇒
|
||||
victim.supervisor.get ! c
|
||||
object Supervision {
|
||||
case class ActiveEntry(monitoring: Vector[ActorRef] = Vector(), supervising: Vector[ActorRef] = Vector(), strategy: FaultHandlingStrategy)
|
||||
case class PassiveEntry(monitors: Vector[ActorRef] = Vector(), supervisor: Option[ActorRef] = None)
|
||||
}
|
||||
|
||||
case other ⇒ EventHandler.debug(this, "No supervisor or not a local actor reference: " + other)
|
||||
trait Supervision { self: DeathWatch =>
|
||||
|
||||
import Supervision._
|
||||
|
||||
val guard = new ReentrantReadWriteLock
|
||||
val read = guard.readLock()
|
||||
val write = guard.writeLock()
|
||||
|
||||
val activeEntries = new ConcurrentHashMap[ActorRef, ActiveEntry](1024)
|
||||
val passiveEntries = new ConcurrentHashMap[ActorRef, PassiveEntry](1024)
|
||||
|
||||
def registerMonitorable(monitor: ActorRef, monitorsSupervisor: Option[ActorRef], faultHandlingStrategy: FaultHandlingStrategy): Unit = {
|
||||
read.lock()
|
||||
try {
|
||||
activeEntries.putIfAbsent(monitor, ActiveEntry(strategy = faultHandlingStrategy))
|
||||
passiveEntries.putIfAbsent(monitor, PassiveEntry(supervisor = monitorsSupervisor))
|
||||
} finally {
|
||||
read.unlock()
|
||||
}
|
||||
}
|
||||
|
||||
def deregisterMonitorable(monitor: ActorRef): Unit = {
|
||||
read.lock()
|
||||
try {
|
||||
activeEntries.remove(monitor)
|
||||
passiveEntries.remove(monitor)
|
||||
} finally {
|
||||
read.unlock()
|
||||
}
|
||||
}
|
||||
|
||||
def startMonitoring(monitor: ActorRef, monitored: ActorRef): ActorRef = {
|
||||
def addActiveEntry(): ActorRef =
|
||||
activeEntries.get(monitor) match {
|
||||
case null => null//He's stopped or not started, which is unlikely
|
||||
case entry =>
|
||||
val updated = entry.copy(monitoring = entry.monitoring :+ monitored)
|
||||
if (activeEntries.replace(monitor, entry, updated))
|
||||
monitored
|
||||
else
|
||||
addActiveEntry()
|
||||
}
|
||||
|
||||
def addPassiveEntry(): ActorRef =
|
||||
activeEntries.get(monitored) match {
|
||||
case null => null//The thing we're trying to monitor isn't registered, abort
|
||||
case _ =>
|
||||
passiveEntries.get(monitored) match {
|
||||
case null =>
|
||||
passiveEntries.putIfAbsent(monitored, PassiveEntry(monitors = Vector(monitor))) match {
|
||||
case null => monitored//All good
|
||||
case _ => addPassiveEntry()
|
||||
}
|
||||
|
||||
case existing =>
|
||||
val updated = existing.copy(monitors = existing.monitors :+ monitor)
|
||||
if (passiveEntries.replace(monitored, existing, updated))
|
||||
monitored
|
||||
else
|
||||
addPassiveEntry()
|
||||
}
|
||||
}
|
||||
|
||||
read.lock()
|
||||
try {
|
||||
addActiveEntry()
|
||||
addPassiveEntry()
|
||||
} finally {
|
||||
read.unlock()
|
||||
}
|
||||
}
|
||||
|
||||
def stopMonitoring(monitor: ActorRef, monitored: ActorRef, strategy: FaultHandlingStrategy, supervise: Boolean): ActorRef = {
|
||||
monitored
|
||||
}
|
||||
}
|
||||
|
||||
class Scenarios {
|
||||
""
|
||||
}*/
|
||||
|
|
@ -178,7 +178,7 @@ class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatcher
|
|||
val consumer = Actor.actorOf(new SupervisedConsumer("reply-channel-test-2"))
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 2, 10000),
|
||||
OneForOnePermanentStrategy(List(classOf[Exception]), 2, 10000),
|
||||
Supervise(consumer, Permanent) :: Nil))
|
||||
|
||||
val latch = new CountDownLatch(1)
|
||||
|
|
@ -189,10 +189,10 @@ class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatcher
|
|||
}
|
||||
|
||||
"be able to reply on failure during postStop" in {
|
||||
val consumer = Actor.actorOf(Props(new SupervisedConsumer("reply-channel-test-3")).withLifeCycle(Temporary))
|
||||
val consumer = Actor.actorOf(Props(new SupervisedConsumer("reply-channel-test-3")))
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 2, 10000),
|
||||
OneForOneTemporaryStrategy(List(classOf[Exception])),
|
||||
Supervise(consumer, Temporary) :: Nil))
|
||||
|
||||
val latch = new CountDownLatch(1)
|
||||
|
|
|
|||
|
|
@ -291,7 +291,7 @@ class DefaultClusterNode private[akka] (
|
|||
|
||||
private[cluster] lazy val remoteDaemonSupervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), Int.MaxValue, Int.MaxValue), // is infinite restart what we want?
|
||||
OneForOnePermanentStrategy(List(classOf[Exception]), Int.MaxValue, Int.MaxValue), // is infinite restart what we want?
|
||||
Supervise(
|
||||
remoteDaemon,
|
||||
Permanent)
|
||||
|
|
|
|||
|
|
@ -81,7 +81,7 @@ Here is an example of how to define a restart strategy:
|
|||
|
||||
.. code-block:: java
|
||||
|
||||
new AllForOneStrategy( //Or OneForOneStrategy
|
||||
new AllForOnePermanentStrategy( //Or OneForOnePermanentStrategy
|
||||
new Class[]{ Exception.class }, //List of Exceptions/Throwables to handle
|
||||
3, // maximum number of restart retries
|
||||
5000 // within time in millis
|
||||
|
|
@ -119,7 +119,7 @@ The Actor’s supervision can be declaratively defined by creating a ‘Supervis
|
|||
|
||||
Supervisor supervisor = Supervisor.apply(
|
||||
new SupervisorConfig(
|
||||
new AllForOneStrategy(new Class[]{Exception.class}, 3, 5000),
|
||||
new AllForOnePermanentStrategy(new Class[]{Exception.class}, 3, 5000),
|
||||
new Supervise[] {
|
||||
new Supervise(
|
||||
actorOf(MyActor1.class),
|
||||
|
|
@ -151,7 +151,7 @@ MaximumNumberOfRestartsWithinTimeRangeReached message.
|
|||
|
||||
Supervisor supervisor = Supervisor.apply(
|
||||
new SupervisorConfig(
|
||||
new AllForOneStrategy(new Class[]{Exception.class}, 3, 5000),
|
||||
new AllForOnePermanentStrategy(new Class[]{Exception.class}, 3, 5000),
|
||||
new Supervise[] {
|
||||
new Supervise(
|
||||
actorOf(MyActor1.class),
|
||||
|
|
@ -180,7 +180,7 @@ Example usage:
|
|||
|
||||
SupervisorFactory factory = new SupervisorFactory(
|
||||
new SupervisorConfig(
|
||||
new OneForOneStrategy(new Class[]{Exception.class}, 3, 5000),
|
||||
new OneForOnePermanentStrategy(new Class[]{Exception.class}, 3, 5000),
|
||||
new Supervise[] {
|
||||
new Supervise(
|
||||
actorOf(MyActor1.class),
|
||||
|
|
@ -211,7 +211,7 @@ Here is an example:
|
|||
|
||||
Supervisor supervisor = Supervisor.apply(
|
||||
new SupervisorConfig(
|
||||
new AllForOneStrategy(new Class[]{Exception.class}, 3, 5000),
|
||||
new AllForOnePermanentStrategy(new Class[]{Exception.class}, 3, 5000),
|
||||
new Supervise[] {
|
||||
new Supervise(
|
||||
actorOf(MyActor1.class),
|
||||
|
|
@ -257,11 +257,11 @@ The supervising Actor also needs to define a fault handler that defines the rest
|
|||
|
||||
The different options are:
|
||||
|
||||
- AllForOneStrategy(trapExit, maxNrOfRetries, withinTimeRange)
|
||||
- AllForOnePermanentStrategy(trapExit, maxNrOfRetries, withinTimeRange)
|
||||
|
||||
- trapExit is an Array of classes inheriting from Throwable, they signal which types of exceptions this actor will handle
|
||||
|
||||
- OneForOneStrategy(trapExit, maxNrOfRetries, withinTimeRange)
|
||||
- OneForOnePermanentStrategy(trapExit, maxNrOfRetries, withinTimeRange)
|
||||
|
||||
- trapExit is an Array of classes inheriting from Throwable, they signal which types of exceptions this actor will handle
|
||||
|
||||
|
|
@ -269,7 +269,7 @@ Here is an example:
|
|||
|
||||
.. code-block:: java
|
||||
|
||||
getContext().setFaultHandler(new AllForOneStrategy(new Class[]{MyException.class, IOException.class}, 3, 1000));
|
||||
getContext().setFaultHandler(new AllForOnePermanentStrategy(new Class[]{MyException.class, IOException.class}, 3, 1000));
|
||||
|
||||
Putting all this together it can look something like this:
|
||||
|
||||
|
|
@ -277,7 +277,7 @@ Putting all this together it can look something like this:
|
|||
|
||||
class MySupervisor extends UntypedActor {
|
||||
public MySupervisor() {
|
||||
getContext().setFaultHandler(new AllForOneStrategy(new Class[]{MyException.class, IOException.class}, 3, 1000));
|
||||
getContext().setFaultHandler(new AllForOnePermanentStrategy(new Class[]{MyException.class, IOException.class}, 3, 1000));
|
||||
}
|
||||
|
||||
public void onReceive(Object message) throws Exception {
|
||||
|
|
@ -362,7 +362,7 @@ If you remember, when you define the 'RestartStrategy' you also defined maximum
|
|||
|
||||
.. code-block:: java
|
||||
|
||||
new AllForOneStrategy( // FaultHandlingStrategy policy (AllForOneStrategy or OneForOneStrategy)
|
||||
new AllForOnePermanentStrategy( // FaultHandlingStrategy policy (AllForOnePermanentStrategy or OneForOnePermanentStrategy)
|
||||
new Class[]{MyException.class, IOException.class}, //What types of errors will be handled
|
||||
3, // maximum number of restart retries
|
||||
5000 // within time in millis
|
||||
|
|
@ -425,7 +425,7 @@ Here is an example:
|
|||
TypedActorConfigurator manager = new TypedActorConfigurator();
|
||||
|
||||
manager.configure(
|
||||
new AllForOneStrategy(new Class[]{Exception.class}, 3, 1000),
|
||||
new AllForOnePermanentStrategy(new Class[]{Exception.class}, 3, 1000),
|
||||
new SuperviseTypedActor[] {
|
||||
new SuperviseTypedActor(
|
||||
Foo.class,
|
||||
|
|
@ -480,7 +480,7 @@ If the parent TypedActor (supervisor) wants to be able to do handle failing chil
|
|||
|
||||
.. code-block:: java
|
||||
|
||||
TypedActor.faultHandler(supervisor, new AllForOneStrategy(new Class[]{IOException.class}, 3, 2000));
|
||||
TypedActor.faultHandler(supervisor, new AllForOnePermanentStrategy(new Class[]{IOException.class}, 3, 2000));
|
||||
|
||||
For convenience there is an overloaded link that takes trapExit and faultHandler for the supervisor as arguments. Here is an example:
|
||||
|
||||
|
|
@ -492,9 +492,9 @@ For convenience there is an overloaded link that takes trapExit and faultHandler
|
|||
foo = newInstance(Foo.class, FooImpl.class, 1000);
|
||||
bar = newInstance(Bar.class, BarImpl.class, 1000);
|
||||
|
||||
link(foo, bar, new AllForOneStrategy(new Class[]{IOException.class}, 3, 2000));
|
||||
link(foo, bar, new AllForOnePermanentStrategy(new Class[]{IOException.class}, 3, 2000));
|
||||
|
||||
// alternative: chaining
|
||||
bar = faultHandler(foo, new AllForOneStrategy(new Class[]{IOException.class}, 3, 2000)).newInstance(Bar.class, 1000);
|
||||
bar = faultHandler(foo, new AllForOnePermanentStrategy(new Class[]{IOException.class}, 3, 2000)).newInstance(Bar.class, 1000);
|
||||
|
||||
link(foo, bar);
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ Here is an example:
|
|||
TypedActorConfigurator manager = new TypedActorConfigurator();
|
||||
|
||||
manager.configure(
|
||||
new AllForOneStrategy(new Class[]{Exception.class}, 3, 1000),
|
||||
new AllForOnePermanentStrategy(new Class[]{Exception.class}, 3, 1000),
|
||||
new SuperviseTypedActor[] {
|
||||
new SuperviseTypedActor(
|
||||
Foo.class,
|
||||
|
|
|
|||
|
|
@ -177,7 +177,7 @@ to FaultHandlingStrategy:
|
|||
|
||||
import akka.config.Supervision._
|
||||
|
||||
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 3, 5000)
|
||||
self.faultHandler = OneForOnePermanentStrategy(List(classOf[Exception]), 3, 5000)
|
||||
|
||||
**Java**
|
||||
|
||||
|
|
@ -185,9 +185,9 @@ to FaultHandlingStrategy:
|
|||
|
||||
import static akka.Supervision.*;
|
||||
|
||||
getContext().setFaultHandler(new OneForOneStrategy(new Class[] { Exception.class },50,1000))
|
||||
getContext().setFaultHandler(new OneForOnePermanentStrategy(new Class[] { Exception.class },50,1000))
|
||||
|
||||
**RestartStrategy, AllForOne, OneForOne** have been replaced with **AllForOneStrategy** and **OneForOneStrategy** in **se.scalablesolutions.akka.config.Supervision**
|
||||
**RestartStrategy, AllForOne, OneForOne** have been replaced with **AllForOnePermanentStrategy** and **OneForOnePermanentStrategy** in **se.scalablesolutions.akka.config.Supervision**
|
||||
|
||||
**Scala**
|
||||
|
||||
|
|
@ -195,7 +195,7 @@ to FaultHandlingStrategy:
|
|||
|
||||
import akka.config.Supervision._
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, 5000),
|
||||
OneForOnePermanentStrategy(List(classOf[Exception]), 3, 5000),
|
||||
Supervise(pingpong1,Permanent) :: Nil
|
||||
)
|
||||
|
||||
|
|
@ -206,7 +206,7 @@ to FaultHandlingStrategy:
|
|||
import static akka.Supervision.*;
|
||||
|
||||
new SupervisorConfig(
|
||||
new OneForOneStrategy(new Class[] { Exception.class },50,1000),
|
||||
new OneForOnePermanentStrategy(new Class[] { Exception.class },50,1000),
|
||||
new Server[] { new Supervise(pingpong1, permanent()) }
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ Here is an example of how to define a restart strategy:
|
|||
|
||||
.. code-block:: scala
|
||||
|
||||
AllForOneStrategy( //FaultHandlingStrategy; AllForOneStrategy or OneForOneStrategy
|
||||
AllForOnePermanentStrategy( //FaultHandlingStrategy; AllForOnePermanentStrategy or OneForOnePermanentStrategy
|
||||
List(classOf[Exception]), //What exceptions will be handled
|
||||
3, // maximum number of restart retries
|
||||
5000 // within time in millis
|
||||
|
|
@ -116,7 +116,7 @@ The Actor's supervision can be declaratively defined by creating a "Supervisor'
|
|||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, 1000),
|
||||
AllForOnePermanentStrategy(List(classOf[Exception]), 3, 1000),
|
||||
Supervise(
|
||||
actorOf[MyActor1],
|
||||
Permanent) ::
|
||||
|
|
@ -140,7 +140,7 @@ MaximumNumberOfRestartsWithinTimeRangeReached message.
|
|||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, 1000),
|
||||
AllForOnePermanentStrategy(List(classOf[Exception]), 3, 1000),
|
||||
Supervise(
|
||||
actorOf[MyActor1],
|
||||
Permanent) ::
|
||||
|
|
@ -166,7 +166,7 @@ Example usage:
|
|||
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, 10),
|
||||
OneForOnePermanentStrategy(List(classOf[Exception]), 3, 10),
|
||||
Supervise(
|
||||
myFirstActor,
|
||||
Permanent) ::
|
||||
|
|
@ -193,7 +193,7 @@ Here is an example:
|
|||
|
||||
val supervisor = Supervisor(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, 1000),
|
||||
AllForOnePermanentStrategy(List(classOf[Exception]), 3, 1000),
|
||||
Supervise(
|
||||
actorOf[MyActor1],
|
||||
Permanent,
|
||||
|
|
@ -247,11 +247,11 @@ The supervising Actor also needs to define a fault handler that defines the rest
|
|||
|
||||
The different options are:
|
||||
|
||||
- AllForOneStrategy(trapExit, maxNrOfRetries, withinTimeRange)
|
||||
- AllForOnePermanentStrategy(trapExit, maxNrOfRetries, withinTimeRange)
|
||||
|
||||
- trapExit is a List or Array of classes inheriting from Throwable, they signal which types of exceptions this actor will handle
|
||||
|
||||
- OneForOneStrategy(trapExit, maxNrOfRetries, withinTimeRange)
|
||||
- OneForOnePermanentStrategy(trapExit, maxNrOfRetries, withinTimeRange)
|
||||
|
||||
- trapExit is a List or Array of classes inheriting from Throwable, they signal which types of exceptions this actor will handle
|
||||
|
||||
|
|
@ -259,14 +259,14 @@ Here is an example:
|
|||
|
||||
.. code-block:: scala
|
||||
|
||||
self.faultHandler = AllForOneStrategy(List(classOf[Throwable]), 3, 1000)
|
||||
self.faultHandler = AllForOnePermanentStrategy(List(classOf[Throwable]), 3, 1000)
|
||||
|
||||
Putting all this together it can look something like this:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
class MySupervisor extends Actor {
|
||||
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 5, 5000)
|
||||
self.faultHandler = OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 5000)
|
||||
|
||||
def receive = {
|
||||
case Register(actor) =>
|
||||
|
|
@ -340,7 +340,7 @@ If you remember, when you define the 'RestartStrategy' you also defined maximum
|
|||
|
||||
.. code-block:: scala
|
||||
|
||||
AllForOneStrategy( //Restart policy, AllForOneStrategy or OneForOneStrategy
|
||||
AllForOnePermanentStrategy( //Restart policy, AllForOnePermanentStrategy or OneForOnePermanentStrategy
|
||||
List(classOf[Exception]), //What kinds of exception it will handle
|
||||
3, // maximum number of restart retries
|
||||
5000 // within time in millis
|
||||
|
|
@ -362,7 +362,7 @@ Here is an example:
|
|||
.. code-block:: scala
|
||||
|
||||
val supervisor = actorOf(new Actor{
|
||||
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 5, 5000)
|
||||
self.faultHandler = OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 5000)
|
||||
protected def receive = {
|
||||
case MaximumNumberOfRestartsWithinTimeRangeReached(
|
||||
victimActorRef, maxNrOfRetries, withinTimeRange, lastExceptionCausingRestart) =>
|
||||
|
|
@ -397,7 +397,7 @@ Here is an example:
|
|||
val manager = new TypedActorConfigurator
|
||||
|
||||
manager.configure(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, 1000),
|
||||
AllForOnePermanentStrategy(List(classOf[Exception]), 3, 1000),
|
||||
List(
|
||||
SuperviseTypedActor(
|
||||
Foo.class,
|
||||
|
|
@ -435,7 +435,7 @@ If the parent TypedActor (supervisor) wants to be able to do handle failing chil
|
|||
|
||||
.. code-block:: scala
|
||||
|
||||
TypedActor.faultHandler(supervisor, AllForOneStrategy(Array(classOf[IOException]), 3, 2000))
|
||||
TypedActor.faultHandler(supervisor, AllForOnePermanentStrategy(Array(classOf[IOException]), 3, 2000))
|
||||
|
||||
For convenience there is an overloaded link that takes trapExit and faultHandler for the supervisor as arguments. Here is an example:
|
||||
|
||||
|
|
@ -446,10 +446,10 @@ For convenience there is an overloaded link that takes trapExit and faultHandler
|
|||
val foo = newInstance(classOf[Foo], 1000)
|
||||
val bar = newInstance(classOf[Bar], 1000)
|
||||
|
||||
link(foo, bar, new AllForOneStrategy(Array(classOf[IOException]), 3, 2000))
|
||||
link(foo, bar, new AllForOnePermanentStrategy(Array(classOf[IOException]), 3, 2000))
|
||||
|
||||
// alternative: chaining
|
||||
bar = faultHandler(foo, new AllForOneStrategy(Array(classOf[IOException]), 3, 2000))
|
||||
bar = faultHandler(foo, new AllForOnePermanentStrategy(Array(classOf[IOException]), 3, 2000))
|
||||
.newInstance(Bar.class, 1000)
|
||||
|
||||
link(foo, bar
|
||||
|
|
|
|||
|
|
@ -160,7 +160,7 @@ In this example, we'll use the built-in *RootEndpoint* class and implement our o
|
|||
class Boot {
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||
OneForOnePermanentStrategy(List(classOf[Exception]), 3, 100),
|
||||
//
|
||||
// in this particular case, just boot the built-in default root endpoint
|
||||
//
|
||||
|
|
|
|||
|
|
@ -270,7 +270,7 @@ I'll try to show you how we can make use Scala's mixins to decouple the Actor im
|
|||
* Chat server. Manages sessions and redirects all other messages to the Session for the client.
|
||||
*/
|
||||
trait ChatServer extends Actor {
|
||||
self.faultHandler = OneForOneStrategy(List(classOf[Exception]),5, 5000)
|
||||
self.faultHandler = OneForOnePermanentStrategy(List(classOf[Exception]),5, 5000)
|
||||
val storage: ActorRef
|
||||
|
||||
EventHandler.info(this, "Chat server is starting up...")
|
||||
|
|
|
|||
|
|
@ -66,13 +66,12 @@ object ActorSerialization {
|
|||
case _ ⇒ None
|
||||
}
|
||||
|
||||
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
|
||||
val lifeCycleProtocol: Option[LifeCycleProtocol] = None /*{
|
||||
actorRef.lifeCycle match {
|
||||
case Permanent ⇒ Some(LifeCycleProtocol.newBuilder.setLifeCycle(LifeCycleType.PERMANENT).build)
|
||||
case Temporary ⇒ Some(LifeCycleProtocol.newBuilder.setLifeCycle(LifeCycleType.TEMPORARY).build)
|
||||
case UndefinedLifeCycle ⇒ None //No need to send the undefined lifecycle over the wire //builder.setLifeCycle(LifeCycleType.UNDEFINED)
|
||||
}
|
||||
}
|
||||
}*/
|
||||
|
||||
val builder = SerializedActorRefProtocol.newBuilder
|
||||
.setUuid(UuidProtocol.newBuilder.setHigh(actorRef.uuid.getTime).setLow(actorRef.uuid.getClockSeqAndNode).build)
|
||||
|
|
@ -199,9 +198,8 @@ object ActorSerialization {
|
|||
protocol.getLifeCycle.getLifeCycle match {
|
||||
case LifeCycleType.PERMANENT ⇒ Permanent
|
||||
case LifeCycleType.TEMPORARY ⇒ Temporary
|
||||
case unknown ⇒ UndefinedLifeCycle
|
||||
}
|
||||
} else UndefinedLifeCycle
|
||||
} else LifeCycleType.PERMANENT
|
||||
|
||||
val storedSupervisor =
|
||||
if (protocol.hasSupervisor) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
|
||||
|
|
@ -224,7 +222,6 @@ object ActorSerialization {
|
|||
|
||||
val props = Props(creator = factory,
|
||||
timeout = if (protocol.hasTimeout) protocol.getTimeout else Timeout.default,
|
||||
lifeCycle = storedLifeCycle,
|
||||
supervisor = storedSupervisor //TODO what dispatcher should it use?
|
||||
//TODO what faultHandler should it use?
|
||||
//
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@
|
|||
|
||||
import akka.actor.{Actor, ActorRef, Props}
|
||||
import akka.stm._
|
||||
import akka.config.Supervision.{OneForOneStrategy,Permanent}
|
||||
import akka.config.Supervision.{OneForOnePermanentStrategy,Permanent}
|
||||
import Actor._
|
||||
import akka.event.EventHandler
|
||||
|
||||
|
|
@ -176,7 +176,7 @@
|
|||
* Chat server. Manages sessions and redirects all other messages to the Session for the client.
|
||||
*/
|
||||
trait ChatServer extends Actor {
|
||||
self.faultHandler = OneForOneStrategy(List(classOf[Exception]),5, 5000)
|
||||
self.faultHandler = OneForOnePermanentStrategy(List(classOf[Exception]),5, 5000)
|
||||
val storage: ActorRef
|
||||
|
||||
EventHandler.info(this, "Chat server is starting up...")
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ class Boot {
|
|||
val factory =
|
||||
SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||
OneForOnePermanentStrategy(List(classOf[Exception]), 3, 100),
|
||||
Supervise(Actor.actorOf[RootEndpoint], Permanent) ::
|
||||
Supervise(Actor.actorOf[HelloEndpoint], Permanent) :: Nil))
|
||||
|
||||
|
|
|
|||
|
|
@ -51,9 +51,9 @@ class SupervisionBeanDefinitionParser extends AbstractSingleBeanDefinitionParser
|
|||
val trapExceptions = parseTrapExits(trapExitsElement)
|
||||
|
||||
val restartStrategy = failover match {
|
||||
case "AllForOne" ⇒ new AllForOneStrategy(trapExceptions, retries, timeRange)
|
||||
case "OneForOne" ⇒ new OneForOneStrategy(trapExceptions, retries, timeRange)
|
||||
case _ ⇒ new OneForOneStrategy(trapExceptions, retries, timeRange) //Default to OneForOne
|
||||
case "AllForOne" ⇒ new AllForOnePermanentStrategy(trapExceptions, retries, timeRange)
|
||||
case "OneForOne" ⇒ new OneForOnePermanentStrategy(trapExceptions, retries, timeRange)
|
||||
case _ ⇒ new OneForOnePermanentStrategy(trapExceptions, retries, timeRange) //Default to OneForOne
|
||||
}
|
||||
builder.addPropertyValue("restartStrategy", restartStrategy)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import ScalaDom._
|
|||
|
||||
import org.w3c.dom.Element
|
||||
import org.springframework.beans.factory.support.BeanDefinitionBuilder
|
||||
import akka.config.Supervision.{ FaultHandlingStrategy, AllForOneStrategy }
|
||||
import akka.config.Supervision.{ FaultHandlingStrategy, AllForOnePermanentStrategy }
|
||||
|
||||
/**
|
||||
* Test for SupervisionBeanDefinitionParser
|
||||
|
|
@ -36,9 +36,9 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers {
|
|||
parser.parseSupervisor(createSupervisorElement, builder);
|
||||
val strategy = builder.getBeanDefinition.getPropertyValues.getPropertyValue("restartStrategy").getValue.asInstanceOf[FaultHandlingStrategy]
|
||||
assert(strategy ne null)
|
||||
assert(strategy.isInstanceOf[AllForOneStrategy])
|
||||
expect(3) { strategy.asInstanceOf[AllForOneStrategy].maxNrOfRetries.get }
|
||||
expect(1000) { strategy.asInstanceOf[AllForOneStrategy].withinTimeRange.get }
|
||||
assert(strategy.isInstanceOf[AllForOnePermanentStrategy])
|
||||
expect(3) { strategy.asInstanceOf[AllForOnePermanentStrategy].maxNrOfRetries.get }
|
||||
expect(1000) { strategy.asInstanceOf[AllForOnePermanentStrategy].withinTimeRange.get }
|
||||
}
|
||||
|
||||
it("should parse the supervised typed actors") {
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ private[akka] class Foo
|
|||
@RunWith(classOf[JUnitRunner])
|
||||
class SupervisionFactoryBeanTest extends Spec with ShouldMatchers {
|
||||
|
||||
val faultHandlingStrategy = new AllForOneStrategy(List(classOf[Exception]), 3, 1000)
|
||||
val faultHandlingStrategy = new AllForOnePermanentStrategy(List(classOf[Exception]), 3, 1000)
|
||||
val typedActors = List(createTypedActorProperties("akka.spring.Foo", "1000"))
|
||||
|
||||
private def createTypedActorProperties(target: String, timeout: String): ActorProperties = {
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package akka.testkit
|
|||
import org.scalatest.matchers.MustMatchers
|
||||
import org.scalatest.{ BeforeAndAfterEach, WordSpec }
|
||||
import akka.actor._
|
||||
import akka.config.Supervision.OneForOneStrategy
|
||||
import akka.config.Supervision.OneForOnePermanentStrategy
|
||||
import akka.event.EventHandler
|
||||
import akka.dispatch.{ Future, Promise }
|
||||
|
||||
|
|
@ -182,7 +182,7 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
|
|||
}).withSupervisor(self))
|
||||
|
||||
def receiveT = { case "sendKill" ⇒ ref ! Kill }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[ActorKilledException]), 5, 1000)))
|
||||
}).withFaultHandler(OneForOnePermanentStrategy(List(classOf[ActorKilledException]), 5, 1000)))
|
||||
|
||||
boss ! "sendKill"
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ import org.scalatest.WordSpec
|
|||
import org.scalatest.matchers.MustMatchers
|
||||
import org.scalatest.{ BeforeAndAfterEach, WordSpec }
|
||||
import akka.actor._
|
||||
import akka.config.Supervision.OneForOneStrategy
|
||||
import akka.config.Supervision.OneForOnePermanentStrategy
|
||||
import akka.event.EventHandler
|
||||
import akka.dispatch.Future
|
||||
import akka.util.duration._
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue