2011-10-26 08:48:16 +02:00
|
|
|
/**
|
2012-01-19 18:21:06 +01:00
|
|
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
2011-10-26 08:48:16 +02:00
|
|
|
*/
|
|
|
|
|
|
2012-01-31 13:33:04 +01:00
|
|
|
package akka.cluster
|
2011-10-26 08:48:16 +02:00
|
|
|
|
|
|
|
|
import akka.actor._
|
|
|
|
|
import akka.actor.Status._
|
2012-01-31 13:33:04 +01:00
|
|
|
import akka.remote._
|
2012-02-20 15:26:12 +01:00
|
|
|
import akka.routing._
|
2011-10-27 12:46:10 +02:00
|
|
|
import akka.event.Logging
|
2012-01-24 12:09:32 +01:00
|
|
|
import akka.dispatch.Await
|
2012-01-31 13:33:04 +01:00
|
|
|
import akka.pattern.ask
|
|
|
|
|
import akka.util._
|
2012-03-02 16:20:30 +01:00
|
|
|
import akka.util.duration._
|
2012-05-16 17:04:13 +02:00
|
|
|
import akka.ConfigurationException
|
2011-11-25 12:02:25 +01:00
|
|
|
|
2012-01-30 11:41:41 +01:00
|
|
|
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
|
2012-01-24 12:09:32 +01:00
|
|
|
import java.util.concurrent.TimeUnit._
|
|
|
|
|
import java.util.concurrent.TimeoutException
|
2011-10-27 15:14:15 +02:00
|
|
|
import java.security.SecureRandom
|
2011-11-25 12:02:25 +01:00
|
|
|
|
2012-04-14 20:06:03 +02:00
|
|
|
import java.lang.management.ManagementFactory
|
|
|
|
|
import javax.management._
|
|
|
|
|
|
2012-01-30 11:41:41 +01:00
|
|
|
import scala.collection.immutable.{ Map, SortedSet }
|
2011-10-26 08:48:16 +02:00
|
|
|
import scala.annotation.tailrec
|
2011-11-25 12:02:25 +01:00
|
|
|
|
2012-01-24 12:09:32 +01:00
|
|
|
import com.google.protobuf.ByteString
|
|
|
|
|
|
2011-10-26 08:48:16 +02:00
|
|
|
/**
|
2012-02-09 15:59:10 +01:00
|
|
|
* Interface for membership change listener.
|
2011-10-26 08:48:16 +02:00
|
|
|
*/
|
2012-02-18 22:14:53 +01:00
|
|
|
trait MembershipChangeListener {

  /**
   * Callback receiving a sorted set of cluster members.
   *
   * @param members the member set passed to the listener
   */
  def notify(members: SortedSet[Member]): Unit
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Interface for meta data change listener.
|
|
|
|
|
*/
|
2012-03-12 19:22:02 +01:00
|
|
|
trait MetaDataChangeListener {

  /**
   * Callback receiving a meta data table.
   *
   * @param meta meta data entries, keyed by name, each value a raw byte array
   */
  def notify(meta: Map[String, Array[Byte]]): Unit
}
|
|
|
|
|
|
|
|
|
|
/**
|
2012-01-30 11:41:41 +01:00
|
|
|
* Base trait for all cluster messages. All ClusterMessage's are serializable.
|
2011-10-26 08:48:16 +02:00
|
|
|
*/
|
2012-01-30 19:40:28 +01:00
|
|
|
sealed trait ClusterMessage extends Serializable // sealed: direct subtypes can only be declared in this source file
|
|
|
|
|
|
2012-01-30 11:41:41 +01:00
|
|
|
/**
|
2012-02-28 17:04:48 +01:00
|
|
|
* Cluster commands sent by the USER.
|
2012-01-30 11:41:41 +01:00
|
|
|
*/
|
2012-02-29 10:02:00 +01:00
|
|
|
object ClusterAction {

  /**
   * Command to join the cluster. Sent when a node (represented by 'address')
   * wants to join another node (the receiver).
   */
  case class Join(address: Address) extends ClusterMessage

  /**
   * Command to set a node to Up (from Joining).
   */
  case class Up(address: Address) extends ClusterMessage

  /**
   * Command to leave the cluster.
   */
  case class Leave(address: Address) extends ClusterMessage

  /**
   * Command to mark a node as temporarily down.
   */
  case class Down(address: Address) extends ClusterMessage

  /**
   * Command to mark a node to be removed from the cluster immediately.
   */
  case class Exit(address: Address) extends ClusterMessage

  /**
   * Command to remove a node from the cluster immediately.
   */
  case class Remove(address: Address) extends ClusterMessage
}
|
2011-10-26 08:48:16 +02:00
|
|
|
|
2012-01-30 11:41:41 +01:00
|
|
|
/**
|
|
|
|
|
* Represents the address and the current status of a cluster member node.
|
|
|
|
|
*/
|
2012-03-09 12:56:56 +01:00
|
|
|
class Member(val address: Address, val status: MemberStatus) extends ClusterMessage {

  // Identity is the address alone: both hashCode and equals ignore 'status'.
  override def hashCode = address.##

  override def equals(other: Any) = other match {
    case that: Member ⇒ address == that.address
    case _            ⇒ false
  }

  override def toString = "Member(address = %s, status = %s)".format(address, status)

  /**
   * Creates a new Member, by default carrying over the address and status of this one.
   */
  def copy(address: Address = this.address, status: MemberStatus = this.status): Member =
    new Member(address, status)
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Factory and Utility module for Member instances.
|
|
|
|
|
*/
|
|
|
|
|
object Member {
  import MemberStatus._

  /**
   * Orders addresses by protocol, then system, then host (missing host compares as ""),
   * then port (missing port compares as 0).
   */
  implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒
    val hostA = a.host.getOrElse("")
    val hostB = b.host.getOrElse("")
    val portA = a.port.getOrElse(0)
    val portB = b.port.getOrElse(0)

    if (a.protocol != b.protocol) a.protocol.compareTo(b.protocol) < 0
    else if (a.system != b.system) a.system.compareTo(b.system) < 0
    else if (hostA != hostB) hostA.compareTo(hostB) < 0
    else portA < portB // equal ports yield false, i.e. "not less than"
  }

  /**
   * Orders members by their address only; the status takes no part in the comparison.
   */
  implicit val ordering: Ordering[Member] = addressOrdering.on[Member](_.address)

  def apply(address: Address, status: MemberStatus): Member = new Member(address, status)

  /**
   * Extracts the address if 'other' is a Member, otherwise None.
   */
  def unapply(other: Any) = other match {
    case member: Member ⇒ Some(member.address)
    case _              ⇒ None
  }

