diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 99bfadf455..bcda2595bb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -221,13 +221,16 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend } "return isTerminated status correctly" in { - val system = ActorSystem() + val system = ActorSystem().asInstanceOf[ActorSystemImpl] system.isTerminated should be(false) val wt = system.whenTerminated wt.isCompleted should be(false) val f = system.terminate() - Await.ready(wt, 10 seconds) - Await.ready(f, 10 seconds) + val terminated = Await.result(wt, 10 seconds) + terminated.actor should be(system.provider.rootGuardian) + terminated.addressTerminated should be(true) + terminated.existenceConfirmed should be(true) + terminated should be theSameInstanceAs Await.result(f, 10 seconds) system.awaitTermination(10 seconds) system.isTerminated should be(true) } @@ -239,7 +242,7 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend intercept[RejectedExecutionException] { system2.registerOnTermination { println("IF YOU SEE THIS THEN THERE'S A BUG HERE") } - }.getMessage should be("Must be called prior to system shutdown.") + }.getMessage should be("ActorSystem already terminated.") } "reliably create waves of actors" in { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 8ab99a5203..a5ea5bf39a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -156,7 +156,7 @@ trait ActorRefProvider { * This Future is completed upon termination of this ActorRefProvider, which * is usually initiated by stopping the guardian via ActorSystem.stop(). */ - def terminationFuture: Future[Unit] + def terminationFuture: Future[Terminated] /** * Obtain the address which is to be used within sender references when @@ -463,9 +463,9 @@ private[akka] class LocalActorRefProvider private[akka] ( override val deadLetters: InternalActorRef = _deadLetters.getOrElse((p: ActorPath) ⇒ new DeadLetterActorRef(this, p, eventStream)).apply(rootPath / "deadLetters") - private[this] final val terminationPromise: Promise[Unit] = Promise[Unit]() + private[this] final val terminationPromise: Promise[Terminated] = Promise[Terminated]() - def terminationFuture: Future[Unit] = terminationPromise.future + def terminationFuture: Future[Terminated] = terminationPromise.future /* * generate name for temporary actor refs @@ -481,7 +481,7 @@ private[akka] class LocalActorRefProvider private[akka] ( * receive only Supervise/ChildTerminated system messages or Failure message. */ private[akka] val theOneWhoWalksTheBubblesOfSpaceTime: InternalActorRef = new MinimalActorRef { - val causeOfTermination: Promise[Unit] = Promise[Unit]() + val causeOfTermination: Promise[Terminated] = Promise[Terminated]() val path = rootPath / "bubble-walker" @@ -490,7 +490,7 @@ private[akka] class LocalActorRefProvider private[akka] ( def isWalking = causeOfTermination.future.isCompleted == false override def stop(): Unit = { - causeOfTermination.trySuccess(()) //Idempotent + causeOfTermination.trySuccess(Terminated(provider.rootGuardian)(existenceConfirmed = true, addressTerminated = true)) //Idempotent terminationPromise.tryCompleteWith(causeOfTermination.future) // Signal termination downstream, idempotent } @@ -506,10 +506,10 @@ private[akka] class LocalActorRefProvider private[akka] ( override def sendSystemMessage(message: SystemMessage): Unit = if (isWalking) { message match { - case Failed(child, ex, _) ⇒ + case Failed(child: InternalActorRef, ex, _) ⇒ log.error(ex, s"guardian $child failed, shutting down!") - causeOfTermination.tryFailure(ex) // FIXME: Is the order of this vs the next line important/right? - child.asInstanceOf[InternalActorRef].stop() + causeOfTermination.tryFailure(ex) + child.stop() case Supervise(_, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead case _: DeathWatchNotification ⇒ stop() case _ ⇒ log.error(s"$this received unexpected system message [$message]") diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 9b3cf166dc..da31b9a28e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -427,13 +427,13 @@ abstract class ActorSystem extends ActorRefFactory { * (below which the logging actors reside) and the execute all registered * termination handlers (see [[ActorSystem.registerOnTermination]]). */ - def terminate(): Future[Unit] + def terminate(): Future[Terminated] /** * Returns a Future which will be completed after the ActorSystem has been terminated * and termination hooks have been executed. */ - def whenTerminated: Future[Unit] + def whenTerminated: Future[Terminated] /** * Registers the provided extension and creates its payload, if this extension isn't already registered @@ -639,7 +639,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, private[this] final val terminationCallbacks = new TerminationCallbacks(provider.terminationFuture)(dispatcher) - def whenTerminated: Future[Unit] = terminationCallbacks.terminationFuture + override def whenTerminated: Future[Terminated] = terminationCallbacks.terminationFuture def lookupRoot: InternalActorRef = provider.rootGuardian def guardian: LocalActorRef = provider.guardian def systemGuardian: LocalActorRef = provider.systemGuardian @@ -672,7 +672,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, override def shutdown(): Unit = terminate() - override def terminate(): Future[Unit] = { + override def terminate(): Future[Terminated] = { if (!settings.LogDeadLettersDuringShutdown) logDeadLetterListener foreach stop guardian.stop() whenTerminated diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 4655aea6be..0ed665220f 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -135,7 +135,7 @@ private[akka] class RemoteActorRefProvider( override def rootGuardian: InternalActorRef = local.rootGuardian override def guardian: LocalActorRef = local.guardian override def systemGuardian: LocalActorRef = local.systemGuardian - override def terminationFuture: Future[Unit] = local.terminationFuture + override def terminationFuture: Future[Terminated] = local.terminationFuture override def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = local.registerTempActor(actorRef, path) override def unregisterTempActor(path: ActorPath): Unit = local.unregisterTempActor(path) override def tempPath(): ActorPath = local.tempPath() diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java index 0313fbe454..8e7f4af8cc 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java @@ -10,6 +10,8 @@ import akka.actor.Props; import akka.japi.pf.ReceiveBuilder; import akka.persistence.AbstractPersistentActor; import akka.persistence.SnapshotOffer; +import scala.PartialFunction; +import scala.runtime.BoxedUnit; import java.io.Serializable; import java.util.ArrayList;