Merge branch 'master' into wip-1537-moar-futuredocs-√

This commit is contained in:
Viktor Klang 2012-01-24 16:31:52 +01:00
commit eca809675a
52 changed files with 552 additions and 230 deletions

View file

@ -23,15 +23,15 @@ public class JavaExtension {
return TestExtensionId.TestExtensionProvider; return TestExtensionId.TestExtensionProvider;
} }
public TestExtension createExtension(ActorSystemImpl i) { public TestExtension createExtension(ExtendedActorSystem i) {
return new TestExtension(i); return new TestExtension(i);
} }
} }
static class TestExtension implements Extension { static class TestExtension implements Extension {
public final ActorSystemImpl system; public final ExtendedActorSystem system;
public TestExtension(ActorSystemImpl i) { public TestExtension(ExtendedActorSystem i) {
system = i; system = i;
} }
} }

View file

@ -81,7 +81,8 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach w
"should shutdown crashed temporary actor" in { "should shutdown crashed temporary actor" in {
filterEvents(EventFilter[Exception]("Expected exception")) { filterEvents(EventFilter[Exception]("Expected exception")) {
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(0))))) val supervisor = system.actorOf(Props(new Supervisor(
OneForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception])))))
val actor = Await.result((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration) val actor = Await.result((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration)
actor.isTerminated must be(false) actor.isTerminated must be(false)
actor ! "Die" actor ! "Die"

View file

@ -36,7 +36,8 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
"invoke preRestart, preStart, postRestart when using OneForOneStrategy" in { "invoke preRestart, preStart, postRestart when using OneForOneStrategy" in {
filterException[ActorKilledException] { filterException[ActorKilledException] {
val id = newUuid().toString val id = newUuid().toString
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3))))) val supervisor = system.actorOf(Props(new Supervisor(
OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])))))
val gen = new AtomicInteger(0) val gen = new AtomicInteger(0)
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) { val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) {
override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") } override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") }
@ -70,7 +71,8 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
"default for preRestart and postRestart is to call postStop and preStart respectively" in { "default for preRestart and postRestart is to call postStop and preStart respectively" in {
filterException[ActorKilledException] { filterException[ActorKilledException] {
val id = newUuid().toString val id = newUuid().toString
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3))))) val supervisor = system.actorOf(Props(new Supervisor(
OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])))))
val gen = new AtomicInteger(0) val gen = new AtomicInteger(0)
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen)) val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen))
val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration) val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration)
@ -100,7 +102,8 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
"not invoke preRestart and postRestart when never restarted using OneForOneStrategy" in { "not invoke preRestart and postRestart when never restarted using OneForOneStrategy" in {
val id = newUuid().toString val id = newUuid().toString
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(3))))) val supervisor = system.actorOf(Props(new Supervisor(
OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])))))
val gen = new AtomicInteger(0) val gen = new AtomicInteger(0)
val props = Props(new LifeCycleTestActor(testActor, id, gen)) val props = Props(new LifeCycleTestActor(testActor, id, gen))
val a = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration) val a = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)

View file

@ -376,7 +376,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
val boss = system.actorOf(Props(new Actor { val boss = system.actorOf(Props(new Actor {
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 2, 1000) override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable]))
val ref = context.actorOf( val ref = context.actorOf(
Props(new Actor { Props(new Actor {

View file

@ -15,11 +15,11 @@ class JavaExtensionSpec extends JavaExtension with JUnitSuite
object TestExtension extends ExtensionId[TestExtension] with ExtensionIdProvider { object TestExtension extends ExtensionId[TestExtension] with ExtensionIdProvider {
def lookup = this def lookup = this
def createExtension(s: ActorSystemImpl) = new TestExtension(s) def createExtension(s: ExtendedActorSystem) = new TestExtension(s)
} }
// Dont't place inside ActorSystemSpec object, since it will not be garbage collected and reference to system remains // Dont't place inside ActorSystemSpec object, since it will not be garbage collected and reference to system remains
class TestExtension(val system: ActorSystemImpl) extends Extension class TestExtension(val system: ExtendedActorSystem) extends Extension
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") { class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") {

View file

@ -95,7 +95,8 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
"notify with a Terminated message once when an Actor is stopped but not when restarted" in { "notify with a Terminated message once when an Actor is stopped but not when restarted" in {
filterException[ActorKilledException] { filterException[ActorKilledException] {
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), Some(2))))) val supervisor = system.actorOf(Props(new Supervisor(
OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Exception])))))
val terminalProps = Props(context { case x context.sender ! x }) val terminalProps = Props(context { case x context.sender ! x })
val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration) val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration)
@ -116,7 +117,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
"fail a monitor which does not handle Terminated()" in { "fail a monitor which does not handle Terminated()" in {
filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) { filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) {
case class FF(fail: Failed) case class FF(fail: Failed)
val strategy = new OneForOneStrategy(SupervisorStrategy.makeDecider(List(classOf[Exception])), Some(0)) { val strategy = new OneForOneStrategy(maxNrOfRetries = 0)(SupervisorStrategy.makeDecider(List(classOf[Exception]))) {
override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
testActor.tell(FF(Failed(cause)), child) testActor.tell(FF(Failed(cause)), child)
super.handleFailure(context, child, cause, stats, children) super.handleFailure(context, child, cause, stats, children)

View file

@ -5,8 +5,8 @@ package akka.actor
import akka.testkit._ import akka.testkit._
import akka.util.duration._ import akka.util.duration._
import FSM._ import FSM._
import akka.util.Duration
object FSMTransitionSpec { object FSMTransitionSpec {
@ -72,7 +72,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
val fsm = system.actorOf(Props(new MyFSM(testActor))) val fsm = system.actorOf(Props(new MyFSM(testActor)))
val sup = system.actorOf(Props(new Actor { val sup = system.actorOf(Props(new Actor {
context.watch(fsm) context.watch(fsm)
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), None, None) override val supervisorStrategy = OneForOneStrategy(withinTimeRange = Duration.Inf)(List(classOf[Throwable]))
def receive = { case _ } def receive = { case _ }
})) }))

View file

@ -14,6 +14,7 @@ import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.testkit.TestLatch import akka.testkit.TestLatch
import akka.util.duration._ import akka.util.duration._
import akka.util.Duration
import akka.pattern.ask import akka.pattern.ask
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -29,7 +30,8 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
"A RestartStrategy" must { "A RestartStrategy" must {
"ensure that slave stays dead after max restarts within time range" in { "ensure that slave stays dead after max restarts within time range" in {
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), 2, 1000)))) val boss = system.actorOf(Props(new Supervisor(
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable])))))
val restartLatch = new TestLatch val restartLatch = new TestLatch
val secondRestartLatch = new TestLatch val secondRestartLatch = new TestLatch
@ -75,7 +77,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
} }
"ensure that slave is immortal without max restarts and time range" in { "ensure that slave is immortal without max restarts and time range" in {
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), None, None)))) val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Throwable])))))
val countDownLatch = new TestLatch(100) val countDownLatch = new TestLatch(100)
@ -97,7 +99,8 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
} }
"ensure that slave restarts after number of crashes not within time range" in { "ensure that slave restarts after number of crashes not within time range" in {
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), 2, 500)))) val boss = system.actorOf(Props(new Supervisor(
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 500 millis)(List(classOf[Throwable])))))
val restartLatch = new TestLatch val restartLatch = new TestLatch
val secondRestartLatch = new TestLatch val secondRestartLatch = new TestLatch
@ -154,7 +157,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
} }
"ensure that slave is not restarted after max retries" in { "ensure that slave is not restarted after max retries" in {
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Throwable]), Some(2), None)))) val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Throwable])))))
val restartLatch = new TestLatch val restartLatch = new TestLatch
val secondRestartLatch = new TestLatch val secondRestartLatch = new TestLatch
@ -209,7 +212,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
val countDownLatch = new TestLatch(2) val countDownLatch = new TestLatch(2)
val boss = system.actorOf(Props(new Actor { val boss = system.actorOf(Props(new Actor {
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), None, Some(1000)) override val supervisorStrategy = OneForOneStrategy(withinTimeRange = 1 second)(List(classOf[Throwable]))
def receive = { def receive = {
case p: Props sender ! context.watch(context.actorOf(p)) case p: Props sender ! context.watch(context.actorOf(p))
case t: Terminated maxNoOfRestartsLatch.open() case t: Terminated maxNoOfRestartsLatch.open()

View file

@ -134,7 +134,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
val restartLatch = new TestLatch val restartLatch = new TestLatch
val pingLatch = new TestLatch(6) val pingLatch = new TestLatch(6)
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, 1000)))) val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(3, 1 second)(List(classOf[Exception])))))
val props = Props(new Actor { val props = Props(new Actor {
def receive = { def receive = {
case Ping pingLatch.countDown() case Ping pingLatch.countDown()
@ -165,8 +165,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
def receive = { def receive = {
case Msg(ts) case Msg(ts)
val now = System.nanoTime val now = System.nanoTime
// Make sure that no message has been dispatched before the scheduled time (10ms = 10000000ns) has occurred // Make sure that no message has been dispatched before the scheduled time (10ms) has occurred
if (now - ts < 10000000) throw new RuntimeException("Interval is too small: " + (now - ts)) if (now - ts < 10.millis.toNanos) throw new RuntimeException("Interval is too small: " + (now - ts))
ticks.countDown() ticks.countDown()
} }
})) }))

