diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java index 9678cbc76d..7070e8bf67 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java @@ -20,7 +20,7 @@ public class JavaAPI { @AfterClass public static void afterAll() { - system.stop(); + system.shutdown(); system = null; } diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java index 0a994b93d6..e7597309c4 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java @@ -58,7 +58,7 @@ public class JavaExtension { @AfterClass public static void afterAll() { - system.stop(); + system.shutdown(); system = null; } diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index d534d87103..3f56ae2781 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -34,7 +34,7 @@ public class JavaFutureTests { @AfterClass public static void afterAll() { - system.stop(); + system.shutdown(); system = null; } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala index aa7d76d3dc..e374453901 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala @@ -87,7 +87,7 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach w state.finished.await 1.second.dilated.sleep() actor.isTerminated must be(true) - supervisor.stop() + system.stop(supervisor) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala index 8f3a58e5e5..0bb1682b81 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala @@ -61,7 +61,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS restarter ! Kill expectMsg(("postStop", id, 3)) expectNoMsg(1 seconds) - supervisor.stop + system.stop(supervisor) } } @@ -92,7 +92,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS restarter ! Kill expectMsg(("postStop", id, 3)) expectNoMsg(1 seconds) - supervisor.stop + system.stop(supervisor) } } @@ -105,10 +105,10 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS expectMsg(("preStart", id, 0)) a ! "status" expectMsg(("OK", id, 0)) - a.stop + system.stop(a) expectMsg(("postStop", id, 0)) expectNoMsg(1 seconds) - supervisor.stop + system.stop(supervisor) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index f1cca42011..47dc5dcae8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -42,7 +42,7 @@ object ActorRefSpec { case "work" ⇒ { work sender ! "workDone" - self.stop() + context.stop(self) } case ReplyTo(replyTo) ⇒ { work @@ -344,8 +344,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { latch.await - clientRef.stop() - serverRef.stop() + system.stop(clientRef) + system.stop(serverRef) } "stop when sent a poison pill" in { diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala index 735867bc97..f97c68913e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala @@ -29,7 +29,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo try { val f = echo ? "hallo" intercept[FutureTimeoutException] { f.await } - } finally { echo.stop } + } finally { system.stop(echo) } } } @@ -41,14 +41,14 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo val f = (echo ? "hallo").mapTo[String] intercept[FutureTimeoutException] { f.await } f.value must be(None) - } finally { echo.stop } + } finally { system.stop(echo) } } } "use explicitly supplied timeout" in { within(testTimeout - 100.millis, testTimeout + 300.millis) { val echo = actorWithTimeout(Props.defaultTimeout) - try { (echo.?("hallo", testTimeout)).as[String] must be(None) } finally { echo.stop } + try { (echo.?("hallo", testTimeout)).as[String] must be(None) } finally { system.stop(echo) } } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala index 4ef5a94b12..2432cc113d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala @@ -85,7 +85,7 @@ object Chameneos { sumMeetings += i if (numFaded == numChameneos) { Chameneos.end = System.currentTimeMillis - self.stop() + context.stop(self) } case msg @ Meet(a, c) ⇒ @@ -107,10 +107,11 @@ object Chameneos { def run { // System.setProperty("akka.config", "akka.conf") Chameneos.start = System.currentTimeMillis - val system = ActorSystem().actorOf(Props(new Mall(1000000, 4))) + val system = ActorSystem() + val actor = system.actorOf(Props(new Mall(1000000, 4))) Thread.sleep(10000) println("Elapsed: " + (end - start)) - system.stop() + system.shutdown() } def main(args: Array[String]): Unit = run diff --git a/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala index 1118daff1c..1638cd9e4b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala @@ -26,7 +26,7 @@ object ConsistencySpec { } lastStep = step - case "done" ⇒ sender ! "done"; self.stop() + case "done" ⇒ sender ! "done"; context.stop(self) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 9aba8979c1..c3fae8e8a8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -43,9 +43,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende expectTerminationOf(terminal) expectTerminationOf(terminal) - monitor1.stop() - monitor2.stop() - monitor3.stop() + system.stop(monitor1) + system.stop(monitor2) + system.stop(monitor3) } "notify with _current_ monitors with one Terminated message when an Actor is stopped" in { @@ -69,9 +69,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende expectTerminationOf(terminal) expectTerminationOf(terminal) - monitor1.stop() - monitor2.stop() - monitor3.stop() + system.stop(monitor1) + system.stop(monitor2) + system.stop(monitor3) } "notify with a Terminated message once when an Actor is stopped but not when restarted" in { @@ -90,7 +90,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende expectTerminationOf(terminal) terminal.isTerminated must be === true - supervisor.stop() + system.stop(supervisor) } } @@ -99,9 +99,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende case class FF(fail: Failed) val supervisor = system.actorOf(Props[Supervisor] .withFaultHandler(new OneForOneStrategy(FaultHandlingStrategy.makeDecider(List(classOf[Exception])), Some(0)) { - override def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { + override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { testActor.tell(FF(Failed(cause)), child) - super.handleFailure(child, cause, stats, children) + super.handleFailure(context, child, cause, stats, children) } })) diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index e38ea1c3d4..83837012aa 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -95,7 +95,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { } """, ConfigParseOptions.defaults).withFallback(AkkaSpec.testConf) - ActorSystem("invalid", invalidDeployerConf).stop() + ActorSystem("invalid", invalidDeployerConf).shutdown() } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index e4a30e10e0..a856c045c1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -187,7 +187,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im } val ref = system.actorOf(Props(fsm)) started.await - ref.stop() + system.stop(ref) expectMsg(1 second, fsm.StopEvent(Shutdown, 1, null)) } @@ -233,7 +233,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im } } } finally { - fsmEventSystem.stop() + fsmEventSystem.shutdown() } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala index 1b1f90e5b3..9db408770c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala @@ -78,7 +78,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender { within(300 millis) { fsm ! SubscribeTransitionCallBack(forward) expectMsg(CurrentState(fsm, 0)) - forward.stop() + system.stop(forward) fsm ! "tick" expectNoMsg } diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index f2127d92bc..7047760371 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -196,9 +196,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { f1.get must equal(ByteString("Hello World!1")) f2.get must equal(ByteString("Hello World!2")) f3.get must equal(ByteString("Hello World!3")) - client.stop - server.stop - ioManager.stop + system.stop(client) + system.stop(server) + system.stop(ioManager) } "run echo server under high load" in { @@ -210,9 +210,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val list = List.range(0, 1000) val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) assert(f.get.size === 1000) - client.stop - server.stop - ioManager.stop + system.stop(client) + system.stop(server) + system.stop(ioManager) } "run echo server under high load with small buffer" in { @@ -224,9 +224,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val list = List.range(0, 1000) val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) assert(f.get.size === 1000) - client.stop - server.stop - ioManager.stop + system.stop(client) + system.stop(server) + system.stop(ioManager) } "run key-value store" in { @@ -250,10 +250,10 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { f4.get must equal("OK") f5.get must equal(ByteString("I'm a test!")) f6.get must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!"))) - client1.stop - client2.stop - server.stop - ioManager.stop + system.stop(client1) + system.stop(client2) + system.stop(server) + system.stop(ioManager) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala index 02b5aab8c1..9706a77d9f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala @@ -26,7 +26,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { })) timeoutLatch.await - timeoutActor.stop() + system.stop(timeoutActor) } "reschedule timeout after regular receive" in { @@ -45,7 +45,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { timeoutActor ! Tick timeoutLatch.await - timeoutActor.stop() + system.stop(timeoutActor) } "be able to turn off timeout if desired" in { @@ -69,7 +69,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { timeoutLatch.await count.get must be(1) - timeoutActor.stop() + system.stop(timeoutActor) } "not receive timeout message when not specified" in { @@ -82,7 +82,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { })) timeoutLatch.awaitTimeout(1 second) // timeout expected - timeoutActor.stop() + system.stop(timeoutActor) } "have ReceiveTimeout eq to Actors ReceiveTimeout" in { diff --git a/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala b/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala index 6c438f1776..174939915d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Supervisor.scala @@ -7,4 +7,6 @@ class Supervisor extends Actor { def receive = { case x: Props ⇒ sender ! context.actorOf(x) } + // need to override the default of stopping all children upon restart, tests rely on keeping them around + override def preRestart(cause: Throwable, msg: Option[Any]) {} } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index dc45d012fd..d51b333b35 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -15,6 +15,8 @@ object SupervisorHierarchySpec { protected def receive = { case p: Props ⇒ sender ! context.actorOf(p) } + // test relies on keeping children around during restart + override def preRestart(cause: Throwable, msg: Option[Any]) {} override def postRestart(reason: Throwable) = { countDown.countDown() } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index 6438d6eee3..899626330f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -24,7 +24,7 @@ class SupervisorMiscSpec extends AkkaSpec with DefaultTimeout { override def postRestart(cause: Throwable) { countDownLatch.countDown() } protected def receive = { case "status" ⇒ this.sender ! "OK" - case _ ⇒ this.self.stop() + case _ ⇒ this.context.stop(self) } }) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index fdd87a2ba4..a78c8576c2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -306,7 +306,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende inits.get must be(3) - supervisor.stop() + system.stop(supervisor) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala index 9ed84ca2b6..c3723b8564 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala @@ -35,7 +35,7 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeou expectMsg(middleActor.path) expectMsg(lastActor.path) expectNoMsg(2 seconds) - headActor.stop() + system.stop(headActor) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index 154ba58fcd..032c2ade05 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -28,7 +28,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender supervised.!("test")(testActor) expectMsg("failure1") - supervisor.stop() + system.stop(supervisor) } } @@ -39,7 +39,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender supervised.!("test")(testActor) expectMsg("failure2") - supervisor.stop() + system.stop(supervisor) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index bffa5bac82..5bcf8467c3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -77,7 +77,7 @@ object ActorModelSpec { case Forward(to, msg) ⇒ ack; to.forward(msg); busy.switchOff() case CountDown(latch) ⇒ ack; latch.countDown(); busy.switchOff() case Increment(count) ⇒ ack; count.incrementAndGet(); busy.switchOff() - case CountDownNStop(l) ⇒ ack; l.countDown(); self.stop(); busy.switchOff() + case CountDownNStop(l) ⇒ ack; l.countDown(); context.stop(self); busy.switchOff() case Restart ⇒ ack; busy.switchOff(); throw new Exception("Restart requested") case Interrupt ⇒ ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!") case ThrowException(e: Throwable) ⇒ ack; busy.switchOff(); throw e @@ -239,7 +239,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { assertDispatcher(dispatcher)(stops = 0) val a = newTestActor(dispatcher) assertDispatcher(dispatcher)(stops = 0) - a.stop() + system.stop(a) assertDispatcher(dispatcher)(stops = 1) assertRef(a, dispatcher)( suspensions = 0, @@ -260,7 +260,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { assertDispatcher(dispatcher)(stops = 2) - a2.stop + system.stop(a2) assertDispatcher(dispatcher)(stops = 3) } @@ -279,7 +279,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { assertCountDown(oneAtATime, (1.5 seconds).dilated.toMillis, "Processed message when allowed") assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3) - a.stop() + system.stop(a) assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 3, msgsProcessed = 3) } @@ -298,7 +298,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { assertCountDown(counter, 3.seconds.dilated.toMillis, "Should process 200 messages") assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200) - a.stop() + system.stop(a) } def spawn(f: ⇒ Unit) { @@ -328,7 +328,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1, suspensions = 1, resumes = 1) - a.stop() + system.stop(a) assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1, suspensions = 1, resumes = 1) } @@ -370,7 +370,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { throw e } assertCountDown(stopLatch, waitTime, "Expected all children to stop") - boss.stop() + system.stop(boss) } for (run ← 1 to 3) { flood(50000) @@ -447,8 +447,8 @@ class DispatcherModelSpec extends ActorModelSpec { aStop.countDown() - a.stop - b.stop + system.stop(a) + system.stop(b) while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination @@ -484,8 +484,8 @@ class BalancingDispatcherModelSpec extends ActorModelSpec { aStop.countDown() - a.stop - b.stop + system.stop(a) + system.stop(b) while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala index 6ebc81409e..8c7054721d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala @@ -74,8 +74,8 @@ class BalancingDispatcherSpec extends AkkaSpec { fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > (slow.underlying.actor.asInstanceOf[DelayableActor].invocationCount) - slow.stop() - fast.stop() + system.stop(slow) + system.stop(fast) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala index c6e04c6cf7..3b3ba56c37 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala @@ -39,14 +39,14 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout { val actor = system.actorOf(Props[OneWayTestActor].withDispatcher(system.dispatcherFactory.newDispatcher("test").build)) val result = actor ! "OneWay" assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS)) - actor.stop() + system.stop(actor) } "support ask/reply" in { val actor = system.actorOf(Props[TestActor].withDispatcher(system.dispatcherFactory.newDispatcher("test").build)) val result = (actor ? "Hello").as[String] assert("World" === result.get) - actor.stop() + system.stop(actor) } "respect the throughput setting" in { @@ -72,8 +72,8 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout { fastOne ! "sabotage" start.countDown() latch.await(10, TimeUnit.SECONDS) - fastOne.stop() - slowOne.stop() + system.stop(fastOne) + system.stop(slowOne) assert(latch.getCount() === 0) } @@ -90,13 +90,13 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout { val fastOne = system.actorOf( Props(context ⇒ { - case "ping" ⇒ if (works.get) latch.countDown(); context.self.stop() + case "ping" ⇒ if (works.get) latch.countDown(); context.stop(context.self) }).withDispatcher(throughputDispatcher)) val slowOne = system.actorOf( Props(context ⇒ { case "hogexecutor" ⇒ ready.countDown(); start.await - case "ping" ⇒ works.set(false); context.self.stop() + case "ping" ⇒ works.set(false); context.stop(context.self) }).withDispatcher(throughputDispatcher)) slowOne ! "hogexecutor" diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorsSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorsSpec.scala index 8ad5bc641d..d054d15e83 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorsSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorsSpec.scala @@ -49,8 +49,8 @@ class DispatcherActorsSpec extends AkkaSpec { assert(sFinished.getCount > 0) sFinished.await assert(sFinished.getCount === 0) - f.stop() - s.stop() + system.stop(f) + system.stop(s) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala index c4750a4691..fd26780d65 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala @@ -30,14 +30,14 @@ class PinnedActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeo val actor = system.actorOf(Props(self ⇒ { case "OneWay" ⇒ oneWay.countDown() }).withDispatcher(system.dispatcherFactory.newPinnedDispatcher("test"))) val result = actor ! "OneWay" assert(oneWay.await(1, TimeUnit.SECONDS)) - actor.stop() + system.stop(actor) } "support ask/reply" in { val actor = system.actorOf(Props[TestActor].withDispatcher(system.dispatcherFactory.newPinnedDispatcher("test"))) val result = (actor ? "Hello").as[String] assert("World" === result.get) - actor.stop() + system.stop(actor) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala index ab149216a7..38a57fda10 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala @@ -50,7 +50,7 @@ class ListenerSpec extends AkkaSpec { fooLatch.await - for (a ← List(broadcast, a1, a2, a3)) a.stop() + for (a ← List(broadcast, a1, a2, a3)) system.stop(a) } } } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 44ddf4f8bc..804d3b5e62 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -120,7 +120,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val future = actor ? "Hello" future.await test(future, "World") - actor.stop() + system.stop(actor) } } "throws an exception" must { @@ -130,7 +130,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val future = actor ? "Failure" future.await test(future, "Expected exception; to test fault-tolerance") - actor.stop() + system.stop(actor) } } } @@ -144,8 +144,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s } future.await test(future, "WORLD") - actor1.stop() - actor2.stop() + system.stop(actor1) + system.stop(actor2) } } "will throw an exception" must { @@ -156,8 +156,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s } future.await test(future, "/ by zero") - actor1.stop() - actor2.stop() + system.stop(actor1) + system.stop(actor2) } } } @@ -169,8 +169,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val future = actor1 ? "Hello" flatMap { case i: Int ⇒ actor2 ? i } future.await test(future, "World (of class java.lang.String)") - actor1.stop() - actor2.stop() + system.stop(actor1) + system.stop(actor2) } } } @@ -204,7 +204,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa future1.get must be("10-14") assert(checkType(future1, manifest[String])) intercept[ClassCastException] { future2.get } - actor.stop() + system.stop(actor) } } @@ -233,7 +233,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa future1.get must be("10-14") intercept[MatchError] { future2.get } - actor.stop() + system.stop(actor) } } @@ -280,7 +280,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa future10.get must be("World") future11.get must be("Oops!") - actor.stop() + system.stop(actor) } } @@ -396,7 +396,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val actor = system.actorOf(Props[TestActor]) actor ? "Hello" onResult { case "World" ⇒ latch.open } assert(latch.tryAwait(5, TimeUnit.SECONDS)) - actor.stop() + system.stop(actor) } "shouldTraverseFutures" in { @@ -411,7 +411,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo manifest[Int]) assert(Future.sequence(oddFutures).get.sum === 10000) - oddActor.stop() + system.stop(oddActor) val list = (1 to 100).toList assert(Future.traverse(list)(x ⇒ Future(x * 2 - 1)).get.sum === 10000) @@ -470,7 +470,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa assert(r.get === "Hello World!") - actor.stop + system.stop(actor) } "futureComposingWithContinuationsFailureDivideZero" in { diff --git a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala index 5923e84305..fd7fd4c1b0 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala @@ -8,7 +8,7 @@ import org.scalatest.BeforeAndAfterEach import akka.testkit._ import akka.util.duration._ import java.util.concurrent.atomic._ -import akka.actor.{ Props, Actor, ActorRef } +import akka.actor.{ Props, Actor, ActorRef, ActorSystem } import java.util.Comparator import akka.japi.{ Procedure, Function } @@ -33,7 +33,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte def classifierFor(event: BusType#Event): BusType#Classifier - def disposeSubscriber(subscriber: BusType#Subscriber): Unit + def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit busName must { @@ -58,7 +58,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte "not allow to unsubscribe non-existing subscriber" in { val sub = createNewSubscriber() bus.unsubscribe(sub, classifier) must be === false - disposeSubscriber(sub) + disposeSubscriber(system, sub) } "not allow for the same subscriber to subscribe to the same channel twice" in { @@ -80,7 +80,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte subscribers.zip(classifiers) forall { case (s, c) ⇒ bus.subscribe(s, c) } must be === true subscribers.zip(classifiers) forall { case (s, c) ⇒ bus.unsubscribe(s, c) } must be === true - subscribers foreach disposeSubscriber + subscribers foreach (disposeSubscriber(system, _)) } "publishing events without any subscribers shouldn't be a problem" in { @@ -113,7 +113,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte subscribers foreach { s ⇒ bus.subscribe(s, classifier) must be === true } bus.publish(event) range foreach { _ ⇒ expectMsg(event) } - subscribers foreach { s ⇒ bus.unsubscribe(s, classifier) must be === true; disposeSubscriber(s) } + subscribers foreach { s ⇒ bus.unsubscribe(s, classifier) must be === true; disposeSubscriber(system, s) } } "not publish the given event to any other subscribers than the intended ones" in { @@ -136,7 +136,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte } "cleanup subscriber" in { - disposeSubscriber(subscriber) + disposeSubscriber(system, subscriber) } } } @@ -165,7 +165,7 @@ class ActorEventBusSpec extends EventBusSpec("ActorEventBus") { def classifierFor(event: BusType#Event) = event.toString - def disposeSubscriber(subscriber: BusType#Subscriber): Unit = subscriber.stop() + def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = system.stop(subscriber) } object ScanningEventBusSpec { @@ -194,7 +194,7 @@ class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") { def classifierFor(event: BusType#Event) = event.toString - def disposeSubscriber(subscriber: BusType#Subscriber): Unit = () + def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = () } object LookupEventBusSpec { @@ -219,5 +219,5 @@ class LookupEventBusSpec extends EventBusSpec("LookupEventBus") { def classifierFor(event: BusType#Event) = event.toString - def disposeSubscriber(subscriber: BusType#Subscriber): Unit = () + def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = () } diff --git a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala index 6427997b78..77a815f455 100644 --- a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala @@ -52,9 +52,9 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd } override def afterAll { - appLogging.stop() - appAuto.stop() - appLifecycle.stop() + appLogging.shutdown() + appAuto.shutdown() + appLifecycle.shutdown() } "A LoggingReceive" must { @@ -201,7 +201,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)") } - supervisor.stop() + system.stop(supervisor) expectMsg(Logging.Debug(sname, "stopping")) expectMsg(Logging.Debug(aname, "stopped")) expectMsg(Logging.Debug(sname, "stopped")) diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellLatencyPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellLatencyPerformanceSpec.scala index c1de7702e3..ace20bb662 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellLatencyPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellLatencyPerformanceSpec.scala @@ -75,7 +75,7 @@ class TellLatencyPerformanceSpec extends PerformanceSpec { ok must be(true) logMeasurement(numberOfClients, durationNs, stat) } - clients.foreach(_.stop()) + clients.foreach(system.stop(_)) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughput10000PerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughput10000PerformanceSpec.scala index 29109f8472..4541c093ca 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughput10000PerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughput10000PerformanceSpec.scala @@ -173,8 +173,8 @@ class TellThroughput10000PerformanceSpec extends PerformanceSpec { ok must be(true) logMeasurement(numberOfClients, durationNs, repeat) } - clients.foreach(_.stop()) - destinations.foreach(_.stop()) + clients.foreach(system.stop(_)) + destinations.foreach(system.stop(_)) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala index 6a20f982dd..d9f6988231 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputComputationPerformanceSpec.scala @@ -147,8 +147,8 @@ class TellThroughputComputationPerformanceSpec extends PerformanceSpec { ok must be(true) logMeasurement(numberOfClients, durationNs, repeat) } - clients.foreach(_.stop()) - destinations.foreach(_.stop()) + clients.foreach(system.stop(_)) + destinations.foreach(system.stop(_)) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala index 111cc8fc6a..a1c9d1c271 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala @@ -78,8 +78,8 @@ class TellThroughputPerformanceSpec extends PerformanceSpec { ok must be(true) logMeasurement(numberOfClients, durationNs, repeat) } - clients.foreach(_.stop()) - destinations.foreach(_.stop()) + clients.foreach(system.stop(_)) + destinations.foreach(system.stop(_)) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputSeparateDispatchersPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputSeparateDispatchersPerformanceSpec.scala index ca471b2222..41a969badc 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputSeparateDispatchersPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputSeparateDispatchersPerformanceSpec.scala @@ -159,8 +159,8 @@ class TellThroughputSeparateDispatchersPerformanceSpec extends PerformanceSpec { ok must be(true) logMeasurement(numberOfClients, durationNs, repeat) } - clients.foreach(_.stop()) - destinations.foreach(_.stop()) + clients.foreach(system.stop(_)) + destinations.foreach(system.stop(_)) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingLatencyPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingLatencyPerformanceSpec.scala index 6470f6c0ba..f86987270a 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingLatencyPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingLatencyPerformanceSpec.scala @@ -108,7 +108,7 @@ class TradingLatencyPerformanceSpec extends PerformanceSpec { } logMeasurement(numberOfClients, durationNs, stat) } - clients.foreach(_.stop()) + clients.foreach(system.stop(_)) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingThroughputPerformanceSpec.scala index aca85b8d3d..88a9ce21a0 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingThroughputPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/system/TradingThroughputPerformanceSpec.scala @@ -105,7 +105,7 @@ class TradingThroughputPerformanceSpec extends PerformanceSpec { } logMeasurement(numberOfClients, durationNs, totalNumberOfOrders) } - clients.foreach(_.stop()) + clients.foreach(system.stop(_)) } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index 1893732686..9ef69ab028 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -99,7 +99,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool.stop() + system.stop(pool) } "pass ticket #705" in { @@ -129,7 +129,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { _.await.resultOrException.get must be("Response") } } finally { - pool.stop() + system.stop(pool) } } @@ -194,7 +194,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(4) - pool.stop() + system.stop(pool) } "grow as needed under mailbox pressure" in { @@ -250,7 +250,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be >= (3) - pool.stop() + system.stop(pool) } "round robin" in { @@ -281,7 +281,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { latch1.await delegates.size must be(1) - pool1.stop() + system.stop(pool1) val latch2 = TestLatch(2) delegates.clear() @@ -309,7 +309,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { latch2.await delegates.size must be(2) - pool2.stop() + system.stop(pool2) } "backoff" in { @@ -355,7 +355,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be <= (z) - pool.stop() + system.stop(pool) } } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index 0b6cdae645..ff72daa101 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -49,7 +49,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli actor ! "hello" helloLatch.await(5, TimeUnit.SECONDS) must be(true) - actor.stop() + system.stop(actor) stopLatch.await(5, TimeUnit.SECONDS) must be(true) } @@ -104,7 +104,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli actor ! Broadcast("hello") helloLatch.await(5, TimeUnit.SECONDS) must be(true) - actor.stop() + system.stop(actor) stopLatch.await(5, TimeUnit.SECONDS) must be(true) } } @@ -134,7 +134,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli for (i ← 1 to 5) expectMsg("world") } - actor.stop() + system.stop(actor) stopLatch.await(5, TimeUnit.SECONDS) must be(true) } @@ -190,7 +190,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli actor ! Broadcast("hello") helloLatch.await(5, TimeUnit.SECONDS) must be(true) - actor.stop() + system.stop(actor) stopLatch.await(5, TimeUnit.SECONDS) must be(true) } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index e6e0f1c898..ea8b5a6e05 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -43,7 +43,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { val c1, c2 = expectMsgType[ActorRef] watch(router) watch(c2) - c2.stop() + system.stop(c2) expectMsg(Terminated(c2)) // it might take a while until the Router has actually processed the Terminated message awaitCond { @@ -54,7 +54,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { } res == Seq(c1, c1) } - c1.stop() + system.stop(c1) expectMsg(Terminated(router)) } @@ -324,8 +324,8 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(Props(new Actor { def receive = { - case Stop(None) ⇒ self.stop() - case Stop(Some(_id)) if (_id == id) ⇒ self.stop() + case Stop(None) ⇒ context.stop(self) + case Stop(Some(_id)) if (_id == id) ⇒ context.stop(self) case _id: Int if (_id == id) ⇒ case x ⇒ { Thread sleep 100 * id diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 3b0b6ea5bc..cceb608452 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -106,7 +106,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { })) a ! new ObjectOutputStream(new ByteArrayOutputStream()) expectMsg("pass") - a.stop() + system.stop(a) } "serialize DeadLetterActorRef" in { @@ -124,7 +124,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { (deadLetters eq a.deadLetters) must be(true) } } finally { - a.stop() + a.shutdown() } } } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index f31e61bcbe..b9357227cf 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -37,6 +37,7 @@ akka { actor { provider = "akka.actor.LocalActorRefProvider" creation-timeout = 20s # Timeout for ActorSystem.actorOf + reaper-interval = 5s # frequency with which stopping actors are prodded in case they had to be removed from their parents timeout = 5s # Default timeout for Future based invocations # - Actor: ask && ? # - UntypedActor: ask diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index ffb941408a..72c4ecabc3 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -156,15 +156,44 @@ object Actor { /** * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': * http://en.wikipedia.org/wiki/Actor_model - *

- * An actor has a well-defined (non-cyclic) life-cycle. - *

- * => RUNNING (created and started actor) - can receive messages
- * => SHUTDOWN (when 'stop' or 'exit' is invoked) - can't do anything
- * 
* - *

- * The Actor's own ActorRef is available in the 'self' member variable. + * An actor has a well-defined (non-cyclic) life-cycle. + * - ''RUNNING'' (created and started actor) - can receive messages + * - ''SHUTDOWN'' (when 'stop' or 'exit' is invoked) - can't do anything + * + * The Actor's own [[akka.actor.ActorRef]] is available as `self`, the current + * message’s sender as `sender` and the [[akka.actor.ActorContext]] as + * `context`. The only abstract method is `receive` which shall return the + * initial behavior of the actor as a partial function (behavior can be changed + * using `context.become` and `context.unbecome`). + * + * {{{ + * class ExampleActor extends Actor { + * def receive = { + * // directly calculated reply + * case Request(r) => sender ! calculate(r) + * + * // just to demonstrate how to stop yourself + * case Shutdown => context.stop(self) + * + * // error kernel with child replying directly to “customer” + * case Dangerous(r) => context.actorOf(Props[ReplyToOriginWorker]).tell(PerformWork(r), sender) + * + * // error kernel with reply going through us + * case OtherJob(r) => context.actorOf(Props[ReplyToMeWorker]) ! JobRequest(r, sender) + * case JobReply(result, orig_s) => orig_s ! result + * } + * } + * }}} + * + * The last line demonstrates the essence of the error kernel design: spawn + * one-off actors which terminate after doing their job, pass on `sender` to + * allow direct reply if that is what makes sense, or round-trip the sender + * as shown with the fictitious JobRequest/JobReply message pair. + * + * If you don’t like writing `context` you can always `import context._` to get + * direct access to `actorOf`, `stop` etc. This is not default in order to keep + * the name-space clean. */ trait Actor { @@ -218,25 +247,8 @@ trait Actor { final def sender: ActorRef = context.sender /** - * User overridable callback/setting. - *

- * Partial function implementing the actor logic. - * To be implemented by concrete actor class. - *

- * Example code: - *

-   *   def receive = {
-   *     case Ping =>
-   *       println("got a 'Ping' message")
-   *       sender ! "pong"
-   *
-   *     case OneWay =>
-   *       println("got a 'OneWay' message")
-   *
-   *     case unknown =>
-   *       println("unknown message: " + unknown)
-   * }
-   * 
+ * This defines the initial actor behavior, it must return a partial function + * with the actor logic. */ protected def receive: Receive @@ -258,19 +270,20 @@ trait Actor { def postStop() {} /** - * User overridable callback. + * User overridable callback: '''By default it disposes of all children and then calls `postStop()`.''' *

* Is called on a crashed Actor right BEFORE it is restarted to allow clean * up of resources before Actor is terminated. - * By default it calls postStop() */ - def preRestart(reason: Throwable, message: Option[Any]) { postStop() } + def preRestart(reason: Throwable, message: Option[Any]) { + context.children foreach (context.stop(_)) + postStop() + } /** - * User overridable callback. + * User overridable callback: By default it calls `preStart()`. *

* Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash. - * By default it calls preStart() */ def postRestart(reason: Throwable) { preStart() } @@ -278,7 +291,9 @@ trait Actor { * User overridable callback. *

* Is called when a message isn't handled by the current behavior of the actor - * by default it does: EventHandler.warning(self, message) + * by default it fails with either a [[akka.actor.DeathPactException]] (in + * case of an unhandled [[akka.actor.Terminated]] message) or a + * [[akka.actor.UnhandledMessageException]]. */ def unhandled(message: Any) { message match { diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index c4053081cd..be6bf2d1f4 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -185,7 +185,7 @@ private[akka] class ActorCell( val system: ActorSystemImpl, val self: InternalActorRef, val props: Props, - val parent: InternalActorRef, + @volatile var parent: InternalActorRef, /*no member*/ _receiveTimeout: Option[Duration], var hotswap: Stack[PartialFunction[Any, Unit]]) extends UntypedActorContext { @@ -242,6 +242,16 @@ private[akka] class ActorCell( _actorOf(props, name) } + final def stop(actor: ActorRef): Unit = { + val a = actor.asInstanceOf[InternalActorRef] + if (childrenRefs contains actor.path.name) { + system.locker ! a + childrenRefs -= actor.path.name + handleChildTerminated(actor) + } + a.stop() + } + final var currentMessage: Envelope = null final var actor: Actor = _ @@ -405,7 +415,8 @@ private[akka] class ActorCell( // do not process normal messages while waiting for all children to terminate dispatcher suspend this if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopping")) - for (child ← c) child.stop() + // do not use stop(child) because that would dissociate the children from us, but we still want to wait for them + for (child ← c) child.asInstanceOf[InternalActorRef].stop() stopping = true } } @@ -550,15 +561,17 @@ private[akka] class ActorCell( } final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match { - case Some(stats) if stats.child == child ⇒ if (!props.faultHandler.handleFailure(child, cause, stats, childrenRefs.values)) throw cause + case Some(stats) if stats.child == child ⇒ if (!props.faultHandler.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause case Some(stats) ⇒ system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child)) case None ⇒ system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child)) } final def handleChildTerminated(child: ActorRef): Unit = { - childrenRefs -= child.path.name - props.faultHandler.handleChildTerminated(child, children) - if (stopping && childrenRefs.isEmpty) doTerminate() + if (childrenRefs contains child.path.name) { + childrenRefs -= child.path.name + props.faultHandler.handleChildTerminated(this, child, children) + if (stopping && childrenRefs.isEmpty) doTerminate() + } else system.locker ! ChildTerminated(child) } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 3fc3574e75..f2b8afa1e2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -110,11 +110,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable */ def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender) - /** - * Shuts down the actor its dispatcher and message queue. - */ - def stop(): Unit - /** * Is the actor shut down? */ @@ -192,6 +187,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe def resume(): Unit def suspend(): Unit def restart(cause: Throwable): Unit + def stop(): Unit def sendSystemMessage(message: SystemMessage): Unit def getParent: InternalActorRef /** diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 74762f170b..a07b14ac43 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -234,6 +234,18 @@ trait ActorRefFactory { * replies in order to resolve the matching set of actors. */ def actorSelection(path: String): ActorSelection = ActorSelection(lookupRoot, path) + + /** + * Stop the actor pointed to by the given [[akka.actor.ActorRef]]; this is + * an asynchronous operation, i.e. involves a message send, but if invoked + * on an [[akka.actor.ActorContext]] if operating on a child of that + * context it will free up the name for immediate reuse. + * + * When invoked on [[akka.actor.ActorSystem]] for a top-level actor, this + * method sends a message to the guardian actor and blocks waiting for a reply, + * see `akka.actor.creation-timeout` in the `reference.conf`. + */ + def stop(actor: ActorRef): Unit } class ActorRefProviderException(message: String) extends AkkaException(message) @@ -248,6 +260,11 @@ private[akka] case class CreateChild(props: Props, name: String) */ private[akka] case class CreateRandomNameChild(props: Props) +/** + * Internal Akka use only, used in implementation of system.stop(child). + */ +private[akka] case class StopChild(child: ActorRef) + /** * Local ActorRef provider. */ @@ -309,7 +326,7 @@ class LocalActorRefProvider( override def isTerminated = stopped.isOn override def !(message: Any)(implicit sender: ActorRef = null): Unit = stopped.ifOff(message match { - case Failed(ex) if sender ne null ⇒ causeOfTermination = Some(ex); sender.stop() + case Failed(ex) if sender ne null ⇒ causeOfTermination = Some(ex); sender.asInstanceOf[InternalActorRef].stop() case _ ⇒ log.error(this + " received unexpected message [" + message + "]") }) @@ -329,9 +346,10 @@ class LocalActorRefProvider( */ private class Guardian extends Actor { def receive = { - case Terminated(_) ⇒ context.self.stop() + case Terminated(_) ⇒ context.stop(self) case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case e: Exception ⇒ e }) case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case e: Exception ⇒ e }) + case StopChild(child) ⇒ context.stop(child); sender ! "ok" case m ⇒ deadLetters ! DeadLetter(m, sender, self) } } @@ -345,9 +363,10 @@ class LocalActorRefProvider( def receive = { case Terminated(_) ⇒ eventStream.stopDefaultLoggers() - context.self.stop() + context.stop(self) case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case e: Exception ⇒ e }) case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case e: Exception ⇒ e }) + case StopChild(child) ⇒ context.stop(child); sender ! "ok" case m ⇒ deadLetters ! DeadLetter(m, sender, self) } } @@ -508,6 +527,9 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, def schedule(initialDelay: Duration, delay: Duration)(f: ⇒ Unit): Cancellable = new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, f), initialDelay)) + def schedule(initialDelay: Duration, delay: Duration, runnable: Runnable): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, runnable), initialDelay)) + def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable = new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay)) @@ -565,6 +587,17 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, } } + private def createContinuousTask(delay: Duration, runnable: Runnable): TimerTask = { + new TimerTask { + def run(timeout: org.jboss.netty.akka.util.Timeout) { + dispatcher.dispatchTask(() ⇒ runnable.run()) + try timeout.getTimer.newTimeout(this, delay) catch { + case _: IllegalStateException ⇒ // stop recurring if timer is stopped + } + } + } + } + private def execDirectly(t: HWTimeout): Unit = { try t.getTask.run(t) catch { case e: InterruptedException ⇒ throw e diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index af0ec81d7b..0ce6b4d529 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -73,6 +73,7 @@ object ActorSystem { val ProviderClass = getString("akka.actor.provider") val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS)) + val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS) val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS)) val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") @@ -300,7 +301,7 @@ abstract class ActorSystem extends ActorRefFactory { * (below which the logging actors reside) and the execute all registered * termination handlers (see [[ActorSystem.registerOnTermination]]). */ - def stop() + def shutdown() /** * Registers the provided extension and creates its payload, if this extension isn't already registered @@ -361,6 +362,18 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor } } + def stop(actor: ActorRef): Unit = { + implicit val timeout = settings.CreationTimeout + val path = actor.path + val guard = guardian.path + val sys = systemGuardian.path + path.parent match { + case `guard` ⇒ (guardian ? StopChild(actor)).get + case `sys` ⇒ (systemGuardian ? StopChild(actor)).get + case _ ⇒ actor.asInstanceOf[InternalActorRef].stop() + } + } + import settings._ // this provides basic logging (to stdout) until .start() is called below @@ -428,13 +441,15 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor this } + lazy val locker: Locker = new Locker(scheduler, ReaperInterval, lookupRoot.path / "locker", deathWatch) + def start() = _start def registerOnTermination[T](code: ⇒ T) { terminationFuture onComplete (_ ⇒ code) } def registerOnTermination(code: Runnable) { terminationFuture onComplete (_ ⇒ code.run) } - def stop() { - guardian.stop() + def shutdown() { + stop(guardian) } /** diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index ce7e7f8318..1f8f9cba70 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -511,7 +511,7 @@ trait FSM[S, D] extends ListenerManagement { case _ ⇒ nextState.replies.reverse foreach { r ⇒ sender ! r } terminate(nextState) - self.stop() + context.stop(self) } } diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 87e65002fe..e4e2ee856a 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -119,12 +119,12 @@ abstract class FaultHandlingStrategy { /** * This method is called after the child has been removed from the set of children. */ - def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit + def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit /** * This method is called to act on the failure of a child: restart if the flag is true, stop otherwise. */ - def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit + def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { if (children.nonEmpty) @@ -139,12 +139,12 @@ abstract class FaultHandlingStrategy { /** * Returns whether it processed the failure or not */ - def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { + def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate action match { case Resume ⇒ child.asInstanceOf[InternalActorRef].resume(); true - case Restart ⇒ processFailure(true, child, cause, stats, children); true - case Stop ⇒ processFailure(false, child, cause, stats, children); true + case Restart ⇒ processFailure(context, true, child, cause, stats, children); true + case Stop ⇒ processFailure(context, false, child, cause, stats, children); true case Escalate ⇒ false } } @@ -192,17 +192,17 @@ case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider, */ val retriesWindow = (maxNrOfRetries, withinTimeRange) - def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit = { - children foreach (_.stop()) + def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = { + children foreach (context.stop(_)) //TODO optimization to drop all children here already? } - def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { + def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (children.nonEmpty) { if (restart && children.forall(_.requestRestartPermission(retriesWindow))) children.foreach(_.child.asInstanceOf[InternalActorRef].restart(cause)) else - children.foreach(_.child.stop()) + for (c ← children) context.stop(c.child) } } } @@ -249,13 +249,13 @@ case class OneForOneStrategy(decider: FaultHandlingStrategy.Decider, */ val retriesWindow = (maxNrOfRetries, withinTimeRange) - def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit = {} + def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {} - def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { + def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (restart && stats.requestRestartPermission(retriesWindow)) child.asInstanceOf[InternalActorRef].restart(cause) else - child.stop() //TODO optimization to drop child here already? + context.stop(child) //TODO optimization to drop child here already? } } diff --git a/akka-actor/src/main/scala/akka/actor/Locker.scala b/akka-actor/src/main/scala/akka/actor/Locker.scala new file mode 100644 index 0000000000..8bbcdd15e6 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/Locker.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.actor + +import akka.dispatch._ +import akka.util.Duration +import akka.util.duration._ +import java.util.concurrent.ConcurrentHashMap +import akka.event.DeathWatch + +class Locker(scheduler: Scheduler, period: Duration, val path: ActorPath, val deathWatch: DeathWatch) extends MinimalActorRef { + + class DavyJones extends Runnable { + def run = { + val iter = heap.entrySet.iterator + while (iter.hasNext) { + val soul = iter.next() + deathWatch.subscribe(Locker.this, soul.getKey) // in case Terminated got lost somewhere + soul.getKey match { + case _: LocalActorRef ⇒ // nothing to do, they know what they signed up for + case nonlocal ⇒ nonlocal.stop() // try again in case it was due to a communications failure + } + } + } + } + + private val heap = new ConcurrentHashMap[InternalActorRef, Long] + + scheduler.schedule(period, period, new DavyJones) + + override def sendSystemMessage(msg: SystemMessage): Unit = this.!(msg) + + override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match { + case Terminated(soul) ⇒ heap.remove(soul) + case ChildTerminated(soul) ⇒ heap.remove(soul) + case soul: InternalActorRef ⇒ + heap.put(soul, 0l) // wanted to put System.nanoTime and do something intelligent, but forgot what that was + deathWatch.subscribe(this, soul) + // now re-bind the soul so that it does not drown its parent + soul match { + case local: LocalActorRef ⇒ + val cell = local.underlying + cell.parent = this + case _ ⇒ + } + case _ ⇒ // ignore + } + +} diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 7075ee0a8a..e1d502f5b4 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -42,6 +42,15 @@ trait Scheduler { */ def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable + /** + * Schedules a function to be run repeatedly with an initial delay and a frequency. + * E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set + * delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS) + * + * Java API + */ + def schedule(initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable + /** * Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed. * diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index e16d027fd8..a488c967d9 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -24,7 +24,7 @@ trait TypedActorFactory { */ def stop(proxy: AnyRef): Boolean = getActorRefFor(proxy) match { case null ⇒ false - case ref ⇒ ref.stop; true + case ref ⇒ ref.asInstanceOf[InternalActorRef].stop; true } /** diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala index 1692396a8f..ccac32f82f 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala @@ -8,46 +8,67 @@ import akka.japi.{ Creator, Procedure } import akka.dispatch.{ MessageDispatcher, Promise } /** + * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': + * http://en.wikipedia.org/wiki/Actor_model + * + * This class is the Java cousin to the [[akka.actor.Actor]] Scala interface. * Subclass this abstract class to create a MDB-style untyped actor. - *

- * This class is meant to be used from Java. - *

+ * + * An actor has a well-defined (non-cyclic) life-cycle. + * - ''RUNNING'' (created and started actor) - can receive messages + * - ''SHUTDOWN'' (when 'stop' or 'exit' is invoked) - can't do anything + * + * The Actor's own [[akka.actor.ActorRef]] is available as `getSelf()`, the current + * message’s sender as `getSender()` and the [[akka.actor.UntypedActorContext]] as + * `getContext()`. The only abstract method is `onReceive()` which is invoked for + * each processed message unless dynamically overridden using `getContext().become()`. + * * Here is an example on how to create and use an UntypedActor: - *

+ *
+ * {{{
  *  public class SampleUntypedActor extends UntypedActor {
+ *
+ *    public class Reply {
+ *      final public ActorRef sender;
+ *      final public Result result;
+ *      Reply(ActorRef sender, Result result) {
+ *        this.sender = sender;
+ *        this.result = result;
+ *      }
+ *    }
+ *
  *    public void onReceive(Object message) throws Exception {
  *      if (message instanceof String) {
  *        String msg = (String)message;
  *
- *        if (msg.equals("UseReply")) {
- *          // Reply to original sender of message using the 'reply' method
- *          getContext().getSender().tell(msg + ":" + getSelf().getAddress());
- *
- *        } else if (msg.equals("UseSender") && getSender().isDefined()) {
- *          // Reply to original sender of message using the sender reference
- *          // also passing along my own reference (the self)
- *          getSender().get().tell(msg, getSelf());
+ *        if (msg.equals("UseSender")) {
+ *          // Reply to original sender of message
+ *          getSender().tell(msg + ":" + getSelf());
  *
  *        } else if (msg.equals("SendToSelf")) {
  *          // Send message to the actor itself recursively
- *          getSelf().tell(msg)
+ *          getSelf().tell("SomeOtherMessage");
  *
- *        } else if (msg.equals("ForwardMessage")) {
- *          // Retreive an actor from the ActorRegistry by ID and get an ActorRef back
- *          ActorRef actorRef = Actor.registry.local.actorsFor("some-actor-id").head();
+ *        } else if (msg.equals("ErrorKernelWithDirectReply")) {
+ *          // Send work to one-off child which will reply directly to original sender
+ *          getContext().actorOf(new Props(Worker.class)).tell("DoSomeDangerousWork", getSender());
+ *
+ *        } else if (msg.equals("ErrorKernelWithReplyHere")) {
+ *          // Send work to one-off child and collect the answer, reply handled further down
+ *          getContext().actorOf(new Props(Worker.class)).tell("DoWorkAndReplyToMe");
  *
  *        } else throw new IllegalArgumentException("Unknown message: " + message);
+ *
+ *      } else if (message instanceof Reply) {
+ *
+ *        final Reply reply = (Reply) message;
+ *        // might want to do some processing/book-keeping here
+ *        reply.sender.tell(reply.result);
+ *
  *      } else throw new IllegalArgumentException("Unknown message: " + message);
  *    }
- *
- *    public static void main(String[] args) {
- *      ActorSystem system = ActorSystem.create("Sample");
- *      ActorRef actor = system.actorOf(SampleUntypedActor.class);
- *      actor.tell("SendToSelf");
- *      actor.stop();
- *    }
  *  }
- * 
+ * }}} */ abstract class UntypedActor extends Actor { @@ -65,8 +86,9 @@ abstract class UntypedActor extends Actor { def getSelf(): ActorRef = self /** - * The reference sender Actor of the last received message. - * Is defined if the message was sent from another Actor, else None. + * The reference sender Actor of the currently processed message. This is + * always a legal destination to send to, even if there is no logical recipient + * for the reply, in which case it will be sent to the dead letter mailbox. */ def getSender(): ActorRef = sender @@ -77,7 +99,7 @@ abstract class UntypedActor extends Actor { * Actor are automatically started asynchronously when created. * Empty default implementation. */ - override def preStart() {} + override def preStart(): Unit = super.preStart() /** * User overridable callback. @@ -85,24 +107,22 @@ abstract class UntypedActor extends Actor { * Is called asynchronously after 'actor.stop()' is invoked. * Empty default implementation. */ - override def postStop() {} + override def postStop(): Unit = super.postStop() /** - * User overridable callback. + * User overridable callback: '''By default it disposes of all children and then calls `postStop()`.''' *

* Is called on a crashed Actor right BEFORE it is restarted to allow clean * up of resources before Actor is terminated. - * By default it calls postStop() */ - override def preRestart(reason: Throwable, message: Option[Any]) { postStop() } + override def preRestart(reason: Throwable, message: Option[Any]): Unit = super.preRestart(reason, message) /** - * User overridable callback. + * User overridable callback: By default it calls `preStart()`. *

* Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash. - * By default it calls preStart() */ - override def postRestart(reason: Throwable) { preStart() } + override def postRestart(reason: Throwable): Unit = super.postRestart(reason) /** * User overridable callback. diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 0425b4c661..bde3bd725a 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -137,7 +137,10 @@ trait LoggingBus extends ActorEventBus { } { // this is very necessary, else you get infinite loop with DeadLetter unsubscribe(logger) - logger.stop() + logger match { + case ref: InternalActorRef ⇒ ref.stop() + case _ ⇒ + } } publish(Debug(simpleName(this), "all default loggers stopped")) } diff --git a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala index a417c75bac..135546ad2b 100644 --- a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala +++ b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala @@ -66,49 +66,3 @@ trait ConnectionManager { */ def remove(deadRef: ActorRef) } - -/** - * Manages local connections for a router, e.g. local actors. - */ -class LocalConnectionManager(initialConnections: Iterable[ActorRef]) extends ConnectionManager { - - def this(iterable: java.lang.Iterable[ActorRef]) { - this(JavaConverters.iterableAsScalaIterableConverter(iterable).asScala) - } - - case class State(version: Long, connections: Iterable[ActorRef]) extends VersionedIterable[ActorRef] { - def iterable = connections - } - - private val state: AtomicReference[State] = new AtomicReference[State](newState()) - - private def newState() = State(Long.MinValue, initialConnections) - - def version: Long = state.get.version - - def size: Int = state.get.connections.size - - def isEmpty: Boolean = state.get.connections.isEmpty - - def connections = state.get - - def shutdown() { - state.get.connections foreach (_.stop()) - } - - @tailrec - final def remove(ref: ActorRef) = { - val oldState = state.get - - //remote the ref from the connections. - var newList = oldState.connections.filter(currentActorRef ⇒ currentActorRef ne ref) - - if (newList.size != oldState.connections.size) { - //one or more occurrences of the actorRef were removed, so we need to update the state. - - val newState = State(oldState.version + 1, newList) - //if we are not able to update the state, we just try again. - if (!state.compareAndSet(oldState, newState)) remove(ref) - } - } -} diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 7df6a388cb..a321bb8983 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -132,7 +132,7 @@ trait Router extends Actor { case Terminated(child) ⇒ ref._routees = ref._routees filterNot (_ == child) - if (ref.routees.isEmpty) self.stop() + if (ref.routees.isEmpty) context.stop(self) }: Receive) orElse routerReceive diff --git a/akka-actor/src/main/scala/akka/util/ListenerManagement.scala b/akka-actor/src/main/scala/akka/util/ListenerManagement.scala index 3efbcbc902..1d6df328d5 100644 --- a/akka-actor/src/main/scala/akka/util/ListenerManagement.scala +++ b/akka-actor/src/main/scala/akka/util/ListenerManagement.scala @@ -4,13 +4,15 @@ package akka.util +import akka.actor.Actor + import java.util.concurrent.ConcurrentSkipListSet import akka.actor.{ ActorInitializationException, ActorRef } /** * A manager for listener actors. Intended for mixin by observables. */ -trait ListenerManagement { +trait ListenerManagement { this: Actor ⇒ private val listeners = new ConcurrentSkipListSet[ActorRef] @@ -33,7 +35,7 @@ trait ListenerManagement { */ def removeListener(listener: ActorRef) { listeners remove listener - if (manageLifeCycleOfListeners) listener.stop() + if (manageLifeCycleOfListeners) context.stop(listener) } /* diff --git a/akka-docs/common/code/SchedulerDocSpec.scala b/akka-docs/common/code/SchedulerDocSpec.scala index ac101e396d..5c4635b864 100644 --- a/akka-docs/common/code/SchedulerDocSpec.scala +++ b/akka-docs/common/code/SchedulerDocSpec.scala @@ -62,6 +62,6 @@ class SchedulerDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { //This cancels further Ticks to be sent cancellable.cancel() //#schedule-recurring - tickActor.stop() + system.stop(tickActor) } } diff --git a/akka-docs/general/code/akka/docs/config/ConfigDocSpec.scala b/akka-docs/general/code/akka/docs/config/ConfigDocSpec.scala index 557c33ff53..b520a3b45d 100644 --- a/akka-docs/general/code/akka/docs/config/ConfigDocSpec.scala +++ b/akka-docs/general/code/akka/docs/config/ConfigDocSpec.scala @@ -26,7 +26,7 @@ class ConfigDocSpec extends WordSpec with MustMatchers { val system = ActorSystem("MySystem", ConfigFactory.load(customConf)) //#custom-config - system.stop() + system.shutdown() } diff --git a/akka-docs/general/supervision.rst b/akka-docs/general/supervision.rst index 0867d931f8..74c95abfc5 100644 --- a/akka-docs/general/supervision.rst +++ b/akka-docs/general/supervision.rst @@ -25,7 +25,9 @@ which explains the existence of the fourth choice (as a supervisor also is subordinate to another supervisor higher up) and has implications on the first three: resuming an actor resumes all its subordinates, restarting an actor entails restarting all its subordinates, similarly stopping an actor will also -stop all its subordinates. +stop all its subordinates. It should be noted that the default behavior of an +actor is to stop all its children before restarting, but this can be overridden +using the :meth:`preRestart` hook. Each supervisor is configured with a function translating all possible failure causes (i.e. exceptions) into one of the four choices given above; notably, @@ -69,14 +71,12 @@ that the restart is not visible outside of the actor itself with the notable exception that the message during which the failure occurred is not re-processed. -Restarting an actor in this way recursively restarts all its children in the -same fashion, whereby all parent–child relationships are kept intact. If this -is not the right approach for certain sub-trees of the supervision hierarchy, -you should choose to stop the failed actor instead, which will terminate all -its children recursively, after which that part of the system may be recreated -from scratch. The second part of this action may be implemented using the -lifecycle monitoring described next or using lifecycle callbacks as described -in :class:`Actor`. +Restarting an actor in this way recursively terminates all its children. If +this is not the right approach for certain sub-trees of the supervision +hierarchy, you may choose to retain the children, in which case they will be +recursively restarted in the same fashion as the failed parent (with the same +default to terminate children, which must be overridden on a per-actor basis, +see :class:`Actor` for details). What Lifecycle Monitoring Means ------------------------------- diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorTestBase.java index c2a877d962..922f4ed0e6 100644 --- a/akka-docs/java/code/akka/docs/actor/UntypedActorTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/UntypedActorTestBase.java @@ -42,7 +42,7 @@ public class UntypedActorTestBase { ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class)); //#system-actorOf myActor.tell("test"); - system.stop(); + system.shutdown(); } @Test @@ -52,7 +52,7 @@ public class UntypedActorTestBase { ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class)); //#context-actorOf myActor.tell("test"); - system.stop(); + system.shutdown(); } @Test @@ -67,7 +67,7 @@ public class UntypedActorTestBase { })); //#creating-constructor myActor.tell("test"); - system.stop(); + system.shutdown(); } @Test @@ -80,7 +80,7 @@ public class UntypedActorTestBase { "myactor"); //#creating-props myActor.tell("test"); - system.stop(); + system.shutdown(); } @Test @@ -105,7 +105,7 @@ public class UntypedActorTestBase { } } //#using-ask - system.stop(); + system.shutdown(); } @Test @@ -113,7 +113,7 @@ public class UntypedActorTestBase { ActorSystem system = ActorSystem.create("MySystem"); ActorRef myActor = system.actorOf(new Props(MyReceivedTimeoutUntypedActor.class)); myActor.tell("Hello"); - system.stop(); + system.shutdown(); } @Test @@ -123,7 +123,7 @@ public class UntypedActorTestBase { //#poison-pill myActor.tell(poisonPill()); //#poison-pill - system.stop(); + system.shutdown(); } @Test @@ -133,7 +133,7 @@ public class UntypedActorTestBase { //#kill victim.tell(kill()); //#kill - system.stop(); + system.shutdown(); } @Test @@ -147,7 +147,7 @@ public class UntypedActorTestBase { myActor.tell("foo"); myActor.tell("bar"); myActor.tell("bar"); - system.stop(); + system.shutdown(); } public static class MyActor extends UntypedActor { diff --git a/akka-docs/java/code/akka/docs/event/LoggingDocTestBase.java b/akka-docs/java/code/akka/docs/event/LoggingDocTestBase.java index 3241623e95..ba689e2fa1 100644 --- a/akka-docs/java/code/akka/docs/event/LoggingDocTestBase.java +++ b/akka-docs/java/code/akka/docs/event/LoggingDocTestBase.java @@ -37,7 +37,7 @@ public class LoggingDocTestBase { } })); myActor.tell("test"); - system.stop(); + system.shutdown(); } //#my-actor diff --git a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java index 4ac3204d0b..cc9cb31bce 100644 --- a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java +++ b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java @@ -31,7 +31,7 @@ public class DurableMailboxDocTestBase { })); //#define-dispatcher myActor.tell("test"); - system.stop(); + system.shutdown(); } public static class MyUntypedActor extends UntypedActor { diff --git a/akka-docs/scala/code/ActorDocSpec.scala b/akka-docs/scala/code/ActorDocSpec.scala index 5592572443..a08aa93593 100644 --- a/akka-docs/scala/code/ActorDocSpec.scala +++ b/akka-docs/scala/code/ActorDocSpec.scala @@ -40,7 +40,7 @@ class FirstActor extends Actor { case DoIt(msg) ⇒ val replyMsg = doSomeDangerousWork(msg) sender ! replyMsg - self.stop() + context.stop(self) } def doSomeDangerousWork(msg: ImmutableMessage): String = { "done" } })) ! m @@ -143,7 +143,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { //#import-context val first = system.actorOf(Props(new FirstActor)) - first.stop() + system.stop(first) } @@ -169,7 +169,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { system.eventStream.unsubscribe(testActor) system.eventStream.publish(TestEvent.UnMute(filter)) - myActor.stop() + system.stop(myActor) } "creating actor with constructor" in { @@ -182,7 +182,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { val myActor = system.actorOf(Props(new MyActor("..."))) //#creating-constructor - myActor.stop() + system.stop(myActor) } "creating actor with Props" in { @@ -192,7 +192,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor") //#creating-props - myActor.stop() + system.stop(myActor) } "using ask" in { @@ -214,7 +214,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { val result: Option[Int] = for (x ← (myActor ? 3).as[Int]) yield { 2 * x } //#using-ask - myActor.stop() + system.stop(myActor) } "using receiveTimeout" in { diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala index 54c5ba36b6..06f151d84a 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/tools/QDumper.scala @@ -149,6 +149,6 @@ object QDumper { new QueueDumper(filename, system.log)() } - system.stop() + system.shutdown() } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index 9f623ff853..03aa5ddc62 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -57,7 +57,7 @@ class RemoteConnectionManager( def isEmpty: Boolean = connections.connections.isEmpty def shutdown() { - state.get.iterable foreach (_.stop()) // shut down all remote connections + state.get.iterable foreach (system.stop(_)) // shut down all remote connections } @tailrec @@ -136,7 +136,7 @@ class RemoteConnectionManager( //if we are not able to update the state, we just try again. if (!state.compareAndSet(oldState, newState)) { // we failed, need compensating action - newConnection.stop() // stop the new connection actor and try again + system.stop(newConnection) // stop the new connection actor and try again putIfAbsent(address, newConnectionFactory) // recur } else { // we succeeded diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala index 3efc3c5ce5..87a3177b3b 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala @@ -10,7 +10,7 @@ object RandomRoutedRemoteActorMultiJvmSpec { class SomeActor extends Actor with Serializable { def receive = { case "hit" ⇒ sender ! context.system.nodename - case "end" ⇒ self.stop() + case "end" ⇒ context.stop(self) } } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala index 786f278a7e..29e57d4209 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala @@ -10,7 +10,7 @@ object RoundRobinRoutedRemoteActorMultiJvmSpec { class SomeActor extends Actor with Serializable { def receive = { case "hit" ⇒ sender ! context.system.nodename - case "end" ⇒ self.stop() + case "end" ⇒ context.stop(self) } } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala index 10d6e22f58..c985bf2152 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala @@ -11,7 +11,7 @@ object ScatterGatherRoutedRemoteActorMultiJvmSpec { class SomeActor extends Actor with Serializable { def receive = { case "hit" ⇒ sender ! context.system.nodename - case "end" ⇒ self.stop() + case "end" ⇒ context.stop(self) } } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index 62de045fb5..d7827134bc 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -61,7 +61,7 @@ akka { implicit val timeout = system.settings.ActorTimeout override def atTermination() { - other.stop() + other.shutdown() } "Remoting" must { @@ -103,7 +103,7 @@ akka { expectMsg("preRestart") r ! 42 expectMsg(42) - r.stop() + system.stop(r) expectMsg("postStop") } @@ -130,4 +130,4 @@ akka { } -} \ No newline at end of file +} diff --git a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala index 60209d087b..f183a940a7 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala @@ -40,7 +40,7 @@ akka { val other = ActorSystem("remote_sys", conf) override def atTermination() { - other.stop() + other.shutdown() } "A Remote Router" must { @@ -55,4 +55,4 @@ akka { } -} \ No newline at end of file +} diff --git a/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Main.scala b/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Main.scala index 710a099312..2921c2d27c 100644 --- a/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Main.scala +++ b/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Main.scala @@ -20,7 +20,7 @@ class HelloActor extends Actor { case Start ⇒ worldActor ! "Hello" case s: String ⇒ println("Received message: %s".format(s)) - context.system.stop() + context.system.shutdown() } } diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index cfe618ce47..3fd6c02f7d 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -302,8 +302,8 @@ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor { sender.tell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T ⇒ T] }) } finally { agent.resume() - self.stop() + context.stop(self) } - case _ ⇒ self.stop() + case _ ⇒ context.stop(self) } } diff --git a/akka-stm/src/test/java/akka/stm/example/EitherOrElseExample.java b/akka-stm/src/test/java/akka/stm/example/EitherOrElseExample.java index a8f3fd475c..61d172e82f 100644 --- a/akka-stm/src/test/java/akka/stm/example/EitherOrElseExample.java +++ b/akka-stm/src/test/java/akka/stm/example/EitherOrElseExample.java @@ -24,6 +24,6 @@ public class EitherOrElseExample { } }.execute(); - brancher.stop(); + application.stop(brancher); } } diff --git a/akka-stm/src/test/java/akka/stm/example/RetryExample.java b/akka-stm/src/test/java/akka/stm/example/RetryExample.java index f15850d232..590e05d94e 100644 --- a/akka-stm/src/test/java/akka/stm/example/RetryExample.java +++ b/akka-stm/src/test/java/akka/stm/example/RetryExample.java @@ -46,8 +46,8 @@ public class RetryExample { System.out.println("Account 2: " + acc2); // Account 2: 600.0 - transferer.stop(); + application.stop(transferer); - application.stop(); + application.shutdown(); } } diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java index 9baf0f1485..4e72bea237 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java @@ -40,9 +40,9 @@ public class UntypedCoordinatedExample { } } - counter1.stop(); - counter2.stop(); + application.stop(counter1); + application.stop(counter2); - application.stop(); + application.shutdown(); } } diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java index 55e28f872f..d5b0a2d691 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java +++ b/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java @@ -39,9 +39,9 @@ public class UntypedTransactorExample { } } - counter1.stop(); - counter2.stop(); + application.stop(counter1); + application.stop(counter2); - application.stop(); + application.shutdown(); } } diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java index a90e0a1952..f2df33a260 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java @@ -44,7 +44,7 @@ public class UntypedCoordinatedIncrementTest { @AfterClass public static void afterAll() { - system.stop(); + system.shutdown(); system = null; } @@ -113,6 +113,6 @@ public class UntypedCoordinatedIncrementTest { @After public void stop() { - application.stop(); + application.shutdown(); } } diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java index 528a2a14f8..7436bbb132 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java @@ -41,7 +41,7 @@ public class UntypedTransactorTest { @AfterClass public static void afterAll() { - system.stop(); + system.shutdown(); system = null; } diff --git a/akka-stm/src/test/scala/akka/transactor/test/CoordinatedIncrementSpec.scala b/akka-stm/src/test/scala/akka/transactor/test/CoordinatedIncrementSpec.scala index eda336b78e..c2298c9229 100644 --- a/akka-stm/src/test/scala/akka/transactor/test/CoordinatedIncrementSpec.scala +++ b/akka-stm/src/test/scala/akka/transactor/test/CoordinatedIncrementSpec.scala @@ -74,8 +74,8 @@ class CoordinatedIncrementSpec extends AkkaSpec with BeforeAndAfterAll { for (counter ← counters) { (counter ? GetCount).as[Int].get must be === 1 } - counters foreach (_.stop()) - failer.stop() + counters foreach (system.stop(_)) + system.stop(failer) } "increment no counters with a failing transaction" in { @@ -91,8 +91,8 @@ class CoordinatedIncrementSpec extends AkkaSpec with BeforeAndAfterAll { for (counter ← counters) { (counter ? GetCount).as[Int].get must be === 0 } - counters foreach (_.stop()) - failer.stop() + counters foreach (system.stop(_)) + system.stop(failer) } } } diff --git a/akka-stm/src/test/scala/akka/transactor/test/FickleFriendsSpec.scala b/akka-stm/src/test/scala/akka/transactor/test/FickleFriendsSpec.scala index a74490b410..42fe5dbc5a 100644 --- a/akka-stm/src/test/scala/akka/transactor/test/FickleFriendsSpec.scala +++ b/akka-stm/src/test/scala/akka/transactor/test/FickleFriendsSpec.scala @@ -123,8 +123,8 @@ class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll { for (counter ← counters) { (counter ? GetCount).as[Int].get must be === 1 } - counters foreach (_.stop()) - coordinator.stop() + counters foreach (system.stop(_)) + system.stop(coordinator) } } } diff --git a/akka-stm/src/test/scala/akka/transactor/test/TransactorSpec.scala b/akka-stm/src/test/scala/akka/transactor/test/TransactorSpec.scala index 43ee399196..d19abee2b0 100644 --- a/akka-stm/src/test/scala/akka/transactor/test/TransactorSpec.scala +++ b/akka-stm/src/test/scala/akka/transactor/test/TransactorSpec.scala @@ -97,8 +97,8 @@ class TransactorSpec extends AkkaSpec { for (counter ← counters) { (counter ? GetCount).as[Int].get must be === 1 } - counters foreach (_.stop()) - failer.stop() + counters foreach (system.stop(_)) + system.stop(failer) } "increment no counters with a failing transaction" in { @@ -114,8 +114,8 @@ class TransactorSpec extends AkkaSpec { for (counter ← counters) { (counter ? GetCount).as[Int].get must be === 0 } - counters foreach (_.stop()) - failer.stop() + counters foreach (system.stop(_)) + system.stop(failer) } } } @@ -129,7 +129,7 @@ class TransactorSpec extends AkkaSpec { latch.await val value = atomic { ref.get } value must be === 5 - transactor.stop() + system.stop(transactor) } } } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 1fdbaee7d7..bba22d7f76 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -64,7 +64,7 @@ abstract class AkkaSpec(_system: ActorSystem) } final override def afterAll { - system.stop() + system.shutdown() try system.asInstanceOf[ActorSystemImpl].terminationFuture.await(5 seconds) catch { case _: FutureTimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name) } @@ -76,7 +76,7 @@ abstract class AkkaSpec(_system: ActorSystem) protected def atTermination() {} def spawn(body: ⇒ Unit)(implicit dispatcher: MessageDispatcher) { - system.actorOf(Props(ctx ⇒ { case "go" ⇒ try body finally ctx.self.stop() }).withDispatcher(dispatcher)) ! "go" + system.actorOf(Props(ctx ⇒ { case "go" ⇒ try body finally ctx.stop(ctx.self) }).withDispatcher(dispatcher)) ! "go" } } @@ -96,7 +96,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { val ref = Seq(testActor, system.actorOf(Props.empty, "name")) } spec.ref foreach (_.isTerminated must not be true) - system.stop() + system.shutdown() spec.awaitCond(spec.ref forall (_.isTerminated), 2 seconds) } @@ -120,7 +120,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { implicit val davyJones = otherSystem.actorOf(Props(new Actor { def receive = { case m: DeadLetter ⇒ locker :+= m - case "Die!" ⇒ sender ! "finally gone"; self.stop() + case "Die!" ⇒ sender ! "finally gone"; context.stop(self) } }), "davyJones") @@ -139,15 +139,15 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { val latch = new TestLatch(1)(system) system.registerOnTermination(latch.countDown()) - system.stop() + system.shutdown() latch.await(2 seconds) (davyJones ? "Die!").get must be === "finally gone" // this will typically also contain log messages which were sent after the logger shutdown locker must contain(DeadLetter(42, davyJones, probe.ref)) } finally { - system.stop() - otherSystem.stop() + system.shutdown() + otherSystem.shutdown() } } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 462ee6ffc6..e9ecf69cc6 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -56,7 +56,7 @@ object TestActorRefSpec { class WorkerActor() extends TActor { def receiveT = { - case "work" ⇒ sender ! "workDone"; self.stop() + case "work" ⇒ sender ! "workDone"; context.stop(self) case replyTo: Promise[Any] ⇒ replyTo.completeWithResult("complexReply") case replyTo: ActorRef ⇒ replyTo ! "complexReply" } diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index 577c32fe3e..efec273469 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -113,7 +113,7 @@ public class Pi { Result result = (Result) message; pi += result.getValue(); nrOfResults += 1; - if (nrOfResults == nrOfMessages) getSelf().stop(); + if (nrOfResults == nrOfMessages) getContext().stop(getSelf()); } else throw new IllegalArgumentException("Unknown message [" + message + "]"); //#handle-messages } @@ -157,7 +157,7 @@ public class Pi { latch.await(); // Shut down the system - system.stop(); + system.shutdown(); } } -//#app \ No newline at end of file +//#app diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index 0a6c0ed04e..cc2b9adce8 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -62,7 +62,7 @@ object Pi extends App { pi += value nrOfResults += 1 // Stops this actor and all its supervised children - if (nrOfResults == nrOfMessages) self.stop() + if (nrOfResults == nrOfMessages) context.stop(self) //#handle-messages } //#master-receive @@ -98,7 +98,7 @@ object Pi extends App { latch.await() // Shut down the system - system.stop() + system.shutdown() } } //#app diff --git a/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala b/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala index a752b3c783..a9d2a202fd 100644 --- a/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala +++ b/akka-tutorials/akka-tutorial-first/src/test/scala/WorkerSpec.scala @@ -17,7 +17,7 @@ class WorkerSpec extends WordSpec with MustMatchers with BeforeAndAfterAll { implicit val system = ActorSystem() override def afterAll { - system.stop() + system.shutdown() } "Worker" must {