Merge branch 'wip-1466-remove-stop-rk'

This commit is contained in:
Roland 2011-12-14 15:56:08 +01:00
commit 1ab2cecc2c
87 changed files with 475 additions and 359 deletions

View file

@ -20,7 +20,7 @@ public class JavaAPI {
@AfterClass @AfterClass
public static void afterAll() { public static void afterAll() {
system.stop(); system.shutdown();
system = null; system = null;
} }

View file

@ -58,7 +58,7 @@ public class JavaExtension {
@AfterClass @AfterClass
public static void afterAll() { public static void afterAll() {
system.stop(); system.shutdown();
system = null; system = null;
} }

View file

@ -34,7 +34,7 @@ public class JavaFutureTests {
@AfterClass @AfterClass
public static void afterAll() { public static void afterAll() {
system.stop(); system.shutdown();
system = null; system = null;
} }

View file

@ -87,7 +87,7 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach w
state.finished.await state.finished.await
1.second.dilated.sleep() 1.second.dilated.sleep()
actor.isTerminated must be(true) actor.isTerminated must be(true)
supervisor.stop() system.stop(supervisor)
} }
} }
} }

View file

@ -61,7 +61,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
restarter ! Kill restarter ! Kill
expectMsg(("postStop", id, 3)) expectMsg(("postStop", id, 3))
expectNoMsg(1 seconds) expectNoMsg(1 seconds)
supervisor.stop system.stop(supervisor)
} }
} }
@ -92,7 +92,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
restarter ! Kill restarter ! Kill
expectMsg(("postStop", id, 3)) expectMsg(("postStop", id, 3))
expectNoMsg(1 seconds) expectNoMsg(1 seconds)
supervisor.stop system.stop(supervisor)
} }
} }
@ -105,10 +105,10 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
expectMsg(("preStart", id, 0)) expectMsg(("preStart", id, 0))
a ! "status" a ! "status"
expectMsg(("OK", id, 0)) expectMsg(("OK", id, 0))
a.stop system.stop(a)
expectMsg(("postStop", id, 0)) expectMsg(("postStop", id, 0))
expectNoMsg(1 seconds) expectNoMsg(1 seconds)
supervisor.stop system.stop(supervisor)
} }
} }

View file

@ -42,7 +42,7 @@ object ActorRefSpec {
case "work" { case "work" {
work work
sender ! "workDone" sender ! "workDone"
self.stop() context.stop(self)
} }
case ReplyTo(replyTo) { case ReplyTo(replyTo) {
work work
@ -344,8 +344,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
latch.await latch.await
clientRef.stop() system.stop(clientRef)
serverRef.stop() system.stop(serverRef)
} }
"stop when sent a poison pill" in { "stop when sent a poison pill" in {

View file

@ -29,7 +29,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
try { try {
val f = echo ? "hallo" val f = echo ? "hallo"
intercept[FutureTimeoutException] { f.await } intercept[FutureTimeoutException] { f.await }
} finally { echo.stop } } finally { system.stop(echo) }
} }
} }
@ -41,14 +41,14 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
val f = (echo ? "hallo").mapTo[String] val f = (echo ? "hallo").mapTo[String]
intercept[FutureTimeoutException] { f.await } intercept[FutureTimeoutException] { f.await }
f.value must be(None) f.value must be(None)
} finally { echo.stop } } finally { system.stop(echo) }
} }
} }
"use explicitly supplied timeout" in { "use explicitly supplied timeout" in {
within(testTimeout - 100.millis, testTimeout + 300.millis) { within(testTimeout - 100.millis, testTimeout + 300.millis) {
val echo = actorWithTimeout(Props.defaultTimeout) val echo = actorWithTimeout(Props.defaultTimeout)
try { (echo.?("hallo", testTimeout)).as[String] must be(None) } finally { echo.stop } try { (echo.?("hallo", testTimeout)).as[String] must be(None) } finally { system.stop(echo) }
} }
} }
} }

View file

@ -85,7 +85,7 @@ object Chameneos {
sumMeetings += i sumMeetings += i
if (numFaded == numChameneos) { if (numFaded == numChameneos) {
Chameneos.end = System.currentTimeMillis Chameneos.end = System.currentTimeMillis
self.stop() context.stop(self)
} }
case msg @ Meet(a, c) case msg @ Meet(a, c)
@ -107,10 +107,11 @@ object Chameneos {
def run { def run {
// System.setProperty("akka.config", "akka.conf") // System.setProperty("akka.config", "akka.conf")
Chameneos.start = System.currentTimeMillis Chameneos.start = System.currentTimeMillis
val system = ActorSystem().actorOf(Props(new Mall(1000000, 4))) val system = ActorSystem()
val actor = system.actorOf(Props(new Mall(1000000, 4)))
Thread.sleep(10000) Thread.sleep(10000)
println("Elapsed: " + (end - start)) println("Elapsed: " + (end - start))
system.stop() system.shutdown()
} }
def main(args: Array[String]): Unit = run def main(args: Array[String]): Unit = run

View file

@ -26,7 +26,7 @@ object ConsistencySpec {
} }
lastStep = step lastStep = step
case "done" sender ! "done"; self.stop() case "done" sender ! "done"; context.stop(self)
} }
} }
} }

View file

@ -43,9 +43,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
expectTerminationOf(terminal) expectTerminationOf(terminal)
expectTerminationOf(terminal) expectTerminationOf(terminal)
monitor1.stop() system.stop(monitor1)
monitor2.stop() system.stop(monitor2)
monitor3.stop() system.stop(monitor3)
} }
"notify with _current_ monitors with one Terminated message when an Actor is stopped" in { "notify with _current_ monitors with one Terminated message when an Actor is stopped" in {
@ -69,9 +69,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
expectTerminationOf(terminal) expectTerminationOf(terminal)
expectTerminationOf(terminal) expectTerminationOf(terminal)
monitor1.stop() system.stop(monitor1)
monitor2.stop() system.stop(monitor2)
monitor3.stop() system.stop(monitor3)
} }
"notify with a Terminated message once when an Actor is stopped but not when restarted" in { "notify with a Terminated message once when an Actor is stopped but not when restarted" in {
@ -90,7 +90,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
expectTerminationOf(terminal) expectTerminationOf(terminal)
terminal.isTerminated must be === true terminal.isTerminated must be === true
supervisor.stop() system.stop(supervisor)
} }
} }
@ -99,9 +99,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
case class FF(fail: Failed) case class FF(fail: Failed)
val supervisor = system.actorOf(Props[Supervisor] val supervisor = system.actorOf(Props[Supervisor]
.withFaultHandler(new OneForOneStrategy(FaultHandlingStrategy.makeDecider(List(classOf[Exception])), Some(0)) { .withFaultHandler(new OneForOneStrategy(FaultHandlingStrategy.makeDecider(List(classOf[Exception])), Some(0)) {
override def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
testActor.tell(FF(Failed(cause)), child) testActor.tell(FF(Failed(cause)), child)
super.handleFailure(child, cause, stats, children) super.handleFailure(context, child, cause, stats, children)
} }
})) }))

View file

@ -95,7 +95,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
} }
""", ConfigParseOptions.defaults).withFallback(AkkaSpec.testConf) """, ConfigParseOptions.defaults).withFallback(AkkaSpec.testConf)
ActorSystem("invalid", invalidDeployerConf).stop() ActorSystem("invalid", invalidDeployerConf).shutdown()
} }
} }

View file

@ -187,7 +187,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
} }
val ref = system.actorOf(Props(fsm)) val ref = system.actorOf(Props(fsm))
started.await started.await
ref.stop() system.stop(ref)
expectMsg(1 second, fsm.StopEvent(Shutdown, 1, null)) expectMsg(1 second, fsm.StopEvent(Shutdown, 1, null))
} }
@ -233,7 +233,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
} }
} }
} finally { } finally {
fsmEventSystem.stop() fsmEventSystem.shutdown()
} }
} }

View file

@ -78,7 +78,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
within(300 millis) { within(300 millis) {
fsm ! SubscribeTransitionCallBack(forward) fsm ! SubscribeTransitionCallBack(forward)
expectMsg(CurrentState(fsm, 0)) expectMsg(CurrentState(fsm, 0))
forward.stop() system.stop(forward)
fsm ! "tick" fsm ! "tick"
expectNoMsg expectNoMsg
} }

View file

@ -196,9 +196,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
f1.get must equal(ByteString("Hello World!1")) f1.get must equal(ByteString("Hello World!1"))
f2.get must equal(ByteString("Hello World!2")) f2.get must equal(ByteString("Hello World!2"))
f3.get must equal(ByteString("Hello World!3")) f3.get must equal(ByteString("Hello World!3"))
client.stop system.stop(client)
server.stop system.stop(server)
ioManager.stop system.stop(ioManager)
} }
"run echo server under high load" in { "run echo server under high load" in {
@ -210,9 +210,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
val list = List.range(0, 1000) val list = List.range(0, 1000)
val f = Future.traverse(list)(i client ? ByteString(i.toString)) val f = Future.traverse(list)(i client ? ByteString(i.toString))
assert(f.get.size === 1000) assert(f.get.size === 1000)
client.stop system.stop(client)
server.stop system.stop(server)
ioManager.stop system.stop(ioManager)
} }
"run echo server under high load with small buffer" in { "run echo server under high load with small buffer" in {
@ -224,9 +224,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
val list = List.range(0, 1000) val list = List.range(0, 1000)
val f = Future.traverse(list)(i client ? ByteString(i.toString)) val f = Future.traverse(list)(i client ? ByteString(i.toString))
assert(f.get.size === 1000) assert(f.get.size === 1000)
client.stop system.stop(client)
server.stop system.stop(server)
ioManager.stop system.stop(ioManager)
} }
"run key-value store" in { "run key-value store" in {
@ -250,10 +250,10 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
f4.get must equal("OK") f4.get must equal("OK")
f5.get must equal(ByteString("I'm a test!")) f5.get must equal(ByteString("I'm a test!"))
f6.get must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!"))) f6.get must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!")))
client1.stop system.stop(client1)
client2.stop system.stop(client2)
server.stop system.stop(server)
ioManager.stop system.stop(ioManager)
} }
} }

View file