View file

@ -5,10 +5,11 @@
package akka.actor package akka.actor
import akka.testkit._ import akka.testkit._
import java.util.concurrent.{ TimeUnit, CountDownLatch } import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.dispatch.Await import akka.dispatch.Await
import akka.pattern.ask import akka.pattern.ask
import akka.util.Duration
import akka.util.duration._
object SupervisorHierarchySpec { object SupervisorHierarchySpec {
class FireWorkerException(msg: String) extends Exception(msg) class FireWorkerException(msg: String) extends Exception(msg)
@ -39,9 +40,9 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
"restart manager and workers in AllForOne" in { "restart manager and workers in AllForOne" in {
val countDown = new CountDownLatch(4) val countDown = new CountDownLatch(4)
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), None, None)))) val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Exception])))))
val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy(List(), None, None))) val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy()(List())))
val manager = Await.result((boss ? managerProps).mapTo[ActorRef], timeout.duration) val manager = Await.result((boss ? managerProps).mapTo[ActorRef], timeout.duration)
val workerProps = Props(new CountDownActor(countDown, SupervisorStrategy.defaultStrategy)) val workerProps = Props(new CountDownActor(countDown, SupervisorStrategy.defaultStrategy))
@ -61,7 +62,8 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
val countDownMessages = new CountDownLatch(1) val countDownMessages = new CountDownLatch(1)
val countDownMax = new CountDownLatch(1) val countDownMax = new CountDownLatch(1)
val boss = system.actorOf(Props(new Actor { val boss = system.actorOf(Props(new Actor {
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 1, 5000) override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 1, withinTimeRange = 5 seconds)(List(classOf[Throwable]))
val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages, SupervisorStrategy.defaultStrategy)))) val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages, SupervisorStrategy.defaultStrategy))))

View file

@ -9,6 +9,7 @@ import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.pattern.ask import akka.pattern.ask
import akka.util.duration._
object SupervisorMiscSpec { object SupervisorMiscSpec {
val config = """ val config = """
@ -29,7 +30,8 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul
filterEvents(EventFilter[Exception]("Kill")) { filterEvents(EventFilter[Exception]("Kill")) {
val countDownLatch = new CountDownLatch(4) val countDownLatch = new CountDownLatch(4)
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, 5000)))) val supervisor = system.actorOf(Props(new Supervisor(
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds)(List(classOf[Exception])))))
val workerProps = Props(new Actor { val workerProps = Props(new Actor {
override def postRestart(cause: Throwable) { countDownLatch.countDown() } override def postRestart(cause: Throwable) { countDownLatch.countDown() }

View file

@ -14,7 +14,7 @@ import akka.dispatch.Await
import akka.pattern.ask import akka.pattern.ask
object SupervisorSpec { object SupervisorSpec {
val Timeout = 5 seconds val Timeout = 5.seconds
case object DieReply case object DieReply
@ -54,7 +54,7 @@ object SupervisorSpec {
var s: ActorRef = _ var s: ActorRef = _
override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), Some(0)) override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception]))
def receive = { def receive = {
case Die temp forward Die case Die temp forward Die
@ -69,7 +69,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
import SupervisorSpec._ import SupervisorSpec._
val TimeoutMillis = Timeout.dilated.toMillis.toInt val DilatedTimeout = Timeout.dilated
// ===================================================== // =====================================================
// Creating actors and supervisors // Creating actors and supervisors
@ -78,45 +78,51 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration) private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
def temporaryActorAllForOne = { def temporaryActorAllForOne = {
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), Some(0))))) val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception])))))
val temporaryActor = child(supervisor, Props(new PingPongActor(testActor))) val temporaryActor = child(supervisor, Props(new PingPongActor(testActor)))
(temporaryActor, supervisor) (temporaryActor, supervisor)
} }
def singleActorAllForOne = { def singleActorAllForOne = {
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) val supervisor = system.actorOf(Props(new Supervisor(
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
val pingpong = child(supervisor, Props(new PingPongActor(testActor))) val pingpong = child(supervisor, Props(new PingPongActor(testActor)))
(pingpong, supervisor) (pingpong, supervisor)
} }
def singleActorOneForOne = { def singleActorOneForOne = {
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) val supervisor = system.actorOf(Props(new Supervisor(
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
val pingpong = child(supervisor, Props(new PingPongActor(testActor))) val pingpong = child(supervisor, Props(new PingPongActor(testActor)))
(pingpong, supervisor) (pingpong, supervisor)
} }
def multipleActorsAllForOne = { def multipleActorsAllForOne = {
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) val supervisor = system.actorOf(Props(new Supervisor(
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor))) val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor)))
(pingpong1, pingpong2, pingpong3, supervisor) (pingpong1, pingpong2, pingpong3, supervisor)
} }
def multipleActorsOneForOne = { def multipleActorsOneForOne = {
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) val supervisor = system.actorOf(Props(new Supervisor(
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor))) val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor)))
(pingpong1, pingpong2, pingpong3, supervisor) (pingpong1, pingpong2, pingpong3, supervisor)
} }
def nestedSupervisorsAllForOne = { def nestedSupervisorsAllForOne = {
val topSupervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))) val topSupervisor = system.actorOf(Props(new Supervisor(
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
val pingpong1 = child(topSupervisor, Props(new PingPongActor(testActor))) val pingpong1 = child(topSupervisor, Props(new PingPongActor(testActor)))
val middleSupervisor = child(topSupervisor, Props(new Supervisor(AllForOneStrategy(Nil, 3, TimeoutMillis)))) val middleSupervisor = child(topSupervisor, Props(new Supervisor(
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(Nil))))
val pingpong2, pingpong3 = child(middleSupervisor, Props(new PingPongActor(testActor))) val pingpong2, pingpong3 = child(middleSupervisor, Props(new PingPongActor(testActor)))
(pingpong1, pingpong2, pingpong3, topSupervisor) (pingpong1, pingpong2, pingpong3, topSupervisor)
@ -131,14 +137,14 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
} }
def ping(pingPongActor: ActorRef) = { def ping(pingPongActor: ActorRef) = {
Await.result(pingPongActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage Await.result(pingPongActor.?(Ping)(DilatedTimeout), DilatedTimeout) must be === PongMessage
expectMsg(Timeout, PingMessage) expectMsg(Timeout, PingMessage)
} }
def kill(pingPongActor: ActorRef) = { def kill(pingPongActor: ActorRef) = {
val result = (pingPongActor.?(DieReply)(TimeoutMillis)) val result = (pingPongActor.?(DieReply)(DilatedTimeout))
expectMsg(Timeout, ExceptionMessage) expectMsg(Timeout, ExceptionMessage)
intercept[RuntimeException] { Await.result(result, TimeoutMillis millis) } intercept[RuntimeException] { Await.result(result, DilatedTimeout) }
} }
"A supervisor" must { "A supervisor" must {
@ -154,7 +160,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
"not restart temporary actor" in { "not restart temporary actor" in {
val (temporaryActor, _) = temporaryActorAllForOne val (temporaryActor, _) = temporaryActorAllForOne
intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis) } intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply)(DilatedTimeout), DilatedTimeout) }
expectNoMsg(1 second) expectNoMsg(1 second)
} }
@ -280,7 +286,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
"must attempt restart when exception during restart" in { "must attempt restart when exception during restart" in {
val inits = new AtomicInteger(0) val inits = new AtomicInteger(0)
val supervisor = system.actorOf(Props(new Supervisor(OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000)))) val supervisor = system.actorOf(Props(new Supervisor(
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 10 seconds)(classOf[Exception] :: Nil))))
val dyingProps = Props(new Actor { val dyingProps = Props(new Actor {
inits.incrementAndGet inits.incrementAndGet
@ -300,11 +307,11 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1), filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1),
EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) { EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) {
intercept[RuntimeException] { intercept[RuntimeException] {
Await.result(dyingActor.?(DieReply)(TimeoutMillis), TimeoutMillis millis) Await.result(dyingActor.?(DieReply)(DilatedTimeout), DilatedTimeout)
} }
} }
Await.result(dyingActor.?(Ping)(TimeoutMillis), TimeoutMillis millis) must be === PongMessage Await.result(dyingActor.?(Ping)(DilatedTimeout), DilatedTimeout) must be === PongMessage
inits.get must be(3) inits.get must be(3)

