From e4b1d8609ff164a27d5af7e4b41befab8d18f409 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sat, 1 Jan 2011 01:50:33 +0100 Subject: [PATCH] Added support for 'deputy-nodes'. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Added 'nr-of-deputy-nodes' config option * Added fetching of current deputy node addresses * Minor refactorings Signed-off-by: Jonas Bonér --- .../src/main/resources/reference.conf | 1 + .../scala/akka/cluster/ClusterSettings.scala | 1 + .../src/main/scala/akka/cluster/Node.scala | 87 +++++++++---------- .../akka/cluster/ClusterConfigSpec.scala | 1 + 4 files changed, 44 insertions(+), 46 deletions(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index feada91c01..0917909504 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -14,6 +14,7 @@ akka { # the number of gossip daemon actors nr-of-gossip-daemons = 4 + nr-of-deputy-nodes = 3 gossip { initialDelay = 5s diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 9872f3e233..10f0316476 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -22,4 +22,5 @@ class ClusterSettings(val config: Config, val systemName: String) { val GossipInitialDelay = Duration(getMilliseconds("akka.cluster.gossip.initialDelay"), MILLISECONDS) val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS) val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") + val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") } diff --git a/akka-cluster/src/main/scala/akka/cluster/Node.scala b/akka-cluster/src/main/scala/akka/cluster/Node.scala index bcb9d1ecbc..bb8bec4d31 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Node.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Node.scala @@ -184,7 +184,6 @@ final class ClusterGossipDaemon(system: ActorSystem, node: Node) extends Actor { } // FIXME Cluster public API should be an Extension -// FIXME Add cluster Node class and refactor out all non-gossip related stuff out of Node /** * This module is responsible for Gossiping cluster information. The abstraction maintains the list of live @@ -228,6 +227,7 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { val failureDetector = new AccrualFailureDetector( system, remoteAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) + private val nrOfDeputyNodes = clusterSettings.NrOfDeputyNodes private val nrOfGossipDaemons = clusterSettings.NrOfGossipDaemons private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress) @@ -237,8 +237,6 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { private val log = Logging(system, "Node") private val random = SecureRandom.getInstance("SHA1PRNG") - // Is it right to put this guy under the /system path or should we have a top-level /cluster or something else...? - // FIXME should be defined as a router so we get concurrency here private val clusterCommandDaemon = system.systemActorOf( Props(new ClusterCommandDaemon(system, this)), "clusterCommand") @@ -259,12 +257,12 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { join() // start periodic gossip to random nodes in cluster - val gossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { + private val gossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { gossip() } // start periodic cluster scrutinization (moving nodes condemned by the failure detector to unreachable list) - val scrutinizeCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { + private val scrutinizeCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { scrutinize() } @@ -295,6 +293,13 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { */ def isSingletonCluster: Boolean = isSingletonCluster(state.get) + /** + * Checks if we have a cluster convergence. + * + * @returns Some(convergedGossip) if convergence have been reached and None if not + */ + def convergence: Option[Gossip] = convergence(latestGossip) + /** * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks. */ @@ -317,11 +322,37 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { } } + /** + * Registers a listener to subscribe to cluster membership changes. + */ + @tailrec + final def registerListener(listener: MembershipChangeListener) { + val localState = state.get + val newListeners = localState.memberMembershipChangeListeners + listener + val newState = localState copy (memberMembershipChangeListeners = newListeners) + if (!state.compareAndSet(localState, newState)) registerListener(listener) // recur + } + + /** + * Unsubscribes to cluster membership changes. + */ + @tailrec + final def unregisterListener(listener: MembershipChangeListener) { + val localState = state.get + val newListeners = localState.memberMembershipChangeListeners - listener + val newState = localState copy (memberMembershipChangeListeners = newListeners) + if (!state.compareAndSet(localState, newState)) unregisterListener(listener) // recur + } + + // ======================================================== + // ===================== INTERNAL API ===================== + // ======================================================== + /** * New node joining. */ @tailrec - final def joining(node: Address) { + private[cluster] final def joining(node: Address) { log.info("Node [{}] - Node [{}] is joining", remoteAddress, node) failureDetector heartbeat node // update heartbeat in failure detector @@ -350,7 +381,7 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { * Receive new gossip. */ @tailrec - final def receive(sender: Member, remoteGossip: Gossip) { + private[cluster] final def receive(sender: Member, remoteGossip: Gossip) { log.debug("Node [{}] - Receiving gossip from [{}]", remoteAddress, sender.address) failureDetector heartbeat sender.address // update heartbeat in failure detector @@ -390,39 +421,6 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { } } - /** - * Registers a listener to subscribe to cluster membership changes. - */ - @tailrec - final def registerListener(listener: MembershipChangeListener) { - val localState = state.get - val newListeners = localState.memberMembershipChangeListeners + listener - val newState = localState copy (memberMembershipChangeListeners = newListeners) - if (!state.compareAndSet(localState, newState)) registerListener(listener) // recur - } - - /** - * Unsubscribes to cluster membership changes. - */ - @tailrec - final def unregisterListener(listener: MembershipChangeListener) { - val localState = state.get - val newListeners = localState.memberMembershipChangeListeners - listener - val newState = localState copy (memberMembershipChangeListeners = newListeners) - if (!state.compareAndSet(localState, newState)) unregisterListener(listener) // recur - } - - /** - * Checks if we have a cluster convergence. - * - * @returns Some(convergedGossip) if convergence have been reached and None if not - */ - def convergence: Option[Gossip] = convergence(latestGossip) - - // ======================================================== - // ===================== INTERNAL API ===================== - // ======================================================== - /** * Joins the pre-configured contact point and retrieves current gossip state. */ @@ -461,7 +459,7 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { } // 3. gossip to a deputy nodes for facilitating partition healing - val deputies = deputyNodesWithoutMyself + val deputies = deputyNodes if ((!gossipedToDeputy || localMembersSize < 1) && !deputies.isEmpty) { if (localMembersSize == 0) gossipToRandomNodeOf(deputies) else { @@ -530,11 +528,8 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { private def gossipToRandomNodeOf(addresses: Seq[Address]): Boolean = { val peers = addresses filter (_ != remoteAddress) // filter out myself val peer = selectRandomNode(peers) - val localState = state.get - val localGossip = localState.latestGossip - // if connection can't be established/found => ignore it since the failure detector will take care of the potential problem gossipTo(peer) - deputyNodesWithoutMyself exists (peer == _) + deputyNodes exists (peer == _) } /** @@ -607,7 +602,7 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { */ private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "clusterGossip") - private def deputyNodesWithoutMyself: Seq[Address] = Seq.empty[Address] filter (_ != remoteAddress) // FIXME read in deputy nodes from gossip data - now empty seq + private def deputyNodes: Seq[Address] = state.get.latestGossip.members.toSeq map (_.address) drop 1 take nrOfDeputyNodes filter (_ != remoteAddress) private def selectRandomNode(addresses: Seq[Address]): Address = addresses(random nextInt addresses.size) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 2afbc7efc0..6668044f33 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -29,6 +29,7 @@ class ClusterConfigSpec extends AkkaSpec( GossipInitialDelay must be(5 seconds) GossipFrequency must be(1 second) NrOfGossipDaemons must be(4) + NrOfDeputyNodes must be(3) } } }