@ -26,7 +26,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
})) }))
timeoutLatch.await timeoutLatch.await
timeoutActor.stop() system.stop(timeoutActor)
} }
"reschedule timeout after regular receive" in { "reschedule timeout after regular receive" in {
@ -45,7 +45,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
timeoutActor ! Tick timeoutActor ! Tick
timeoutLatch.await timeoutLatch.await
timeoutActor.stop() system.stop(timeoutActor)
} }
"be able to turn off timeout if desired" in { "be able to turn off timeout if desired" in {
@ -69,7 +69,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
timeoutLatch.await timeoutLatch.await
count.get must be(1) count.get must be(1)
timeoutActor.stop() system.stop(timeoutActor)
} }
"not receive timeout message when not specified" in { "not receive timeout message when not specified" in {
@ -82,7 +82,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
})) }))
timeoutLatch.awaitTimeout(1 second) // timeout expected timeoutLatch.awaitTimeout(1 second) // timeout expected
timeoutActor.stop() system.stop(timeoutActor)
} }
"have ReceiveTimeout eq to Actors ReceiveTimeout" in { "have ReceiveTimeout eq to Actors ReceiveTimeout" in {

View file

@ -7,4 +7,6 @@ class Supervisor extends Actor {
def receive = { def receive = {
case x: Props sender ! context.actorOf(x) case x: Props sender ! context.actorOf(x)
} }
// need to override the default of stopping all children upon restart, tests rely on keeping them around
override def preRestart(cause: Throwable, msg: Option[Any]) {}
} }

View file

@ -15,6 +15,8 @@ object SupervisorHierarchySpec {
protected def receive = { protected def receive = {
case p: Props sender ! context.actorOf(p) case p: Props sender ! context.actorOf(p)
} }
// test relies on keeping children around during restart
override def preRestart(cause: Throwable, msg: Option[Any]) {}
override def postRestart(reason: Throwable) = { override def postRestart(reason: Throwable) = {
countDown.countDown() countDown.countDown()
} }

View file

@ -24,7 +24,7 @@ class SupervisorMiscSpec extends AkkaSpec with DefaultTimeout {
override def postRestart(cause: Throwable) { countDownLatch.countDown() } override def postRestart(cause: Throwable) { countDownLatch.countDown() }
protected def receive = { protected def receive = {
case "status" this.sender ! "OK" case "status" this.sender ! "OK"
case _ this.self.stop() case _ this.context.stop(self)
} }
}) })

View file

@ -306,7 +306,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
inits.get must be(3) inits.get must be(3)
supervisor.stop() system.stop(supervisor)
} }
} }
} }

View file

@ -35,7 +35,7 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeou
expectMsg(middleActor.path) expectMsg(middleActor.path)
expectMsg(lastActor.path) expectMsg(lastActor.path)
expectNoMsg(2 seconds) expectNoMsg(2 seconds)
headActor.stop() system.stop(headActor)
} }
} }
} }

View file

@ -28,7 +28,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
supervised.!("test")(testActor) supervised.!("test")(testActor)
expectMsg("failure1") expectMsg("failure1")
supervisor.stop() system.stop(supervisor)
} }
} }
@ -39,7 +39,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
supervised.!("test")(testActor) supervised.!("test")(testActor)
expectMsg("failure2") expectMsg("failure2")
supervisor.stop() system.stop(supervisor)
} }
} }
} }

View file

@ -77,7 +77,7 @@ object ActorModelSpec {
case Forward(to, msg) ack; to.forward(msg); busy.switchOff() case Forward(to, msg) ack; to.forward(msg); busy.switchOff()
case CountDown(latch) ack; latch.countDown(); busy.switchOff() case CountDown(latch) ack; latch.countDown(); busy.switchOff()
case Increment(count) ack; count.incrementAndGet(); busy.switchOff() case Increment(count) ack; count.incrementAndGet(); busy.switchOff()
case CountDownNStop(l) ack; l.countDown(); self.stop(); busy.switchOff() case CountDownNStop(l) ack; l.countDown(); context.stop(self); busy.switchOff()
case Restart ack; busy.switchOff(); throw new Exception("Restart requested") case Restart ack; busy.switchOff(); throw new Exception("Restart requested")
case Interrupt ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!") case Interrupt ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!")
case ThrowException(e: Throwable) ack; busy.switchOff(); throw e case ThrowException(e: Throwable) ack; busy.switchOff(); throw e
@ -239,7 +239,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
assertDispatcher(dispatcher)(stops = 0) assertDispatcher(dispatcher)(stops = 0)
val a = newTestActor(dispatcher) val a = newTestActor(dispatcher)
assertDispatcher(dispatcher)(stops = 0) assertDispatcher(dispatcher)(stops = 0)
a.stop() system.stop(a)
assertDispatcher(dispatcher)(stops = 1) assertDispatcher(dispatcher)(stops = 1)
assertRef(a, dispatcher)( assertRef(a, dispatcher)(
suspensions = 0, suspensions = 0,
@ -260,7 +260,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
assertDispatcher(dispatcher)(stops = 2) assertDispatcher(dispatcher)(stops = 2)
a2.stop system.stop(a2)
assertDispatcher(dispatcher)(stops = 3) assertDispatcher(dispatcher)(stops = 3)
} }
@ -279,7 +279,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
assertCountDown(oneAtATime, (1.5 seconds).dilated.toMillis, "Processed message when allowed") assertCountDown(oneAtATime, (1.5 seconds).dilated.toMillis, "Processed message when allowed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3) assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3)
a.stop() system.stop(a)
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 3, msgsProcessed = 3) assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 3, msgsProcessed = 3)
} }
@ -298,7 +298,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
assertCountDown(counter, 3.seconds.dilated.toMillis, "Should process 200 messages") assertCountDown(counter, 3.seconds.dilated.toMillis, "Should process 200 messages")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200) assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)
a.stop() system.stop(a)
} }
def spawn(f: Unit) { def spawn(f: Unit) {
@ -328,7 +328,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1, assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
suspensions = 1, resumes = 1) suspensions = 1, resumes = 1)
a.stop() system.stop(a)
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1, assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1,
suspensions = 1, resumes = 1) suspensions = 1, resumes = 1)
} }
@ -370,7 +370,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
throw e throw e
} }
assertCountDown(stopLatch, waitTime, "Expected all children to stop") assertCountDown(stopLatch, waitTime, "Expected all children to stop")
boss.stop() system.stop(boss)
} }
for (run 1 to 3) { for (run 1 to 3) {
flood(50000) flood(50000)
@ -447,8 +447,8 @@ class DispatcherModelSpec extends ActorModelSpec {
aStop.countDown() aStop.countDown()
a.stop system.stop(a)
b.stop system.stop(b)
while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination
@ -484,8 +484,8 @@ class BalancingDispatcherModelSpec extends ActorModelSpec {
aStop.countDown() aStop.countDown()
a.stop system.stop(a)
b.stop system.stop(b)
while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination

View file

@ -74,8 +74,8 @@ class BalancingDispatcherSpec extends AkkaSpec {
fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast
fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be >
(slow.underlying.actor.asInstanceOf[DelayableActor].invocationCount) (slow.underlying.actor.asInstanceOf[DelayableActor].invocationCount)
slow.stop() system.stop(slow)
fast.stop() system.stop(fast)
} }
} }
} }

View file

@ -39,14 +39,14 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout {
val actor = system.actorOf(Props[OneWayTestActor].withDispatcher(system.dispatcherFactory.newDispatcher("test").build)) val actor = system.actorOf(Props[OneWayTestActor].withDispatcher(system.dispatcherFactory.newDispatcher("test").build))
val result = actor ! "OneWay" val result = actor ! "OneWay"
assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS)) assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS))
actor.stop() system.stop(actor)
} }
"support ask/reply" in { "support ask/reply" in {
val actor = system.actorOf(Props[TestActor].withDispatcher(system.dispatcherFactory.newDispatcher("test").build)) val actor = system.actorOf(Props[TestActor].withDispatcher(system.dispatcherFactory.newDispatcher("test").build))
val result = (actor ? "Hello").as[String] val result = (actor ? "Hello").as[String]
assert("World" === result.get) assert("World" === result.get)
actor.stop() system.stop(actor)
} }
"respect the throughput setting" in { "respect the throughput setting" in {
@ -72,8 +72,8 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout {
fastOne ! "sabotage" fastOne ! "sabotage"
start.countDown() start.countDown()
latch.await(10, TimeUnit.SECONDS) latch.await(10, TimeUnit.SECONDS)
fastOne.stop() system.stop(fastOne)
slowOne.stop() system.stop(slowOne)
assert(latch.getCount() === 0) assert(latch.getCount() === 0)
} }
@ -90,13 +90,13 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout {
val fastOne = system.actorOf( val fastOne = system.actorOf(
Props(context { Props(context {
case "ping" if (works.get) latch.countDown(); context.self.stop() case "ping" if (works.get) latch.countDown(); context.stop(context.self)
}).withDispatcher(throughputDispatcher)) }).withDispatcher(throughputDispatcher))
val slowOne = system.actorOf( val slowOne = system.actorOf(
Props(context { Props(context {
case "hogexecutor" ready.countDown(); start.await case "hogexecutor" ready.countDown(); start.await
case "ping" works.set(false); context.self.stop() case "ping" works.set(false); context.stop(context.self)
}).withDispatcher(throughputDispatcher)) }).withDispatcher(throughputDispatcher))
slowOne ! "hogexecutor" slowOne ! "hogexecutor"

View file

@ -49,8 +49,8 @@ class DispatcherActorsSpec extends AkkaSpec {
assert(sFinished.getCount > 0) assert(sFinished.getCount > 0)
sFinished.await sFinished.await
assert(sFinished.getCount === 0) assert(sFinished.getCount === 0)
f.stop() system.stop(f)
s.stop() system.stop(s)
} }
} }
} }

View file

@ -30,14 +30,14 @@ class PinnedActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeo
val actor = system.actorOf(Props(self { case "OneWay" oneWay.countDown() }).withDispatcher(system.dispatcherFactory.newPinnedDispatcher("test"))) val actor = system.actorOf(Props(self { case "OneWay" oneWay.countDown() }).withDispatcher(system.dispatcherFactory.newPinnedDispatcher("test")))
val result = actor ! "OneWay" val result = actor ! "OneWay"
assert(oneWay.await(1, TimeUnit.SECONDS)) assert(oneWay.await(1, TimeUnit.SECONDS))
actor.stop() system.stop(actor)
} }
"support ask/reply" in { "support ask/reply" in {
val actor = system.actorOf(Props[TestActor].withDispatcher(system.dispatcherFactory.newPinnedDispatcher("test"))) val actor = system.actorOf(Props[TestActor].withDispatcher(system.dispatcherFactory.newPinnedDispatcher("test")))
val result = (actor ? "Hello").as[String] val result = (actor ? "Hello").as[String]
assert("World" === result.get) assert("World" === result.get)
actor.stop() system.stop(actor)
} }
} }
} }

View file

@ -50,7 +50,7 @@ class ListenerSpec extends AkkaSpec {
fooLatch.await fooLatch.await
for (a List(broadcast, a1, a2, a3)) a.stop() for (a List(broadcast, a1, a2, a3)) system.stop(a)
} }
} }
} }

