Terminate remote-deployed actors when system shutdown, see #2552
* Termination hook mechanism in system guardian to be able to notify RemoteSystemDaemon and wait for it to terminate children * Stopping the children will trigger ordinary death watch mechanism, in for example routers * Note bug in RemoteSystemDaemon, watch of children was not done properly, which might have been a memory leak for remote deployed actors.
This commit is contained in:
parent
efe32d1fef
commit
d6e5b0a46b
3 changed files with 59 additions and 6 deletions
|
|
@ -13,6 +13,7 @@ import scala.annotation.tailrec
|
|||
import java.util.concurrent.{ ConcurrentHashMap }
|
||||
import akka.event.LoggingAdapter
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
import scala.collection.JavaConverters
|
||||
|
||||
/**
|
||||
* Immutable and serializable handle to an actor, which may or may not reside
|
||||
|
|
@ -516,4 +517,11 @@ private[akka] class VirtualPathContainer(
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
def hasChildren: Boolean = !children.isEmpty
|
||||
|
||||
def allChildren: Iterable[ActorRef] = {
|
||||
import scala.collection.JavaConverters._
|
||||
children.values.asScala
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -300,6 +300,23 @@ trait ActorRefFactory {
|
|||
*/
|
||||
private[akka] case class StopChild(child: ActorRef)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object Guardian {
|
||||
/**
|
||||
* For the purpose of orderly shutdown it's possible
|
||||
* to register interest in the termination of systemGuardian
|
||||
* and receive a notification [[akka.actor.Guardian.TerminationHook]]
|
||||
* before systemGuardian is stopped. The registered hook is supposed
|
||||
* to reply with [[akka.actor.Guardian.TerminationHookDone]] and the
|
||||
* systemGuardian will not stop until all registered hooks have replied.
|
||||
*/
|
||||
case object RegisterTerminationHook
|
||||
case object TerminationHook
|
||||
case object TerminationHookDone
|
||||
}
|
||||
|
||||
/**
|
||||
* Local ActorRef provider.
|
||||
*/
|
||||
|
|
@ -374,13 +391,23 @@ class LocalActorRefProvider(
|
|||
}
|
||||
|
||||
private class Guardian(override val supervisorStrategy: SupervisorStrategy, isSystem: Boolean) extends Actor {
|
||||
import Guardian._
|
||||
|
||||
var terminationHooks = Set.empty[ActorRef]
|
||||
|
||||
def receive = {
|
||||
case Terminated(_) ⇒ if (isSystem) eventStream.stopDefaultLoggers(); context.stop(self)
|
||||
case Terminated(_) ⇒ terminationHooks foreach { _ ! TerminationHook }; stopWhenAllTerminationHooksDone()
|
||||
case StopChild(child) ⇒ context.stop(child)
|
||||
case RegisterTerminationHook ⇒ terminationHooks += sender
|
||||
case TerminationHookDone ⇒ terminationHooks -= sender; stopWhenAllTerminationHooksDone()
|
||||
case m ⇒ deadLetters ! DeadLetter(m, sender, self)
|
||||
}
|
||||
|
||||
def stopWhenAllTerminationHooksDone(): Unit = if (terminationHooks.isEmpty) {
|
||||
if (isSystem) eventStream.stopDefaultLoggers()
|
||||
context.stop(self)
|
||||
}
|
||||
|
||||
// guardian MUST NOT lose its children during restart
|
||||
override def preRestart(cause: Throwable, msg: Option[Any]) {}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,6 +24,12 @@ private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: Str
|
|||
private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter)
|
||||
extends VirtualPathContainer(system.provider, _path, _parent, _log) {
|
||||
|
||||
import akka.actor.Guardian._
|
||||
|
||||
system.provider.systemGuardian.tell(RegisterTerminationHook, this)
|
||||
|
||||
@volatile private var terminating = false
|
||||
|
||||
/**
|
||||
* Find the longest matching path which we know about and return that ref
|
||||
* (or ask that ref to continue searching if elements are left).
|
||||
|
|
@ -50,7 +56,7 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath
|
|||
}
|
||||
|
||||
override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match {
|
||||
case message: DaemonMsg ⇒
|
||||
case message: DaemonMsg if !terminating ⇒
|
||||
log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, path.address)
|
||||
message match {
|
||||
case DaemonMsgCreate(props, deploy, path, supervisor) ⇒
|
||||
|
|
@ -63,18 +69,30 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath
|
|||
val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef],
|
||||
path, systemService = false, Some(deploy), lookupDeploy = true, async = false)
|
||||
addChild(subpath.mkString("/"), actor)
|
||||
this.sendSystemMessage(Watch(actor, this))
|
||||
actor.sendSystemMessage(Watch(actor, this))
|
||||
case _ ⇒
|
||||
log.error("remote path does not match path from message [{}]", message)
|
||||
}
|
||||
}
|
||||
|
||||
case message: DaemonMsg if terminating ⇒
|
||||
log.debug("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, path.address)
|
||||
|
||||
case Terminated(child: ActorRefWithCell) if child.asInstanceOf[ActorRefScope].isLocal ⇒
|
||||
removeChild(child.path.elements.drop(1).mkString("/"))
|
||||
terminationHookDoneWhenNoChildren()
|
||||
|
||||
case t: Terminated ⇒
|
||||
|
||||
case TerminationHook ⇒
|
||||
terminating = true
|
||||
terminationHookDoneWhenNoChildren()
|
||||
allChildren foreach system.stop
|
||||
|
||||
case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this)
|
||||
}
|
||||
|
||||
def terminationHookDoneWhenNoChildren(): Unit = if (terminating && !hasChildren)
|
||||
system.provider.systemGuardian.tell(TerminationHookDone, this)
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue