diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index 3df6dd3774..feada91c01 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -12,6 +12,9 @@ akka {
     # leave as empty string if the node should be a singleton cluster
     node-to-join = ""
 
+    # the number of gossip daemon actors
+    nr-of-gossip-daemons = 4
+
     gossip {
       initialDelay = 5s
       frequency = 1s
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
index f4d57bf1f6..9872f3e233 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -21,4 +21,5 @@ class ClusterSettings(val config: Config, val systemName: String) {
   }
   val GossipInitialDelay = Duration(getMilliseconds("akka.cluster.gossip.initialDelay"), MILLISECONDS)
   val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS)
+  val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons")
 }
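Note: the new setting flows from reference.conf into ClusterSettings via the Typesafe Config API. A minimal sketch of how the key resolves at runtime; the App wrapper and object name are made up for illustration, and it assumes akka-cluster (and therefore its reference.conf) is on the classpath:

  import com.typesafe.config.ConfigFactory

  object NrOfGossipDaemonsCheck extends App {
    // ConfigFactory.load() merges every reference.conf found on the
    // classpath, so the new key is visible with its default value
    val config = ConfigFactory.load()
    println(config.getInt("akka.cluster.nr-of-gossip-daemons")) // 4 unless overridden
  }
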
diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala
index 313f052e2a..b3e7df27bf 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala
@@ -7,6 +7,7 @@ package akka.cluster
 import akka.actor._
 import akka.actor.Status._
 import akka.remote._
+import akka.routing._
 import akka.event.Logging
 import akka.dispatch.Await
 import akka.pattern.ask
@@ -119,7 +120,7 @@ case class GossipOverview(
 case class Gossip(
   overview: GossipOverview = GossipOverview(),
   members: SortedSet[Member], // sorted set of members with their status, sorted by name
-  //partitions: Tree[PartitionPath, Node] = Tree.empty[PartitionPath, Node],
+  //partitions: Tree[PartitionPath, Node] = Tree.empty[PartitionPath, Node], // name/partition service
   //pending: Set[PartitioningChange] = Set.empty[PartitioningChange],
   meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]],
   version: VectorClock = VectorClock()) // vector clock version
@@ -152,16 +153,32 @@ case class Gossip(
     ")"
 }
 
-// FIXME add FSM trait?
-final class ClusterDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor {
-  val log = Logging(system, "ClusterDaemon")
+// FIXME ClusterCommandDaemon with FSM trait
+/**
+ * Single instance. FSM managing the different cluster node states.
+ * Serialized access to Gossiper.
+ */
+final class ClusterCommandDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor {
+  val log = Logging(system, "ClusterCommandDaemon")
+
+  def receive = {
+    case Join(address)   ⇒ gossiper.joining(address)
+    case Leave(address)  ⇒ //gossiper.leaving(address)
+    case Down(address)   ⇒ //gossiper.downing(address)
+    case Remove(address) ⇒ //gossiper.removing(address)
+    case unknown         ⇒ log.error("Unknown message sent to cluster daemon [" + unknown + "]")
+  }
+}
+
+/**
+ * Pooled and routed with a configurable number of instances.
+ * Concurrent access to Gossiper.
+ */
+final class ClusterGossipDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor {
+  val log = Logging(system, "ClusterGossipDaemon")
 
   def receive = {
     case GossipEnvelope(sender, gossip) ⇒ gossiper.receive(sender, gossip)
-    case Join(address)                  ⇒ gossiper.joining(address)
-    case Leave(address)                 ⇒ //gossiper.leaving(address)
-    case Down(address)                  ⇒ //gossiper.downing(address)
-    case Remove(address)                ⇒ //gossiper.removing(address)
     case unknown                        ⇒ log.error("Unknown message sent to cluster daemon [" + unknown + "]")
   }
 }
@@ -211,6 +228,7 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
   val failureDetector = new AccrualFailureDetector(
     system, remoteAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)
 
+  private val nrOfGossipDaemons = clusterSettings.NrOfGossipDaemons
   private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress)
 
   private val serialization = remote.serialization
@@ -221,7 +239,11 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
 
   // Is it right to put this guy under the /system path or should we have a top-level /cluster or something else...?
   // FIXME should be defined as a router so we get concurrency here
-  private val clusterDaemon = system.systemActorOf(Props(new ClusterDaemon(system, this)), "cluster")
+  private val clusterCommandDaemon = system.systemActorOf(
+    Props(new ClusterCommandDaemon(system, this)), "clusterCommand")
+
+  private val clusterGossipDaemon = system.systemActorOf(
+    Props(new ClusterGossipDaemon(system, this)).withRouter(RoundRobinRouter(nrOfGossipDaemons)), "clusterGossip")
 
   private val state = {
     val member = Member(remoteAddress, MemberStatus.Joining)
@@ -229,9 +251,6 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
     new AtomicReference[State](State(member, gossip))
   }
 
-  // FIXME manage connections in some other way so we can delete the RemoteConnectionManager (SINCE IT SUCKS!!!)
-  private val connectionManager = new RemoteConnectionManager(system, remote, failureDetector, Map.empty[Address, ActorRef])
-
   import Versioned.latestVersionOf
 
   log.info("Node [{}] - Starting cluster Gossiper...", remoteAddress)
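Note: deploying ClusterGossipDaemon behind a RoundRobinRouter is what resolves the old FIXME about router-based concurrency. Gossip envelopes are independent of each other, so N routees can process them concurrently, while cluster commands stay serialized through the single ClusterCommandDaemon. A self-contained sketch of the same deployment pattern against the Akka 2.0 routing API; Worker, RouterSketch and the actor names are hypothetical:

  import akka.actor._
  import akka.routing.RoundRobinRouter

  class Worker extends Actor {
    def receive = { case msg ⇒ println(self.path + " got " + msg) }
  }

  object RouterSketch extends App {
    val system = ActorSystem("sketch")
    // one ActorRef fronting four actor instances; incoming messages
    // are dispatched to the routees in round-robin order
    val pool = system.actorOf(Props[Worker].withRouter(RoundRobinRouter(nrOfInstances = 4)), "pool")
    1 to 8 foreach (pool ! _)
    system.shutdown()
  }
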
@@ -278,10 +297,12 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
     if (isRunning.compareAndSet(true, false)) {
       log.info("Node [{}] - Shutting down Gossiper and ClusterDaemon...", remoteAddress)
 
-      try system.stop(clusterDaemon) finally {
-        try gossipCanceller.cancel() finally {
-          try scrutinizeCanceller.cancel() finally {
-            log.info("Node [{}] - Gossiper and ClusterDaemon shut down successfully", remoteAddress)
+      try system.stop(clusterCommandDaemon) finally {
+        try system.stop(clusterGossipDaemon) finally {
+          try gossipCanceller.cancel() finally {
+            try scrutinizeCanceller.cancel() finally {
+              log.info("Node [{}] - Gossiper and ClusterDaemon shut down successfully", remoteAddress)
+            }
           }
         }
       }
@@ -398,11 +419,10 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
    * Joins the pre-configured contact point and retrieves current gossip state.
    */
   private def join() = nodeToJoin foreach { address ⇒
-    setUpConnectionTo(address) foreach { connection ⇒
-      val command = Join(remoteAddress)
-      log.info("Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection)
-      connection ! command
-    }
+    val connection = clusterCommandConnectionFor(address)
+    val command = Join(remoteAddress)
+    log.info("Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection)
+    connection ! command
   }
 
   /**
@@ -489,10 +509,9 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
    * Gossips latest gossip to an address.
    */
   private def gossipTo(address: Address) {
-    setUpConnectionTo(address) foreach { connection ⇒
-      log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, connection)
-      connection ! GossipEnvelope(self, latestGossip)
-    }
+    val connection = clusterGossipConnectionFor(address)
+    log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, connection)
+    connection ! GossipEnvelope(self, latestGossip)
   }
 
   /**
@@ -567,37 +586,15 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
     } else None
   }
 
-  // FIXME should shuffle list randomly before start traversing to avoid connecting to some member on every member
-  @tailrec
-  final private def connectToRandomNodeOf(addresses: Seq[Address]): ActorRef = {
-    addresses match {
-
-      case address :: rest ⇒
-        setUpConnectionTo(address) match {
-          case Some(connection) ⇒ connection
-          case None             ⇒ connectToRandomNodeOf(rest) // recur - if we could not set up a connection - try next address
-        }
-
-      case Nil ⇒
-        throw new RemoteConnectionException(
-          "Could not establish connection to any of the addresses in the argument list [" + addresses.mkString(", ") + "]")
-    }
-  }
+  /**
+   * Looks up the cluster command connection for the given address.
+   */
+  private def clusterCommandConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "clusterCommand")
 
   /**
-   * Sets up remote connections to all the addresses in the argument list.
+   * Looks up the cluster gossip connection for the given address.
    */
-  private def setUpConnectionsTo(addresses: Seq[Address]): Seq[Option[ActorRef]] = addresses map setUpConnectionTo
-
-  /**
-   * Sets up remote connection.
-   */
-  private def setUpConnectionTo(address: Address): Option[ActorRef] = Option {
-    // FIXME no need for using a factory here - remove connectionManager
-    try connectionManager.putIfAbsent(address, () ⇒ system.actorFor(RootActorPath(address) / "system" / "cluster")) catch {
-      case e: Exception ⇒ null
-    }
-  }
+  private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "clusterGossip")
 
   private def deputyNodesWithoutMyself: Seq[Address] = Seq.empty[Address] filter (_ != remoteAddress) // FIXME read in deputy nodes from gossip data - now empty seq
 
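Note: with the RemoteConnectionManager gone, a remote daemon is reached by constructing its actor path and resolving it with actorFor, so there is no connection state left to manage or clean up. A sketch of how such a path composes; the address values are hypothetical:

  import akka.actor._

  object PathSketch extends App {
    val address = Address("akka", "MySystem", "host1", 2552) // some remote node
    // the same shape of path that clusterGossipConnectionFor resolves;
    // the round-robin fan-out happens on the receiving node, behind this one path
    val path = RootActorPath(address) / "system" / "clusterGossip"
    println(path) // akka://MySystem@host1:2552/system/clusterGossip
  }
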
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
index 78c836f0b5..2afbc7efc0 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
@@ -28,6 +28,7 @@ class ClusterConfigSpec extends AkkaSpec(
       NodeToJoin must be(None)
       GossipInitialDelay must be(5 seconds)
       GossipFrequency must be(1 second)
+      NrOfGossipDaemons must be(4)
     }
   }
 }
diff --git a/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala
index e168b7caee..a82bbe4d5e 100644
--- a/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala
@@ -77,6 +77,8 @@ class MembershipChangeListenerSpec extends AkkaSpec("""
       latch.await(10.seconds.dilated.toMillis, TimeUnit.MILLISECONDS)
 
+      Thread.sleep(10.seconds.dilated.toMillis)
+
       // check cluster convergence
       gossiper0.convergence must be('defined)
       gossiper1.convergence must be('defined)
@@ -119,6 +121,8 @@ class MembershipChangeListenerSpec extends AkkaSpec("""
       latch.await(10.seconds.dilated.toMillis, TimeUnit.MILLISECONDS)
 
+      Thread.sleep(10.seconds.dilated.toMillis)
+
       // check cluster convergence
       gossiper0.convergence must be('defined)
       gossiper1.convergence must be('defined)
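Note: the added Thread.sleep gives gossip a window to reach convergence on all nodes before the assertions run. Like the latch timeout above it, the pause is scaled with .dilated from akka-testkit, which multiplies a duration by akka.test.timefactor so timeouts stretch uniformly on slow machines. A minimal sketch of that helper in isolation; the App wrapper and system name are illustrative only:

  import akka.actor.ActorSystem
  import akka.testkit._
  import akka.util.duration._

  object DilationSketch extends App {
    implicit val system = ActorSystem("sketch") // dilated reads the system's test settings
    // with akka.test.timefactor = 2.0 this prints 20 seconds
    println(10.seconds.dilated)
    system.shutdown()
  }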