diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 39a6c79990..bcda2595bb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -195,7 +195,7 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend } } - system2.shutdown() + system2.terminate() Await.ready(latch, 5 seconds) val expected = (for (i ← 1 to count) yield i).reverse @@ -213,28 +213,36 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend callbackWasRun = true } import system.dispatcher - system2.scheduler.scheduleOnce(200.millis.dilated) { system2.shutdown() } + system2.scheduler.scheduleOnce(200.millis.dilated) { system2.terminate() } system2.awaitTermination(5 seconds) + Await.ready(system2.whenTerminated, 5 seconds) callbackWasRun should be(true) } "return isTerminated status correctly" in { - val system = ActorSystem() + val system = ActorSystem().asInstanceOf[ActorSystemImpl] system.isTerminated should be(false) - system.shutdown() + val wt = system.whenTerminated + wt.isCompleted should be(false) + val f = system.terminate() + val terminated = Await.result(wt, 10 seconds) + terminated.actor should be(system.provider.rootGuardian) + terminated.addressTerminated should be(true) + terminated.existenceConfirmed should be(true) + terminated should be theSameInstanceAs Await.result(f, 10 seconds) system.awaitTermination(10 seconds) system.isTerminated should be(true) } "throw RejectedExecutionException when shutdown" in { val system2 = ActorSystem("AwaitTermination", AkkaSpec.testConf) - system2.shutdown() + Await.ready(system2.terminate(), 10 seconds) system2.awaitTermination(10 seconds) intercept[RejectedExecutionException] { system2.registerOnTermination { println("IF YOU SEE THIS THEN THERE'S A BUG HERE") } - }.getMessage should be("Must be called prior to system shutdown.") + }.getMessage should be("ActorSystem already terminated.") } "reliably create waves of actors" in { @@ -252,10 +260,10 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend "reliable deny creation of actors while shutting down" in { val system = ActorSystem() import system.dispatcher - system.scheduler.scheduleOnce(200 millis) { system.shutdown() } + system.scheduler.scheduleOnce(200 millis) { system.terminate() } var failing = false var created = Vector.empty[ActorRef] - while (!system.isTerminated) { + while (!system.whenTerminated.isCompleted) { try { val t = system.actorOf(Props[ActorSystemSpec.Terminater]) failing should not be true // because once failing => always failing (it’s due to shutdown) @@ -278,7 +286,7 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend implicit val system = ActorSystem("Stop", AkkaSpec.testConf) EventFilter[ActorKilledException]() intercept { system.actorSelection("/user") ! Kill - awaitCond(system.isTerminated) + Await.ready(system.whenTerminated, Duration.Inf) } } @@ -313,7 +321,7 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend })) EventFilter[Exception]("hello") intercept { a ! "die" - awaitCond(system.isTerminated) + Await.ready(system.whenTerminated, Duration.Inf) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala index e5e65aeaa6..1351a3fea2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala @@ -111,7 +111,7 @@ object Chameneos { val actor = system.actorOf(Props(new Mall(1000000, 4))) Thread.sleep(10000) println("Elapsed: " + (end - start)) - system.shutdown() + system.terminate() } def main(args: Array[String]): Unit = run() diff --git a/akka-actor/src/main/scala/akka/Main.scala b/akka-actor/src/main/scala/akka/Main.scala index 4ba69afc2a..1306c441bc 100644 --- a/akka-actor/src/main/scala/akka/Main.scala +++ b/akka-actor/src/main/scala/akka/Main.scala @@ -33,7 +33,7 @@ object Main { val app = system.actorOf(Props(appClass), "app") val terminator = system.actorOf(Props(classOf[Terminator], app), "app-terminator") } catch { - case NonFatal(e) ⇒ system.shutdown(); throw e + case NonFatal(e) ⇒ system.terminate(); throw e } } } @@ -43,7 +43,7 @@ object Main { def receive = { case Terminated(_) ⇒ log.info("application supervisor has terminated, shutting down") - context.system.shutdown() + context.system.terminate() } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 101b2538c6..a5ea5bf39a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -156,7 +156,7 @@ trait ActorRefProvider { * This Future is completed upon termination of this ActorRefProvider, which * is usually initiated by stopping the guardian via ActorSystem.stop(). */ - def terminationFuture: Future[Unit] + def terminationFuture: Future[Terminated] /** * Obtain the address which is to be used within sender references when @@ -463,45 +463,53 @@ private[akka] class LocalActorRefProvider private[akka] ( override val deadLetters: InternalActorRef = _deadLetters.getOrElse((p: ActorPath) ⇒ new DeadLetterActorRef(this, p, eventStream)).apply(rootPath / "deadLetters") + private[this] final val terminationPromise: Promise[Terminated] = Promise[Terminated]() + + def terminationFuture: Future[Terminated] = terminationPromise.future + /* * generate name for temporary actor refs */ private val tempNumber = new AtomicLong - private def tempName() = Helpers.base64(tempNumber.getAndIncrement()) - private val tempNode = rootPath / "temp" - override def tempPath(): ActorPath = tempNode / tempName() + override def tempPath(): ActorPath = tempNode / Helpers.base64(tempNumber.getAndIncrement()) /** * Top-level anchor for the supervision hierarchy of this actor system. Will * receive only Supervise/ChildTerminated system messages or Failure message. */ private[akka] val theOneWhoWalksTheBubblesOfSpaceTime: InternalActorRef = new MinimalActorRef { - val stopped = new Switch(false) - - @volatile - var causeOfTermination: Option[Throwable] = None + val causeOfTermination: Promise[Terminated] = Promise[Terminated]() val path = rootPath / "bubble-walker" def provider: ActorRefProvider = LocalActorRefProvider.this - override def stop(): Unit = stopped switchOn { terminationPromise.complete(causeOfTermination.map(Failure(_)).getOrElse(Success(()))) } - @deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") override def isTerminated: Boolean = stopped.isOn + def isWalking = causeOfTermination.future.isCompleted == false - override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = stopped.ifOff(message match { - case null ⇒ throw new InvalidMessageException("Message is null") - case _ ⇒ log.error(s"$this received unexpected message [$message]") - }) + override def stop(): Unit = { + causeOfTermination.trySuccess(Terminated(provider.rootGuardian)(existenceConfirmed = true, addressTerminated = true)) //Idempotent + terminationPromise.tryCompleteWith(causeOfTermination.future) // Signal termination downstream, idempotent + } - override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff { + @deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") + override def isTerminated: Boolean = !isWalking + + override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = + if (isWalking) + message match { + case null ⇒ throw new InvalidMessageException("Message is null") + case _ ⇒ log.error(s"$this received unexpected message [$message]") + } + + override def sendSystemMessage(message: SystemMessage): Unit = if (isWalking) { message match { - case Failed(child, ex, _) ⇒ + case Failed(child: InternalActorRef, ex, _) ⇒ log.error(ex, s"guardian $child failed, shutting down!") - causeOfTermination = Some(ex) - child.asInstanceOf[InternalActorRef].stop() + causeOfTermination.tryFailure(ex) + child.stop() case Supervise(_, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead case _: DeathWatchNotification ⇒ stop() case _ ⇒ log.error(s"$this received unexpected system message [$message]") @@ -519,10 +527,6 @@ private[akka] class LocalActorRefProvider private[akka] ( @volatile private var system: ActorSystemImpl = _ - lazy val terminationPromise: Promise[Unit] = Promise[Unit]() - - def terminationFuture: Future[Unit] = terminationPromise.future - @volatile private var extraNames: Map[String, InternalActorRef] = Map() diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 5ad6f94334..da31b9a28e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -6,6 +6,7 @@ package akka.actor import java.io.Closeable import java.util.concurrent.{ ConcurrentHashMap, ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } +import java.util.concurrent.atomic.{ AtomicReference } import java.util.concurrent.TimeUnit.MILLISECONDS import com.typesafe.config.{ Config, ConfigFactory } import akka.event._ @@ -17,7 +18,7 @@ import akka.util._ import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.duration.{ FiniteDuration, Duration } -import scala.concurrent.{ Await, Awaitable, CanAwait, Future, ExecutionContext, ExecutionContextExecutor } +import scala.concurrent.{ Await, Future, Promise, ExecutionContext, ExecutionContextExecutor } import scala.util.{ Failure, Success, Try } import scala.util.control.{ NonFatal, ControlThrowable } @@ -391,12 +392,14 @@ abstract class ActorSystem extends ActorRefFactory { * * @throws TimeoutException in case of timeout */ + @deprecated("Use Await.result(whenTerminated, timeout) instead", "2.4") def awaitTermination(timeout: Duration): Unit /** * Block current thread until the system has been shutdown. This will * block until after all on termination callbacks have been run. */ + @deprecated("Use Await.result(whenTerminated, Duration.Inf) instead", "2.4") def awaitTermination(): Unit /** @@ -405,6 +408,7 @@ abstract class ActorSystem extends ActorRefFactory { * (below which the logging actors reside) and the execute all registered * termination handlers (see [[ActorSystem.registerOnTermination]]). */ + @deprecated("Use the terminate() method instead", "2.4") def shutdown(): Unit /** @@ -414,8 +418,23 @@ abstract class ActorSystem extends ActorRefFactory { * returns `false`, the status is actually unknown, since it might have * changed since you queried it. */ + @deprecated("Use the whenTerminated method instead.", "2.4") def isTerminated: Boolean + /** + * Terminates this actor system. This will stop the guardian actor, which in turn + * will recursively stop all its child actors, then the system guardian + * (below which the logging actors reside) and the execute all registered + * termination handlers (see [[ActorSystem.registerOnTermination]]). + */ + def terminate(): Future[Terminated] + + /** + * Returns a Future which will be completed after the ActorSystem has been terminated + * and termination hooks have been executed. + */ + def whenTerminated: Future[Terminated] + /** * Registers the provided extension and creates its payload, if this extension isn't already registered * This method has putIfAbsent-semantics, this method can potentially block, waiting for the initialization @@ -533,7 +552,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, } } else { log.error(cause, "Uncaught fatal error from thread [{}] shutting down ActorSystem [{}]", thread.getName, name) - shutdown() + terminate() } } } @@ -618,7 +637,9 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, override def reportFailure(t: Throwable): Unit = dispatcher reportFailure t }) - def terminationFuture: Future[Unit] = provider.terminationFuture + private[this] final val terminationCallbacks = new TerminationCallbacks(provider.terminationFuture)(dispatcher) + + override def whenTerminated: Future[Terminated] = terminationCallbacks.terminationFuture def lookupRoot: InternalActorRef = provider.rootGuardian def guardian: LocalActorRef = provider.guardian def systemGuardian: LocalActorRef = provider.systemGuardian @@ -638,29 +659,23 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, this } catch { case NonFatal(e) ⇒ - try { - shutdown() - } catch { case NonFatal(_) ⇒ Try(stopScheduler()) } + try terminate() catch { case NonFatal(_) ⇒ Try(stopScheduler()) } throw e } def start(): this.type = _start - - private lazy val terminationCallbacks = { - implicit val d = dispatcher - val callbacks = new TerminationCallbacks - terminationFuture onComplete (_ ⇒ callbacks.run) - callbacks - } def registerOnTermination[T](code: ⇒ T) { registerOnTermination(new Runnable { def run = code }) } def registerOnTermination(code: Runnable) { terminationCallbacks.add(code) } - def awaitTermination(timeout: Duration) { Await.ready(terminationCallbacks, timeout) } - def awaitTermination() = awaitTermination(Duration.Inf) - def isTerminated = terminationCallbacks.isTerminated + override def awaitTermination(timeout: Duration) { Await.ready(whenTerminated, timeout) } + override def awaitTermination() = awaitTermination(Duration.Inf) + override def isTerminated = whenTerminated.isCompleted - def shutdown(): Unit = { + override def shutdown(): Unit = terminate() + + override def terminate(): Future[Terminated] = { if (!settings.LogDeadLettersDuringShutdown) logDeadLetterListener foreach stop guardian.stop() + whenTerminated } @volatile var aborting = false @@ -673,7 +688,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, */ def abort(): Unit = { aborting = true - shutdown() + terminate() } //#create-scheduler @@ -798,44 +813,31 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, printNode(lookupRoot, "") } - final class TerminationCallbacks extends Runnable with Awaitable[Unit] { - private val lock = new ReentrantGuard - private var callbacks: List[Runnable] = _ //non-volatile since guarded by the lock - lock withGuard { callbacks = Nil } + final class TerminationCallbacks[T](upStreamTerminated: Future[T])(implicit ec: ExecutionContext) { + private[this] final val done = Promise[T]() + private[this] final val ref = new AtomicReference(done) - private val latch = new CountDownLatch(1) + // onComplete never fires twice so safe to avoid nullcheck + upStreamTerminated onComplete { t ⇒ ref.getAndSet(null).complete(t) } - final def add(callback: Runnable): Unit = { - latch.getCount match { - case 0 ⇒ throw new RejectedExecutionException("Must be called prior to system shutdown.") - case _ ⇒ lock withGuard { - if (latch.getCount == 0) throw new RejectedExecutionException("Must be called prior to system shutdown.") - else callbacks ::= callback - } + /** + * Adds a Runnable that will be executed on ActorSystem termination. + * Note that callbacks are executed in reverse order of insertion. + * @param r The callback to be executed on ActorSystem termination + * @throws RejectedExecutionException if called after ActorSystem has been terminated + */ + final def add(r: Runnable): Unit = { + @tailrec def addRec(r: Runnable, p: Promise[T]): Unit = ref.get match { + case null ⇒ throw new RejectedExecutionException("ActorSystem already terminated.") + case some if ref.compareAndSet(some, p) ⇒ some.completeWith(p.future.andThen { case _ ⇒ r.run() }) + case _ ⇒ addRec(r, p) } + addRec(r, Promise[T]()) } - final def run(): Unit = lock withGuard { - @tailrec def runNext(c: List[Runnable]): List[Runnable] = c match { - case Nil ⇒ Nil - case callback :: rest ⇒ - try callback.run() catch { case NonFatal(e) ⇒ log.error(e, "Failed to run termination callback, due to [{}]", e.getMessage) } - runNext(rest) - } - try { callbacks = runNext(callbacks) } finally latch.countDown() - } - - final def ready(atMost: Duration)(implicit permit: CanAwait): this.type = { - if (atMost.isFinite()) { - if (!latch.await(atMost.length, atMost.unit)) - throw new TimeoutException("Await termination timed out after [%s]" format (atMost.toString)) - } else latch.await() - - this - } - - final def result(atMost: Duration)(implicit permit: CanAwait): Unit = ready(atMost) - - final def isTerminated: Boolean = latch.getCount == 0 + /** + * Returns a Future which will be completed once all registered callbacks have been executed. + */ + def terminationFuture: Future[T] = done.future } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala index 698a4a026f..6cf47df028 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -4,6 +4,7 @@ package akka.cluster import language.postfixOps +import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfter @@ -237,7 +238,7 @@ abstract class ClusterDeathWatchSpec enterBarrier("first-unavailable") val timeout = remainingOrDefault - try system.awaitTermination(timeout) catch { + try Await.ready(system.whenTerminated, timeout) catch { case _: TimeoutException ⇒ fail("Failed to stop [%s] within [%s] \n%s".format(system.name, timeout, system.asInstanceOf[ActorSystemImpl].printTree)) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala index 225ad4fcb5..0c755b97c7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala @@ -53,10 +53,8 @@ abstract class RestartFirstSeedNodeSpec override def afterAll(): Unit = { runOn(seed1) { - if (seed1System.isTerminated) - shutdown(restartedSeed1System) - else - shutdown(seed1System) + shutdown( + if (seed1System.whenTerminated.isCompleted) restartedSeed1System else seed1System) } super.afterAll() } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala index d6ba103a9b..47c8ba73c1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala @@ -5,6 +5,7 @@ package akka.cluster import language.postfixOps import scala.collection.immutable +import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor.ActorSystem @@ -159,7 +160,7 @@ abstract class UnreachableNodeJoinsAgainSpec runOn(victim) { val victimAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress - system.awaitTermination(10 seconds) + Await.ready(system.whenTerminated, 10 seconds) // create new ActorSystem with same host:port val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s""" akka.remote.netty.tcp { diff --git a/akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala b/akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala index 97fba0df76..d328ac1e86 100644 --- a/akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala +++ b/akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala @@ -108,7 +108,7 @@ class MyActor extends Actor { } override def postStop() { - context.system.shutdown() + context.system.terminate() } //#business-logic-elided } diff --git a/akka-contrib/src/test/scala/akka/contrib/pattern/ClusterSingletonProxySpec.scala b/akka-contrib/src/test/scala/akka/contrib/pattern/ClusterSingletonProxySpec.scala index 3cbbf67b06..e47e15d7ca 100644 --- a/akka-contrib/src/test/scala/akka/contrib/pattern/ClusterSingletonProxySpec.scala +++ b/akka-contrib/src/test/scala/akka/contrib/pattern/ClusterSingletonProxySpec.scala @@ -26,7 +26,7 @@ class ClusterSingletonProxySpec extends WordSpecLike with Matchers with BeforeAn } } - override def afterAll() = testSystems.foreach(_.system.shutdown()) + override def afterAll() = testSystems.foreach(_.system.terminate()) } object ClusterSingletonProxySpec { diff --git a/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java b/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java index 7392d3a48b..4535ff47d3 100644 --- a/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java +++ b/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java @@ -22,7 +22,6 @@ import com.typesafe.config.ConfigFactory; import static akka.japi.Util.classTag; -import static akka.actor.SupervisorStrategy.resume; import static akka.actor.SupervisorStrategy.restart; import static akka.actor.SupervisorStrategy.stop; import static akka.actor.SupervisorStrategy.escalate; @@ -76,12 +75,12 @@ public class FaultHandlingDocSample { log.info("Current progress: {} %", progress.percent); if (progress.percent >= 100.0) { log.info("That's all, shutting down"); - getContext().system().shutdown(); + getContext().system().terminate(); } } else if (msg == ReceiveTimeout.getInstance()) { // No progress within 15 seconds, ServiceUnavailable log.error("Shutting down due to unavailable service"); - getContext().system().shutdown(); + getContext().system().terminate(); } else { unhandled(msg); } diff --git a/akka-docs/rst/java/code/docs/io/japi/EchoServer.java b/akka-docs/rst/java/code/docs/io/japi/EchoServer.java index 5b54ef721b..98186cdaa0 100644 --- a/akka-docs/rst/java/code/docs/io/japi/EchoServer.java +++ b/akka-docs/rst/java/code/docs/io/japi/EchoServer.java @@ -28,7 +28,7 @@ public class EchoServer { watcher.tell(ackServer, ActorRef.noSender()); latch.await(10, TimeUnit.MINUTES); } finally { - system.shutdown(); + system.terminate(); } } diff --git a/akka-docs/rst/java/lambda-actors.rst b/akka-docs/rst/java/lambda-actors.rst index 8c3202a9ea..d13297e0bf 100644 --- a/akka-docs/rst/java/lambda-actors.rst +++ b/akka-docs/rst/java/lambda-actors.rst @@ -678,7 +678,7 @@ actors does not respond (i.e. processing a message for extended periods of time and therefore not receiving the stop command), this whole process will be stuck. -Upon :meth:`ActorSystem.shutdown()`, the system guardian actors will be +Upon :meth:`ActorSystem.terminate()`, the system guardian actors will be stopped, and the aforementioned process will ensure proper termination of the whole system. diff --git a/akka-docs/rst/java/untyped-actors.rst b/akka-docs/rst/java/untyped-actors.rst index 63416f9694..41d62d63b0 100644 --- a/akka-docs/rst/java/untyped-actors.rst +++ b/akka-docs/rst/java/untyped-actors.rst @@ -621,7 +621,7 @@ actors does not respond (i.e. processing a message for extended periods of time and therefore not receiving the stop command), this whole process will be stuck. -Upon :meth:`ActorSystem.shutdown()`, the system guardian actors will be +Upon :meth:`ActorSystem.terminate()`, the system guardian actors will be stopped, and the aforementioned process will ensure proper termination of the whole system. diff --git a/akka-docs/rst/scala/actors.rst b/akka-docs/rst/scala/actors.rst index 29ce42edc7..8050664f7b 100644 --- a/akka-docs/rst/scala/actors.rst +++ b/akka-docs/rst/scala/actors.rst @@ -672,7 +672,7 @@ actors does not respond (i.e. processing a message for extended periods of time and therefore not receiving the stop command), this whole process will be stuck. -Upon :meth:`ActorSystem.shutdown()`, the system guardian actors will be +Upon :meth:`ActorSystem.terminate()`, the system guardian actors will be stopped, and the aforementioned process will ensure proper termination of the whole system. diff --git a/akka-docs/rst/scala/code/docs/actor/FaultHandlingDocSample.scala b/akka-docs/rst/scala/code/docs/actor/FaultHandlingDocSample.scala index 5b097a6763..3e9454647e 100644 --- a/akka-docs/rst/scala/code/docs/actor/FaultHandlingDocSample.scala +++ b/akka-docs/rst/scala/code/docs/actor/FaultHandlingDocSample.scala @@ -53,13 +53,13 @@ class Listener extends Actor with ActorLogging { log.info("Current progress: {} %", percent) if (percent >= 100.0) { log.info("That's all, shutting down") - context.system.shutdown() + context.system.terminate() } case ReceiveTimeout => // No progress within 15 seconds, ServiceUnavailable log.error("Shutting down due to unavailable service") - context.system.shutdown() + context.system.terminate() } } diff --git a/akka-docs/rst/scala/code/docs/io/EchoServer.scala b/akka-docs/rst/scala/code/docs/io/EchoServer.scala index f63008b2cb..0b515a709c 100644 --- a/akka-docs/rst/scala/code/docs/io/EchoServer.scala +++ b/akka-docs/rst/scala/code/docs/io/EchoServer.scala @@ -22,7 +22,7 @@ object EchoServer extends App { // make sure to stop the system so that the application stops try run() - finally system.shutdown() + finally system.terminate() def run(): Unit = { import ActorDSL._ diff --git a/akka-docs/rst/scala/code/docs/io/ReadBackPressure.scala b/akka-docs/rst/scala/code/docs/io/ReadBackPressure.scala index 8d0d0f0c3c..57956ce31c 100644 --- a/akka-docs/rst/scala/code/docs/io/ReadBackPressure.scala +++ b/akka-docs/rst/scala/code/docs/io/ReadBackPressure.scala @@ -10,6 +10,9 @@ import java.net.InetSocketAddress import akka.testkit.{ ImplicitSender, TestProbe, AkkaSpec } import akka.util.ByteString +import scala.concurrent.Await +import scala.concurrent.duration.Duration + object PullReadingExample { class Listener(monitor: ActorRef) extends Actor { @@ -77,7 +80,6 @@ class PullReadingSpec extends AkkaSpec with ImplicitSender { client.send(connection, ResumeReading) client.expectMsg(Received(ByteString("hello"))) - system.shutdown() - system.awaitTermination + Await.ready(system.terminate(), Duration.Inf) } } diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 1528708ddf..c09e1b87cb 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -322,7 +322,7 @@ trait PersistenceDocSpec { processor ! PersistentBatch(List(Persistent("a"), Persistent("b"))) //#batch-write - system.shutdown() + system.terminate() } new AnyRef { diff --git a/akka-docs/rst/scala/code/docs/testkit/TestkitDocSpec.scala b/akka-docs/rst/scala/code/docs/testkit/TestkitDocSpec.scala index 2a0513bc31..a56f129b22 100644 --- a/akka-docs/rst/scala/code/docs/testkit/TestkitDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/testkit/TestkitDocSpec.scala @@ -284,7 +284,7 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { //#put-your-test-code-here val probe = TestProbe() probe.send(testActor, "hello") - try expectMsg("hello") catch { case NonFatal(e) => system.shutdown(); throw e } + try expectMsg("hello") catch { case NonFatal(e) => system.terminate(); throw e } //#put-your-test-code-here shutdown(system) diff --git a/akka-kernel/src/main/scala/akka/kernel/Main.scala b/akka-kernel/src/main/scala/akka/kernel/Main.scala index 6933e642b3..79cf45172c 100644 --- a/akka-kernel/src/main/scala/akka/kernel/Main.scala +++ b/akka-kernel/src/main/scala/akka/kernel/Main.scala @@ -29,7 +29,7 @@ import scala.collection.JavaConverters._ * } * * def shutdown = { - * system.shutdown() + * system.terminate() * } * } * }}} diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala index 80cd9a87fa..e947a8e7be 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala @@ -31,9 +31,11 @@ trait Player { this: TestConductorExt ⇒ private var _client: ActorRef = _ private def client = _client match { - case null ⇒ throw new IllegalStateException("TestConductor client not yet started") - case _ if system.isTerminated ⇒ throw new IllegalStateException("TestConductor unavailable because system is shutdown; you need to startNewSystem() before this point") - case x ⇒ x + case null ⇒ + throw new IllegalStateException("TestConductor client not yet started") + case _ if system.whenTerminated.isCompleted ⇒ + throw new IllegalStateException("TestConductor unavailable because system is terminated; you need to startNewSystem() before this point") + case x ⇒ x } /** @@ -241,7 +243,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) // FIXME: Currently ignoring, needs support from Remoting stay case TerminateMsg(Left(false)) ⇒ - context.system.shutdown() + context.system.terminate() stay case TerminateMsg(Left(true)) ⇒ context.system.asInstanceOf[ActorSystemImpl].abort() diff --git a/akka-osgi/src/main/scala/akka/osgi/ActorSystemActivator.scala b/akka-osgi/src/main/scala/akka/osgi/ActorSystemActivator.scala index a67b414242..10c3b2863a 100644 --- a/akka-osgi/src/main/scala/akka/osgi/ActorSystemActivator.scala +++ b/akka-osgi/src/main/scala/akka/osgi/ActorSystemActivator.scala @@ -82,7 +82,7 @@ abstract class ActorSystemActivator extends BundleActivator { */ def stop(context: BundleContext): Unit = { registration foreach (_.unregister()) - system foreach (_.shutdown()) + system foreach (_.terminate()) } /** diff --git a/akka-osgi/src/test/scala/akka/osgi/ActorSystemActivatorTest.scala b/akka-osgi/src/test/scala/akka/osgi/ActorSystemActivatorTest.scala index ea9f82afb4..9ea6d5a3ca 100644 --- a/akka-osgi/src/test/scala/akka/osgi/ActorSystemActivatorTest.scala +++ b/akka-osgi/src/test/scala/akka/osgi/ActorSystemActivatorTest.scala @@ -51,11 +51,11 @@ class PingPongActorSystemActivatorTest extends WordSpec with Matchers with PojoS "stop the ActorSystem when bundle stops" in { filterErrors() { val system = serviceForType[ActorSystem] - system.isTerminated should be(false) + system.whenTerminated.isCompleted should be(false) bundleForName(TEST_BUNDLE_NAME).stop() - system.awaitTermination() - system.isTerminated should be(true) + Await.ready(system.whenTerminated, Duration.Inf) + system.whenTerminated.isCompleted should be(true) } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index e9efe922eb..7fd31e2875 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -13,6 +13,8 @@ import akka.testkit._ import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery +import scala.concurrent.Await +import scala.concurrent.duration.Duration object SerializerSpecConfigs { val customSerializers = ConfigFactory.parseString( @@ -202,8 +204,7 @@ class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customS } override def afterTermination() { - remoteSystem.shutdown() - remoteSystem.awaitTermination() + Await.ready(remoteSystem.terminate(), Duration.Inf) } "A message serializer" must { diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala index 93d353d528..b3ce6dc60f 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala @@ -5,6 +5,7 @@ package akka.remote import language.postfixOps import java.util.concurrent.TimeoutException +import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor.Actor @@ -76,8 +77,9 @@ abstract class RemoteDeploymentDeathWatchSpec sleep() // if the remote deployed actor is not removed the system will not shutdown + val timeout = remainingOrDefault - try system.awaitTermination(timeout) catch { + try Await.ready(system.whenTerminated, timeout) catch { case _: TimeoutException ⇒ fail("Failed to stop [%s] within [%s] \n%s".format(system.name, timeout, system.asInstanceOf[ActorSystemImpl].printTree)) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala index 5e5e1f0bc1..916c12279d 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala @@ -4,6 +4,7 @@ package akka.remote import language.postfixOps +import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor.Actor @@ -40,7 +41,7 @@ object RemoteNodeRestartDeathWatchMultiJvmSpec extends MultiNodeConfig { def receive = { case "shutdown" ⇒ sender() ! "shutdown-ack" - context.system.shutdown() + context.system.terminate() case msg ⇒ sender() ! msg } } @@ -100,7 +101,7 @@ abstract class RemoteNodeRestartDeathWatchSpec enterBarrier("watch-established") - system.awaitTermination(30.seconds) + Await.ready(system.whenTerminated, 30.seconds) val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s""" akka.remote.netty.tcp { @@ -110,7 +111,7 @@ abstract class RemoteNodeRestartDeathWatchSpec """).withFallback(system.settings.config)) freshSystem.actorOf(Props[Subject], "subject") - freshSystem.awaitTermination(30.seconds) + Await.ready(freshSystem.whenTerminated, 30.seconds) } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeShutdownAndComesBackSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeShutdownAndComesBackSpec.scala index e6afb20416..5fcd9998a1 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeShutdownAndComesBackSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeShutdownAndComesBackSpec.scala @@ -37,7 +37,7 @@ object RemoteNodeShutdownAndComesBackSpec extends MultiNodeConfig { class Subject extends Actor { def receive = { - case "shutdown" ⇒ context.system.shutdown() + case "shutdown" ⇒ context.system.terminate() case msg ⇒ sender() ! msg } } @@ -133,7 +133,7 @@ abstract class RemoteNodeShutdownAndComesBackSpec enterBarrier("watch-established") - system.awaitTermination(30.seconds) + Await.ready(system.whenTerminated, 30.seconds) val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s""" akka.remote.netty.tcp { @@ -143,7 +143,7 @@ abstract class RemoteNodeShutdownAndComesBackSpec """).withFallback(system.settings.config)) freshSystem.actorOf(Props[Subject], "subject") - freshSystem.awaitTermination(30.seconds) + Await.ready(freshSystem.whenTerminated, 30.seconds) } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala index f73e3980ce..b966841563 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala @@ -32,8 +32,8 @@ object RemoteQuarantinePiercingSpec extends MultiNodeConfig { class Subject extends Actor { def receive = { - case "shutdown" ⇒ context.system.shutdown() - case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid, self) + case "shutdown" ⇒ context.system.terminate() + case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid -> self) } } @@ -100,7 +100,7 @@ abstract class RemoteQuarantinePiercingSpec extends MultiNodeSpec(RemoteQuaranti enterBarrier("actor-identified") - system.awaitTermination(30.seconds) + Await.ready(system.whenTerminated, 30.seconds) val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s""" akka.remote.netty.tcp { @@ -110,7 +110,7 @@ abstract class RemoteQuarantinePiercingSpec extends MultiNodeSpec(RemoteQuaranti """).withFallback(system.settings.config)) freshSystem.actorOf(Props[Subject], "subject") - freshSystem.awaitTermination(30.seconds) + Await.ready(freshSystem.whenTerminated, 30.seconds) } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala index 0a20971fdb..42f5a990f6 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala @@ -10,6 +10,7 @@ import akka.actor.Props import akka.remote.transport.ThrottlerTransportAdapter.Direction._ import com.typesafe.config.ConfigFactory import akka.actor.ActorSystem +import scala.concurrent.Await import scala.concurrent.duration._ import akka.actor.ActorLogging import akka.remote.testconductor.TestConductor @@ -124,7 +125,7 @@ abstract class RemoteReDeploymentMultiJvmSpec extends MultiNodeSpec(RemoteReDepl var sys: ActorSystem = null runOn(second) { - system.awaitTermination(30.seconds) + Await.ready(system.whenTerminated, 30.seconds) expectNoMsg(sleepAfterKill) sys = startNewSystem() } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 4655aea6be..0ed665220f 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -135,7 +135,7 @@ private[akka] class RemoteActorRefProvider( override def rootGuardian: InternalActorRef = local.rootGuardian override def guardian: LocalActorRef = local.guardian override def systemGuardian: LocalActorRef = local.systemGuardian - override def terminationFuture: Future[Unit] = local.terminationFuture + override def terminationFuture: Future[Terminated] = local.terminationFuture override def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = local.registerTempActor(actorRef, path) override def unregisterTempActor(path: ActorPath): Unit = local.unregisterTempActor(path) override def tempPath(): ActorPath = local.tempPath() diff --git a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java index 4caa42f60d..b30085a147 100644 --- a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java +++ b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java @@ -5,8 +5,6 @@ package docs.actor; import akka.actor.*; -import akka.event.LoggingAdapter; -import akka.event.Logging; import akka.japi.pf.ReceiveBuilder; import akka.testkit.ErrorFilter; import akka.testkit.EventFilter; @@ -65,9 +63,8 @@ public class ActorDocTest { } @AfterClass - public static void afterClass() { - system.shutdown(); - system.awaitTermination(Duration.create("5 seconds")); + public static void afterClass() throws Exception { + Await.ready(system.terminate(), Duration.create("5 seconds")); } static @@ -316,7 +313,7 @@ public class ActorDocTest { swapper.tell(Swap, ActorRef.noSender()); // logs Ho swapper.tell(Swap, ActorRef.noSender()); // logs Hi swapper.tell(Swap, ActorRef.noSender()); // logs Ho - system.shutdown(); + system.terminate(); } } //#swapper diff --git a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/InitializationDocTest.java b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/InitializationDocTest.java index 67c22748b8..b799e12083 100644 --- a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/InitializationDocTest.java +++ b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/InitializationDocTest.java @@ -10,11 +10,11 @@ import akka.testkit.JavaTestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.PartialFunction; import scala.concurrent.duration.Duration; -import scala.runtime.BoxedUnit; +import scala.concurrent.Await; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class InitializationDocTest { @@ -26,9 +26,8 @@ public class InitializationDocTest { } @AfterClass - public static void afterClass() { - system.shutdown(); - system.awaitTermination(Duration.create("5 seconds")); + public static void afterClass() throws Exception { + Await.ready(system.terminate(), Duration.create("5 seconds")); } public static class MessageInitExample extends AbstractActor { diff --git a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/japi/FaultHandlingDocSample.java b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/japi/FaultHandlingDocSample.java index 1588709d9f..43eaf266da 100644 --- a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/japi/FaultHandlingDocSample.java +++ b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/japi/FaultHandlingDocSample.java @@ -19,11 +19,8 @@ import akka.util.Timeout; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import scala.concurrent.duration.Duration; -import scala.PartialFunction; -import scala.runtime.BoxedUnit; import static akka.japi.Util.classTag; -import static akka.actor.SupervisorStrategy.resume; import static akka.actor.SupervisorStrategy.restart; import static akka.actor.SupervisorStrategy.stop; import static akka.actor.SupervisorStrategy.escalate; @@ -79,13 +76,13 @@ public class FaultHandlingDocSample { log().info("Current progress: {} %", progress.percent); if (progress.percent >= 100.0) { log().info("That's all, shutting down"); - context().system().shutdown(); + context().system().terminate(); } }). matchEquals(ReceiveTimeout.getInstance(), x -> { // No progress within 15 seconds, ServiceUnavailable log().error("Shutting down due to unavailable service"); - context().system().shutdown(); + context().system().terminate(); }).build(), context() )); } diff --git a/akka-samples/akka-sample-hello-kernel/src/main/java/sample/kernel/hello/java/HelloKernel.java b/akka-samples/akka-sample-hello-kernel/src/main/java/sample/kernel/hello/java/HelloKernel.java index bb8f1de929..2c5676069f 100644 --- a/akka-samples/akka-sample-hello-kernel/src/main/java/sample/kernel/hello/java/HelloKernel.java +++ b/akka-samples/akka-sample-hello-kernel/src/main/java/sample/kernel/hello/java/HelloKernel.java @@ -38,6 +38,6 @@ public class HelloKernel implements Bootable { } public void shutdown() { - system.shutdown(); + system.terminate(); } } diff --git a/akka-samples/akka-sample-hello-kernel/src/main/scala/sample/kernel/hello/HelloKernel.scala b/akka-samples/akka-sample-hello-kernel/src/main/scala/sample/kernel/hello/HelloKernel.scala index 7ebc78b5bf..5a336d29ca 100644 --- a/akka-samples/akka-sample-hello-kernel/src/main/scala/sample/kernel/hello/HelloKernel.scala +++ b/akka-samples/akka-sample-hello-kernel/src/main/scala/sample/kernel/hello/HelloKernel.scala @@ -29,6 +29,6 @@ class HelloKernel extends Bootable { } def shutdown = { - system.shutdown() + system.terminate() } } diff --git a/akka-samples/akka-sample-main-java-lambda/src/main/java/sample/hello/Main2.java b/akka-samples/akka-sample-main-java-lambda/src/main/java/sample/hello/Main2.java index cc67f6bbcc..e5618086e7 100644 --- a/akka-samples/akka-sample-main-java-lambda/src/main/java/sample/hello/Main2.java +++ b/akka-samples/akka-sample-main-java-lambda/src/main/java/sample/hello/Main2.java @@ -21,7 +21,7 @@ public class Main2 { receive(ReceiveBuilder. match(Terminated.class, t -> { log().info("{} has terminated, shutting down system", ref.path()); - context().system().shutdown(); + context().system().terminate(); }).build()); } } diff --git a/akka-samples/akka-sample-main-java/src/main/java/sample/hello/Main2.java b/akka-samples/akka-sample-main-java/src/main/java/sample/hello/Main2.java index e4216b5246..23754bc840 100644 --- a/akka-samples/akka-sample-main-java/src/main/java/sample/hello/Main2.java +++ b/akka-samples/akka-sample-main-java/src/main/java/sample/hello/Main2.java @@ -30,7 +30,7 @@ public class Main2 { public void onReceive(Object msg) { if (msg instanceof Terminated) { log.info("{} has terminated, shutting down system", ref.path()); - getContext().system().shutdown(); + getContext().system().terminate(); } else { unhandled(msg); } diff --git a/akka-samples/akka-sample-main-scala/src/main/scala/sample/hello/Main2.scala b/akka-samples/akka-sample-main-scala/src/main/scala/sample/hello/Main2.scala index 52108e1f41..1d79c65206 100644 --- a/akka-samples/akka-sample-main-scala/src/main/scala/sample/hello/Main2.scala +++ b/akka-samples/akka-sample-main-scala/src/main/scala/sample/hello/Main2.scala @@ -20,7 +20,7 @@ object Main2 { def receive = { case Terminated(_) => log.info("{} has terminated, shutting down system", ref.path) - context.system.shutdown() + context.system.terminate() } } diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java index 3313274aa8..e3bb11710c 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java @@ -125,6 +125,6 @@ public class PersistentActorExample { processor.tell("print", null); Thread.sleep(1000); - system.shutdown(); + system.terminate(); } } diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java index 58113b85b5..8e7f4af8cc 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java @@ -88,6 +88,6 @@ public class SnapshotExample { persistentActor.tell("print", null); Thread.sleep(1000); - system.shutdown(); + system.terminate(); } } diff --git a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorExample.java index 528e92500e..a10533d5b0 100644 --- a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorExample.java @@ -127,6 +127,6 @@ public class PersistentActorExample { persistentActor.tell("print", null); Thread.sleep(1000); - system.shutdown(); + system.terminate(); } } diff --git a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/SnapshotExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/SnapshotExample.java index d3734a7507..65cdeae01c 100644 --- a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/SnapshotExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/SnapshotExample.java @@ -95,6 +95,6 @@ public class SnapshotExample { persistentActor.tell("print", null); Thread.sleep(1000); - system.shutdown(); + system.terminate(); } } diff --git a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala index bf54a32da1..45ea16f9b6 100644 --- a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala +++ b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala @@ -56,5 +56,5 @@ object PersistentActorExample extends App { persistentActor ! "print" Thread.sleep(1000) - system.shutdown() + system.terminate() } diff --git a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/SnapshotExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/SnapshotExample.scala index 492711ffb9..73e01a2137 100644 --- a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/SnapshotExample.scala +++ b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/SnapshotExample.scala @@ -44,5 +44,5 @@ object SnapshotExample extends App { persistentActor ! "print" Thread.sleep(1000) - system.shutdown() + system.terminate() } diff --git a/akka-samples/akka-sample-remote-scala/src/main/scala/sample/remote/benchmark/Receiver.scala b/akka-samples/akka-sample-remote-scala/src/main/scala/sample/remote/benchmark/Receiver.scala index 9cbd5fe249..f0f377f6d0 100644 --- a/akka-samples/akka-sample-remote-scala/src/main/scala/sample/remote/benchmark/Receiver.scala +++ b/akka-samples/akka-sample-remote-scala/src/main/scala/sample/remote/benchmark/Receiver.scala @@ -17,7 +17,7 @@ class Receiver extends Actor { def receive = { case m: Echo => sender() ! m - case Shutdown => context.system.shutdown() + case Shutdown => context.system.terminate() case _ => } } diff --git a/akka-samples/akka-sample-remote-scala/src/main/scala/sample/remote/benchmark/Sender.scala b/akka-samples/akka-sample-remote-scala/src/main/scala/sample/remote/benchmark/Sender.scala index f18e731670..2c856ce97d 100644 --- a/akka-samples/akka-sample-remote-scala/src/main/scala/sample/remote/benchmark/Sender.scala +++ b/akka-samples/akka-sample-remote-scala/src/main/scala/sample/remote/benchmark/Sender.scala @@ -104,7 +104,7 @@ class Sender(path: String, totalMessages: Int, burstSize: Int, payloadSize: Int) case Terminated(`actor`) => println("Receiver terminated") - context.system.shutdown() + context.system.terminate() } diff --git a/akka-samples/akka-sample-supervision-java-lambda/src/main/java/supervision/Main.java b/akka-samples/akka-sample-supervision-java-lambda/src/main/java/supervision/Main.java index ca0fd13099..199062af25 100644 --- a/akka-samples/akka-sample-supervision-java-lambda/src/main/java/supervision/Main.java +++ b/akka-samples/akka-sample-supervision-java-lambda/src/main/java/supervision/Main.java @@ -33,7 +33,6 @@ public class Main { Integer result = Await.result(ask(calculatorService, task, new Timeout(duration)).mapTo(classTag(Integer.class)), duration); System.out.println("Got result: " + result); - system.shutdown(); - system.awaitTermination(); + Await.ready(system.terminate(), Duration.Inf()); } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index d35a481964..4820f54ded 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -6,6 +6,7 @@ package akka.testkit import language.postfixOps import scala.annotation.{ varargs, tailrec } import scala.collection.immutable +import scala.concurrent.Await import scala.concurrent.duration._ import scala.reflect.ClassTag import java.util.concurrent._ @@ -698,10 +699,14 @@ trait TestKitBase { * * val test = system.actorOf(Props[SomeActor] * - * within (1 second) { - * test ! SomeWork - * expectMsg(Result1) // bounded to 1 second - * expectMsg(Result2) // bounded to the remainder of the 1 second + * within (1 second) { + * test ! SomeWork + * expectMsg(Result1) // bounded to 1 second + * expectMsg(Result2) // bounded to the remainder of the 1 second + * } + * + * } finally { + * system.terminate() * } * * } finally { @@ -768,8 +773,8 @@ object TestKit { def shutdownActorSystem(actorSystem: ActorSystem, duration: Duration = 10.seconds, verifySystemShutdown: Boolean = false): Unit = { - actorSystem.shutdown() - try actorSystem.awaitTermination(duration) catch { + actorSystem.terminate() + try Await.ready(actorSystem.whenTerminated, duration) catch { case _: TimeoutException ⇒ val msg = "Failed to stop [%s] within [%s] \n%s".format(actorSystem.name, duration, actorSystem.asInstanceOf[ActorSystemImpl].printTree) diff --git a/akka-testkit/src/test/scala/akka/testkit/DefaultTimeoutSpec.scala b/akka-testkit/src/test/scala/akka/testkit/DefaultTimeoutSpec.scala index a8877f989d..d353defca6 100644 --- a/akka-testkit/src/test/scala/akka/testkit/DefaultTimeoutSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/DefaultTimeoutSpec.scala @@ -13,7 +13,7 @@ class DefaultTimeoutSpec implicit lazy val system = ActorSystem("AkkaCustomSpec") - override def afterAll = system.shutdown + override def afterAll = system.terminate "A spec with DefaultTimeout" should { "use timeout from settings" in { diff --git a/akka-testkit/src/test/scala/akka/testkit/ImplicitSenderSpec.scala b/akka-testkit/src/test/scala/akka/testkit/ImplicitSenderSpec.scala index a846827ccc..78601b54e4 100644 --- a/akka-testkit/src/test/scala/akka/testkit/ImplicitSenderSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/ImplicitSenderSpec.scala @@ -13,7 +13,7 @@ class ImplicitSenderSpec implicit lazy val system = ActorSystem("AkkaCustomSpec") - override def afterAll = system.shutdown + override def afterAll = system.terminate "An ImplicitSender" should { "have testActor as its self" in { diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 42e0d21126..1baa2d9763 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -457,7 +457,7 @@ object AkkaBuild extends Build { |var remoteConfig = ConfigFactory.parseString("akka.remote.netty{port=0,use-dispatcher-for-io=akka.actor.default-dispatcher,execution-pool-size=0},akka.actor.provider=akka.remote.RemoteActorRefProvider").withFallback(config) |var system: ActorSystem = null |implicit def _system = system - |def startSystem(remoting: Boolean = false) { system = ActorSystem("repl", if(remoting) remoteConfig else config); println("don’t forget to system.shutdown()!") } + |def startSystem(remoting: Boolean = false) { system = ActorSystem("repl", if(remoting) remoteConfig else config); println("don’t forget to system.terminate()!") } |implicit def ec = system.dispatcher |implicit val timeout = Timeout(5 seconds) |""".stripMargin,