Removed cluster seed nodes, added 'join.contact-point', changed joining phase, added singleton cluster mode plus misc other changes.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-02-07 16:53:49 +01:00
parent b0626f0562
commit 20f74bd284
6 changed files with 137 additions and 122 deletions

View file

@ -8,9 +8,18 @@
akka {
cluster {
seed-nodes = []
seed-node-connection-timeout = 30s
max-time-to-retry-joining-cluster = 30s
join {
# contact point on the form of "hostname:port" of a node to try to join
# leave as empty string if the node should be a singleton cluster
contact-point = ""
timeout = 30s
max-time-to-retry = 30s
}
gossip {
initialDelay = 5s
frequency = 1s
}
# accrual failure detection config
failure-detector {
@ -24,10 +33,5 @@ akka {
max-sample-size = 1000
}
gossip {
initial-delay = 5s
frequency = 1s
}
}
}

View file

@ -16,11 +16,16 @@ class ClusterSettings(val config: Config, val systemName: String) {
// cluster config section
val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold")
val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
val SeedNodeConnectionTimeout = Duration(config.getMilliseconds("akka.cluster.seed-node-connection-timeout"), MILLISECONDS)
val MaxTimeToRetryJoiningCluster = Duration(config.getMilliseconds("akka.cluster.max-time-to-retry-joining-cluster"), MILLISECONDS)
val InitialDelayForGossip = Duration(getMilliseconds("akka.cluster.gossip.initial-delay"), MILLISECONDS)
val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS)
val SeedNodes = Set.empty[Address] ++ getStringList("akka.cluster.seed-nodes").asScala.collect {
case AddressExtractor(addr) addr
// join config
val JoinContactPoint: Option[Address] = getString("akka.cluster.join.contact-point") match {
case "" None
case AddressExtractor(addr) Some(addr)
}
val JoinTimeout = Duration(config.getMilliseconds("akka.cluster.join.timeout"), MILLISECONDS)
val JoinMaxTimeToRetry = Duration(config.getMilliseconds("akka.cluster.join.max-time-to-retry"), MILLISECONDS)
// gossip config
val GossipInitialDelay = Duration(getMilliseconds("akka.cluster.gossip.initialDelay"), MILLISECONDS)
val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS)
}

View file