  /**
   * Picks the Member with the highest "priority" MemberStatus.
   * Precedence is Removed, Down, Exiting, Leaving; between Up and Joining, Up wins.
   * On equal status the first argument is returned.
   */
  def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match {
    case (Removed, _)                                  ⇒ m1
    case (_, Removed)                                  ⇒ m2
    case (Down, _)                                     ⇒ m1
    case (_, Down)                                     ⇒ m2
    case (Exiting, _)                                  ⇒ m1
    case (_, Exiting)                                  ⇒ m2
    case (Leaving, _)                                  ⇒ m1
    case (_, Leaving)                                  ⇒ m2
    case (Joining, Up)                                 ⇒ m2
    case (Up, Joining) | (Joining, Joining) | (Up, Up) ⇒ m1
  }
}
|
2011-10-26 08:48:16 +02:00
|
|
|
|
2012-02-09 15:59:10 +01:00
|
|
|
/**
|
|
|
|
|
* Envelope adding a sender address to the gossip.
|
|
|
|
|
*/
|
|
|
|
|
case class GossipEnvelope(sender: Member, gossip: Gossip) extends ClusterMessage // 'sender' is a whole Member (address and status), not a bare Address
|
|
|
|
|
|
2012-01-24 12:09:32 +01:00
|
|
|
/**
|
2012-01-30 11:41:41 +01:00
|
|
|
* Defines the current status of a cluster member node
|
|
|
|
|
*
|
|
|
|
|
* Can be one of: Joining, Up, Leaving, Exiting, Down and Removed.
|
2012-01-24 12:09:32 +01:00
|
|
|
*/
|
2012-02-07 16:53:49 +01:00
|
|
|
sealed trait MemberStatus extends ClusterMessage // sealed: the statuses are declared in this file (see object MemberStatus)
|
2012-01-30 11:41:41 +01:00
|
|
|
object MemberStatus {
  case object Joining extends MemberStatus
  case object Up extends MemberStatus
  case object Leaving extends MemberStatus
  case object Exiting extends MemberStatus
  case object Down extends MemberStatus
  case object Removed extends MemberStatus

  /**
   * Returns true for Down, Exiting, Removed and Leaving; false for every other status.
   */
  def isUnavailable(status: MemberStatus): Boolean = status match {
    case Down | Exiting | Removed | Leaving ⇒ true
    case _                                  ⇒ false
  }
}
|
|
|
|
|
|
2012-02-09 13:36:39 +01:00
|
|
|
/**
|
2012-02-14 20:50:12 +01:00
|
|
|
* Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes.
|
2012-02-09 13:36:39 +01:00
|
|
|
*/
|
|
|
|
|
case class GossipOverview(
  seen: Map[Address, VectorClock] = Map.empty[Address, VectorClock],
  unreachable: Set[Member] = Set.empty[Member]) {

  // FIXME document when nodes are put in 'unreachable' set and removed from 'members'

  // Renders both collections as comma separated lists inside square brackets.
  override def toString =
    seen.mkString("GossipOverview(seen = [", ", ", "], unreachable = [") +
      unreachable.mkString("", ", ", "])")
}
|
2012-02-09 13:36:39 +01:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Represents the state of the cluster; cluster ring membership, ring convergence, meta data - all versioned by a vector clock.
|
|
|
|
|
*/
|
|
|
|
|
case class Gossip(
  overview: GossipOverview = GossipOverview(),
  members: SortedSet[Member], // sorted set of members with their status, sorted by address (see Member.ordering)
  meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]],
  version: VectorClock = VectorClock()) // vector clock version
  extends ClusterMessage // is a serializable cluster message
  with Versioned[Gossip] {

  /**
   * Increments the version for this 'Node'.
   *
   * @return a copy of this gossip whose vector clock has been advanced for 'node'
   */
  def +(node: VectorClock.Node): Gossip = copy(version = version + node)

  /**
   * Adds a member to the membership set.
   *
   * @return this instance if the set already contains the member (membership is decided by the
   *         set's ordering), otherwise a copy with the member added
   */
  def +(member: Member): Gossip = {
    if (members contains member) this
    else this copy (members = members + member)
  }

  /**
   * Marks the gossip as seen by this node (remoteAddress) by updating the address entry in the 'gossip.overview.seen'
   * Map with the VectorClock for the new gossip.
   *
   * @return this instance if 'address' is already recorded with the current version, otherwise an updated copy
   */
  def seen(address: Address): Gossip = {
    if (overview.seen.contains(address) && overview.seen(address) == version) this
    else this copy (overview = overview copy (seen = overview.seen + (address -> version)))
  }

  /**
   * Merges two Gossip instances including membership tables, meta-data tables and the VectorClock histories.
   *
   * When both sides know a member under the same address, the one selected by
   * Member.highestPriorityOf is kept.
   */
  def merge(that: Gossip): Gossip = {
    import Member.ordering

    // 1. merge vector clocks
    val mergedVClock = this.version merge that.version

    // 2. group all members by Address => Seq[Member]
    //    The two member sets are concatenated as plain sequences on purpose. Member equality and
    //    Member.ordering only look at the address, so a SortedSet union would silently drop one of
    //    two entries sharing an address before their statuses could be compared in step 3.
    val membersGroupedByAddress = (this.members.toSeq ++ that.members.toSeq) groupBy (_.address)

    // 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups
    val mergedMembers =
      SortedSet.empty[Member] ++
        membersGroupedByAddress.values.map(_ reduceLeft (Member.highestPriorityOf(_, _)))

    // 4. merge meta-data ('that' wins on duplicate keys)
    val mergedMeta = this.meta ++ that.meta

    // 5. merge gossip overview
    // NOTE(review): 'unreachable' is a Set[Member] and is still merged with a plain set union, so
    // two entries with the same address but different status are not resolved by priority here.
    val mergedOverview = GossipOverview(
      this.overview.seen ++ that.overview.seen,
      this.overview.unreachable ++ that.overview.unreachable)

    Gossip(mergedOverview, mergedMembers, mergedMeta, mergedVClock)
  }

  override def toString =
    "Gossip(" +
      "overview = " + overview +
      ", members = [" + members.mkString(", ") +
      "], meta = [" + meta.mkString(", ") +
      "], version = " + version +
      ")"
}
|
2012-01-24 12:09:32 +01:00
|
|
|
|
2012-02-20 15:26:12 +01:00
|
|
|
/**
|
2012-03-09 12:56:56 +01:00
|
|
|
* Manages routing of the different cluster commands.
|
2012-03-22 23:04:04 +01:00
|
|
|
* Instantiated as a single instance for each Cluster - i.e. commands are serialized to Cluster message after message.
|
2012-02-20 15:26:12 +01:00
|
|
|
*/
|
2012-03-09 12:56:56 +01:00
|
|
|
final class ClusterCommandDaemon extends Actor {
  import ClusterAction._

  val cluster = Cluster(context.system)
  val log = Logging(context.system, this)

  // Each cluster command is forwarded to the matching state transition on the Cluster extension.
  def receive: Receive = {
    case Join(node)   ⇒ cluster.joining(node)
    case Up(node)     ⇒ cluster.up(node)
    case Down(node)   ⇒ cluster.downing(node)
    case Leave(node)  ⇒ cluster.leaving(node)
    case Exit(node)   ⇒ cluster.exiting(node)
    case Remove(node) ⇒ cluster.removing(node)
  }

  // Anything that is not one of the commands above is logged as an error.
  override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown)
}
|
|
|
|
|
|
|
|
|
|
/**
|
2012-03-03 23:55:48 +01:00
|
|
|
* Pooled and routed with N number of configurable instances.
|
2012-03-22 23:04:04 +01:00
|
|
|
* Concurrent access to Cluster.
|
2012-02-20 15:26:12 +01:00
|
|
|
*/
|
2012-03-03 23:55:48 +01:00
|
|
|
final class ClusterGossipDaemon extends Actor {
  val log = Logging(context.system, this)
  val cluster = Cluster(context.system)

