remove ActorRef.stop()
- replace ActorRef.stop() by ActorRefFactory.stop(child) everywhere - ActorCell “optimizes” this to remove the child from its childrenRefs in order to allow immediate recycling of the name - the lost soul must have a place, for which the Locker has been created, where Davy Jones will happily rebind the soul to his ship (i.e. set “parent” to the locker to avoid mem leak) and periodically revisit it (.stop(), in case of that being lost in comm failure, similar .watch() to re-check liveness)
This commit is contained in:
parent
7da61b6cc1
commit
cb85778b12
67 changed files with 328 additions and 238 deletions
|
|
@ -87,7 +87,7 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach w
|
|||
state.finished.await
|
||||
1.second.dilated.sleep()
|
||||
actor.isTerminated must be(true)
|
||||
supervisor.stop()
|
||||
system.stop(supervisor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
|
|||
restarter ! Kill
|
||||
expectMsg(("postStop", id, 3))
|
||||
expectNoMsg(1 seconds)
|
||||
supervisor.stop
|
||||
system.stop(supervisor)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -92,7 +92,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
|
|||
restarter ! Kill
|
||||
expectMsg(("postStop", id, 3))
|
||||
expectNoMsg(1 seconds)
|
||||
supervisor.stop
|
||||
system.stop(supervisor)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -105,10 +105,10 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
|
|||
expectMsg(("preStart", id, 0))
|
||||
a ! "status"
|
||||
expectMsg(("OK", id, 0))
|
||||
a.stop
|
||||
system.stop(a)
|
||||
expectMsg(("postStop", id, 0))
|
||||
expectNoMsg(1 seconds)
|
||||
supervisor.stop
|
||||
system.stop(supervisor)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ object ActorRefSpec {
|
|||
case "work" ⇒ {
|
||||
work
|
||||
sender ! "workDone"
|
||||
self.stop()
|
||||
context.stop(self)
|
||||
}
|
||||
case ReplyTo(replyTo) ⇒ {
|
||||
work
|
||||
|
|
@ -344,8 +344,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
|
||||
latch.await
|
||||
|
||||
clientRef.stop()
|
||||
serverRef.stop()
|
||||
system.stop(clientRef)
|
||||
system.stop(serverRef)
|
||||
}
|
||||
|
||||
"stop when sent a poison pill" in {
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
|
|||
try {
|
||||
val f = echo ? "hallo"
|
||||
intercept[FutureTimeoutException] { f.await }
|
||||
} finally { echo.stop }
|
||||
} finally { system.stop(echo) }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -41,14 +41,14 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
|
|||
val f = (echo ? "hallo").mapTo[String]
|
||||
intercept[FutureTimeoutException] { f.await }
|
||||
f.value must be(None)
|
||||
} finally { echo.stop }
|
||||
} finally { system.stop(echo) }
|
||||
}
|
||||
}
|
||||
|
||||
"use explicitly supplied timeout" in {
|
||||
within(testTimeout - 100.millis, testTimeout + 300.millis) {
|
||||
val echo = actorWithTimeout(Props.defaultTimeout)
|
||||
try { (echo.?("hallo", testTimeout)).as[String] must be(None) } finally { echo.stop }
|
||||
try { (echo.?("hallo", testTimeout)).as[String] must be(None) } finally { system.stop(echo) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ object Chameneos {
|
|||
sumMeetings += i
|
||||
if (numFaded == numChameneos) {
|
||||
Chameneos.end = System.currentTimeMillis
|
||||
self.stop()
|
||||
context.stop(self)
|
||||
}
|
||||
|
||||
case msg @ Meet(a, c) ⇒
|
||||
|
|
@ -107,7 +107,8 @@ object Chameneos {
|
|||
def run {
|
||||
// System.setProperty("akka.config", "akka.conf")
|
||||
Chameneos.start = System.currentTimeMillis
|
||||
val system = ActorSystem().actorOf(Props(new Mall(1000000, 4)))
|
||||
val system = ActorSystem()
|
||||
val actor = system.actorOf(Props(new Mall(1000000, 4)))
|
||||
Thread.sleep(10000)
|
||||
println("Elapsed: " + (end - start))
|
||||
system.stop()
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ object ConsistencySpec {
|
|||
}
|
||||
|
||||
lastStep = step
|
||||
case "done" ⇒ sender ! "done"; self.stop()
|
||||
case "done" ⇒ sender ! "done"; context.stop(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,9 +43,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
expectTerminationOf(terminal)
|
||||
expectTerminationOf(terminal)
|
||||
|
||||
monitor1.stop()
|
||||
monitor2.stop()
|
||||
monitor3.stop()
|
||||
system.stop(monitor1)
|
||||
system.stop(monitor2)
|
||||
system.stop(monitor3)
|
||||
}
|
||||
|
||||
"notify with _current_ monitors with one Terminated message when an Actor is stopped" in {
|
||||
|
|
@ -69,9 +69,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
expectTerminationOf(terminal)
|
||||
expectTerminationOf(terminal)
|
||||
|
||||
monitor1.stop()
|
||||
monitor2.stop()
|
||||
monitor3.stop()
|
||||
system.stop(monitor1)
|
||||
system.stop(monitor2)
|
||||
system.stop(monitor3)
|
||||
}
|
||||
|
||||
"notify with a Terminated message once when an Actor is stopped but not when restarted" in {
|
||||
|
|
@ -90,7 +90,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
expectTerminationOf(terminal)
|
||||
terminal.isTerminated must be === true
|
||||
|
||||
supervisor.stop()
|
||||
system.stop(supervisor)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -99,9 +99,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
case class FF(fail: Failed)
|
||||
val supervisor = system.actorOf(Props[Supervisor]
|
||||
.withFaultHandler(new OneForOneStrategy(FaultHandlingStrategy.makeDecider(List(classOf[Exception])), Some(0)) {
|
||||
override def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
|
||||
override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
|
||||
testActor.tell(FF(Failed(cause)), child)
|
||||
super.handleFailure(child, cause, stats, children)
|
||||
super.handleFailure(context, child, cause, stats, children)
|
||||
}
|
||||
}))
|
||||
|
||||
|
|
|
|||
|
|
@ -187,7 +187,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
|
|||
}
|
||||
val ref = system.actorOf(Props(fsm))
|
||||
started.await
|
||||
ref.stop()
|
||||
system.stop(ref)
|
||||
expectMsg(1 second, fsm.StopEvent(Shutdown, 1, null))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
|
|||
within(300 millis) {
|
||||
fsm ! SubscribeTransitionCallBack(forward)
|
||||
expectMsg(CurrentState(fsm, 0))
|
||||
forward.stop()
|
||||
system.stop(forward)
|
||||
fsm ! "tick"
|
||||
expectNoMsg
|
||||
}
|
||||
|
|
|
|||
|
|
@ -196,9 +196,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
|
|||
f1.get must equal(ByteString("Hello World!1"))
|
||||
f2.get must equal(ByteString("Hello World!2"))
|
||||
f3.get must equal(ByteString("Hello World!3"))
|
||||
client.stop
|
||||
server.stop
|
||||
ioManager.stop
|
||||
system.stop(client)
|
||||
system.stop(server)
|
||||
system.stop(ioManager)
|
||||
}
|
||||
|
||||
"run echo server under high load" in {
|
||||
|
|
@ -210,9 +210,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
|
|||
val list = List.range(0, 1000)
|
||||
val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString))
|
||||
assert(f.get.size === 1000)
|
||||
client.stop
|
||||
server.stop
|
||||
ioManager.stop
|
||||
system.stop(client)
|
||||
system.stop(server)
|
||||
system.stop(ioManager)
|
||||
}
|
||||
|
||||
"run echo server under high load with small buffer" in {
|
||||
|
|
@ -224,9 +224,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
|
|||
val list = List.range(0, 1000)
|
||||
val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString))
|
||||
assert(f.get.size === 1000)
|
||||
client.stop
|
||||
server.stop
|
||||
ioManager.stop
|
||||
system.stop(client)
|
||||
system.stop(server)
|
||||
system.stop(ioManager)
|
||||
}
|
||||
|
||||
"run key-value store" in {
|
||||
|
|
@ -250,10 +250,10 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
|
|||
f4.get must equal("OK")
|
||||
f5.get must equal(ByteString("I'm a test!"))
|
||||
f6.get must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!")))
|
||||
client1.stop
|
||||
client2.stop
|
||||
server.stop
|
||||
ioManager.stop
|
||||
system.stop(client1)
|
||||
system.stop(client2)
|
||||
system.stop(server)
|
||||
system.stop(ioManager)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
|
|||
}))
|
||||
|
||||
timeoutLatch.await
|
||||
timeoutActor.stop()
|
||||
system.stop(timeoutActor)
|
||||
}
|
||||
|
||||
"reschedule timeout after regular receive" in {
|
||||
|
|
@ -45,7 +45,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
|
|||
timeoutActor ! Tick
|
||||
|
||||
timeoutLatch.await
|
||||
timeoutActor.stop()
|
||||
system.stop(timeoutActor)
|
||||
}
|
||||
|
||||
"be able to turn off timeout if desired" in {
|
||||
|
|
@ -69,7 +69,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
|
|||
|
||||
timeoutLatch.await
|
||||
count.get must be(1)
|
||||
timeoutActor.stop()
|
||||
system.stop(timeoutActor)
|
||||
}
|
||||
|
||||
"not receive timeout message when not specified" in {
|
||||
|
|
@ -82,7 +82,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
|
|||
}))
|
||||
|
||||
timeoutLatch.awaitTimeout(1 second) // timeout expected
|
||||
timeoutActor.stop()
|
||||
system.stop(timeoutActor)
|
||||
}
|
||||
|
||||
"have ReceiveTimeout eq to Actors ReceiveTimeout" in {
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ class SupervisorMiscSpec extends AkkaSpec with DefaultTimeout {
|
|||
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
|
||||
protected def receive = {
|
||||
case "status" ⇒ this.sender ! "OK"
|
||||
case _ ⇒ this.self.stop()
|
||||
case _ ⇒ this.context.stop(self)
|
||||
}
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -306,7 +306,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
|
||||
inits.get must be(3)
|
||||
|
||||
supervisor.stop()
|
||||
system.stop(supervisor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeou
|
|||
expectMsg(middleActor.path)
|
||||
expectMsg(lastActor.path)
|
||||
expectNoMsg(2 seconds)
|
||||
headActor.stop()
|
||||
system.stop(headActor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
|
|||
|
||||
supervised.!("test")(testActor)
|
||||
expectMsg("failure1")
|
||||
supervisor.stop()
|
||||
system.stop(supervisor)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -39,7 +39,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
|
|||
|
||||
supervised.!("test")(testActor)
|
||||
expectMsg("failure2")
|
||||
supervisor.stop()
|
||||
system.stop(supervisor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ object ActorModelSpec {
|
|||
case Forward(to, msg) ⇒ ack; to.forward(msg); busy.switchOff()
|
||||
case CountDown(latch) ⇒ ack; latch.countDown(); busy.switchOff()
|
||||
case Increment(count) ⇒ ack; count.incrementAndGet(); busy.switchOff()
|
||||
case CountDownNStop(l) ⇒ ack; l.countDown(); self.stop(); busy.switchOff()
|
||||
case CountDownNStop(l) ⇒ ack; l.countDown(); context.stop(self); busy.switchOff()
|
||||
case Restart ⇒ ack; busy.switchOff(); throw new Exception("Restart requested")
|
||||
case Interrupt ⇒ ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!")
|
||||
case ThrowException(e: Throwable) ⇒ ack; busy.switchOff(); throw e
|
||||
|
|
@ -239,7 +239,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
|
|||
assertDispatcher(dispatcher)(stops = 0)
|
||||
val a = newTestActor(dispatcher)
|
||||
assertDispatcher(dispatcher)(stops = 0)
|
||||
a.stop()
|
||||
system.stop(a)
|
||||
assertDispatcher(dispatcher)(stops = 1)
|
||||
assertRef(a, dispatcher)(
|
||||
suspensions = 0,
|
||||
|
|
@ -260,7 +260,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
|
|||
|
||||
assertDispatcher(dispatcher)(stops = 2)
|
||||
|
||||
a2.stop
|
||||
system.stop(a2)
|
||||
assertDispatcher(dispatcher)(stops = 3)
|
||||
}
|
||||
|
||||
|
|
@ -279,7 +279,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
|
|||
assertCountDown(oneAtATime, (1.5 seconds).dilated.toMillis, "Processed message when allowed")
|
||||
assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3)
|
||||
|
||||
a.stop()
|
||||
system.stop(a)
|
||||
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 3, msgsProcessed = 3)
|
||||
}
|
||||
|
||||
|
|
@ -298,7 +298,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
|
|||
assertCountDown(counter, 3.seconds.dilated.toMillis, "Should process 200 messages")
|
||||
assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)
|
||||
|
||||
a.stop()
|
||||
system.stop(a)
|
||||
}
|
||||
|
||||
def spawn(f: ⇒ Unit) {
|
||||
|
|
@ -328,7 +328,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
|
|||
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
|
||||
suspensions = 1, resumes = 1)
|
||||
|
||||
a.stop()
|
||||
system.stop(a)
|
||||
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1,
|
||||
suspensions = 1, resumes = 1)
|
||||
}
|
||||
|
|
@ -370,7 +370,7 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
|
|||
throw e
|
||||
}
|
||||
assertCountDown(stopLatch, waitTime, "Expected all children to stop")
|
||||
boss.stop()
|
||||
system.stop(boss)
|
||||
}
|
||||
for (run ← 1 to 3) {
|
||||
flood(50000)
|
||||
|
|
@ -447,8 +447,8 @@ class DispatcherModelSpec extends ActorModelSpec {
|
|||
|
||||
aStop.countDown()
|
||||
|
||||
a.stop
|
||||
b.stop
|
||||
system.stop(a)
|
||||
system.stop(b)
|
||||
|
||||
while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination
|
||||
|
||||
|
|
@ -484,8 +484,8 @@ class BalancingDispatcherModelSpec extends ActorModelSpec {
|
|||
|
||||
aStop.countDown()
|
||||
|
||||
a.stop
|
||||
b.stop
|
||||
system.stop(a)
|
||||
system.stop(b)
|
||||
|
||||
while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination
|
||||
|
||||
|
|
|
|||
|
|
@ -74,8 +74,8 @@ class BalancingDispatcherSpec extends AkkaSpec {
|
|||
fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast
|
||||
fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be >
|
||||
(slow.underlying.actor.asInstanceOf[DelayableActor].invocationCount)
|
||||
slow.stop()
|
||||
fast.stop()
|
||||
system.stop(slow)
|
||||
system.stop(fast)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,14 +39,14 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout {
|
|||
val actor = system.actorOf(Props[OneWayTestActor].withDispatcher(system.dispatcherFactory.newDispatcher("test").build))
|
||||
val result = actor ! "OneWay"
|
||||
assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS))
|
||||
actor.stop()
|
||||
system.stop(actor)
|
||||
}
|
||||
|
||||
"support ask/reply" in {
|
||||
val actor = system.actorOf(Props[TestActor].withDispatcher(system.dispatcherFactory.newDispatcher("test").build))
|
||||
val result = (actor ? "Hello").as[String]
|
||||
assert("World" === result.get)
|
||||
actor.stop()
|
||||
system.stop(actor)
|
||||
}
|
||||
|
||||
"respect the throughput setting" in {
|
||||
|
|
@ -72,8 +72,8 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout {
|
|||
fastOne ! "sabotage"
|
||||
start.countDown()
|
||||
latch.await(10, TimeUnit.SECONDS)
|
||||
fastOne.stop()
|
||||
slowOne.stop()
|
||||
system.stop(fastOne)
|
||||
system.stop(slowOne)
|
||||
assert(latch.getCount() === 0)
|
||||
}
|
||||
|
||||
|
|
@ -90,13 +90,13 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout {
|
|||
|
||||
val fastOne = system.actorOf(
|
||||
Props(context ⇒ {
|
||||
case "ping" ⇒ if (works.get) latch.countDown(); context.self.stop()
|
||||
case "ping" ⇒ if (works.get) latch.countDown(); context.stop(context.self)
|
||||
}).withDispatcher(throughputDispatcher))
|
||||
|
||||
val slowOne = system.actorOf(
|
||||
Props(context ⇒ {
|
||||
case "hogexecutor" ⇒ ready.countDown(); start.await
|
||||
case "ping" ⇒ works.set(false); context.self.stop()
|
||||
case "ping" ⇒ works.set(false); context.stop(context.self)
|
||||
}).withDispatcher(throughputDispatcher))
|
||||
|
||||
slowOne ! "hogexecutor"
|
||||
|
|
|
|||
|
|
@ -49,8 +49,8 @@ class DispatcherActorsSpec extends AkkaSpec {
|
|||
assert(sFinished.getCount > 0)
|
||||
sFinished.await
|
||||
assert(sFinished.getCount === 0)
|
||||
f.stop()
|
||||
s.stop()
|
||||
system.stop(f)
|
||||
system.stop(s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,14 +30,14 @@ class PinnedActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeo
|
|||
val actor = system.actorOf(Props(self ⇒ { case "OneWay" ⇒ oneWay.countDown() }).withDispatcher(system.dispatcherFactory.newPinnedDispatcher("test")))
|
||||
val result = actor ! "OneWay"
|
||||
assert(oneWay.await(1, TimeUnit.SECONDS))
|
||||
actor.stop()
|
||||
system.stop(actor)
|
||||
}
|
||||
|
||||
"support ask/reply" in {
|
||||
val actor = system.actorOf(Props[TestActor].withDispatcher(system.dispatcherFactory.newPinnedDispatcher("test")))
|
||||
val result = (actor ? "Hello").as[String]
|
||||
assert("World" === result.get)
|
||||
actor.stop()
|
||||
system.stop(actor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ class ListenerSpec extends AkkaSpec {
|
|||
|
||||
fooLatch.await
|
||||
|
||||
for (a ← List(broadcast, a1, a2, a3)) a.stop()
|
||||
for (a ← List(broadcast, a1, a2, a3)) system.stop(a)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -120,7 +120,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
val future = actor ? "Hello"
|
||||
future.await
|
||||
test(future, "World")
|
||||
actor.stop()
|
||||
system.stop(actor)
|
||||
}
|
||||
}
|
||||
"throws an exception" must {
|
||||
|
|
@ -130,7 +130,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
val future = actor ? "Failure"
|
||||
future.await
|
||||
test(future, "Expected exception; to test fault-tolerance")
|
||||
actor.stop()
|
||||
system.stop(actor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -144,8 +144,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s }
|
||||
future.await
|
||||
test(future, "WORLD")
|
||||
actor1.stop()
|
||||
actor2.stop()
|
||||
system.stop(actor1)
|
||||
system.stop(actor2)
|
||||
}
|
||||
}
|
||||
"will throw an exception" must {
|
||||
|
|
@ -156,8 +156,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s }
|
||||
future.await
|
||||
test(future, "/ by zero")
|
||||
actor1.stop()
|
||||
actor2.stop()
|
||||
system.stop(actor1)
|
||||
system.stop(actor2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -169,8 +169,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
val future = actor1 ? "Hello" flatMap { case i: Int ⇒ actor2 ? i }
|
||||
future.await
|
||||
test(future, "World (of class java.lang.String)")
|
||||
actor1.stop()
|
||||
actor2.stop()
|
||||
system.stop(actor1)
|
||||
system.stop(actor2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -204,7 +204,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
future1.get must be("10-14")
|
||||
assert(checkType(future1, manifest[String]))
|
||||
intercept[ClassCastException] { future2.get }
|
||||
actor.stop()
|
||||
system.stop(actor)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -233,7 +233,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
|
||||
future1.get must be("10-14")
|
||||
intercept[MatchError] { future2.get }
|
||||
actor.stop()
|
||||
system.stop(actor)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -280,7 +280,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
future10.get must be("World")
|
||||
future11.get must be("Oops!")
|
||||
|
||||
actor.stop()
|
||||
system.stop(actor)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -396,7 +396,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
val actor = system.actorOf(Props[TestActor])
|
||||
actor ? "Hello" onResult { case "World" ⇒ latch.open }
|
||||
assert(latch.tryAwait(5, TimeUnit.SECONDS))
|
||||
actor.stop()
|
||||
system.stop(actor)
|
||||
}
|
||||
|
||||
"shouldTraverseFutures" in {
|
||||
|
|
@ -411,7 +411,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
|
||||
val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo manifest[Int])
|
||||
assert(Future.sequence(oddFutures).get.sum === 10000)
|
||||
oddActor.stop()
|
||||
system.stop(oddActor)
|
||||
|
||||
val list = (1 to 100).toList
|
||||
assert(Future.traverse(list)(x ⇒ Future(x * 2 - 1)).get.sum === 10000)
|
||||
|
|
@ -470,7 +470,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
|
||||
assert(r.get === "Hello World!")
|
||||
|
||||
actor.stop
|
||||
system.stop(actor)
|
||||
}
|
||||
|
||||
"futureComposingWithContinuationsFailureDivideZero" in {
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import org.scalatest.BeforeAndAfterEach
|
|||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
import java.util.concurrent.atomic._
|
||||
import akka.actor.{ Props, Actor, ActorRef }
|
||||
import akka.actor.{ Props, Actor, ActorRef, ActorSystem }
|
||||
import java.util.Comparator
|
||||
import akka.japi.{ Procedure, Function }
|
||||
|
||||
|
|
@ -33,7 +33,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte
|
|||
|
||||
def classifierFor(event: BusType#Event): BusType#Classifier
|
||||
|
||||
def disposeSubscriber(subscriber: BusType#Subscriber): Unit
|
||||
def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit
|
||||
|
||||
busName must {
|
||||
|
||||
|
|
@ -58,7 +58,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte
|
|||
"not allow to unsubscribe non-existing subscriber" in {
|
||||
val sub = createNewSubscriber()
|
||||
bus.unsubscribe(sub, classifier) must be === false
|
||||
disposeSubscriber(sub)
|
||||
disposeSubscriber(system, sub)
|
||||
}
|
||||
|
||||
"not allow for the same subscriber to subscribe to the same channel twice" in {
|
||||
|
|
@ -80,7 +80,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte
|
|||
subscribers.zip(classifiers) forall { case (s, c) ⇒ bus.subscribe(s, c) } must be === true
|
||||
subscribers.zip(classifiers) forall { case (s, c) ⇒ bus.unsubscribe(s, c) } must be === true
|
||||
|
||||
subscribers foreach disposeSubscriber
|
||||
subscribers foreach (disposeSubscriber(system, _))
|
||||
}
|
||||
|
||||
"publishing events without any subscribers shouldn't be a problem" in {
|
||||
|
|
@ -113,7 +113,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte
|
|||
subscribers foreach { s ⇒ bus.subscribe(s, classifier) must be === true }
|
||||
bus.publish(event)
|
||||
range foreach { _ ⇒ expectMsg(event) }
|
||||
subscribers foreach { s ⇒ bus.unsubscribe(s, classifier) must be === true; disposeSubscriber(s) }
|
||||
subscribers foreach { s ⇒ bus.unsubscribe(s, classifier) must be === true; disposeSubscriber(system, s) }
|
||||
}
|
||||
|
||||
"not publish the given event to any other subscribers than the intended ones" in {
|
||||
|
|
@ -136,7 +136,7 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte
|
|||
}
|
||||
|
||||
"cleanup subscriber" in {
|
||||
disposeSubscriber(subscriber)
|
||||
disposeSubscriber(system, subscriber)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -165,7 +165,7 @@ class ActorEventBusSpec extends EventBusSpec("ActorEventBus") {
|
|||
|
||||
def classifierFor(event: BusType#Event) = event.toString
|
||||
|
||||
def disposeSubscriber(subscriber: BusType#Subscriber): Unit = subscriber.stop()
|
||||
def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = system.stop(subscriber)
|
||||
}
|
||||
|
||||
object ScanningEventBusSpec {
|
||||
|
|
@ -194,7 +194,7 @@ class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") {
|
|||
|
||||
def classifierFor(event: BusType#Event) = event.toString
|
||||
|
||||
def disposeSubscriber(subscriber: BusType#Subscriber): Unit = ()
|
||||
def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = ()
|
||||
}
|
||||
|
||||
object LookupEventBusSpec {
|
||||
|
|
@ -219,5 +219,5 @@ class LookupEventBusSpec extends EventBusSpec("LookupEventBus") {
|
|||
|
||||
def classifierFor(event: BusType#Event) = event.toString
|
||||
|
||||
def disposeSubscriber(subscriber: BusType#Subscriber): Unit = ()
|
||||
def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = ()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -201,7 +201,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
|
|||
assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)")
|
||||
}
|
||||
|
||||
supervisor.stop()
|
||||
system.stop(supervisor)
|
||||
expectMsg(Logging.Debug(sname, "stopping"))
|
||||
expectMsg(Logging.Debug(aname, "stopped"))
|
||||
expectMsg(Logging.Debug(sname, "stopped"))
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ class TellLatencyPerformanceSpec extends PerformanceSpec {
|
|||
ok must be(true)
|
||||
logMeasurement(numberOfClients, durationNs, stat)
|
||||
}
|
||||
clients.foreach(_.stop())
|
||||
clients.foreach(system.stop(_))
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -173,8 +173,8 @@ class TellThroughput10000PerformanceSpec extends PerformanceSpec {
|
|||
ok must be(true)
|
||||
logMeasurement(numberOfClients, durationNs, repeat)
|
||||
}
|
||||
clients.foreach(_.stop())
|
||||
destinations.foreach(_.stop())
|
||||
clients.foreach(system.stop(_))
|
||||
destinations.foreach(system.stop(_))
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -147,8 +147,8 @@ class TellThroughputComputationPerformanceSpec extends PerformanceSpec {
|
|||
ok must be(true)
|
||||
logMeasurement(numberOfClients, durationNs, repeat)
|
||||
}
|
||||
clients.foreach(_.stop())
|
||||
destinations.foreach(_.stop())
|
||||
clients.foreach(system.stop(_))
|
||||
destinations.foreach(system.stop(_))
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -78,8 +78,8 @@ class TellThroughputPerformanceSpec extends PerformanceSpec {
|
|||
ok must be(true)
|
||||
logMeasurement(numberOfClients, durationNs, repeat)
|
||||
}
|
||||
clients.foreach(_.stop())
|
||||
destinations.foreach(_.stop())
|
||||
clients.foreach(system.stop(_))
|
||||
destinations.foreach(system.stop(_))
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -159,8 +159,8 @@ class TellThroughputSeparateDispatchersPerformanceSpec extends PerformanceSpec {
|
|||
ok must be(true)
|
||||
logMeasurement(numberOfClients, durationNs, repeat)
|
||||
}
|
||||
clients.foreach(_.stop())
|
||||
destinations.foreach(_.stop())
|
||||
clients.foreach(system.stop(_))
|
||||
destinations.foreach(system.stop(_))
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -108,7 +108,7 @@ class TradingLatencyPerformanceSpec extends PerformanceSpec {
|
|||
}
|
||||
logMeasurement(numberOfClients, durationNs, stat)
|
||||
}
|
||||
clients.foreach(_.stop())
|
||||
clients.foreach(system.stop(_))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -105,7 +105,7 @@ class TradingThroughputPerformanceSpec extends PerformanceSpec {
|
|||
}
|
||||
logMeasurement(numberOfClients, durationNs, totalNumberOfOrders)
|
||||
}
|
||||
clients.foreach(_.stop())
|
||||
clients.foreach(system.stop(_))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -99,7 +99,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
|
|||
|
||||
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
|
||||
|
||||
pool.stop()
|
||||
system.stop(pool)
|
||||
}
|
||||
|
||||
"pass ticket #705" in {
|
||||
|
|
@ -129,7 +129,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
|
|||
_.await.resultOrException.get must be("Response")
|
||||
}
|
||||
} finally {
|
||||
pool.stop()
|
||||
system.stop(pool)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -194,7 +194,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
|
|||
|
||||
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(4)
|
||||
|
||||
pool.stop()
|
||||
system.stop(pool)
|
||||
}
|
||||
|
||||
"grow as needed under mailbox pressure" in {
|
||||
|
|
@ -250,7 +250,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
|
|||
|
||||
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be >= (3)
|
||||
|
||||
pool.stop()
|
||||
system.stop(pool)
|
||||
}
|
||||
|
||||
"round robin" in {
|
||||
|
|
@ -281,7 +281,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
|
|||
latch1.await
|
||||
delegates.size must be(1)
|
||||
|
||||
pool1.stop()
|
||||
system.stop(pool1)
|
||||
|
||||
val latch2 = TestLatch(2)
|
||||
delegates.clear()
|
||||
|
|
@ -309,7 +309,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
|
|||
latch2.await
|
||||
delegates.size must be(2)
|
||||
|
||||
pool2.stop()
|
||||
system.stop(pool2)
|
||||
}
|
||||
|
||||
"backoff" in {
|
||||
|
|
@ -355,7 +355,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
|
|||
|
||||
(pool ? ActorPool.Stat).as[ActorPool.Stats].get.size must be <= (z)
|
||||
|
||||
pool.stop()
|
||||
system.stop(pool)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli
|
|||
actor ! "hello"
|
||||
helloLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
|
||||
actor.stop()
|
||||
system.stop(actor)
|
||||
stopLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
}
|
||||
|
||||
|
|
@ -104,7 +104,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli
|
|||
actor ! Broadcast("hello")
|
||||
helloLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
|
||||
actor.stop()
|
||||
system.stop(actor)
|
||||
stopLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
}
|
||||
}
|
||||
|
|
@ -134,7 +134,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli
|
|||
for (i ← 1 to 5) expectMsg("world")
|
||||
}
|
||||
|
||||
actor.stop()
|
||||
system.stop(actor)
|
||||
stopLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
}
|
||||
|
||||
|
|
@ -190,7 +190,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli
|
|||
actor ! Broadcast("hello")
|
||||
helloLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
|
||||
actor.stop()
|
||||
system.stop(actor)
|
||||
stopLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
|||
val c1, c2 = expectMsgType[ActorRef]
|
||||
watch(router)
|
||||
watch(c2)
|
||||
c2.stop()
|
||||
system.stop(c2)
|
||||
expectMsg(Terminated(c2))
|
||||
// it might take a while until the Router has actually processed the Terminated message
|
||||
awaitCond {
|
||||
|
|
@ -54,7 +54,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
|||
}
|
||||
res == Seq(c1, c1)
|
||||
}
|
||||
c1.stop()
|
||||
system.stop(c1)
|
||||
expectMsg(Terminated(router))
|
||||
}
|
||||
|
||||
|
|
@ -324,8 +324,8 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
|||
|
||||
def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case Stop(None) ⇒ self.stop()
|
||||
case Stop(Some(_id)) if (_id == id) ⇒ self.stop()
|
||||
case Stop(None) ⇒ context.stop(self)
|
||||
case Stop(Some(_id)) if (_id == id) ⇒ context.stop(self)
|
||||
case _id: Int if (_id == id) ⇒
|
||||
case x ⇒ {
|
||||
Thread sleep 100 * id
|
||||
|
|
|
|||
|
|
@ -106,7 +106,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
|
|||
}))
|
||||
a ! new ObjectOutputStream(new ByteArrayOutputStream())
|
||||
expectMsg("pass")
|
||||
a.stop()
|
||||
system.stop(a)
|
||||
}
|
||||
|
||||
"serialize DeadLetterActorRef" in {
|
||||
|
|
|
|||
|
|
@ -185,7 +185,7 @@ private[akka] class ActorCell(
|
|||
val system: ActorSystemImpl,
|
||||
val self: InternalActorRef,
|
||||
val props: Props,
|
||||
val parent: InternalActorRef,
|
||||
final val parent: InternalActorRef,
|
||||
/*no member*/ _receiveTimeout: Option[Duration],
|
||||
var hotswap: Stack[PartialFunction[Any, Unit]]) extends UntypedActorContext {
|
||||
|
||||
|
|
@ -242,6 +242,16 @@ private[akka] class ActorCell(
|
|||
_actorOf(props, name)
|
||||
}
|
||||
|
||||
final def stop(actor: ActorRef): Unit = {
|
||||
val a = actor.asInstanceOf[InternalActorRef]
|
||||
if (childrenRefs contains actor.path.name) {
|
||||
system.locker ! a
|
||||
childrenRefs -= actor.path.name
|
||||
handleChildTerminated(actor)
|
||||
}
|
||||
a.stop()
|
||||
}
|
||||
|
||||
final var currentMessage: Envelope = null
|
||||
|
||||
final var actor: Actor = _
|
||||
|
|
@ -405,7 +415,8 @@ private[akka] class ActorCell(
|
|||
// do not process normal messages while waiting for all children to terminate
|
||||
dispatcher suspend this
|
||||
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopping"))
|
||||
for (child ← c) child.stop()
|
||||
// do not use stop(child) because that would dissociate the children from us, but we still want to wait for them
|
||||
for (child ← c) child.asInstanceOf[InternalActorRef].stop()
|
||||
stopping = true
|
||||
}
|
||||
}
|
||||
|
|
@ -550,15 +561,17 @@ private[akka] class ActorCell(
|
|||
}
|
||||
|
||||
final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match {
|
||||
case Some(stats) if stats.child == child ⇒ if (!props.faultHandler.handleFailure(child, cause, stats, childrenRefs.values)) throw cause
|
||||
case Some(stats) if stats.child == child ⇒ if (!props.faultHandler.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause
|
||||
case Some(stats) ⇒ system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child))
|
||||
case None ⇒ system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child))
|
||||
}
|
||||
|
||||
final def handleChildTerminated(child: ActorRef): Unit = {
|
||||
childrenRefs -= child.path.name
|
||||
props.faultHandler.handleChildTerminated(child, children)
|
||||
if (stopping && childrenRefs.isEmpty) doTerminate()
|
||||
if (childrenRefs contains child.path.name) {
|
||||
childrenRefs -= child.path.name
|
||||
props.faultHandler.handleChildTerminated(this, child, children)
|
||||
if (stopping && childrenRefs.isEmpty) doTerminate()
|
||||
} else system.locker ! ChildTerminated(child)
|
||||
}
|
||||
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
|
|
|
|||
|
|
@ -110,11 +110,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
|
|||
*/
|
||||
def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender)
|
||||
|
||||
/**
|
||||
* Shuts down the actor its dispatcher and message queue.
|
||||
*/
|
||||
def stop(): Unit
|
||||
|
||||
/**
|
||||
* Is the actor shut down?
|
||||
*/
|
||||
|
|
@ -192,6 +187,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
|
|||
def resume(): Unit
|
||||
def suspend(): Unit
|
||||
def restart(cause: Throwable): Unit
|
||||
def stop(): Unit
|
||||
def sendSystemMessage(message: SystemMessage): Unit
|
||||
def getParent: InternalActorRef
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -234,6 +234,14 @@ trait ActorRefFactory {
|
|||
* replies in order to resolve the matching set of actors.
|
||||
*/
|
||||
def actorSelection(path: String): ActorSelection = ActorSelection(lookupRoot, path)
|
||||
|
||||
/**
|
||||
* Stop the actor pointed to by the given [[akka.actor.ActorRef]]; this is
|
||||
* an asynchronous operation, i.e. involves a message send, but if invoked
|
||||
* on an [[akka.actor.ActorContext]] if operating on a child of that
|
||||
* context it will free up the name for immediate reuse.
|
||||
*/
|
||||
def stop(actor: ActorRef): Unit
|
||||
}
|
||||
|
||||
class ActorRefProviderException(message: String) extends AkkaException(message)
|
||||
|
|
@ -248,6 +256,11 @@ private[akka] case class CreateChild(props: Props, name: String)
|
|||
*/
|
||||
private[akka] case class CreateRandomNameChild(props: Props)
|
||||
|
||||
/**
|
||||
* Internal Akka use only, used in implementation of system.stop(child).
|
||||
*/
|
||||
private[akka] case class StopChild(child: ActorRef)
|
||||
|
||||
/**
|
||||
* Local ActorRef provider.
|
||||
*/
|
||||
|
|
@ -309,7 +322,7 @@ class LocalActorRefProvider(
|
|||
override def isTerminated = stopped.isOn
|
||||
|
||||
override def !(message: Any)(implicit sender: ActorRef = null): Unit = stopped.ifOff(message match {
|
||||
case Failed(ex) if sender ne null ⇒ causeOfTermination = Some(ex); sender.stop()
|
||||
case Failed(ex) if sender ne null ⇒ causeOfTermination = Some(ex); sender.asInstanceOf[InternalActorRef].stop()
|
||||
case _ ⇒ log.error(this + " received unexpected message [" + message + "]")
|
||||
})
|
||||
|
||||
|
|
@ -329,9 +342,10 @@ class LocalActorRefProvider(
|
|||
*/
|
||||
private class Guardian extends Actor {
|
||||
def receive = {
|
||||
case Terminated(_) ⇒ context.self.stop()
|
||||
case Terminated(_) ⇒ context.stop(self)
|
||||
case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case e: Exception ⇒ e })
|
||||
case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case e: Exception ⇒ e })
|
||||
case StopChild(child) ⇒ context.stop(child); sender ! "ok"
|
||||
case m ⇒ deadLetters ! DeadLetter(m, sender, self)
|
||||
}
|
||||
}
|
||||
|
|
@ -345,9 +359,10 @@ class LocalActorRefProvider(
|
|||
def receive = {
|
||||
case Terminated(_) ⇒
|
||||
eventStream.stopDefaultLoggers()
|
||||
context.self.stop()
|
||||
context.stop(self)
|
||||
case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case e: Exception ⇒ e })
|
||||
case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case e: Exception ⇒ e })
|
||||
case StopChild(child) ⇒ context.stop(child); sender ! "ok"
|
||||
case m ⇒ deadLetters ! DeadLetter(m, sender, self)
|
||||
}
|
||||
}
|
||||
|
|
@ -508,6 +523,9 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
|
|||
def schedule(initialDelay: Duration, delay: Duration)(f: ⇒ Unit): Cancellable =
|
||||
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, f), initialDelay))
|
||||
|
||||
def schedule(initialDelay: Duration, delay: Duration, runnable: Runnable): Cancellable =
|
||||
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(delay, runnable), initialDelay))
|
||||
|
||||
def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable =
|
||||
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay))
|
||||
|
||||
|
|
@ -565,6 +583,17 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
|
|||
}
|
||||
}
|
||||
|
||||
private def createContinuousTask(delay: Duration, runnable: Runnable): TimerTask = {
|
||||
new TimerTask {
|
||||
def run(timeout: org.jboss.netty.akka.util.Timeout) {
|
||||
dispatcher.dispatchTask(() ⇒ runnable.run())
|
||||
try timeout.getTimer.newTimeout(this, delay) catch {
|
||||
case _: IllegalStateException ⇒ // stop recurring if timer is stopped
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def execDirectly(t: HWTimeout): Unit = {
|
||||
try t.getTask.run(t) catch {
|
||||
case e: InterruptedException ⇒ throw e
|
||||
|
|
|
|||
|
|
@ -361,6 +361,24 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
|
|||
}
|
||||
}
|
||||
|
||||
def stop(actor: ActorRef): Unit = {
|
||||
implicit val timeout = settings.CreationTimeout
|
||||
val path = actor.path
|
||||
if (path.parent == guardian.path) {
|
||||
(guardian ? StopChild(actor)).get match {
|
||||
case ex: Exception ⇒ throw ex
|
||||
case _ ⇒
|
||||
}
|
||||
} else if (path.parent == systemGuardian.path) {
|
||||
(systemGuardian ? StopChild(actor)).get match {
|
||||
case ex: Exception ⇒ throw ex
|
||||
case _ ⇒
|
||||
}
|
||||
} else {
|
||||
actor.asInstanceOf[InternalActorRef].stop()
|
||||
}
|
||||
}
|
||||
|
||||
import settings._
|
||||
|
||||
// this provides basic logging (to stdout) until .start() is called below
|
||||
|
|
@ -428,13 +446,15 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
|
|||
this
|
||||
}
|
||||
|
||||
lazy val locker: Locker = new Locker(scheduler, lookupRoot.path / "locker", deathWatch)
|
||||
|
||||
def start() = _start
|
||||
|
||||
def registerOnTermination[T](code: ⇒ T) { terminationFuture onComplete (_ ⇒ code) }
|
||||
def registerOnTermination(code: Runnable) { terminationFuture onComplete (_ ⇒ code.run) }
|
||||
|
||||
def stop() {
|
||||
guardian.stop()
|
||||
stop(guardian)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -511,7 +511,7 @@ trait FSM[S, D] extends ListenerManagement {
|
|||
case _ ⇒
|
||||
nextState.replies.reverse foreach { r ⇒ sender ! r }
|
||||
terminate(nextState)
|
||||
self.stop()
|
||||
context.stop(self)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -119,12 +119,12 @@ abstract class FaultHandlingStrategy {
|
|||
/**
|
||||
* This method is called after the child has been removed from the set of children.
|
||||
*/
|
||||
def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit
|
||||
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit
|
||||
|
||||
/**
|
||||
* This method is called to act on the failure of a child: restart if the flag is true, stop otherwise.
|
||||
*/
|
||||
def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit
|
||||
def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit
|
||||
|
||||
def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = {
|
||||
if (children.nonEmpty)
|
||||
|
|
@ -139,12 +139,12 @@ abstract class FaultHandlingStrategy {
|
|||
/**
|
||||
* Returns whether it processed the failure or not
|
||||
*/
|
||||
def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
|
||||
def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
|
||||
val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate
|
||||
action match {
|
||||
case Resume ⇒ child.asInstanceOf[InternalActorRef].resume(); true
|
||||
case Restart ⇒ processFailure(true, child, cause, stats, children); true
|
||||
case Stop ⇒ processFailure(false, child, cause, stats, children); true
|
||||
case Restart ⇒ processFailure(context, true, child, cause, stats, children); true
|
||||
case Stop ⇒ processFailure(context, false, child, cause, stats, children); true
|
||||
case Escalate ⇒ false
|
||||
}
|
||||
}
|
||||
|
|
@ -192,17 +192,17 @@ case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider,
|
|||
*/
|
||||
val retriesWindow = (maxNrOfRetries, withinTimeRange)
|
||||
|
||||
def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit = {
|
||||
children foreach (_.stop())
|
||||
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {
|
||||
children foreach (context.stop(_))
|
||||
//TODO optimization to drop all children here already?
|
||||
}
|
||||
|
||||
def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
|
||||
def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
|
||||
if (children.nonEmpty) {
|
||||
if (restart && children.forall(_.requestRestartPermission(retriesWindow)))
|
||||
children.foreach(_.child.asInstanceOf[InternalActorRef].restart(cause))
|
||||
else
|
||||
children.foreach(_.child.stop())
|
||||
for (c ← children) context.stop(c.child)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -249,13 +249,13 @@ case class OneForOneStrategy(decider: FaultHandlingStrategy.Decider,
|
|||
*/
|
||||
val retriesWindow = (maxNrOfRetries, withinTimeRange)
|
||||
|
||||
def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit = {}
|
||||
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {}
|
||||
|
||||
def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
|
||||
def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
|
||||
if (restart && stats.requestRestartPermission(retriesWindow))
|
||||
child.asInstanceOf[InternalActorRef].restart(cause)
|
||||
else
|
||||
child.stop() //TODO optimization to drop child here already?
|
||||
context.stop(child) //TODO optimization to drop child here already?
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
63
akka-actor/src/main/scala/akka/actor/Locker.scala
Normal file
63
akka-actor/src/main/scala/akka/actor/Locker.scala
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
import akka.dispatch._
|
||||
import akka.util.duration._
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import akka.event.DeathWatch
|
||||
|
||||
class Locker(scheduler: Scheduler, val path: ActorPath, val deathWatch: DeathWatch) extends MinimalActorRef {
|
||||
|
||||
class DavyJones extends Runnable {
|
||||
def run = {
|
||||
val iter = heap.entrySet.iterator
|
||||
while (iter.hasNext) {
|
||||
val soul = iter.next();
|
||||
deathWatch.subscribe(Locker.this, soul.getKey) // in case Terminated got lost somewhere
|
||||
soul.getKey match {
|
||||
case _: LocalActorRef ⇒ // nothing to do, they know what they signed up for
|
||||
case nonlocal ⇒ nonlocal.stop() // try again in case it was due to a communications failure
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val heap = new ConcurrentHashMap[InternalActorRef, Long]
|
||||
|
||||
scheduler.schedule(5 seconds, 5 seconds, new DavyJones)
|
||||
|
||||
override def sendSystemMessage(msg: SystemMessage): Unit = this.!(msg)
|
||||
|
||||
override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match {
|
||||
case Terminated(soul) ⇒ heap.remove(soul)
|
||||
case ChildTerminated(soul) ⇒ heap.remove(soul)
|
||||
case soul: InternalActorRef ⇒
|
||||
heap.put(soul, 0l) // wanted to put System.nanoTime and do something intelligent, but forgot what that was
|
||||
deathWatch.subscribe(this, soul)
|
||||
// now re-bind the soul so that it does not drown its parent
|
||||
soul match {
|
||||
case local: LocalActorRef ⇒
|
||||
val cell = local.underlying
|
||||
rebind(cell, cell.getClass)
|
||||
case _ ⇒
|
||||
}
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
|
||||
@scala.annotation.tailrec
|
||||
final private def rebind(cell: ActorCell, clazz: Class[_]): Unit = {
|
||||
try {
|
||||
val heart = clazz.getDeclaredField("parent")
|
||||
heart.setAccessible(true)
|
||||
heart.set(cell, this)
|
||||
return
|
||||
} catch {
|
||||
case _: NoSuchFieldException ⇒
|
||||
}
|
||||
val sc = clazz.getSuperclass
|
||||
if (sc != null) rebind(cell, sc)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -42,6 +42,15 @@ trait Scheduler {
|
|||
*/
|
||||
def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable
|
||||
|
||||
/**
|
||||
* Schedules a function to be run repeatedly with an initial delay and a frequency.
|
||||
* E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set
|
||||
* delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS)
|
||||
*
|
||||
* Java API
|
||||
*/
|
||||
def schedule(initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable
|
||||
|
||||
/**
|
||||
* Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ trait TypedActorFactory {
|
|||
*/
|
||||
def stop(proxy: AnyRef): Boolean = getActorRefFor(proxy) match {
|
||||
case null ⇒ false
|
||||
case ref ⇒ ref.stop; true
|
||||
case ref ⇒ ref.asInstanceOf[InternalActorRef].stop; true
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -137,7 +137,10 @@ trait LoggingBus extends ActorEventBus {
|
|||
} {
|
||||
// this is very necessary, else you get infinite loop with DeadLetter
|
||||
unsubscribe(logger)
|
||||
logger.stop()
|
||||
logger match {
|
||||
case ref: InternalActorRef ⇒ ref.stop()
|
||||
case _ ⇒
|
||||
}
|
||||
}
|
||||
publish(Debug(simpleName(this), "all default loggers stopped"))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -66,49 +66,3 @@ trait ConnectionManager {
|
|||
*/
|
||||
def remove(deadRef: ActorRef)
|
||||
}
|
||||
|
||||
/**
|
||||
* Manages local connections for a router, e.g. local actors.
|
||||
*/
|
||||
class LocalConnectionManager(initialConnections: Iterable[ActorRef]) extends ConnectionManager {
|
||||
|
||||
def this(iterable: java.lang.Iterable[ActorRef]) {
|
||||
this(JavaConverters.iterableAsScalaIterableConverter(iterable).asScala)
|
||||
}
|
||||
|
||||
case class State(version: Long, connections: Iterable[ActorRef]) extends VersionedIterable[ActorRef] {
|
||||
def iterable = connections
|
||||
}
|
||||
|
||||
private val state: AtomicReference[State] = new AtomicReference[State](newState())
|
||||
|
||||
private def newState() = State(Long.MinValue, initialConnections)
|
||||
|
||||
def version: Long = state.get.version
|
||||
|
||||
def size: Int = state.get.connections.size
|
||||
|
||||
def isEmpty: Boolean = state.get.connections.isEmpty
|
||||
|
||||
def connections = state.get
|
||||
|
||||
def shutdown() {
|
||||
state.get.connections foreach (_.stop())
|
||||
}
|
||||
|
||||
@tailrec
|
||||
final def remove(ref: ActorRef) = {
|
||||
val oldState = state.get
|
||||
|
||||
//remote the ref from the connections.
|
||||
var newList = oldState.connections.filter(currentActorRef ⇒ currentActorRef ne ref)
|
||||
|
||||
if (newList.size != oldState.connections.size) {
|
||||
//one or more occurrences of the actorRef were removed, so we need to update the state.
|
||||
|
||||
val newState = State(oldState.version + 1, newList)
|
||||
//if we are not able to update the state, we just try again.
|
||||
if (!state.compareAndSet(oldState, newState)) remove(ref)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -132,7 +132,7 @@ trait Router extends Actor {
|
|||
|
||||
case Terminated(child) ⇒
|
||||
ref._routees = ref._routees filterNot (_ == child)
|
||||
if (ref.routees.isEmpty) self.stop()
|
||||
if (ref.routees.isEmpty) context.stop(self)
|
||||
|
||||
}: Receive) orElse routerReceive
|
||||
|
||||
|
|
|
|||
|
|
@ -4,13 +4,15 @@
|
|||
|
||||
package akka.util
|
||||
|
||||
import akka.actor.Actor
|
||||
|
||||
import java.util.concurrent.ConcurrentSkipListSet
|
||||
import akka.actor.{ ActorInitializationException, ActorRef }
|
||||
|
||||
/**
|
||||
* A manager for listener actors. Intended for mixin by observables.
|
||||
*/
|
||||
trait ListenerManagement {
|
||||
trait ListenerManagement { this: Actor ⇒
|
||||
|
||||
private val listeners = new ConcurrentSkipListSet[ActorRef]
|
||||
|
||||
|
|
@ -33,7 +35,7 @@ trait ListenerManagement {
|
|||
*/
|
||||
def removeListener(listener: ActorRef) {
|
||||
listeners remove listener
|
||||
if (manageLifeCycleOfListeners) listener.stop()
|
||||
if (manageLifeCycleOfListeners) context.stop(listener)
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -62,6 +62,6 @@ class SchedulerDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
//This cancels further Ticks to be sent
|
||||
cancellable.cancel()
|
||||
//#schedule-recurring
|
||||
tickActor.stop()
|
||||
system.stop(tickActor)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ class FirstActor extends Actor {
|
|||
case DoIt(msg) ⇒
|
||||
val replyMsg = doSomeDangerousWork(msg)
|
||||
sender ! replyMsg
|
||||
self.stop()
|
||||
context.stop(self)
|
||||
}
|
||||
def doSomeDangerousWork(msg: ImmutableMessage): String = { "done" }
|
||||
})) ! m
|
||||
|
|
@ -143,7 +143,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
//#import-context
|
||||
|
||||
val first = system.actorOf(Props(new FirstActor))
|
||||
first.stop()
|
||||
system.stop(first)
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -169,7 +169,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
system.eventStream.unsubscribe(testActor)
|
||||
system.eventStream.publish(TestEvent.UnMute(filter))
|
||||
|
||||
myActor.stop()
|
||||
system.stop(myActor)
|
||||
}
|
||||
|
||||
"creating actor with constructor" in {
|
||||
|
|
@ -182,7 +182,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
val myActor = system.actorOf(Props(new MyActor("...")))
|
||||
//#creating-constructor
|
||||
|
||||
myActor.stop()
|
||||
system.stop(myActor)
|
||||
}
|
||||
|
||||
"creating actor with Props" in {
|
||||
|
|
@ -192,7 +192,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor")
|
||||
//#creating-props
|
||||
|
||||
myActor.stop()
|
||||
system.stop(myActor)
|
||||
}
|
||||
|
||||
"using ask" in {
|
||||
|
|
@ -214,7 +214,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
val result: Option[Int] = for (x ← (myActor ? 3).as[Int]) yield { 2 * x }
|
||||
//#using-ask
|
||||
|
||||
myActor.stop()
|
||||
system.stop(myActor)
|
||||
}
|
||||
|
||||
"using receiveTimeout" in {
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ class RemoteConnectionManager(
|
|||
def isEmpty: Boolean = connections.connections.isEmpty
|
||||
|
||||
def shutdown() {
|
||||
state.get.iterable foreach (_.stop()) // shut down all remote connections
|
||||
state.get.iterable foreach (system.stop(_)) // shut down all remote connections
|
||||
}
|
||||
|
||||
@tailrec
|
||||
|
|
@ -136,7 +136,7 @@ class RemoteConnectionManager(
|
|||
//if we are not able to update the state, we just try again.
|
||||
if (!state.compareAndSet(oldState, newState)) {
|
||||
// we failed, need compensating action
|
||||
newConnection.stop() // stop the new connection actor and try again
|
||||
system.stop(newConnection) // stop the new connection actor and try again
|
||||
putIfAbsent(address, newConnectionFactory) // recur
|
||||
} else {
|
||||
// we succeeded
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ object RandomRoutedRemoteActorMultiJvmSpec {
|
|||
class SomeActor extends Actor with Serializable {
|
||||
def receive = {
|
||||
case "hit" ⇒ sender ! context.system.nodename
|
||||
case "end" ⇒ self.stop()
|
||||
case "end" ⇒ context.stop(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ object RoundRobinRoutedRemoteActorMultiJvmSpec {
|
|||
class SomeActor extends Actor with Serializable {
|
||||
def receive = {
|
||||
case "hit" ⇒ sender ! context.system.nodename
|
||||
case "end" ⇒ self.stop()
|
||||
case "end" ⇒ context.stop(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ object ScatterGatherRoutedRemoteActorMultiJvmSpec {
|
|||
class SomeActor extends Actor with Serializable {
|
||||
def receive = {
|
||||
case "hit" ⇒ sender ! context.system.nodename
|
||||
case "end" ⇒ self.stop()
|
||||
case "end" ⇒ context.stop(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -103,7 +103,7 @@ akka {
|
|||
expectMsg("preRestart")
|
||||
r ! 42
|
||||
expectMsg(42)
|
||||
r.stop()
|
||||
system.stop(r)
|
||||
expectMsg("postStop")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -302,8 +302,8 @@ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor {
|
|||
sender.tell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T ⇒ T] })
|
||||
} finally {
|
||||
agent.resume()
|
||||
self.stop()
|
||||
context.stop(self)
|
||||
}
|
||||
case _ ⇒ self.stop()
|
||||
case _ ⇒ context.stop(self)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,6 +24,6 @@ public class EitherOrElseExample {
|
|||
}
|
||||
}.execute();
|
||||
|
||||
brancher.stop();
|
||||
application.stop(brancher);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ public class RetryExample {
|
|||
System.out.println("Account 2: " + acc2);
|
||||
// Account 2: 600.0
|
||||
|
||||
transferer.stop();
|
||||
application.stop(transferer);
|
||||
|
||||
application.stop();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,8 +40,8 @@ public class UntypedCoordinatedExample {
|
|||
}
|
||||
}
|
||||
|
||||
counter1.stop();
|
||||
counter2.stop();
|
||||
application.stop(counter1);
|
||||
application.stop(counter2);
|
||||
|
||||
application.stop();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,8 +39,8 @@ public class UntypedTransactorExample {
|
|||
}
|
||||
}
|
||||
|
||||
counter1.stop();
|
||||
counter2.stop();
|
||||
application.stop(counter1);
|
||||
application.stop(counter2);
|
||||
|
||||
application.stop();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,8 +74,8 @@ class CoordinatedIncrementSpec extends AkkaSpec with BeforeAndAfterAll {
|
|||
for (counter ← counters) {
|
||||
(counter ? GetCount).as[Int].get must be === 1
|
||||
}
|
||||
counters foreach (_.stop())
|
||||
failer.stop()
|
||||
counters foreach (system.stop(_))
|
||||
system.stop(failer)
|
||||
}
|
||||
|
||||
"increment no counters with a failing transaction" in {
|
||||
|
|
@ -91,8 +91,8 @@ class CoordinatedIncrementSpec extends AkkaSpec with BeforeAndAfterAll {
|
|||
for (counter ← counters) {
|
||||
(counter ? GetCount).as[Int].get must be === 0
|
||||
}
|
||||
counters foreach (_.stop())
|
||||
failer.stop()
|
||||
counters foreach (system.stop(_))
|
||||
system.stop(failer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -123,8 +123,8 @@ class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll {
|
|||
for (counter ← counters) {
|
||||
(counter ? GetCount).as[Int].get must be === 1
|
||||
}
|
||||
counters foreach (_.stop())
|
||||
coordinator.stop()
|
||||
counters foreach (system.stop(_))
|
||||
system.stop(coordinator)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -97,8 +97,8 @@ class TransactorSpec extends AkkaSpec {
|
|||
for (counter ← counters) {
|
||||
(counter ? GetCount).as[Int].get must be === 1
|
||||
}
|
||||
counters foreach (_.stop())
|
||||
failer.stop()
|
||||
counters foreach (system.stop(_))
|
||||
system.stop(failer)
|
||||
}
|
||||
|
||||
"increment no counters with a failing transaction" in {
|
||||
|
|
@ -114,8 +114,8 @@ class TransactorSpec extends AkkaSpec {
|
|||
for (counter ← counters) {
|
||||
(counter ? GetCount).as[Int].get must be === 0
|
||||
}
|
||||
counters foreach (_.stop())
|
||||
failer.stop()
|
||||
counters foreach (system.stop(_))
|
||||
system.stop(failer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -129,7 +129,7 @@ class TransactorSpec extends AkkaSpec {
|
|||
latch.await
|
||||
val value = atomic { ref.get }
|
||||
value must be === 5
|
||||
transactor.stop()
|
||||
system.stop(transactor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ abstract class AkkaSpec(_system: ActorSystem)
|
|||
protected def atTermination() {}
|
||||
|
||||
def spawn(body: ⇒ Unit)(implicit dispatcher: MessageDispatcher) {
|
||||
system.actorOf(Props(ctx ⇒ { case "go" ⇒ try body finally ctx.self.stop() }).withDispatcher(dispatcher)) ! "go"
|
||||
system.actorOf(Props(ctx ⇒ { case "go" ⇒ try body finally ctx.stop(ctx.self) }).withDispatcher(dispatcher)) ! "go"
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -120,7 +120,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
|
|||
implicit val davyJones = otherSystem.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case m: DeadLetter ⇒ locker :+= m
|
||||
case "Die!" ⇒ sender ! "finally gone"; self.stop()
|
||||
case "Die!" ⇒ sender ! "finally gone"; context.stop(self)
|
||||
}
|
||||
}), "davyJones")
|
||||
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ object TestActorRefSpec {
|
|||
|
||||
class WorkerActor() extends TActor {
|
||||
def receiveT = {
|
||||
case "work" ⇒ sender ! "workDone"; self.stop()
|
||||
case "work" ⇒ sender ! "workDone"; context.stop(self)
|
||||
case replyTo: Promise[Any] ⇒ replyTo.completeWithResult("complexReply")
|
||||
case replyTo: ActorRef ⇒ replyTo ! "complexReply"
|
||||
}
|
||||
|
|
|
|||
|
|
@ -124,7 +124,7 @@ public class Pi {
|
|||
Result result = (Result) message;
|
||||
pi += result.getValue();
|
||||
nrOfResults += 1;
|
||||
if (nrOfResults == nrOfMessages) getSelf().stop();
|
||||
if (nrOfResults == nrOfMessages) getContext().stop(getSelf());
|
||||
|
||||
} else throw new IllegalArgumentException("Unknown message [" + message + "]");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ object Pi extends App {
|
|||
nrOfResults += 1
|
||||
|
||||
// Stop this actor and all its supervised children
|
||||
if (nrOfResults == nrOfMessages) self.stop()
|
||||
if (nrOfResults == nrOfMessages) context.stop(self)
|
||||
}
|
||||
|
||||
override def preStart() {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue