diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index e097d34f3e..53bf7a41eb 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -8,9 +8,18 @@ akka { cluster { - seed-nodes = [] - seed-node-connection-timeout = 30s - max-time-to-retry-joining-cluster = 30s + join { + # contact point on the form of "hostname:port" of a node to try to join + # leave as empty string if the node should be a singleton cluster + contact-point = "" + timeout = 30s + max-time-to-retry = 30s + } + + gossip { + initialDelay = 5s + frequency = 1s + } # accrual failure detection config failure-detector { @@ -24,10 +33,5 @@ akka { max-sample-size = 1000 } - - gossip { - initial-delay = 5s - frequency = 1s - } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index e88c3ae72c..0a0697223b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -16,11 +16,16 @@ class ClusterSettings(val config: Config, val systemName: String) { // cluster config section val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold") val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") - val SeedNodeConnectionTimeout = Duration(config.getMilliseconds("akka.cluster.seed-node-connection-timeout"), MILLISECONDS) - val MaxTimeToRetryJoiningCluster = Duration(config.getMilliseconds("akka.cluster.max-time-to-retry-joining-cluster"), MILLISECONDS) - val InitialDelayForGossip = Duration(getMilliseconds("akka.cluster.gossip.initial-delay"), MILLISECONDS) - val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS) - val SeedNodes = Set.empty[Address] ++ getStringList("akka.cluster.seed-nodes").asScala.collect { - case 
AddressExtractor(addr) ⇒ addr + + // join config + val JoinContactPoint: Option[Address] = getString("akka.cluster.join.contact-point") match { + case "" ⇒ None + case AddressExtractor(addr) ⇒ Some(addr) } + val JoinTimeout = Duration(config.getMilliseconds("akka.cluster.join.timeout"), MILLISECONDS) + val JoinMaxTimeToRetry = Duration(config.getMilliseconds("akka.cluster.join.max-time-to-retry"), MILLISECONDS) + + // gossip config + val GossipInitialDelay = Duration(getMilliseconds("akka.cluster.gossip.initialDelay"), MILLISECONDS) + val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS) } diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala index bb15223842..47536ff5d2 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala @@ -32,6 +32,8 @@ trait NodeMembershipChangeListener { def memberDisconnected(member: Member) } +// FIXME create Protobuf messages out of all the Gossip stuff - but wait until the protocol is fully stabilized. + /** * Base trait for all cluster messages. All ClusterMessage's are serializable. */ @@ -40,14 +42,13 @@ sealed trait ClusterMessage extends Serializable /** * Command to join the cluster. */ -case object JoinCluster extends ClusterMessage +case class Join(node: Address) extends ClusterMessage /** * Represents the state of the cluster; cluster ring membership, ring convergence, meta data - all versioned by a vector clock. 
*/ case class Gossip( - version: VectorClock = VectorClock(), - member: Address, + member: Member, // sorted set of members with their status, sorted by name members: SortedSet[Member] = SortedSet.empty[Member](Ordering.fromLessThan[Member](_.address.toString > _.address.toString)), unavailableMembers: Set[Member] = Set.empty[Member], @@ -55,7 +56,9 @@ case class Gossip( seen: Map[Member, VectorClock] = Map.empty[Member, VectorClock], // for handoff //pendingChanges: Option[Vector[PendingPartitioningChange]] = None, - meta: Option[Map[String, Array[Byte]]] = None) + meta: Option[Map[String, Array[Byte]]] = None, + // vector clock version + version: VectorClock = VectorClock()) extends ClusterMessage // is a serializable cluster message with Versioned // has a vector clock as version @@ -69,13 +72,13 @@ case class Member(address: Address, status: MemberStatus) extends ClusterMessage * * Can be one of: Joining, Up, Leaving, Exiting and Down. */ -sealed trait MemberStatus extends ClusterMessage with Versioned +sealed trait MemberStatus extends ClusterMessage object MemberStatus { - case class Joining(version: VectorClock = VectorClock()) extends MemberStatus - case class Up(version: VectorClock = VectorClock()) extends MemberStatus - case class Leaving(version: VectorClock = VectorClock()) extends MemberStatus - case class Exiting(version: VectorClock = VectorClock()) extends MemberStatus - case class Down(version: VectorClock = VectorClock()) extends MemberStatus + case object Joining extends MemberStatus + case object Up extends MemberStatus + case object Leaving extends MemberStatus + case object Exiting extends MemberStatus + case object Down extends MemberStatus } // sealed trait PendingPartitioningStatus @@ -94,11 +97,9 @@ final class ClusterDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor val log = Logging(system, "ClusterDaemon") def receive = { - case JoinCluster ⇒ sender ! 
gossiper.latestGossip - case gossip: Gossip ⇒ - gossiper.tell(gossip) - - case unknown ⇒ log.error("Unknown message sent to cluster daemon [" + unknown + "]") + case Join(address) ⇒ sender ! gossiper.latestGossip // TODO use address in Join(address) ? + case gossip: Gossip ⇒ gossiper.tell(gossip) + case unknown ⇒ log.error("Unknown message sent to cluster daemon [" + unknown + "]") } } @@ -113,8 +114,8 @@ final class ClusterDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor *
* 1) Gossip to random live member (if any) * 2) Gossip to random unreachable member with certain probability depending on number of unreachable and live members - * 3) If the member gossiped to at (1) was not seed, or the number of live members is less than number of seeds, - * gossip to random seed with certain probability depending on number of unreachable, seed and live members. + * 3) If the member gossiped to at (1) was not a deputy, or the number of live members is less than the number of deputies, + * gossip to random deputy with certain probability depending on number of unreachable, deputy and live members. **/ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { @@ -132,22 +133,20 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { val protocol = "akka" // TODO should this be hardcoded? val address = remote.transport.address - val memberFingerprint = address.## - val initialDelayForGossip = clusterSettings.InitialDelayForGossip + + val gossipInitialDelay = clusterSettings.GossipInitialDelay val gossipFrequency = clusterSettings.GossipFrequency - implicit val seedNodeConnectionTimeout = clusterSettings.SeedNodeConnectionTimeout + + implicit val joinTimeout = clusterSettings.JoinTimeout implicit val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) - // seed members - private val seeds: Set[Member] = { - if (clusterSettings.SeedNodes.isEmpty) throw new ConfigurationException( - "At least one seed member must be defined in the configuration [akka.cluster.seed-members]") - else clusterSettings.SeedNodes map (address ⇒ Member(address, MemberStatus.Up())) - } + private val contactPoint: Option[Member] = + clusterSettings.JoinContactPoint filter (_ != address) map (address ⇒ Member(address, MemberStatus.Up)) private val serialization = remote.serialization - private val failureDetector = new AccrualFailureDetector(system, clusterSettings.FailureDetectorThreshold, 
clusterSettings.FailureDetectorMaxSampleSize) + private val failureDetector = new AccrualFailureDetector( + system, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) private val isRunning = new AtomicBoolean(true) private val log = Logging(system, "Gossiper") @@ -162,12 +161,12 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { log.info("Starting cluster Gossiper...") - // join the cluster by connecting to one of the seed members and retrieve current cluster state (Gossip) - joinCluster(clusterSettings.MaxTimeToRetryJoiningCluster fromNow) + // join the cluster by connecting to the configured contact point (if any) and retrieving the current cluster state (Gossip) + joinContactPoint(clusterSettings.JoinMaxTimeToRetry fromNow) // start periodic gossip and cluster scrutinization - val initateGossipCanceller = system.scheduler.schedule(initialDelayForGossip, gossipFrequency)(initateGossip()) - val scrutinizeCanceller = system.scheduler.schedule(initialDelayForGossip, gossipFrequency)(scrutinize()) + val initateGossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency)(initateGossip()) + val scrutinizeCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency)(scrutinize()) /** * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks. 
@@ -196,7 +195,7 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { final def tell(newGossip: Gossip) { val gossipingNode = newGossip.member - failureDetector heartbeat gossipingNode // update heartbeat in failure detector + failureDetector heartbeat gossipingNode.address // update heartbeat in failure detector // FIXME all below here is WRONG - redesign with cluster convergence in mind @@ -224,7 +223,7 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { // println("---------- WON RACE - setting state") // // create connections for all new members in the latest gossip // (latestAvailableNodes + gossipingNode) foreach { member ⇒ - // setUpConnectionToNode(member) + // setUpConnectionTo(member) // oldState.memberMembershipChangeListeners foreach (_ memberConnected member) // notify listeners about the new members // } // } @@ -267,69 +266,43 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { } /** - * Sets up remote connections to all the members in the argument list. + * Joins the pre-configured contact point and retrieves current gossip state. 
*/ - private def connectToNodes(members: Seq[Member]) { - members foreach { member ⇒ - setUpConnectionToNode(member) - state.get.memberMembershipChangeListeners foreach (_ memberConnected member) // notify listeners about the new members - } - } - - // FIXME should shuffle list randomly before start traversing to avoid connecting to some member on every member - @tailrec - final private def connectToRandomNodeOf(members: Seq[Member]): ActorRef = { - members match { - case member :: rest ⇒ - setUpConnectionToNode(member) match { - case Some(connection) ⇒ connection - case None ⇒ connectToRandomNodeOf(rest) // recur if - } - case Nil ⇒ - throw new RemoteConnectionException( - "Could not establish connection to any of the members in the argument list") - } - } - - /** - * Joins the cluster by connecting to one of the seed members and retrieve current cluster state (Gossip). - */ - private def joinCluster(deadline: Deadline) { - val seedNodes = seedNodesWithoutMyself // filter out myself - - if (!seedNodes.isEmpty) { // if we have seed members to contact - connectToNodes(seedNodes) - + private def joinContactPoint(deadline: Deadline) { + def tryJoinContactPoint(connection: ActorRef, deadline: Deadline) { try { - log.info("Trying to join cluster through one of the seed members [{}]", seedNodes.mkString(", ")) - - Await.result(connectToRandomNodeOf(seedNodes) ? JoinCluster, seedNodeConnectionTimeout) match { + Await.result(connection ? 
Join(address), joinTimeout) match { case initialGossip: Gossip ⇒ // just sets/overwrites the state/gossip regardless of what it was before // since it should be treated as the initial state state.set(state.get copy (currentGossip = initialGossip)) - log.debug("Received initial gossip [{}] from seed member", initialGossip) + log.debug("Received initial gossip [{}]", initialGossip) case unknown ⇒ - throw new IllegalStateException("Expected initial gossip from seed, received [" + unknown + "]") + throw new IllegalStateException("Expected initial gossip but received [" + unknown + "]") } } catch { case e: Exception ⇒ - log.error( - "Could not join cluster through any of the seed members - retrying for another {} seconds", - deadline.timeLeft.toSeconds) + log.error("Could not join contact point node - retrying for another {} seconds", deadline.timeLeft.toSeconds) // retry joining the cluster unless // 1. Gossiper is shut down // 2. The connection time window has expired - if (isRunning.get) { - if (deadline.timeLeft.toMillis > 0) joinCluster(deadline) // recur - else throw new RemoteConnectionException( - "Could not join cluster (any of the seed members) - giving up after trying for " + - deadline.time.toSeconds + " seconds") - } + if (isRunning.get && deadline.timeLeft.toMillis > 0) tryJoinContactPoint(connection, deadline) // recur + else throw new RemoteConnectionException( + "Could not join contact point node - giving up after trying for " + deadline.time.toSeconds + " seconds") } } + + contactPoint match { + case None ⇒ log.info("Booting up in singleton cluster mode") + case Some(member) ⇒ + log.info("Trying to join contact point node defined in the configuration [{}]", member) + setUpConnectionTo(member) match { + case None ⇒ log.error("Could not set up connection to join contact point node defined in the configuration [{}]", member) + case Some(connection) ⇒ tryJoinContactPoint(connection, deadline) + } + } } /** @@ -346,7 +319,7 @@ case class Gossiper(remote: 
RemoteActorRefProvider, system: ActorSystemImpl) { val oldUnavailableMembersSize = oldUnavailableMembers.size // 1. gossip to alive members - val gossipedToSeed = + val shouldGossipToDeputy = if (oldUnavailableMembersSize > 0) gossipToRandomNodeOf(oldMembers) else false @@ -356,12 +329,13 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { if (random.nextDouble() < probability) gossipToRandomNodeOf(oldUnavailableMembers) } - // 3. gossip to a seed for facilitating partition healing - if ((!gossipedToSeed || oldMembersSize < 1) && (seeds.head != address)) { - if (oldMembersSize == 0) gossipToRandomNodeOf(seeds) + // 3. gossip to a deputy node for facilitating partition healing + val deputies = deputyNodesWithoutMyself + if ((!shouldGossipToDeputy || oldMembersSize < 1) && (deputies.head != address)) { + if (oldMembersSize == 0) gossipToRandomNodeOf(deputies) else { val probability = 1.0 / oldMembersSize + oldUnavailableMembersSize - if (random.nextDouble() <= probability) gossipToRandomNodeOf(seeds) + if (random.nextDouble() <= probability) gossipToRandomNodeOf(deputies) } } } @@ -369,18 +343,25 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { /** * Gossips to a random member in the set of members passed in as argument. * - * @return 'true' if it gossiped to a "seed" member. + * @return 'true' if it gossiped to a "deputy" member. */ - private def gossipToRandomNodeOf(members: Set[Member]): Boolean = { + private def gossipToRandomNodeOf(members: Seq[Member]): Boolean = { val peers = members filter (_.address != address) // filter out myself val peer = selectRandomNode(peers) val oldState = state.get val oldGossip = oldState.currentGossip // if connection can't be established/found => ignore it since the failure detector will take care of the potential problem - setUpConnectionToNode(peer) foreach { _ ! newGossip } - seeds exists (peer == _) + setUpConnectionTo(peer) foreach { _ ! 
newGossip } + deputyNodesWithoutMyself exists (peer == _) } + /** + * Gossips to a random member in the set of members passed in as argument. + * + * @return 'true' if it gossiped to a "deputy" member. + */ + private def gossipToRandomNodeOf(members: Set[Member]): Boolean = gossipToRandomNodeOf(members.toList) + /** * Scrutinizes the cluster; marks members detected by the failure detector as unavailable, and notifies all listeners * of the change in the cluster membership. @@ -413,7 +394,30 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { } } - private def setUpConnectionToNode(member: Member): Option[ActorRef] = { + // FIXME should shuffle list randomly before start traversing to avoid connecting to some member on every member + @tailrec + final private def connectToRandomNodeOf(members: Seq[Member]): ActorRef = { + members match { + case member :: rest ⇒ + setUpConnectionTo(member) match { + case Some(connection) ⇒ connection + case None ⇒ connectToRandomNodeOf(rest) // recur if + } + case Nil ⇒ + throw new RemoteConnectionException( + "Could not establish connection to any of the members in the argument list") + } + } + + /** + * Sets up remote connections to all the members in the argument list. + */ + private def setUpConnectionsTo(members: Seq[Member]): Seq[Option[ActorRef]] = members map { setUpConnectionTo(_) } + + /** + * Sets up remote connection. 
+ */ + private def setUpConnectionTo(member: Member): Option[ActorRef] = { val address = member.address try { Some( @@ -425,14 +429,13 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { } } - private def newGossip(): Gossip = Gossip(member = address) + private def newGossip(): Gossip = Gossip(Member(address, MemberStatus.Joining)) // starts in Joining mode private def incrementVersionForGossip(from: Gossip): Gossip = { - val newVersion = from.version.increment(memberFingerprint, newTimestamp) - from copy (version = newVersion) + from copy (version = from.version.increment(memberFingerprint, newTimestamp)) } - private def seedNodesWithoutMyself: List[Member] = seeds.filter(_.address != address).toList + private def deputyNodesWithoutMyself: Seq[Member] = Seq.empty[Member] filter (_.address != address) // FIXME read in deputy nodes from gossip data - now empty seq - private def selectRandomNode(members: Set[Member]): Member = members.toList(random.nextInt(members.size)) + private def selectRandomNode(members: Seq[Member]): Member = members(random.nextInt(members.size)) } diff --git a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala index ef1f1be490..d8d87db75b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala +++ b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala @@ -30,11 +30,11 @@ object Versioned { /** * Representation of a Vector-based clock (counting clock), inspired by Lamport logical clocks. - * {{ + * {{{ * Reference: * 1) Leslie Lamport (1978). "Time, clocks, and the ordering of events in a distributed system". Communications of the ACM 21 (7): 558-565. * 2) Friedemann Mattern (1988). "Virtual Time and Global States of Distributed Systems". Workshop on Parallel and Distributed Algorithms: pp. 
215-226 - * }} + * }}} */ case class VectorClock( versions: Vector[VectorClock.Entry] = Vector.empty[VectorClock.Entry], @@ -76,11 +76,11 @@ object VectorClock { /** * The result of comparing two vector clocks. * Either: - * {{ + * {{{ * 1) v1 is BEFORE v2 * 2) v1 is AFTER t2 * 3) v1 happens CONCURRENTLY to v2 - * }} + * }}} */ sealed trait Ordering case object Before extends Ordering @@ -97,11 +97,11 @@ object VectorClock { /** * Compare two vector clocks. The outcomes will be one of the following: * - * {{ + * {{{ * 1. Clock 1 is BEFORE clock 2 if there exists an i such that c1(i) <= c(2) and there does not exist a j such that c1(j) > c2(j). * 2. Clock 1 is CONCURRENT to clock 2 if there exists an i, j such that c1(i) < c2(i) and c1(j) > c2(j). * 3. Clock 1 is AFTER clock 2 otherwise. - * }} + * }}} * * @param v1 The first VectorClock * @param v2 The second VectorClock diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 240d1ad3ff..7f1b26e553 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -25,11 +25,13 @@ class ClusterConfigSpec extends AkkaSpec( import settings._ FailureDetectorThreshold must be(8) FailureDetectorMaxSampleSize must be(1000) - SeedNodeConnectionTimeout must be(30 seconds) - MaxTimeToRetryJoiningCluster must be(30 seconds) - InitialDelayForGossip must be(5 seconds) + + JoinContactPoint must be(None) + JoinTimeout must be(30 seconds) + JoinMaxTimeToRetry must be(30 seconds) + + GossipInitialDelay must be(5 seconds) GossipFrequency must be(1 second) - SeedNodes must be(Set()) } } } diff --git a/akka-docs/cluster/cluster.rst b/akka-docs/cluster/cluster.rst index 371cdf2615..c145456552 100644 --- a/akka-docs/cluster/cluster.rst +++ b/akka-docs/cluster/cluster.rst @@ -74,12 +74,13 @@ each node hosting some part of the application. 
Cluster membership and partitioning of the application are decoupled. A node could be a member of a cluster without hosting any actors. -Single-node Cluster -------------------- + +Singleton Cluster +----------------- If a node does not have a preconfigured contact point to join in the Akka -configuration, then it is considered a single-node cluster and will -automatically transition from ``joining`` to ``up``. Single-node clusters +configuration, then it is considered a singleton cluster (single node cluster) +and will automatically transition from ``joining`` to ``up``. Singleton clusters can later explicitly send a ``Join`` message to another node to form a N-node cluster. It is also possible to link multiple N-node clusters by ``joining`` them.