  // Unwraps the envelope and hands sender and gossip to the Cluster extension.
  def receive: Receive = {
    case GossipEnvelope(from, payload) ⇒ cluster.receive(from, payload)
  }

  // Any other message is logged as an error.
  override def unhandled(unknown: Any) =
    log.error("[/system/cluster/gossip] can not respond to messages - received [{}]", unknown)
}
|
|
|
|
|
|
2012-03-09 12:56:56 +01:00
|
|
|
/**
|
2012-03-22 23:04:04 +01:00
|
|
|
* Supervisor managing the different Cluster daemons.
|
2012-03-09 12:56:56 +01:00
|
|
|
*/
|
2012-03-03 23:55:48 +01:00
|
|
|
final class ClusterDaemonSupervisor extends Actor {
  val log = Logging(context.system, this)
  val cluster = Cluster(context.system)

  // child "commands": a single ClusterCommandDaemon
  private val commands = context.actorOf(Props[ClusterCommandDaemon], "commands")

  // child "gossip": ClusterGossipDaemon instances behind a round-robin router,
  // sized by the NrOfGossipDaemons cluster setting
  private val gossip = context.actorOf(
    Props[ClusterGossipDaemon] withRouter RoundRobinRouter(cluster.clusterSettings.NrOfGossipDaemons),
    "gossip")

  // The supervisor itself handles no messages.
  def receive = Actor.emptyBehavior

  override def unhandled(unknown: Any): Unit =
    log.error("[/system/cluster] can not respond to messages - received [{}]", unknown)
}
|
|
|
|
|
|
2012-02-22 18:40:16 +01:00
|
|
|
/**
|
2012-03-22 23:04:04 +01:00
|
|
|
* Cluster Extension Id and factory for creating Cluster extension.
|
2012-02-22 18:40:16 +01:00
|
|
|
* Example:
|
|
|
|
|
* {{{
|
2012-04-12 12:25:39 +02:00
|
|
|
* if (Cluster(system).isLeader) { ... }
|
2012-02-29 10:02:00 +01:00
|
|
|
* }}}
|
2012-02-22 18:40:16 +01:00
|
|
|
*/
|
2012-03-22 23:04:04 +01:00
|
|
|
object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {

  // Delegates to the inherited implementation; only the declared result type is Cluster.
  override def get(system: ActorSystem): Cluster = super.get(system)

  // The extension id to look up is this object itself.
  override def lookup = Cluster

  // Factory: builds the Cluster extension instance for the given actor system.
  override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system)
}
|
2012-02-14 20:50:12 +01:00
|
|
|
|
2012-04-14 20:06:03 +02:00
|
|
|
/**
|
|
|
|
|
* Interface for the cluster JMX MBean.
|
|
|
|
|
*/
|
|
|
|
|
trait ClusterNodeMBean {

  // ---- read-only attributes ----
  def getMemberStatus: String
  def getClusterStatus: String
  def getLeader: String

  def isSingleton: Boolean
  def isConvergence: Boolean
  def isAvailable: Boolean

  // ---- operations; addresses are passed as plain strings ----
  def join(address: String): Unit
  def leave(address: String): Unit
  def down(address: String): Unit
  def remove(address: String): Unit

  def shutdown(): Unit
}
|
|
|
|
|
|
2011-10-26 08:48:16 +02:00
|
|
|
/**
|
|
|
|
|
* This module is responsible for Gossiping cluster information. The abstraction maintains the list of live
|
2012-01-30 11:41:41 +01:00
|
|
|
* and dead members. Periodically i.e. every 1 second this module chooses a random member and initiates a round
|
2011-10-26 08:48:16 +02:00
|
|
|
* of Gossip with it. Whenever it gets gossip updates it updates the Failure Detector with the liveness
|
|
|
|
|
* information.
|
|
|
|
|
* <p/>
|
2012-01-30 11:41:41 +01:00
|
|
|
* During each of these runs the member initiates gossip exchange according to following rules (as defined in the
|
2011-10-26 08:48:16 +02:00
|
|
|
* Cassandra documentation [http://wiki.apache.org/cassandra/ArchitectureGossip]):
|
|
|
|
|
* <pre>
|
2012-01-30 11:41:41 +01:00
|
|
|
* 1) Gossip to random live member (if any)
|
|
|
|
|
* 2) Gossip to random unreachable member with certain probability depending on number of unreachable and live members
|
2012-02-07 16:53:49 +01:00
|
|
|
* 3) If the member gossiped to at (1) was not deputy, or the number of live members is less than number of deputy list,
|
|
|
|
|
* gossip to random deputy with certain probability depending on number of unreachable, deputy and live members.
|
2011-10-26 08:48:16 +02:00
|
|
|
* </pre>
|
2012-02-29 10:02:00 +01:00
|
|
|
*
|
|
|
|
|
* Example:
|
|
|
|
|
* {{{
|
2012-04-12 12:25:39 +02:00
|
|
|
* if (Cluster(system).isLeader) { ... }
|
2012-02-29 10:02:00 +01:00
|
|
|
* }}}
|
2011-10-26 08:48:16 +02:00
|
|
|
*/
|
2012-04-14 20:06:03 +02:00
|
|
|
class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
2012-02-22 18:40:16 +01:00
|
|
|
|
2011-10-26 08:48:16 +02:00
|
|
|
/**
|
2012-03-22 23:04:04 +01:00
|
|
|
* Represents the state for this Cluster. Implemented using optimistic lockless concurrency.
|
2012-03-15 23:00:20 +01:00
|
|
|
* All state is represented by this immutable case class and managed by an AtomicReference.
|
2011-10-26 08:48:16 +02:00
|
|
|
*/
|
|
|
|
|
private case class State(
  // the most recent gossip held by this node
  latestGossip: Gossip,
  // registered membership listeners; empty unless listeners are supplied
  memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty[MembershipChangeListener])
|
2011-10-26 08:48:16 +02:00
|
|
|
|
2012-02-29 10:02:00 +01:00
|
|
|
// The cluster requires a RemoteActorRefProvider; any other provider is a configuration error.
private val remote: RemoteActorRefProvider = system.provider match {
  case provider: RemoteActorRefProvider ⇒ provider
  case _ ⇒
    throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration")
}
|
2012-01-30 19:40:28 +01:00
|
|
|
|
2012-03-02 16:20:30 +01:00
|
|
|
// ---- settings, both read from the actor system's configuration ----
val remoteSettings = new RemoteSettings(system.settings.config, system.name)
val clusterSettings = new ClusterSettings(system.settings.config, system.name)

// ---- this node's address and its failure detector ----
val remoteAddress = remote.transport.address
val failureDetector = new AccrualFailureDetector(
  system,
  remoteAddress,
  clusterSettings.FailureDetectorThreshold,
  clusterSettings.FailureDetectorMaxSampleSize)

// vector clock node id, derived from this node's address
private val vclockNode = VectorClock.Node(remoteAddress.toString)