@ -32,6 +32,8 @@ trait NodeMembershipChangeListener {
def memberDisconnected(member: Member)
}
// FIXME create Protobuf messages out of all the Gossip stuff - but wait until the prototol is fully stablized.
/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
*/
@ -40,14 +42,13 @@ sealed trait ClusterMessage extends Serializable
/**
* Command to join the cluster.
*/
case object JoinCluster extends ClusterMessage
case class Join(node: Address) extends ClusterMessage
/**
* Represents the state of the cluster; cluster ring membership, ring convergence, meta data - all versioned by a vector clock.
*/
case class Gossip(
version: VectorClock = VectorClock(),
member: Address,
member: Member,
// sorted set of members with their status, sorted by name
members: SortedSet[Member] = SortedSet.empty[Member](Ordering.fromLessThan[Member](_.address.toString > _.address.toString)),
unavailableMembers: Set[Member] = Set.empty[Member],
@ -55,7 +56,9 @@ case class Gossip(
seen: Map[Member, VectorClock] = Map.empty[Member, VectorClock],
// for handoff
//pendingChanges: Option[Vector[PendingPartitioningChange]] = None,
meta: Option[Map[String, Array[Byte]]] = None)
meta: Option[Map[String, Array[Byte]]] = None,
// vector clock version
version: VectorClock = VectorClock())
extends ClusterMessage // is a serializable cluster message
with Versioned // has a vector clock as version
@ -69,13 +72,13 @@ case class Member(address: Address, status: MemberStatus) extends ClusterMessage
*
* Can be one of: Joining, Up, Leaving, Exiting and Down.
*/
sealed trait MemberStatus extends ClusterMessage with Versioned
sealed trait MemberStatus extends ClusterMessage
object MemberStatus {
case class Joining(version: VectorClock = VectorClock()) extends MemberStatus
case class Up(version: VectorClock = VectorClock()) extends MemberStatus
case class Leaving(version: VectorClock = VectorClock()) extends MemberStatus
case class Exiting(version: VectorClock = VectorClock()) extends MemberStatus
case class Down(version: VectorClock = VectorClock()) extends MemberStatus
case object Joining extends MemberStatus
case object Up extends MemberStatus
case object Leaving extends MemberStatus
case object Exiting extends MemberStatus
case object Down extends MemberStatus
}
// sealed trait PendingPartitioningStatus
@ -94,11 +97,9 @@ final class ClusterDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor
val log = Logging(system, "ClusterDaemon")
def receive = {
case JoinCluster sender ! gossiper.latestGossip
case gossip: Gossip
gossiper.tell(gossip)
case unknown log.error("Unknown message sent to cluster daemon [" + unknown + "]")
case Join(address) sender ! gossiper.latestGossip // TODO use address in Join(address) ?
case gossip: Gossip gossiper.tell(gossip)
case unknown log.error("Unknown message sent to cluster daemon [" + unknown + "]")
}
}
@ -113,8 +114,8 @@ final class ClusterDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor
* <pre>
* 1) Gossip to random live member (if any)
* 2) Gossip to random unreachable member with certain probability depending on number of unreachable and live members
* 3) If the member gossiped to at (1) was not seed, or the number of live members is less than number of seeds,
* gossip to random seed with certain probability depending on number of unreachable, seed and live members.
* 3) If the member gossiped to at (1) was not deputy, or the number of live members is less than number of deputy list,
* gossip to random deputy with certain probability depending on number of unreachable, deputy and live members.
* </pre>
*/
case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
@ -132,22 +133,20 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
val protocol = "akka" // TODO should this be hardcoded?
val address = remote.transport.address
val memberFingerprint = address.##
val initialDelayForGossip = clusterSettings.InitialDelayForGossip
val gossipInitialDelay = clusterSettings.GossipInitialDelay
val gossipFrequency = clusterSettings.GossipFrequency
implicit val seedNodeConnectionTimeout = clusterSettings.SeedNodeConnectionTimeout
implicit val joinTimeout = clusterSettings.JoinTimeout
implicit val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
// seed members
private val seeds: Set[Member] = {
if (clusterSettings.SeedNodes.isEmpty) throw new ConfigurationException(
"At least one seed member must be defined in the configuration [akka.cluster.seed-members]")
else clusterSettings.SeedNodes map (address Member(address, MemberStatus.Up()))
}
private val contactPoint: Option[Member] =
clusterSettings.JoinContactPoint filter (_ != address) map (address Member(address, MemberStatus.Up))
private val serialization = remote.serialization
private val failureDetector = new AccrualFailureDetector(system, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)
private val failureDetector = new AccrualFailureDetector(
system, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)
private val isRunning = new AtomicBoolean(true)
private val log = Logging(system, "Gossiper")
@ -162,12 +161,12 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
log.info("Starting cluster Gossiper...")
// join the cluster by connecting to one of the seed members and retrieve current cluster state (Gossip)
joinCluster(clusterSettings.MaxTimeToRetryJoiningCluster fromNow)
// join the cluster by connecting to one of the deputy members and retrieve current cluster state (Gossip)
joinContactPoint(clusterSettings.JoinMaxTimeToRetry fromNow)
// start periodic gossip and cluster scrutinization
val initateGossipCanceller = system.scheduler.schedule(initialDelayForGossip, gossipFrequency)(initateGossip())
val scrutinizeCanceller = system.scheduler.schedule(initialDelayForGossip, gossipFrequency)(scrutinize())
val initateGossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency)(initateGossip())
val scrutinizeCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency)(scrutinize())
/**
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
@ -196,7 +195,7 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
final def tell(newGossip: Gossip) {
val gossipingNode = newGossip.member
failureDetector heartbeat gossipingNode // update heartbeat in failure detector
failureDetector heartbeat gossipingNode.address // update heartbeat in failure detector
// FIXME all below here is WRONG - redesign with cluster convergence in mind
@ -224,7 +223,7 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
// println("---------- WON RACE - setting state")
// // create connections for all new members in the latest gossip
// (latestAvailableNodes + gossipingNode) foreach { member
// setUpConnectionToNode(member)
// setUpConnectionTo(member)
// oldState.memberMembershipChangeListeners foreach (_ memberConnected member) // notify listeners about the new members
// }
// }
@ -267,69 +266,43 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
}
/**
* Sets up remote connections to all the members in the argument list.
* Joins the pre-configured contact point and retrieves current gossip state.
*/
private def connectToNodes(members: Seq[Member]) {
members foreach { member
setUpConnectionToNode(member)
state.get.memberMembershipChangeListeners foreach (_ memberConnected member) // notify listeners about the new members
}
}
// FIXME should shuffle list randomly before start traversing to avoid connecting to some member on every member
@tailrec
final private def connectToRandomNodeOf(members: Seq[Member]): ActorRef = {
members match {
case member :: rest
setUpConnectionToNode(member) match {
case Some(connection) connection
case None connectToRandomNodeOf(rest) // recur if
}
case Nil
throw new RemoteConnectionException(
"Could not establish connection to any of the members in the argument list")
}
}
/**
* Joins the cluster by connecting to one of the seed members and retrieve current cluster state (Gossip).
*/
private def joinCluster(deadline: Deadline) {
val seedNodes = seedNodesWithoutMyself // filter out myself
if (!seedNodes.isEmpty) { // if we have seed members to contact
connectToNodes(seedNodes)
private def joinContactPoint(deadline: Deadline) {
def tryJoinContactPoint(connection: ActorRef, deadline: Deadline) {
try {
log.info("Trying to join cluster through one of the seed members [{}]", seedNodes.mkString(", "))
Await.result(connectToRandomNodeOf(seedNodes) ? JoinCluster, seedNodeConnectionTimeout) match {
Await.result(connection ? Join(address), joinTimeout) match {
case initialGossip: Gossip
// just sets/overwrites the state/gossip regardless of what it was before
// since it should be treated as the initial state
state.set(state.get copy (currentGossip = initialGossip))
log.debug("Received initial gossip [{}] from seed member", initialGossip)
log.debug("Received initial gossip [{}]", initialGossip)
case unknown
throw new IllegalStateException("Expected initial gossip from seed, received [" + unknown + "]")
throw new IllegalStateException("Expected initial gossip but received [" + unknown + "]")
}
} catch {
case e: Exception
log.error(
"Could not join cluster through any of the seed members - retrying for another {} seconds",
deadline.timeLeft.toSeconds)
log.error("Could not join contact point node - retrying for another {} seconds", deadline.timeLeft.toSeconds)
// retry joining the cluster unless
// 1. Gossiper is shut down
// 2. The connection time window has expired
if (isRunning.get) {
if (deadline.timeLeft.toMillis > 0) joinCluster(deadline) // recur
else throw new RemoteConnectionException(
"Could not join cluster (any of the seed members) - giving up after trying for " +
deadline.time.toSeconds + " seconds")
}
if (isRunning.get && deadline.timeLeft.toMillis > 0) tryJoinContactPoint(connection, deadline) // recur
else throw new RemoteConnectionException(
"Could not join contact point node - giving up after trying for " + deadline.time.toSeconds + " seconds")
}
}
contactPoint match {
case None log.info("Booting up in singleton cluster mode")
case Some(member)
log.info("Trying to join contact point node defined in the configuration [{}]", member)
setUpConnectionTo(member) match {
case None log.error("Could not set up connection to join contact point node defined in the configuration [{}]", member)
case Some(connection) tryJoinContactPoint(connection, deadline)
}
}
}
/**
@ -346,7 +319,7 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
val oldUnavailableMembersSize = oldUnavailableMembers.size
// 1. gossip to alive members
val gossipedToSeed =
val shouldGossipToDeputy =
if (oldUnavailableMembersSize > 0) gossipToRandomNodeOf(oldMembers)
else false
@ -356,12 +329,13 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
if (random.nextDouble() < probability) gossipToRandomNodeOf(oldUnavailableMembers)
}
// 3. gossip to a seed for facilitating partition healing
if ((!gossipedToSeed || oldMembersSize < 1) && (seeds.head != address)) {
if (oldMembersSize == 0) gossipToRandomNodeOf(seeds)
// 3. gossip to a deputy nodes for facilitating partition healing
val deputies = deputyNodesWithoutMyself
if ((!shouldGossipToDeputy || oldMembersSize < 1) && (deputies.head != address)) {
if (oldMembersSize == 0) gossipToRandomNodeOf(deputies)
else {
val probability = 1.0 / oldMembersSize + oldUnavailableMembersSize
if (random.nextDouble() <= probability) gossipToRandomNodeOf(seeds)
if (random.nextDouble() <= probability) gossipToRandomNodeOf(deputies)
}
}
}
@ -369,18 +343,25 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
/**
* Gossips to a random member in the set of members passed in as argument.
*
* @return 'true' if it gossiped to a "seed" member.
* @return 'true' if it gossiped to a "deputy" member.
*/
private def gossipToRandomNodeOf(members: Set[Member]): Boolean = {
private def gossipToRandomNodeOf(members: Seq[Member]): Boolean = {
val peers = members filter (_.address != address) // filter out myself
val peer = selectRandomNode(peers)
val oldState = state.get
val oldGossip = oldState.currentGossip
// if connection can't be established/found => ignore it since the failure detector will take care of the potential problem
setUpConnectionToNode(peer) foreach { _ ! newGossip }
seeds exists (peer == _)
setUpConnectionTo(peer) foreach { _ ! newGossip }
deputyNodesWithoutMyself exists (peer == _)
}
/**
* Gossips to a random member in the set of members passed in as argument.
*
* @return 'true' if it gossiped to a "deputy" member.
*/
private def gossipToRandomNodeOf(members: Set[Member]): Boolean = gossipToRandomNodeOf(members.toList)
/**
* Scrutinizes the cluster; marks members detected by the failure detector as unavailable, and notifies all listeners
* of the change in the cluster membership.
@ -413,7 +394,30 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
}
}
private def setUpConnectionToNode(member: Member): Option[ActorRef] = {
// FIXME should shuffle list randomly before start traversing to avoid connecting to some member on every member
@tailrec
final private def connectToRandomNodeOf(members: Seq[Member]): ActorRef = {
members match {
case member :: rest
setUpConnectionTo(member) match {
case Some(connection) connection
case None connectToRandomNodeOf(rest) // recur if
}
case Nil
throw new RemoteConnectionException(
"Could not establish connection to any of the members in the argument list")
}
}
/**
* Sets up remote connections to all the members in the argument list.
*/
private def setUpConnectionsTo(members: Seq[Member]): Seq[Option[ActorRef]] = members map { setUpConnectionTo(_) }
/**
* Sets up remote connection.
*/
private def setUpConnectionTo(member: Member): Option[ActorRef] = {
val address = member.address
try {
Some(
@ -425,14 +429,13 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
}
}
private def newGossip(): Gossip = Gossip(member = address)
private def newGossip(): Gossip = Gossip(Member(address, MemberStatus.Joining)) // starts in Joining mode
private def incrementVersionForGossip(from: Gossip): Gossip = {
val newVersion = from.version.increment(memberFingerprint, newTimestamp)
from copy (version = newVersion)
from copy (version = from.version.increment(memberFingerprint, newTimestamp))
}
private def seedNodesWithoutMyself: List[Member] = seeds.filter(_.address != address).toList
private def deputyNodesWithoutMyself: Seq[Member] = Seq.empty[Member] filter (_.address != address) // FIXME read in deputy nodes from gossip data - now empty seq
private def selectRandomNode(members: Set[Member]): Member = members.toList(random.nextInt(members.size))
private def selectRandomNode(members: Seq[Member]): Member = members(random.nextInt(members.size))
}

View file

@ -30,11 +30,11 @@ object Versioned {
/**
* Representation of a Vector-based clock (counting clock), inspired by Lamport logical clocks.
* {{
* {{{
* Reference:
* 1) Leslie Lamport (1978). "Time, clocks, and the ordering of events in a distributed system". Communications of the ACM 21 (7): 558-565.
* 2) Friedemann Mattern (1988). "Virtual Time and Global States of Distributed Systems". Workshop on Parallel and Distributed Algorithms: pp. 215-226
* }}
* }}}
*/
case class VectorClock(
versions: Vector[VectorClock.Entry] = Vector.empty[VectorClock.Entry],
@ -76,11 +76,11 @@ object VectorClock {
/**
* The result of comparing two vector clocks.
* Either:
* {{
* {{{
* 1) v1 is BEFORE v2
* 2) v1 is AFTER t2
* 3) v1 happens CONCURRENTLY to v2
* }}
* }}}
*/
sealed trait Ordering
case object Before extends Ordering
@ -97,11 +97,11 @@ object VectorClock {
/**
* Compare two vector clocks. The outcomes will be one of the following:
* <p/>
* {{
* {{{
* 1. Clock 1 is BEFORE clock 2 if there exists an i such that c1(i) <= c(2) and there does not exist a j such that c1(j) > c2(j).
* 2. Clock 1 is CONCURRENT to clock 2 if there exists an i, j such that c1(i) < c2(i) and c1(j) > c2(j).
* 3. Clock 1 is AFTER clock 2 otherwise.
* }}
* }}}
*
* @param v1 The first VectorClock
* @param v2 The second VectorClock

View file

@ -25,11 +25,13 @@ class ClusterConfigSpec extends AkkaSpec(
import settings._
FailureDetectorThreshold must be(8)
FailureDetectorMaxSampleSize must be(1000)
SeedNodeConnectionTimeout must be(30 seconds)
MaxTimeToRetryJoiningCluster must be(30 seconds)
InitialDelayForGossip must be(5 seconds)
JoinContactPoint must be(None)
JoinTimeout must be(30 seconds)
JoinMaxTimeToRetry must be(30 seconds)
GossipInitialDelay must be(5 seconds)
GossipFrequency must be(1 second)
SeedNodes must be(Set())
}
}
}

View file

@ -74,12 +74,13 @@ each node hosting some part of the application. Cluster membership and
partitioning of the application are decoupled. A node could be a member of a
cluster without hosting any actors.
Single-node Cluster
-------------------
Singleton Cluster
-----------------
If a node does not have a preconfigured contact point to join in the Akka
configuration, then it is considered a single-node cluster and will
automatically transition from ``joining`` to ``up``. Single-node clusters
configuration, then it is considered a singleton cluster (single node cluster)
and will automatically transition from ``joining`` to ``up``. Singleton clusters
can later explicitly send a ``Join`` message to another node to form a N-node
cluster. It is also possible to link multiple N-node clusters by ``joining`` them.