View file

@ -23,7 +23,7 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeou
EventFilter[ActorKilledException](occurrences = 1) intercept { EventFilter[ActorKilledException](occurrences = 1) intercept {
within(5 seconds) { within(5 seconds) {
val p = Props(new Actor { val p = Props(new Actor {
override val supervisorStrategy = OneForOneStrategy(List(classOf[Exception]), 3, 1000) override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 second)(List(classOf[Exception]))
def receive = { def receive = {
case p: Props sender ! context.actorOf(p) case p: Props sender ! context.actorOf(p)
} }

View file

@ -12,6 +12,7 @@ import akka.testkit.ImplicitSender
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.dispatch.Await import akka.dispatch.Await
import akka.pattern.ask import akka.pattern.ask
import akka.util.duration._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender with DefaultTimeout { class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender with DefaultTimeout {
@ -25,7 +26,8 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
"A supervised actor with lifecycle PERMANENT" should { "A supervised actor with lifecycle PERMANENT" should {
"be able to reply on failure during preRestart" in { "be able to reply on failure during preRestart" in {
filterEvents(EventFilter[Exception]("test", occurrences = 1)) { filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), 5, 10000)))) val supervisor = system.actorOf(Props(new Supervisor(
AllForOneStrategy(5, 10 seconds)(List(classOf[Exception])))))
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration) val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
supervised.!("test")(testActor) supervised.!("test")(testActor)
@ -36,7 +38,8 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
"be able to reply on failure during postStop" in { "be able to reply on failure during postStop" in {
filterEvents(EventFilter[Exception]("test", occurrences = 1)) { filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(List(classOf[Exception]), Some(0), None)))) val supervisor = system.actorOf(Props(new Supervisor(
AllForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception])))))
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration) val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
supervised.!("test")(testActor) supervised.!("test")(testActor)

View file

@ -300,7 +300,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
"be able to handle exceptions when calling methods" in { "be able to handle exceptions when calling methods" in {
filterEvents(EventFilter[IllegalStateException]("expected")) { filterEvents(EventFilter[IllegalStateException]("expected")) {
val boss = system.actorOf(Props(new Actor { val boss = system.actorOf(Props(new Actor {
override val supervisorStrategy = OneForOneStrategy { override val supervisorStrategy = OneForOneStrategy() {
case e: IllegalStateException if e.getMessage == "expected" SupervisorStrategy.Resume case e: IllegalStateException if e.getMessage == "expected" SupervisorStrategy.Resume
} }
def receive = { def receive = {

View file

@ -15,7 +15,8 @@ import akka.actor._
object LoggingReceiveSpec { object LoggingReceiveSpec {
class TestLogActor extends Actor { class TestLogActor extends Actor {
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 5, 5000) override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(List(classOf[Throwable]))
def receive = { case _ } def receive = { case _ }
} }
} }

View file

@ -1,11 +1,8 @@
package akka.performance.trading.domain package akka.performance.trading.domain
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Extension
import akka.actor.ExtensionId import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorSystem }
import akka.actor.ExtensionIdProvider
import akka.actor.ActorSystemImpl
import akka.actor.ActorSystem
abstract trait TradeObserver { abstract trait TradeObserver {
def trade(bid: Bid, ask: Ask) def trade(bid: Bid, ask: Ask)
@ -38,5 +35,5 @@ object TotalTradeCounterExtension
extends ExtensionId[TotalTradeCounter] extends ExtensionId[TotalTradeCounter]
with ExtensionIdProvider { with ExtensionIdProvider {
override def lookup = TotalTradeCounterExtension override def lookup = TotalTradeCounterExtension
override def createExtension(system: ActorSystemImpl) = new TotalTradeCounter override def createExtension(system: ExtendedActorSystem) = new TotalTradeCounter
} }

View file

@ -142,12 +142,12 @@ object Actor {
* {{{ * {{{
* class ExampleActor extends Actor { * class ExampleActor extends Actor {
* *
* override val supervisorStrategy = OneForOneStrategy({ * override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
* case _: ArithmeticException Resume * case _: ArithmeticException Resume
* case _: NullPointerException Restart * case _: NullPointerException Restart
* case _: IllegalArgumentException Stop * case _: IllegalArgumentException Stop
* case _: Exception Escalate * case _: Exception Escalate
* }: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) * }
* *
* def receive = { * def receive = {
* // directly calculated reply * // directly calculated reply

View file

@ -355,7 +355,7 @@ class LocalActorRefProvider(
override val supervisorStrategy = { override val supervisorStrategy = {
import akka.actor.SupervisorStrategy._ import akka.actor.SupervisorStrategy._
OneForOneStrategy { OneForOneStrategy() {
case _: ActorKilledException Stop case _: ActorKilledException Stop
case _: ActorInitializationException Stop case _: ActorInitializationException Stop
case _: Exception Restart case _: Exception Restart

View file

@ -130,6 +130,13 @@ object ActorSystem {
* }}} * }}}
* *
* Where no name is given explicitly, one will be automatically generated. * Where no name is given explicitly, one will be automatically generated.
*
* <b><i>Important Notice:</i></o>
*
* This class is not meant to be extended by user code. If you want to
* actually roll your own Akka, it will probably be better to look into
* extending [[akka.actor.ExtendedActorSystem]] instead, but beware that you
* are completely on your own in that case!
*/ */
abstract class ActorSystem extends ActorRefFactory { abstract class ActorSystem extends ActorRefFactory {
import ActorSystem._ import ActorSystem._
@ -286,7 +293,40 @@ abstract class ActorSystem extends ActorRefFactory {
def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean
} }
class ActorSystemImpl(val name: String, applicationConfig: Config) extends ActorSystem { /**
* More powerful interface to the actor systems implementation which is presented to extensions (see [[akka.actor.Extension]]).
*
* <b><i>Important Notice:</i></o>
*
* This class is not meant to be extended by user code. If you want to
* actually roll your own Akka, beware that you are completely on your own in
* that case!
*/
abstract class ExtendedActorSystem extends ActorSystem {
/**
* The ActorRefProvider is the only entity which creates all actor references within this actor system.
*/
def provider: ActorRefProvider
/**
* The top-level supervisor of all actors created using system.actorOf(...).
*/
def guardian: InternalActorRef
/**
* The top-level supervisor of all system-internal services like logging.
*/
def systemGuardian: InternalActorRef
/**
* Implementation of the mechanism which is used for watch()/unwatch().
*/
def deathWatch: DeathWatch
}
class ActorSystemImpl(val name: String, applicationConfig: Config) extends ExtendedActorSystem {
if (!name.matches("""^\w+$""")) if (!name.matches("""^\w+$"""))
throw new IllegalArgumentException("invalid ActorSystem name [" + name + "], must contain only word characters (i.e. [a-zA-Z_0-9])") throw new IllegalArgumentException("invalid ActorSystem name [" + name + "], must contain only word characters (i.e. [a-zA-Z_0-9])")

View file

@ -18,10 +18,8 @@ import akka.util.ReflectiveAccess
* The extension itself can be created in any way desired and has full access * The extension itself can be created in any way desired and has full access
* to the ActorSystem implementation. * to the ActorSystem implementation.
* *
*/ * This trait is only a marker interface to signify an Akka Extension, see
* [[akka.actor.ExtensionKey]] for a concise way of formulating extensions.
/**
* Marker interface to signify an Akka Extension
*/ */
trait Extension trait Extension
@ -47,7 +45,7 @@ trait ExtensionId[T <: Extension] {
* Is used by Akka to instantiate the Extension identified by this ExtensionId, * Is used by Akka to instantiate the Extension identified by this ExtensionId,
* internal use only. * internal use only.
*/ */
def createExtension(system: ActorSystemImpl): T def createExtension(system: ExtendedActorSystem): T
} }
/** /**
@ -94,7 +92,7 @@ abstract class ExtensionKey[T <: Extension](implicit m: ClassManifest[T]) extend
def this(clazz: Class[T]) = this()(ClassManifest.fromClass(clazz)) def this(clazz: Class[T]) = this()(ClassManifest.fromClass(clazz))
override def lookup(): ExtensionId[T] = this override def lookup(): ExtensionId[T] = this
def createExtension(system: ActorSystemImpl): T = def createExtension(system: ExtendedActorSystem): T =
ReflectiveAccess.createInstance[T](m.erasure, Array[Class[_]](classOf[ActorSystemImpl]), Array[AnyRef](system)) match { ReflectiveAccess.createInstance[T](m.erasure, Array[Class[_]](classOf[ActorSystemImpl]), Array[AnyRef](system)) match {
case Left(ex) throw ex case Left(ex) throw ex
case Right(r) r case Right(r) r

View file

@ -8,6 +8,7 @@ import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import java.lang.{ Iterable JIterable } import java.lang.{ Iterable JIterable }
import akka.util.Duration
case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) { case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) {
@ -44,7 +45,16 @@ case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int =
} }
} }
object SupervisorStrategy { trait SupervisorStrategyLowPriorityImplicits { this: SupervisorStrategy.type
/**
* Implicit conversion from `Seq` of Cause-Action pairs to a `Decider`. See makeDecider(causeAction).
*/
implicit def seqCauseAction2Decider(trapExit: Iterable[CauseAction]): Decider = makeDecider(trapExit)
// the above would clash with seqThrowable2Decider for empty lists
}
object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
sealed trait Action sealed trait Action
/** /**
@ -95,6 +105,13 @@ object SupervisorStrategy {
*/ */
def escalate = Escalate def escalate = Escalate
/**
* When supervisorStrategy is not specified for an actor this
* is used by default. The child will be stopped when
* [[akka.ActorInitializationException]] or [[akka.ActorKilledException]]
* is thrown. It will be restarted for other `Exception` types.
* The error is escalated if it's a `Throwable`, i.e. `Error`.
*/
final val defaultStrategy: SupervisorStrategy = { final val defaultStrategy: SupervisorStrategy = {
def defaultDecider: Decider = { def defaultDecider: Decider = {
case _: ActorInitializationException Stop case _: ActorInitializationException Stop
@ -102,32 +119,38 @@ object SupervisorStrategy {
case _: Exception Restart case _: Exception Restart
case _ Escalate case _ Escalate
} }
OneForOneStrategy(defaultDecider, None, None) OneForOneStrategy()(defaultDecider)
} }
/**
* Implicit conversion from `Seq` of Throwables to a `Decider`.
* This maps the given Throwables to restarts, otherwise escalates.
*/
implicit def seqThrowable2Decider(trapExit: Seq[Class[_ <: Throwable]]): Decider = makeDecider(trapExit)
type Decider = PartialFunction[Throwable, Action] type Decider = PartialFunction[Throwable, Action]
type JDecider = akka.japi.Function[Throwable, Action] type JDecider = akka.japi.Function[Throwable, Action]
type CauseAction = (Class[_ <: Throwable], Action) type CauseAction = (Class[_ <: Throwable], Action)
/** /**
* Backwards compatible Decider builder which just checks whether one of * Decider builder which just checks whether one of
* the given Throwables matches the cause and restarts, otherwise escalates. * the given Throwables matches the cause and restarts, otherwise escalates.
*/ */
def makeDecider(trapExit: Array[Class[_ <: Throwable]]): Decider = def makeDecider(trapExit: Array[Class[_ <: Throwable]]): Decider =
{ case x if (trapExit exists (_ isInstance x)) Restart else Escalate } { case x if (trapExit exists (_ isInstance x)) Restart else Escalate }
/** /**
* Backwards compatible Decider builder which just checks whether one of * Decider builder which just checks whether one of
* the given Throwables matches the cause and restarts, otherwise escalates. * the given Throwables matches the cause and restarts, otherwise escalates.
*/ */
def makeDecider(trapExit: List[Class[_ <: Throwable]]): Decider = def makeDecider(trapExit: Seq[Class[_ <: Throwable]]): Decider =
{ case x if (trapExit exists (_ isInstance x)) Restart else Escalate } { case x if (trapExit exists (_ isInstance x)) Restart else Escalate }
/** /**
* Backwards compatible Decider builder which just checks whether one of * Decider builder which just checks whether one of
* the given Throwables matches the cause and restarts, otherwise escalates. * the given Throwables matches the cause and restarts, otherwise escalates.
*/ */
def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toList) def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toSeq)
/** /**
* Decider builder for Iterables of cause-action pairs, e.g. a map obtained * Decider builder for Iterables of cause-action pairs, e.g. a map obtained
@ -156,6 +179,11 @@ object SupervisorStrategy {
} }
buf buf
} }
private[akka] def withinTimeRangeOption(withinTimeRange: Duration): Option[Duration] =
if (withinTimeRange.isFinite && withinTimeRange >= Duration.Zero) Some(withinTimeRange) else None
private[akka] def maxNrOfRetriesOption(maxNrOfRetries: Int): Option[Int] =
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries)
} }
abstract class SupervisorStrategy { abstract class SupervisorStrategy {
@ -196,49 +224,36 @@ abstract class SupervisorStrategy {
case Escalate false case Escalate false
} }
} }
}
object AllForOneStrategy {
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy =
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit),
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): AllForOneStrategy =
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int]): AllForOneStrategy =
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, None)
} }
/** /**
* Restart all actors linked to the same supervisor when one fails, * Restart all actors linked to the same supervisor when one fails,
* trapExit = which Throwables should be intercepted * @param maxNrOfRetries the number of times an actor is allowed to be restarted
* maxNrOfRetries = the number of times an actor is allowed to be restarted * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
* withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window * @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
*/ */
case class AllForOneStrategy(decider: SupervisorStrategy.Decider, case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider)
maxNrOfRetries: Option[Int] = None, extends SupervisorStrategy {
withinTimeRange: Option[Int] = None) extends SupervisorStrategy {
def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) =
this(SupervisorStrategy.makeDecider(decider), this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider))
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
this(SupervisorStrategy.makeDecider(trapExit), this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_ <: Throwable]]) =
this(SupervisorStrategy.makeDecider(trapExit), this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
/* /*
* this is a performance optimization to avoid re-allocating the pairs upon * this is a performance optimization to avoid re-allocating the pairs upon
* every call to requestRestartPermission, assuming that strategies are shared * every call to requestRestartPermission, assuming that strategies are shared
* across actors and thus this field does not take up much space * across actors and thus this field does not take up much space
*/ */
val retriesWindow = (maxNrOfRetries, withinTimeRange) private val retriesWindow = (
SupervisorStrategy.maxNrOfRetriesOption(maxNrOfRetries),
SupervisorStrategy.withinTimeRangeOption(withinTimeRange).map(_.toMillis.toInt))
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = { def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {
children foreach (context.stop(_)) children foreach (context.stop(_))
@ -255,47 +270,33 @@ case class AllForOneStrategy(decider: SupervisorStrategy.Decider,
} }
} }
object OneForOneStrategy {
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy =
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit),
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): OneForOneStrategy =
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int]): OneForOneStrategy =
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, None)
}
/** /**
* Restart an actor when it fails * Restart an actor when it fails
* trapExit = which Throwables should be intercepted * @param maxNrOfRetries the number of times an actor is allowed to be restarted
* maxNrOfRetries = the number of times an actor is allowed to be restarted * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
* withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window * @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
*/ */
case class OneForOneStrategy(decider: SupervisorStrategy.Decider, case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider)
maxNrOfRetries: Option[Int] = None, extends SupervisorStrategy {
withinTimeRange: Option[Int] = None) extends SupervisorStrategy {
def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) =
this(SupervisorStrategy.makeDecider(decider), this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider))
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
this(SupervisorStrategy.makeDecider(trapExit), this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: Array[Class[_ <: Throwable]]) =
this(SupervisorStrategy.makeDecider(trapExit), this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
/* /*
* this is a performance optimization to avoid re-allocating the pairs upon * this is a performance optimization to avoid re-allocating the pairs upon
* every call to requestRestartPermission, assuming that strategies are shared * every call to requestRestartPermission, assuming that strategies are shared
* across actors and thus this field does not take up much space * across actors and thus this field does not take up much space
*/ */
val retriesWindow = (maxNrOfRetries, withinTimeRange) private val retriesWindow = (
SupervisorStrategy.maxNrOfRetriesOption(maxNrOfRetries),
SupervisorStrategy.withinTimeRangeOption(withinTimeRange).map(_.toMillis.toInt))
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {} def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {}

View file

@ -744,7 +744,7 @@ final class IOManager private (system: ActorSystem) extends Extension {
object IOManager extends ExtensionId[IOManager] with ExtensionIdProvider { object IOManager extends ExtensionId[IOManager] with ExtensionIdProvider {
override def lookup = this override def lookup = this
override def createExtension(system: ActorSystemImpl) = new IOManager(system) override def createExtension(system: ExtendedActorSystem) = new IOManager(system)
} }
/** /**

View file

@ -80,7 +80,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
override def get(system: ActorSystem): TypedActorExtension = super.get(system) override def get(system: ActorSystem): TypedActorExtension = super.get(system)
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl): TypedActorExtension = new TypedActorExtension(system) def createExtension(system: ExtendedActorSystem): TypedActorExtension = new TypedActorExtension(system)
/** /**
* Returns a contextual TypedActorFactory of this extension, this means that any TypedActors created by this TypedActorExtension * Returns a contextual TypedActorFactory of this extension, this means that any TypedActors created by this TypedActorExtension
@ -531,7 +531,7 @@ case class ContextualTypedActorFactory(typedActor: TypedActorExtension, actorFac
override def isTypedActor(proxyOrNot: AnyRef): Boolean = typedActor.isTypedActor(proxyOrNot) override def isTypedActor(proxyOrNot: AnyRef): Boolean = typedActor.isTypedActor(proxyOrNot)
} }
class TypedActorExtension(system: ActorSystemImpl) extends TypedActorFactory with Extension { class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory with Extension {
import TypedActor._ //Import the goodies from the companion object import TypedActor._ //Import the goodies from the companion object
protected def actorFactory: ActorRefFactory = system protected def actorFactory: ActorRefFactory = system
protected def typedActor = this protected def typedActor = this

View file

@ -37,7 +37,8 @@ import akka.dispatch.{ MessageDispatcher, Promise }
* } * }
* } * }
* *
* private static SupervisorStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() { * private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
* new Function<Throwable, Action>() {
* @Override * @Override
* public Action apply(Throwable t) { * public Action apply(Throwable t) {
* if (t instanceof ArithmeticException) { * if (t instanceof ArithmeticException) {
@ -50,7 +51,7 @@ import akka.dispatch.{ MessageDispatcher, Promise }
* return escalate(); * return escalate();
* } * }
* } * }
* }, 10, 60000); * });
* *
* @Override * @Override
* public SupervisorStrategy supervisorStrategy() { * public SupervisorStrategy supervisorStrategy() {

View file

@ -9,7 +9,7 @@ import akka.util.ReflectiveAccess
import scala.util.DynamicVariable import scala.util.DynamicVariable
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.actor.{ Extension, ActorSystem, ActorSystemImpl } import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem }
case class NoSerializerFoundException(m: String) extends AkkaException(m) case class NoSerializerFoundException(m: String) extends AkkaException(m)
@ -55,7 +55,7 @@ object Serialization {
* Serialization module. Contains methods for serialization and deserialization as well as * Serialization module. Contains methods for serialization and deserialization as well as
* locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file. * locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file.
*/ */
class Serialization(val system: ActorSystemImpl) extends Extension { class Serialization(val system: ExtendedActorSystem) extends Extension {
import Serialization._ import Serialization._
val settings = new Settings(system.settings.config) val settings = new Settings(system.settings.config)

View file

@ -3,7 +3,7 @@
*/ */
package akka.serialization package akka.serialization
import akka.actor.{ ActorSystem, ExtensionId, ExtensionIdProvider, ActorSystemImpl } import akka.actor.{ ActorSystem, ExtensionId, ExtensionIdProvider, ExtendedActorSystem }
/** /**
* SerializationExtension is an Akka Extension to interact with the Serialization * SerializationExtension is an Akka Extension to interact with the Serialization
@ -12,5 +12,5 @@ import akka.actor.{ ActorSystem, ExtensionId, ExtensionIdProvider, ActorSystemIm
object SerializationExtension extends ExtensionId[Serialization] with ExtensionIdProvider { object SerializationExtension extends ExtensionId[Serialization] with ExtensionIdProvider {
override def get(system: ActorSystem): Serialization = super.get(system) override def get(system: ActorSystem): Serialization = super.get(system)
override def lookup = SerializationExtension override def lookup = SerializationExtension
override def createExtension(system: ActorSystemImpl): Serialization = new Serialization(system) override def createExtension(system: ExtendedActorSystem): Serialization = new Serialization(system)
} }

View file

@ -32,13 +32,13 @@ State
Actor objects will typically contain some variables which reflect possible Actor objects will typically contain some variables which reflect possible
states the actor may be in. This can be an explicit state machine (e.g. using states the actor may be in. This can be an explicit state machine (e.g. using
the :ref:`fsm` module), or it could be a counter, set of listeners, pending the :ref:`fsm-scala` module), or it could be a counter, set of listeners,
requests, etc. These data are what make an actor valuable, and they must be pending requests, etc. These data are what make an actor valuable, and they
protected from corruption by other actors. The good news is that Akka actors must be protected from corruption by other actors. The good news is that Akka
conceptually each have their own light-weight thread, which is completely actors conceptually each have their own light-weight thread, which is
shielded from the rest of the system. This means that instead of having to completely shielded from the rest of the system. This means that instead of
synchronize access using locks you can just write your actor code without having to synchronize access using locks you can just write your actor code
worrying about concurrency at all. without worrying about concurrency at all.
Behind the scenes Akka will run sets of actors on sets of real threads, where Behind the scenes Akka will run sets of actors on sets of real threads, where
typically many actors share one thread, and subsequent invocations of one actor typically many actors share one thread, and subsequent invocations of one actor

View file

@ -0,0 +1,8 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.actor
import org.scalatest.junit.JUnitSuite
class FSMDocTest extends FSMDocTestBase with JUnitSuite

View file

@ -0,0 +1,174 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.actor;
//#imports-data
import java.util.ArrayList;
import java.util.List;
import akka.actor.ActorRef;
//#imports-data
//#imports-actor
import akka.event.LoggingAdapter;
import akka.event.Logging;
import akka.actor.UntypedActor;
//#imports-actor
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.TestProbe;
public class FSMDocTestBase {
//#data
public static final class SetTarget {
final ActorRef ref;
public SetTarget(ActorRef ref) {
this.ref = ref;
}
}
public static final class Queue {
final Object o;
public Queue(Object o) {
this.o = o;
}
}
public static final Object flush = new Object();
public static final class Batch {
final List<Object> objects;
public Batch(List<Object> objects) {
this.objects = objects;
}
}
//#data
//#base
static abstract class MyFSMBase extends UntypedActor {
/*
* This is the mutable state of this state machine.
*/
protected enum State { IDLE, ACTIVE; }
private State state = State.IDLE;
private ActorRef target;
private List<Object> queue;
/*
* Then come all the mutator methods:
*/
protected void init(ActorRef target) {
this.target = target;
queue = new ArrayList<Object>();
}
protected void setState(State s) {
if (state != s) {
transition(state, s);
state = s;
}
}
protected void enqueue(Object o) {
if (queue != null) queue.add(o);
}
protected List<Object> drainQueue() {
final List<Object> q = queue;
if (q == null) throw new IllegalStateException("drainQueue(): not yet initialized");
queue = new ArrayList<Object>();
return q;
}
/*
* Here are the interrogation methods:
*/
protected boolean isInitialized() {
return target != null;
}
protected State getState() {
return state;
}
protected ActorRef getTarget() {
if (target == null) throw new IllegalStateException("getTarget(): not yet initialized");
return target;
}
/*
* And finally the callbacks (only one in this example: react to state change)
*/
abstract protected void transition(State old, State next);
}
//#base
//#actor
static public class MyFSM extends MyFSMBase {
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@Override
public void onReceive(Object o) {
if (getState() == State.IDLE) {
if (o instanceof SetTarget)
init(((SetTarget) o).ref);
else whenUnhandled(o);
} else if (getState() == State.ACTIVE) {
if (o == flush)
setState(State.IDLE);
else whenUnhandled(o);
}
}
@Override
public void transition(State old, State next) {
if (old == State.ACTIVE) {
getTarget().tell(new Batch(drainQueue()));
}
}
private void whenUnhandled(Object o) {
if (o instanceof Queue && isInitialized()) {
enqueue(((Queue) o).o);
setState(State.ACTIVE);
} else {
log.warning("received unknown message {} in state {}", o, getState());
}
}
}
//#actor
ActorSystem system = ActorSystem.create();
@org.junit.Test
public void mustBunch() {
final ActorRef buncher = system.actorOf(new Props(MyFSM.class));
final TestProbe probe = new TestProbe(system);
buncher.tell(new SetTarget(probe.ref()));
buncher.tell(new Queue(1));
buncher.tell(new Queue(2));
buncher.tell(flush);
buncher.tell(new Queue(3));
final Batch b = probe.expectMsgClass(Batch.class);
assert b.objects.size() == 2;
assert b.objects.contains(1);
assert b.objects.contains(2);
}
@org.junit.After
public void cleanup() {
system.shutdown();
}
}

View file

@ -39,7 +39,8 @@ public class FaultHandlingTestBase {
static public class Supervisor extends UntypedActor { static public class Supervisor extends UntypedActor {
//#strategy //#strategy
private static SupervisorStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() { private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
new Function<Throwable, Action>() {
@Override @Override
public Action apply(Throwable t) { public Action apply(Throwable t) {
if (t instanceof ArithmeticException) { if (t instanceof ArithmeticException) {
@ -52,7 +53,7 @@ public class FaultHandlingTestBase {
return escalate(); return escalate();
} }
} }
}, 10, 60000); });
@Override @Override
public SupervisorStrategy supervisorStrategy() { public SupervisorStrategy supervisorStrategy() {
@ -76,7 +77,8 @@ public class FaultHandlingTestBase {
static public class Supervisor2 extends UntypedActor { static public class Supervisor2 extends UntypedActor {
//#strategy2 //#strategy2
private static SupervisorStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() { private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
new Function<Throwable, Action>() {
@Override @Override
public Action apply(Throwable t) { public Action apply(Throwable t) {
if (t instanceof ArithmeticException) { if (t instanceof ArithmeticException) {
@ -89,7 +91,7 @@ public class FaultHandlingTestBase {
return escalate(); return escalate();
} }
} }
}, 10, 60000); });
@Override @Override
public SupervisorStrategy supervisorStrategy() { public SupervisorStrategy supervisorStrategy() {

View file

@ -42,7 +42,7 @@ public class ExtensionDocTestBase {
//This method will be called by Akka //This method will be called by Akka
// to instantiate our Extension // to instantiate our Extension
public CountExtensionImpl createExtension(ActorSystemImpl system) { public CountExtensionImpl createExtension(ExtendedActorSystem system) {
return new CountExtensionImpl(); return new CountExtensionImpl();
} }
} }

View file

@ -8,7 +8,7 @@ import akka.actor.Extension;
import akka.actor.AbstractExtensionId; import akka.actor.AbstractExtensionId;
import akka.actor.ExtensionIdProvider; import akka.actor.ExtensionIdProvider;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.ActorSystemImpl; import akka.actor.ExtendedActorSystem;
import akka.util.Duration; import akka.util.Duration;
import com.typesafe.config.Config; import com.typesafe.config.Config;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -44,7 +44,7 @@ public class SettingsExtensionDocTestBase {
return Settings.SettingsProvider; return Settings.SettingsProvider;
} }
public SettingsImpl createExtension(ActorSystemImpl system) { public SettingsImpl createExtension(ExtendedActorSystem system) {
return new SettingsImpl(system.settings().config()); return new SettingsImpl(system.settings().config());
} }
} }

79
akka-docs/java/fsm.rst Normal file
View file

@ -0,0 +1,79 @@
.. _fsm-java:
###########################################
Building Finite State Machine Actors (Java)
###########################################
.. sidebar:: Contents
.. contents:: :local:
Overview
========
The FSM (Finite State Machine) pattern is best described in the `Erlang design
principles
<http://www.erlang.org/documentation/doc-4.8.2/doc/design_principles/fsm.html>`_.
In short, it can be seen as a set of relations of the form:
**State(S) x Event(E) -> Actions (A), State(S')**
These relations are interpreted as meaning:
*If we are in state S and the event E occurs, we should perform the actions A
and make a transition to the state S'.*
While the Scala programming language enables the formulation of a nice internal
DSL (domain specific language) for formulating finite state machines (see
:ref:`fsm-scala`), Javas verbosity does not lend itself well to the same
approach. This chapter describes ways to effectively achieve the same
separation of concerns through self-discipline.
How State should be Handled
===========================
All mutable fields (or transitively mutable data structures) referenced by the
FSM actors implementation should be collected in one place and only mutated
using a small well-defined set of methods. One way to achieve this is to
assemble all mutable state in a superclass which keeps it private and offers
protected methods for mutating it.
.. includecode:: code/akka/docs/actor/FSMDocTestBase.java#imports-data
.. includecode:: code/akka/docs/actor/FSMDocTestBase.java#base
The benefit of this approach is that state changes can be acted upon in one
central place, which makes it impossible to forget inserting code for reacting
to state transitions when adding to the FSMs machinery.
Message Buncher Example
=======================
The base class shown above is designed to support a similar example as for the
Scala FSM documentation: an actor which receives and queues messages, to be
delivered in batches to a configurable target actor. The messages involved are:
.. includecode:: code/akka/docs/actor/FSMDocTestBase.java#data
This actor has only the two states ``IDLE`` and ``ACTIVE``, making their
handling quite straight-forward in the concrete actor derived from the base
class:
.. includecode:: code/akka/docs/actor/FSMDocTestBase.java#imports-actor
.. includecode:: code/akka/docs/actor/FSMDocTestBase.java#actor
The trick here is to factor out common functionality like :meth:`whenUnhandled`
and :meth:`transition` in order to obtain a few well-defined points for
reacting to change or insert logging.
State-Centric vs. Event-Centric
===============================
In the example above, the subjective complexity of state and events was roughly
equal, making it a matter of taste whether to choose primary dispatch on
either; in the example a state-based dispatch was chosen. Depending on how
evenly the matrix of possible states and events is populated, it may be more
practical to handle different events first and distinguish the states in the
second tier. An example would be a state machine which has a multitude of
internal states but handles only very few distinct events.

View file

@ -21,4 +21,5 @@ Java API
stm stm
agents agents
transactors transactors
fsm
extending-akka extending-akka

View file

@ -467,7 +467,7 @@ v1.3::
val supervisor = Supervisor( val supervisor = Supervisor(
SupervisorConfig( SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 3, 1000), OneForOneStrategy(List(classOf[Exception]), 3, 1000),
Supervise( Supervise(
actorOf[MyActor1], actorOf[MyActor1],
Permanent) :: Permanent) ::
@ -479,12 +479,12 @@ v1.3::
v2.0:: v2.0::
class MyActor extends Actor { class MyActor extends Actor {
override val supervisorStrategy = OneForOneStrategy({ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case _: ArithmeticException ⇒ Resume case _: ArithmeticException ⇒ Resume
case _: NullPointerException ⇒ Restart case _: NullPointerException ⇒ Restart
case _: IllegalArgumentException ⇒ Stop case _: IllegalArgumentException ⇒ Stop
case _: Exception ⇒ Escalate case _: Exception ⇒ Escalate
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) }
def receive = { def receive = {
case x => case x =>

View file

@ -20,13 +20,14 @@ object FaultHandlingDocSpec {
//#strategy //#strategy
import akka.actor.OneForOneStrategy import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._ import akka.actor.SupervisorStrategy._
import akka.util.duration._
override val supervisorStrategy = OneForOneStrategy({ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case _: ArithmeticException Resume case _: ArithmeticException Resume
case _: NullPointerException Restart case _: NullPointerException Restart
case _: IllegalArgumentException Stop case _: IllegalArgumentException Stop
case _: Exception Escalate case _: Exception Escalate
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) }
//#strategy //#strategy
def receive = { def receive = {
@ -40,13 +41,14 @@ object FaultHandlingDocSpec {
//#strategy2 //#strategy2
import akka.actor.OneForOneStrategy import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._ import akka.actor.SupervisorStrategy._
import akka.util.duration._
override val supervisorStrategy = OneForOneStrategy({ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case _: ArithmeticException Resume case _: ArithmeticException Resume
case _: NullPointerException Restart case _: NullPointerException Restart
case _: IllegalArgumentException Stop case _: IllegalArgumentException Stop
case _: Exception Escalate case _: Exception Escalate
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) }
//#strategy2 //#strategy2
def receive = { def receive = {

View file

@ -23,7 +23,7 @@ class CountExtensionImpl extends Extension {
//#extensionid //#extensionid
import akka.actor.ExtensionId import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider import akka.actor.ExtensionIdProvider
import akka.actor.ActorSystemImpl import akka.actor.ExtendedActorSystem
object CountExtension object CountExtension
extends ExtensionId[CountExtensionImpl] extends ExtensionId[CountExtensionImpl]
@ -36,7 +36,7 @@ object CountExtension
//This method will be called by Akka //This method will be called by Akka
// to instantiate our Extension // to instantiate our Extension
override def createExtension(system: ActorSystemImpl) = new CountExtensionImpl override def createExtension(system: ExtendedActorSystem) = new CountExtensionImpl
} }
//#extensionid //#extensionid

View file

@ -7,7 +7,7 @@ package akka.docs.extension
import akka.actor.Extension import akka.actor.Extension
import akka.actor.ExtensionId import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider import akka.actor.ExtensionIdProvider
import akka.actor.ActorSystemImpl import akka.actor.ExtendedActorSystem
import akka.util.Duration import akka.util.Duration
import com.typesafe.config.Config import com.typesafe.config.Config
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -29,7 +29,7 @@ object Settings extends ExtensionId[SettingsImpl] with ExtensionIdProvider {
override def lookup = Settings override def lookup = Settings
override def createExtension(system: ActorSystemImpl) = new SettingsImpl(system.settings.config) override def createExtension(system: ExtendedActorSystem) = new SettingsImpl(system.settings.config)
} }
//#extensionid //#extensionid

View file

@ -31,8 +31,7 @@ that the respective limit does not apply, leaving the possibility to specify an
absolute upper limit on the restarts or to make the restarts work infinitely. absolute upper limit on the restarts or to make the restarts work infinitely.
The match statement which forms the bulk of the body is of type ``Decider``, The match statement which forms the bulk of the body is of type ``Decider``,
which is a ``PartialFunction[Throwable, Action]``, and we need to help out the which is a ``PartialFunction[Throwable, Action]``. This
type inferencer a bit here by ascribing that type after the closing brace. This
is the piece which maps child failure types to their corresponding actions. is the piece which maps child failure types to their corresponding actions.
Practical Application Practical Application

View file

@ -1,4 +1,4 @@
.. _fsm: .. _fsm-scala:
### ###
FSM FSM
@ -21,7 +21,8 @@ A FSM can be described as a set of relations of the form:
These relations are interpreted as meaning: These relations are interpreted as meaning:
*If we are in state S and the event E occurs, we should perform the actions A and make a transition to the state S'.* *If we are in state S and the event E occurs, we should perform the actions A
and make a transition to the state S'.*
A Simple Example A Simple Example
================ ================

View file

@ -11,7 +11,7 @@ import akka.actor._
object BeanstalkBasedMailboxExtension extends ExtensionId[BeanstalkMailboxSettings] with ExtensionIdProvider { object BeanstalkBasedMailboxExtension extends ExtensionId[BeanstalkMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): BeanstalkMailboxSettings = super.get(system) override def get(system: ActorSystem): BeanstalkMailboxSettings = super.get(system)
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new BeanstalkMailboxSettings(system.settings.config) def createExtension(system: ExtendedActorSystem) = new BeanstalkMailboxSettings(system.settings.config)
} }
class BeanstalkMailboxSettings(val config: Config) extends Extension { class BeanstalkMailboxSettings(val config: Config) extends Extension {

View file

@ -11,7 +11,7 @@ import akka.actor._
object FileBasedMailboxExtension extends ExtensionId[FileBasedMailboxSettings] with ExtensionIdProvider { object FileBasedMailboxExtension extends ExtensionId[FileBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): FileBasedMailboxSettings = super.get(system) override def get(system: ActorSystem): FileBasedMailboxSettings = super.get(system)
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new FileBasedMailboxSettings(system.settings.config) def createExtension(system: ExtendedActorSystem) = new FileBasedMailboxSettings(system.settings.config)
} }
class FileBasedMailboxSettings(val config: Config) extends Extension { class FileBasedMailboxSettings(val config: Config) extends Extension {

View file

@ -11,7 +11,7 @@ import akka.actor._
object MongoBasedMailboxExtension extends ExtensionId[MongoBasedMailboxSettings] with ExtensionIdProvider { object MongoBasedMailboxExtension extends ExtensionId[MongoBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): MongoBasedMailboxSettings = super.get(system) override def get(system: ActorSystem): MongoBasedMailboxSettings = super.get(system)
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new MongoBasedMailboxSettings(system.settings.config) def createExtension(system: ExtendedActorSystem) = new MongoBasedMailboxSettings(system.settings.config)
} }
class MongoBasedMailboxSettings(val config: Config) extends Extension { class MongoBasedMailboxSettings(val config: Config) extends Extension {

View file

@ -9,7 +9,7 @@ import akka.actor._
object RedisBasedMailboxExtension extends ExtensionId[RedisBasedMailboxSettings] with ExtensionIdProvider { object RedisBasedMailboxExtension extends ExtensionId[RedisBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): RedisBasedMailboxSettings = super.get(system) override def get(system: ActorSystem): RedisBasedMailboxSettings = super.get(system)
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new RedisBasedMailboxSettings(system.settings.config) def createExtension(system: ExtendedActorSystem) = new RedisBasedMailboxSettings(system.settings.config)
} }
class RedisBasedMailboxSettings(val config: Config) extends Extension { class RedisBasedMailboxSettings(val config: Config) extends Extension {

View file

@ -11,7 +11,7 @@ import akka.actor._
object ZooKeeperBasedMailboxExtension extends ExtensionId[ZooKeeperBasedMailboxSettings] with ExtensionIdProvider { object ZooKeeperBasedMailboxExtension extends ExtensionId[ZooKeeperBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): ZooKeeperBasedMailboxSettings = super.get(system) override def get(system: ActorSystem): ZooKeeperBasedMailboxSettings = super.get(system)
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new ZooKeeperBasedMailboxSettings(system.settings.config) def createExtension(system: ExtendedActorSystem) = new ZooKeeperBasedMailboxSettings(system.settings.config)
} }
class ZooKeeperBasedMailboxSettings(val config: Config) extends Extension { class ZooKeeperBasedMailboxSettings(val config: Config) extends Extension {

View file

@ -3,26 +3,20 @@
*/ */
package akka.testkit package akka.testkit
import akka.event.Logging.{ Warning, Error } import java.lang.ref.WeakReference
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import java.util.LinkedList import java.util.LinkedList
import java.util.concurrent.RejectedExecutionException
import akka.util.Switch
import java.lang.ref.WeakReference
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.actor.{ ActorCell, ActorRef, ActorSystem }
import akka.dispatch._
import akka.actor.Scheduler
import akka.event.EventStream
import akka.util.Duration
import akka.util.duration._
import java.util.concurrent.TimeUnit
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.ActorSystemImpl
import akka.actor.Extension
import com.typesafe.config.Config import com.typesafe.config.Config
import CallingThreadDispatcher.Id
import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell }
import akka.dispatch.{ TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue }
import akka.util.duration.intToDurationInt
import akka.util.{ Switch, Duration }
/* /*
* Locking rules: * Locking rules:
* *
@ -42,7 +36,7 @@ import com.typesafe.config.Config
private[testkit] object CallingThreadDispatcherQueues extends ExtensionId[CallingThreadDispatcherQueues] with ExtensionIdProvider { private[testkit] object CallingThreadDispatcherQueues extends ExtensionId[CallingThreadDispatcherQueues] with ExtensionIdProvider {
override def lookup = CallingThreadDispatcherQueues override def lookup = CallingThreadDispatcherQueues
override def createExtension(system: ActorSystemImpl): CallingThreadDispatcherQueues = new CallingThreadDispatcherQueues override def createExtension(system: ExtendedActorSystem): CallingThreadDispatcherQueues = new CallingThreadDispatcherQueues
} }
private[testkit] class CallingThreadDispatcherQueues extends Extension { private[testkit] class CallingThreadDispatcherQueues extends Extension {

View file

@ -6,11 +6,11 @@ package akka.testkit
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.util.Duration import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.{ ExtensionId, ActorSystem, Extension, ActorSystemImpl } import akka.actor.{ ExtensionId, ActorSystem, Extension, ExtendedActorSystem }
object TestKitExtension extends ExtensionId[TestKitSettings] { object TestKitExtension extends ExtensionId[TestKitSettings] {
override def get(system: ActorSystem): TestKitSettings = super.get(system) override def get(system: ActorSystem): TestKitSettings = super.get(system)
def createExtension(system: ActorSystemImpl): TestKitSettings = new TestKitSettings(system.settings.config) def createExtension(system: ExtendedActorSystem): TestKitSettings = new TestKitSettings(system.settings.config)
} }
class TestKitSettings(val config: Config) extends Extension { class TestKitSettings(val config: Config) extends Extension {

View file

@ -182,7 +182,8 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA
override def postRestart(reason: Throwable) { counter -= 1 } override def postRestart(reason: Throwable) { counter -= 1 }
}), self, "child") }), self, "child")
override def supervisorStrategy = OneForOneStrategy(List(classOf[ActorKilledException]), 5, 1000) override def supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 second)(List(classOf[ActorKilledException]))
def receiveT = { case "sendKill" ref ! Kill } def receiveT = { case "sendKill" ref ! Kill }
})) }))

View file

@ -24,7 +24,7 @@ case class ZeroMQVersion(major: Int, minor: Int, patch: Int) {
*/ */
object ZeroMQExtension extends ExtensionId[ZeroMQExtension] with ExtensionIdProvider { object ZeroMQExtension extends ExtensionId[ZeroMQExtension] with ExtensionIdProvider {
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new ZeroMQExtension(system) def createExtension(system: ExtendedActorSystem) = new ZeroMQExtension(system)
private val minVersionString = "2.1.0" private val minVersionString = "2.1.0"
private val minVersion = JZMQ.makeVersion(2, 1, 0) private val minVersion = JZMQ.makeVersion(2, 1, 0)

View file

@ -8,7 +8,7 @@ import sbt._
import sbt.Keys._ import sbt.Keys._
import com.typesafe.sbtmultijvm.MultiJvmPlugin import com.typesafe.sbtmultijvm.MultiJvmPlugin
import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions, jvmOptions, scalatestOptions } import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions, jvmOptions, scalatestOptions }
import com.typesafe.schoir.SchoirPlugin.schoirSettings //import com.typesafe.schoir.SchoirPlugin.schoirSettings
import com.typesafe.sbtscalariform.ScalariformPlugin import com.typesafe.sbtscalariform.ScalariformPlugin
import com.typesafe.sbtscalariform.ScalariformPlugin.ScalariformKeys import com.typesafe.sbtscalariform.ScalariformPlugin.ScalariformKeys
import java.lang.Boolean.getBoolean import java.lang.Boolean.getBoolean
@ -71,7 +71,7 @@ object AkkaBuild extends Build {
id = "akka-remote", id = "akka-remote",
base = file("akka-remote"), base = file("akka-remote"),
dependencies = Seq(actor, actorTests % "test->test", testkit % "test->test"), dependencies = Seq(actor, actorTests % "test->test", testkit % "test->test"),
settings = defaultSettings ++ multiJvmSettings ++ schoirSettings ++ Seq( settings = defaultSettings ++ multiJvmSettings ++ /*schoirSettings ++*/ Seq(
libraryDependencies ++= Dependencies.remote, libraryDependencies ++= Dependencies.remote,
// disable parallel tests // disable parallel tests
parallelExecution in Test := false, parallelExecution in Test := false,

View file

@ -3,7 +3,7 @@ resolvers += Classpaths.typesafeResolver
addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.1.9") addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.1.9")
addSbtPlugin("com.typesafe.schoir" % "schoir" % "0.1.1") //addSbtPlugin("com.typesafe.schoir" % "schoir" % "0.1.1")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse" % "1.5.0") addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse" % "1.5.0")