// ---- timing of the periodic tasks ----
private val periodicTasksInitialDelay = clusterSettings.PeriodicTasksInitialDelay
private val gossipFrequency = clusterSettings.GossipFrequency
private val leaderActionsFrequency = clusterSettings.LeaderActionsFrequency
private val unreachableNodesReaperFrequency = clusterSettings.UnreachableNodesReaperFrequency

// implicit timeout picked up by the 'ask' calls in this class
implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)

// ---- remaining cluster settings ----
private val autoDown = clusterSettings.AutoDown
private val nrOfDeputyNodes = clusterSettings.NrOfDeputyNodes
private val nrOfGossipDaemons = clusterSettings.NrOfGossipDaemons
// the configured node to join, unless that node is this node itself
private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress)

private val serialization = remote.serialization

// ---- runtime helpers ----
private val isRunning = new AtomicBoolean(true)
private val log = Logging(system, "Node")
private val random = SecureRandom.getInstance("SHA1PRNG")

// ---- JMX ----
private val mBeanServer = ManagementFactory.getPlatformMBeanServer
private val clusterMBeanName = new ObjectName("akka:type=Cluster")

log.info("Cluster Node [{}] - is starting up...", remoteAddress)
|
2012-03-09 12:56:56 +01:00
|
|
|
|
2012-03-02 16:20:30 +01:00
|
|
|
// create supervisor for daemons under path "/system/cluster"
|
|
|
|
|
private val clusterDaemons = {
  // ask the system guardian to create the supervisor and block until it replies (or the timeout expires)
  val request = CreateChild(Props[ClusterDaemonSupervisor], "cluster")
  val reply = Await.result(system.systemGuardian ? request, defaultTimeout.duration)
  reply match {
    case supervisor: ActorRef ⇒ supervisor
    case failure: Exception   ⇒ throw failure // an exception sent back as the reply is rethrown here
  }
}
|
2012-02-20 15:26:12 +01:00
|
|
|
|
2012-02-08 14:14:01 +01:00
|
|
|
private val state = {
  // this node starts out as the only member, in status Joining
  val selfMember = Member(remoteAddress, MemberStatus.Joining)
  val initialMembers = SortedSet.empty[Member] + selfMember
  // add me as member and update my vector clock
  val initialGossip = Gossip(members = initialMembers) + vclockNode
  new AtomicReference[State](State(initialGossip))
}
|
2011-10-26 08:48:16 +02:00
|
|
|
|
2012-02-08 14:14:01 +01:00
|
|
|
// try to join the node defined in the 'akka.cluster.node-to-join' option
|
2012-02-28 17:04:48 +01:00
|
|
|
autoJoin() // runs during construction: after 'state' is initialised, before the periodic tasks below are scheduled
|
2012-01-24 12:09:32 +01:00
|
|
|
|
2012-03-09 12:56:56 +01:00
|
|
|
// ========================================================
|
|
|
|
|
// ===================== WORK DAEMONS =====================
|
|
|
|
|
// ========================================================
|
|
|
|
|
|
2012-02-14 20:50:12 +01:00
|
|
|
// start periodic gossip to random nodes in cluster
|
2012-03-15 23:00:20 +01:00
|
|
|
private val gossipCanceller =
  system.scheduler.schedule(periodicTasksInitialDelay, gossipFrequency) { gossip() }

// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
private val failureDetectorReaperCanceller =
  system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperFrequency) { reapUnreachableMembers() }

// start periodic leader action management (only applies for the current leader)
private val leaderActionsCanceller =
  system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsFrequency) { leaderActions() }

// called once all three periodic tasks have been scheduled
createMBean()

log.info("Cluster Node [{}] - has started up successfully", remoteAddress)
|
2012-03-09 12:56:56 +01:00
|
|
|
|
2012-02-14 20:50:12 +01:00
|
|
|
// ======================================================
|
|
|
|
|
// ===================== PUBLIC API =====================
|
|
|
|
|
// ======================================================
|
|
|
|
|
|
2012-03-09 12:56:56 +01:00
|
|
|
// Looks up the Member entry for this node's own address in the latest gossip.
// Throws IllegalStateException when the membership set has no such entry.
def self: Member = {
  val myself = latestGossip.members find (_.address == remoteAddress)
  myself getOrElse {
    throw new IllegalStateException("Can't find 'this' Member (" + remoteAddress + ") in the cluster membership ring")
  }
}
|
2012-03-09 12:56:56 +01:00
|
|
|
|
2012-02-14 20:50:12 +01:00
|
|
|
/**
|
|
|
|
|
* Latest gossip.
|
|
|
|
|
*/
|
|
|
|
|
def latestGossip: Gossip = state.get.latestGossip // reads the gossip out of the State currently held by the AtomicReference
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Member status for this node.
|
|
|
|
|
*/
|
2012-03-09 12:56:56 +01:00
|
|
|
def status: MemberStatus = self.status // goes through 'self', so it throws IllegalStateException if this node is not in the member set
|
2012-02-14 20:50:12 +01:00
|
|
|
|
2012-02-20 17:22:07 +01:00
|
|
|
/**
|
|
|
|
|
* Is this node the leader?
|
|
|
|
|
*/
|
|
|
|
|
// True when the first member of the (sorted) member set carries this node's address;
// false when the member set is empty.
def isLeader: Boolean =
  latestGossip.members.headOption exists { first ⇒ remoteAddress == first.address }
