Added '/system/cluster' top-level supervisor for all cluster daemons.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-03-02 16:20:30 +01:00
parent a3026b3316
commit 9e5f42c17d

View file

@ -12,6 +12,7 @@ import akka.event.Logging
import akka.dispatch.Await
import akka.pattern.ask
import akka.util._
import akka.util.duration._
import akka.config.ConfigurationException
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
@ -259,7 +260,21 @@ final class ClusterGossipDaemon(system: ActorSystem, node: Node) extends Actor {
case GossipEnvelope(sender, gossip) node.receive(sender, gossip)
}
override def unhandled(unknown: Any) = log.error("Unknown message sent to cluster daemon [" + unknown + "]")
override def unhandled(unknown: Any) = log.error("Unknown message sent to cluster daemon [{}]", unknown)
}
final class ClusterDaemonSupervisor(system: ActorSystem, node: Node) extends Actor {
val log = Logging(system, "ClusterDaemonSupervisor")
override val supervisorStrategy = SupervisorStrategy.defaultStrategy
private val commands = context.actorOf(Props(new ClusterCommandDaemon(system, node)), "commands")
private val gossip = context.actorOf(Props(new ClusterGossipDaemon(system, node))
.withRouter(RoundRobinRouter(node.clusterSettings.NrOfGossipDaemons)), "gossip")
def receive = {
case unknown log.error("/system/cluster can not respond to messages - received [{}]", unknown)
}
}
/**
@ -317,8 +332,8 @@ class Node(system: ExtendedActorSystem) extends Extension {
private val remote: RemoteActorRefProvider = system.provider.asInstanceOf[RemoteActorRefProvider]
private val remoteSettings = new RemoteSettings(system.settings.config, system.name)
private val clusterSettings = new ClusterSettings(system.settings.config, system.name)
val remoteSettings = new RemoteSettings(system.settings.config, system.name)
val clusterSettings = new ClusterSettings(system.settings.config, system.name)
private val remoteAddress = remote.transport.address
private val vclockNode = VectorClock.Node(remoteAddress.toString)
@ -343,11 +358,16 @@ class Node(system: ExtendedActorSystem) extends Extension {
private val log = Logging(system, "Node")
private val random = SecureRandom.getInstance("SHA1PRNG")
private val clusterCommandDaemon = systemActorOf(
Props(new ClusterCommandDaemon(system, this)), "clusterCommand")
// create superisor for daemons under path "/system/cluster"
private val clusterDaemons = {
val createChild = CreateChild(Props(new ClusterDaemonSupervisor(system, this)), "cluster")
Await.result(system.systemGuardian ? createChild, defaultTimeout.duration) match {
case a: ActorRef a
case e: Exception throw e
}
}
private val clusterGossipDaemon = systemActorOf(
Props(new ClusterGossipDaemon(system, this)).withRouter(RoundRobinRouter(nrOfGossipDaemons)), "clusterGossip")
private val clusterCommandDaemon = system.actorFor("/system/cluster/commands")
private val state = {
val member = Member(remoteAddress, MemberStatus.Joining)
@ -412,11 +432,10 @@ class Node(system: ExtendedActorSystem) extends Extension {
def shutdown() {
// FIXME Cheating for now. Can't just shut down. Node must first gossip an Leave command, wait for Leader to do proper Handoff and then await an Exit command before switching to Removed
if (isRunning.compareAndSet(true, false)) {
log.info("Node [{}] - Shutting down Node and ClusterDaemon...", remoteAddress)
log.info("Node [{}] - Shutting down Node and cluster daemons...", remoteAddress)
gossipCanceller.cancel()
scrutinizeCanceller.cancel()
system.stop(clusterCommandDaemon)
system.stop(clusterGossipDaemon)
system.stop(clusterDaemons)
}
}
@ -747,22 +766,15 @@ class Node(system: ExtendedActorSystem) extends Extension {
// } else None
}
private def systemActorOf(props: Props, name: String): ActorRef = {
Await.result(system.systemGuardian ? CreateChild(props, name), system.settings.CreationTimeout.duration) match {
case ref: ActorRef ref
case ex: Exception throw ex
}
}
/**
* Sets up cluster command connection.
*/
private def clusterCommandConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "clusterCommand")
private def clusterCommandConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "commands")
/**
* Sets up cluster gossip connection.
*/
private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "clusterGossip")
private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "gossip")
private def deputyNodes: Iterable[Address] = state.get.latestGossip.members.toIterable map (_.address) drop 1 take nrOfDeputyNodes filter (_ != remoteAddress)