View file

@ -120,7 +120,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val future = actor ? "Hello" val future = actor ? "Hello"
future.await future.await
test(future, "World") test(future, "World")
actor.stop() system.stop(actor)
} }
} }
"throws an exception" must { "throws an exception" must {
@ -130,7 +130,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val future = actor ? "Failure" val future = actor ? "Failure"
future.await future.await
test(future, "Expected exception; to test fault-tolerance") test(future, "Expected exception; to test fault-tolerance")
actor.stop() system.stop(actor)
} }
} }
} }
@ -144,8 +144,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val future = actor1 ? "Hello" flatMap { case s: String actor2 ? s } val future = actor1 ? "Hello" flatMap { case s: String actor2 ? s }
future.await future.await
test(future, "WORLD") test(future, "WORLD")
actor1.stop() system.stop(actor1)
actor2.stop() system.stop(actor2)
} }
} }
"will throw an exception" must { "will throw an exception" must {
@ -156,8 +156,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val future = actor1 ? "Hello" flatMap { case s: String actor2 ? s } val future = actor1 ? "Hello" flatMap { case s: String actor2 ? s }
future.await future.await
test(future, "/ by zero") test(future, "/ by zero")
actor1.stop() system.stop(actor1)
actor2.stop() system.stop(actor2)
} }
} }
} }
@ -169,8 +169,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val future = actor1 ? "Hello" flatMap { case i: Int actor2 ? i } val future = actor1 ? "Hello" flatMap { case i: Int actor2 ? i }
future.await future.await
test(future, "World (of class java.lang.String)") test(future, "World (of class java.lang.String)")
actor1.stop() system.stop(actor1)
actor2.stop() system.stop(actor2)
} }
} }
} }
@ -204,7 +204,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
future1.get must be("10-14") future1.get must be("10-14")
assert(checkType(future1, manifest[String])) assert(checkType(future1, manifest[String]))
intercept[ClassCastException] { future2.get } intercept[ClassCastException] { future2.get }
actor.stop() system.stop(actor)
} }
} }
@ -233,7 +233,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
future1.get must be("10-14") future1.get must be("10-14")
intercept[MatchError] { future2.get } intercept[MatchError] { future2.get }
actor.stop() system.stop(actor)
} }
} }
@ -280,7 +280,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
future10.get must be("World") future10.get must be("World")
future11.get must be("Oops!") future11.get must be("Oops!")
actor.stop() system.stop(actor)
} }
} }
@ -396,7 +396,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val actor = system.actorOf(Props[TestActor]) val actor = system.actorOf(Props[TestActor])
actor ? "Hello" onResult { case "World" latch.open } actor ? "Hello" onResult { case "World" latch.open }
assert(latch.tryAwait(5, TimeUnit.SECONDS)) assert(latch.tryAwait(5, TimeUnit.SECONDS))
actor.stop() system.stop(actor)
} }
"shouldTraverseFutures" in { "shouldTraverseFutures" in {
@ -411,7 +411,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo manifest[Int]) val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo manifest[Int])
assert(Future.sequence(oddFutures).get.sum === 10000) assert(Future.sequence(oddFutures).get.sum === 10000)
oddActor.stop() system.stop(oddActor)
val list = (1 to 100).toList val list = (1 to 100).toList
assert(Future.traverse(list)(x Future(x * 2 - 1)).get.sum === 10000) assert(Future.traverse(list)(x Future(x * 2 - 1)).get.sum === 10000)
@ -470,7 +470,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
assert(r.get === "Hello World!") assert(r.get === "Hello World!")
actor.stop system.stop(actor)
} }
"futureComposingWithContinuationsFailureDivideZero" in { "futureComposingWithContinuationsFailureDivideZero" in {

View file

@ -8,7 +8,7 @@ import org.scalatest.BeforeAndAfterEach
import akka.testkit._ import akka.testkit._
import akka.util.duration._ import akka.util.duration._
import java.util.concurrent.atomic._ import java.util.concurrent.atomic._
import akka.actor.{ Props, Actor, ActorRef } import akka.actor.{ Props, Actor, ActorRef, ActorSystem }
import java.util.Comparator import java.util.Comparator
import akka.japi.{ Procedure, Function } import akka.japi.{ Procedure, Function }
@ -33,7 +33,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte
def classifierFor(event: BusType#Event): BusType#Classifier def classifierFor(event: BusType#Event): BusType#Classifier
def disposeSubscriber(subscriber: BusType#Subscriber): Unit def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit
busName must { busName must {
@ -58,7 +58,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte
"not allow to unsubscribe non-existing subscriber" in { "not allow to unsubscribe non-existing subscriber" in {
val sub = createNewSubscriber() val sub = createNewSubscriber()
bus.unsubscribe(sub, classifier) must be === false bus.unsubscribe(sub, classifier) must be === false
disposeSubscriber(sub) disposeSubscriber(system, sub)
} }
"not allow for the same subscriber to subscribe to the same channel twice" in { "not allow for the same subscriber to subscribe to the same channel twice" in {
@ -80,7 +80,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte
subscribers.zip(classifiers) forall { case (s, c) bus.subscribe(s, c) } must be === true subscribers.zip(classifiers) forall { case (s, c) bus.subscribe(s, c) } must be === true
subscribers.zip(classifiers) forall { case (s, c) bus.unsubscribe(s, c) } must be === true subscribers.zip(classifiers) forall { case (s, c) bus.unsubscribe(s, c) } must be === true
subscribers foreach disposeSubscriber subscribers foreach (disposeSubscriber(system, _))
} }
"publishing events without any subscribers shouldn't be a problem" in { "publishing events without any subscribers shouldn't be a problem" in {
@ -113,7 +113,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte
subscribers foreach { s bus.subscribe(s, classifier) must be === true } subscribers foreach { s bus.subscribe(s, classifier) must be === true }
bus.publish(event) bus.publish(event)
range foreach { _ expectMsg(event) } range foreach { _ expectMsg(event) }
subscribers foreach { s bus.unsubscribe(s, classifier) must be === true; disposeSubscriber(s) } subscribers foreach { s bus.unsubscribe(s, classifier) must be === true; disposeSubscriber(system, s) }
} }
"not publish the given event to any other subscribers than the intended ones" in { "not publish the given event to any other subscribers than the intended ones" in {
@ -136,7 +136,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte
} }
"cleanup subscriber" in { "cleanup subscriber" in {
disposeSubscriber(subscriber) disposeSubscriber(system, subscriber)
} }
} }
} }
@ -165,7 +165,7 @@ class ActorEventBusSpec extends EventBusSpec("ActorEventBus") {
def classifierFor(event: BusType#Event) = event.toString def classifierFor(event: BusType#Event) = event.toString
def disposeSubscriber(subscriber: BusType#Subscriber): Unit = subscriber.stop() def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = system.stop(subscriber)
} }
object ScanningEventBusSpec { object ScanningEventBusSpec {
@ -194,7 +194,7 @@ class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") {
def classifierFor(event: BusType#Event) = event.toString def classifierFor(event: BusType#Event) = event.toString
def disposeSubscriber(subscriber: BusType#Subscriber): Unit = () def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = ()
} }
object LookupEventBusSpec { object LookupEventBusSpec {
@ -219,5 +219,5 @@ class LookupEventBusSpec extends EventBusSpec("LookupEventBus") {
def classifierFor(event: BusType#Event) = event.toString def classifierFor(event: BusType#Event) = event.toString
def disposeSubscriber(subscriber: BusType#Subscriber): Unit = () def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = ()
} }

View file

@ -52,9 +52,9 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
} }
override def afterAll { override def afterAll {
appLogging.stop() appLogging.shutdown()
appAuto.stop() appAuto.shutdown()
appLifecycle.stop() appLifecycle.shutdown()
} }
"A LoggingReceive" must { "A LoggingReceive" must {
@ -201,7 +201,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)") assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)")
} }
supervisor.stop() system.stop(supervisor)
expectMsg(Logging.Debug(sname, "stopping")) expectMsg(Logging.Debug(sname, "stopping"))
expectMsg(Logging.Debug(aname, "stopped")) expectMsg(Logging.Debug(aname, "stopped"))
expectMsg(Logging.Debug(sname, "stopped")) expectMsg(Logging.Debug(sname, "stopped"))

View file

@ -75,7 +75,7 @@ class TellLatencyPerformanceSpec extends PerformanceSpec {
ok must be(true) ok must be(true)
logMeasurement(numberOfClients, durationNs, stat) logMeasurement(numberOfClients, durationNs, stat)
} }
clients.foreach(_.stop()) clients.foreach(system.stop(_))
} }
} }

View file

@ -173,8 +173,8 @@ class TellThroughput10000PerformanceSpec extends PerformanceSpec {
ok must be(true) ok must be(true)
logMeasurement(numberOfClients, durationNs, repeat) logMeasurement(numberOfClients, durationNs, repeat)
} }
clients.foreach(_.stop()) clients.foreach(system.stop(_))
destinations.foreach(_.stop()) destinations.foreach(system.stop(_))
} }
} }

View file

@ -147,8 +147,8 @@ class TellThroughputComputationPerformanceSpec extends PerformanceSpec {
ok must be(true) ok must be(true)
logMeasurement(numberOfClients, durationNs, repeat) logMeasurement(numberOfClients, durationNs, repeat)
} }
clients.foreach(_.stop()) clients.foreach(system.stop(_))
destinations.foreach(_.stop()) destinations.foreach(system.stop(_))
} }
} }

View file

@ -78,8 +78,8 @@ class TellThroughputPerformanceSpec extends PerformanceSpec {
ok must be(true) ok must be(true)
logMeasurement(numberOfClients, durationNs, repeat) logMeasurement(numberOfClients, durationNs, repeat)
} }
clients.foreach(_.stop()) clients.foreach(system.stop(_))
destinations.foreach(_.stop()) destinations.foreach(system.stop(_))
} }
} }

View file

@ -159,8 +159,8 @@ class TellThroughputSeparateDispatchersPerformanceSpec extends PerformanceSpec {
ok must be(true) ok must be(true)
logMeasurement(numberOfClients, durationNs, repeat) logMeasurement(numberOfClients, durationNs, repeat)
} }
clients.foreach(_.stop()) clients.foreach(system.stop(_))
destinations.foreach(_.stop()) destinations.foreach(system.stop(_))
} }
} }

View file

@ -108,7 +108,7 @@ class TradingLatencyPerformanceSpec extends PerformanceSpec {
} }
logMeasurement(numberOfClients, durationNs, stat) logMeasurement(numberOfClients, durationNs, stat)
} }
clients.foreach(_.stop()) clients.foreach(system.stop(_))
} }
} }