|
|
|
|
|
|
2012-04-16 11:23:03 +02:00
|
|
|
/**
|
|
|
|
|
* Get the address of the current leader.
|
|
|
|
|
*/
|
|
|
|
|
def leader: Address = latestGossip.members.head.address // first member of the sorted set; unlike isLeader there is no empty-set guard, so 'head' fails on an empty member set
|
|
|
|
|
|
2012-02-14 20:50:12 +01:00
|
|
|
/**
|
|
|
|
|
* Is this node a singleton cluster?
|
|
|
|
|
*/
|
|
|
|
|
def isSingletonCluster: Boolean = isSingletonCluster(state.get) // delegates to the State-taking overload (not visible in this part of the file) with the current state
|
|
|
|
|
|
2011-01-01 01:50:33 +01:00
|
|
|
/**
|
|
|
|
|
* Checks if we have a cluster convergence.
|
|
|
|
|
*
|
|
|
|
|
* @return Some(convergedGossip) if convergence has been reached and None if not
|
|
|
|
|
*/
|
|
|
|
|
def convergence: Option[Gossip] = convergence(latestGossip) // delegates to the Gossip-taking overload (not visible in this part of the file) with the latest gossip
|
|
|
|
|
|
2012-03-09 12:56:56 +01:00
|
|
|
/**
|
|
|
|
|
* Returns true if the node is UP or JOINING.
|
|
|
|
|
*/
|
|
|
|
|
def isAvailable: Boolean = !isUnavailable(state.get) // negation of the State-taking isUnavailable helper (not visible in this part of the file)
|
|
|
|
|
|
2012-01-24 12:09:32 +01:00
|
|
|
/**
|
2012-01-30 11:41:41 +01:00
|
|
|
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
|
2012-01-24 12:09:32 +01:00
|
|
|
*/
|
|
|
|
|
def shutdown(): Unit = {
  // only the caller that flips 'isRunning' from true to false performs the shutdown
  if (isRunning.compareAndSet(true, false)) {
    log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", remoteAddress)

    // stop the periodic tasks, in the order they were started
    gossipCanceller.cancel()
    failureDetectorReaperCanceller.cancel()
    leaderActionsCanceller.cancel()

    // stop the daemon supervisor
    system.stop(clusterDaemons)

    try mBeanServer.unregisterMBean(clusterMBeanName)
    catch {
      case _: InstanceNotFoundException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
    }
  }
}
|
|
|
|
|
|
2011-01-01 01:50:33 +01:00
|
|
|
/**
|
|
|
|
|
* Registers a listener to subscribe to cluster membership changes.
|
|
|
|
|
*/
|
|
|
|
|
@tailrec
final def registerListener(listener: MembershipChangeListener): Unit = {
  val current = state.get
  val updated = current copy (
    memberMembershipChangeListeners = current.memberMembershipChangeListeners + listener)
  // compare-and-set against the snapshot read above; start over if the state changed in between
  if (!state.compareAndSet(current, updated)) registerListener(listener) // recur
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Unsubscribes to cluster membership changes.
|
|
|
|
|
*/
|
|
|
|
|
@tailrec
final def unregisterListener(listener: MembershipChangeListener): Unit = {
  val current = state.get
  val updated = current copy (
    memberMembershipChangeListeners = current.memberMembershipChangeListeners - listener)
  // compare-and-set against the snapshot read above; start over if the state changed in between
  if (!state.compareAndSet(current, updated)) unregisterListener(listener) // recur
}
|
|
|
|
|
|
2012-02-29 10:02:00 +01:00
|
|
|
/**
|
2012-04-12 22:50:50 +02:00
|
|
|
* Try to join this cluster node with the node specified by 'address'.
|
2012-04-16 11:23:03 +02:00
|
|
|
* A 'Join(thisNodeAddress)' command is sent to the node to join.
|
2012-02-29 10:02:00 +01:00
|
|
|
*/
|
2012-04-12 22:50:50 +02:00
|
|
|
def join(address: Address): Unit = {
  // the connection to the node we want to join
  val target = clusterCommandConnectionFor(address)
  // the Join command carries this node's own address
  val joinRequest = ClusterAction.Join(remoteAddress)
  log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", remoteAddress, address, target)
  target ! joinRequest
}
|
|
|
|
|
|
|
|
|
|
/**
|
2012-04-12 22:50:50 +02:00
|
|
|
* Send command to issue state transition to LEAVING for the node specified by 'address'.
|
2012-02-29 10:02:00 +01:00
|
|
|
*/
|
2012-04-12 22:50:50 +02:00
|
|
|
// Sends a Leave command for 'address' to the cluster command daemon.
def leave(address: Address): Unit =
  clusterCommandDaemon ! ClusterAction.Leave(address)
|
|
|
|
|
|
|
|
|
|
/**
|
2012-04-12 22:50:50 +02:00
|
|
|
* Send command to issue state transition to DOWN for the node specified by 'address'.
|
2012-02-29 10:02:00 +01:00
|
|
|
*/
|
2012-04-12 22:50:50 +02:00
|
|
|
// Sends a Down command for 'address' to the cluster command daemon.
def down(address: Address): Unit =
  clusterCommandDaemon ! ClusterAction.Down(address)
|
|
|
|
|
|
|
|
|
|
/**
|
2012-04-12 22:50:50 +02:00
|
|
|
* Send command to issue state transition to REMOVED for the node specified by 'address'.
|
2012-02-29 10:02:00 +01:00
|
|
|
*/
|
2012-04-12 22:50:50 +02:00
|
|
|
// Sends a Remove command for 'address' to the cluster command daemon.
def remove(address: Address): Unit =
  clusterCommandDaemon ! ClusterAction.Remove(address)
|
|
|
|
|
|
2011-01-01 01:50:33 +01:00
|
|
|
// ========================================================
|
|
|
|
|
// ===================== INTERNAL API =====================
|
|
|
|
|
// ========================================================
|
|
|
|
|
|
2012-02-08 14:14:01 +01:00
|
|
|
/**
|
2012-02-28 17:04:48 +01:00
|
|
|
* State transition to JOINING.
|
2012-02-08 14:14:01 +01:00
|
|
|
* New node joining.
|
|
|
|
|
*/
|
|
|
|
|
@tailrec
private[cluster] final def joining(node: Address): Unit = {
  log.info("Cluster Node [{}] - Node [{}] is JOINING", remoteAddress, node)

  // snapshot of the state this attempt is based on
  val currentState = state.get
  val currentGossip = currentState.latestGossip
  val currentOverview = currentGossip.overview

  // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
  val overviewWithoutNode =
    currentOverview copy (unreachable = currentOverview.unreachable filterNot { _.address == node })

  // add joining node as Joining
  val membersWithNode = currentGossip.members + Member(node, MemberStatus.Joining)

  val updatedGossip = currentGossip copy (overview = overviewWithoutNode, members = membersWithNode)

  // advance this node's vector clock entry, then record that this node has seen the resulting version
  val seenVersionedGossip = (updatedGossip + vclockNode) seen remoteAddress

  val updatedState = currentState copy (latestGossip = seenVersionedGossip)

  if (state.compareAndSet(currentState, updatedState)) {
    failureDetector heartbeat node // update heartbeat in failure detector
    // listeners are only told about the new member set when the new gossip has converged
    if (convergence(updatedState.latestGossip).isDefined)
      updatedState.memberMembershipChangeListeners foreach { _ notify membersWithNode }
  } else joining(node) // recur if we failed update
}
|
2012-01-24 12:09:32 +01:00
|
|
|
|
2012-02-28 17:04:48 +01:00
|
|
|
/**
 * State transition to UP.
 *
 * Only logs the transition - no cluster state is modified by this method.
 */
private[cluster] final def up(address: Address): Unit =
  log.info("Cluster Node [{}] - Marking node [{}] as UP", remoteAddress, address)
|
2012-02-28 17:04:48 +01:00
|
|
|
|
|
|
|
|
/**
 * State transition to LEAVING.
 *
 * Only logs the transition - no cluster state is modified by this method.
 */
private[cluster] final def leaving(address: Address): Unit =
  log.info("Cluster Node [{}] - Marking node [{}] as LEAVING", remoteAddress, address)
|
2012-02-28 17:04:48 +01:00
|
|
|
|
|
|
|
|
/**
 * State transition to EXITING.
 *
 * Only logs the transition - no cluster state is modified by this method.
 */
private[cluster] final def exiting(address: Address): Unit =
  log.info("Cluster Node [{}] - Marking node [{}] as EXITING", remoteAddress, address)
|
2012-02-28 17:04:48 +01:00
|
|
|
|
|
|
|
|
/**
 * State transition to REMOVED.
 *
 * Only logs the transition - no cluster state is modified by this method.
 */
private[cluster] final def removing(address: Address): Unit =
  log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", remoteAddress, address)
|
2012-02-28 17:04:48 +01:00
|
|
|
|
|
|
|
|
/**
 * State transition to DOWN.
 *
 * The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there)
 * and its status is set to DOWN. The node is also removed from the 'seen' table.
 *
 * The node will reside as DOWN in the 'unreachable' set until an explicit JOIN command is sent directly
 * to this node and it will then go through the normal JOINING procedure.
 *
 * The new gossip is versioned, marked as seen by this node and swapped in with compare-and-set;
 * the whole transition is retried if the swap fails.
 */
@tailrec
final private[cluster] def downing(address: Address) {
  val localState = state.get
  val localGossip = localState.latestGossip
  val localMembers = localGossip.members
  val localOverview = localGossip.overview
  val localSeen = localOverview.seen
  val localUnreachableMembers = localOverview.unreachable

  // 1. check if the node to DOWN is in the 'members' set
  var downedMember: Option[Member] = None
  val newMembers =
    localMembers
      .map { member ⇒
        if (member.address == address) {
          log.info("Cluster Node [{}] - Marking node [{}] as DOWN", remoteAddress, member.address)
          val newMember = member copy (status = MemberStatus.Down)
          downedMember = Some(newMember)
          newMember
        } else member
      }
      .filter(_.status != MemberStatus.Down) // DOWN nodes do not belong in the 'members' set

  // 2. check if the node to DOWN is in the 'unreachable' set
  val newUnreachableMembers =
    localUnreachableMembers map { member ⇒
      // Members that are already DOWN are left untouched but must stay in the set: filtering them out
      // here would silently drop them from 'unreachable' instead of letting them reside there as DOWN.
      if (member.address == address && member.status != MemberStatus.Down) {
        log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", remoteAddress, member.address)
        member copy (status = MemberStatus.Down)
      } else member
    }

  // 3. add the newly DOWNED member from the 'members' set (in step 1.) to the 'newUnreachableMembers' set.
  val newUnreachablePlusNewlyDownedMembers = downedMember match {
    case Some(member) ⇒ newUnreachableMembers + member
    case None         ⇒ newUnreachableMembers
  }

  // 4. remove the nodes in the resulting 'unreachable' set from the 'seen' table
  // NOTE(review): this removes every unreachable node, not only those with status DOWN - confirm intent
  val newSeen = newUnreachablePlusNewlyDownedMembers.foldLeft(localSeen) { (currentSeen, member) ⇒
    currentSeen - member.address
  }

  val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) // update gossip overview
  val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
  val versionedGossip = newGossip + vclockNode
  val newState = localState copy (latestGossip = versionedGossip seen remoteAddress)

  if (!state.compareAndSet(localState, newState)) downing(address) // recur if we fail the update
  else {
    if (convergence(newState.latestGossip).isDefined) {
      newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
    }
  }
}
|
2012-02-28 17:04:48 +01:00
|
|
|
|
2011-10-26 08:48:16 +02:00
|
|
|
/**
 * Receive new gossip.
 *
 * Compares the vector clock version of the remote gossip with the local one: concurrent versions are merged
 * (and the merge is versioned), otherwise the newer of the two wins. The winning gossip is marked as seen by
 * this node and swapped in with compare-and-set; the whole procedure is retried if the swap fails.
 */
