Merged with master

This commit is contained in:
Jonas Bonér 2012-06-14 16:13:53 +02:00
commit cb0cfac6c7
96 changed files with 2377 additions and 1965 deletions

View file

@ -186,7 +186,7 @@ case class GossipOverview(
*/
case class Gossip(
overview: GossipOverview = GossipOverview(),
members: SortedSet[Member], // sorted set of members with their status, sorted by name
members: SortedSet[Member], // sorted set of members with their status, sorted by address
meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]],
version: VectorClock = VectorClock()) // vector clock version
extends ClusterMessage // is a serializable cluster message
@ -220,12 +220,8 @@ case class Gossip(
// 1. merge vector clocks
val mergedVClock = this.version merge that.version
// 2. group all members by Address => Vector[Member]
var membersGroupedByAddress = Map.empty[Address, Vector[Member]]
(this.members ++ that.members) foreach { m
val ms = membersGroupedByAddress.get(m.address).getOrElse(Vector.empty[Member])
membersGroupedByAddress += (m.address -> (ms :+ m))
}
// 2. group all members by Address => Seq[Member]
val membersGroupedByAddress = (this.members.toSeq ++ that.members.toSeq).groupBy(_.address)
// 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups
val mergedMembers =
@ -243,7 +239,7 @@ case class Gossip(
this.overview.unreachable ++ that.overview.unreachable)
Gossip(mergedOverview, mergedMembers, mergedMeta, mergedVClock)
}
}
override def toString =
"Gossip(" +
@ -258,11 +254,9 @@ case class Gossip(
* Manages routing of the different cluster commands.
* Instantiated as a single instance for each Cluster - e.g. commands are serialized to Cluster message after message.
*/
final class ClusterCommandDaemon extends Actor {
import ClusterUserAction._
import ClusterLeaderAction._
private[akka] final class ClusterCommandDaemon(cluster: Cluster) extends Actor {
import ClusterAction._
val cluster = Cluster(context.system)
val log = Logging(context.system, this)
def receive = {
@ -280,9 +274,8 @@ final class ClusterCommandDaemon extends Actor {
* Pooled and routed with N number of configurable instances.
* Concurrent access to Cluster.
*/
final class ClusterGossipDaemon extends Actor {
private[akka] final class ClusterGossipDaemon(cluster: Cluster) extends Actor {
val log = Logging(context.system, this)
val cluster = Cluster(context.system)
def receive = {
case GossipEnvelope(sender, gossip) cluster.receive(sender, gossip)
@ -294,13 +287,13 @@ final class ClusterGossipDaemon extends Actor {
/**
* Supervisor managing the different Cluster daemons.
*/
final class ClusterDaemonSupervisor extends Actor {
private[akka] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor {
val log = Logging(context.system, this)
val cluster = Cluster(context.system)
private val commands = context.actorOf(Props[ClusterCommandDaemon], "commands")
private val commands = context.actorOf(Props(new ClusterCommandDaemon(cluster)), "commands")
private val gossip = context.actorOf(
Props[ClusterGossipDaemon].withRouter(RoundRobinRouter(cluster.clusterSettings.NrOfGossipDaemons)), "gossip")
Props(new ClusterGossipDaemon(cluster)).withRouter(
RoundRobinRouter(cluster.clusterSettings.NrOfGossipDaemons)), "gossip")
def receive = Actor.emptyBehavior
@ -319,7 +312,21 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
override def lookup = Cluster
override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system)
override def createExtension(system: ExtendedActorSystem): Cluster = {
val clusterSettings = new ClusterSettings(system.settings.config, system.name)
val failureDetector = clusterSettings.FailureDetectorImplementationClass match {
case None new AccrualFailureDetector(system, clusterSettings)
case Some(fqcn)
system.dynamicAccess.createInstanceFor[FailureDetector](
fqcn, Seq((classOf[ActorSystem], system), (classOf[ClusterSettings], clusterSettings))) match {
case Right(fd) fd
case Left(e) throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString)
}
}
new Cluster(system, failureDetector)
}
}
/**
@ -360,7 +367,7 @@ trait ClusterNodeMBean {
* if (Cluster(system).isLeader) { ... }
* }}}
*/
class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode
class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension { clusterNode
/**
* Represents the state for this Cluster. Implemented using optimistic lockless concurrency.
@ -377,24 +384,15 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val remoteSettings = new RemoteSettings(system.settings.config, system.name)
val clusterSettings = new ClusterSettings(system.settings.config, system.name)
import clusterSettings._
val selfAddress = remote.transport.address
val failureDetector = new AccrualFailureDetector(
system, selfAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)
private val vclockNode = VectorClock.Node(selfAddress.toString)
private val periodicTasksInitialDelay = clusterSettings.PeriodicTasksInitialDelay
private val gossipFrequency = clusterSettings.GossipFrequency
private val leaderActionsFrequency = clusterSettings.LeaderActionsFrequency
private val unreachableNodesReaperFrequency = clusterSettings.UnreachableNodesReaperFrequency
implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
private val autoDown = clusterSettings.AutoDown
private val nrOfDeputyNodes = clusterSettings.NrOfDeputyNodes
private val nrOfGossipDaemons = clusterSettings.NrOfGossipDaemons
private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != selfAddress)
private val nodeToJoin: Option[Address] = NodeToJoin filter (_ != selfAddress)
private val serialization = remote.serialization
@ -408,7 +406,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// create superisor for daemons under path "/system/cluster"
private val clusterDaemons = {
val createChild = CreateChild(Props[ClusterDaemonSupervisor], "cluster")
val createChild = CreateChild(Props(new ClusterDaemonSupervisor(this)), "cluster")
Await.result(system.systemGuardian ? createChild, defaultTimeout.duration) match {
case a: ActorRef a
case e: Exception throw e
@ -429,17 +427,17 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// ========================================================
// start periodic gossip to random nodes in cluster
private val gossipCanceller = system.scheduler.schedule(periodicTasksInitialDelay, gossipFrequency) {
private val gossipCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, GossipInterval) {
gossip()
}
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
private val failureDetectorReaperCanceller = system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperFrequency) {
private val failureDetectorReaperCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) {
reapUnreachableMembers()
}
// start periodic leader action management (only applies for the current leader)
private val leaderActionsCanceller = system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsFrequency) {
private val leaderActionsCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, LeaderActionsInterval) {
leaderActions()
}
@ -493,7 +491,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
/**
* Checks if we have a cluster convergence.
*
* @returns Some(convergedGossip) if convergence have been reached and None if not
* @return Some(convergedGossip) if convergence have been reached and None if not
*/
def convergence: Option[Gossip] = convergence(latestGossip)
@ -594,27 +592,25 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
val localOverview = localGossip.overview
val localUnreachableMembers = localOverview.unreachable
// remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
val newUnreachableMembers = localUnreachableMembers filterNot { _.address == node }
val newOverview = localOverview copy (unreachable = newUnreachableMembers)
if (!localMembers.exists(_.address == node)) {
val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
// remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
val newUnreachableMembers = localGossip.overview.unreachable filterNot { _.address == node }
val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers)
val versionedGossip = newGossip + vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
val newState = localState copy (latestGossip = seenVersionedGossip)
val versionedGossip = newGossip + vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
else {
if (node != selfAddress) failureDetector heartbeat node
val newState = localState copy (latestGossip = seenVersionedGossip)
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newMembers }
if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
else {
if (node != selfAddress) failureDetector heartbeat node
notifyMembershipChangeListeners(localState, newState)
}
}
}
@ -640,13 +636,18 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update
else {
failureDetector heartbeat address // update heartbeat in failure detector
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newMembers }
}
if (address != selfAddress) failureDetector heartbeat address // update heartbeat in failure detector
notifyMembershipChangeListeners(localState, newState)
}
}
private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit = {
val oldMembersStatus = oldState.latestGossip.members.toSeq.map(m (m.address, m.status))
val newMembersStatus = newState.latestGossip.members.toSeq.map(m (m.address, m.status))
if (newMembersStatus != oldMembersStatus)
newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
}
/**
* State transition to EXITING.
*/
@ -722,9 +723,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
if (!state.compareAndSet(localState, newState)) downing(address) // recur if we fail the update
else {
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
}
notifyMembershipChangeListeners(localState, newState)
}
}
@ -739,7 +738,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val winningGossip =
if (remoteGossip.version <> localGossip.version) {
// concurrent
println("=======>>> CONCURRENT")
val mergedGossip = remoteGossip merge localGossip
val versionedMergedGossip = mergedGossip + vclockNode
@ -750,17 +748,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
versionedMergedGossip
} else if (remoteGossip.version < localGossip.version) {
println("=======>>> LOCAL")
// local gossip is newer
localGossip
} else {
println("=======>>> REMOTE")
// remote gossip is newer
remoteGossip
}
println("=======>>> WINNING " + winningGossip.members.mkString(", "))
val newState = localState copy (latestGossip = winningGossip seen selfAddress)
// if we won the race then update else try again
@ -769,10 +764,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
log.info("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, sender.address)
if (sender.address != selfAddress) failureDetector heartbeat sender.address
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
}
notifyMembershipChangeListeners(localState, newState)
}
}
@ -817,9 +809,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
}
/**
* INTERNAL API
*
* Gossips latest gossip to an address.
*/
private def gossipTo(address: Address): Unit = {
private[akka] def gossipTo(address: Address): Unit = {
val connection = clusterGossipConnectionFor(address)
log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, connection)
connection ! GossipEnvelope(self, latestGossip)
@ -828,66 +822,90 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
/**
* Gossips latest gossip to a random member in the set of members passed in as argument.
*
* @return 'true' if it gossiped to a "deputy" member.
* @return the used [[akka.actor.Address] if any
*/
private def gossipToRandomNodeOf(addresses: Iterable[Address]): Boolean = {
private def gossipToRandomNodeOf(addresses: IndexedSeq[Address]): Option[Address] = {
log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", selfAddress, addresses.mkString(", "))
if (addresses.isEmpty) false
else {
val peers = addresses filter (_ != selfAddress) // filter out myself
val peer = selectRandomNode(peers)
gossipTo(peer)
deputyNodes exists (peer == _)
val peers = addresses filterNot (_ == selfAddress) // filter out myself
val peer = selectRandomNode(peers)
peer foreach gossipTo
peer
}
/**
* INTERNAL API
*/
private[akka] def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double =
(membersSize + unreachableSize) match {
case 0 0.0
case sum unreachableSize.toDouble / sum
}
/**
* INTERNAL API
*/
private[akka] def gossipToDeputyProbablity(membersSize: Int, unreachableSize: Int, nrOfDeputyNodes: Int): Double = {
if (nrOfDeputyNodes > membersSize) 1.0
else if (nrOfDeputyNodes == 0) 0.0
else (membersSize + unreachableSize) match {
case 0 0.0
case sum (nrOfDeputyNodes + unreachableSize).toDouble / sum
}
}
/**
* INTERNAL API
*
* Initates a new round of gossip.
*/
private def gossip(): Unit = {
private[akka] def gossip(): Unit = {
val localState = state.get
log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
if (isSingletonCluster(localState)) {
// gossip to myself
// TODO could perhaps be optimized, no need to gossip to myself when Up?
gossipTo(selfAddress)
} else if (isAvailable(localState)) {
log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
val localGossip = localState.latestGossip
val localMembers = localGossip.members
// important to not accidentally use `map` of the SortedSet, since the original order is not preserved
val localMembers = localGossip.members.toIndexedSeq
val localMembersSize = localMembers.size
val localMemberAddresses = localMembers map { _.address }
val localUnreachableMembers = localGossip.overview.unreachable
val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq
val localUnreachableSize = localUnreachableMembers.size
// 1. gossip to alive members
val gossipedToDeputy = gossipToRandomNodeOf(localMembers map { _.address })
val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses)
// 2. gossip to unreachable members
if (localUnreachableSize > 0) {
val probability: Double = localUnreachableSize / (localMembersSize + 1)
if (ThreadLocalRandom.current.nextDouble() < probability) gossipToRandomNodeOf(localUnreachableMembers.map(_.address))
val probability = gossipToUnreachableProbablity(localMembersSize, localUnreachableSize)
if (ThreadLocalRandom.current.nextDouble() < probability)
gossipToRandomNodeOf(localUnreachableMembers.map(_.address))
}
// 3. gossip to a deputy nodes for facilitating partition healing
val deputies = deputyNodes
if ((!gossipedToDeputy || localMembersSize < 1) && deputies.nonEmpty) {
if (localMembersSize == 0) gossipToRandomNodeOf(deputies)
else {
val probability = 1.0 / localMembersSize + localUnreachableSize
if (ThreadLocalRandom.current.nextDouble() <= probability) gossipToRandomNodeOf(deputies)
}
val deputies = deputyNodes(localMemberAddresses)
val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false)
if ((!alreadyGossipedToDeputy || localMembersSize < NrOfDeputyNodes) && deputies.nonEmpty) {
val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, NrOfDeputyNodes)
if (ThreadLocalRandom.current.nextDouble() < probability)
gossipToRandomNodeOf(deputies)
}
}
}
/**
* INTERNAL API
*
* Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
*/
@tailrec
final private def reapUnreachableMembers(): Unit = {
final private[akka] def reapUnreachableMembers(): Unit = {
val localState = state.get
if (!isSingletonCluster(localState) && isAvailable(localState)) {
@ -895,7 +913,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val localGossip = localState.latestGossip
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localMembers = localGossip.members
val localUnreachableMembers = localGossip.overview.unreachable
@ -920,19 +937,19 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
else {
log.info("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", "))
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newMembers }
}
notifyMembershipChangeListeners(localState, newState)
}
}
}
}
/**
* INTERNAL API
*
* Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc.
*/
@tailrec
final private def leaderActions(): Unit = {
final private[akka] def leaderActions(): Unit = {
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
@ -1003,7 +1020,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
localGossip copy (members = newMembers) // update gossip
} else if (autoDown) {
} else if (AutoDown) {
// we don't have convergence - so we might have unreachable nodes
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes
@ -1044,9 +1061,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// if we won the race then update else try again
if (!state.compareAndSet(localState, newState)) leaderActions() // recur
else {
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newGossip.members }
}
notifyMembershipChangeListeners(localState, newState)
}
}
}
@ -1074,10 +1089,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val seen = gossip.overview.seen
val views = Set.empty[VectorClock] ++ seen.values
println("=======>>> VIEWS " + views.size)
if (views.size == 1) {
log.debug("Cluster Node [{}] - Cluster convergence reached", selfAddress)
println("=======>>> ----------------------- HAS CONVERGENCE")
log.debug("Cluster Node [{}] - Cluster convergence reached: [{}]", selfAddress, gossip.members.mkString(", "))
Some(gossip)
} else None
} else None
@ -1111,11 +1124,17 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "gossip")
/**
* Gets an Iterable with the addresses of a all the 'deputy' nodes - excluding this node if part of the group.
* Gets the addresses of a all the 'deputy' nodes - excluding this node if part of the group.
*/
private def deputyNodes: Iterable[Address] = state.get.latestGossip.members.toIterable map (_.address) drop 1 take nrOfDeputyNodes filter (_ != selfAddress)
private def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] =
addresses drop 1 take NrOfDeputyNodes filterNot (_ == selfAddress)
private def selectRandomNode(addresses: Iterable[Address]): Address = addresses.toSeq(ThreadLocalRandom.current nextInt addresses.size)
/**
* INTERNAL API
*/
private[akka] def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
if (addresses.isEmpty) None
else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
private def isSingletonCluster(currentState: State): Boolean = currentState.latestGossip.members.size == 1