Merged with master

Jonas Bonér 2012-06-14 16:21:03 +02:00
commit f74c96b424
19 changed files with 604 additions and 299 deletions


@@ -14,19 +14,19 @@ import akka.pattern.ask
import akka.util._
import akka.util.duration._
import akka.ConfigurationException
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
import java.util.concurrent.TimeUnit._
import java.util.concurrent.TimeoutException
import akka.jsr166y.ThreadLocalRandom
import java.lang.management.ManagementFactory
import java.io.Closeable
import javax.management._
import scala.collection.immutable.{ Map, SortedSet }
import scala.annotation.tailrec
import com.google.protobuf.ByteString
import akka.util.internal.HashedWheelTimer
import akka.dispatch.MonitorableThreadFactory
import MemberStatus._
/**
* Interface for membership change listener.
@@ -44,6 +44,8 @@ trait MetaDataChangeListener {
/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
*
* FIXME Protobuf all ClusterMessages
*/
sealed trait ClusterMessage extends Serializable
@@ -88,6 +90,7 @@ object ClusterLeaderAction {
/**
* Represents the address and the current status of a cluster member node.
*
*/
class Member(val address: Address, val status: MemberStatus) extends ClusterMessage {
override def hashCode = address.##
@@ -100,7 +103,6 @@ class Member(val address: Address, val status: MemberStatus) extends ClusterMess
* Factory and Utility module for Member instances.
*/
object Member {
import MemberStatus._
/**
* Sort Address by host and port
@@ -144,14 +146,20 @@ object Member {
/**
* Envelope adding a sender address to the gossip.
*/
case class GossipEnvelope(sender: Member, gossip: Gossip) extends ClusterMessage
case class GossipEnvelope(from: Address, gossip: Gossip) extends ClusterMessage
/**
* Defines the current status of a cluster member node
*
* Can be one of: Joining, Up, Leaving, Exiting, Down and Removed.
*/
sealed trait MemberStatus extends ClusterMessage
sealed trait MemberStatus extends ClusterMessage {
/**
* Using the same notion for 'unavailable' as 'non-convergence': DOWN and REMOVED.
*/
def isUnavailable: Boolean = this == Down || this == Removed
}
object MemberStatus {
case object Joining extends MemberStatus
case object Up extends MemberStatus
@@ -159,11 +167,6 @@ object MemberStatus {
case object Exiting extends MemberStatus
case object Down extends MemberStatus
case object Removed extends MemberStatus
/**
* Using the same notion for 'unavailable' as 'non-convergence': DOWN and REMOVED.
*/
def isUnavailable(status: MemberStatus): Boolean = status == MemberStatus.Down || status == MemberStatus.Removed
}
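
A minimal, self-contained sketch (simplified stand-in types rather than the Akka classes above) of the refactoring in this hunk: the availability check moves from a static helper on the companion object onto the status values themselves, so call sites can read `status.isUnavailable`.

object MemberStatusSketch extends App {
  sealed trait Status { def isUnavailable: Boolean = this == Down || this == Removed }
  case object Joining extends Status
  case object Up extends Status
  case object Down extends Status
  case object Removed extends Status

  // before this commit: isUnavailable(status); after: status.isUnavailable
  assert(Down.isUnavailable && Removed.isUnavailable)
  assert(!Joining.isUnavailable && !Up.isUnavailable)
}
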
/**
@@ -173,16 +176,42 @@ case class GossipOverview(
seen: Map[Address, VectorClock] = Map.empty[Address, VectorClock],
unreachable: Set[Member] = Set.empty[Member]) {
// FIXME document when nodes are put in 'unreachable' set and removed from 'members'
override def toString =
"GossipOverview(seen = [" + seen.mkString(", ") +
"], unreachable = [" + unreachable.mkString(", ") +
"])"
}
object Gossip {
val emptyMembers: SortedSet[Member] = SortedSet.empty
}
/**
* Represents the state of the cluster; cluster ring membership, ring convergence, meta data - all versioned by a vector clock.
* Represents the state of the cluster; cluster ring membership, ring convergence, meta data -
* all versioned by a vector clock.
*
* When a node is joining, a Member with status Joining is added to `members`.
* If the joining node was downed it is moved from `overview.unreachable` (status Down)
* to `members` (status Joining). An unreachable node cannot rejoin unless it has first
* been downed.
*
* When convergence is reached the leader changes the status of the joining members
* from Joining to Up.
*
* When the failure detector considers a node unavailable it is moved from
* `members` to `overview.unreachable`.
*
* When a node is downed, either manually or automatically, its status is changed to Down.
* It is also removed from the `overview.seen` table.
* The node will reside as Down in the `overview.unreachable` set until it joins
* again, and it will then go through the normal joining procedure.
*
* When a Gossip is received its version (vector clock) is used to determine whether it
* is newer or older than the current local Gossip. If the versions are conflicting,
* i.e. the vector clocks do not share a history, the received and local Gossip are
* merged. After a merge the seen table is cleared.
*
* TODO document leaving, exiting and removed when that is implemented
*
*/
case class Gossip(
overview: GossipOverview = GossipOverview(),
@@ -192,12 +221,34 @@ case class Gossip(
extends ClusterMessage // is a serializable cluster message
with Versioned[Gossip] {
// FIXME can be disabled as optimization
assertInvariants
private def assertInvariants: Unit = {
val unreachableAndLive = members.intersect(overview.unreachable)
if (unreachableAndLive.nonEmpty)
throw new IllegalArgumentException("Same nodes in both members and unreachable is not allowed, got [%s]"
format unreachableAndLive.mkString(", "))
val allowedLiveMemberStatuses: Set[MemberStatus] = Set(Joining, Up, Leaving, Exiting)
def hasNotAllowedLiveMemberStatus(m: Member) = !allowedLiveMemberStatuses.contains(m.status)
if (members exists hasNotAllowedLiveMemberStatus)
throw new IllegalArgumentException("Live members must have status [%s], got [%s]"
format (allowedLiveMemberStatuses.mkString(", "),
(members filter hasNotAllowedLiveMemberStatus).mkString(", ")))
val seenButNotMember = overview.seen.keySet -- members.map(_.address) -- overview.unreachable.map(_.address)
if (seenButNotMember.nonEmpty)
throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]"
format seenButNotMember.mkString(", "))
}
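
A rough standalone illustration of the invariants asserted above, with plain strings standing in for the real Member/Address/VectorClock types (all names below are made up for the example): the live and unreachable sets must be disjoint, and only nodes known to the cluster may have an entry in the seen table.

object GossipInvariantsSketch extends App {
  val members     = Set("nodeA", "nodeB", "nodeC")   // stand-in for gossip.members addresses
  val unreachable = Set("nodeD")                     // stand-in for overview.unreachable addresses
  val seen        = Map("nodeA" -> 1, "nodeD" -> 1)  // stand-in for the seen table (address -> version)

  require((members intersect unreachable).isEmpty,
    "same node in both members and unreachable is not allowed")
  require((seen.keySet -- members -- unreachable).isEmpty,
    "nodes not part of the cluster must not be in the seen table")
}
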
/**
* Increments the version for this 'Node'.
*/
def +(node: VectorClock.Node): Gossip = copy(version = version + node)
def :+(node: VectorClock.Node): Gossip = copy(version = version :+ node)
def +(member: Member): Gossip = {
def :+(member: Member): Gossip = {
if (members contains member) this
else this copy (members = members + member)
}
@@ -220,26 +271,31 @@ case class Gossip(
// 1. merge vector clocks
val mergedVClock = this.version merge that.version
// 2. group all members by Address => Seq[Member]
val membersGroupedByAddress = (this.members.toSeq ++ that.members.toSeq).groupBy(_.address)
// 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups
val mergedMembers =
SortedSet.empty[Member] ++
membersGroupedByAddress.values.foldLeft(Vector.empty[Member]) { (acc, members) ⇒
acc :+ members.reduceLeft(Member.highestPriorityOf(_, _))
}
// 4. merge meta-data
// 2. merge meta-data
val mergedMeta = this.meta ++ that.meta
// 5. merge gossip overview
val mergedOverview = GossipOverview(
this.overview.seen ++ that.overview.seen,
this.overview.unreachable ++ that.overview.unreachable)
def pickHighestPriority(a: Seq[Member], b: Seq[Member]): Set[Member] = {
// group all members by Address => Seq[Member]
val groupedByAddress = (a ++ b).groupBy(_.address)
// pick highest MemberStatus
(Set.empty[Member] /: groupedByAddress) {
case (acc, (_, members)) ⇒ acc + members.reduceLeft(Member.highestPriorityOf)
}
}
Gossip(mergedOverview, mergedMembers, mergedMeta, mergedVClock)
}
// 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups
val mergedUnreachable = pickHighestPriority(this.overview.unreachable.toSeq, that.overview.unreachable.toSeq)
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
// and exclude unreachable
val mergedMembers = Gossip.emptyMembers ++ pickHighestPriority(this.members.toSeq, that.members.toSeq).
filterNot(mergedUnreachable.contains)
// 5. fresh seen table
val mergedSeen = Map.empty[Address, VectorClock]
Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedMeta, mergedVClock)
}
override def toString =
"Gossip(" +
@@ -250,6 +306,11 @@ case class Gossip(
")"
}
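
The new merge keeps, for every address that appears on either side, the single record with the highest-priority status, lets the merged unreachable set win over the live member set, and starts from a fresh seen table. A simplified, self-contained sketch of the per-address selection, using a plain case class and an assumed priority ordering in place of Member.highestPriorityOf:

object GossipMergeSketch extends App {
  final case class M(address: String, status: String)

  // assumed ordering for this sketch: a Down record outranks a live one for the same address
  val priority = Map("Up" -> 1, "Leaving" -> 2, "Exiting" -> 3, "Down" -> 4)

  def pickHighestPriority(a: Seq[M], b: Seq[M]): Set[M] =
    (a ++ b).groupBy(_.address).values.map(_.maxBy(m => priority(m.status))).toSet

  val thisSide = Seq(M("node1", "Up"), M("node2", "Up"))
  val thatSide = Seq(M("node1", "Down"), M("node3", "Leaving"))

  // node1 resolves to Down, node2 and node3 keep their single known record
  println(pickHighestPriority(thisSide, thatSide))
}
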
/**
* Sent at regular intervals for failure detection.
*/
case class Heartbeat(from: Address) extends ClusterMessage
/**
* Manages routing of the different cluster commands.
* Instantiated as a single instance for each Cluster - i.e. commands are serialized and handled by the Cluster one message at a time.
@@ -278,7 +339,8 @@ private[akka] final class ClusterGossipDaemon(cluster: Cluster) extends Actor {
val log = Logging(context.system, this)
def receive = {
case GossipEnvelope(sender, gossip) ⇒ cluster.receive(sender, gossip)
case Heartbeat(from) ⇒ cluster.receiveHeartbeat(from)
case GossipEnvelope(from, gossip) ⇒ cluster.receiveGossip(from, gossip)
}
override def unhandled(unknown: Any) = log.error("[/system/cluster/gossip] can not respond to messages - received [{}]", unknown)
@@ -387,6 +449,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
import clusterSettings._
val selfAddress = remote.transport.address
private val selfHeartbeat = Heartbeat(selfAddress)
private val vclockNode = VectorClock.Node(selfAddress.toString)
@@ -404,7 +467,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
log.info("Cluster Node [{}] - is starting up...", selfAddress)
// create superisor for daemons under path "/system/cluster"
// create supervisor for daemons under path "/system/cluster"
private val clusterDaemons = {
val createChild = CreateChild(Props(new ClusterDaemonSupervisor(this)), "cluster")
Await.result(system.systemGuardian ? createChild, defaultTimeout.duration) match {
@@ -414,9 +477,10 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
private val state = {
val member = Member(selfAddress, MemberStatus.Joining)
val gossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock
new AtomicReference[State](State(gossip))
val member = Member(selfAddress, Joining)
val versionedGossip = Gossip(members = Gossip.emptyMembers + member) :+ vclockNode // add me as member and update my vector clock
val seenVersionedGossip = versionedGossip seen selfAddress
new AtomicReference[State](State(seenVersionedGossip))
}
// try to join the node defined in the 'akka.cluster.node-to-join' option
@@ -426,23 +490,65 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// ===================== WORK DAEMONS =====================
// ========================================================
private val clusterScheduler: Scheduler with Closeable = {
if (system.settings.SchedulerTickDuration > SchedulerTickDuration) {
log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " +
"with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].",
system.settings.SchedulerTickDuration.toMillis, SchedulerTickDuration.toMillis)
val threadFactory = system.threadFactory match {
case tf: MonitorableThreadFactory ⇒ tf.copy(name = tf.name + "-cluster-scheduler")
case tf ⇒ tf
}
val hwt = new HashedWheelTimer(log,
threadFactory,
SchedulerTickDuration, SchedulerTicksPerWheel)
new DefaultScheduler(hwt, log, system.dispatcher)
} else {
// delegate to system.scheduler, but don't close
val systemScheduler = system.scheduler
new Scheduler with Closeable {
// we are using system.scheduler, which we are not responsible for closing
def close(): Unit = ()
def schedule(initialDelay: Duration, frequency: Duration, receiver: ActorRef, message: Any): Cancellable =
systemScheduler.schedule(initialDelay, frequency, receiver, message)
def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable =
systemScheduler.schedule(initialDelay, frequency)(f)
def schedule(initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable =
systemScheduler.schedule(initialDelay, frequency, runnable)
def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable =
systemScheduler.scheduleOnce(delay, runnable)
def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable =
systemScheduler.scheduleOnce(delay, receiver, message)
def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable =
systemScheduler.scheduleOnce(delay)(f)
}
}
}
// start periodic gossip to random nodes in cluster
private val gossipCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, GossipInterval) {
private val gossipTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, GossipInterval) {
gossip()
}
// start periodic heartbeat to all nodes in cluster
private val heartbeatTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, HeartbeatInterval) {
heartbeat()
}
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
private val failureDetectorReaperCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) {
private val failureDetectorReaperTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) {
reapUnreachableMembers()
}
// start periodic leader action management (only applies for the current leader)
private val leaderActionsCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, LeaderActionsInterval) {
private val leaderActionsTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, LeaderActionsInterval) {
leaderActions()
}
createMBean()
system.registerOnTermination(shutdown())
log.info("Cluster Node [{}] - has started up successfully", selfAddress)
// ======================================================
@@ -509,11 +615,20 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
*/
private[akka] def shutdown(): Unit = {
if (isRunning.compareAndSet(true, false)) {
log.info("Cluster Node [{}] - Shutting down cluster node...", selfAddress)
gossipCanceller.cancel()
failureDetectorReaperCanceller.cancel()
leaderActionsCanceller.cancel()
system.stop(clusterDaemons)
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
// cancel the periodic tasks; otherwise they would be run when the scheduler is shut down
gossipTask.cancel()
heartbeatTask.cancel()
failureDetectorReaperTask.cancel()
leaderActionsTask.cancel()
clusterScheduler.close()
// FIXME isTerminated check can be removed when ticket #2221 is fixed
// now it prevents logging if the system is shut down (or shutdown is in progress)
if (!clusterDaemons.isTerminated)
system.stop(clusterDaemons)
try {
mBeanServer.unregisterMBean(clusterMBeanName)
} catch {
@@ -592,23 +707,30 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
val localUnreachable = localGossip.overview.unreachable
if (!localMembers.exists(_.address == node)) {
val alreadyMember = localMembers.exists(_.address == node)
val isUnreachable = localUnreachable.exists { m ⇒
m.address == node && m.status != Down && m.status != Removed
}
if (!alreadyMember && !isUnreachable) {
// remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
val newUnreachableMembers = localGossip.overview.unreachable filterNot { _.address == node }
val newUnreachableMembers = localUnreachable filterNot { _.address == node }
val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers)
val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining
val newMembers = localMembers + Member(node, Joining) // add joining node as Joining
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
val versionedGossip = newGossip + vclockNode
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
val newState = localState copy (latestGossip = seenVersionedGossip)
if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
else {
// treat join as initial heartbeat, so that the node becomes unavailable if nothing more happens
if (node != selfAddress) failureDetector heartbeat node
notifyMembershipChangeListeners(localState, newState)
}
@@ -626,17 +748,16 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val localGossip = localState.latestGossip
val localMembers = localGossip.members
val newMembers = localMembers + Member(address, MemberStatus.Leaving) // mark node as LEAVING
val newMembers = localMembers + Member(address, Leaving) // mark node as LEAVING
val newGossip = localGossip copy (members = newMembers)
val versionedGossip = newGossip + vclockNode
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
val newState = localState copy (latestGossip = seenVersionedGossip)
if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update
else {
if (address != selfAddress) failureDetector heartbeat address // update heartbeat in failure detector
notifyMembershipChangeListeners(localState, newState)
}
}
@@ -665,8 +786,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
* The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not alread there)
* and its status is set to DOWN. The node is alo removed from the 'seen' table.
* The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there)
* and its status is set to DOWN. The node is also removed from the 'seen' table.
*
* The node will reside as DOWN in the 'unreachable' set until an explicit JOIN command is sent directly
* to this node and it will then go through the normal JOINING procedure.
@@ -681,44 +802,38 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val localUnreachableMembers = localOverview.unreachable
// 1. check if the node to DOWN is in the 'members' set
var downedMember: Option[Member] = None
val newMembers =
localMembers
.map { member ⇒
if (member.address == address) {
log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, member.address)
val newMember = member copy (status = MemberStatus.Down)
downedMember = Some(newMember)
newMember
} else member
}
.filter(_.status != MemberStatus.Down)
val downedMember: Option[Member] = localMembers.collectFirst {
case m if m.address == address ⇒ m.copy(status = Down)
}
val newMembers = downedMember match {
case Some(m) ⇒
log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, m.address)
localMembers - m
case None ⇒ localMembers
}
// 2. check if the node to DOWN is in the 'unreachable' set
val newUnreachableMembers =
localUnreachableMembers
.filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN
.map { member ⇒
if (member.address == address) {
log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address)
member copy (status = MemberStatus.Down)
} else member
}
localUnreachableMembers.map { member ⇒
// no need to DOWN members already DOWN
if (member.address == address && member.status != Down) {
log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address)
member copy (status = Down)
} else member
}
// 3. add the newly DOWNED members from the 'members' (in step 1.) to the 'newUnreachableMembers' set.
val newUnreachablePlusNewlyDownedMembers = downedMember match {
case Some(member) ⇒ newUnreachableMembers + member
case None ⇒ newUnreachableMembers
}
val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember
// 4. remove nodes marked as DOWN from the 'seen' table
val newSeen = newUnreachablePlusNewlyDownedMembers.foldLeft(localSeen) { (currentSeen, member) ⇒
currentSeen - member.address
val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect {
case m if m.status == Down ⇒ m.address
}
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) // update gossip overview
// update gossip overview
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
val versionedGossip = newGossip + vclockNode
val versionedGossip = newGossip :+ vclockNode
val newState = localState copy (latestGossip = versionedGossip seen selfAddress)
if (!state.compareAndSet(localState, newState)) downing(address) // recur if we fail the update
@@ -731,7 +846,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
* Receive new gossip.
*/
@tailrec
final private[cluster] def receive(sender: Member, remoteGossip: Gossip): Unit = {
final private[cluster] def receiveGossip(from: Address, remoteGossip: Gossip): Unit = {
val localState = state.get
val localGossip = localState.latestGossip
@@ -739,10 +854,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
if (remoteGossip.version <> localGossip.version) {
// concurrent
val mergedGossip = remoteGossip merge localGossip
val versionedMergedGossip = mergedGossip + vclockNode
val versionedMergedGossip = mergedGossip :+ vclockNode
log.debug(
"Can't establish a causal relationship between \"remote\" gossip [{}] and \"local\" gossip [{}] - merging them into [{}]",
// FIXME change to debug log level, when failure detector is stable
log.info(
"""Can't establish a causal relationship between "remote" gossip [{}] and "local" gossip [{}] - merging them into [{}]""",
remoteGossip, localGossip, versionedMergedGossip)
versionedMergedGossip
@@ -759,55 +875,23 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val newState = localState copy (latestGossip = winningGossip seen selfAddress)
// if we won the race then update else try again
if (!state.compareAndSet(localState, newState)) receive(sender, remoteGossip) // recur if we fail the update
if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update
else {
log.info("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, sender.address)
if (sender.address != selfAddress) failureDetector heartbeat sender.address
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
notifyMembershipChangeListeners(localState, newState)
}
}
/**
* INTERNAL API
*/
private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from
/**
* Joins the pre-configured contact point.
*/
private def autoJoin(): Unit = nodeToJoin foreach join
/**
* Switches the member status.
*
* @param newStatus the new member status
* @param oldState the state to change the member status in
* @return the updated new state with the new member status
*/
private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = { // TODO: Removed this method? Currently not used.
log.debug("Cluster Node [{}] - Switching membership status to [{}]", selfAddress, newStatus)
val localSelf = self
val localGossip = state.latestGossip
val localMembers = localGossip.members
// change my state into a "new" self
val newSelf = localSelf copy (status = newStatus)
// change my state in 'gossip.members'
val newMembersSet = localMembers map { member ⇒
if (member.address == selfAddress) newSelf
else member
}
// NOTE: ugly crap to work around bug in scala colletions ('val ss: SortedSet[Member] = SortedSet.empty[Member] ++ aSet' does not compile)
val newMembersSortedSet = SortedSet[Member](newMembersSet.toList: _*)
val newGossip = localGossip copy (members = newMembersSortedSet)
// version my changes
val versionedGossip = newGossip + vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
state copy (latestGossip = seenVersionedGossip)
}
/**
* INTERNAL API
*
@@ -816,7 +900,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
private[akka] def gossipTo(address: Address): Unit = {
val connection = clusterGossipConnectionFor(address)
log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, connection)
connection ! GossipEnvelope(self, latestGossip)
connection ! GossipEnvelope(selfAddress, latestGossip)
}
/**
@@ -863,12 +947,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
if (isSingletonCluster(localState)) {
// gossip to myself
// TODO could perhaps be optimized, no need to gossip to myself when Up?
gossipTo(selfAddress)
} else if (isAvailable(localState)) {
if (!isSingletonCluster(localState) && isAvailable(localState)) {
val localGossip = localState.latestGossip
// important to not accidentally use `map` of the SortedSet, since the original order is not preserved
val localMembers = localGossip.members.toIndexedSeq
@@ -899,6 +978,23 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
}
/**
* INTERNAL API
*/
private[akka] def heartbeat(): Unit = {
val localState = state.get
if (!isSingletonCluster(localState)) {
val liveMembers = localState.latestGossip.members.toIndexedSeq
for (member ← liveMembers; if member.address != selfAddress) {
val connection = clusterGossipConnectionFor(member.address)
log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection)
connection ! selfHeartbeat
}
}
}
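
Each node now sends Heartbeat(selfAddress) to every other member at HeartbeatInterval, and receiveHeartbeat feeds the failure detector on the receiving side. Below is a toy sketch of that receiving end, a plain timeout-based detector rather than Akka's accrual failure detector; all names are invented for the example.

object HeartbeatSketch extends App {
  import scala.collection.mutable

  final class SimpleFailureDetector(acceptableMillis: Long) {
    private val lastBeat = mutable.Map.empty[String, Long]

    // called when a Heartbeat(from) message arrives
    def heartbeat(from: String): Unit = lastBeat(from) = System.currentTimeMillis()

    // a node counts as available while its last heartbeat is recent enough
    def isAvailable(node: String): Boolean =
      lastBeat.get(node).exists(t => System.currentTimeMillis() - t <= acceptableMillis)
  }

  val fd = new SimpleFailureDetector(acceptableMillis = 3000)
  fd.heartbeat("nodeB")
  println(fd.isAvailable("nodeB")) // true right after a beat
  println(fd.isAvailable("nodeC")) // false: never heard from
}
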
/**
* INTERNAL API
*
@@ -920,14 +1016,14 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
if (newlyDetectedUnreachableMembers.nonEmpty) { // we have newly detected members marked as unavailable
val newMembers = localMembers diff newlyDetectedUnreachableMembers
val newUnreachableMembers: Set[Member] = localUnreachableMembers ++ newlyDetectedUnreachableMembers
val newMembers = localMembers -- newlyDetectedUnreachableMembers
val newUnreachableMembers = localUnreachableMembers ++ newlyDetectedUnreachableMembers
val newOverview = localOverview copy (unreachable = newUnreachableMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
// updating vclock and 'seen' table
val versionedGossip = newGossip + vclockNode
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
val newState = localState copy (latestGossip = seenVersionedGossip)
@@ -987,33 +1083,45 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// ----------------------
// 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring
// ----------------------
localMembers filter { member ⇒
if (member.status == MemberStatus.Exiting) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - Removing node from node ring", selfAddress, member.address)
// localMembers filter { member ⇒
// if (member.status == MemberStatus.Exiting) {
// log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - Removing node from node ring", selfAddress, member.address)
// hasChangedState = true
// clusterCommandConnectionFor(member.address) ! ClusterUserAction.Remove(member.address) // tell the removed node to shut himself down
// false
// } else true
localMembers map { member ⇒
// ----------------------
// 1. Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have convergence)
// ----------------------
if (member.status == Joining) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address)
hasChangedState = true
clusterCommandConnectionFor(member.address) ! ClusterUserAction.Remove(member.address) // tell the removed node to shut himself down
false
} else true
member copy (status = Up)
} else member
} map { member ⇒
// ----------------------
// 2. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
// 2. Move EXITING => REMOVED (once all nodes have seen that this node is EXITING, i.e. we have convergence)
// ----------------------
if (member.status == MemberStatus.Joining) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address)
if (member.status == Exiting) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", selfAddress, member.address)
hasChangedState = true
member copy (status = MemberStatus.Up)
member copy (status = Removed)
} else member
} map { member ⇒
// ----------------------
// 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
// ----------------------
if (member.status == MemberStatus.Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) {
if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, member.address)
hasChangedState = true
clusterCommandConnectionFor(member.address) ! ClusterLeaderAction.Exit(member.address) // FIXME should use ? to await completion of handoff?
member copy (status = MemberStatus.Exiting)
// clusterCommandConnectionFor(member.address) ! ClusterLeaderAction.Exit(member.address) // FIXME should use ? to await completion of handoff?
member copy (status = Exiting)
} else member
}
@@ -1028,16 +1136,20 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// 4. Move UNREACHABLE => DOWN (auto-downing by leader)
// ----------------------
val newUnreachableMembers =
localUnreachableMembers
.filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN
.map { member ⇒
localUnreachableMembers.map { member ⇒
// no need to DOWN members already DOWN
if (member.status == Down) member
else {
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
hasChangedState = true
member copy (status = MemberStatus.Down)
member copy (status = Down)
}
}
// removing nodes marked as DOWN from the 'seen' table
val newSeen = localUnreachableMembers.foldLeft(localSeen)((currentSeen, member) ⇒ currentSeen - member.address)
val newSeen = localSeen -- newUnreachableMembers.collect {
case m if m.status == Down ⇒ m.address
}
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
localGossip copy (overview = newOverview) // update gossip
@@ -1049,7 +1161,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// ----------------------
// 5. Updating the vclock version for the changes
// ----------------------
val versionedGossip = newGossip + vclockNode
val versionedGossip = newGossip :+ vclockNode
// ----------------------
// 6. Updating the 'seen' table
@@ -1076,24 +1188,39 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
private def convergence(gossip: Gossip): Option[Gossip] = {
val overview = gossip.overview
val unreachable = overview.unreachable
val seen = overview.seen
// First check that:
// 1. we don't have any members that are unreachable (unreachable.isEmpty == true), or
// 2. all unreachable members in the set have status DOWN or REMOVED
// 1. we don't have any members that are unreachable, or
// 2. all unreachable members in the set have status DOWN or REMOVED
// Else we can't continue to check for convergence
// When that is done we check that all the entries in the 'seen' table have the same vector clock version
if (unreachable.isEmpty || !unreachable.exists { m ⇒
m.status != MemberStatus.Down &&
m.status != MemberStatus.Removed
}) {
val seen = gossip.overview.seen
val views = Set.empty[VectorClock] ++ seen.values
// and that all members exist in the seen table
val hasUnreachable = unreachable.nonEmpty && unreachable.exists { m ⇒
m.status != Down && m.status != Removed
}
val allMembersInSeen = gossip.members.forall(m ⇒ seen.contains(m.address))
if (views.size == 1) {
if (hasUnreachable) {
log.debug("Cluster Node [{}] - No cluster convergence, due to unreachable nodes [{}].", selfAddress, unreachable)
None
} else if (!allMembersInSeen) {
log.debug("Cluster Node [{}] - No cluster convergence, due to members not in seen table [{}].", selfAddress,
gossip.members.map(_.address) -- seen.keySet)
None
} else {
val views = seen.values.toSet.size
if (views == 1) {
log.debug("Cluster Node [{}] - Cluster convergence reached: [{}]", selfAddress, gossip.members.mkString(", "))
Some(gossip)
} else None
} else None
} else {
log.debug("Cluster Node [{}] - No cluster convergence, since not all nodes have seen the same state yet. [{} of {}]",
selfAddress, views, seen.values.size)
None
}
}
}
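
A standalone sketch of the convergence rule implemented above, with strings standing in for Address and an Int standing in for the vector clock version: convergence requires that no unreachable node is still live (i.e. not Down or Removed), that every member has an entry in the seen table, and that all seen entries carry the same version.

object ConvergenceSketch extends App {
  def converged(members: Set[String],
                unreachableNotDowned: Set[String],
                seen: Map[String, Int]): Boolean =
    unreachableNotDowned.isEmpty &&
      members.forall(seen.contains) &&
      seen.values.toSet.size == 1

  assert(converged(Set("a", "b"), Set.empty, Map("a" -> 7, "b" -> 7)))
  assert(!converged(Set("a", "b"), Set.empty, Map("a" -> 7, "b" -> 8))) // nodes have seen different versions
  assert(!converged(Set("a", "b"), Set("c"), Map("a" -> 7, "b" -> 7)))  // a live node is unreachable
}
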
private def isAvailable(state: State): Boolean = !isUnavailable(state)
@@ -1104,7 +1231,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val localMembers = localGossip.members
val localUnreachableMembers = localOverview.unreachable
val isUnreachable = localUnreachableMembers exists { _.address == selfAddress }
val hasUnavailableMemberStatus = localMembers exists { m ⇒ (m == self) && MemberStatus.isUnavailable(m.status) }
val hasUnavailableMemberStatus = localMembers exists { m ⇒ (m == self) && m.status.isUnavailable }
isUnreachable || hasUnavailableMemberStatus
}