Added support for 'deputy-nodes'.

* Added 'nr-of-deputy-nodes' config option
* Added fetching of current deputy node addresses
* Minor refactorings

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-01-01 01:50:33 +01:00
parent 83c97d08da
commit e4b1d8609f
4 changed files with 44 additions and 46 deletions

View file

@ -14,6 +14,7 @@ akka {
# the number of gossip daemon actors
nr-of-gossip-daemons = 4
nr-of-deputy-nodes = 3
gossip {
initialDelay = 5s

View file

@ -22,4 +22,5 @@ class ClusterSettings(val config: Config, val systemName: String) {
val GossipInitialDelay = Duration(getMilliseconds("akka.cluster.gossip.initialDelay"), MILLISECONDS)
val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS)
val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons")
val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes")
}

View file

@ -184,7 +184,6 @@ final class ClusterGossipDaemon(system: ActorSystem, node: Node) extends Actor {
}
// FIXME Cluster public API should be an Extension
// FIXME Add cluster Node class and refactor out all non-gossip related stuff out of Node
/**
* This module is responsible for Gossiping cluster information. The abstraction maintains the list of live
@ -228,6 +227,7 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
val failureDetector = new AccrualFailureDetector(
system, remoteAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)
private val nrOfDeputyNodes = clusterSettings.NrOfDeputyNodes
private val nrOfGossipDaemons = clusterSettings.NrOfGossipDaemons
private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress)
@ -237,8 +237,6 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
private val log = Logging(system, "Node")
private val random = SecureRandom.getInstance("SHA1PRNG")
// Is it right to put this guy under the /system path or should we have a top-level /cluster or something else...?
// FIXME should be defined as a router so we get concurrency here
private val clusterCommandDaemon = system.systemActorOf(
Props(new ClusterCommandDaemon(system, this)), "clusterCommand")
@ -259,12 +257,12 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
join()
// start periodic gossip to random nodes in cluster
val gossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
private val gossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
gossip()
}
// start periodic cluster scrutinization (moving nodes condemned by the failure detector to unreachable list)
val scrutinizeCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
private val scrutinizeCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
scrutinize()
}
@ -295,6 +293,13 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
*/
def isSingletonCluster: Boolean = isSingletonCluster(state.get)
/**
* Checks if we have a cluster convergence.
*
* @returns Some(convergedGossip) if convergence have been reached and None if not
*/
def convergence: Option[Gossip] = convergence(latestGossip)
/**
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
*/
@ -317,11 +322,37 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
}
}
/**
* Registers a listener to subscribe to cluster membership changes.
*/
@tailrec
final def registerListener(listener: MembershipChangeListener) {
val localState = state.get
val newListeners = localState.memberMembershipChangeListeners + listener
val newState = localState copy (memberMembershipChangeListeners = newListeners)
if (!state.compareAndSet(localState, newState)) registerListener(listener) // recur
}
/**
* Unsubscribes to cluster membership changes.
*/
@tailrec
final def unregisterListener(listener: MembershipChangeListener) {
val localState = state.get
val newListeners = localState.memberMembershipChangeListeners - listener
val newState = localState copy (memberMembershipChangeListeners = newListeners)
if (!state.compareAndSet(localState, newState)) unregisterListener(listener) // recur
}
// ========================================================
// ===================== INTERNAL API =====================
// ========================================================
/**
* New node joining.
*/
@tailrec
final def joining(node: Address) {
private[cluster] final def joining(node: Address) {
log.info("Node [{}] - Node [{}] is joining", remoteAddress, node)
failureDetector heartbeat node // update heartbeat in failure detector
@ -350,7 +381,7 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
* Receive new gossip.
*/
@tailrec
final def receive(sender: Member, remoteGossip: Gossip) {
private[cluster] final def receive(sender: Member, remoteGossip: Gossip) {
log.debug("Node [{}] - Receiving gossip from [{}]", remoteAddress, sender.address)
failureDetector heartbeat sender.address // update heartbeat in failure detector
@ -390,39 +421,6 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
}
}
/**
* Registers a listener to subscribe to cluster membership changes.
*/
@tailrec
final def registerListener(listener: MembershipChangeListener) {
val localState = state.get
val newListeners = localState.memberMembershipChangeListeners + listener
val newState = localState copy (memberMembershipChangeListeners = newListeners)
if (!state.compareAndSet(localState, newState)) registerListener(listener) // recur
}
/**
* Unsubscribes to cluster membership changes.
*/
@tailrec
final def unregisterListener(listener: MembershipChangeListener) {
val localState = state.get
val newListeners = localState.memberMembershipChangeListeners - listener
val newState = localState copy (memberMembershipChangeListeners = newListeners)
if (!state.compareAndSet(localState, newState)) unregisterListener(listener) // recur
}
/**
* Checks if we have a cluster convergence.
*
* @returns Some(convergedGossip) if convergence have been reached and None if not
*/
def convergence: Option[Gossip] = convergence(latestGossip)
// ========================================================
// ===================== INTERNAL API =====================
// ========================================================
/**
* Joins the pre-configured contact point and retrieves current gossip state.
*/
@ -461,7 +459,7 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
}
// 3. gossip to a deputy nodes for facilitating partition healing
val deputies = deputyNodesWithoutMyself
val deputies = deputyNodes
if ((!gossipedToDeputy || localMembersSize < 1) && !deputies.isEmpty) {
if (localMembersSize == 0) gossipToRandomNodeOf(deputies)
else {
@ -530,11 +528,8 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
private def gossipToRandomNodeOf(addresses: Seq[Address]): Boolean = {
val peers = addresses filter (_ != remoteAddress) // filter out myself
val peer = selectRandomNode(peers)
val localState = state.get
val localGossip = localState.latestGossip
// if connection can't be established/found => ignore it since the failure detector will take care of the potential problem
gossipTo(peer)
deputyNodesWithoutMyself exists (peer == _)
deputyNodes exists (peer == _)
}
/**
@ -607,7 +602,7 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
*/
private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "clusterGossip")
private def deputyNodesWithoutMyself: Seq[Address] = Seq.empty[Address] filter (_ != remoteAddress) // FIXME read in deputy nodes from gossip data - now empty seq
private def deputyNodes: Seq[Address] = state.get.latestGossip.members.toSeq map (_.address) drop 1 take nrOfDeputyNodes filter (_ != remoteAddress)
private def selectRandomNode(addresses: Seq[Address]): Address = addresses(random nextInt addresses.size)

View file

@ -29,6 +29,7 @@ class ClusterConfigSpec extends AkkaSpec(
GossipInitialDelay must be(5 seconds)
GossipFrequency must be(1 second)
NrOfGossipDaemons must be(4)
NrOfDeputyNodes must be(3)
}
}
}