View file

@ -105,7 +105,7 @@ class TradingThroughputPerformanceSpec extends PerformanceSpec {
} }
logMeasurement(numberOfClients, durationNs, totalNumberOfOrders) logMeasurement(numberOfClients, durationNs, totalNumberOfOrders)
} }
clients.foreach(_.stop()) clients.foreach(system.stop(_))
} }
} }

View file

@ -99,7 +99,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool.stop() system.stop(pool)
} }
"pass ticket #705" in { "pass ticket #705" in {
@ -129,7 +129,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
_.await.resultOrException.get must be("Response") _.await.resultOrException.get must be("Response")
} }
} finally { } finally {
pool.stop() system.stop(pool)
} }
} }
@ -194,7 +194,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(4) (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(4)
pool.stop() system.stop(pool)
} }
"grow as needed under mailbox pressure" in { "grow as needed under mailbox pressure" in {
@ -250,7 +250,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be >= (3) (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be >= (3)
pool.stop() system.stop(pool)
} }
"round robin" in { "round robin" in {
@ -281,7 +281,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
latch1.await latch1.await
delegates.size must be(1) delegates.size must be(1)
pool1.stop() system.stop(pool1)
val latch2 = TestLatch(2) val latch2 = TestLatch(2)
delegates.clear() delegates.clear()
@ -309,7 +309,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
latch2.await latch2.await
delegates.size must be(2) delegates.size must be(2)
pool2.stop() system.stop(pool2)
} }
"backoff" in { "backoff" in {
@ -355,7 +355,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be <= (z) (pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be <= (z)
pool.stop() system.stop(pool)
} }
} }
} }

View file

@ -49,7 +49,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli
actor ! "hello" actor ! "hello"
helloLatch.await(5, TimeUnit.SECONDS) must be(true) helloLatch.await(5, TimeUnit.SECONDS) must be(true)
actor.stop() system.stop(actor)
stopLatch.await(5, TimeUnit.SECONDS) must be(true) stopLatch.await(5, TimeUnit.SECONDS) must be(true)
} }
@ -104,7 +104,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli
actor ! Broadcast("hello") actor ! Broadcast("hello")
helloLatch.await(5, TimeUnit.SECONDS) must be(true) helloLatch.await(5, TimeUnit.SECONDS) must be(true)
actor.stop() system.stop(actor)
stopLatch.await(5, TimeUnit.SECONDS) must be(true) stopLatch.await(5, TimeUnit.SECONDS) must be(true)
} }
} }
@ -134,7 +134,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli
for (i 1 to 5) expectMsg("world") for (i 1 to 5) expectMsg("world")
} }
actor.stop() system.stop(actor)
stopLatch.await(5, TimeUnit.SECONDS) must be(true) stopLatch.await(5, TimeUnit.SECONDS) must be(true)
} }
@ -190,7 +190,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli
actor ! Broadcast("hello") actor ! Broadcast("hello")
helloLatch.await(5, TimeUnit.SECONDS) must be(true) helloLatch.await(5, TimeUnit.SECONDS) must be(true)
actor.stop() system.stop(actor)
stopLatch.await(5, TimeUnit.SECONDS) must be(true) stopLatch.await(5, TimeUnit.SECONDS) must be(true)
} }
} }

View file

@ -43,7 +43,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
val c1, c2 = expectMsgType[ActorRef] val c1, c2 = expectMsgType[ActorRef]
watch(router) watch(router)
watch(c2) watch(c2)
c2.stop() system.stop(c2)
expectMsg(Terminated(c2)) expectMsg(Terminated(c2))
// it might take a while until the Router has actually processed the Terminated message // it might take a while until the Router has actually processed the Terminated message
awaitCond { awaitCond {
@ -54,7 +54,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
} }
res == Seq(c1, c1) res == Seq(c1, c1)
} }
c1.stop() system.stop(c1)
expectMsg(Terminated(router)) expectMsg(Terminated(router))
} }
@ -324,8 +324,8 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(Props(new Actor { def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(Props(new Actor {
def receive = { def receive = {
case Stop(None) self.stop() case Stop(None) context.stop(self)
case Stop(Some(_id)) if (_id == id) self.stop() case Stop(Some(_id)) if (_id == id) context.stop(self)
case _id: Int if (_id == id) case _id: Int if (_id == id)
case x { case x {
Thread sleep 100 * id Thread sleep 100 * id

View file

@ -106,7 +106,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
})) }))
a ! new ObjectOutputStream(new ByteArrayOutputStream()) a ! new ObjectOutputStream(new ByteArrayOutputStream())
expectMsg("pass") expectMsg("pass")
a.stop() system.stop(a)
} }
"serialize DeadLetterActorRef" in { "serialize DeadLetterActorRef" in {
@ -124,7 +124,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
(deadLetters eq a.deadLetters) must be(true) (deadLetters eq a.deadLetters) must be(true)
} }
} finally { } finally {
a.stop() a.shutdown()
} }
} }
} }

View file

@ -37,6 +37,7 @@ akka {
actor { actor {
provider = "akka.actor.LocalActorRefProvider" provider = "akka.actor.LocalActorRefProvider"
creation-timeout = 20s # Timeout for ActorSystem.actorOf creation-timeout = 20s # Timeout for ActorSystem.actorOf
reaper-interval = 5s # frequency with which stopping actors are prodded in case they had to be removed from their parents
timeout = 5s # Default timeout for Future based invocations timeout = 5s # Default timeout for Future based invocations
# - Actor: ask && ? # - Actor: ask && ?
# - UntypedActor: ask # - UntypedActor: ask

View file

@ -156,15 +156,44 @@ object Actor {
/** /**
* Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':
* <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a> * <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>
* <p/>
* An actor has a well-defined (non-cyclic) life-cycle.
* <pre>
* => RUNNING (created and started actor) - can receive messages
* => SHUTDOWN (when 'stop' or 'exit' is invoked) - can't do anything
* </pre>
* *
* <p/> * An actor has a well-defined (non-cyclic) life-cycle.
* The Actor's own ActorRef is available in the 'self' member variable. * - ''RUNNING'' (created and started actor) - can receive messages
* - ''SHUTDOWN'' (when 'stop' or 'exit' is invoked) - can't do anything
*
* The Actor's own [[akka.actor.ActorRef]] is available as `self`, the current
* messages sender as `sender` and the [[akka.actor.ActorContext]] as
* `context`. The only abstract method is `receive` which shall return the
* initial behavior of the actor as a partial function (behavior can be changed
* using `context.become` and `context.unbecome`).
*
* {{{
* class ExampleActor extends Actor {
* def receive = {
* // directly calculated reply
* case Request(r) => sender ! calculate(r)
*
* // just to demonstrate how to stop yourself
* case Shutdown => context.stop(self)
*
* // error kernel with child replying directly to customer
* case Dangerous(r) => context.actorOf(Props[ReplyToOriginWorker]).tell(PerformWork(r), sender)
*
* // error kernel with reply going through us
* case OtherJob(r) => context.actorOf(Props[ReplyToMeWorker]) ! JobRequest(r, sender)
* case JobReply(result, orig_s) => orig_s ! result
* }
* }
* }}}
*
* The last line demonstrates the essence of the error kernel design: spawn
* one-off actors which terminate after doing their job, pass on `sender` to
* allow direct reply if that is what makes sense, or round-trip the sender
* as shown with the fictitious JobRequest/JobReply message pair.
*
* If you dont like writing `context` you can always `import context._` to get
* direct access to `actorOf`, `stop` etc. This is not default in order to keep
* the name-space clean.
*/ */
trait Actor { trait Actor {
@ -218,25 +247,8 @@ trait Actor {
final def sender: ActorRef = context.sender final def sender: ActorRef = context.sender
/** /**
* User overridable callback/setting. * This defines the initial actor behavior, it must return a partial function
* <p/> * with the actor logic.
* Partial function implementing the actor logic.
* To be implemented by concrete actor class.
* <p/>
* Example code:
* <pre>
* def receive = {
* case Ping =&gt;
* println("got a 'Ping' message")
* sender ! "pong"
*
* case OneWay =&gt;
* println("got a 'OneWay' message")
*
* case unknown =&gt;
* println("unknown message: " + unknown)
* }
* </pre>
*/ */
protected def receive: Receive protected def receive: Receive
@ -258,19 +270,20 @@ trait Actor {
def postStop() {} def postStop() {}
/** /**
* User overridable callback. * User overridable callback: '''By default it disposes of all children and then calls `postStop()`.'''
* <p/> * <p/>
* Is called on a crashed Actor right BEFORE it is restarted to allow clean * Is called on a crashed Actor right BEFORE it is restarted to allow clean
* up of resources before Actor is terminated. * up of resources before Actor is terminated.
* By default it calls postStop()
*/ */
def preRestart(reason: Throwable, message: Option[Any]) { postStop() } def preRestart(reason: Throwable, message: Option[Any]) {
context.children foreach (context.stop(_))
postStop()
}
/** /**
* User overridable callback. * User overridable callback: By default it calls `preStart()`.
* <p/> * <p/>
* Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash. * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
* By default it calls preStart()
*/ */
def postRestart(reason: Throwable) { preStart() } def postRestart(reason: Throwable) { preStart() }
@ -278,7 +291,9 @@ trait Actor {
* User overridable callback. * User overridable callback.
* <p/> * <p/>
* Is called when a message isn't handled by the current behavior of the actor * Is called when a message isn't handled by the current behavior of the actor
* by default it does: EventHandler.warning(self, message) * by default it fails with either a [[akka.actor.DeathPactException]] (in
* case of an unhandled [[akka.actor.Terminated]] message) or a
* [[akka.actor.UnhandledMessageException]].
*/ */
def unhandled(message: Any) { def unhandled(message: Any) {
message match { message match {

View file

@ -185,7 +185,7 @@ private[akka] class ActorCell(
val system: ActorSystemImpl, val system: ActorSystemImpl,
val self: InternalActorRef, val self: InternalActorRef,
val props: Props, val props: Props,
val parent: InternalActorRef, @volatile var parent: InternalActorRef,
/*no member*/ _receiveTimeout: Option[Duration], /*no member*/ _receiveTimeout: Option[Duration],
var hotswap: Stack[PartialFunction[Any, Unit]]) extends UntypedActorContext { var hotswap: Stack[PartialFunction[Any, Unit]]) extends UntypedActorContext {
@ -242,6 +242,16 @@ private[akka] class ActorCell(
_actorOf(props, name) _actorOf(props, name)
} }
final def stop(actor: ActorRef): Unit = {
val a = actor.asInstanceOf[InternalActorRef]
if (childrenRefs contains actor.path.name) {
system.locker ! a
childrenRefs -= actor.path.name
handleChildTerminated(actor)
}
a.stop()
}
final var currentMessage: Envelope = null final var currentMessage: Envelope = null
final var actor: Actor = _ final var actor: Actor = _
@ -405,7 +415,8 @@ private[akka] class ActorCell(
// do not process normal messages while waiting for all children to terminate // do not process normal messages while waiting for all children to terminate
dispatcher suspend this dispatcher suspend this
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopping")) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopping"))
for (child c) child.stop() // do not use stop(child) because that would dissociate the children from us, but we still want to wait for them
for (child c) child.asInstanceOf[InternalActorRef].stop()
stopping = true stopping = true
} }
} }
@ -550,15 +561,17 @@ private[akka] class ActorCell(
} }
final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match { final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match {
case Some(stats) if stats.child == child if (!props.faultHandler.handleFailure(child, cause, stats, childrenRefs.values)) throw cause case Some(stats) if stats.child == child if (!props.faultHandler.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause
case Some(stats) system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child)) case Some(stats) system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child))
case None system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child)) case None system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child))
} }
final def handleChildTerminated(child: ActorRef): Unit = { final def handleChildTerminated(child: ActorRef): Unit = {
childrenRefs -= child.path.name if (childrenRefs contains child.path.name) {
props.faultHandler.handleChildTerminated(child, children) childrenRefs -= child.path.name
if (stopping && childrenRefs.isEmpty) doTerminate() props.faultHandler.handleChildTerminated(this, child, children)
if (stopping && childrenRefs.isEmpty) doTerminate()
} else system.locker ! ChildTerminated(child)
} }
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅

