From 9e5f42c17d00ab53a384b992f0170e4c2f1d80d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 2 Mar 2012 16:20:30 +0100 Subject: [PATCH] Added '/system/cluster' top-level supervisor for all cluster daemons. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../src/main/scala/akka/cluster/Node.scala | 50 ++++++++++++------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Node.scala b/akka-cluster/src/main/scala/akka/cluster/Node.scala index dfa5e6efd7..f173bc0a4e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Node.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Node.scala @@ -12,6 +12,7 @@ import akka.event.Logging import akka.dispatch.Await import akka.pattern.ask import akka.util._ +import akka.util.duration._ import akka.config.ConfigurationException import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } @@ -259,7 +260,21 @@ final class ClusterGossipDaemon(system: ActorSystem, node: Node) extends Actor { case GossipEnvelope(sender, gossip) ⇒ node.receive(sender, gossip) } - override def unhandled(unknown: Any) = log.error("Unknown message sent to cluster daemon [" + unknown + "]") + override def unhandled(unknown: Any) = log.error("Unknown message sent to cluster daemon [{}]", unknown) +} + +final class ClusterDaemonSupervisor(system: ActorSystem, node: Node) extends Actor { + val log = Logging(system, "ClusterDaemonSupervisor") + + override val supervisorStrategy = SupervisorStrategy.defaultStrategy + + private val commands = context.actorOf(Props(new ClusterCommandDaemon(system, node)), "commands") + private val gossip = context.actorOf(Props(new ClusterGossipDaemon(system, node)) + .withRouter(RoundRobinRouter(node.clusterSettings.NrOfGossipDaemons)), "gossip") + + def receive = { + case unknown ⇒ log.error("/system/cluster can not respond to messages - received [{}]", unknown) + } } 
/** @@ -317,8 +332,8 @@ class Node(system: ExtendedActorSystem) extends Extension { private val remote: RemoteActorRefProvider = system.provider.asInstanceOf[RemoteActorRefProvider] - private val remoteSettings = new RemoteSettings(system.settings.config, system.name) - private val clusterSettings = new ClusterSettings(system.settings.config, system.name) + val remoteSettings = new RemoteSettings(system.settings.config, system.name) + val clusterSettings = new ClusterSettings(system.settings.config, system.name) private val remoteAddress = remote.transport.address private val vclockNode = VectorClock.Node(remoteAddress.toString) @@ -343,11 +358,16 @@ class Node(system: ExtendedActorSystem) extends Extension { private val log = Logging(system, "Node") private val random = SecureRandom.getInstance("SHA1PRNG") - private val clusterCommandDaemon = systemActorOf( - Props(new ClusterCommandDaemon(system, this)), "clusterCommand") + // create supervisor for daemons under path "/system/cluster" + private val clusterDaemons = { + val createChild = CreateChild(Props(new ClusterDaemonSupervisor(system, this)), "cluster") + Await.result(system.systemGuardian ? createChild, defaultTimeout.duration) match { + case a: ActorRef ⇒ a + case e: Exception ⇒ throw e + } + } - private val clusterGossipDaemon = systemActorOf( - Props(new ClusterGossipDaemon(system, this)).withRouter(RoundRobinRouter(nrOfGossipDaemons)), "clusterGossip") + private val clusterCommandDaemon = system.actorFor("/system/cluster/commands") private val state = { val member = Member(remoteAddress, MemberStatus.Joining) @@ -412,11 +432,10 @@ class Node(system: ExtendedActorSystem) extends Extension { def shutdown() { // FIXME Cheating for now. Can't just shut down. 
Node must first gossip an Leave command, wait for Leader to do proper Handoff and then await an Exit command before switching to Removed if (isRunning.compareAndSet(true, false)) { - log.info("Node [{}] - Shutting down Node and ClusterDaemon...", remoteAddress) + log.info("Node [{}] - Shutting down Node and cluster daemons...", remoteAddress) gossipCanceller.cancel() scrutinizeCanceller.cancel() - system.stop(clusterCommandDaemon) - system.stop(clusterGossipDaemon) + system.stop(clusterDaemons) } } @@ -747,22 +766,15 @@ class Node(system: ExtendedActorSystem) extends Extension { // } else None } - private def systemActorOf(props: Props, name: String): ActorRef = { - Await.result(system.systemGuardian ? CreateChild(props, name), system.settings.CreationTimeout.duration) match { - case ref: ActorRef ⇒ ref - case ex: Exception ⇒ throw ex - } - } - /** * Sets up cluster command connection. */ - private def clusterCommandConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "clusterCommand") + private def clusterCommandConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "commands") /** * Sets up cluster gossip connection. */ - private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "clusterGossip") + private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "gossip") private def deputyNodes: Iterable[Address] = state.get.latestGossip.members.toIterable map (_.address) drop 1 take nrOfDeputyNodes filter (_ != remoteAddress)