@tailrec
final private[cluster] def receive(sender: Member, remoteGossip: Gossip) {
  val currentState = state.get
  val currentGossip = currentState.latestGossip

  val remoteVersion = remoteGossip.version
  val currentVersion = currentGossip.version

  val winner =
    if (remoteVersion <> currentVersion) {
      // concurrent - neither side is newer, so merge and version the result
      val merged = (remoteGossip merge currentGossip) + vclockNode

      log.debug(
        "Can't establish a causal relationship between \"remote\" gossip [{}] and \"local\" gossip [{}] - merging them into [{}]",
        remoteGossip, currentGossip, merged)

      merged
    } else if (remoteVersion < currentVersion) currentGossip // local gossip is newer
    else remoteGossip // remote gossip is newer

  val updatedState = currentState copy (latestGossip = winner seen remoteAddress)

  if (state.compareAndSet(currentState, updatedState)) {
    log.debug("Cluster Node [{}] - Receiving gossip from [{}]", remoteAddress, sender.address)

    failureDetector heartbeat sender.address // update heartbeat in failure detector

    if (convergence(updatedState.latestGossip).isDefined)
      updatedState.memberMembershipChangeListeners foreach { _ notify updatedState.latestGossip.members }
  } else receive(sender, remoteGossip) // lost the race - recur
}
|
|
|
|
|
|
2012-01-24 12:09:32 +01:00
|
|
|
/**
 * Sends a 'ClusterAction.Join' command, carrying this node's address, to the cluster command connection
 * of the pre-configured node to join. Does nothing if no such node is configured.
 */
