Remove race when terminating, see #2552
This commit is contained in:
parent
d6e5b0a46b
commit
a38694bd69
1 changed files with 16 additions and 13 deletions
|
|
@ -10,6 +10,7 @@ import akka.event.LoggingAdapter
|
|||
import akka.dispatch.Watch
|
||||
import akka.actor.ActorRefWithCell
|
||||
import akka.actor.ActorRefScope
|
||||
import akka.util.Switch
|
||||
|
||||
private[akka] sealed trait DaemonMsg
|
||||
private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg
|
||||
|
|
@ -28,7 +29,7 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath
|
|||
|
||||
system.provider.systemGuardian.tell(RegisterTerminationHook, this)
|
||||
|
||||
@volatile private var terminating = false
|
||||
private val terminating = new Switch(false)
|
||||
|
||||
/**
|
||||
* Find the longest matching path which we know about and return that ref
|
||||
|
|
@ -56,7 +57,7 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath
|
|||
}
|
||||
|
||||
override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match {
|
||||
case message: DaemonMsg if !terminating ⇒
|
||||
case message: DaemonMsg ⇒
|
||||
log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, path.address)
|
||||
message match {
|
||||
case DaemonMsgCreate(props, deploy, path, supervisor) ⇒
|
||||
|
|
@ -66,18 +67,19 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath
|
|||
// TODO RK canonicalize path so as not to duplicate it always #1446
|
||||
val subpath = elems.drop(1)
|
||||
val path = this.path / subpath
|
||||
val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef],
|
||||
path, systemService = false, Some(deploy), lookupDeploy = true, async = false)
|
||||
addChild(subpath.mkString("/"), actor)
|
||||
actor.sendSystemMessage(Watch(actor, this))
|
||||
terminating.fold(
|
||||
log.debug("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, path.address)) {
|
||||
|
||||
val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef],
|
||||
path, systemService = false, Some(deploy), lookupDeploy = true, async = false)
|
||||
addChild(subpath.mkString("/"), actor)
|
||||
actor.sendSystemMessage(Watch(actor, this))
|
||||
}
|
||||
case _ ⇒
|
||||
log.error("remote path does not match path from message [{}]", message)
|
||||
}
|
||||
}
|
||||
|
||||
case message: DaemonMsg if terminating ⇒
|
||||
log.debug("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, path.address)
|
||||
|
||||
case Terminated(child: ActorRefWithCell) if child.asInstanceOf[ActorRefScope].isLocal ⇒
|
||||
removeChild(child.path.elements.drop(1).mkString("/"))
|
||||
terminationHookDoneWhenNoChildren()
|
||||
|
|
@ -85,14 +87,15 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath
|
|||
case t: Terminated ⇒
|
||||
|
||||
case TerminationHook ⇒
|
||||
terminating = true
|
||||
terminationHookDoneWhenNoChildren()
|
||||
allChildren foreach system.stop
|
||||
terminating.switchOn {
|
||||
terminationHookDoneWhenNoChildren()
|
||||
allChildren foreach system.stop
|
||||
}
|
||||
|
||||
case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this)
|
||||
}
|
||||
|
||||
def terminationHookDoneWhenNoChildren(): Unit = if (terminating && !hasChildren)
|
||||
def terminationHookDoneWhenNoChildren(): Unit = if (terminating.isOn && !hasChildren)
|
||||
system.provider.systemGuardian.tell(TerminationHookDone, this)
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue