Finalized initial cluster membership and merging of vector clocks and gossips in case of concurrent cluster updates. Plus misc other fixes.

* Finalized initial cluster membership.
* Added merging of vector clocks and gossips in case of concurrent cluster updates.
* Added toString methods to all cluster protocol classes
* Fixed bugs in incrementation of vector clocks
* Added updates of 'seen' table for cluster convergence
* Revamped to use new VectorClock impl
* Refactored Gossip.State

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-02-14 20:50:12 +01:00
parent 5f537a7d4c
commit a2785bc89e
2 changed files with 195 additions and 118 deletions

View file

@ -145,7 +145,9 @@ class AccrualFailureDetector(system: ActorSystem, val threshold: Int = 8, val ma
val mean = oldState.failureStats.get(connection).getOrElse(FailureStats()).mean
PhiFactor * timestampDiff / mean
}
log.debug("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection)
// FIXME sometimes we get "Phi value [Infinity]" fix it
if (phi > 0.0) log.debug("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection) // only log if PHI value is starting to get interesting
phi
}

View file

@ -17,7 +17,6 @@ import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
import java.util.concurrent.TimeUnit._
import java.util.concurrent.TimeoutException
import java.security.SecureRandom
import System.{ currentTimeMillis newTimestamp }
import scala.collection.immutable.{ Map, SortedSet }
import scala.annotation.tailrec
@ -104,11 +103,17 @@ object MemberStatus {
// status: PartitioningStatus)
/**
* Represents the overview of the cluster, holds the cluster convergence table and unreachable nodes.
* Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes.
*/
case class GossipOverview(
seen: Map[Address, VectorClock] = Map.empty[Address, VectorClock],
unreachable: Set[Address] = Set.empty[Address])
unreachable: Set[Address] = Set.empty[Address]) {
override def toString =
"GossipOverview(seen = [" + seen.mkString(", ") +
"], unreachable = [" + unreachable.mkString(", ") +
"])"
}
/**
* Represents the state of the cluster; cluster ring membership, ring convergence, meta data - all versioned by a vector clock.
@ -121,9 +126,14 @@ case class Gossip(
meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]],
version: VectorClock = VectorClock()) // vector clock version
extends ClusterMessage // is a serializable cluster message
with Versioned {
with Versioned[Gossip] {
def addMember(member: Member): Gossip = {
/**
* Increments the version for this 'Node'.
*/
def +(node: VectorClock.Node): Gossip = copy(version = version + node)
def +(member: Member): Gossip = {
if (members contains member) this
else this copy (members = members + member)
}
@ -132,14 +142,19 @@ case class Gossip(
* Marks the gossip as seen by this node (remoteAddress) by updating the address entry in the 'gossip.overview.seen'
* Map with the VectorClock for the new gossip.
*/
def markAsSeenByThisNode(address: Address): Gossip =
def seen(address: Address): Gossip =
this copy (overview = overview copy (seen = overview.seen + (address -> version)))
def incrementVersion(memberFingerprint: Int): Gossip = {
this copy (version = version.increment(memberFingerprint, newTimestamp))
}
override def toString =
"Gossip(" +
"overview = " + overview +
", members = [" + members.mkString(", ") +
"], meta = [" + meta.mkString(", ") +
"], version = " + version +
")"
}
// FIXME add FSM trait?
final class ClusterDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor {
val log = Logging(system, "ClusterDaemon")
@ -153,6 +168,9 @@ final class ClusterDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor
}
}
// FIXME Cluster public API should be an Extension
// FIXME Add cluster Node class and refactor out all non-gossip related stuff out of Gossiper
/**
* This module is responsible for Gossiping cluster information. The abstraction maintains the list of live
* and dead members. Periodically i.e. every 1 second this module chooses a random member and initiates a round
@ -177,19 +195,18 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
private case class State(
self: Member,
latestGossip: Gossip,
isSingletonCluster: Boolean = true, // starts as singleton cluster
memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty[MembershipChangeListener])
val remoteSettings = new RemoteSettings(system.settings.config, system.name)
val clusterSettings = new ClusterSettings(system.settings.config, system.name)
val remoteAddress = remote.transport.address
val memberFingerprint = remoteAddress.##
val selfNode = VectorClock.Node(remoteAddress.toString)
val gossipInitialDelay = clusterSettings.GossipInitialDelay
val gossipFrequency = clusterSettings.GossipFrequency
implicit val memberOrdering = Ordering.fromLessThan[Member](_.address.toString > _.address.toString)
implicit val memberOrdering = Ordering.fromLessThan[Member](_.address.toString < _.address.toString)
implicit val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
@ -204,53 +221,38 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
private val random = SecureRandom.getInstance("SHA1PRNG")
// Is it right to put this guy under the /system path or should we have a top-level /cluster or something else...?
// FIXME should be defined as a router so we get concurrency here
private val clusterDaemon = system.systemActorOf(Props(new ClusterDaemon(system, this)), "cluster")
private val state = {
val member = Member(remoteAddress, MemberStatus.Joining)
val gossip = Gossip(members = SortedSet.empty[Member] + member)
val gossip = Gossip(members = SortedSet.empty[Member] + member) + selfNode // add me as member and update my vector clock
new AtomicReference[State](State(member, gossip))
}
// FIXME manage connections in some other way so we can delete the RemoteConnectionManager (SINCE IT SUCKS!!!)
private val connectionManager = new RemoteConnectionManager(system, remote, failureDetector, Map.empty[Address, ActorRef])
import Versioned.latestVersionOf
log.info("Node [{}] - Starting cluster Gossiper...", remoteAddress)
// try to join the node defined in the 'akka.cluster.node-to-join' option
nodeToJoin match {
case None switchStatusTo(MemberStatus.Up) // if we are singleton cluster then we are already considered to be UP
case Some(address) join(address)
}
nodeToJoin foreach join
// start periodic gossip and cluster scrutinization
// start periodic gossip to random nodes in cluster
val gossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
gossip()
}
// start periodic cluster scrutinization (moving nodes condemned by the failure detector to unreachable list)
val scrutinizeCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
scrutinize()
}
/**
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
*/
def shutdown() {
// FIXME Cheating. Can't just shut down. Node must first gossip an Leave command, wait for Leader to do proper Handoff and then await an Exit command before switching to Removed
if (isRunning.compareAndSet(true, false)) {
log.info("Node [{}] - Shutting down Gossiper and ClusterDaemon", remoteAddress)
try connectionManager.shutdown() finally {
try system.stop(clusterDaemon) finally {
try gossipCanceller.cancel() finally {
try scrutinizeCanceller.cancel() finally {
log.info("Node [{}] - Gossiper is shut down", remoteAddress)
}
}
}
}
}
}
// ======================================================
// ===================== PUBLIC API =====================
// ======================================================
/**
* Latest gossip.
@ -265,52 +267,90 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
/**
* Is this node a singleton cluster?
*/
def isSingletonCluster: Boolean = state.get.isSingletonCluster
def isSingletonCluster: Boolean = isSingletonCluster(state.get)
/**
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
*/
def shutdown() {
// FIXME Cheating for now. Can't just shut down. Node must first gossip an Leave command, wait for Leader to do proper Handoff and then await an Exit command before switching to Removed
if (isRunning.compareAndSet(true, false)) {
log.info("Node [{}] - Shutting down Gossiper and ClusterDaemon...", remoteAddress)
try connectionManager.shutdown() finally {
try system.stop(clusterDaemon) finally {
try gossipCanceller.cancel() finally {
try scrutinizeCanceller.cancel() finally {
log.info("Node [{}] - Gossiper and ClusterDaemon shut down successfully", remoteAddress)
}
}
}
}
}
}
/**
* New node joining.
*/
@tailrec
final def joining(node: Address) {
log.debug("Node [{}] - Node [{}] is joining", remoteAddress, node)
val oldState = state.get
val oldGossip = oldState.latestGossip
val oldMembers = oldGossip.members
val newGossip = oldGossip copy (members = oldMembers + Member(node, MemberStatus.Joining)) // add joining node as Joining
val newState = oldState copy (latestGossip = newGossip.incrementVersion(memberFingerprint))
log.info("Node [{}] - Node [{}] is joining", remoteAddress, node)
// FIXME set flag state.isSingletonCluster = false (if true)
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
if (!state.compareAndSet(oldState, newState)) joining(node) // recur if we failed update
val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining
val newGossip = localGossip copy (members = newMembers)
val versionedGossip = newGossip + selfNode
val seenVersionedGossip = versionedGossip seen remoteAddress
val newState = localState copy (latestGossip = seenVersionedGossip)
if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
}
/**
* Receive new gossip.
*/
@tailrec
final def receive(sender: Member, newGossip: Gossip) {
final def receive(sender: Member, remoteGossip: Gossip) {
log.debug("Node [{}] - Receiving gossip from [{}]", remoteAddress, sender.address)
failureDetector heartbeat sender.address // update heartbeat in failure detector
// FIXME set flag state.isSingletonCluster = false (if true)
// FIXME check for convergence - if we have convergence then trigger the listeners
val oldState = state.get
val oldGossip = oldState.latestGossip
val localState = state.get
val localGossip = localState.latestGossip
val gossip = Versioned
.latestVersionOf(newGossip, oldGossip)
.addMember(self) // needed if newGossip won
.addMember(sender) // needed if oldGossip won
.markAsSeenByThisNode(remoteAddress)
.incrementVersion(memberFingerprint)
val winningGossip =
if (remoteGossip.version <> localGossip.version) {
// concurrent
val mergedGossip = merge(remoteGossip, localGossip)
val versionedMergedGossip = mergedGossip + selfNode
val newState = oldState copy (latestGossip = gossip)
log.debug("Can't establish a causal relationship between \"remote\" gossip [{}] and \"local\" gossip [{}] - merging them into [{}]",
remoteGossip, localGossip, versionedMergedGossip)
versionedMergedGossip
} else if (remoteGossip.version < localGossip.version) {
// local gossip is newer
localGossip
} else {
// remote gossip is newer
remoteGossip
}
val newState = localState copy (latestGossip = winningGossip seen remoteAddress)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) receive(sender, newGossip) // recur if we fail the update
if (!state.compareAndSet(localState, newState)) receive(sender, remoteGossip) // recur if we fail the update
}
/**
@ -318,10 +358,10 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
*/
@tailrec
final def registerListener(listener: MembershipChangeListener) {
val oldState = state.get
val newListeners = oldState.memberMembershipChangeListeners + listener
val newState = oldState copy (memberMembershipChangeListeners = newListeners)
if (!state.compareAndSet(oldState, newState)) registerListener(listener) // recur
val localState = state.get
val newListeners = localState.memberMembershipChangeListeners + listener
val newState = localState copy (memberMembershipChangeListeners = newListeners)
if (!state.compareAndSet(localState, newState)) registerListener(listener) // recur
}
/**
@ -329,12 +369,16 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
*/
@tailrec
final def unregisterListener(listener: MembershipChangeListener) {
val oldState = state.get
val newListeners = oldState.memberMembershipChangeListeners - listener
val newState = oldState copy (memberMembershipChangeListeners = newListeners)
if (!state.compareAndSet(oldState, newState)) unregisterListener(listener) // recur
val localState = state.get
val newListeners = localState.memberMembershipChangeListeners - listener
val newState = localState copy (memberMembershipChangeListeners = newListeners)
if (!state.compareAndSet(localState, newState)) unregisterListener(listener) // recur
}
// ========================================================
// ===================== INTERNAL API =====================
// ========================================================
/**
* Joins the pre-configured contact point and retrieves current gossip state.
*/
@ -360,69 +404,90 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
* Initates a new round of gossip.
*/
private def gossip() {
val oldState = state.get
if (!oldState.isSingletonCluster) { // do not gossip if we are a singleton cluster
val oldGossip = oldState.latestGossip
val oldMembers = oldGossip.members
val oldMembersSize = oldMembers.size
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
val oldUnreachableAddresses = oldGossip.overview.unreachable
val oldUnreachableSize = oldUnreachableAddresses.size
if (!isSingletonCluster(localState)) { // do not gossip if we are a singleton cluster
log.debug("Node [{}] - Initiating new round of gossip", remoteAddress)
val localGossip = localState.latestGossip
val localMembers = localGossip.members
val localMembersSize = localMembers.size
val localUnreachableAddresses = localGossip.overview.unreachable
val localUnreachableSize = localUnreachableAddresses.size
// 1. gossip to alive members
val gossipedToDeputy =
if (oldUnreachableSize > 0) gossipToRandomNodeOf(oldMembers.toList map { _.address })
else false
val gossipedToDeputy = gossipToRandomNodeOf(localMembers.toList map { _.address })
// 2. gossip to unreachable members
if (oldUnreachableSize > 0) {
val probability: Double = oldUnreachableSize / (oldMembersSize + 1)
if (random.nextDouble() < probability) gossipToRandomNodeOf(oldUnreachableAddresses.toList)
if (localUnreachableSize > 0) {
val probability: Double = localUnreachableSize / (localMembersSize + 1)
if (random.nextDouble() < probability) gossipToRandomNodeOf(localUnreachableAddresses.toList)
}
// 3. gossip to a deputy nodes for facilitating partition healing
val deputies = deputyNodesWithoutMyself
if ((!gossipedToDeputy || oldMembersSize < 1) && !deputies.isEmpty) {
if (oldMembersSize == 0) gossipToRandomNodeOf(deputies)
if ((!gossipedToDeputy || localMembersSize < 1) && !deputies.isEmpty) {
if (localMembersSize == 0) gossipToRandomNodeOf(deputies)
else {
val probability = 1.0 / oldMembersSize + oldUnreachableSize
val probability = 1.0 / localMembersSize + localUnreachableSize
if (random.nextDouble() <= probability) gossipToRandomNodeOf(deputies)
}
}
}
}
/**
* Merges two Gossip instances including membership tables, meta-data tables and the VectorClock histories.
*/
private def merge(gossip1: Gossip, gossip2: Gossip): Gossip = {
val mergedVClock = gossip1.version merge gossip2.version
val mergedMembers = gossip1.members union gossip2.members
val mergedMeta = gossip1.meta ++ gossip2.meta
Gossip(gossip2.overview, mergedMembers, mergedMeta, mergedVClock)
}
/**
* Switches the state in the FSM.
*/
@tailrec
final private def switchStatusTo(newStatus: MemberStatus) {
log.info("Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus)
val oldState = state.get
val oldSelf = oldState.self
val oldGossip = oldState.latestGossip
val oldMembers = oldGossip.members
val localState = state.get
val localSelf = localState.self
val newSelf = oldSelf copy (status = newStatus)
val localGossip = localState.latestGossip
val localMembers = localGossip.members
val newMembersSet = oldMembers map { member
val newSelf = localSelf copy (status = newStatus)
val newMembersSet = localMembers map { member
if (member.address == remoteAddress) newSelf
else member
}
// ugly crap to work around bug in scala colletions ('val ss: SortedSet[Member] = SortedSet.empty[Member] ++ aSet' does not compile)
val newMembersSortedSet = SortedSet[Member](newMembersSet.toList: _*)
val newGossip = localGossip copy (members = newMembersSortedSet)
val newGossip = oldGossip copy (members = newMembersSortedSet) incrementVersion memberFingerprint
val newState = oldState copy (self = newSelf, latestGossip = newGossip)
if (!state.compareAndSet(oldState, newState)) switchStatusTo(newStatus) // recur if we failed update
val versionedGossip = newGossip + selfNode
val seenVersionedGossip = versionedGossip seen remoteAddress
val newState = localState copy (self = newSelf, latestGossip = seenVersionedGossip)
if (!state.compareAndSet(localState, newState)) switchStatusTo(newStatus) // recur if we failed update
}
/**
* Gossips latest gossip to an address.
*/
private def gossipTo(address: Address) {
setUpConnectionTo(address) foreach { _ ! GossipEnvelope(self, latestGossip) }
setUpConnectionTo(address) foreach { connection
log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, address)
connection ! GossipEnvelope(self, latestGossip)
}
}
/**
@ -433,8 +498,8 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
private def gossipToRandomNodeOf(addresses: Seq[Address]): Boolean = {
val peers = addresses filter (_ != remoteAddress) // filter out myself
val peer = selectRandomNode(peers)
val oldState = state.get
val oldGossip = oldState.latestGossip
val localState = state.get
val localGossip = localState.latestGossip
// if connection can't be established/found => ignore it since the failure detector will take care of the potential problem
gossipTo(peer)
deputyNodesWithoutMyself exists (peer == _)
@ -445,32 +510,39 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
*/
@tailrec
final private def scrutinize() {
val oldState = state.get
if (!oldState.isSingletonCluster) { // do not scrutinize if we are a singleton cluster
val oldGossip = oldState.latestGossip
val oldOverview = oldGossip.overview
val oldMembers = oldGossip.members
val oldUnreachableAddresses = oldGossip.overview.unreachable
val localState = state.get
val newlyDetectedUnreachableMembers = oldMembers filterNot { member failureDetector.isAvailable(member.address) }
if (!isSingletonCluster(localState)) { // do not scrutinize if we are a singleton cluster
val localGossip = localState.latestGossip
val localOverview = localGossip.overview
val localMembers = localGossip.members
val localUnreachableAddresses = localGossip.overview.unreachable
val newlyDetectedUnreachableMembers = localMembers filterNot { member failureDetector.isAvailable(member.address) }
val newlyDetectedUnreachableAddresses = newlyDetectedUnreachableMembers map { _.address }
if (!newlyDetectedUnreachableAddresses.isEmpty) { // we have newly detected members marked as unavailable
val newMembers = oldMembers diff newlyDetectedUnreachableMembers
val newUnreachableAddresses: Set[Address] = (oldUnreachableAddresses ++ newlyDetectedUnreachableAddresses)
val newOverview = oldOverview copy (unreachable = newUnreachableAddresses)
val newGossip = oldGossip copy (overview = newOverview, members = newMembers) incrementVersion memberFingerprint
val newState = oldState copy (latestGossip = newGossip)
val newMembers = localMembers diff newlyDetectedUnreachableMembers
val newUnreachableAddresses: Set[Address] = (localUnreachableAddresses ++ newlyDetectedUnreachableAddresses)
val newOverview = localOverview copy (unreachable = newUnreachableAddresses)
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
val versionedGossip = newGossip + selfNode
val seenVersionedGossip = versionedGossip seen remoteAddress
val newState = localState copy (latestGossip = seenVersionedGossip)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) scrutinize() // recur
if (!state.compareAndSet(localState, newState)) scrutinize() // recur
else {
// FIXME should only notify when there is a cluster convergence
// notify listeners on successful update of state
// for {
// deadNode newUnreachableAddresses
// listener oldState.memberMembershipChangeListeners
// listener localState.memberMembershipChangeListeners
// } listener memberDisconnected deadNode
}
}
@ -481,14 +553,16 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
@tailrec
final private def connectToRandomNodeOf(addresses: Seq[Address]): ActorRef = {
addresses match {
case address :: rest
setUpConnectionTo(address) match {
case Some(connection) connection
case None connectToRandomNodeOf(rest) // recur if
case None connectToRandomNodeOf(rest) // recur - if we could not set up a connection - try next address
}
case Nil
throw new RemoteConnectionException(
"Could not establish connection to any of the addresses in the argument list")
"Could not establish connection to any of the addresses in the argument list [" + addresses.mkString(", ") + "]")
}
}
@ -500,15 +574,16 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
/**
* Sets up remote connection.
*/
private def setUpConnectionTo(address: Address): Option[ActorRef] = {
try {
Some(connectionManager.putIfAbsent(address, () system.actorFor(RootActorPath(address) / "system" / "cluster")))
} catch {
case e: Exception None
private def setUpConnectionTo(address: Address): Option[ActorRef] = Option {
// FIXME no need for using a factory here - remove connectionManager
try connectionManager.putIfAbsent(address, () system.actorFor(RootActorPath(address) / "system" / "cluster")) catch {
case e: Exception null
}
}
private def deputyNodesWithoutMyself: Seq[Address] = Seq.empty[Address] filter (_ != remoteAddress) // FIXME read in deputy nodes from gossip data - now empty seq
private def selectRandomNode(addresses: Seq[Address]): Address = addresses(random nextInt addresses.size)
private def isSingletonCluster(currentState: State): Boolean = currentState.latestGossip.members.size == 1
}