From cb85778b126ee6d17ea7bba235120d85b0866a3b Mon Sep 17 00:00:00 2001 From: Roland Date: Wed, 14 Dec 2011 00:06:36 +0100 Subject: [PATCH 1/6] remove ActorRef.stop() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - replace ActorRef.stop() by ActorRefFactory.stop(child) everywhere - ActorCell “optimizes” this to remove the child from its childrenRefs in order to allow immediate recycling of the name - the lost soul must have a place, for which the Locker has been created, where Davy Jones will happily rebind the soul to his ship (i.e. set “parent” to the locker to avoid mem leak) and periodically revisit it (.stop(), in case of that being lost in comm failure, similar .watch() to re-check liveness) --- .../ActorFireForgetRequestReplySpec.scala | 2 +- .../scala/akka/actor/ActorLifeCycleSpec.scala | 8 +-- .../test/scala/akka/actor/ActorRefSpec.scala | 6 +- .../scala/akka/actor/ActorTimeoutSpec.scala | 6 +- .../src/test/scala/akka/actor/Bench.scala | 5 +- .../scala/akka/actor/ConsistencySpec.scala | 2 +- .../scala/akka/actor/DeathWatchSpec.scala | 18 +++--- .../test/scala/akka/actor/FSMActorSpec.scala | 2 +- .../scala/akka/actor/FSMTransitionSpec.scala | 2 +- .../src/test/scala/akka/actor/IOActor.scala | 26 ++++---- .../scala/akka/actor/ReceiveTimeoutSpec.scala | 8 +-- .../scala/akka/actor/SupervisorMiscSpec.scala | 2 +- .../scala/akka/actor/SupervisorSpec.scala | 2 +- .../scala/akka/actor/SupervisorTreeSpec.scala | 2 +- .../test/scala/akka/actor/Ticket669Spec.scala | 4 +- .../akka/actor/dispatch/ActorModelSpec.scala | 22 +++---- .../dispatch/BalancingDispatcherSpec.scala | 4 +- .../actor/dispatch/DispatcherActorSpec.scala | 12 ++-- .../actor/dispatch/DispatcherActorsSpec.scala | 4 +- .../akka/actor/dispatch/PinnedActorSpec.scala | 4 +- .../akka/actor/routing/ListenerSpec.scala | 2 +- .../test/scala/akka/dispatch/FutureSpec.scala | 28 ++++----- .../test/scala/akka/event/EventBusSpec.scala | 18 +++--- .../scala/akka/event/LoggingReceiveSpec.scala | 2 +- .../TellLatencyPerformanceSpec.scala | 2 +- .../TellThroughput10000PerformanceSpec.scala | 4 +- ...ThroughputComputationPerformanceSpec.scala | 4 +- .../TellThroughputPerformanceSpec.scala | 4 +- ...utSeparateDispatchersPerformanceSpec.scala | 4 +- .../TradingLatencyPerformanceSpec.scala | 2 +- .../TradingThroughputPerformanceSpec.scala | 2 +- .../scala/akka/routing/ActorPoolSpec.scala | 14 ++--- .../routing/ConfiguredLocalRoutingSpec.scala | 8 +-- .../test/scala/akka/routing/RoutingSpec.scala | 8 +-- .../akka/serialization/SerializeSpec.scala | 2 +- .../src/main/scala/akka/actor/ActorCell.scala | 25 ++++++-- .../src/main/scala/akka/actor/ActorRef.scala | 6 +- .../scala/akka/actor/ActorRefProvider.scala | 35 ++++++++++- .../main/scala/akka/actor/ActorSystem.scala | 22 ++++++- .../src/main/scala/akka/actor/FSM.scala | 2 +- .../main/scala/akka/actor/FaultHandling.scala | 24 +++---- .../src/main/scala/akka/actor/Locker.scala | 63 +++++++++++++++++++ .../src/main/scala/akka/actor/Scheduler.scala | 9 +++ .../main/scala/akka/actor/TypedActor.scala | 2 +- .../src/main/scala/akka/event/Logging.scala | 5 +- .../akka/routing/ConnectionManager.scala | 46 -------------- .../src/main/scala/akka/routing/Routing.scala | 2 +- .../scala/akka/util/ListenerManagement.scala | 6 +- akka-docs/common/code/SchedulerDocSpec.scala | 2 +- akka-docs/scala/code/ActorDocSpec.scala | 12 ++-- .../akka/remote/RemoteConnectionManager.scala | 4 +- .../RandomRoutedRemoteActorMultiJvmSpec.scala | 2 +- ...ndRobinRoutedRemoteActorMultiJvmSpec.scala | 2 +- ...rGatherRoutedRemoteActorMultiJvmSpec.scala | 2 +- .../akka/remote/RemoteCommunicationSpec.scala | 2 +- .../src/main/scala/akka/agent/Agent.scala | 4 +- .../akka/stm/example/EitherOrElseExample.java | 2 +- .../java/akka/stm/example/RetryExample.java | 2 +- .../example/UntypedCoordinatedExample.java | 4 +- .../example/UntypedTransactorExample.java | 4 +- .../test/CoordinatedIncrementSpec.scala | 8 +-- .../transactor/test/FickleFriendsSpec.scala | 4 +- .../akka/transactor/test/TransactorSpec.scala | 10 +-- .../test/scala/akka/testkit/AkkaSpec.scala | 4 +- .../scala/akka/testkit/TestActorRefSpec.scala | 2 +- .../java/akka/tutorial/first/java/Pi.java | 2 +- .../src/main/scala/Pi.scala | 2 +- 67 files changed, 328 insertions(+), 238 deletions(-) create mode 100644 akka-actor/src/main/scala/akka/actor/Locker.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala index aa7d76d3dc..e374453901 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala @@ -87,7 +87,7 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach w state.finished.await 1.second.dilated.sleep() actor.isTerminated must be(true) - supervisor.stop() + system.stop(supervisor) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala index 8f3a58e5e5..0bb1682b81 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala @@ -61,7 +61,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS restarter ! Kill expectMsg(("postStop", id, 3)) expectNoMsg(1 seconds) - supervisor.stop + system.stop(supervisor) } } @@ -92,7 +92,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS restarter ! Kill expectMsg(("postStop", id, 3)) expectNoMsg(1 seconds) - supervisor.stop + system.stop(supervisor) } } @@ -105,10 +105,10 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS expectMsg(("preStart", id, 0)) a ! "status" expectMsg(("OK", id, 0)) - a.stop + system.stop(a) expectMsg(("postStop", id, 0)) expectNoMsg(1 seconds) - supervisor.stop + system.stop(supervisor) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index f1cca42011..47dc5dcae8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -42,7 +42,7 @@ object ActorRefSpec { case "work" ⇒ { work sender ! "workDone" - self.stop() + context.stop(self) } case ReplyTo(replyTo) ⇒ { work @@ -344,8 +344,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { latch.await - clientRef.stop() - serverRef.stop() + system.stop(clientRef) + system.stop(serverRef) } "stop when sent a poison pill" in { diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala index 735867bc97..f97c68913e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala @@ -29,7 +29,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo try { val f = echo ? "hallo" intercept[FutureTimeoutException] { f.await } - } finally { echo.stop } + } finally { system.stop(echo) } } } @@ -41,14 +41,14 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo val f = (echo ? "hallo").mapTo[String] intercept[FutureTimeoutException] { f.await } f.value must be(None) - } finally { echo.stop } + } finally { system.stop(echo) } } } "use explicitly supplied timeout" in { within(testTimeout - 100.millis, testTimeout + 300.millis) { val echo = actorWithTimeout(Props.defaultTimeout) - try { (echo.?("hallo", testTimeout)).as[String] must be(None) } finally { echo.stop } + try { (echo.?("hallo", testTimeout)).as[String] must be(None) } finally { system.stop(echo) } } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala index 4ef5a94b12..98f24b0bc5 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala @@ -85,7 +85,7 @@ object Chameneos { sumMeetings += i if (numFaded == numChameneos) { Chameneos.end = System.currentTimeMillis - self.stop() + context.stop(self) } case msg @ Meet(a, c) ⇒ @@ -107,7 +107,8 @@ object Chameneos { def run { // System.setProperty("akka.config", "akka.conf") Chameneos.start = System.currentTimeMillis - val system = ActorSystem().actorOf(Props(new Mall(1000000, 4))) + val system = ActorSystem() + val actor = system.actorOf(Props(new Mall(1000000, 4))) Thread.sleep(10000) println("Elapsed: " + (end - start)) system.stop() diff --git a/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala index 1118daff1c..1638cd9e4b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala @@ -26,7 +26,7 @@ object ConsistencySpec { } lastStep = step - case "done" ⇒ sender ! "done"; self.stop() + case "done" ⇒ sender ! "done"; context.stop(self) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 9aba8979c1..c3fae8e8a8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -43,9 +43,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende expectTerminationOf(terminal) expectTerminationOf(terminal) - monitor1.stop() - monitor2.stop() - monitor3.stop() + system.stop(monitor1) + system.stop(monitor2) + system.stop(monitor3) } "notify with _current_ monitors with one Terminated message when an Actor is stopped" in { @@ -69,9 +69,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende expectTerminationOf(terminal) expectTerminationOf(terminal) - monitor1.stop() - monitor2.stop() - monitor3.stop() + system.stop(monitor1) + system.stop(monitor2) + system.stop(monitor3) } "notify with a Terminated message once when an Actor is stopped but not when restarted" in { @@ -90,7 +90,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende expectTerminationOf(terminal) terminal.isTerminated must be === true - supervisor.stop() + system.stop(supervisor) } } @@ -99,9 +99,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende case class FF(fail: Failed) val supervisor = system.actorOf(Props[Supervisor] .withFaultHandler(new OneForOneStrategy(FaultHandlingStrategy.makeDecider(List(classOf[Exception])), Some(0)) { - override def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { + override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { testActor.tell(FF(Failed(cause)), child) - super.handleFailure(child, cause, stats, children) + super.handleFailure(context, child, cause, stats, children) } })) diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index e4a30e10e0..89156ff83f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -187,7 +187,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im } val ref = system.actorOf(Props(fsm)) started.await - ref.stop() + system.stop(ref) expectMsg(1 second, fsm.StopEvent(Shutdown, 1, null)) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala index 1b1f90e5b3..9db408770c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala @@ -78,7 +78,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender { within(300 millis) { fsm ! SubscribeTransitionCallBack(forward) expectMsg(CurrentState(fsm, 0)) - forward.stop() + system.stop(forward) fsm ! "tick" expectNoMsg } diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index f2127d92bc..7047760371 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -196,9 +196,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { f1.get must equal(ByteString("Hello World!1")) f2.get must equal(ByteString("Hello World!2")) f3.get must equal(ByteString("Hello World!3")) - client.stop - server.stop - ioManager.stop + system.stop(client) + system.stop(server) + system.stop(ioManager) } "run echo server under high load" in { @@ -210,9 +210,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val list = List.range(0, 1000) val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) assert(f.get.size === 1000) - client.stop - server.stop - ioManager.stop + system.stop(client) + system.stop(server) + system.stop(ioManager) } "run echo server under high load with small buffer" in { @@ -224,9 +224,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val list = List.range(0, 1000) val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) assert(f.get.size === 1000) - client.stop - server.stop - ioManager.stop + system.stop(client) + system.stop(server) + system.stop(ioManager) } "run key-value store" in { @@ -250,10 +250,10 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { f4.get must equal("OK") f5.get must equal(ByteString("I'm a test!")) f6.get must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!"))) - client1.stop - client2.stop - server.stop - ioManager.stop + system.stop(client1) + system.stop(client2) + system.stop(server) + system.stop(ioManager) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala index 02b5aab8c1..9706a77d9f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala @@ -26,7 +26,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { })) timeoutLatch.await - timeoutActor.stop() + system.stop(timeoutActor) } "reschedule timeout after regular receive" in { @@ -45,7 +45,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { timeoutActor ! Tick timeoutLatch.await - timeoutActor.stop() + system.stop(timeoutActor) } "be able to turn off timeout if desired" in { @@ -69,7 +69,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { timeoutLatch.await count.get must be(1) - timeoutActor.stop() + system.stop(timeoutActor) } "not receive timeout message when not specified" in { @@ -82,7 +82,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { })) timeoutLatch.awaitTimeout(1 second) // timeout expected - timeoutActor.stop() + system.stop(timeoutActor) } "have ReceiveTimeout eq to Actors ReceiveTimeout" in { diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index 6438d6eee3..899626330f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -24,7 +24,7 @@ class SupervisorMiscSpec extends AkkaSpec with DefaultTimeout { override def postRestart(cause: Throwable) { countDownLatch.countDown() } protected def receive = { case "status" ⇒ this.sender ! "OK" - case _ ⇒ this.self.stop() + case _ ⇒ this.context.stop(self) } }) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index fdd87a2ba4..a78c8576c2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -306,7 +306,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende inits.get must be(3) - supervisor.stop() + system.stop(supervisor) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala index 9ed84ca2b6..c3723b8564 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala @@ -35,7 +35,7 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeou expectMsg(middleActor.path) expectMsg(lastActor.path) expectNoMsg(2 seconds) - headActor.stop() + system.stop(headActor) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index 154ba58fcd..032c2ade05 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -28,7 +28,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender supervised.!("test")(testActor) expectMsg("failure1") - supervisor.stop() + system.stop(supervisor) } } @@ -39,7 +39,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender supervised.!("test")(testActor) expectMsg("failure2") - supervisor.stop() + system.stop(supervisor) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index bffa5bac82..5bcf8467c3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -77,7 +77,7 @@ object ActorModelSpec { case Forward(to, msg) ⇒ ack; to.forward(msg); busy.switchOff() case CountDown(latch) ⇒ ack; latch.countDown(); busy.switchOff() case Increment(count) ⇒ ack; count.incrementAndGet(); busy.switchOff() - case CountDownNStop(l) ⇒ ack; l.countDown(); self.stop(); busy.switchOff() + case CountDownNStop(l) ⇒ ack; l.countDown(); context.stop(self); busy.switchOff() case Restart ⇒ ack; busy.switchOff(); throw new Exception("Restart requested") case Interrupt ⇒ ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!") case ThrowException(e: Throwable) ⇒ ack; busy.switchOff(); throw e @@ -239,7 +239,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { assertDispatcher(dispatcher)(stops = 0) val a = newTestActor(dispatcher) assertDispatcher(dispatcher)(stops = 0) - a.stop() + system.stop(a) assertDispatcher(dispatcher)(stops = 1) assertRef(a, dispatcher)( suspensions = 0, @@ -260,7 +260,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { assertDispatcher(dispatcher)(stops = 2) - a2.stop + system.stop(a2) assertDispatcher(dispatcher)(stops = 3) } @@ -279,7 +279,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { assertCountDown(oneAtATime, (1.5 seconds).dilated.toMillis, "Processed message when allowed") assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3) - a.stop() + system.stop(a) assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 3, msgsProcessed = 3) } @@ -298,7 +298,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { assertCountDown(counter, 3.seconds.dilated.toMillis, "Should process 200 messages") assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200) - a.stop() + system.stop(a) } def spawn(f: ⇒ Unit) { @@ -328,7 +328,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1, suspensions = 1, resumes = 1) - a.stop() + system.stop(a) assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1, suspensions = 1, resumes = 1) } @@ -370,7 +370,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { throw e } assertCountDown(stopLatch, waitTime, "Expected all children to stop") - boss.stop() + system.stop(boss) } for (run ← 1 to 3) { flood(50000) @@ -447,8 +447,8 @@ class DispatcherModelSpec extends ActorModelSpec { aStop.countDown() - a.stop - b.stop + system.stop(a) + system.stop(b) while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination @@ -484,8 +484,8 @@ class BalancingDispatcherModelSpec extends ActorModelSpec { aStop.countDown() - a.stop - b.stop + system.stop(a) + system.stop(b) while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala index 6ebc81409e..8c7054721d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala @@ -74,8 +74,8 @@ class BalancingDispatcherSpec extends AkkaSpec { fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > (slow.underlying.actor.asInstanceOf[DelayableActor].invocationCount) - slow.stop() - fast.stop() + system.stop(slow) + system.stop(fast) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala index c6e04c6cf7..3b3ba56c37 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala @@ -39,14 +39,14 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout { val actor = system.actorOf(Props[OneWayTestActor].withDispatcher(system.dispatcherFactory.newDispatcher("test").build)) val result = actor ! "OneWay" assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS)) - actor.stop() + system.stop(actor) } "support ask/reply" in { val actor = system.actorOf(Props[TestActor].withDispatcher(system.dispatcherFactory.newDispatcher("test").build)) val result = (actor ? "Hello").as[String] assert("World" === result.get) - actor.stop() + system.stop(actor) } "respect the throughput setting" in { @@ -72,8 +72,8 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout { fastOne ! "sabotage" start.countDown() latch.await(10, TimeUnit.SECONDS) - fastOne.stop() - slowOne.stop() + system.stop(fastOne) + system.stop(slowOne) assert(latch.getCount() === 0) } @@ -90,13 +90,13 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout { val fastOne = system.actorOf( Props(context ⇒ { - case "ping" ⇒ if (works.get) latch.countDown(); context.self.stop() + case "ping" ⇒ if (works.get) latch.countDown(); context.stop(context.self) }).withDispatcher(throughputDispatcher)) val slowOne = system.actorOf( Props(context ⇒ { case "hogexecutor" ⇒ ready.countDown(); start.await - case "ping" ⇒ works.set(false); context.self.stop() + case "ping" ⇒ works.set(false); context.stop(context.self) }).withDispatcher(throughputDispatcher)) slowOne ! "hogexecutor" diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorsSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorsSpec.scala index 8ad5bc641d..d054d15e83 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorsSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorsSpec.scala @@ -49,8 +49,8 @@ class DispatcherActorsSpec extends AkkaSpec { assert(sFinished.getCount > 0) sFinished.await assert(sFinished.getCount === 0) - f.stop() - s.stop() + system.stop(f) + system.stop(s) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala index c4750a4691..fd26780d65 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala @@ -30,14 +30,14 @@ class PinnedActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeo val actor = system.actorOf(Props(self ⇒ { case "OneWay" ⇒ oneWay.countDown() }).withDispatcher(system.dispatcherFactory.newPinnedDispatcher("test"))) val result = actor ! "OneWay" assert(oneWay.await(1, TimeUnit.SECONDS)) - actor.stop() + system.stop(actor) } "support ask/reply" in { val actor = system.actorOf(Props[TestActor].withDispatcher(system.dispatcherFactory.newPinnedDispatcher("test"))) val result = (actor ? "Hello").as[String] assert("World" === result.get) - actor.stop() + system.stop(actor) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala index ab149216a7..38a57fda10 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala @@ -50,7 +50,7 @@ class ListenerSpec extends AkkaSpec { fooLatch.await - for (a ← List(broadcast, a1, a2, a3)) a.stop() + for (a ← List(broadcast, a1, a2, a3)) system.stop(a) } } } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 44ddf4f8bc..804d3b5e62 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -120,7 +120,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val future = actor ? "Hello" future.await test(future, "World") - actor.stop() + system.stop(actor) } } "throws an exception" must { @@ -130,7 +130,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val future = actor ? "Failure" future.await test(future, "Expected exception; to test fault-tolerance") - actor.stop() + system.stop(actor) } } } @@ -144,8 +144,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s } future.await test(future, "WORLD") - actor1.stop() - actor2.stop() + system.stop(actor1) + system.stop(actor2) } } "will throw an exception" must { @@ -156,8 +156,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s } future.await test(future, "/ by zero") - actor1.stop() - actor2.stop() + system.stop(actor1) + system.stop(actor2) } } } @@ -169,8 +169,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val future = actor1 ? "Hello" flatMap { case i: Int ⇒ actor2 ? i } future.await test(future, "World (of class java.lang.String)") - actor1.stop() - actor2.stop() + system.stop(actor1) + system.stop(actor2) } } } @@ -204,7 +204,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa future1.get must be("10-14") assert(checkType(future1, manifest[String])) intercept[ClassCastException] { future2.get } - actor.stop() + system.stop(actor) } } @@ -233,7 +233,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa future1.get must be("10-14") intercept[MatchError] { future2.get } - actor.stop() + system.stop(actor) } } @@ -280,7 +280,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa future10.get must be("World") future11.get must be("Oops!") - actor.stop() + system.stop(actor) } } @@ -396,7 +396,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val actor = system.actorOf(Props[TestActor]) actor ? "Hello" onResult { case "World" ⇒ latch.open } assert(latch.tryAwait(5, TimeUnit.SECONDS)) - actor.stop() + system.stop(actor) } "shouldTraverseFutures" in { @@ -411,7 +411,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo manifest[Int]) assert(Future.sequence(oddFutures).get.sum === 10000) - oddActor.stop() + system.stop(oddActor) val list = (1 to 100).toList assert(Future.traverse(list)(x ⇒ Future(x * 2 - 1)).get.sum === 10000) @@ -470,7 +470,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa assert(r.get === "Hello World!") - actor.stop + system.stop(actor) } "futureComposingWithContinuationsFailureDivideZero" in { diff --git a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala index 5923e84305..fd7fd4c1b0 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala @@ -8,7 +8,7 @@ import org.scalatest.BeforeAndAfterEach import akka.testkit._ import akka.util.duration._ import java.util.concurrent.atomic._ -import akka.actor.{ Props, Actor, ActorRef } +import akka.actor.{ Props, Actor, ActorRef, ActorSystem } import java.util.Comparator import akka.japi.{ Procedure, Function } @@ -33,7 +33,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte def classifierFor(event: BusType#Event): BusType#Classifier - def disposeSubscriber(subscriber: BusType#Subscriber): Unit + def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit busName must { @@ -58,7 +58,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte "not allow to unsubscribe non-existing subscriber" in { val sub = createNewSubscriber() bus.unsubscribe(sub, classifier) must be === false - disposeSubscriber(sub) + disposeSubscriber(system, sub) } "not allow for the same subscriber to subscribe to the same channel twice" in { @@ -80,7 +80,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte subscribers.zip(classifiers) forall { case (s, c) ⇒ bus.subscribe(s, c) } must be === true subscribers.zip(classifiers) forall { case (s, c) ⇒ bus.unsubscribe(s, c) } must be === true - subscribers foreach disposeSubscriber + subscribers foreach (disposeSubscriber(system, _)) } "publishing events without any subscribers shouldn't be a problem" in { @@ -113,7 +113,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte subscribers foreach { s ⇒ bus.subscribe(s, classifier) must be === true } bus.publish(event) range foreach { _ ⇒ expectMsg(event) } - subscribers foreach { s ⇒ bus.unsubscribe(s, classifier) must be === true; disposeSubscriber(s) } + subscribers foreach { s ⇒ bus.unsubscribe(s, classifier) must be === true; disposeSubscriber(system, s) } } "not publish the given event to any other subscribers than the intended ones" in { @@ -136,7 +136,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte } "cleanup subscriber" in { - disposeSubscriber(subscriber) + disposeSubscriber(system, subscriber) } } } @@ -165,7 +165,7 @@ class ActorEventBusSpec extends EventBusSpec("ActorEventBus") { def classifierFor(event: BusType#Event) = event.toString - def disposeSubscriber(subscriber: BusType#Subscriber): Unit = subscriber.stop() + def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = system.stop(subscriber) } object ScanningEventBusSpec { @@ -194,7 +194,7 @@ class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") { def classifierFor(event: BusType#Event) = event.toString - def disposeSubscriber(subscriber: BusType#Subscriber): Unit = () + def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = () } object LookupEventBusSpec { @@ -219,5 +219,5 @@ class LookupEventBusSpec extends EventBusSpec("LookupEventBus") { def classifierFor(event: BusType#Event) = event.toString - def disposeSubscriber(subscriber: BusType#Subscriber): Unit = () + def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = () } diff --git a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala index 6427997b78..e38cc24413 100644 --- a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala @@ -201,7 +201,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)") } - supervisor.stop() + system.stop(supervisor) expectMsg(Logging.Debug(sname, "stopping")) expectMsg(Logging.Debug(aname, "stopped")) expectMsg(Logging.Debug(sname, "stopped")) diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellLatencyPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellLatencyPerformanceSpec.scala index c1de7702e3..ace20bb662 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellLatencyPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellLatencyPerformanceSpec.scala @@ -75,7 +75,7 @@ class TellLatencyPerformanceSpec extends PerformanceSpec { ok must be(true) logMeasurement(numberOfClients, durationNs, stat) } - clients.foreach(_.stop()) + clients.foreach(system.stop(_)) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughput10000PerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughput10000PerformanceSpec.scala index 29109f8472..4541c093ca 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughput10000PerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughput10000PerformanceSpec.scala @@ -173,8 +173,8 @@ class TellThroughput10000PerformanceSpec extends PerformanceSpec { ok must be(true) logMeasurement(numberOfClients, durationNs, repeat) } - clients.foreach(_.stop()) - destinations.foreach(_.stop()) + clients.foreach(system.stop(_)) + destinations.foreach(system.stop(_)) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala index 6a20f982dd..d9f6988231 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala @@ -147,8 +147,8 @@ class TellThroughputComputationPerformanceSpec extends PerformanceSpec { ok must be(true) logMeasurement(numberOfClients, durationNs, repeat) } - clients.foreach(_.stop()) - destinations.foreach(_.stop()) + clients.foreach(system.stop(_)) + destinations.foreach(system.stop(_)) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala index 111cc8fc6a..a1c9d1c271 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala @@ -78,8 +78,8 @@ class TellThroughputPerformanceSpec extends PerformanceSpec { ok must be(true) logMeasurement(numberOfClients, durationNs, repeat) } - clients.foreach(_.stop()) - destinations.foreach(_.stop()) + clients.foreach(system.stop(_)) + destinations.foreach(system.stop(_)) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputSeparateDispatchersPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputSeparateDispatchersPerformanceSpec.scala index ca471b2222..41a969badc 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputSeparateDispatchersPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputSeparateDispatchersPerformanceSpec.scala @@ -159,8 +159,8 @@ class TellThroughputSeparateDispatchersPerformanceSpec extends PerformanceSpec { ok must be(true) logMeasurement(numberOfClients, durationNs, repeat) } - clients.foreach(_.stop()) - destinations.foreach(_.stop()) + clients.foreach(system.stop(_)) + destinations.foreach(system.stop(_)) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingLatencyPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingLatencyPerformanceSpec.scala index 6470f6c0ba..f86987270a 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingLatencyPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingLatencyPerformanceSpec.scala @@ -108,7 +108,7 @@ class TradingLatencyPerformanceSpec extends PerformanceSpec { } logMeasurement(numberOfClients, durationNs, stat) } - clients.foreach(_.stop()) + clients.foreach(system.stop(_)) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingThroughputPerformanceSpec.scala index aca85b8d3d..88a9ce21a0 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingThroughputPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingThroughputPerformanceSpec.scala @@ -105,7 +105,7 @@ class TradingThroughputPerformanceSpec extends PerformanceSpec { } logMeasurement(numberOfClients, durationNs, totalNumberOfOrders) } - clients.foreach(_.stop()) + clients.foreach(system.stop(_)) } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index 1893732686..9ef69ab028 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -99,7 +99,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool.stop() + system.stop(pool) } "pass ticket #705" in { @@ -129,7 +129,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { _.await.resultOrException.get must be("Response") } } finally { - pool.stop() + system.stop(pool) } } @@ -194,7 +194,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(4) - pool.stop() + system.stop(pool) } "grow as needed under mailbox pressure" in { @@ -250,7 +250,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be >= (3) - pool.stop() + system.stop(pool) } "round robin" in { @@ -281,7 +281,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { latch1.await delegates.size must be(1) - pool1.stop() + system.stop(pool1) val latch2 = TestLatch(2) delegates.clear() @@ -309,7 +309,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { latch2.await delegates.size must be(2) - pool2.stop() + system.stop(pool2) } "backoff" in { @@ -355,7 +355,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be <= (z) - pool.stop() + system.stop(pool) } } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index 0b6cdae645..ff72daa101 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -49,7 +49,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli actor ! "hello" helloLatch.await(5, TimeUnit.SECONDS) must be(true) - actor.stop() + system.stop(actor) stopLatch.await(5, TimeUnit.SECONDS) must be(true) } @@ -104,7 +104,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli actor ! Broadcast("hello") helloLatch.await(5, TimeUnit.SECONDS) must be(true) - actor.stop() + system.stop(actor) stopLatch.await(5, TimeUnit.SECONDS) must be(true) } } @@ -134,7 +134,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli for (i ← 1 to 5) expectMsg("world") } - actor.stop() + system.stop(actor) stopLatch.await(5, TimeUnit.SECONDS) must be(true) } @@ -190,7 +190,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli actor ! Broadcast("hello") helloLatch.await(5, TimeUnit.SECONDS) must be(true) - actor.stop() + system.stop(actor) stopLatch.await(5, TimeUnit.SECONDS) must be(true) } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index e6e0f1c898..ea8b5a6e05 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -43,7 +43,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { val c1, c2 = expectMsgType[ActorRef] watch(router) watch(c2) - c2.stop() + system.stop(c2) expectMsg(Terminated(c2)) // it might take a while until the Router has actually processed the Terminated message awaitCond { @@ -54,7 +54,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { } res == Seq(c1, c1) } - c1.stop() + system.stop(c1) expectMsg(Terminated(router)) } @@ -324,8 +324,8 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(Props(new Actor { def receive = { - case Stop(None) ⇒ self.stop() - case Stop(Some(_id)) if (_id == id) ⇒ self.stop() + case Stop(None) ⇒ context.stop(self) + case Stop(Some(_id)) if (_id == id) ⇒ context.stop(self) case _id: Int if (_id == id) ⇒ case x ⇒ { Thread sleep 100 * id diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 3b0b6ea5bc..8617ded486 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -106,7 +106,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { })) a ! new ObjectOutputStream(new ByteArrayOutputStream()) expectMsg("pass") - a.stop() + system.stop(a) } "serialize DeadLetterActorRef" in { diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index c4053081cd..74bd482162 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -185,7 +185,7 @@ private[akka] class ActorCell( val system: ActorSystemImpl, val self: InternalActorRef, val props: Props, - val parent: InternalActorRef, + final val parent: InternalActorRef, /*no member*/ _receiveTimeout: Option[Duration], var hotswap: Stack[PartialFunction[Any, Unit]]) extends UntypedActorContext { @@ -242,6 +242,16 @@ private[akka] class ActorCell( _actorOf(props, name) } + final def stop(actor: ActorRef): Unit = { + val a = actor.asInstanceOf[InternalActorRef] + if (childrenRefs contains actor.path.name) { + system.locker ! a + childrenRefs -= actor.path.name + handleChildTerminated(actor) + } + a.stop() + } + final var currentMessage: Envelope = null final var actor: Actor = _ @@ -405,7 +415,8 @@ private[akka] class ActorCell( // do not process normal messages while waiting for all children to terminate dispatcher suspend this if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopping")) - for (child ← c) child.stop() + // do not use stop(child) because that would dissociate the children from us, but we still want to wait for them + for (child ← c) child.asInstanceOf[InternalActorRef].stop() stopping = true } } @@ -550,15 +561,17 @@ private[akka] class ActorCell( } final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match { - case Some(stats) if stats.child == child ⇒ if (!props.faultHandler.handleFailure(child, cause, stats, childrenRefs.values)) throw cause + case Some(stats) if stats.child == child ⇒ if (!props.faultHandler.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause case Some(stats) ⇒ system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child)) case None ⇒ system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child)) } final def handleChildTerminated(child: ActorRef): Unit = { - childrenRefs -= child.path.name - props.faultHandler.handleChildTerminated(child, children) - if (stopping && childrenRefs.isEmpty) doTerminate() + if (childrenRefs contains child.path.name) { + childrenRefs -= child.path.name + props.faultHandler.handleChildTerminated(this, child, children) + if (stopping && childrenRefs.isEmpty) doTerminate() + } else system.locker ! ChildTerminated(child) } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 3fc3574e75..f2b8afa1e2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -110,11 +110,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable */ def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender) - /** - * Shuts down the actor its dispatcher and message queue. - */ - def stop(): Unit - /** * Is the actor shut down? */ @@ -192,6 +187,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe def resume(): Unit def suspend(): Unit def restart(cause: Throwable): Unit + def stop(): Unit def sendSystemMessage(message: SystemMessage): Unit def getParent: InternalActorRef /** diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 74762f170b..01cde0720c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -234,6 +234,14 @@ trait ActorRefFactory { * replies in order to resolve the matching set of actors. */ def actorSelection(path: String): ActorSelection = ActorSelection(lookupRoot, path) + + /** + * Stop the actor pointed to by the given [[akka.actor.ActorRef]]; this is + * an asynchronous operation, i.e. involves a message send, but if invoked + * on an [[akka.actor.ActorContext]] if operating on a child of that + * context it will free up the name for immediate reuse. + */ + def stop(actor: ActorRef): Unit } class ActorRefProviderException(message: String) extends AkkaException(message) @@ -248,6 +256,11 @@ private[akka] case class CreateChild(props: Props, name: String) */ private[akka] case class CreateRandomNameChild(props: Props) +/** + * Internal Akka use only, used in implementation of system.stop(child). + */ +private[akka] case class StopChild(child: ActorRef) + /** * Local ActorRef provider. */ @@ -309,7 +322,7 @@ class LocalActorRefProvider( override def isTerminated = stopped.isOn override def !(message: Any)(implicit sender: ActorRef = null): Unit = stopped.ifOff(message match { - case Failed(ex) if sender ne null ⇒ causeOfTermination = Some(ex); sender.stop() + case Failed(ex) if sender ne null ⇒ causeOfTermination = Some(ex); sender.asInstanceOf[InternalActorRef].stop() case _ ⇒ log.error(this + " received unexpected message [" + message + "]") }) @@ -329,9 +342,10 @@ class LocalActorRefProvider( */ private class Guardian extends Actor { def receive = { - case Terminated(_) ⇒ context.self.stop() + case Terminated(_) ⇒ context.stop(self) case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case e: Exception ⇒ e }) case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case e: Exception ⇒ e }) + case StopChild(child) ⇒ context.stop(child); sender ! "ok" case m ⇒ deadLetters ! DeadLetter(m, sender, self) } } @@ -345,9 +359,10 @@ class LocalActorRefProvider( def receive = { case Terminated(_) ⇒ eventStream.stopDefaultLoggers() - context.self.stop() + context.stop(self) case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case e: Exception ⇒ e }) case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case e: Exception ⇒ e }) + case StopChild(child) ⇒ context.stop(child); sender ! "ok" case m ⇒ deadLetters ! DeadLetter(m, sender, self) } } @@ -508,6 +523,9 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, def schedule(initialDelay: Duration, delay: Duration)(f: ⇒ Unit): Cancellable = new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, f), initialDelay)) + def schedule(initialDelay: Duration, delay: Duration, runnable: Runnable): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, runnable), initialDelay)) + def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable = new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay)) @@ -565,6 +583,17 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, } } + private def createContinuousTask(delay: Duration, runnable: Runnable): TimerTask = { + new TimerTask { + def run(timeout: org.jboss.netty.akka.util.Timeout) { + dispatcher.dispatchTask(() ⇒ runnable.run()) + try timeout.getTimer.newTimeout(this, delay) catch { + case _: IllegalStateException ⇒ // stop recurring if timer is stopped + } + } + } + } + private def execDirectly(t: HWTimeout): Unit = { try t.getTask.run(t) catch { case e: InterruptedException ⇒ throw e diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index af0ec81d7b..be90fb413d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -361,6 +361,24 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor } } + def stop(actor: ActorRef): Unit = { + implicit val timeout = settings.CreationTimeout + val path = actor.path + if (path.parent == guardian.path) { + (guardian ? StopChild(actor)).get match { + case ex: Exception ⇒ throw ex + case _ ⇒ + } + } else if (path.parent == systemGuardian.path) { + (systemGuardian ? StopChild(actor)).get match { + case ex: Exception ⇒ throw ex + case _ ⇒ + } + } else { + actor.asInstanceOf[InternalActorRef].stop() + } + } + import settings._ // this provides basic logging (to stdout) until .start() is called below @@ -428,13 +446,15 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor this } + lazy val locker: Locker = new Locker(scheduler, lookupRoot.path / "locker", deathWatch) + def start() = _start def registerOnTermination[T](code: ⇒ T) { terminationFuture onComplete (_ ⇒ code) } def registerOnTermination(code: Runnable) { terminationFuture onComplete (_ ⇒ code.run) } def stop() { - guardian.stop() + stop(guardian) } /** diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index ce7e7f8318..1f8f9cba70 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -511,7 +511,7 @@ trait FSM[S, D] extends ListenerManagement { case _ ⇒ nextState.replies.reverse foreach { r ⇒ sender ! r } terminate(nextState) - self.stop() + context.stop(self) } } diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 87e65002fe..e4e2ee856a 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -119,12 +119,12 @@ abstract class FaultHandlingStrategy { /** * This method is called after the child has been removed from the set of children. */ - def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit + def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit /** * This method is called to act on the failure of a child: restart if the flag is true, stop otherwise. */ - def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit + def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { if (children.nonEmpty) @@ -139,12 +139,12 @@ abstract class FaultHandlingStrategy { /** * Returns whether it processed the failure or not */ - def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { + def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate action match { case Resume ⇒ child.asInstanceOf[InternalActorRef].resume(); true - case Restart ⇒ processFailure(true, child, cause, stats, children); true - case Stop ⇒ processFailure(false, child, cause, stats, children); true + case Restart ⇒ processFailure(context, true, child, cause, stats, children); true + case Stop ⇒ processFailure(context, false, child, cause, stats, children); true case Escalate ⇒ false } } @@ -192,17 +192,17 @@ case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider, */ val retriesWindow = (maxNrOfRetries, withinTimeRange) - def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit = { - children foreach (_.stop()) + def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = { + children foreach (context.stop(_)) //TODO optimization to drop all children here already? } - def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { + def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (children.nonEmpty) { if (restart && children.forall(_.requestRestartPermission(retriesWindow))) children.foreach(_.child.asInstanceOf[InternalActorRef].restart(cause)) else - children.foreach(_.child.stop()) + for (c ← children) context.stop(c.child) } } } @@ -249,13 +249,13 @@ case class OneForOneStrategy(decider: FaultHandlingStrategy.Decider, */ val retriesWindow = (maxNrOfRetries, withinTimeRange) - def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit = {} + def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {} - def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { + def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (restart && stats.requestRestartPermission(retriesWindow)) child.asInstanceOf[InternalActorRef].restart(cause) else - child.stop() //TODO optimization to drop child here already? + context.stop(child) //TODO optimization to drop child here already? } } diff --git a/akka-actor/src/main/scala/akka/actor/Locker.scala b/akka-actor/src/main/scala/akka/actor/Locker.scala new file mode 100644 index 0000000000..0a719ba671 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/Locker.scala @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.actor + +import akka.dispatch._ +import akka.util.duration._ +import java.util.concurrent.ConcurrentHashMap +import akka.event.DeathWatch + +class Locker(scheduler: Scheduler, val path: ActorPath, val deathWatch: DeathWatch) extends MinimalActorRef { + + class DavyJones extends Runnable { + def run = { + val iter = heap.entrySet.iterator + while (iter.hasNext) { + val soul = iter.next(); + deathWatch.subscribe(Locker.this, soul.getKey) // in case Terminated got lost somewhere + soul.getKey match { + case _: LocalActorRef ⇒ // nothing to do, they know what they signed up for + case nonlocal ⇒ nonlocal.stop() // try again in case it was due to a communications failure + } + } + } + } + + private val heap = new ConcurrentHashMap[InternalActorRef, Long] + + scheduler.schedule(5 seconds, 5 seconds, new DavyJones) + + override def sendSystemMessage(msg: SystemMessage): Unit = this.!(msg) + + override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match { + case Terminated(soul) ⇒ heap.remove(soul) + case ChildTerminated(soul) ⇒ heap.remove(soul) + case soul: InternalActorRef ⇒ + heap.put(soul, 0l) // wanted to put System.nanoTime and do something intelligent, but forgot what that was + deathWatch.subscribe(this, soul) + // now re-bind the soul so that it does not drown its parent + soul match { + case local: LocalActorRef ⇒ + val cell = local.underlying + rebind(cell, cell.getClass) + case _ ⇒ + } + case _ ⇒ // ignore + } + + @scala.annotation.tailrec + final private def rebind(cell: ActorCell, clazz: Class[_]): Unit = { + try { + val heart = clazz.getDeclaredField("parent") + heart.setAccessible(true) + heart.set(cell, this) + return + } catch { + case _: NoSuchFieldException ⇒ + } + val sc = clazz.getSuperclass + if (sc != null) rebind(cell, sc) + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 7075ee0a8a..e1d502f5b4 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -42,6 +42,15 @@ trait Scheduler { */ def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable + /** + * Schedules a function to be run repeatedly with an initial delay and a frequency. + * E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set + * delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS) + * + * Java API + */ + def schedule(initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable + /** * Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed. * diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 711a4ac235..e941c05bac 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -24,7 +24,7 @@ trait TypedActorFactory { */ def stop(proxy: AnyRef): Boolean = getActorRefFor(proxy) match { case null ⇒ false - case ref ⇒ ref.stop; true + case ref ⇒ ref.asInstanceOf[InternalActorRef].stop; true } /** diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 0425b4c661..bde3bd725a 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -137,7 +137,10 @@ trait LoggingBus extends ActorEventBus { } { // this is very necessary, else you get infinite loop with DeadLetter unsubscribe(logger) - logger.stop() + logger match { + case ref: InternalActorRef ⇒ ref.stop() + case _ ⇒ + } } publish(Debug(simpleName(this), "all default loggers stopped")) } diff --git a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala index a417c75bac..135546ad2b 100644 --- a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala +++ b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala @@ -66,49 +66,3 @@ trait ConnectionManager { */ def remove(deadRef: ActorRef) } - -/** - * Manages local connections for a router, e.g. local actors. - */ -class LocalConnectionManager(initialConnections: Iterable[ActorRef]) extends ConnectionManager { - - def this(iterable: java.lang.Iterable[ActorRef]) { - this(JavaConverters.iterableAsScalaIterableConverter(iterable).asScala) - } - - case class State(version: Long, connections: Iterable[ActorRef]) extends VersionedIterable[ActorRef] { - def iterable = connections - } - - private val state: AtomicReference[State] = new AtomicReference[State](newState()) - - private def newState() = State(Long.MinValue, initialConnections) - - def version: Long = state.get.version - - def size: Int = state.get.connections.size - - def isEmpty: Boolean = state.get.connections.isEmpty - - def connections = state.get - - def shutdown() { - state.get.connections foreach (_.stop()) - } - - @tailrec - final def remove(ref: ActorRef) = { - val oldState = state.get - - //remote the ref from the connections. - var newList = oldState.connections.filter(currentActorRef ⇒ currentActorRef ne ref) - - if (newList.size != oldState.connections.size) { - //one or more occurrences of the actorRef were removed, so we need to update the state. - - val newState = State(oldState.version + 1, newList) - //if we are not able to update the state, we just try again. - if (!state.compareAndSet(oldState, newState)) remove(ref) - } - } -} diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 7df6a388cb..a321bb8983 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -132,7 +132,7 @@ trait Router extends Actor { case Terminated(child) ⇒ ref._routees = ref._routees filterNot (_ == child) - if (ref.routees.isEmpty) self.stop() + if (ref.routees.isEmpty) context.stop(self) }: Receive) orElse routerReceive diff --git a/akka-actor/src/main/scala/akka/util/ListenerManagement.scala b/akka-actor/src/main/scala/akka/util/ListenerManagement.scala index 3efbcbc902..1d6df328d5 100644 --- a/akka-actor/src/main/scala/akka/util/ListenerManagement.scala +++ b/akka-actor/src/main/scala/akka/util/ListenerManagement.scala @@ -4,13 +4,15 @@ package akka.util +import akka.actor.Actor + import java.util.concurrent.ConcurrentSkipListSet import akka.actor.{ ActorInitializationException, ActorRef } /** * A manager for listener actors. Intended for mixin by observables. */ -trait ListenerManagement { +trait ListenerManagement { this: Actor ⇒ private val listeners = new ConcurrentSkipListSet[ActorRef] @@ -33,7 +35,7 @@ trait ListenerManagement { */ def removeListener(listener: ActorRef) { listeners remove listener - if (manageLifeCycleOfListeners) listener.stop() + if (manageLifeCycleOfListeners) context.stop(listener) } /* diff --git a/akka-docs/common/code/SchedulerDocSpec.scala b/akka-docs/common/code/SchedulerDocSpec.scala index ac101e396d..5c4635b864 100644 --- a/akka-docs/common/code/SchedulerDocSpec.scala +++ b/akka-docs/common/code/SchedulerDocSpec.scala @@ -62,6 +62,6 @@ class SchedulerDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { //This cancels further Ticks to be sent cancellable.cancel() //#schedule-recurring - tickActor.stop() + system.stop(tickActor) } } diff --git a/akka-docs/scala/code/ActorDocSpec.scala b/akka-docs/scala/code/ActorDocSpec.scala index 5592572443..a08aa93593 100644 --- a/akka-docs/scala/code/ActorDocSpec.scala +++ b/akka-docs/scala/code/ActorDocSpec.scala @@ -40,7 +40,7 @@ class FirstActor extends Actor { case DoIt(msg) ⇒ val replyMsg = doSomeDangerousWork(msg) sender ! replyMsg - self.stop() + context.stop(self) } def doSomeDangerousWork(msg: ImmutableMessage): String = { "done" } })) ! m @@ -143,7 +143,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { //#import-context val first = system.actorOf(Props(new FirstActor)) - first.stop() + system.stop(first) } @@ -169,7 +169,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { system.eventStream.unsubscribe(testActor) system.eventStream.publish(TestEvent.UnMute(filter)) - myActor.stop() + system.stop(myActor) } "creating actor with constructor" in { @@ -182,7 +182,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { val myActor = system.actorOf(Props(new MyActor("..."))) //#creating-constructor - myActor.stop() + system.stop(myActor) } "creating actor with Props" in { @@ -192,7 +192,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor") //#creating-props - myActor.stop() + system.stop(myActor) } "using ask" in { @@ -214,7 +214,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { val result: Option[Int] = for (x ← (myActor ? 3).as[Int]) yield { 2 * x } //#using-ask - myActor.stop() + system.stop(myActor) } "using receiveTimeout" in { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index 9f623ff853..03aa5ddc62 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -57,7 +57,7 @@ class RemoteConnectionManager( def isEmpty: Boolean = connections.connections.isEmpty def shutdown() { - state.get.iterable foreach (_.stop()) // shut down all remote connections + state.get.iterable foreach (system.stop(_)) // shut down all remote connections } @tailrec @@ -136,7 +136,7 @@ class RemoteConnectionManager( //if we are not able to update the state, we just try again. if (!state.compareAndSet(oldState, newState)) { // we failed, need compensating action - newConnection.stop() // stop the new connection actor and try again + system.stop(newConnection) // stop the new connection actor and try again putIfAbsent(address, newConnectionFactory) // recur } else { // we succeeded diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala index 3efc3c5ce5..87a3177b3b 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala @@ -10,7 +10,7 @@ object RandomRoutedRemoteActorMultiJvmSpec { class SomeActor extends Actor with Serializable { def receive = { case "hit" ⇒ sender ! context.system.nodename - case "end" ⇒ self.stop() + case "end" ⇒ context.stop(self) } } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala index 786f278a7e..29e57d4209 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala @@ -10,7 +10,7 @@ object RoundRobinRoutedRemoteActorMultiJvmSpec { class SomeActor extends Actor with Serializable { def receive = { case "hit" ⇒ sender ! context.system.nodename - case "end" ⇒ self.stop() + case "end" ⇒ context.stop(self) } } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala index 10d6e22f58..c985bf2152 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala @@ -11,7 +11,7 @@ object ScatterGatherRoutedRemoteActorMultiJvmSpec { class SomeActor extends Actor with Serializable { def receive = { case "hit" ⇒ sender ! context.system.nodename - case "end" ⇒ self.stop() + case "end" ⇒ context.stop(self) } } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index 62de045fb5..199d070ddb 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -103,7 +103,7 @@ akka { expectMsg("preRestart") r ! 42 expectMsg(42) - r.stop() + system.stop(r) expectMsg("postStop") } diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index cfe618ce47..3fd6c02f7d 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -302,8 +302,8 @@ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor { sender.tell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T ⇒ T] }) } finally { agent.resume() - self.stop() + context.stop(self) } - case _ ⇒ self.stop() + case _ ⇒ context.stop(self) } } diff --git a/akka-stm/src/test/java/akka/stm/example/EitherOrElseExample.java b/akka-stm/src/test/java/akka/stm/example/EitherOrElseExample.java index a8f3fd475c..61d172e82f 100644 --- a/akka-stm/src/test/java/akka/stm/example/EitherOrElseExample.java +++ b/akka-stm/src/test/java/akka/stm/example/EitherOrElseExample.java @@ -24,6 +24,6 @@ public class EitherOrElseExample { } }.execute(); - brancher.stop(); + application.stop(brancher); } } diff --git a/akka-stm/src/test/java/akka/stm/example/RetryExample.java b/akka-stm/src/test/java/akka/stm/example/RetryExample.java index f15850d232..6a1ab1993a 100644 --- a/akka-stm/src/test/java/akka/stm/example/RetryExample.java +++ b/akka-stm/src/test/java/akka/stm/example/RetryExample.java @@ -46,7 +46,7 @@ public class RetryExample { System.out.println("Account 2: " + acc2); // Account 2: 600.0 - transferer.stop(); + application.stop(transferer); application.stop(); } diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java index 9baf0f1485..64951e1533 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java @@ -40,8 +40,8 @@ public class UntypedCoordinatedExample { } } - counter1.stop(); - counter2.stop(); + application.stop(counter1); + application.stop(counter2); application.stop(); } diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java index 55e28f872f..24a36b7ea0 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java @@ -39,8 +39,8 @@ public class UntypedTransactorExample { } } - counter1.stop(); - counter2.stop(); + application.stop(counter1); + application.stop(counter2); application.stop(); } diff --git a/akka-stm/src/test/scala/akka/transactor/test/CoordinatedIncrementSpec.scala b/akka-stm/src/test/scala/akka/transactor/test/CoordinatedIncrementSpec.scala index eda336b78e..c2298c9229 100644 --- a/akka-stm/src/test/scala/akka/transactor/test/CoordinatedIncrementSpec.scala +++ b/akka-stm/src/test/scala/akka/transactor/test/CoordinatedIncrementSpec.scala @@ -74,8 +74,8 @@ class CoordinatedIncrementSpec extends AkkaSpec with BeforeAndAfterAll { for (counter ← counters) { (counter ? GetCount).as[Int].get must be === 1 } - counters foreach (_.stop()) - failer.stop() + counters foreach (system.stop(_)) + system.stop(failer) } "increment no counters with a failing transaction" in { @@ -91,8 +91,8 @@ class CoordinatedIncrementSpec extends AkkaSpec with BeforeAndAfterAll { for (counter ← counters) { (counter ? GetCount).as[Int].get must be === 0 } - counters foreach (_.stop()) - failer.stop() + counters foreach (system.stop(_)) + system.stop(failer) } } } diff --git a/akka-stm/src/test/scala/akka/transactor/test/FickleFriendsSpec.scala b/akka-stm/src/test/scala/akka/transactor/test/FickleFriendsSpec.scala index a74490b410..42fe5dbc5a 100644 --- a/akka-stm/src/test/scala/akka/transactor/test/FickleFriendsSpec.scala +++ b/akka-stm/src/test/scala/akka/transactor/test/FickleFriendsSpec.scala @@ -123,8 +123,8 @@ class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll { for (counter ← counters) { (counter ? GetCount).as[Int].get must be === 1 } - counters foreach (_.stop()) - coordinator.stop() + counters foreach (system.stop(_)) + system.stop(coordinator) } } } diff --git a/akka-stm/src/test/scala/akka/transactor/test/TransactorSpec.scala b/akka-stm/src/test/scala/akka/transactor/test/TransactorSpec.scala index 43ee399196..d19abee2b0 100644 --- a/akka-stm/src/test/scala/akka/transactor/test/TransactorSpec.scala +++ b/akka-stm/src/test/scala/akka/transactor/test/TransactorSpec.scala @@ -97,8 +97,8 @@ class TransactorSpec extends AkkaSpec { for (counter ← counters) { (counter ? GetCount).as[Int].get must be === 1 } - counters foreach (_.stop()) - failer.stop() + counters foreach (system.stop(_)) + system.stop(failer) } "increment no counters with a failing transaction" in { @@ -114,8 +114,8 @@ class TransactorSpec extends AkkaSpec { for (counter ← counters) { (counter ? GetCount).as[Int].get must be === 0 } - counters foreach (_.stop()) - failer.stop() + counters foreach (system.stop(_)) + system.stop(failer) } } } @@ -129,7 +129,7 @@ class TransactorSpec extends AkkaSpec { latch.await val value = atomic { ref.get } value must be === 5 - transactor.stop() + system.stop(transactor) } } } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 1fdbaee7d7..39006bec5a 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -76,7 +76,7 @@ abstract class AkkaSpec(_system: ActorSystem) protected def atTermination() {} def spawn(body: ⇒ Unit)(implicit dispatcher: MessageDispatcher) { - system.actorOf(Props(ctx ⇒ { case "go" ⇒ try body finally ctx.self.stop() }).withDispatcher(dispatcher)) ! "go" + system.actorOf(Props(ctx ⇒ { case "go" ⇒ try body finally ctx.stop(ctx.self) }).withDispatcher(dispatcher)) ! "go" } } @@ -120,7 +120,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { implicit val davyJones = otherSystem.actorOf(Props(new Actor { def receive = { case m: DeadLetter ⇒ locker :+= m - case "Die!" ⇒ sender ! "finally gone"; self.stop() + case "Die!" ⇒ sender ! "finally gone"; context.stop(self) } }), "davyJones") diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 462ee6ffc6..e9ecf69cc6 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -56,7 +56,7 @@ object TestActorRefSpec { class WorkerActor() extends TActor { def receiveT = { - case "work" ⇒ sender ! "workDone"; self.stop() + case "work" ⇒ sender ! "workDone"; context.stop(self) case replyTo: Promise[Any] ⇒ replyTo.completeWithResult("complexReply") case replyTo: ActorRef ⇒ replyTo ! "complexReply" } diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index d890bfaf30..a2d280af18 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -124,7 +124,7 @@ public class Pi { Result result = (Result) message; pi += result.getValue(); nrOfResults += 1; - if (nrOfResults == nrOfMessages) getSelf().stop(); + if (nrOfResults == nrOfMessages) getContext().stop(getSelf()); } else throw new IllegalArgumentException("Unknown message [" + message + "]"); } diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index 242bababca..8b9e3c2dde 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -67,7 +67,7 @@ object Pi extends App { nrOfResults += 1 // Stop this actor and all its supervised children - if (nrOfResults == nrOfMessages) self.stop() + if (nrOfResults == nrOfMessages) context.stop(self) } override def preStart() { From 9af58366f6582ab77bd2792155674a634c40874b Mon Sep 17 00:00:00 2001 From: Roland Date: Wed, 14 Dec 2011 00:32:31 +0100 Subject: [PATCH 2/6] change default behavior to kill all children during preRestart - adapted supervision doc accordingly - had to override preRestart in two supervision tests, which is expected --- .../src/test/scala/akka/actor/Supervisor.scala | 1 + .../akka/actor/SupervisorHierarchySpec.scala | 1 + .../src/main/scala/akka/actor/Actor.scala | 7 +++++-- akka-docs/general/supervision.rst | 18 +++++++++--------- 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala b/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala index 6c438f1776..a956c2d090 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala @@ -7,4 +7,5 @@ class Supervisor extends Actor { def receive = { case x: Props ⇒ sender ! context.actorOf(x) } + override def preRestart(cause: Throwable, msg: Option[Any]) {} } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index dc45d012fd..3b9ebbad73 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -15,6 +15,7 @@ object SupervisorHierarchySpec { protected def receive = { case p: Props ⇒ sender ! context.actorOf(p) } + override def preRestart(cause: Throwable, msg: Option[Any]) {} override def postRestart(reason: Throwable) = { countDown.countDown() } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index ffb941408a..4c8877463c 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -262,9 +262,12 @@ trait Actor { *

* Is called on a crashed Actor right BEFORE it is restarted to allow clean * up of resources before Actor is terminated. - * By default it calls postStop() + * By default it disposes of all children calls postStop(). */ - def preRestart(reason: Throwable, message: Option[Any]) { postStop() } + def preRestart(reason: Throwable, message: Option[Any]) { + context.children foreach (context.stop(_)) + postStop() + } /** * User overridable callback. diff --git a/akka-docs/general/supervision.rst b/akka-docs/general/supervision.rst index 0867d931f8..74c95abfc5 100644 --- a/akka-docs/general/supervision.rst +++ b/akka-docs/general/supervision.rst @@ -25,7 +25,9 @@ which explains the existence of the fourth choice (as a supervisor also is subordinate to another supervisor higher up) and has implications on the first three: resuming an actor resumes all its subordinates, restarting an actor entails restarting all its subordinates, similarly stopping an actor will also -stop all its subordinates. +stop all its subordinates. It should be noted that the default behavior of an +actor is to stop all its children before restarting, but this can be overridden +using the :meth:`preRestart` hook. Each supervisor is configured with a function translating all possible failure causes (i.e. exceptions) into one of the four choices given above; notably, @@ -69,14 +71,12 @@ that the restart is not visible outside of the actor itself with the notable exception that the message during which the failure occurred is not re-processed. -Restarting an actor in this way recursively restarts all its children in the -same fashion, whereby all parent–child relationships are kept intact. If this -is not the right approach for certain sub-trees of the supervision hierarchy, -you should choose to stop the failed actor instead, which will terminate all -its children recursively, after which that part of the system may be recreated -from scratch. The second part of this action may be implemented using the -lifecycle monitoring described next or using lifecycle callbacks as described -in :class:`Actor`. +Restarting an actor in this way recursively terminates all its children. If +this is not the right approach for certain sub-trees of the supervision +hierarchy, you may choose to retain the children, in which case they will be +recursively restarted in the same fashion as the failed parent (with the same +default to terminate children, which must be overridden on a per-actor basis, +see :class:`Actor` for details). What Lifecycle Monitoring Means ------------------------------- From 5eedbdd69ffdd9d743caba75ee9de25741267cce Mon Sep 17 00:00:00 2001 From: Roland Date: Wed, 14 Dec 2011 01:06:20 +0100 Subject: [PATCH 3/6] rename ActorSystem.stop() to .shutdown() --- .../src/test/java/akka/actor/JavaAPI.java | 2 +- .../test/java/akka/actor/JavaExtension.java | 2 +- .../java/akka/dispatch/JavaFutureTests.java | 2 +- .../src/test/scala/akka/actor/Bench.scala | 2 +- .../test/scala/akka/actor/DeployerSpec.scala | 2 +- .../test/scala/akka/actor/FSMActorSpec.scala | 2 +- .../scala/akka/event/LoggingReceiveSpec.scala | 6 +++--- .../akka/serialization/SerializeSpec.scala | 2 +- .../main/scala/akka/actor/ActorSystem.scala | 4 ++-- .../code/akka/docs/config/ConfigDocSpec.scala | 2 +- .../akka/docs/actor/UntypedActorTestBase.java | 18 +++++++++--------- .../akka/docs/event/LoggingDocTestBase.java | 2 +- .../mailbox/DurableMailboxDocTestBase.java | 2 +- .../mailbox/filequeue/tools/QDumper.scala | 2 +- .../akka/remote/RemoteCommunicationSpec.scala | 4 ++-- .../scala/akka/remote/RemoteRouterSpec.scala | 4 ++-- .../src/main/scala/sample/hello/Main.scala | 2 +- .../java/akka/stm/example/RetryExample.java | 2 +- .../example/UntypedCoordinatedExample.java | 2 +- .../example/UntypedTransactorExample.java | 2 +- .../test/UntypedCoordinatedIncrementTest.java | 4 ++-- .../transactor/test/UntypedTransactorTest.java | 2 +- .../src/test/scala/akka/testkit/AkkaSpec.scala | 10 +++++----- .../main/java/akka/tutorial/first/java/Pi.java | 2 +- .../src/main/scala/Pi.scala | 2 +- .../src/test/scala/WorkerSpec.scala | 2 +- 26 files changed, 44 insertions(+), 44 deletions(-) diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java index 9678cbc76d..7070e8bf67 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java @@ -20,7 +20,7 @@ public class JavaAPI { @AfterClass public static void afterAll() { - system.stop(); + system.shutdown(); system = null; } diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java index 0a994b93d6..e7597309c4 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java @@ -58,7 +58,7 @@ public class JavaExtension { @AfterClass public static void afterAll() { - system.stop(); + system.shutdown(); system = null; } diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index d534d87103..3f56ae2781 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -34,7 +34,7 @@ public class JavaFutureTests { @AfterClass public static void afterAll() { - system.stop(); + system.shutdown(); system = null; } diff --git a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala index 98f24b0bc5..2432cc113d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala @@ -111,7 +111,7 @@ object Chameneos { val actor = system.actorOf(Props(new Mall(1000000, 4))) Thread.sleep(10000) println("Elapsed: " + (end - start)) - system.stop() + system.shutdown() } def main(args: Array[String]): Unit = run diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index e38ea1c3d4..83837012aa 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -95,7 +95,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { } """, ConfigParseOptions.defaults).withFallback(AkkaSpec.testConf) - ActorSystem("invalid", invalidDeployerConf).stop() + ActorSystem("invalid", invalidDeployerConf).shutdown() } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index 89156ff83f..a856c045c1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -233,7 +233,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im } } } finally { - fsmEventSystem.stop() + fsmEventSystem.shutdown() } } diff --git a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala index e38cc24413..77a815f455 100644 --- a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala @@ -52,9 +52,9 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd } override def afterAll { - appLogging.stop() - appAuto.stop() - appLifecycle.stop() + appLogging.shutdown() + appAuto.shutdown() + appLifecycle.shutdown() } "A LoggingReceive" must { diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 8617ded486..cceb608452 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -124,7 +124,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { (deadLetters eq a.deadLetters) must be(true) } } finally { - a.stop() + a.shutdown() } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index be90fb413d..431dc320cc 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -300,7 +300,7 @@ abstract class ActorSystem extends ActorRefFactory { * (below which the logging actors reside) and the execute all registered * termination handlers (see [[ActorSystem.registerOnTermination]]). */ - def stop() + def shutdown() /** * Registers the provided extension and creates its payload, if this extension isn't already registered @@ -453,7 +453,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor def registerOnTermination[T](code: ⇒ T) { terminationFuture onComplete (_ ⇒ code) } def registerOnTermination(code: Runnable) { terminationFuture onComplete (_ ⇒ code.run) } - def stop() { + def shutdown() { stop(guardian) } diff --git a/akka-docs/general/code/akka/docs/config/ConfigDocSpec.scala b/akka-docs/general/code/akka/docs/config/ConfigDocSpec.scala index 557c33ff53..b520a3b45d 100644 --- a/akka-docs/general/code/akka/docs/config/ConfigDocSpec.scala +++ b/akka-docs/general/code/akka/docs/config/ConfigDocSpec.scala @@ -26,7 +26,7 @@ class ConfigDocSpec extends WordSpec with MustMatchers { val system = ActorSystem("MySystem", ConfigFactory.load(customConf)) //#custom-config - system.stop() + system.shutdown() } diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorTestBase.java index c2a877d962..922f4ed0e6 100644 --- a/akka-docs/java/code/akka/docs/actor/UntypedActorTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/UntypedActorTestBase.java @@ -42,7 +42,7 @@ public class UntypedActorTestBase { ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class)); //#system-actorOf myActor.tell("test"); - system.stop(); + system.shutdown(); } @Test @@ -52,7 +52,7 @@ public class UntypedActorTestBase { ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class)); //#context-actorOf myActor.tell("test"); - system.stop(); + system.shutdown(); } @Test @@ -67,7 +67,7 @@ public class UntypedActorTestBase { })); //#creating-constructor myActor.tell("test"); - system.stop(); + system.shutdown(); } @Test @@ -80,7 +80,7 @@ public class UntypedActorTestBase { "myactor"); //#creating-props myActor.tell("test"); - system.stop(); + system.shutdown(); } @Test @@ -105,7 +105,7 @@ public class UntypedActorTestBase { } } //#using-ask - system.stop(); + system.shutdown(); } @Test @@ -113,7 +113,7 @@ public class UntypedActorTestBase { ActorSystem system = ActorSystem.create("MySystem"); ActorRef myActor = system.actorOf(new Props(MyReceivedTimeoutUntypedActor.class)); myActor.tell("Hello"); - system.stop(); + system.shutdown(); } @Test @@ -123,7 +123,7 @@ public class UntypedActorTestBase { //#poison-pill myActor.tell(poisonPill()); //#poison-pill - system.stop(); + system.shutdown(); } @Test @@ -133,7 +133,7 @@ public class UntypedActorTestBase { //#kill victim.tell(kill()); //#kill - system.stop(); + system.shutdown(); } @Test @@ -147,7 +147,7 @@ public class UntypedActorTestBase { myActor.tell("foo"); myActor.tell("bar"); myActor.tell("bar"); - system.stop(); + system.shutdown(); } public static class MyActor extends UntypedActor { diff --git a/akka-docs/java/code/akka/docs/event/LoggingDocTestBase.java b/akka-docs/java/code/akka/docs/event/LoggingDocTestBase.java index 3241623e95..ba689e2fa1 100644 --- a/akka-docs/java/code/akka/docs/event/LoggingDocTestBase.java +++ b/akka-docs/java/code/akka/docs/event/LoggingDocTestBase.java @@ -37,7 +37,7 @@ public class LoggingDocTestBase { } })); myActor.tell("test"); - system.stop(); + system.shutdown(); } //#my-actor diff --git a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java index 4ac3204d0b..cc9cb31bce 100644 --- a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java +++ b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java @@ -31,7 +31,7 @@ public class DurableMailboxDocTestBase { })); //#define-dispatcher myActor.tell("test"); - system.stop(); + system.shutdown(); } public static class MyUntypedActor extends UntypedActor { diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala index 54c5ba36b6..06f151d84a 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala @@ -149,6 +149,6 @@ object QDumper { new QueueDumper(filename, system.log)() } - system.stop() + system.shutdown() } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index 199d070ddb..d7827134bc 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -61,7 +61,7 @@ akka { implicit val timeout = system.settings.ActorTimeout override def atTermination() { - other.stop() + other.shutdown() } "Remoting" must { @@ -130,4 +130,4 @@ akka { } -} \ No newline at end of file +} diff --git a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala index 60209d087b..f183a940a7 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala @@ -40,7 +40,7 @@ akka { val other = ActorSystem("remote_sys", conf) override def atTermination() { - other.stop() + other.shutdown() } "A Remote Router" must { @@ -55,4 +55,4 @@ akka { } -} \ No newline at end of file +} diff --git a/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Main.scala b/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Main.scala index 710a099312..2921c2d27c 100644 --- a/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Main.scala +++ b/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Main.scala @@ -20,7 +20,7 @@ class HelloActor extends Actor { case Start ⇒ worldActor ! "Hello" case s: String ⇒ println("Received message: %s".format(s)) - context.system.stop() + context.system.shutdown() } } diff --git a/akka-stm/src/test/java/akka/stm/example/RetryExample.java b/akka-stm/src/test/java/akka/stm/example/RetryExample.java index 6a1ab1993a..590e05d94e 100644 --- a/akka-stm/src/test/java/akka/stm/example/RetryExample.java +++ b/akka-stm/src/test/java/akka/stm/example/RetryExample.java @@ -48,6 +48,6 @@ public class RetryExample { application.stop(transferer); - application.stop(); + application.shutdown(); } } diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java index 64951e1533..4e72bea237 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java @@ -43,6 +43,6 @@ public class UntypedCoordinatedExample { application.stop(counter1); application.stop(counter2); - application.stop(); + application.shutdown(); } } diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java index 24a36b7ea0..d5b0a2d691 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java @@ -42,6 +42,6 @@ public class UntypedTransactorExample { application.stop(counter1); application.stop(counter2); - application.stop(); + application.shutdown(); } } diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java index a90e0a1952..f2df33a260 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java @@ -44,7 +44,7 @@ public class UntypedCoordinatedIncrementTest { @AfterClass public static void afterAll() { - system.stop(); + system.shutdown(); system = null; } @@ -113,6 +113,6 @@ public class UntypedCoordinatedIncrementTest { @After public void stop() { - application.stop(); + application.shutdown(); } } diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java index 528a2a14f8..7436bbb132 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java @@ -41,7 +41,7 @@ public class UntypedTransactorTest { @AfterClass public static void afterAll() { - system.stop(); + system.shutdown(); system = null; } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 39006bec5a..bba22d7f76 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -64,7 +64,7 @@ abstract class AkkaSpec(_system: ActorSystem) } final override def afterAll { - system.stop() + system.shutdown() try system.asInstanceOf[ActorSystemImpl].terminationFuture.await(5 seconds) catch { case _: FutureTimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name) } @@ -96,7 +96,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { val ref = Seq(testActor, system.actorOf(Props.empty, "name")) } spec.ref foreach (_.isTerminated must not be true) - system.stop() + system.shutdown() spec.awaitCond(spec.ref forall (_.isTerminated), 2 seconds) } @@ -139,15 +139,15 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { val latch = new TestLatch(1)(system) system.registerOnTermination(latch.countDown()) - system.stop() + system.shutdown() latch.await(2 seconds) (davyJones ? "Die!").get must be === "finally gone" // this will typically also contain log messages which were sent after the logger shutdown locker must contain(DeadLetter(42, davyJones, probe.ref)) } finally { - system.stop() - otherSystem.stop() + system.shutdown() + otherSystem.shutdown() } } diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index a2d280af18..ef553bce5c 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -168,6 +168,6 @@ public class Pi { latch.await(); // Shut down the system - system.stop(); + system.shutdown(); } } diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index 8b9e3c2dde..17a374986c 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -102,6 +102,6 @@ object Pi extends App { latch.await() // Shut down the system - system.stop() + system.shutdown() } } diff --git a/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala b/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala index a752b3c783..a9d2a202fd 100644 --- a/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala +++ b/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala @@ -17,7 +17,7 @@ class WorkerSpec extends WordSpec with MustMatchers with BeforeAndAfterAll { implicit val system = ActorSystem() override def afterAll { - system.stop() + system.shutdown() } "Worker" must { From 488576c62a5be639ee81df831c78ef9b64fe1487 Mon Sep 17 00:00:00 2001 From: Roland Date: Wed, 14 Dec 2011 01:34:04 +0100 Subject: [PATCH 4/6] make Davy Jones configurable --- akka-actor/src/main/resources/reference.conf | 1 + akka-actor/src/main/scala/akka/actor/ActorSystem.scala | 3 ++- akka-actor/src/main/scala/akka/actor/Locker.scala | 7 ++++--- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index ecbf916a9a..465e5c171b 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -37,6 +37,7 @@ akka { actor { provider = "akka.actor.LocalActorRefProvider" creation-timeout = 20s # Timeout for ActorSystem.actorOf + reaper-period = 5s # frequency with which stopping actors are prodded in case they had to be removed from their parents timeout = 5s # Default timeout for Future based invocations # - Actor: ask && ? # - UntypedActor: ask diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 431dc320cc..26a81d5eba 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -73,6 +73,7 @@ object ActorSystem { val ProviderClass = getString("akka.actor.provider") val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS)) + val ReaperPeriod = Duration(getMilliseconds("akka.actor.reaper-period"), MILLISECONDS) val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS)) val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") @@ -446,7 +447,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor this } - lazy val locker: Locker = new Locker(scheduler, lookupRoot.path / "locker", deathWatch) + lazy val locker: Locker = new Locker(scheduler, ReaperPeriod, lookupRoot.path / "locker", deathWatch) def start() = _start diff --git a/akka-actor/src/main/scala/akka/actor/Locker.scala b/akka-actor/src/main/scala/akka/actor/Locker.scala index 0a719ba671..f9c9546f2b 100644 --- a/akka-actor/src/main/scala/akka/actor/Locker.scala +++ b/akka-actor/src/main/scala/akka/actor/Locker.scala @@ -4,11 +4,12 @@ package akka.actor import akka.dispatch._ +import akka.util.Duration import akka.util.duration._ import java.util.concurrent.ConcurrentHashMap import akka.event.DeathWatch -class Locker(scheduler: Scheduler, val path: ActorPath, val deathWatch: DeathWatch) extends MinimalActorRef { +class Locker(scheduler: Scheduler, period: Duration, val path: ActorPath, val deathWatch: DeathWatch) extends MinimalActorRef { class DavyJones extends Runnable { def run = { @@ -26,7 +27,7 @@ class Locker(scheduler: Scheduler, val path: ActorPath, val deathWatch: DeathWat private val heap = new ConcurrentHashMap[InternalActorRef, Long] - scheduler.schedule(5 seconds, 5 seconds, new DavyJones) + scheduler.schedule(period, period, new DavyJones) override def sendSystemMessage(msg: SystemMessage): Unit = this.!(msg) @@ -60,4 +61,4 @@ class Locker(scheduler: Scheduler, val path: ActorPath, val deathWatch: DeathWat if (sc != null) rebind(cell, sc) } -} \ No newline at end of file +} From 7d6c74d75c29a9c920f619c21cac474b10c12d1a Mon Sep 17 00:00:00 2001 From: Roland Date: Wed, 14 Dec 2011 12:47:44 +0100 Subject: [PATCH 5/6] UntypedActor hooks default to super. now, plus updated ScalaDoc --- .../src/main/scala/akka/actor/Actor.scala | 76 +++++++++------- .../main/scala/akka/actor/ActorSystem.scala | 18 ++-- .../main/scala/akka/actor/UntypedActor.scala | 88 ++++++++++++------- 3 files changed, 104 insertions(+), 78 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 4c8877463c..72c4ecabc3 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -156,15 +156,44 @@ object Actor { /** * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': * http://en.wikipedia.org/wiki/Actor_model - *

- * An actor has a well-defined (non-cyclic) life-cycle. - *

- * => RUNNING (created and started actor) - can receive messages
- * => SHUTDOWN (when 'stop' or 'exit' is invoked) - can't do anything
- * 
* - *

- * The Actor's own ActorRef is available in the 'self' member variable. + * An actor has a well-defined (non-cyclic) life-cycle. + * - ''RUNNING'' (created and started actor) - can receive messages + * - ''SHUTDOWN'' (when 'stop' or 'exit' is invoked) - can't do anything + * + * The Actor's own [[akka.actor.ActorRef]] is available as `self`, the current + * message’s sender as `sender` and the [[akka.actor.ActorContext]] as + * `context`. The only abstract method is `receive` which shall return the + * initial behavior of the actor as a partial function (behavior can be changed + * using `context.become` and `context.unbecome`). + * + * {{{ + * class ExampleActor extends Actor { + * def receive = { + * // directly calculated reply + * case Request(r) => sender ! calculate(r) + * + * // just to demonstrate how to stop yourself + * case Shutdown => context.stop(self) + * + * // error kernel with child replying directly to “customer” + * case Dangerous(r) => context.actorOf(Props[ReplyToOriginWorker]).tell(PerformWork(r), sender) + * + * // error kernel with reply going through us + * case OtherJob(r) => context.actorOf(Props[ReplyToMeWorker]) ! JobRequest(r, sender) + * case JobReply(result, orig_s) => orig_s ! result + * } + * } + * }}} + * + * The last line demonstrates the essence of the error kernel design: spawn + * one-off actors which terminate after doing their job, pass on `sender` to + * allow direct reply if that is what makes sense, or round-trip the sender + * as shown with the fictitious JobRequest/JobReply message pair. + * + * If you don’t like writing `context` you can always `import context._` to get + * direct access to `actorOf`, `stop` etc. This is not default in order to keep + * the name-space clean. */ trait Actor { @@ -218,25 +247,8 @@ trait Actor { final def sender: ActorRef = context.sender /** - * User overridable callback/setting. - *

- * Partial function implementing the actor logic. - * To be implemented by concrete actor class. - *

- * Example code: - *

-   *   def receive = {
-   *     case Ping =>
-   *       println("got a 'Ping' message")
-   *       sender ! "pong"
-   *
-   *     case OneWay =>
-   *       println("got a 'OneWay' message")
-   *
-   *     case unknown =>
-   *       println("unknown message: " + unknown)
-   * }
-   * 
+ * This defines the initial actor behavior, it must return a partial function + * with the actor logic. */ protected def receive: Receive @@ -258,11 +270,10 @@ trait Actor { def postStop() {} /** - * User overridable callback. + * User overridable callback: '''By default it disposes of all children and then calls `postStop()`.''' *

* Is called on a crashed Actor right BEFORE it is restarted to allow clean * up of resources before Actor is terminated. - * By default it disposes of all children calls postStop(). */ def preRestart(reason: Throwable, message: Option[Any]) { context.children foreach (context.stop(_)) @@ -270,10 +281,9 @@ trait Actor { } /** - * User overridable callback. + * User overridable callback: By default it calls `preStart()`. *

* Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash. - * By default it calls preStart() */ def postRestart(reason: Throwable) { preStart() } @@ -281,7 +291,9 @@ trait Actor { * User overridable callback. *

* Is called when a message isn't handled by the current behavior of the actor - * by default it does: EventHandler.warning(self, message) + * by default it fails with either a [[akka.actor.DeathPactException]] (in + * case of an unhandled [[akka.actor.Terminated]] message) or a + * [[akka.actor.UnhandledMessageException]]. */ def unhandled(message: Any) { message match { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 26a81d5eba..f801f7120d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -365,18 +365,12 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor def stop(actor: ActorRef): Unit = { implicit val timeout = settings.CreationTimeout val path = actor.path - if (path.parent == guardian.path) { - (guardian ? StopChild(actor)).get match { - case ex: Exception ⇒ throw ex - case _ ⇒ - } - } else if (path.parent == systemGuardian.path) { - (systemGuardian ? StopChild(actor)).get match { - case ex: Exception ⇒ throw ex - case _ ⇒ - } - } else { - actor.asInstanceOf[InternalActorRef].stop() + val guard = guardian.path + val sys = systemGuardian.path + path.parent match { + case `guard` ⇒ (guardian ? StopChild(actor)).get + case `sys` ⇒ (systemGuardian ? StopChild(actor)).get + case _ ⇒ actor.asInstanceOf[InternalActorRef].stop() } } diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala index 1692396a8f..13677f84cc 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala @@ -8,46 +8,67 @@ import akka.japi.{ Creator, Procedure } import akka.dispatch.{ MessageDispatcher, Promise } /** + * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': + * http://en.wikipedia.org/wiki/Actor_model + * + * This class is the Java cousin to the [[akka.actor.Actor]] Scala interface. * Subclass this abstract class to create a MDB-style untyped actor. - *

- * This class is meant to be used from Java. - *

+ * + * An actor has a well-defined (non-cyclic) life-cycle. + * - ''RUNNING'' (created and started actor) - can receive messages + * - ''SHUTDOWN'' (when 'stop' or 'exit' is invoked) - can't do anything + * + * The Actor's own [[akka.actor.ActorRef]] is available as `getSelf()`, the current + * message’s sender as `getSender()` and the [[akka.actor.UntypedActorContext]] as + * `getContext()`. The only abstract method is `onReceive()` which is invoked for + * each processed message unless dynamically overridden using `getContext().become()`. + * * Here is an example on how to create and use an UntypedActor: - *

+ *
+ * {{{
  *  public class SampleUntypedActor extends UntypedActor {
+ *  
+ *    public class Reply {
+ *      final public ActorRef sender;
+ *      final public Result result;
+ *      Reply(ActorRef sender, Result result) {
+ *        this.sender = sender;
+ *        this.result = result;
+ *      }
+ *    }
+ *    
  *    public void onReceive(Object message) throws Exception {
  *      if (message instanceof String) {
  *        String msg = (String)message;
  *
- *        if (msg.equals("UseReply")) {
- *          // Reply to original sender of message using the 'reply' method
- *          getContext().getSender().tell(msg + ":" + getSelf().getAddress());
- *
- *        } else if (msg.equals("UseSender") && getSender().isDefined()) {
- *          // Reply to original sender of message using the sender reference
- *          // also passing along my own reference (the self)
- *          getSender().get().tell(msg, getSelf());
+ *        if (msg.equals("UseSender")) {
+ *          // Reply to original sender of message
+ *          getSender().tell(msg + ":" + getSelf());
  *
  *        } else if (msg.equals("SendToSelf")) {
  *          // Send message to the actor itself recursively
- *          getSelf().tell(msg)
+ *          getSelf().tell("SomeOtherMessage");
  *
- *        } else if (msg.equals("ForwardMessage")) {
- *          // Retreive an actor from the ActorRegistry by ID and get an ActorRef back
- *          ActorRef actorRef = Actor.registry.local.actorsFor("some-actor-id").head();
+ *        } else if (msg.equals("ErrorKernelWithDirectReply")) {
+ *          // Send work to one-off child which will reply directly to original sender
+ *          getContext().actorOf(new Props(Worker.class)).tell("DoSomeDangerousWork", getSender());
+ *
+ *        } else if (msg.equals("ErrorKernelWithReplyHere")) {
+ *          // Send work to one-off child and collect the answer, reply handled further down
+ *          getContext().actorOf(new Props(Worker.class)).tell("DoWorkAndReplyToMe");
  *
  *        } else throw new IllegalArgumentException("Unknown message: " + message);
+ *
+ *      } else if (message instanceof Reply) {
+ *
+ *        final Reply reply = (Reply) message;
+ *        // might want to do some processing/book-keeping here
+ *        reply.sender.tell(reply.result);
+ *
  *      } else throw new IllegalArgumentException("Unknown message: " + message);
  *    }
- *
- *    public static void main(String[] args) {
- *      ActorSystem system = ActorSystem.create("Sample");
- *      ActorRef actor = system.actorOf(SampleUntypedActor.class);
- *      actor.tell("SendToSelf");
- *      actor.stop();
- *    }
  *  }
- * 
+ * }}} */ abstract class UntypedActor extends Actor { @@ -65,8 +86,9 @@ abstract class UntypedActor extends Actor { def getSelf(): ActorRef = self /** - * The reference sender Actor of the last received message. - * Is defined if the message was sent from another Actor, else None. + * The reference sender Actor of the currently processed message. This is + * always a legal destination to send to, even if there is no logical recipient + * for the reply, in which case it will be sent to the dead letter mailbox. */ def getSender(): ActorRef = sender @@ -77,7 +99,7 @@ abstract class UntypedActor extends Actor { * Actor are automatically started asynchronously when created. * Empty default implementation. */ - override def preStart() {} + override def preStart(): Unit = super.preStart() /** * User overridable callback. @@ -85,24 +107,22 @@ abstract class UntypedActor extends Actor { * Is called asynchronously after 'actor.stop()' is invoked. * Empty default implementation. */ - override def postStop() {} + override def postStop(): Unit = super.postStop() /** - * User overridable callback. + * User overridable callback: '''By default it disposes of all children and then calls `postStop()`.''' *

* Is called on a crashed Actor right BEFORE it is restarted to allow clean * up of resources before Actor is terminated. - * By default it calls postStop() */ - override def preRestart(reason: Throwable, message: Option[Any]) { postStop() } + override def preRestart(reason: Throwable, message: Option[Any]): Unit = super.preRestart(reason, message) /** - * User overridable callback. + * User overridable callback: By default it calls `preStart()`. *

* Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash. - * By default it calls preStart() */ - override def postRestart(reason: Throwable) { preStart() } + override def postRestart(reason: Throwable): Unit = super.postRestart(reason) /** * User overridable callback. From 49837e4782eb1eef6ed84e9dd064b2cb5f1ae884 Mon Sep 17 00:00:00 2001 From: Roland Date: Wed, 14 Dec 2011 15:24:29 +0100 Subject: [PATCH 6/6] incorporate review comments - fix some code formatting & docs - make ActorCell.parent a volatile var --- .../src/test/scala/akka/actor/Supervisor.scala | 1 + .../akka/actor/SupervisorHierarchySpec.scala | 1 + akka-actor/src/main/resources/reference.conf | 2 +- .../src/main/scala/akka/actor/ActorCell.scala | 2 +- .../scala/akka/actor/ActorRefProvider.scala | 4 ++++ .../main/scala/akka/actor/ActorSystem.scala | 4 ++-- .../src/main/scala/akka/actor/Locker.scala | 18 ++---------------- .../main/scala/akka/actor/UntypedActor.scala | 4 ++-- 8 files changed, 14 insertions(+), 22 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala b/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala index a956c2d090..174939915d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala @@ -7,5 +7,6 @@ class Supervisor extends Actor { def receive = { case x: Props ⇒ sender ! context.actorOf(x) } + // need to override the default of stopping all children upon restart, tests rely on keeping them around override def preRestart(cause: Throwable, msg: Option[Any]) {} } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 3b9ebbad73..d51b333b35 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -15,6 +15,7 @@ object SupervisorHierarchySpec { protected def receive = { case p: Props ⇒ sender ! context.actorOf(p) } + // test relies on keeping children around during restart override def preRestart(cause: Throwable, msg: Option[Any]) {} override def postRestart(reason: Throwable) = { countDown.countDown() diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 465e5c171b..3a8acb7ca7 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -37,7 +37,7 @@ akka { actor { provider = "akka.actor.LocalActorRefProvider" creation-timeout = 20s # Timeout for ActorSystem.actorOf - reaper-period = 5s # frequency with which stopping actors are prodded in case they had to be removed from their parents + reaper-interval = 5s # frequency with which stopping actors are prodded in case they had to be removed from their parents timeout = 5s # Default timeout for Future based invocations # - Actor: ask && ? # - UntypedActor: ask diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 74bd482162..be6bf2d1f4 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -185,7 +185,7 @@ private[akka] class ActorCell( val system: ActorSystemImpl, val self: InternalActorRef, val props: Props, - final val parent: InternalActorRef, + @volatile var parent: InternalActorRef, /*no member*/ _receiveTimeout: Option[Duration], var hotswap: Stack[PartialFunction[Any, Unit]]) extends UntypedActorContext { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 01cde0720c..a07b14ac43 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -240,6 +240,10 @@ trait ActorRefFactory { * an asynchronous operation, i.e. involves a message send, but if invoked * on an [[akka.actor.ActorContext]] if operating on a child of that * context it will free up the name for immediate reuse. + * + * When invoked on [[akka.actor.ActorSystem]] for a top-level actor, this + * method sends a message to the guardian actor and blocks waiting for a reply, + * see `akka.actor.creation-timeout` in the `reference.conf`. */ def stop(actor: ActorRef): Unit } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index f801f7120d..0ce6b4d529 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -73,7 +73,7 @@ object ActorSystem { val ProviderClass = getString("akka.actor.provider") val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS)) - val ReaperPeriod = Duration(getMilliseconds("akka.actor.reaper-period"), MILLISECONDS) + val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS) val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS)) val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") @@ -441,7 +441,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor this } - lazy val locker: Locker = new Locker(scheduler, ReaperPeriod, lookupRoot.path / "locker", deathWatch) + lazy val locker: Locker = new Locker(scheduler, ReaperInterval, lookupRoot.path / "locker", deathWatch) def start() = _start diff --git a/akka-actor/src/main/scala/akka/actor/Locker.scala b/akka-actor/src/main/scala/akka/actor/Locker.scala index f9c9546f2b..8bbcdd15e6 100644 --- a/akka-actor/src/main/scala/akka/actor/Locker.scala +++ b/akka-actor/src/main/scala/akka/actor/Locker.scala @@ -15,7 +15,7 @@ class Locker(scheduler: Scheduler, period: Duration, val path: ActorPath, val de def run = { val iter = heap.entrySet.iterator while (iter.hasNext) { - val soul = iter.next(); + val soul = iter.next() deathWatch.subscribe(Locker.this, soul.getKey) // in case Terminated got lost somewhere soul.getKey match { case _: LocalActorRef ⇒ // nothing to do, they know what they signed up for @@ -41,24 +41,10 @@ class Locker(scheduler: Scheduler, period: Duration, val path: ActorPath, val de soul match { case local: LocalActorRef ⇒ val cell = local.underlying - rebind(cell, cell.getClass) + cell.parent = this case _ ⇒ } case _ ⇒ // ignore } - @scala.annotation.tailrec - final private def rebind(cell: ActorCell, clazz: Class[_]): Unit = { - try { - val heart = clazz.getDeclaredField("parent") - heart.setAccessible(true) - heart.set(cell, this) - return - } catch { - case _: NoSuchFieldException ⇒ - } - val sc = clazz.getSuperclass - if (sc != null) rebind(cell, sc) - } - } diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala index 13677f84cc..ccac32f82f 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala @@ -27,7 +27,7 @@ import akka.dispatch.{ MessageDispatcher, Promise } * * {{{ * public class SampleUntypedActor extends UntypedActor { - * + * * public class Reply { * final public ActorRef sender; * final public Result result; @@ -36,7 +36,7 @@ import akka.dispatch.{ MessageDispatcher, Promise } * this.result = result; * } * } - * + * * public void onReceive(Object message) throws Exception { * if (message instanceof String) { * String msg = (String)message;