private def autoJoin(): Unit =
  for (address ← nodeToJoin) {
    val connection = clusterCommandConnectionFor(address)
    val command = ClusterAction.Join(remoteAddress)
    log.info("Cluster Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection)
    connection ! command
  }
|
|
|
|
|
|
2012-02-09 15:59:10 +01:00
|
|
|
/**
 * Switches the member status of this node.
 *
 * @param newStatus the new member status
 * @param state the state to change the member status in
 * @return a copy of 'state' in which this node has the new member status and the gossip has been
 *         versioned and marked as seen by this node
 */
private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = {
  log.info("Cluster Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus)

  // my state, changed into a "new" self
  val updatedSelf = self copy (status = newStatus)

  val currentGossip = state.latestGossip

  // swap in the "new" self for the entry with my address in 'gossip.members'
  val swapped = currentGossip.members map {
    case member if member.address == remoteAddress ⇒ updatedSelf
    case member                                    ⇒ member
  }

  // the set is rebuilt through a List because 'SortedSet.empty[Member] ++ aSet' does not compile
  val updatedMembers = SortedSet[Member](swapped.toList: _*)
  val updatedGossip = currentGossip copy (members = updatedMembers)

  // version my changes and mark them as seen by me
  val seenGossip = (updatedGossip + vclockNode) seen remoteAddress

  state copy (latestGossip = seenGossip)
}
|
|
|
|
|
|
2011-10-26 08:48:16 +02:00
|
|
|
/**
 * Gossips latest gossip to an address.
 *
 * The gossip is wrapped, together with this node's member entry, in a 'GossipEnvelope' and sent to the
 * cluster gossip connection of 'address'.
 */
private def gossipTo(address: Address): Unit = {
  val target = clusterGossipConnectionFor(address)
  log.debug("Cluster Node [{}] - Gossiping to [{}]", remoteAddress, target)
  val envelope = GossipEnvelope(self, latestGossip)
  target ! envelope
}
|
|
|
|
|
|
|
|
|
|
/**
 * Gossips latest gossip to a random member in the set of members passed in as argument.
 *
 * This node's own address is never selected. If no other address remains nothing is sent.
 *
 * @param addresses the candidate addresses to pick the gossip target from
 * @return 'true' if it gossiped to a "deputy" member, 'false' otherwise (including when nothing was sent).
 */
private def gossipToRandomNodeOf(addresses: Iterable[Address]): Boolean = {
  val peers = addresses filter (_ != remoteAddress) // filter out myself

  // Emptiness must be checked AFTER filtering out myself: picking a random node from an empty
  // collection fails, which would happen if 'addresses' only contained this node.
  if (peers.isEmpty) false
  else {
    val peer = selectRandomNode(peers)
    gossipTo(peer)
    deputyNodes exists (peer == _)
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Initiates a new round of gossip.
 *
 * Only gossips if this node is part of a non-singleton cluster and is available. A round consists of:
 *  1. gossiping to a random member,
 *  2. gossiping to a random unreachable member, with probability 'unreachable / (members + 1)',
 *  3. gossiping to a random deputy node - if step 1 did not already reach one - with probability
 *     '1 / (members + unreachable)', to facilitate partition healing.
 */
private def gossip() {
  val localState = state.get

  if (!isSingletonCluster(localState) && isAvailable(localState)) {
    // only gossip if we are a non-singleton cluster and available

    log.debug("Cluster Node [{}] - Initiating new round of gossip", remoteAddress)

    val localGossip = localState.latestGossip
    val localMembers = localGossip.members
    val localMembersSize = localMembers.size

    val localUnreachableMembers = localGossip.overview.unreachable
    val localUnreachableSize = localUnreachableMembers.size

    // 1. gossip to alive members
    val gossipedToDeputy = gossipToRandomNodeOf(localMembers map { _.address })

    // 2. gossip to unreachable members
    if (localUnreachableSize > 0) {
      // must be a floating point division - an Int division truncates the probability to 0
      // whenever there are fewer unreachable nodes than members
      val probability: Double = localUnreachableSize.toDouble / (localMembersSize + 1)
      if (random.nextDouble() < probability) gossipToRandomNodeOf(localUnreachableMembers.map(_.address))
    }

    // 3. gossip to a deputy nodes for facilitating partition healing
    val deputies = deputyNodes
    if ((!gossipedToDeputy || localMembersSize < 1) && !deputies.isEmpty) {
      if (localMembersSize == 0) gossipToRandomNodeOf(deputies)
      else {
        // the divisor is the total number of known nodes; without the parentheses the unreachable
        // count is added to the quotient, making the probability >= 1 as soon as a node is unreachable
        val probability = 1.0 / (localMembersSize + localUnreachableSize)
        if (random.nextDouble() <= probability) gossipToRandomNodeOf(deputies)
      }
    }
  }
}
|
|
|
|
|
|
2012-02-07 16:53:49 +01:00
|
|
|
/**
 * Reaps the unreachable members (moves them to the 'unreachable' set in the cluster overview) according to
 * the failure detector's verdict.
 *
 * Only runs if this node is part of a non-singleton cluster and is available. The new gossip is versioned,
 * marked as seen by this node and swapped in with compare-and-set; the whole procedure is retried if the
 * swap fails.
 */
@tailrec
final private def reapUnreachableMembers() {
  val currentState = state.get

  // only scrutinize if we are a non-singleton cluster and available
  if (!isSingletonCluster(currentState) && isAvailable(currentState)) {
    val currentGossip = currentState.latestGossip
    val currentOverview = currentGossip.overview
    val currentMembers = currentGossip.members

    val newlyUnreachable = currentMembers filterNot { member ⇒ failureDetector.isAvailable(member.address) }

    if (newlyUnreachable.nonEmpty) { // the failure detector has marked members as unavailable
      val remainingMembers = currentMembers diff newlyUnreachable
      val allUnreachable: Set[Member] = currentOverview.unreachable ++ newlyUnreachable

      val updatedOverview = currentOverview copy (unreachable = allUnreachable)
      val updatedGossip = currentGossip copy (overview = updatedOverview, members = remainingMembers)

      // update the vclock and the 'seen' table
      val seenGossip = (updatedGossip + vclockNode) seen remoteAddress
      val updatedState = currentState copy (latestGossip = seenGossip)

      if (state.compareAndSet(currentState, updatedState)) {
        log.info("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", remoteAddress, newlyUnreachable.mkString(", "))

        if (convergence(updatedState.latestGossip).isDefined)
          updatedState.memberMembershipChangeListeners foreach { _ notify remainingMembers }
      } else reapUnreachableMembers() // lost the race - recur
    }
  }
}
|
|
|
|
|
|
2012-02-18 22:14:53 +01:00
|
|
|
/**
 * Runs periodic leader actions, such as moving members between states and auto-downing unreachable nodes.
 *
 * This node acts as leader when its address is the address of the first member in the 'members' set.
 * Nothing is done unless this node is the leader and is available. Changes are versioned, marked as seen
 * by this node and swapped in with compare-and-set; the whole procedure is retried if the swap fails.
 */
@tailrec
final private def leaderActions() {
  val localState = state.get
  val localGossip = localState.latestGossip
  val localMembers = localGossip.members

  val isLeader = !localMembers.isEmpty && (remoteAddress == localMembers.head.address)

  if (isLeader && isAvailable(localState)) {
    // only run the leader actions if we are the LEADER and available

    val localOverview = localGossip.overview
    val localSeen = localOverview.seen
    val localUnreachableMembers = localGossip.overview.unreachable

    // Leader actions are as follows:
    //   1. Move JOINING     => UP
    //   2. Move EXITING     => REMOVED
    //   3. Move UNREACHABLE => DOWN (auto-downing by leader)
    //   4. Updating the vclock version for the changes
    //   5. Updating the 'seen' table

    // set from inside the closures below as soon as any member changes status
    var hasChangedState = false
    val newGossip =

      if (convergence(localGossip).isDefined) {
        // we have convergence - so we can't have unreachable nodes

        val newMembers =
          localMembers map { member ⇒
            // 1. Move JOINING => UP
            if (member.status == MemberStatus.Joining) {
              log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", remoteAddress, member.address)
              hasChangedState = true
              member copy (status = MemberStatus.Up)
            } else member
          } map { member ⇒
            // 2. Move EXITING => REMOVED
            if (member.status == MemberStatus.Exiting) {
              log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", remoteAddress, member.address)
              hasChangedState = true
              member copy (status = MemberStatus.Removed)
            } else member
          }
        localGossip copy (members = newMembers) // update gossip

      } else if (autoDown) {
        // we don't have convergence - so we might have unreachable nodes
        // if 'auto-down' is turned on, then try to auto-down any unreachable nodes

        // 3. Move UNREACHABLE => DOWN (auto-downing by leader)
        val newUnreachableMembers =
          localUnreachableMembers map { member ⇒
            // Members that are already DOWN need no action but must stay in the set: filtering them
            // out here would silently drop them from 'unreachable' when the overview is updated.
            if (member.status == MemberStatus.Down) member
            else {
              log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", remoteAddress, member.address)
              hasChangedState = true
              member copy (status = MemberStatus.Down)
            }
          }

        // removing nodes marked as DOWN from the 'seen' table
        val newSeen = localUnreachableMembers.foldLeft(localSeen)((currentSeen, member) ⇒ currentSeen - member.address)

        val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
        localGossip copy (overview = newOverview) // update gossip

      } else localGossip

    if (hasChangedState) { // we have a change of state - version it and try to update

      // 4. Updating the vclock version for the changes
      val versionedGossip = newGossip + vclockNode

      // 5. Updating the 'seen' table
      val seenVersionedGossip = versionedGossip seen remoteAddress

      val newState = localState copy (latestGossip = seenVersionedGossip)

      // if we won the race then update else try again
      if (!state.compareAndSet(localState, newState)) leaderActions() // recur
      else {
        if (convergence(newState.latestGossip).isDefined) {
          // notification is a pure side effect - 'foreach', not 'map'
          newState.memberMembershipChangeListeners foreach { _ notify newGossip.members }
        }
      }
    }
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Checks if we have a cluster convergence. If there are any unreachable nodes then we can't have a convergence -
 * waiting for user to act (issuing DOWN) or leader to act (issuing DOWN through auto-down).
 *
 * @return Some(gossip) if convergence has been reached and None if not
 */
private def convergence(gossip: Gossip): Option[Gossip] = {
  val overview = gossip.overview

  // Convergence is only possible when there are no unreachable members at all, or when every
  // unreachable member has status DOWN or REMOVED.
  val unreachableAreAllGone = overview.unreachable forall { m ⇒
    (m.status == MemberStatus.Down) || (m.status == MemberStatus.Removed)
  }

  if (!unreachableAreAllGone) None
  else {
    // all the entries in the 'seen' table must have the same vector clock version
    val distinctViews = overview.seen.values.toSet

    if (distinctViews.size != 1) None
    else {
      log.debug("Cluster Node [{}] - Cluster convergence reached", remoteAddress)
      Some(gossip)
    }
  }
}
|
|
|
|
|
|
|
|
|
|
// The exact negation of 'isUnavailable' for the same state.
private def isAvailable(state: State): Boolean = {
  val unavailable = isUnavailable(state)
  !unavailable
}
|
|
|
|
|
|
|
|
|
|
// This node is unavailable in 'state' if its address is listed in the 'unreachable' set, or if its own
// entry in the 'members' set has a status that 'MemberStatus.isUnavailable' reports as unavailable.
private def isUnavailable(state: State): Boolean = {
  val gossip = state.latestGossip
  val listedAsUnreachable = gossip.overview.unreachable exists { _.address == remoteAddress }
  val hasUnavailableStatus = gossip.members exists { m ⇒ (m == self) && MemberStatus.isUnavailable(m.status) }
  listedAsUnreachable || hasUnavailableStatus
}
|
|
|
|
|
|
2012-02-07 16:53:49 +01:00
|
|
|
/**
 * Looks up and returns the local cluster command connection, i.e. the actor at
 * "system/cluster/commands" under this node's own address.
 */
private def clusterCommandDaemon = {
  val path = RootActorPath(remoteAddress) / "system" / "cluster" / "commands"
  system.actorFor(path)
}
|
2012-02-07 16:53:49 +01:00
|
|
|
|
2012-03-03 23:55:48 +01:00
|
|
|
/**
 * Looks up and returns the remote cluster command connection for the specific address, i.e. the actor at
 * "system/cluster/commands" under that address.
 */
private def clusterCommandConnectionFor(address: Address): ActorRef = {
  val path = RootActorPath(address) / "system" / "cluster" / "commands"
  system.actorFor(path)
}
|
2012-03-03 23:55:48 +01:00
|
|
|
|
2012-02-07 16:53:49 +01:00
|
|
|
/**
 * Looks up and returns the remote cluster gossip connection for the specific address, i.e. the actor at
 * "system/cluster/gossip" under that address.
 */
private def clusterGossipConnectionFor(address: Address): ActorRef = {
  val path = RootActorPath(address) / "system" / "cluster" / "gossip"
  system.actorFor(path)
}
|
2012-01-24 12:09:32 +01:00
|
|
|
|
2012-03-09 12:56:56 +01:00
|
|
|
/**
 * Gets an Iterable with the addresses of all the 'deputy' nodes - excluding this node if part of the group.
 *
 * The deputies are taken from the member addresses of the latest gossip: the first address is skipped and
 * at most 'nrOfDeputyNodes' of the following ones are kept.
 */
private def deputyNodes: Iterable[Address] = {
  // NOTE(review): which addresses are skipped/kept depends on the iteration order of the mapped
  // collection - confirm that the member ordering survives the 'map'
  val memberAddresses = state.get.latestGossip.members.toIterable.map(_.address)
  val candidates = memberAddresses.drop(1).take(nrOfDeputyNodes)
  candidates.filter(_ != remoteAddress)
}
|
2012-01-24 12:09:32 +01:00
|
|
|
|
2012-03-02 09:55:54 +01:00
|
|
|
// Picks one of 'addresses' at a uniformly random index. The collection must not be empty.
private def selectRandomNode(addresses: Iterable[Address]): Address = {
  val candidates = addresses.toSeq
  val index = random nextInt addresses.size
  candidates(index)
}
|
2012-02-14 20:50:12 +01:00
|
|
|
|
|
|
|
|
// A cluster is a singleton when the 'members' set of its latest gossip holds exactly one member.
private def isSingletonCluster(currentState: State): Boolean = {
  val memberCount = currentState.latestGossip.members.size
  memberCount == 1
}
|
2012-04-14 20:06:03 +02:00
|
|
|
|
|
|
|
|
/**
 * Creates the cluster JMX MBean and registers it in the MBean server.
 *
 * Every attribute and command of the MBean delegates to 'clusterNode'. A registration that fails because
 * an MBean with the same name already exists is ignored.
 */
private def createMBean() = {
  val mbean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean {

    // JMX attributes (bean-style)

    /*
     * Sends a string to the JMX client that will list all nodes in the node ring as follows:
     * {{{
     * Members:
     *         Member(address = akka://system0@localhost:5550, status = Up)
     *         Member(address = akka://system1@localhost:5551, status = Up)
     * Unreachable:
     *         Member(address = akka://system2@localhost:5553, status = Down)
     * }}}
     * The 'Unreachable' and 'Meta Data' sections are only present when they have content.
     */
    def getClusterStatus: String = {
      val gossip = clusterNode.latestGossip
      val unreachable = gossip.overview.unreachable
      val metaData = gossip.meta

      val membersSection = "\nMembers:\n\t" + gossip.members.mkString("\n\t")
      val unreachableSection =
        if (unreachable.isEmpty) ""
        else "\nUnreachable:\n\t" + unreachable.mkString("\n\t")
      val metaDataSection =
        if (metaData.isEmpty) ""
        else "\nMeta Data:\t" + metaData.toString

      membersSection + unreachableSection + metaDataSection
    }

    def getMemberStatus: String = clusterNode.status.toString

    def getLeader: String = clusterNode.leader.toString

    def isSingleton: Boolean = clusterNode.isSingletonCluster

    def isConvergence: Boolean = clusterNode.convergence.isDefined

    def isAvailable: Boolean = clusterNode.isAvailable

    // JMX commands - the address is given as a URI string

    def join(address: String) = clusterNode.join(AddressFromURIString(address))

    def leave(address: String) = clusterNode.leave(AddressFromURIString(address))

    def down(address: String) = clusterNode.down(AddressFromURIString(address))

    def remove(address: String) = clusterNode.remove(AddressFromURIString(address))

    def shutdown() = clusterNode.shutdown()
  }

  log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", remoteAddress, clusterMBeanName)

  try {
    mBeanServer.registerMBean(mbean, clusterMBeanName)
  } catch {
    // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
    case _: InstanceAlreadyExistsException ⇒ ()
  }
}
|
2011-10-26 08:48:16 +02:00
|
|
|
}
|