diff --git a/akka-cluster/src/main/scala/akka/cluster/Node.scala b/akka-cluster/src/main/scala/akka/cluster/Node.scala index dfa5e6efd7..f173bc0a4e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Node.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Node.scala @@ -12,6 +12,7 @@ import akka.event.Logging import akka.dispatch.Await import akka.pattern.ask import akka.util._ +import akka.util.duration._ import akka.config.ConfigurationException import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } @@ -259,7 +260,21 @@ final class ClusterGossipDaemon(system: ActorSystem, node: Node) extends Actor { case GossipEnvelope(sender, gossip) ⇒ node.receive(sender, gossip) } - override def unhandled(unknown: Any) = log.error("Unknown message sent to cluster daemon [" + unknown + "]") + override def unhandled(unknown: Any) = log.error("Unknown message sent to cluster daemon [{}]", unknown) +} + +final class ClusterDaemonSupervisor(system: ActorSystem, node: Node) extends Actor { + val log = Logging(system, "ClusterDaemonSupervisor") + + override val supervisorStrategy = SupervisorStrategy.defaultStrategy + + private val commands = context.actorOf(Props(new ClusterCommandDaemon(system, node)), "commands") + private val gossip = context.actorOf(Props(new ClusterGossipDaemon(system, node)) + .withRouter(RoundRobinRouter(node.clusterSettings.NrOfGossipDaemons)), "gossip") + + def receive = { + case unknown ⇒ log.error("/system/cluster can not respond to messages - received [{}]", unknown) + } } /** @@ -317,8 +332,8 @@ class Node(system: ExtendedActorSystem) extends Extension { private val remote: RemoteActorRefProvider = system.provider.asInstanceOf[RemoteActorRefProvider] - private val remoteSettings = new RemoteSettings(system.settings.config, system.name) - private val clusterSettings = new ClusterSettings(system.settings.config, system.name) + val remoteSettings = new RemoteSettings(system.settings.config, system.name) + val clusterSettings = new ClusterSettings(system.settings.config, system.name) private val remoteAddress = remote.transport.address private val vclockNode = VectorClock.Node(remoteAddress.toString) @@ -343,11 +358,16 @@ class Node(system: ExtendedActorSystem) extends Extension { private val log = Logging(system, "Node") private val random = SecureRandom.getInstance("SHA1PRNG") - private val clusterCommandDaemon = systemActorOf( - Props(new ClusterCommandDaemon(system, this)), "clusterCommand") + // create superisor for daemons under path "/system/cluster" + private val clusterDaemons = { + val createChild = CreateChild(Props(new ClusterDaemonSupervisor(system, this)), "cluster") + Await.result(system.systemGuardian ? createChild, defaultTimeout.duration) match { + case a: ActorRef ⇒ a + case e: Exception ⇒ throw e + } + } - private val clusterGossipDaemon = systemActorOf( - Props(new ClusterGossipDaemon(system, this)).withRouter(RoundRobinRouter(nrOfGossipDaemons)), "clusterGossip") + private val clusterCommandDaemon = system.actorFor("/system/cluster/commands") private val state = { val member = Member(remoteAddress, MemberStatus.Joining) @@ -412,11 +432,10 @@ class Node(system: ExtendedActorSystem) extends Extension { def shutdown() { // FIXME Cheating for now. Can't just shut down. Node must first gossip an Leave command, wait for Leader to do proper Handoff and then await an Exit command before switching to Removed if (isRunning.compareAndSet(true, false)) { - log.info("Node [{}] - Shutting down Node and ClusterDaemon...", remoteAddress) + log.info("Node [{}] - Shutting down Node and cluster daemons...", remoteAddress) gossipCanceller.cancel() scrutinizeCanceller.cancel() - system.stop(clusterCommandDaemon) - system.stop(clusterGossipDaemon) + system.stop(clusterDaemons) } } @@ -747,22 +766,15 @@ class Node(system: ExtendedActorSystem) extends Extension { // } else None } - private def systemActorOf(props: Props, name: String): ActorRef = { - Await.result(system.systemGuardian ? CreateChild(props, name), system.settings.CreationTimeout.duration) match { - case ref: ActorRef ⇒ ref - case ex: Exception ⇒ throw ex - } - } - /** * Sets up cluster command connection. */ - private def clusterCommandConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "clusterCommand") + private def clusterCommandConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "commands") /** * Sets up cluster gossip connection. */ - private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "clusterGossip") + private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "gossip") private def deputyNodes: Iterable[Address] = state.get.latestGossip.members.toIterable map (_.address) drop 1 take nrOfDeputyNodes filter (_ != remoteAddress)