Split up ClusterDaemon into ClusterGossipDaemon (routed with configurable N instances) and ClusterCommandDaemon (shortly to be an FSM). Removed ConnectionManager crap.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-02-20 15:26:12 +01:00
parent 0d022afa5e
commit 2c67a6d50d
5 changed files with 59 additions and 53 deletions

View file

@ -12,6 +12,9 @@ akka {
# leave as empty string if the node should be a singleton cluster
node-to-join = ""
# the number of gossip daemon actors
nr-of-gossip-daemons = 4
gossip {
initialDelay = 5s
frequency = 1s

View file

@ -21,4 +21,5 @@ class ClusterSettings(val config: Config, val systemName: String) {
}
val GossipInitialDelay = Duration(getMilliseconds("akka.cluster.gossip.initialDelay"), MILLISECONDS)
val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS)
val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons")
}

View file

@ -7,6 +7,7 @@ package akka.cluster
import akka.actor._
import akka.actor.Status._
import akka.remote._
import akka.routing._
import akka.event.Logging
import akka.dispatch.Await
import akka.pattern.ask
@ -119,7 +120,7 @@ case class GossipOverview(
case class Gossip(
overview: GossipOverview = GossipOverview(),
members: SortedSet[Member], // sorted set of members with their status, sorted by name
//partitions: Tree[PartitionPath, Node] = Tree.empty[PartitionPath, Node],
//partitions: Tree[PartitionPath, Node] = Tree.empty[PartitionPath, Node], // name/partition service
//pending: Set[PartitioningChange] = Set.empty[PartitioningChange],
meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]],
version: VectorClock = VectorClock()) // vector clock version
@ -152,16 +153,32 @@ case class Gossip(
")"
}
// FIXME add FSM trait?
final class ClusterDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor {
val log = Logging(system, "ClusterDaemon")
// FIXME ClusterCommandDaemon with FSM trait
/**
* Single instance. FSM managing the different cluster nodes states.
* Serialized access to Gossiper.
*/
final class ClusterCommandDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor {
val log = Logging(system, "ClusterCommandDaemon")
def receive = {
case Join(address) gossiper.joining(address)
case Leave(address) //gossiper.leaving(address)
case Down(address) //gossiper.downing(address)
case Remove(address) //gossiper.removing(address)
case unknown log.error("Unknown message sent to cluster daemon [" + unknown + "]")
}
}
/**
* Pooled and routed wit N number of configurable instances.
* Concurrent access to Gossiper.
*/
final class ClusterGossipDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor {
val log = Logging(system, "ClusterGossipDaemon")
def receive = {
case GossipEnvelope(sender, gossip) gossiper.receive(sender, gossip)
case Join(address) gossiper.joining(address)
case Leave(address) //gossiper.leaving(address)
case Down(address) //gossiper.downing(address)
case Remove(address) //gossiper.removing(address)
case unknown log.error("Unknown message sent to cluster daemon [" + unknown + "]")
}
}
@ -211,6 +228,7 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
val failureDetector = new AccrualFailureDetector(
system, remoteAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)
private val nrOfGossipDaemons = clusterSettings.NrOfGossipDaemons
private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress)
private val serialization = remote.serialization
@ -221,7 +239,11 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
// Is it right to put this guy under the /system path or should we have a top-level /cluster or something else...?
// FIXME should be defined as a router so we get concurrency here
private val clusterDaemon = system.systemActorOf(Props(new ClusterDaemon(system, this)), "cluster")
private val clusterCommandDaemon = system.systemActorOf(
Props(new ClusterCommandDaemon(system, this)), "clusterCommand")
private val clusterGossipDaemon = system.systemActorOf(
Props(new ClusterGossipDaemon(system, this)).withRouter(RoundRobinRouter(nrOfGossipDaemons)), "clusterGossip")
private val state = {
val member = Member(remoteAddress, MemberStatus.Joining)
@ -229,9 +251,6 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
new AtomicReference[State](State(member, gossip))
}
// FIXME manage connections in some other way so we can delete the RemoteConnectionManager (SINCE IT SUCKS!!!)
private val connectionManager = new RemoteConnectionManager(system, remote, failureDetector, Map.empty[Address, ActorRef])
import Versioned.latestVersionOf
log.info("Node [{}] - Starting cluster Gossiper...", remoteAddress)
@ -278,10 +297,12 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
if (isRunning.compareAndSet(true, false)) {
log.info("Node [{}] - Shutting down Gossiper and ClusterDaemon...", remoteAddress)
try system.stop(clusterDaemon) finally {
try gossipCanceller.cancel() finally {
try scrutinizeCanceller.cancel() finally {
log.info("Node [{}] - Gossiper and ClusterDaemon shut down successfully", remoteAddress)
try system.stop(clusterCommandDaemon) finally {
try system.stop(clusterGossipDaemon) finally {
try gossipCanceller.cancel() finally {
try scrutinizeCanceller.cancel() finally {
log.info("Node [{}] - Gossiper and ClusterDaemon shut down successfully", remoteAddress)
}
}
}
}
@ -398,11 +419,10 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
* Joins the pre-configured contact point and retrieves current gossip state.
*/
private def join() = nodeToJoin foreach { address
setUpConnectionTo(address) foreach { connection
val command = Join(remoteAddress)
log.info("Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection)
connection ! command
}
val connection = clusterCommandConnectionFor(address)
val command = Join(remoteAddress)
log.info("Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection)
connection ! command
}
/**
@ -489,10 +509,9 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
* Gossips latest gossip to an address.
*/
private def gossipTo(address: Address) {
setUpConnectionTo(address) foreach { connection
log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, connection)
connection ! GossipEnvelope(self, latestGossip)
}
val connection = clusterGossipConnectionFor(address)
log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, connection)
connection ! GossipEnvelope(self, latestGossip)
}
/**
@ -567,37 +586,15 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
} else None
}
// FIXME should shuffle list randomly before start traversing to avoid connecting to some member on every member
@tailrec
final private def connectToRandomNodeOf(addresses: Seq[Address]): ActorRef = {
addresses match {
case address :: rest
setUpConnectionTo(address) match {
case Some(connection) connection
case None connectToRandomNodeOf(rest) // recur - if we could not set up a connection - try next address
}
case Nil
throw new RemoteConnectionException(
"Could not establish connection to any of the addresses in the argument list [" + addresses.mkString(", ") + "]")
}
}
/**
* Sets up cluster command connection.
*/
private def clusterCommandConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "clusterCommand")
/**
* Sets up remote connections to all the addresses in the argument list.
* Sets up cluster gossip connection.
*/
private def setUpConnectionsTo(addresses: Seq[Address]): Seq[Option[ActorRef]] = addresses map setUpConnectionTo
/**
* Sets up remote connection.
*/
private def setUpConnectionTo(address: Address): Option[ActorRef] = Option {
// FIXME no need for using a factory here - remove connectionManager
try connectionManager.putIfAbsent(address, () system.actorFor(RootActorPath(address) / "system" / "cluster")) catch {
case e: Exception null
}
}
private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "clusterGossip")
private def deputyNodesWithoutMyself: Seq[Address] = Seq.empty[Address] filter (_ != remoteAddress) // FIXME read in deputy nodes from gossip data - now empty seq

View file

@ -28,6 +28,7 @@ class ClusterConfigSpec extends AkkaSpec(
NodeToJoin must be(None)
GossipInitialDelay must be(5 seconds)
GossipFrequency must be(1 second)
NrOfGossipDaemons must be(4)
}
}
}

View file

@ -77,6 +77,8 @@ class MembershipChangeListenerSpec extends AkkaSpec("""
latch.await(10.seconds.dilated.toMillis, TimeUnit.MILLISECONDS)
Thread.sleep(10.seconds.dilated.toMillis)
// check cluster convergence
gossiper0.convergence must be('defined)
gossiper1.convergence must be('defined)
@ -119,6 +121,8 @@ class MembershipChangeListenerSpec extends AkkaSpec("""
latch.await(10.seconds.dilated.toMillis, TimeUnit.MILLISECONDS)
Thread.sleep(10.seconds.dilated.toMillis)
// check cluster convergence
gossiper0.convergence must be('defined)
gossiper1.convergence must be('defined)