View file

@ -110,11 +110,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
*/ */
def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender) def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender)
/**
* Shuts down the actor its dispatcher and message queue.
*/
def stop(): Unit
/** /**
* Is the actor shut down? * Is the actor shut down?
*/ */
@ -192,6 +187,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
def resume(): Unit def resume(): Unit
def suspend(): Unit def suspend(): Unit
def restart(cause: Throwable): Unit def restart(cause: Throwable): Unit
def stop(): Unit
def sendSystemMessage(message: SystemMessage): Unit def sendSystemMessage(message: SystemMessage): Unit
def getParent: InternalActorRef def getParent: InternalActorRef
/** /**

View file

@ -234,6 +234,18 @@ trait ActorRefFactory {
* replies in order to resolve the matching set of actors. * replies in order to resolve the matching set of actors.
*/ */
def actorSelection(path: String): ActorSelection = ActorSelection(lookupRoot, path) def actorSelection(path: String): ActorSelection = ActorSelection(lookupRoot, path)
/**
* Stop the actor pointed to by the given [[akka.actor.ActorRef]]; this is
* an asynchronous operation, i.e. involves a message send, but if invoked
* on an [[akka.actor.ActorContext]] if operating on a child of that
* context it will free up the name for immediate reuse.
*
* When invoked on [[akka.actor.ActorSystem]] for a top-level actor, this
* method sends a message to the guardian actor and blocks waiting for a reply,
* see `akka.actor.creation-timeout` in the `reference.conf`.
*/
def stop(actor: ActorRef): Unit
} }
class ActorRefProviderException(message: String) extends AkkaException(message) class ActorRefProviderException(message: String) extends AkkaException(message)
@ -248,6 +260,11 @@ private[akka] case class CreateChild(props: Props, name: String)
*/ */
private[akka] case class CreateRandomNameChild(props: Props) private[akka] case class CreateRandomNameChild(props: Props)
/**
* Internal Akka use only, used in implementation of system.stop(child).
*/
private[akka] case class StopChild(child: ActorRef)
/** /**
* Local ActorRef provider. * Local ActorRef provider.
*/ */
@ -309,7 +326,7 @@ class LocalActorRefProvider(
override def isTerminated = stopped.isOn override def isTerminated = stopped.isOn
override def !(message: Any)(implicit sender: ActorRef = null): Unit = stopped.ifOff(message match { override def !(message: Any)(implicit sender: ActorRef = null): Unit = stopped.ifOff(message match {
case Failed(ex) if sender ne null causeOfTermination = Some(ex); sender.stop() case Failed(ex) if sender ne null causeOfTermination = Some(ex); sender.asInstanceOf[InternalActorRef].stop()
case _ log.error(this + " received unexpected message [" + message + "]") case _ log.error(this + " received unexpected message [" + message + "]")
}) })
@ -329,9 +346,10 @@ class LocalActorRefProvider(
*/ */
private class Guardian extends Actor { private class Guardian extends Actor {
def receive = { def receive = {
case Terminated(_) context.self.stop() case Terminated(_) context.stop(self)
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e }) case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e })
case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case e: Exception e }) case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case e: Exception e })
case StopChild(child) context.stop(child); sender ! "ok"
case m deadLetters ! DeadLetter(m, sender, self) case m deadLetters ! DeadLetter(m, sender, self)
} }
} }
@ -345,9 +363,10 @@ class LocalActorRefProvider(
def receive = { def receive = {
case Terminated(_) case Terminated(_)
eventStream.stopDefaultLoggers() eventStream.stopDefaultLoggers()
context.self.stop() context.stop(self)
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e }) case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e })
case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case e: Exception e }) case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case e: Exception e })
case StopChild(child) context.stop(child); sender ! "ok"
case m deadLetters ! DeadLetter(m, sender, self) case m deadLetters ! DeadLetter(m, sender, self)
} }
} }
@ -508,6 +527,9 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
def schedule(initialDelay: Duration, delay: Duration)(f: Unit): Cancellable = def schedule(initialDelay: Duration, delay: Duration)(f: Unit): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, f), initialDelay)) new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, f), initialDelay))
def schedule(initialDelay: Duration, delay: Duration, runnable: Runnable): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, runnable), initialDelay))
def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable = def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay)) new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay))
@ -565,6 +587,17 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
} }
} }
private def createContinuousTask(delay: Duration, runnable: Runnable): TimerTask = {
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
dispatcher.dispatchTask(() runnable.run())
try timeout.getTimer.newTimeout(this, delay) catch {
case _: IllegalStateException // stop recurring if timer is stopped
}
}
}
}
private def execDirectly(t: HWTimeout): Unit = { private def execDirectly(t: HWTimeout): Unit = {
try t.getTask.run(t) catch { try t.getTask.run(t) catch {
case e: InterruptedException throw e case e: InterruptedException throw e

View file

@ -73,6 +73,7 @@ object ActorSystem {
val ProviderClass = getString("akka.actor.provider") val ProviderClass = getString("akka.actor.provider")
val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS)) val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS))
val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS)
val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS)) val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS))
val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") val SerializeAllMessages = getBoolean("akka.actor.serialize-messages")
@ -300,7 +301,7 @@ abstract class ActorSystem extends ActorRefFactory {
* (below which the logging actors reside) and the execute all registered * (below which the logging actors reside) and the execute all registered
* termination handlers (see [[ActorSystem.registerOnTermination]]). * termination handlers (see [[ActorSystem.registerOnTermination]]).
*/ */
def stop() def shutdown()
/** /**
* Registers the provided extension and creates its payload, if this extension isn't already registered * Registers the provided extension and creates its payload, if this extension isn't already registered
@ -361,6 +362,18 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
} }
} }
def stop(actor: ActorRef): Unit = {
implicit val timeout = settings.CreationTimeout
val path = actor.path
val guard = guardian.path
val sys = systemGuardian.path
path.parent match {
case `guard` (guardian ? StopChild(actor)).get
case `sys` (systemGuardian ? StopChild(actor)).get
case _ actor.asInstanceOf[InternalActorRef].stop()
}
}
import settings._ import settings._
// this provides basic logging (to stdout) until .start() is called below // this provides basic logging (to stdout) until .start() is called below
@ -428,13 +441,15 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
this this
} }
lazy val locker: Locker = new Locker(scheduler, ReaperInterval, lookupRoot.path / "locker", deathWatch)
def start() = _start def start() = _start
def registerOnTermination[T](code: T) { terminationFuture onComplete (_ code) } def registerOnTermination[T](code: T) { terminationFuture onComplete (_ code) }
def registerOnTermination(code: Runnable) { terminationFuture onComplete (_ code.run) } def registerOnTermination(code: Runnable) { terminationFuture onComplete (_ code.run) }
def stop() { def shutdown() {
guardian.stop() stop(guardian)
} }
/** /**

View file

@ -511,7 +511,7 @@ trait FSM[S, D] extends ListenerManagement {
case _ case _
nextState.replies.reverse foreach { r sender ! r } nextState.replies.reverse foreach { r sender ! r }
terminate(nextState) terminate(nextState)
self.stop() context.stop(self)
} }
} }

View file

@ -119,12 +119,12 @@ abstract class FaultHandlingStrategy {
/** /**
* This method is called after the child has been removed from the set of children. * This method is called after the child has been removed from the set of children.
*/ */
def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit
/** /**
* This method is called to act on the failure of a child: restart if the flag is true, stop otherwise. * This method is called to act on the failure of a child: restart if the flag is true, stop otherwise.
*/ */
def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit
def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = {
if (children.nonEmpty) if (children.nonEmpty)
@ -139,12 +139,12 @@ abstract class FaultHandlingStrategy {
/** /**
* Returns whether it processed the failure or not * Returns whether it processed the failure or not
*/ */
def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate
action match { action match {
case Resume child.asInstanceOf[InternalActorRef].resume(); true case Resume child.asInstanceOf[InternalActorRef].resume(); true
case Restart processFailure(true, child, cause, stats, children); true case Restart processFailure(context, true, child, cause, stats, children); true
case Stop processFailure(false, child, cause, stats, children); true case Stop processFailure(context, false, child, cause, stats, children); true
case Escalate false case Escalate false
} }
} }
@ -192,17 +192,17 @@ case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider,
*/ */
val retriesWindow = (maxNrOfRetries, withinTimeRange) val retriesWindow = (maxNrOfRetries, withinTimeRange)
def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit = { def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {
children foreach (_.stop()) children foreach (context.stop(_))
//TODO optimization to drop all children here already? //TODO optimization to drop all children here already?
} }
def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
if (children.nonEmpty) { if (children.nonEmpty) {
if (restart && children.forall(_.requestRestartPermission(retriesWindow))) if (restart && children.forall(_.requestRestartPermission(retriesWindow)))
children.foreach(_.child.asInstanceOf[InternalActorRef].restart(cause)) children.foreach(_.child.asInstanceOf[InternalActorRef].restart(cause))
else else
children.foreach(_.child.stop()) for (c children) context.stop(c.child)
} }
} }
} }
@ -249,13 +249,13 @@ case class OneForOneStrategy(decider: FaultHandlingStrategy.Decider,
*/ */
val retriesWindow = (maxNrOfRetries, withinTimeRange) val retriesWindow = (maxNrOfRetries, withinTimeRange)
def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit = {} def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {}
def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
if (restart && stats.requestRestartPermission(retriesWindow)) if (restart && stats.requestRestartPermission(retriesWindow))
child.asInstanceOf[InternalActorRef].restart(cause) child.asInstanceOf[InternalActorRef].restart(cause)
else else
child.stop() //TODO optimization to drop child here already? context.stop(child) //TODO optimization to drop child here already?
} }
} }

