From d6e5b0a46b4d83c9889f7bc79e198fb5f48e4da3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 25 Sep 2012 14:12:04 +0200 Subject: [PATCH] Terminate remote-deployed actors when system shutdown, see #2552 * Termination hook mechanism in system guardian to be able to notify RemoteSystemDaemon and wait for it to terminate children * Stopping the children will trigger ordinary death watch mechanism, in for example routers * Note bug in RemoteSystemDaemon, watch of children was not done properly, which might have been a memory leak for remote deployed actors. --- .../src/main/scala/akka/actor/ActorRef.scala | 8 +++++ .../scala/akka/actor/ActorRefProvider.scala | 33 +++++++++++++++++-- .../main/scala/akka/remote/RemoteDaemon.scala | 24 ++++++++++++-- 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 615c4ef92e..b8b2164403 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -13,6 +13,7 @@ import scala.annotation.tailrec import java.util.concurrent.{ ConcurrentHashMap } import akka.event.LoggingAdapter import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.collection.JavaConverters /** * Immutable and serializable handle to an actor, which may or may not reside @@ -516,4 +517,11 @@ private[akka] class VirtualPathContainer( } } } + + def hasChildren: Boolean = !children.isEmpty + + def allChildren: Iterable[ActorRef] = { + import scala.collection.JavaConverters._ + children.values.asScala + } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 3f96bd839c..5aa6049c74 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -300,6 +300,23 @@ trait ActorRefFactory { */ private[akka] case class StopChild(child: ActorRef) +/** + * INTERNAL API + */ +private[akka] object Guardian { + /** + * For the purpose of orderly shutdown it's possible + * to register interest in the termination of systemGuardian + * and receive a notification [[akka.actor.Guardian.TerminationHook]] + * before systemGuardian is stopped. The registered hook is supposed + * to reply with [[akka.actor.Guardian.TerminationHookDone]] and the + * systemGuardian will not stop until all registered hooks have replied. + */ + case object RegisterTerminationHook + case object TerminationHook + case object TerminationHookDone +} + /** * Local ActorRef provider. */ @@ -374,11 +391,21 @@ class LocalActorRefProvider( } private class Guardian(override val supervisorStrategy: SupervisorStrategy, isSystem: Boolean) extends Actor { + import Guardian._ + + var terminationHooks = Set.empty[ActorRef] def receive = { - case Terminated(_) ⇒ if (isSystem) eventStream.stopDefaultLoggers(); context.stop(self) - case StopChild(child) ⇒ context.stop(child) - case m ⇒ deadLetters ! DeadLetter(m, sender, self) + case Terminated(_) ⇒ terminationHooks foreach { _ ! TerminationHook }; stopWhenAllTerminationHooksDone() + case StopChild(child) ⇒ context.stop(child) + case RegisterTerminationHook ⇒ terminationHooks += sender + case TerminationHookDone ⇒ terminationHooks -= sender; stopWhenAllTerminationHooksDone() + case m ⇒ deadLetters ! DeadLetter(m, sender, self) + } + + def stopWhenAllTerminationHooksDone(): Unit = if (terminationHooks.isEmpty) { + if (isSystem) eventStream.stopDefaultLoggers() + context.stop(self) } // guardian MUST NOT lose its children during restart diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 53023687c0..ccdeee439b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -24,6 +24,12 @@ private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: Str private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter) extends VirtualPathContainer(system.provider, _path, _parent, _log) { + import akka.actor.Guardian._ + + system.provider.systemGuardian.tell(RegisterTerminationHook, this) + + @volatile private var terminating = false + /** * Find the longest matching path which we know about and return that ref * (or ask that ref to continue searching if elements are left). @@ -50,7 +56,7 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath } override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match { - case message: DaemonMsg ⇒ + case message: DaemonMsg if !terminating ⇒ log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, path.address) message match { case DaemonMsgCreate(props, deploy, path, supervisor) ⇒ @@ -63,18 +69,30 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef], path, systemService = false, Some(deploy), lookupDeploy = true, async = false) addChild(subpath.mkString("/"), actor) - this.sendSystemMessage(Watch(actor, this)) + actor.sendSystemMessage(Watch(actor, this)) case _ ⇒ log.error("remote path does not match path from message [{}]", message) } } + case message: DaemonMsg if terminating ⇒ + log.debug("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, path.address) + case Terminated(child: ActorRefWithCell) if child.asInstanceOf[ActorRefScope].isLocal ⇒ removeChild(child.path.elements.drop(1).mkString("/")) + terminationHookDoneWhenNoChildren() case t: Terminated ⇒ - case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) + case TerminationHook ⇒ + terminating = true + terminationHookDoneWhenNoChildren() + allChildren foreach system.stop + + case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) } + def terminationHookDoneWhenNoChildren(): Unit = if (terminating && !hasChildren) + system.provider.systemGuardian.tell(TerminationHookDone, this) + }