From d3837b9fc3f384bd51a847340d218b0bb67ec6cf Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 18 Oct 2011 15:39:26 +0200 Subject: [PATCH] Introduce parental supervision, BUT TESTS ARE STILL FAILING MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - every actor is now supervised, where the root of the tree is app.guardian, which has its supervisor field set to a special ActorRef obtained from provider.theOneWhoWalksTheBubblesOfSpaceTime (this name is meant to indicate that this ref is outside of the universe, cf. Michio Kaku) - changed all tests to obtain specially supervised children (i.e. not top-level) via (supervisor ? Props).as[ActorRef].get - add private[akka] ScalaActorRef.sendSystemMessage for sending Supervise() - everything routing or remote is broken wrt. supervision, as that was not “properly” implemented to begin with, will be tackled after app/supervision/eventbus/AkkaSpec are stabilized enough --- .../ActorFireForgetRequestReplySpec.scala | 6 +- .../scala/akka/actor/ActorLifeCycleSpec.scala | 17 +++--- .../test/scala/akka/actor/ActorRefSpec.scala | 4 +- .../scala/akka/actor/DeathWatchSpec.scala | 20 ++++++- .../src/test/scala/akka/actor/IOActor.scala | 4 +- .../actor/LocalActorRefProviderSpec.scala | 8 +-- .../scala/akka/actor/LoggingReceiveSpec.scala | 2 +- .../akka/actor/RestartStrategySpec.scala | 46 +++++++------- .../test/scala/akka/actor/SchedulerSpec.scala | 7 ++- .../test/scala/akka/actor/Supervisor.scala | 10 ++++ .../akka/actor/SupervisorHierarchySpec.scala | 16 +++-- .../scala/akka/actor/SupervisorMiscSpec.scala | 15 ++--- .../scala/akka/actor/SupervisorSpec.scala | 44 +++++++------- .../scala/akka/actor/SupervisorTreeSpec.scala | 9 +-- .../test/scala/akka/actor/Ticket669Spec.scala | 8 +-- .../src/main/scala/akka/AkkaApplication.scala | 27 ++++++--- .../src/main/scala/akka/actor/ActorCell.scala | 42 +++++-------- .../src/main/scala/akka/actor/ActorRef.scala | 37 +++++++++--- .../scala/akka/actor/ActorRefProvider.scala | 60 ++++++++++++++----- .../src/main/scala/akka/actor/Props.scala | 23 +------ .../akka/dispatch/AbstractDispatcher.scala | 16 ++--- .../main/scala/akka/event/EventHandler.scala | 2 +- .../scala/akka/remote/RemoteInterface.scala | 2 +- .../src/main/scala/akka/routing/Pool.scala | 2 +- .../src/main/scala/akka/routing/Routing.scala | 8 +-- .../akka/remote/NetworkEventStream.scala | 4 +- .../akka/remote/RemoteActorRefProvider.scala | 19 +++--- .../main/scala/akka/remote/RemoteDaemon.scala | 20 ++++--- .../serialization/SerializationProtocol.scala | 7 +-- .../serialization/ActorSerializeSpec.scala | 8 +-- .../scala/akka/testkit/TestActorRef.scala | 9 ++- .../main/scala/akka/testkit/TestFSMRef.scala | 8 +-- .../src/main/scala/akka/testkit/TestKit.scala | 2 +- .../scala/akka/testkit/TestActorRefSpec.scala | 4 +- 34 files changed, 290 insertions(+), 226 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala index 461868df21..895cc31c79 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala @@ -45,10 +45,6 @@ object ActorFireForgetRequestReplySpec { } } - class Supervisor extends Actor { - def receive = { case _ ⇒ () } - } - object state { var s = "NIL" val finished = TestBarrier(2) @@ -83,7 +79,7 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach { "should shutdown crashed temporary actor" in { filterEvents(EventFilter[Exception]("Expected")) { val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0)))) - val actor = actorOf(Props[CrashingActor].withSupervisor(supervisor)) + val actor = (supervisor ? Props[CrashingActor]).as[ActorRef].get actor.isShutdown must be(false) actor ! "Die" state.finished.await diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala index 52e1bd6d58..12456fd3ff 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala @@ -33,12 +33,13 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS "invoke preRestart, preStart, postRestart when using OneForOneStrategy" in { filterException[ActorKilledException] { val id = newUuid().toString - val supervisor = actorOf(Props(self ⇒ { case _ ⇒ }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) + val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) val gen = new AtomicInteger(0) - val restarter = actorOf(Props(new LifeCycleTestActor(id, gen) { + val restarterProps = Props(new LifeCycleTestActor(id, gen) { override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") } override def postRestart(reason: Throwable) { report("postRestart") } - }).withSupervisor(supervisor)) + }) + val restarter = (supervisor ? restarterProps).as[ActorRef].get expectMsg(("preStart", id, 0)) restarter ! Kill @@ -66,9 +67,10 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS "default for preRestart and postRestart is to call postStop and preStart respectively" in { filterException[ActorKilledException] { val id = newUuid().toString - val supervisor = actorOf(Props(self ⇒ { case _ ⇒ }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) + val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) val gen = new AtomicInteger(0) - val restarter = actorOf(Props(new LifeCycleTestActor(id, gen)).withSupervisor(supervisor)) + val restarterProps = Props(new LifeCycleTestActor(id, gen)) + val restarter = (supervisor ? restarterProps).as[ActorRef].get expectMsg(("preStart", id, 0)) restarter ! Kill @@ -95,9 +97,10 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS "not invoke preRestart and postRestart when never restarted using OneForOneStrategy" in { val id = newUuid().toString - val supervisor = actorOf(Props(self ⇒ { case _ ⇒ }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) + val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) val gen = new AtomicInteger(0) - val a = actorOf(Props(new LifeCycleTestActor(id, gen)).withSupervisor(supervisor)) + val props = Props(new LifeCycleTestActor(id, gen)) + val a = (supervisor ? props).as[ActorRef].get expectMsg(("preStart", id, 0)) a ! "status" expectMsg(("OK", id, 0)) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 9ab8615594..747f776f9b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -394,12 +394,12 @@ class ActorRefSpec extends AkkaSpec { val boss = actorOf(Props(new Actor { - val ref = actorOf( + val ref = context.actorOf( Props(new Actor { def receive = { case _ ⇒ } override def preRestart(reason: Throwable, msg: Option[Any]) = latch.countDown() override def postRestart(reason: Throwable) = latch.countDown() - }).withSupervisor(self)) + })) protected def receive = { case "sendKill" ⇒ ref ! Kill } }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index ca8fdad334..897d3a6180 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -69,8 +69,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "notify with a Terminated message once when an Actor is stopped but not when restarted" in { filterException[ActorKilledException] { - val supervisor = actorOf(Props(context ⇒ { case _ ⇒ }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(2)))) - val terminal = actorOf(Props(context ⇒ { case x ⇒ context.channel ! x }).withSupervisor(supervisor)) + val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(2)))) + val terminalProps = Props(context ⇒ { case x ⇒ context.channel ! x }) + val terminal = (supervisor ? terminalProps).as[ActorRef].get testActor startsMonitoring terminal @@ -85,6 +86,21 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende supervisor.stop() } } + + "fail a monitor which does not handle Terminated()" in { + filterEvents(EventFilter[ActorKilledException], EventFilter[DeathPactException]) { + val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0)))) + + val failed, brother = (supervisor ? Props.empty).as[ActorRef].get + brother startsMonitoring failed + testActor startsMonitoring brother + + failed ! Kill + expectMsgPF() { + case Terminated(brother, DeathPactException(failed, _: ActorKilledException)) ⇒ true + } + } + } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index 85c29e1033..78ca017679 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -31,7 +31,7 @@ object IOActorSpec { socket write bytes } } - }).withSupervisor(optionSelf)) + })) def receive = { case msg: NewClient ⇒ @@ -102,7 +102,7 @@ object IOActorSpec { } } } - }).withSupervisor(self)) + })) def receive = { case msg: NewClient ⇒ createWorker forward msg diff --git a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala index 9987a2dfcd..f3d0168613 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala @@ -38,19 +38,19 @@ class LocalActorRefProviderSpec extends AkkaSpec { val address = "new-actor" + i spawn { - a1 = Some(provider.actorOf(Props(creator = () ⇒ new NewActor), address)) + a1 = Some(provider.actorOf(Props(creator = () ⇒ new NewActor), app.guardian, address)) latch.countDown() } spawn { - a2 = Some(provider.actorOf(Props(creator = () ⇒ new NewActor), address)) + a2 = Some(provider.actorOf(Props(creator = () ⇒ new NewActor), app.guardian, address)) latch.countDown() } spawn { - a3 = Some(provider.actorOf(Props(creator = () ⇒ new NewActor), address)) + a3 = Some(provider.actorOf(Props(creator = () ⇒ new NewActor), app.guardian, address)) latch.countDown() } spawn { - a4 = Some(provider.actorOf(Props(creator = () ⇒ new NewActor), address)) + a4 = Some(provider.actorOf(Props(creator = () ⇒ new NewActor), app.guardian, address)) latch.countDown() } diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala index 0af11ba652..e767ddeeb7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala @@ -129,7 +129,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd expectMsg(EventHandler.Debug(supervisor, "started")) - val actor = TestActorRef[TestLogActor](Props[TestLogActor].withSupervisor(supervisor)) + val actor = new TestActorRef[TestLogActor](app, Props[TestLogActor], supervisor, "none") expectMsgPF() { case EventHandler.Debug(ref, msg: String) ⇒ ref == supervisor && msg.startsWith("now supervising") diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index a43665e91a..545901d3f3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -28,16 +28,14 @@ class RestartStrategySpec extends AkkaSpec with BeforeAndAfterAll { "A RestartStrategy" must { "ensure that slave stays dead after max restarts within time range" in { - val boss = actorOf(Props(new Actor { - protected def receive = { case _ ⇒ () } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000))) + val boss = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000))) val restartLatch = new StandardLatch val secondRestartLatch = new StandardLatch val countDownLatch = new CountDownLatch(3) val stopLatch = new StandardLatch - val slave = actorOf(Props(new Actor { + val slaveProps = Props(new Actor { protected def receive = { case Ping ⇒ countDownLatch.countDown() @@ -54,7 +52,8 @@ class RestartStrategySpec extends AkkaSpec with BeforeAndAfterAll { override def postStop() = { stopLatch.open } - }).withSupervisor(boss)) + }) + val slave = (boss ? slaveProps).as[ActorRef].get slave ! Ping slave ! Crash @@ -75,13 +74,11 @@ class RestartStrategySpec extends AkkaSpec with BeforeAndAfterAll { } "ensure that slave is immortal without max restarts and time range" in { - val boss = actorOf(Props(new Actor { - def receive = { case _ ⇒ () } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None))) + val boss = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None))) val countDownLatch = new CountDownLatch(100) - val slave = actorOf(Props(new Actor { + val slaveProps = Props(new Actor { protected def receive = { case Crash ⇒ throw new Exception("Crashing...") @@ -90,7 +87,8 @@ class RestartStrategySpec extends AkkaSpec with BeforeAndAfterAll { override def postRestart(reason: Throwable) = { countDownLatch.countDown() } - }).withSupervisor(boss)) + }) + val slave = (boss ? slaveProps).as[ActorRef].get (1 to 100) foreach { _ ⇒ slave ! Crash } assert(countDownLatch.await(120, TimeUnit.SECONDS)) @@ -98,9 +96,7 @@ class RestartStrategySpec extends AkkaSpec with BeforeAndAfterAll { } "ensure that slave restarts after number of crashes not within time range" in { - val boss = actorOf(Props(new Actor { - def receive = { case _ ⇒ () } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 500))) + val boss = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 500))) val restartLatch = new StandardLatch val secondRestartLatch = new StandardLatch @@ -108,7 +104,7 @@ class RestartStrategySpec extends AkkaSpec with BeforeAndAfterAll { val pingLatch = new StandardLatch val secondPingLatch = new StandardLatch - val slave = actorOf(Props(new Actor { + val slaveProps = Props(new Actor { protected def receive = { case Ping ⇒ @@ -129,7 +125,8 @@ class RestartStrategySpec extends AkkaSpec with BeforeAndAfterAll { secondRestartLatch.open } } - }).withSupervisor(boss)) + }) + val slave = (boss ? slaveProps).as[ActorRef].get slave ! Ping slave ! Crash @@ -156,16 +153,14 @@ class RestartStrategySpec extends AkkaSpec with BeforeAndAfterAll { } "ensure that slave is not restarted after max retries" in { - val boss = actorOf(Props(new Actor { - def receive = { case _ ⇒ () } - }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), Some(2), None))) + val boss = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), Some(2), None))) val restartLatch = new StandardLatch val secondRestartLatch = new StandardLatch val countDownLatch = new CountDownLatch(3) val stopLatch = new StandardLatch - val slave = actorOf(Props(new Actor { + val slaveProps = Props(new Actor { protected def receive = { case Ping ⇒ countDownLatch.countDown() @@ -181,7 +176,8 @@ class RestartStrategySpec extends AkkaSpec with BeforeAndAfterAll { override def postStop() = { stopLatch.open } - }).withSupervisor(boss)) + }) + val slave = (boss ? slaveProps).as[ActorRef].get slave ! Ping slave ! Crash @@ -212,10 +208,13 @@ class RestartStrategySpec extends AkkaSpec with BeforeAndAfterAll { val countDownLatch = new CountDownLatch(2) val boss = actorOf(Props(new Actor { - def receive = { case t: Terminated ⇒ maxNoOfRestartsLatch.open } + def receive = { + case p: Props ⇒ reply(context.actorOf(p)) + case t: Terminated ⇒ maxNoOfRestartsLatch.open + } }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, Some(1000)))) - val slave = actorOf(Props(new Actor { + val slaveProps = Props(new Actor { protected def receive = { case Ping ⇒ countDownLatch.countDown() @@ -229,7 +228,8 @@ class RestartStrategySpec extends AkkaSpec with BeforeAndAfterAll { override def postStop() = { stopLatch.open } - }).withSupervisor(boss)) + }) + val slave = (boss ? slaveProps).as[ActorRef].get boss startsMonitoring slave diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 4e3ea20fe1..2aa2c3452e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -106,15 +106,16 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { val restartLatch = new StandardLatch val pingLatch = new CountDownLatch(6) - val supervisor = actorOf(Props(context ⇒ { case _ ⇒ }).withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, 1000))) - val actor = actorOf(Props(new Actor { + val supervisor = actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, 1000))) + val props = Props(new Actor { def receive = { case Ping ⇒ pingLatch.countDown() case Crash ⇒ throw new Exception("CRASH") } override def postRestart(reason: Throwable) = restartLatch.open - }).withSupervisor(supervisor)) + }) + val actor = (supervisor ? props).as[ActorRef].get collectFuture(app.scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)) // appx 2 pings before crash diff --git a/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala b/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala new file mode 100644 index 0000000000..023490da31 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala @@ -0,0 +1,10 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.actor + +class Supervisor extends Actor { + def receive = { + case x: Props ⇒ reply(context.actorOf(x)) + } +} diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 4404e53c20..cc16c2da08 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -12,7 +12,9 @@ object SupervisorHierarchySpec { class FireWorkerException(msg: String) extends Exception(msg) class CountDownActor(countDown: CountDownLatch) extends Actor { - protected def receive = { case _ ⇒ } + protected def receive = { + case p: Props ⇒ reply(context.actorOf(p)) + } override def postRestart(reason: Throwable) = { countDown.countDown() } @@ -27,12 +29,13 @@ class SupervisorHierarchySpec extends AkkaSpec { "restart manager and workers in AllForOne" in { val countDown = new CountDownLatch(4) - val boss = actorOf(Props(self ⇒ { case _ ⇒ }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), None, None))) + val boss = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), None, None))) - val manager = actorOf(Props(new CountDownActor(countDown)).withFaultHandler(AllForOneStrategy(List(), None, None)).withSupervisor(boss)) + val managerProps = Props(new CountDownActor(countDown)).withFaultHandler(AllForOneStrategy(List(), None, None)) + val manager = (boss ? managerProps).as[ActorRef].get - val workerProps = Props(new CountDownActor(countDown)).withSupervisor(manager) - val workerOne, workerTwo, workerThree = actorOf(workerProps) + val workerProps = Props(new CountDownActor(countDown)) + val workerOne, workerTwo, workerThree = (manager ? workerProps).as[ActorRef].get filterException[ActorKilledException] { workerOne ! Kill @@ -48,7 +51,8 @@ class SupervisorHierarchySpec extends AkkaSpec { val countDownMessages = new CountDownLatch(1) val countDownMax = new CountDownLatch(1) val boss = actorOf(Props(new Actor { - val crasher = self startsMonitoring actorOf(Props(new CountDownActor(countDownMessages)).withSupervisor(self)) + val crasher = context.actorOf(Props(new CountDownActor(countDownMessages))) + self startsMonitoring crasher protected def receive = { case "killCrasher" ⇒ crasher ! Kill diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index ea4776981d..cab176d184 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -16,26 +16,23 @@ class SupervisorMiscSpec extends AkkaSpec { filterEvents(EventFilter[Exception]("Kill")) { val countDownLatch = new CountDownLatch(4) - val supervisor = actorOf(Props(new Actor { - def receive = { case _ ⇒ } - }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 5000))) + val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 5000))) val workerProps = Props(new Actor { override def postRestart(cause: Throwable) { countDownLatch.countDown() } - protected def receive = { case "status" ⇒ this.reply("OK") case _ ⇒ this.self.stop() } - }).withSupervisor(supervisor) + }) - val actor1 = actorOf(workerProps.withDispatcher(app.dispatcherFactory.newPinnedDispatcher("pinned"))) + val actor1 = (supervisor ? workerProps.withDispatcher(app.dispatcherFactory.newPinnedDispatcher("pinned"))).as[ActorRef].get - val actor2 = actorOf(workerProps.withDispatcher(app.dispatcherFactory.newPinnedDispatcher("pinned"))) + val actor2 = (supervisor ? workerProps.withDispatcher(app.dispatcherFactory.newPinnedDispatcher("pinned"))).as[ActorRef].get - val actor3 = actorOf(workerProps.withDispatcher(app.dispatcherFactory.newDispatcher("test").build)) + val actor3 = (supervisor ? workerProps.withDispatcher(app.dispatcherFactory.newDispatcher("test").build)).as[ActorRef].get - val actor4 = actorOf(workerProps.withDispatcher(app.dispatcherFactory.newPinnedDispatcher("pinned"))) + val actor4 = (supervisor ? workerProps.withDispatcher(app.dispatcherFactory.newPinnedDispatcher("pinned"))).as[ActorRef].get actor1 ! Kill actor2 ! Kill diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index c4604253c8..b802583340 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -54,7 +54,7 @@ object SupervisorSpec { class Master extends Actor { - val temp = context.actorOf(Props[PingPongActor].withSupervisor(self)) + val temp = context.actorOf(Props[PingPongActor]) def receive = { case Die ⇒ (temp.?(Die, TimeoutMillis)).get @@ -71,52 +71,49 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte // Creating actors and supervisors // ===================================================== + private def child(supervisor: ActorRef, props: Props): ActorRef = (supervisor ? props).as[ActorRef].get + def temporaryActorAllForOne = { - val supervisor = actorOf(Props(AllForOneStrategy(List(classOf[Exception]), Some(0)))) - val temporaryActor = actorOf(Props[PingPongActor].withSupervisor(supervisor)) + val supervisor = actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0)))) + val temporaryActor = child(supervisor, Props[PingPongActor]) (temporaryActor, supervisor) } def singleActorAllForOne = { - val supervisor = actorOf(Props(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))) - val pingpong = actorOf(Props[PingPongActor].withSupervisor(supervisor)) + val supervisor = actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))) + val pingpong = child(supervisor, Props[PingPongActor]) (pingpong, supervisor) } def singleActorOneForOne = { - val supervisor = actorOf(Props(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))) - val pingpong = actorOf(Props[PingPongActor].withSupervisor(supervisor)) + val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))) + val pingpong = child(supervisor, Props[PingPongActor]) (pingpong, supervisor) } def multipleActorsAllForOne = { - val supervisor = actorOf(Props(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))) - val pingpong1 = actorOf(Props[PingPongActor].withSupervisor(supervisor)) - val pingpong2 = actorOf(Props[PingPongActor].withSupervisor(supervisor)) - val pingpong3 = actorOf(Props[PingPongActor].withSupervisor(supervisor)) + val supervisor = actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))) + val pingpong1, pingpong2, pingpong3 = child(supervisor, Props[PingPongActor]) (pingpong1, pingpong2, pingpong3, supervisor) } def multipleActorsOneForOne = { - val supervisor = actorOf(Props(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))) - val pingpong1 = actorOf(Props[PingPongActor].withSupervisor(supervisor)) - val pingpong2 = actorOf(Props[PingPongActor].withSupervisor(supervisor)) - val pingpong3 = actorOf(Props[PingPongActor].withSupervisor(supervisor)) + val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))) + val pingpong1, pingpong2, pingpong3 = child(supervisor, Props[PingPongActor]) (pingpong1, pingpong2, pingpong3, supervisor) } def nestedSupervisorsAllForOne = { - val topSupervisor = actorOf(Props(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))) - val pingpong1 = actorOf(Props[PingPongActor].withSupervisor(topSupervisor)) + val topSupervisor = actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis))) + val pingpong1 = child(topSupervisor, Props[PingPongActor]) - val middleSupervisor = actorOf(Props(AllForOneStrategy(Nil, 3, TimeoutMillis)).withSupervisor(topSupervisor)) - val pingpong2 = actorOf(Props[PingPongActor].withSupervisor(middleSupervisor)) - val pingpong3 = actorOf(Props[PingPongActor].withSupervisor(middleSupervisor)) + val middleSupervisor = child(topSupervisor, Props[Supervisor].withFaultHandler(AllForOneStrategy(Nil, 3, TimeoutMillis))) + val pingpong2, pingpong3 = child(middleSupervisor, Props[PingPongActor]) (pingpong1, pingpong2, pingpong3, topSupervisor) } @@ -290,9 +287,9 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte "must attempt restart when exception during restart" in { val inits = new AtomicInteger(0) - val supervisor = actorOf(Props(OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000))) + val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000))) - val dyingActor = actorOf(Props(new Actor { + val dyingProps = Props(new Actor { inits.incrementAndGet if (inits.get % 2 == 0) throw new IllegalStateException("Don't wanna!") @@ -301,7 +298,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte case Ping ⇒ tryReply(PongMessage) case Die ⇒ throw new RuntimeException("Expected") } - }).withSupervisor(supervisor)) + }) + val dyingActor = (supervisor ? dyingProps).as[ActorRef].get intercept[RuntimeException] { (dyingActor.?(Die, TimeoutMillis)).get diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala index fb4ae089c9..85cbda16a2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala @@ -20,13 +20,10 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender { "be able to kill the middle actor and see itself and its child restarted" in { filterException[ActorKilledException] { within(5 seconds) { - val p = Props(new Actor { - def receive = { case false ⇒ } - override def preRestart(reason: Throwable, msg: Option[Any]) { testActor ! self.address } - }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 1000)) + val p = Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 1000)) val headActor = actorOf(p) - val middleActor = actorOf(p.withSupervisor(headActor)) - val lastActor = actorOf(p.withSupervisor(middleActor)) + val middleActor = (headActor ? p).as[ActorRef].get + val lastActor = (middleActor ? p).as[ActorRef].get middleActor ! Kill expectMsg(middleActor.address) diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index c32832fbc1..6620aeebf3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -18,8 +18,8 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender "A supervised actor with lifecycle PERMANENT" should { "be able to reply on failure during preRestart" in { filterEvents(EventFilter[Exception]("test")) { - val supervisor = actorOf(Props(AllForOneStrategy(List(classOf[Exception]), 5, 10000))) - val supervised = actorOf(Props[Supervised].withSupervisor(supervisor)) + val supervisor = actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 5, 10000))) + val supervised = (supervisor ? Props[Supervised]).as[ActorRef].get supervised.!("test")(Some(testActor)) expectMsg("failure1") @@ -29,8 +29,8 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender "be able to reply on failure during postStop" in { filterEvents(EventFilter[Exception]("test")) { - val supervisor = actorOf(Props(AllForOneStrategy(List(classOf[Exception]), Some(0), None))) - val supervised = actorOf(Props[Supervised].withSupervisor(supervisor)) + val supervisor = actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0), None))) + val supervised = (supervisor ? Props[Supervised]).as[ActorRef].get supervised.!("test")(Some(testActor)) expectMsg("failure2") diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala index ce67f66745..7a35872c90 100644 --- a/akka-actor/src/main/scala/akka/AkkaApplication.scala +++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala @@ -155,27 +155,36 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor val defaultAddress = new InetSocketAddress(hostname, AkkaConfig.RemoteServerPort) - if (ConfigVersion != Version) - throw new ConfigurationException("Akka JAR version [" + Version + - "] does not match the provided config version [" + ConfigVersion + "]") - // TODO correctly pull its config from the config val dispatcherFactory = new Dispatchers(this) implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher + val reflective = new ReflectiveAccess(this) + + // TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor + val provider: ActorRefProvider = reflective.createProvider + + // TODO make this configurable + protected[akka] val guardian: ActorRef = { + import akka.actor.FaultHandlingStrategy._ + new LocalActorRef(this, + Props(context ⇒ { case _ ⇒ }).withFaultHandler(OneForOneStrategy { + case _: ActorKilledException ⇒ Stop + case _: Exception ⇒ Restart + }).withDispatcher(dispatcher), + provider.theOneWhoWalksTheBubblesOfSpaceTime, + "ApplicationSupervisor", + true) + } + val eventHandler = new EventHandler(this) val log: Logging = new EventHandlerLogging(eventHandler, this) - val reflective = new ReflectiveAccess(this) - // TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor val deployer = new Deployer(this) - // TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor - val provider: ActorRefProvider = reflective.createProvider - val deathWatch = provider.createDeathWatch() val typedActor = new TypedActor(this) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 3063cfaf01..3afde89403 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -64,11 +64,14 @@ private[akka] class ActorCell( val app: AkkaApplication, val self: ActorRef with ScalaActorRef, val props: Props, + val supervisor: ActorRef, var receiveTimeout: Option[Long], var hotswap: Stack[PartialFunction[Any, Unit]]) extends ActorContext { import ActorCell._ + protected def guardian = self + def provider = app.provider var futureTimeout: Option[ScheduledFuture[AnyRef]] = None //FIXME TODO Doesn't need to be volatile either, since it will only ever be accessed when a message is processed @@ -92,14 +95,8 @@ private[akka] class ActorCell( def start(): Unit = { mailbox = dispatcher.createMailbox(this) - if (props.supervisor.isDefined) { - props.supervisor.get match { - case l: LocalActorRef ⇒ - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - l.underlying.dispatcher.systemDispatch(l.underlying, akka.dispatch.Supervise(self)) //FIXME TODO Support all ActorRefs? - case other ⇒ throw new UnsupportedOperationException("Supervision failure: " + other + " cannot be a supervisor, only LocalActorRefs can") - } - } + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + supervisor.sendSystemMessage(akka.dispatch.Supervise(self)) dispatcher.attach(this) } @@ -127,9 +124,6 @@ private[akka] class ActorCell( def children: Iterable[ActorRef] = _children.map(_.child) - //TODO FIXME remove this method - def supervisor: Option[ActorRef] = props.supervisor - def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = dispatcher dispatch Envelope(this, message, channel) def postMessageToMailboxAndCreateFutureResultWithTimeout( @@ -193,7 +187,7 @@ private[akka] class ActorCell( // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { - if (supervisor.isDefined) supervisor.get ! Failed(self, e) else self.stop() + supervisor ! Failed(self, e) } } @@ -224,7 +218,7 @@ private[akka] class ActorCell( // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { - if (supervisor.isDefined) supervisor.get ! Failed(self, e) else self.stop() + supervisor ! Failed(self, e) } } @@ -252,16 +246,13 @@ private[akka] class ActorCell( } } } finally { - val cause = new ActorKilledException("Stopped") //FIXME TODO make this an object, can be reused everywhere try { - if (supervisor.isDefined) supervisor.get ! ChildTerminated(self, cause) + val cause = new ActorKilledException("Stopped") //FIXME TODO make this an object, can be reused everywhere + supervisor ! ChildTerminated(self, cause) + app.deathWatch.publish(Terminated(self, cause)) } finally { - try { - app.deathWatch.publish(Terminated(self, cause)) - } finally { - currentMessage = null - clearActorContext() - } + currentMessage = null + clearActorContext() } } } @@ -320,11 +311,8 @@ private[akka] class ActorCell( channel.sendException(e) - if (supervisor.isDefined) { - props.faultHandler.handleSupervisorFailing(self, _children) - supervisor.get ! Failed(self, e) - } else - dispatcher.resume(this) + props.faultHandler.handleSupervisorFailing(self, _children) + supervisor ! Failed(self, e) if (e.isInstanceOf[InterruptedException]) throw e //Re-throw InterruptedExceptions as expected } finally { @@ -343,7 +331,7 @@ private[akka] class ActorCell( } def handleFailure(fail: Failed): Unit = if (!props.faultHandler.handleFailure(fail, _children)) { - if (supervisor.isDefined) throw fail.cause else self.stop() + throw fail.cause } def handleChildTerminated(child: ActorRef): Unit = _children = props.faultHandler.handleChildTerminated(child, _children) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 41e8a6aeac..ff5bae9044 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -148,19 +148,20 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha class LocalActorRef private[akka] ( app: AkkaApplication, props: Props, - givenAddress: String, //Never refer to this internally instead use "address" + _supervisor: ActorRef, + _givenAddress: String, val systemService: Boolean = false, private[akka] val uuid: Uuid = newUuid, receiveTimeout: Option[Long] = None, hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap) extends ActorRef with ScalaActorRef { - final val address: String = givenAddress match { + final val address: String = _givenAddress match { case null | Props.randomAddress ⇒ uuid.toString case other ⇒ other } - private[this] val actorCell = new ActorCell(app, this, props, receiveTimeout, hotswap) + private[this] val actorCell = new ActorCell(app, this, props, _supervisor, receiveTimeout, hotswap) actorCell.start() /** @@ -224,6 +225,8 @@ class LocalActorRef private[akka] ( instance } + protected[akka] def sendSystemMessage(message: SystemMessage) { underlying.dispatcher.systemDispatch(underlying, message) } + protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = actorCell.postMessageToMailbox(message, channel) @@ -273,6 +276,8 @@ trait ActorRefShared { */ trait ScalaActorRef extends ActorRefShared with ReplyChannel[Any] { ref: ActorRef ⇒ + protected[akka] def sendSystemMessage(message: SystemMessage): Unit + /** * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. *

@@ -329,6 +334,8 @@ case class SerializedActorRef(uuid: Uuid, address: String, hostname: String, por */ trait UnsupportedActorRef extends ActorRef with ScalaActorRef { + private[akka] def uuid: Uuid = unsupported + def startsMonitoring(actorRef: ActorRef): ActorRef = unsupported def stopsMonitoring(actorRef: ActorRef): ActorRef = unsupported @@ -339,26 +346,38 @@ trait UnsupportedActorRef extends ActorRef with ScalaActorRef { protected[akka] def restart(cause: Throwable): Unit = unsupported + def stop(): Unit = unsupported + + def address: String = unsupported + + def isShutdown = false + + protected[akka] def sendSystemMessage(message: SystemMessage) {} + + protected[akka] def postMessageToMailbox(msg: Any, channel: UntypedChannel) {} + + protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(msg: Any, timeout: Timeout, channel: UntypedChannel): Future[Any] = unsupported + private def unsupported = throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName)) } class DeadLetterActorRef(app: AkkaApplication) extends UnsupportedActorRef { val brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(app.dispatcher) - val address: String = "akka:internal:DeadLetterActorRef" + override val address: String = "akka:internal:DeadLetterActorRef" - private[akka] val uuid: akka.actor.Uuid = new com.eaio.uuid.UUID(0L, 0L) //Nil UUID + private[akka] override val uuid: akka.actor.Uuid = new com.eaio.uuid.UUID(0L, 0L) //Nil UUID override def startsMonitoring(actorRef: ActorRef): ActorRef = actorRef override def stopsMonitoring(actorRef: ActorRef): ActorRef = actorRef - def isShutdown(): Boolean = true + override def isShutdown(): Boolean = true - def stop(): Unit = () + override def stop(): Unit = () - protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = app.eventHandler.debug(this, message) + protected[akka] override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = app.eventHandler.debug(this, message) - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( + protected[akka] override def postMessageToMailboxAndCreateFutureResultWithTimeout( message: Any, timeout: Timeout, channel: UntypedChannel): Future[Any] = { app.eventHandler.debug(this, message); brokenPromise } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 7133c82fcd..50d9c243f5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -12,26 +12,28 @@ import java.util.concurrent.ConcurrentHashMap import com.eaio.uuid.UUID import akka.AkkaException import akka.event.{ ActorClassification, DeathWatch, EventHandler } -import akka.dispatch.{ Future, MessageDispatcher, Promise } +import akka.dispatch._ /** * Interface for all ActorRef providers to implement. */ trait ActorRefProvider { - def actorOf(props: Props, address: String): ActorRef + def actorOf(props: Props, supervisor: ActorRef, address: String): ActorRef = actorOf(props, supervisor, address, false) - def actorOf(props: RoutedProps, address: String): ActorRef + def actorOf(props: RoutedProps, supervisor: ActorRef, address: String): ActorRef def actorFor(address: String): Option[ActorRef] - private[akka] def actorOf(props: Props, address: String, systemService: Boolean): ActorRef + private[akka] def actorOf(props: Props, supervisor: ActorRef, address: String, systemService: Boolean): ActorRef private[akka] def evict(address: String): Boolean private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] private[akka] def createDeathWatch(): DeathWatch + + private[akka] def theOneWhoWalksTheBubblesOfSpaceTime: ActorRef } /** @@ -43,6 +45,11 @@ trait ActorRefFactory { def dispatcher: MessageDispatcher + /** + * Father of all children created by this interface. + */ + protected def guardian: ActorRef + def actorOf(props: Props): ActorRef = actorOf(props, Props.randomAddress) /* @@ -50,7 +57,7 @@ trait ActorRefFactory { * the same address can race on the cluster, and then you never know which * implementation wins */ - def actorOf(props: Props, address: String): ActorRef = provider.actorOf(props, address) + def actorOf(props: Props, address: String): ActorRef = provider.actorOf(props, guardian, address, false) def actorOf[T <: Actor](implicit m: Manifest[T]): ActorRef = actorOf(Props(m.erasure.asInstanceOf[Class[_ <: Actor]])) @@ -65,7 +72,7 @@ trait ActorRefFactory { def actorOf(props: RoutedProps): ActorRef = actorOf(props, Props.randomAddress) - def actorOf(props: RoutedProps, address: String): ActorRef = provider.actorOf(props, address) + def actorOf(props: RoutedProps, address: String): ActorRef = provider.actorOf(props, guardian, address) def findActor(address: String): Option[ActorRef] = provider.actorFor(address) @@ -78,9 +85,32 @@ class ActorRefProviderException(message: String) extends AkkaException(message) */ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { - private val actors = new ConcurrentHashMap[String, AnyRef] + /** + * Top-level anchor for the supervision hierarchy of this actor system. Will + * receive only Supervise/ChildTerminated system messages or Failure message. + */ + private[akka] val theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = new UnsupportedActorRef { + override def address = app.name + ":BubbleWalker" - def actorOf(props: Props, address: String): ActorRef = actorOf(props, address, false) + override def toString = address + + protected[akka] override def postMessageToMailbox(msg: Any, channel: UntypedChannel) { + msg match { + case Failed(child, ex) ⇒ child.stop() + case ChildTerminated(child, ex) ⇒ // TODO execute any installed termination handlers + case _ ⇒ app.eventHandler.error(this, this + " received unexpected message " + msg) + } + } + + protected[akka] override def sendSystemMessage(message: SystemMessage) { + message match { + case Supervise(child) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead + case _ ⇒ app.eventHandler.error(this, this + " received unexpected system message " + message) + } + } + } + + private val actors = new ConcurrentHashMap[String, AnyRef] def actorFor(address: String): Option[ActorRef] = actors.get(address) match { case null ⇒ None @@ -93,9 +123,9 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { */ private[akka] def evict(address: String): Boolean = actors.remove(address) ne null - private[akka] def actorOf(props: Props, address: String, systemService: Boolean): ActorRef = { + private[akka] def actorOf(props: Props, supervisor: ActorRef, address: String, systemService: Boolean): ActorRef = { if ((address eq null) || address == Props.randomAddress) { - val actor = new LocalActorRef(app, props, address, systemService = true) + val actor = new LocalActorRef(app, props, supervisor, address, systemService = true) actors.putIfAbsent(actor.address, actor) match { case null ⇒ actor case other ⇒ throw new IllegalStateException("Same uuid generated twice for: " + actor + " and " + other) @@ -110,7 +140,7 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { // create a local actor case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, _, DeploymentConfig.LocalScope)) ⇒ - new LocalActorRef(app, props, address, systemService) // create a local actor + new LocalActorRef(app, props, supervisor, address, systemService) // create a local actor // create a routed actor ref case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, _, DeploymentConfig.LocalScope)) ⇒ @@ -127,9 +157,9 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { } val connections: Iterable[ActorRef] = - if (nrOfInstances.factor > 0) Vector.fill(nrOfInstances.factor)(new LocalActorRef(app, props, "", systemService)) else Nil + if (nrOfInstances.factor > 0) Vector.fill(nrOfInstances.factor)(new LocalActorRef(app, props, supervisor, "", systemService)) else Nil - actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), address) + actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, address) case _ ⇒ throw new Exception("Don't know how to create this actor ref! Why?") } @@ -155,7 +185,9 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { /** * Creates (or fetches) a routed actor reference, configured by the 'props: RoutedProps' configuration. */ - def actorOf(props: RoutedProps, address: String): ActorRef = { + def actorOf(props: RoutedProps, supervisor: ActorRef, address: String): ActorRef = { + // FIXME: this needs to take supervision into account! + //FIXME clustering should be implemented by cluster actor ref provider //TODO Implement support for configuring by deployment ID etc //TODO If address matches an already created actor (Ahead-of-time deployed) return that actor diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index ba668ae280..26fee852bb 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -80,8 +80,7 @@ object Props { case class Props(creator: () ⇒ Actor = Props.defaultCreator, @transient dispatcher: MessageDispatcher = Props.defaultDispatcher, timeout: Timeout = Props.defaultTimeout, - faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler, - supervisor: Option[ActorRef] = Props.defaultSupervisor) { + faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler) { /** * No-args constructor that sets all the default values * Java API @@ -90,8 +89,7 @@ case class Props(creator: () ⇒ Actor = Props.defaultCreator, creator = Props.defaultCreator, dispatcher = Props.defaultDispatcher, timeout = Props.defaultTimeout, - faultHandler = Props.defaultFaultHandler, - supervisor = Props.defaultSupervisor) + faultHandler = Props.defaultFaultHandler) /** * Returns a new Props with the specified creator set @@ -129,21 +127,4 @@ case class Props(creator: () ⇒ Actor = Props.defaultCreator, */ def withFaultHandler(f: FaultHandlingStrategy) = copy(faultHandler = f) - /** - * Returns a new Props with the specified supervisor set, if null, it's equivalent to withSupervisor(Option.none()) - * Java API - */ - def withSupervisor(s: ActorRef) = copy(supervisor = Option(s)) - - /** - * Returns a new Props with the specified supervisor set - * Java API - */ - def withSupervisor(s: akka.japi.Option[ActorRef]) = copy(supervisor = s.asScala) - - /** - * Returns a new Props with the specified supervisor set - * Scala API - */ - def withSupervisor(s: scala.Option[ActorRef]) = copy(supervisor = s) } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 58502fa334..bb67b618b8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -56,14 +56,14 @@ object SystemMessage { sealed trait SystemMessage extends PossiblyHarmful { var next: SystemMessage = _ } -case class Create() extends SystemMessage -case class Recreate(cause: Throwable) extends SystemMessage -case class Suspend() extends SystemMessage -case class Resume() extends SystemMessage -case class Terminate() extends SystemMessage -case class Supervise(child: ActorRef) extends SystemMessage -case class Link(subject: ActorRef) extends SystemMessage -case class Unlink(subject: ActorRef) extends SystemMessage +case class Create() extends SystemMessage // send to self from Dispatcher.register +case class Recreate(cause: Throwable) extends SystemMessage // sent to self from ActorCell.restart +case class Suspend() extends SystemMessage // sent to self from ActorCell.suspend +case class Resume() extends SystemMessage // sent to self from ActorCell.resume +case class Terminate() extends SystemMessage // sent to self from ActorCell.stop +case class Supervise(child: ActorRef) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start +case class Link(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.startsMonitoring +case class Unlink(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.stopsMonitoring final case class TaskInvocation(app: AkkaApplication, function: () ⇒ Unit, cleanup: () ⇒ Unit) extends Runnable { def run() { diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index a668c16249..fa867eeada 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -204,7 +204,7 @@ class EventHandler(app: AkkaApplication) extends ListenerManagement { defaultListeners foreach { listenerName ⇒ try { ReflectiveAccess.getClassFor[Actor](listenerName) match { - case Right(actorClass) ⇒ addListener(new LocalActorRef(app, Props(actorClass).withDispatcher(EventHandlerDispatcher), Props.randomAddress, systemService = true)) + case Right(actorClass) ⇒ addListener(new LocalActorRef(app, Props(actorClass).withDispatcher(EventHandlerDispatcher), app.guardian, Props.randomAddress, systemService = true)) case Left(exception) ⇒ throw exception } } catch { diff --git a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala index 5df0607365..69b0040efb 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala @@ -190,7 +190,7 @@ abstract class RemoteSupport(val app: AkkaApplication) extends ListenerManagemen lazy val eventHandler: ActorRef = { implicit object format extends StatelessActorFormat[RemoteEventHandler] val clazz = classOf[RemoteEventHandler] - val handler = app.provider.actorOf(Props(clazz), clazz.getName, true) + val handler = app.provider.actorOf(Props(clazz), app.guardian, clazz.getName, true) // add the remote client and server listener that pipes the events to the event handler system addListener(handler) handler diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index 803bdf1a51..3554cfb1af 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -93,7 +93,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒ protected[akka] var _delegates = Vector[ActorRef]() - val defaultProps: Props = Props.default.withSupervisor(this.self).withDispatcher(this.context.dispatcher) + val defaultProps: Props = Props.default.withDispatcher(this.context.dispatcher) override def postStop() { _delegates foreach evict diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index d499b80efb..28d253d172 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -94,7 +94,7 @@ object Routing { * An Abstract convenience implementation for building an ActorReference that uses a Router. */ abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) extends UnsupportedActorRef { - private[akka] val uuid: Uuid = newUuid + private[akka] override val uuid: Uuid = newUuid val router = props.routerFactory() @@ -120,14 +120,14 @@ abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) exte * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to * on (or more) of these actors. */ -private[akka] class RoutedActorRef(val routedProps: RoutedProps, val address: String) extends AbstractRoutedActorRef(routedProps) { +private[akka] class RoutedActorRef(val routedProps: RoutedProps, override val address: String) extends AbstractRoutedActorRef(routedProps) { @volatile private var running: Boolean = true - def isShutdown: Boolean = !running + override def isShutdown: Boolean = !running - def stop() { + override def stop() { synchronized { if (running) { running = false diff --git a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala index b67f782c36..efd645cefd 100644 --- a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala +++ b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala @@ -64,8 +64,10 @@ class NetworkEventStream(val app: AkkaApplication) { import NetworkEventStream._ + // FIXME: check that this supervision is correct private[akka] val channel = app.provider.actorOf( - Props[Channel].copy(dispatcher = app.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")), Props.randomAddress, systemService = true) + Props[Channel].copy(dispatcher = app.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")), + app.guardian, Props.randomAddress, systemService = true) /** * Registers a network event stream listener (asyncronously). diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 225612812f..6d6c2d14c2 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -31,6 +31,8 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider import java.util.concurrent.ConcurrentHashMap import akka.dispatch.Promise + private[akka] val theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = new UnsupportedActorRef {} + val local = new LocalActorRefProvider(app) val remote = new Remote(app) @@ -44,10 +46,8 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider def defaultDispatcher = app.dispatcher def defaultTimeout = app.AkkaConfig.ActorTimeout - def actorOf(props: Props, address: String): ActorRef = actorOf(props, address, false) - - def actorOf(props: Props, address: String, systemService: Boolean): ActorRef = - if (systemService) local.actorOf(props, address, systemService) + def actorOf(props: Props, supervisor: ActorRef, address: String, systemService: Boolean): ActorRef = + if (systemService) local.actorOf(props, supervisor, address, systemService) else { val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout? @@ -71,7 +71,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider if (isReplicaNode) { // we are on one of the replica node for this remote actor - new LocalActorRef(app, props, address, false) + new LocalActorRef(app, props, supervisor, address, false) } else { // we are on the single "reference" node uses the remote actors on the replica nodes @@ -115,10 +115,10 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider connections.keys foreach { useActorOnNode(_, address, props.creator) } - actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), address) + actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, address) } - case deploy ⇒ local.actorOf(props, address, systemService) + case deploy ⇒ local.actorOf(props, supervisor, address, systemService) } } catch { case e: Exception ⇒ @@ -139,7 +139,8 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider /** * Copied from LocalActorRefProvider... */ - def actorOf(props: RoutedProps, address: String): ActorRef = { + // FIXME: implement supervision + def actorOf(props: RoutedProps, supervisor: ActorRef, address: String): ActorRef = { if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router") new RoutedActorRef(props, address) } @@ -244,6 +245,8 @@ private[akka] case class RemoteActorRef private[akka] ( def isShutdown: Boolean = !running + protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported + def postMessageToMailbox(message: Any, channel: UntypedChannel) { val chSender = if (channel.isInstanceOf[ActorRef]) Some(channel.asInstanceOf[ActorRef]) else None remote.send[Any](message, chSender, None, remoteAddress, true, this, loader) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 449badf073..48e113187e 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -44,12 +44,14 @@ class Remote(val app: AkkaApplication) extends RemoteService { private[remote] lazy val remoteDaemonSupervisor = app.actorOf(Props( OneForOneStrategy(List(classOf[Exception]), None, None))) // is infinite restart what we want? + // FIXME check that this supervision is okay private[remote] lazy val remoteDaemon = new LocalActorRef( app, - props = Props(new RemoteDaemon(this)).withDispatcher(app.dispatcherFactory.newPinnedDispatcher("Remote")).withSupervisor(remoteDaemonSupervisor), - givenAddress = remoteDaemonServiceName, - systemService = true) + Props(new RemoteDaemon(this)).withDispatcher(app.dispatcherFactory.newPinnedDispatcher("Remote")), + app.guardian, + remoteDaemonServiceName, + true) private[remote] lazy val remoteClientLifeCycleHandler = app.actorOf(Props(new Actor { def receive = { @@ -172,36 +174,40 @@ class RemoteDaemon(val remote: Remote) extends Actor { // } } + // FIXME: handle real remote supervision def handle_fun0_unit(message: RemoteProtocol.RemoteDaemonMessageProtocol) { new LocalActorRef(app, Props( context ⇒ { case f: Function0[_] ⇒ try { f() } finally { context.self.stop() } - }).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Function0[Unit]]) + }).copy(dispatcher = computeGridDispatcher), app.guardian, Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Function0[Unit]]) } + // FIXME: handle real remote supervision def handle_fun0_any(message: RemoteProtocol.RemoteDaemonMessageProtocol) { new LocalActorRef(app, Props( context ⇒ { case f: Function0[_] ⇒ try { reply(f()) } finally { context.self.stop() } - }).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Function0[Any]]) + }).copy(dispatcher = computeGridDispatcher), app.guardian, Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Function0[Any]]) } + // FIXME: handle real remote supervision def handle_fun1_arg_unit(message: RemoteProtocol.RemoteDaemonMessageProtocol) { new LocalActorRef(app, Props( context ⇒ { case (fun: Function[_, _], param: Any) ⇒ try { fun.asInstanceOf[Any ⇒ Unit].apply(param) } finally { context.self.stop() } - }).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]]) + }).copy(dispatcher = computeGridDispatcher), app.guardian, Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]]) } + // FIXME: handle real remote supervision def handle_fun1_arg_any(message: RemoteProtocol.RemoteDaemonMessageProtocol) { new LocalActorRef(app, Props( context ⇒ { case (fun: Function[_, _], param: Any) ⇒ try { reply(fun.asInstanceOf[Any ⇒ Any](param)) } finally { context.self.stop() } - }).copy(dispatcher = computeGridDispatcher), Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]]) + }).copy(dispatcher = computeGridDispatcher), app.guardian, Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]]) } def handleFailover(message: RemoteProtocol.RemoteDaemonMessageProtocol) { diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 2e4d177aba..d1d35269ce 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -201,15 +201,14 @@ class ActorSerialization(val app: AkkaApplication, remote: RemoteSupport) { } val props = Props(creator = factory, - timeout = if (protocol.hasTimeout) protocol.getTimeout else app.AkkaConfig.ActorTimeout, - supervisor = storedSupervisor //TODO what dispatcher should it use? + timeout = if (protocol.hasTimeout) protocol.getTimeout else app.AkkaConfig.ActorTimeout //TODO what dispatcher should it use? //TODO what faultHandler should it use? - // ) val receiveTimeout = if (protocol.hasReceiveTimeout) Some(protocol.getReceiveTimeout) else None //TODO FIXME, I'm expensive and slow - val ar = new LocalActorRef(app, props, protocol.getAddress, false, actorUuid, receiveTimeout, storedHotswap) + // FIXME: what to do if storedSupervisor is empty? + val ar = new LocalActorRef(app, props, storedSupervisor getOrElse app.guardian, protocol.getAddress, false, actorUuid, receiveTimeout, storedHotswap) //Deserialize messages { diff --git a/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala b/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala index 6a31c6eea3..90e433c26e 100644 --- a/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala +++ b/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala @@ -23,7 +23,7 @@ class ActorSerializeSpec extends AkkaSpec with BeforeAndAfterAll { "Serializable actor" must { "must be able to serialize and de-serialize a stateful actor with a given serializer" ignore { - val actor1 = new LocalActorRef(app, Props[MyJavaSerializableActor], Props.randomAddress, systemService = true) + val actor1 = new LocalActorRef(app, Props[MyJavaSerializableActor], app.guardian, Props.randomAddress, systemService = true) (actor1 ? "hello").get must equal("world 1") (actor1 ? "hello").get must equal("world 2") @@ -39,7 +39,7 @@ class ActorSerializeSpec extends AkkaSpec with BeforeAndAfterAll { "must be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox" ignore { - val actor1 = new LocalActorRef(app, Props[MyStatelessActorWithMessagesInMailbox], Props.randomAddress, systemService = true) + val actor1 = new LocalActorRef(app, Props[MyStatelessActorWithMessagesInMailbox], app.guardian, Props.randomAddress, systemService = true) for (i ← 1 to 10) actor1 ! "hello" actor1.underlying.dispatcher.mailboxSize(actor1.underlying) must be > (0) @@ -57,7 +57,7 @@ class ActorSerializeSpec extends AkkaSpec with BeforeAndAfterAll { "must be able to serialize and deserialize a PersonActorWithMessagesInMailbox" ignore { val p1 = Person("debasish ghosh", 25, SerializeSpec.Address("120", "Monroe Street", "Santa Clara", "95050")) - val actor1 = new LocalActorRef(app, Props[PersonActorWithMessagesInMailbox], Props.randomAddress, systemService = true) + val actor1 = new LocalActorRef(app, Props[PersonActorWithMessagesInMailbox], app.guardian, Props.randomAddress, systemService = true) (actor1 ! p1) (actor1 ! p1) (actor1 ! p1) @@ -103,7 +103,7 @@ class ActorSerializeSpec extends AkkaSpec with BeforeAndAfterAll { "serialize actor that accepts protobuf message" ignore { "must serialize" ignore { - val actor1 = new LocalActorRef(app, Props[MyActorWithProtobufMessagesInMailbox], Props.randomAddress, systemService = true) + val actor1 = new LocalActorRef(app, Props[MyActorWithProtobufMessagesInMailbox], app.guardian, Props.randomAddress, systemService = true) val msg = MyMessage(123, "debasish ghosh", true) val b = ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build for (i ← 1 to 10) actor1 ! b diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 8794d00c50..b612a7a7f8 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -19,8 +19,8 @@ import akka.AkkaApplication * @author Roland Kuhn * @since 1.1 */ -class TestActorRef[T <: Actor](_app: AkkaApplication, props: Props, address: String) - extends LocalActorRef(_app, props.withDispatcher(new CallingThreadDispatcher(_app)), address, false) { +class TestActorRef[T <: Actor](_app: AkkaApplication, _props: Props, _supervisor: ActorRef, address: String) + extends LocalActorRef(_app, _props.withDispatcher(new CallingThreadDispatcher(_app)), _supervisor, address, false) { /** * Directly inject messages into actor receive behavior. Any exceptions * thrown will be available to you, while still being able to use @@ -48,7 +48,10 @@ object TestActorRef { def apply[T <: Actor](props: Props)(implicit app: AkkaApplication): TestActorRef[T] = apply[T](props, Props.randomAddress) - def apply[T <: Actor](props: Props, address: String)(implicit app: AkkaApplication): TestActorRef[T] = new TestActorRef(app, props, address) + def apply[T <: Actor](props: Props, address: String)(implicit app: AkkaApplication): TestActorRef[T] = apply[T](props, app.guardian, address) + + def apply[T <: Actor](props: Props, supervisor: ActorRef, address: String)(implicit app: AkkaApplication): TestActorRef[T] = + new TestActorRef(app, props, supervisor, address) def apply[T <: Actor](implicit m: Manifest[T], app: AkkaApplication): TestActorRef[T] = apply[T](Props.randomAddress) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala index 9b3b59c6bd..6ea2e058c6 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala @@ -34,8 +34,8 @@ import akka.AkkaApplication * @author Roland Kuhn * @since 1.2 */ -class TestFSMRef[S, D, T <: Actor](app: AkkaApplication, props: Props, address: String)(implicit ev: T <:< FSM[S, D]) - extends TestActorRef(app, props, address) { +class TestFSMRef[S, D, T <: Actor](app: AkkaApplication, props: Props, supervisor: ActorRef, address: String)(implicit ev: T <:< FSM[S, D]) + extends TestActorRef(app, props, supervisor, address) { private def fsm: T = underlyingActor @@ -81,8 +81,8 @@ class TestFSMRef[S, D, T <: Actor](app: AkkaApplication, props: Props, address: object TestFSMRef { def apply[S, D, T <: Actor](factory: ⇒ T)(implicit ev: T <:< FSM[S, D], app: AkkaApplication): TestFSMRef[S, D, T] = - new TestFSMRef(app, Props(creator = () ⇒ factory), Props.randomAddress) + new TestFSMRef(app, Props(creator = () ⇒ factory), app.guardian, Props.randomAddress) def apply[S, D, T <: Actor](factory: ⇒ T, address: String)(implicit ev: T <:< FSM[S, D], app: AkkaApplication): TestFSMRef[S, D, T] = - new TestFSMRef(app, Props(creator = () ⇒ factory), address) + new TestFSMRef(app, Props(creator = () ⇒ factory), app.guardian, address) } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index f4541f97f6..6a0f60a533 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -99,7 +99,7 @@ class TestKit(_app: AkkaApplication) { * ActorRef of the test actor. Access is provided to enable e.g. * registration as message target. */ - val testActor: ActorRef = new LocalActorRef(app, Props(new TestActor(queue)).copy(dispatcher = new CallingThreadDispatcher(app)), "testActor" + TestKit.testActorId.incrementAndGet(), true) + val testActor: ActorRef = new LocalActorRef(app, Props(new TestActor(queue)).copy(dispatcher = new CallingThreadDispatcher(app)), app.guardian, "testActor" + TestKit.testActorId.incrementAndGet(), true) private var end: Duration = Duration.Inf diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 4546c60a85..434c053e54 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -172,11 +172,11 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach { val boss = TestActorRef(Props(new TActor { - val ref = TestActorRef(Props(new TActor { + val ref = new TestActorRef(app, Props(new TActor { def receiveT = { case _ ⇒ } override def preRestart(reason: Throwable, msg: Option[Any]) { counter -= 1 } override def postRestart(reason: Throwable) { counter -= 1 } - }).withSupervisor(self)) + }), self, "child") def receiveT = { case "sendKill" ⇒ ref ! Kill } }).withFaultHandler(OneForOneStrategy(List(classOf[ActorKilledException]), 5, 1000)))