View file

@ -0,0 +1,50 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import akka.dispatch._
import akka.util.Duration
import akka.util.duration._
import java.util.concurrent.ConcurrentHashMap
import akka.event.DeathWatch
class Locker(scheduler: Scheduler, period: Duration, val path: ActorPath, val deathWatch: DeathWatch) extends MinimalActorRef {
class DavyJones extends Runnable {
def run = {
val iter = heap.entrySet.iterator
while (iter.hasNext) {
val soul = iter.next()
deathWatch.subscribe(Locker.this, soul.getKey) // in case Terminated got lost somewhere
soul.getKey match {
case _: LocalActorRef // nothing to do, they know what they signed up for
case nonlocal nonlocal.stop() // try again in case it was due to a communications failure
}
}
}
}
private val heap = new ConcurrentHashMap[InternalActorRef, Long]
scheduler.schedule(period, period, new DavyJones)
override def sendSystemMessage(msg: SystemMessage): Unit = this.!(msg)
override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match {
case Terminated(soul) heap.remove(soul)
case ChildTerminated(soul) heap.remove(soul)
case soul: InternalActorRef
heap.put(soul, 0l) // wanted to put System.nanoTime and do something intelligent, but forgot what that was
deathWatch.subscribe(this, soul)
// now re-bind the soul so that it does not drown its parent
soul match {
case local: LocalActorRef
val cell = local.underlying
cell.parent = this
case _
}
case _ // ignore
}
}

View file

@ -42,6 +42,15 @@ trait Scheduler {
*/ */
def schedule(initialDelay: Duration, frequency: Duration)(f: Unit): Cancellable def schedule(initialDelay: Duration, frequency: Duration)(f: Unit): Cancellable
/**
* Schedules a function to be run repeatedly with an initial delay and a frequency.
* E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set
* delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS)
*
* Java API
*/
def schedule(initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable
/** /**
* Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed. * Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed.
* *

View file

@ -24,7 +24,7 @@ trait TypedActorFactory {
*/ */
def stop(proxy: AnyRef): Boolean = getActorRefFor(proxy) match { def stop(proxy: AnyRef): Boolean = getActorRefFor(proxy) match {
case null false case null false
case ref ref.stop; true case ref ref.asInstanceOf[InternalActorRef].stop; true
} }
/** /**

View file

@ -8,46 +8,67 @@ import akka.japi.{ Creator, Procedure }
import akka.dispatch.{ MessageDispatcher, Promise } import akka.dispatch.{ MessageDispatcher, Promise }
/** /**
* Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':
* <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>
*
* This class is the Java cousin to the [[akka.actor.Actor]] Scala interface.
* Subclass this abstract class to create a MDB-style untyped actor. * Subclass this abstract class to create a MDB-style untyped actor.
* <p/> *
* This class is meant to be used from Java. * An actor has a well-defined (non-cyclic) life-cycle.
* <p/> * - ''RUNNING'' (created and started actor) - can receive messages
* - ''SHUTDOWN'' (when 'stop' or 'exit' is invoked) - can't do anything
*
* The Actor's own [[akka.actor.ActorRef]] is available as `getSelf()`, the current
* messages sender as `getSender()` and the [[akka.actor.UntypedActorContext]] as
* `getContext()`. The only abstract method is `onReceive()` which is invoked for
* each processed message unless dynamically overridden using `getContext().become()`.
*
* Here is an example on how to create and use an UntypedActor: * Here is an example on how to create and use an UntypedActor:
* <pre> *
* {{{
* public class SampleUntypedActor extends UntypedActor { * public class SampleUntypedActor extends UntypedActor {
*
* public class Reply {
* final public ActorRef sender;
* final public Result result;
* Reply(ActorRef sender, Result result) {
* this.sender = sender;
* this.result = result;
* }
* }
*
* public void onReceive(Object message) throws Exception { * public void onReceive(Object message) throws Exception {
* if (message instanceof String) { * if (message instanceof String) {
* String msg = (String)message; * String msg = (String)message;
* *
* if (msg.equals("UseReply")) { * if (msg.equals("UseSender")) {
* // Reply to original sender of message using the 'reply' method * // Reply to original sender of message
* getContext().getSender().tell(msg + ":" + getSelf().getAddress()); * getSender().tell(msg + ":" + getSelf());
*
* } else if (msg.equals("UseSender") && getSender().isDefined()) {
* // Reply to original sender of message using the sender reference
* // also passing along my own reference (the self)
* getSender().get().tell(msg, getSelf());
* *
* } else if (msg.equals("SendToSelf")) { * } else if (msg.equals("SendToSelf")) {
* // Send message to the actor itself recursively * // Send message to the actor itself recursively
* getSelf().tell(msg) * getSelf().tell("SomeOtherMessage");
* *
* } else if (msg.equals("ForwardMessage")) { * } else if (msg.equals("ErrorKernelWithDirectReply")) {
* // Retreive an actor from the ActorRegistry by ID and get an ActorRef back * // Send work to one-off child which will reply directly to original sender
* ActorRef actorRef = Actor.registry.local.actorsFor("some-actor-id").head(); * getContext().actorOf(new Props(Worker.class)).tell("DoSomeDangerousWork", getSender());
*
* } else if (msg.equals("ErrorKernelWithReplyHere")) {
* // Send work to one-off child and collect the answer, reply handled further down
* getContext().actorOf(new Props(Worker.class)).tell("DoWorkAndReplyToMe");
* *
* } else throw new IllegalArgumentException("Unknown message: " + message); * } else throw new IllegalArgumentException("Unknown message: " + message);
*
* } else if (message instanceof Reply) {
*
* final Reply reply = (Reply) message;
* // might want to do some processing/book-keeping here
* reply.sender.tell(reply.result);
*
* } else throw new IllegalArgumentException("Unknown message: " + message); * } else throw new IllegalArgumentException("Unknown message: " + message);
* } * }
*
* public static void main(String[] args) {
* ActorSystem system = ActorSystem.create("Sample");
* ActorRef actor = system.actorOf(SampleUntypedActor.class);
* actor.tell("SendToSelf");
* actor.stop();
* }
* } * }
* </pre> * }}}
*/ */
abstract class UntypedActor extends Actor { abstract class UntypedActor extends Actor {
@ -65,8 +86,9 @@ abstract class UntypedActor extends Actor {
def getSelf(): ActorRef = self def getSelf(): ActorRef = self
/** /**
* The reference sender Actor of the last received message. * The reference sender Actor of the currently processed message. This is
* Is defined if the message was sent from another Actor, else None. * always a legal destination to send to, even if there is no logical recipient
* for the reply, in which case it will be sent to the dead letter mailbox.
*/ */
def getSender(): ActorRef = sender def getSender(): ActorRef = sender
@ -77,7 +99,7 @@ abstract class UntypedActor extends Actor {
* Actor are automatically started asynchronously when created. * Actor are automatically started asynchronously when created.
* Empty default implementation. * Empty default implementation.
*/ */
override def preStart() {} override def preStart(): Unit = super.preStart()
/** /**
* User overridable callback. * User overridable callback.
@ -85,24 +107,22 @@ abstract class UntypedActor extends Actor {
* Is called asynchronously after 'actor.stop()' is invoked. * Is called asynchronously after 'actor.stop()' is invoked.
* Empty default implementation. * Empty default implementation.
*/ */
override def postStop() {} override def postStop(): Unit = super.postStop()
/** /**
* User overridable callback. * User overridable callback: '''By default it disposes of all children and then calls `postStop()`.'''
* <p/> * <p/>
* Is called on a crashed Actor right BEFORE it is restarted to allow clean * Is called on a crashed Actor right BEFORE it is restarted to allow clean
* up of resources before Actor is terminated. * up of resources before Actor is terminated.
* By default it calls postStop()
*/ */
override def preRestart(reason: Throwable, message: Option[Any]) { postStop() } override def preRestart(reason: Throwable, message: Option[Any]): Unit = super.preRestart(reason, message)
/** /**
* User overridable callback. * User overridable callback: By default it calls `preStart()`.
* <p/> * <p/>
* Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash. * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
* By default it calls preStart()
*/ */
override def postRestart(reason: Throwable) { preStart() } override def postRestart(reason: Throwable): Unit = super.postRestart(reason)
/** /**
* User overridable callback. * User overridable callback.

View file

@ -137,7 +137,10 @@ trait LoggingBus extends ActorEventBus {
} { } {
// this is very necessary, else you get infinite loop with DeadLetter // this is very necessary, else you get infinite loop with DeadLetter
unsubscribe(logger) unsubscribe(logger)
logger.stop() logger match {
case ref: InternalActorRef ref.stop()
case _
}
} }
publish(Debug(simpleName(this), "all default loggers stopped")) publish(Debug(simpleName(this), "all default loggers stopped"))
} }

View file

@ -66,49 +66,3 @@ trait ConnectionManager {
*/ */
def remove(deadRef: ActorRef) def remove(deadRef: ActorRef)
} }
/**
* Manages local connections for a router, e.g. local actors.
*/
class LocalConnectionManager(initialConnections: Iterable[ActorRef]) extends ConnectionManager {
def this(iterable: java.lang.Iterable[ActorRef]) {
this(JavaConverters.iterableAsScalaIterableConverter(iterable).asScala)
}
case class State(version: Long, connections: Iterable[ActorRef]) extends VersionedIterable[ActorRef] {
def iterable = connections
}
private val state: AtomicReference[State] = new AtomicReference[State](newState())
private def newState() = State(Long.MinValue, initialConnections)
def version: Long = state.get.version
def size: Int = state.get.connections.size
def isEmpty: Boolean = state.get.connections.isEmpty
def connections = state.get
def shutdown() {
state.get.connections foreach (_.stop())
}
@tailrec
final def remove(ref: ActorRef) = {
val oldState = state.get
//remote the ref from the connections.
var newList = oldState.connections.filter(currentActorRef currentActorRef ne ref)
if (newList.size != oldState.connections.size) {
//one or more occurrences of the actorRef were removed, so we need to update the state.
val newState = State(oldState.version + 1, newList)
//if we are not able to update the state, we just try again.
if (!state.compareAndSet(oldState, newState)) remove(ref)
}
}
}

View file

@ -132,7 +132,7 @@ trait Router extends Actor {
case Terminated(child) case Terminated(child)
ref._routees = ref._routees filterNot (_ == child) ref._routees = ref._routees filterNot (_ == child)
if (ref.routees.isEmpty) self.stop() if (ref.routees.isEmpty) context.stop(self)
}: Receive) orElse routerReceive }: Receive) orElse routerReceive

View file

@ -4,13 +4,15 @@
package akka.util package akka.util
import akka.actor.Actor
import java.util.concurrent.ConcurrentSkipListSet import java.util.concurrent.ConcurrentSkipListSet
import akka.actor.{ ActorInitializationException, ActorRef } import akka.actor.{ ActorInitializationException, ActorRef }
/** /**
* A manager for listener actors. Intended for mixin by observables. * A manager for listener actors. Intended for mixin by observables.
*/ */
trait ListenerManagement { trait ListenerManagement { this: Actor
private val listeners = new ConcurrentSkipListSet[ActorRef] private val listeners = new ConcurrentSkipListSet[ActorRef]
@ -33,7 +35,7 @@ trait ListenerManagement {
*/ */
def removeListener(listener: ActorRef) { def removeListener(listener: ActorRef) {
listeners remove listener listeners remove listener
if (manageLifeCycleOfListeners) listener.stop() if (manageLifeCycleOfListeners) context.stop(listener)
} }
/* /*

View file

@ -62,6 +62,6 @@ class SchedulerDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
//This cancels further Ticks to be sent //This cancels further Ticks to be sent
cancellable.cancel() cancellable.cancel()
//#schedule-recurring //#schedule-recurring
tickActor.stop() system.stop(tickActor)
} }
} }

View file

@ -26,7 +26,7 @@ class ConfigDocSpec extends WordSpec with MustMatchers {
val system = ActorSystem("MySystem", ConfigFactory.load(customConf)) val system = ActorSystem("MySystem", ConfigFactory.load(customConf))
//#custom-config //#custom-config
system.stop() system.shutdown()
} }

View file

@ -25,7 +25,9 @@ which explains the existence of the fourth choice (as a supervisor also is
subordinate to another supervisor higher up) and has implications on the first subordinate to another supervisor higher up) and has implications on the first
three: resuming an actor resumes all its subordinates, restarting an actor three: resuming an actor resumes all its subordinates, restarting an actor
entails restarting all its subordinates, similarly stopping an actor will also entails restarting all its subordinates, similarly stopping an actor will also
stop all its subordinates. stop all its subordinates. It should be noted that the default behavior of an
actor is to stop all its children before restarting, but this can be overridden
using the :meth:`preRestart` hook.
Each supervisor is configured with a function translating all possible failure Each supervisor is configured with a function translating all possible failure
causes (i.e. exceptions) into one of the four choices given above; notably, causes (i.e. exceptions) into one of the four choices given above; notably,
@ -69,14 +71,12 @@ that the restart is not visible outside of the actor itself with the notable
exception that the message during which the failure occurred is not exception that the message during which the failure occurred is not
re-processed. re-processed.
Restarting an actor in this way recursively restarts all its children in the Restarting an actor in this way recursively terminates all its children. If
same fashion, whereby all parentchild relationships are kept intact. If this this is not the right approach for certain sub-trees of the supervision
is not the right approach for certain sub-trees of the supervision hierarchy, hierarchy, you may choose to retain the children, in which case they will be
you should choose to stop the failed actor instead, which will terminate all recursively restarted in the same fashion as the failed parent (with the same
its children recursively, after which that part of the system may be recreated default to terminate children, which must be overridden on a per-actor basis,
from scratch. The second part of this action may be implemented using the see :class:`Actor` for details).
lifecycle monitoring described next or using lifecycle callbacks as described
in :class:`Actor`.
What Lifecycle Monitoring Means What Lifecycle Monitoring Means
------------------------------- -------------------------------

View file

@ -42,7 +42,7 @@ public class UntypedActorTestBase {
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class)); ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class));
//#system-actorOf //#system-actorOf
myActor.tell("test"); myActor.tell("test");
system.stop(); system.shutdown();
} }
@Test @Test
@ -52,7 +52,7 @@ public class UntypedActorTestBase {
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class)); ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class));
//#context-actorOf //#context-actorOf
myActor.tell("test"); myActor.tell("test");
system.stop(); system.shutdown();
} }
@Test @Test
@ -67,7 +67,7 @@ public class UntypedActorTestBase {
})); }));
//#creating-constructor //#creating-constructor
myActor.tell("test"); myActor.tell("test");
system.stop(); system.shutdown();
} }
@Test @Test
@ -80,7 +80,7 @@ public class UntypedActorTestBase {
"myactor"); "myactor");
//#creating-props //#creating-props
myActor.tell("test"); myActor.tell("test");
system.stop(); system.shutdown();
} }
@Test @Test
@ -105,7 +105,7 @@ public class UntypedActorTestBase {
} }
} }
//#using-ask //#using-ask
system.stop(); system.shutdown();
} }
@Test @Test
@ -113,7 +113,7 @@ public class UntypedActorTestBase {
ActorSystem system = ActorSystem.create("MySystem"); ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(MyReceivedTimeoutUntypedActor.class)); ActorRef myActor = system.actorOf(new Props(MyReceivedTimeoutUntypedActor.class));
myActor.tell("Hello"); myActor.tell("Hello");
system.stop(); system.shutdown();
} }
@Test @Test
@ -123,7 +123,7 @@ public class UntypedActorTestBase {
//#poison-pill //#poison-pill
myActor.tell(poisonPill()); myActor.tell(poisonPill());
//#poison-pill //#poison-pill
system.stop(); system.shutdown();
} }
@Test @Test
@ -133,7 +133,7 @@ public class UntypedActorTestBase {
//#kill //#kill
victim.tell(kill()); victim.tell(kill());
//#kill //#kill
system.stop(); system.shutdown();
} }
@Test @Test
@ -147,7 +147,7 @@ public class UntypedActorTestBase {
myActor.tell("foo"); myActor.tell("foo");
myActor.tell("bar"); myActor.tell("bar");
myActor.tell("bar"); myActor.tell("bar");
system.stop(); system.shutdown();
} }
public static class MyActor extends UntypedActor { public static class MyActor extends UntypedActor {

View file

@ -37,7 +37,7 @@ public class LoggingDocTestBase {
} }
})); }));
myActor.tell("test"); myActor.tell("test");
system.stop(); system.shutdown();
} }
//#my-actor //#my-actor

View file

@ -31,7 +31,7 @@ public class DurableMailboxDocTestBase {
})); }));
//#define-dispatcher //#define-dispatcher
myActor.tell("test"); myActor.tell("test");
system.stop(); system.shutdown();
} }
public static class MyUntypedActor extends UntypedActor { public static class MyUntypedActor extends UntypedActor {

View file

@ -40,7 +40,7 @@ class FirstActor extends Actor {
case DoIt(msg) case DoIt(msg)
val replyMsg = doSomeDangerousWork(msg) val replyMsg = doSomeDangerousWork(msg)
sender ! replyMsg sender ! replyMsg
self.stop() context.stop(self)
} }
def doSomeDangerousWork(msg: ImmutableMessage): String = { "done" } def doSomeDangerousWork(msg: ImmutableMessage): String = { "done" }
})) ! m })) ! m
@ -143,7 +143,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
//#import-context //#import-context
val first = system.actorOf(Props(new FirstActor)) val first = system.actorOf(Props(new FirstActor))
first.stop() system.stop(first)
} }
@ -169,7 +169,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
system.eventStream.unsubscribe(testActor) system.eventStream.unsubscribe(testActor)
system.eventStream.publish(TestEvent.UnMute(filter)) system.eventStream.publish(TestEvent.UnMute(filter))
myActor.stop() system.stop(myActor)
} }
"creating actor with constructor" in { "creating actor with constructor" in {
@ -182,7 +182,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
val myActor = system.actorOf(Props(new MyActor("..."))) val myActor = system.actorOf(Props(new MyActor("...")))
//#creating-constructor //#creating-constructor
myActor.stop() system.stop(myActor)
} }
"creating actor with Props" in { "creating actor with Props" in {
@ -192,7 +192,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor") val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor")
//#creating-props //#creating-props
myActor.stop() system.stop(myActor)
} }
"using ask" in { "using ask" in {
@ -214,7 +214,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
val result: Option[Int] = for (x (myActor ? 3).as[Int]) yield { 2 * x } val result: Option[Int] = for (x (myActor ? 3).as[Int]) yield { 2 * x }
//#using-ask //#using-ask
myActor.stop() system.stop(myActor)
} }
"using receiveTimeout" in { "using receiveTimeout" in {

View file

@ -149,6 +149,6 @@ object QDumper {
new QueueDumper(filename, system.log)() new QueueDumper(filename, system.log)()
} }
system.stop() system.shutdown()
} }
} }

View file

@ -57,7 +57,7 @@ class RemoteConnectionManager(
def isEmpty: Boolean = connections.connections.isEmpty def isEmpty: Boolean = connections.connections.isEmpty
def shutdown() { def shutdown() {
state.get.iterable foreach (_.stop()) // shut down all remote connections state.get.iterable foreach (system.stop(_)) // shut down all remote connections
} }
@tailrec @tailrec
@ -136,7 +136,7 @@ class RemoteConnectionManager(
//if we are not able to update the state, we just try again. //if we are not able to update the state, we just try again.
if (!state.compareAndSet(oldState, newState)) { if (!state.compareAndSet(oldState, newState)) {
// we failed, need compensating action // we failed, need compensating action
newConnection.stop() // stop the new connection actor and try again system.stop(newConnection) // stop the new connection actor and try again
putIfAbsent(address, newConnectionFactory) // recur putIfAbsent(address, newConnectionFactory) // recur
} else { } else {
// we succeeded // we succeeded

View file

@ -10,7 +10,7 @@ object RandomRoutedRemoteActorMultiJvmSpec {
class SomeActor extends Actor with Serializable { class SomeActor extends Actor with Serializable {
def receive = { def receive = {
case "hit" sender ! context.system.nodename case "hit" sender ! context.system.nodename
case "end" self.stop() case "end" context.stop(self)
} }
} }
} }

View file

@ -10,7 +10,7 @@ object RoundRobinRoutedRemoteActorMultiJvmSpec {
class SomeActor extends Actor with Serializable { class SomeActor extends Actor with Serializable {
def receive = { def receive = {
case "hit" sender ! context.system.nodename case "hit" sender ! context.system.nodename
case "end" self.stop() case "end" context.stop(self)
} }
} }
} }

View file

@ -11,7 +11,7 @@ object ScatterGatherRoutedRemoteActorMultiJvmSpec {
class SomeActor extends Actor with Serializable { class SomeActor extends Actor with Serializable {
def receive = { def receive = {
case "hit" sender ! context.system.nodename case "hit" sender ! context.system.nodename
case "end" self.stop() case "end" context.stop(self)
} }
} }
} }

View file

@ -61,7 +61,7 @@ akka {
implicit val timeout = system.settings.ActorTimeout implicit val timeout = system.settings.ActorTimeout
override def atTermination() { override def atTermination() {
other.stop() other.shutdown()
} }
"Remoting" must { "Remoting" must {
@ -103,7 +103,7 @@ akka {
expectMsg("preRestart") expectMsg("preRestart")
r ! 42 r ! 42
expectMsg(42) expectMsg(42)
r.stop() system.stop(r)
expectMsg("postStop") expectMsg("postStop")
} }
@ -130,4 +130,4 @@ akka {
} }
} }

View file

@ -40,7 +40,7 @@ akka {
val other = ActorSystem("remote_sys", conf) val other = ActorSystem("remote_sys", conf)
override def atTermination() { override def atTermination() {
other.stop() other.shutdown()
} }
"A Remote Router" must { "A Remote Router" must {
@ -55,4 +55,4 @@ akka {
} }
} }

View file

@ -20,7 +20,7 @@ class HelloActor extends Actor {
case Start worldActor ! "Hello" case Start worldActor ! "Hello"
case s: String case s: String
println("Received message: %s".format(s)) println("Received message: %s".format(s))
context.system.stop() context.system.shutdown()
} }
} }

View file

@ -302,8 +302,8 @@ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor {
sender.tell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T T] }) sender.tell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T T] })
} finally { } finally {
agent.resume() agent.resume()
self.stop() context.stop(self)
} }
case _ self.stop() case _ context.stop(self)
} }
} }

View file

@ -24,6 +24,6 @@ public class EitherOrElseExample {
} }
}.execute(); }.execute();
brancher.stop(); application.stop(brancher);
} }
} }

View file

@ -46,8 +46,8 @@ public class RetryExample {
System.out.println("Account 2: " + acc2); System.out.println("Account 2: " + acc2);
// Account 2: 600.0 // Account 2: 600.0
transferer.stop(); application.stop(transferer);
application.stop(); application.shutdown();
} }
} }

View file

@ -40,9 +40,9 @@ public class UntypedCoordinatedExample {
} }
} }
counter1.stop(); application.stop(counter1);
counter2.stop(); application.stop(counter2);
application.stop(); application.shutdown();
} }
} }

View file

@ -39,9 +39,9 @@ public class UntypedTransactorExample {
} }
} }
counter1.stop(); application.stop(counter1);
counter2.stop(); application.stop(counter2);
application.stop(); application.shutdown();
} }
} }

View file

@ -44,7 +44,7 @@ public class UntypedCoordinatedIncrementTest {
@AfterClass @AfterClass
public static void afterAll() { public static void afterAll() {
system.stop(); system.shutdown();
system = null; system = null;
} }
@ -113,6 +113,6 @@ public class UntypedCoordinatedIncrementTest {
@After @After
public void stop() { public void stop() {
application.stop(); application.shutdown();
} }
} }

View file

@ -41,7 +41,7 @@ public class UntypedTransactorTest {
@AfterClass @AfterClass
public static void afterAll() { public static void afterAll() {
system.stop(); system.shutdown();
system = null; system = null;
} }

View file

@ -74,8 +74,8 @@ class CoordinatedIncrementSpec extends AkkaSpec with BeforeAndAfterAll {
for (counter counters) { for (counter counters) {
(counter ? GetCount).as[Int].get must be === 1 (counter ? GetCount).as[Int].get must be === 1
} }
counters foreach (_.stop()) counters foreach (system.stop(_))
failer.stop() system.stop(failer)
} }
"increment no counters with a failing transaction" in { "increment no counters with a failing transaction" in {
@ -91,8 +91,8 @@ class CoordinatedIncrementSpec extends AkkaSpec with BeforeAndAfterAll {
for (counter counters) { for (counter counters) {
(counter ? GetCount).as[Int].get must be === 0 (counter ? GetCount).as[Int].get must be === 0
} }
counters foreach (_.stop()) counters foreach (system.stop(_))
failer.stop() system.stop(failer)
} }
} }
} }

View file

@ -123,8 +123,8 @@ class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll {
for (counter counters) { for (counter counters) {
(counter ? GetCount).as[Int].get must be === 1 (counter ? GetCount).as[Int].get must be === 1
} }
counters foreach (_.stop()) counters foreach (system.stop(_))
coordinator.stop() system.stop(coordinator)
} }
} }
} }

View file

@ -97,8 +97,8 @@ class TransactorSpec extends AkkaSpec {
for (counter counters) { for (counter counters) {
(counter ? GetCount).as[Int].get must be === 1 (counter ? GetCount).as[Int].get must be === 1
} }
counters foreach (_.stop()) counters foreach (system.stop(_))
failer.stop() system.stop(failer)
} }
"increment no counters with a failing transaction" in { "increment no counters with a failing transaction" in {
@ -114,8 +114,8 @@ class TransactorSpec extends AkkaSpec {
for (counter counters) { for (counter counters) {
(counter ? GetCount).as[Int].get must be === 0 (counter ? GetCount).as[Int].get must be === 0
} }
counters foreach (_.stop()) counters foreach (system.stop(_))
failer.stop() system.stop(failer)
} }
} }
} }
@ -129,7 +129,7 @@ class TransactorSpec extends AkkaSpec {
latch.await latch.await
val value = atomic { ref.get } val value = atomic { ref.get }
value must be === 5 value must be === 5
transactor.stop() system.stop(transactor)
} }
} }
} }

View file

@ -64,7 +64,7 @@ abstract class AkkaSpec(_system: ActorSystem)
} }
final override def afterAll { final override def afterAll {
system.stop() system.shutdown()
try system.asInstanceOf[ActorSystemImpl].terminationFuture.await(5 seconds) catch { try system.asInstanceOf[ActorSystemImpl].terminationFuture.await(5 seconds) catch {
case _: FutureTimeoutException system.log.warning("Failed to stop [{}] within 5 seconds", system.name) case _: FutureTimeoutException system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
} }
@ -76,7 +76,7 @@ abstract class AkkaSpec(_system: ActorSystem)
protected def atTermination() {} protected def atTermination() {}
def spawn(body: Unit)(implicit dispatcher: MessageDispatcher) { def spawn(body: Unit)(implicit dispatcher: MessageDispatcher) {
system.actorOf(Props(ctx { case "go" try body finally ctx.self.stop() }).withDispatcher(dispatcher)) ! "go" system.actorOf(Props(ctx { case "go" try body finally ctx.stop(ctx.self) }).withDispatcher(dispatcher)) ! "go"
} }
} }
@ -96,7 +96,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
val ref = Seq(testActor, system.actorOf(Props.empty, "name")) val ref = Seq(testActor, system.actorOf(Props.empty, "name"))
} }
spec.ref foreach (_.isTerminated must not be true) spec.ref foreach (_.isTerminated must not be true)
system.stop() system.shutdown()
spec.awaitCond(spec.ref forall (_.isTerminated), 2 seconds) spec.awaitCond(spec.ref forall (_.isTerminated), 2 seconds)
} }
@ -120,7 +120,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
implicit val davyJones = otherSystem.actorOf(Props(new Actor { implicit val davyJones = otherSystem.actorOf(Props(new Actor {
def receive = { def receive = {
case m: DeadLetter locker :+= m case m: DeadLetter locker :+= m
case "Die!" sender ! "finally gone"; self.stop() case "Die!" sender ! "finally gone"; context.stop(self)
} }
}), "davyJones") }), "davyJones")
@ -139,15 +139,15 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
val latch = new TestLatch(1)(system) val latch = new TestLatch(1)(system)
system.registerOnTermination(latch.countDown()) system.registerOnTermination(latch.countDown())
system.stop() system.shutdown()
latch.await(2 seconds) latch.await(2 seconds)
(davyJones ? "Die!").get must be === "finally gone" (davyJones ? "Die!").get must be === "finally gone"
// this will typically also contain log messages which were sent after the logger shutdown // this will typically also contain log messages which were sent after the logger shutdown
locker must contain(DeadLetter(42, davyJones, probe.ref)) locker must contain(DeadLetter(42, davyJones, probe.ref))
} finally { } finally {
system.stop() system.shutdown()
otherSystem.stop() otherSystem.shutdown()
} }
} }

View file

@ -56,7 +56,7 @@ object TestActorRefSpec {
class WorkerActor() extends TActor { class WorkerActor() extends TActor {
def receiveT = { def receiveT = {
case "work" sender ! "workDone"; self.stop() case "work" sender ! "workDone"; context.stop(self)
case replyTo: Promise[Any] replyTo.completeWithResult("complexReply") case replyTo: Promise[Any] replyTo.completeWithResult("complexReply")
case replyTo: ActorRef replyTo ! "complexReply" case replyTo: ActorRef replyTo ! "complexReply"
} }

View file

@ -113,7 +113,7 @@ public class Pi {
Result result = (Result) message; Result result = (Result) message;
pi += result.getValue(); pi += result.getValue();
nrOfResults += 1; nrOfResults += 1;
if (nrOfResults == nrOfMessages) getSelf().stop(); if (nrOfResults == nrOfMessages) getContext().stop(getSelf());
} else throw new IllegalArgumentException("Unknown message [" + message + "]"); } else throw new IllegalArgumentException("Unknown message [" + message + "]");
//#handle-messages //#handle-messages
} }
@ -157,7 +157,7 @@ public class Pi {
latch.await(); latch.await();
// Shut down the system // Shut down the system
system.stop(); system.shutdown();
} }
} }
//#app //#app

View file

@ -62,7 +62,7 @@ object Pi extends App {
pi += value pi += value
nrOfResults += 1 nrOfResults += 1
// Stops this actor and all its supervised children // Stops this actor and all its supervised children
if (nrOfResults == nrOfMessages) self.stop() if (nrOfResults == nrOfMessages) context.stop(self)
//#handle-messages //#handle-messages
} }
//#master-receive //#master-receive
@ -98,7 +98,7 @@ object Pi extends App {
latch.await() latch.await()
// Shut down the system // Shut down the system
system.stop() system.shutdown()
} }
} }
//#app //#app

View file

@ -17,7 +17,7 @@ class WorkerSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {
implicit val system = ActorSystem() implicit val system = ActorSystem()
override def afterAll { override def afterAll {
system.stop() system.shutdown()
} }
"Worker" must { "Worker" must {