Fixed misc issues after review.
Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
parent
06ec519c7c
commit
a3026b3316
11 changed files with 101 additions and 226 deletions
|
|
@ -107,18 +107,6 @@ object MemberStatus {
|
|||
case object Removed extends MemberStatus
|
||||
}
|
||||
|
||||
// sealed trait PartitioningStatus
|
||||
// object PartitioningStatus {
|
||||
// case object Complete extends PartitioningStatus
|
||||
// case object Awaiting extends PartitioningStatus
|
||||
// }
|
||||
|
||||
// case class PartitioningChange(
|
||||
// from: Address,
|
||||
// to: Address,
|
||||
// path: PartitionPath,
|
||||
// status: PartitioningStatus)
|
||||
|
||||
/**
|
||||
* Represents the overview of the cluster; holds the cluster convergence table and the set of unreachable nodes.
|
||||
*/
|
||||
|
|
@ -138,8 +126,6 @@ case class GossipOverview(
|
|||
case class Gossip(
|
||||
overview: GossipOverview = GossipOverview(),
|
||||
members: SortedSet[Member], // sorted set of members with their status, sorted by name
|
||||
//partitions: Tree[PartitionPath, Node] = Tree.empty[PartitionPath, Node], // name/partition service
|
||||
//pending: Set[PartitioningChange] = Set.empty[PartitioningChange],
|
||||
meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]],
|
||||
version: VectorClock = VectorClock()) // vector clock version
|
||||
extends ClusterMessage // is a serializable cluster message
|
||||
|
|
@ -159,8 +145,10 @@ case class Gossip(
|
|||
* Marks the gossip as seen by this node (remoteAddress) by updating the address entry in the 'gossip.overview.seen'
|
||||
* Map with the VectorClock for the new gossip.
|
||||
*/
|
||||
def seen(address: Address): Gossip =
|
||||
this copy (overview = overview copy (seen = overview.seen + (address -> version)))
|
||||
def seen(address: Address): Gossip = {
|
||||
if (overview.seen.contains(address) && overview.seen(address) == version) this
|
||||
else this copy (overview = overview copy (seen = overview.seen + (address -> version)))
|
||||
}
|
||||
|
||||
override def toString =
|
||||
"Gossip(" +
|
||||
|
|
@ -269,34 +257,26 @@ final class ClusterGossipDaemon(system: ActorSystem, node: Node) extends Actor {
|
|||
|
||||
def receive = {
|
||||
case GossipEnvelope(sender, gossip) ⇒ node.receive(sender, gossip)
|
||||
case unknown ⇒ log.error("Unknown message sent to cluster daemon [" + unknown + "]")
|
||||
}
|
||||
|
||||
override def unhandled(unknown: Any) = log.error("Unknown message sent to cluster daemon [" + unknown + "]")
|
||||
}
|
||||
|
||||
/**
|
||||
* Node Extension Id and factory for creating Node extension.
|
||||
* Example:
|
||||
* {{{
|
||||
* val node = NodeExtension(system)
|
||||
*
|
||||
* if (node.isLeader) { ... }
|
||||
* }}}
|
||||
*
|
||||
* Example:
|
||||
* {{{
|
||||
* import akka.cluster._
|
||||
*
|
||||
* val node = system.node // implicit conversion adds 'node' method
|
||||
* val node = Node(system)
|
||||
*
|
||||
* if (node.isLeader) { ... }
|
||||
* }}}
|
||||
*/
|
||||
object NodeExtension extends ExtensionId[Node] with ExtensionIdProvider {
|
||||
object Node extends ExtensionId[Node] with ExtensionIdProvider {
|
||||
override def get(system: ActorSystem): Node = super.get(system)
|
||||
|
||||
override def lookup = NodeExtension
|
||||
override def lookup = Node
|
||||
|
||||
override def createExtension(system: ExtendedActorSystem): Node = new Node(system.asInstanceOf[ActorSystemImpl]) // not nice but need API in ActorSystemImpl inside Node
|
||||
override def createExtension(system: ExtendedActorSystem): Node = new Node(system)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -316,21 +296,12 @@ object NodeExtension extends ExtensionId[Node] with ExtensionIdProvider {
|
|||
*
|
||||
* Example:
|
||||
* {{{
|
||||
* val node = NodeExtension(system)
|
||||
*
|
||||
* if (node.isLeader) { ... }
|
||||
* }}}
|
||||
*
|
||||
* Example:
|
||||
* {{{
|
||||
* import akka.cluster._
|
||||
*
|
||||
* val node = system.node // implicit conversion adds 'node' method
|
||||
* val node = Node(system)
|
||||
*
|
||||
* if (node.isLeader) { ... }
|
||||
* }}}
|
||||
*/
|
||||
class Node(system: ActorSystemImpl) extends Extension {
|
||||
class Node(system: ExtendedActorSystem) extends Extension {
|
||||
|
||||
/**
|
||||
* Represents the state for this Node. Implemented using optimistic lockless concurrency,
|
||||
|
|
@ -372,10 +343,10 @@ class Node(system: ActorSystemImpl) extends Extension {
|
|||
private val log = Logging(system, "Node")
|
||||
private val random = SecureRandom.getInstance("SHA1PRNG")
|
||||
|
||||
private val clusterCommandDaemon = system.systemActorOf(
|
||||
private val clusterCommandDaemon = systemActorOf(
|
||||
Props(new ClusterCommandDaemon(system, this)), "clusterCommand")
|
||||
|
||||
private val clusterGossipDaemon = system.systemActorOf(
|
||||
private val clusterGossipDaemon = systemActorOf(
|
||||
Props(new ClusterGossipDaemon(system, this)).withRouter(RoundRobinRouter(nrOfGossipDaemons)), "clusterGossip")
|
||||
|
||||
private val state = {
|
||||
|
|
@ -439,21 +410,13 @@ class Node(system: ActorSystemImpl) extends Extension {
|
|||
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
|
||||
*/
|
||||
def shutdown() {
|
||||
|
||||
// FIXME Cheating for now. Can't just shut down. Node must first gossip a Leave command, wait for the Leader to do a proper Handoff and then await an Exit command before switching to Removed
|
||||
|
||||
if (isRunning.compareAndSet(true, false)) {
|
||||
log.info("Node [{}] - Shutting down Node and ClusterDaemon...", remoteAddress)
|
||||
|
||||
try system.stop(clusterCommandDaemon) finally {
|
||||
try system.stop(clusterGossipDaemon) finally {
|
||||
try gossipCanceller.cancel() finally {
|
||||
try scrutinizeCanceller.cancel() finally {
|
||||
log.info("Node [{}] - Node and ClusterDaemon shut down successfully", remoteAddress)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
gossipCanceller.cancel()
|
||||
scrutinizeCanceller.cancel()
|
||||
system.stop(clusterCommandDaemon)
|
||||
system.stop(clusterGossipDaemon)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -519,8 +482,6 @@ class Node(system: ActorSystemImpl) extends Extension {
|
|||
private[cluster] final def joining(node: Address) {
|
||||
log.info("Node [{}] - Node [{}] is joining", remoteAddress, node)
|
||||
|
||||
failureDetector heartbeat node // update heartbeat in failure detector
|
||||
|
||||
val localState = state.get
|
||||
val localGossip = localState.latestGossip
|
||||
val localMembers = localGossip.members
|
||||
|
|
@ -535,8 +496,9 @@ class Node(system: ActorSystemImpl) extends Extension {
|
|||
|
||||
if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
|
||||
else {
|
||||
failureDetector heartbeat node // update heartbeat in failure detector
|
||||
if (convergence(newState.latestGossip).isDefined) {
|
||||
newState.memberMembershipChangeListeners map { _ notify newMembers }
|
||||
newState.memberMembershipChangeListeners foreach { _ notify newMembers }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -571,10 +533,6 @@ class Node(system: ActorSystemImpl) extends Extension {
|
|||
*/
|
||||
@tailrec
|
||||
private[cluster] final def receive(sender: Member, remoteGossip: Gossip) {
|
||||
log.debug("Node [{}] - Receiving gossip from [{}]", remoteAddress, sender.address)
|
||||
|
||||
failureDetector heartbeat sender.address // update heartbeat in failure detector
|
||||
|
||||
val localState = state.get
|
||||
val localGossip = localState.latestGossip
|
||||
|
||||
|
|
@ -604,8 +562,12 @@ class Node(system: ActorSystemImpl) extends Extension {
|
|||
// if we won the race then update else try again
|
||||
if (!state.compareAndSet(localState, newState)) receive(sender, remoteGossip) // recur if we fail the update
|
||||
else {
|
||||
log.debug("Node [{}] - Receiving gossip from [{}]", remoteAddress, sender.address)
|
||||
|
||||
failureDetector heartbeat sender.address // update heartbeat in failure detector
|
||||
|
||||
if (convergence(newState.latestGossip).isDefined) {
|
||||
newState.memberMembershipChangeListeners map { _ notify newState.latestGossip.members }
|
||||
newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -639,12 +601,12 @@ class Node(system: ActorSystemImpl) extends Extension {
|
|||
val localUnreachableSize = localUnreachableAddresses.size
|
||||
|
||||
// 1. gossip to alive members
|
||||
val gossipedToDeputy = gossipToRandomNodeOf(localMembers.toList map { _.address })
|
||||
val gossipedToDeputy = gossipToRandomNodeOf(localMembers map { _.address })
|
||||
|
||||
// 2. gossip to unreachable members
|
||||
if (localUnreachableSize > 0) {
|
||||
// Probability of also gossiping to an unreachable node: unreachable / (members + 1).
// Convert to Double before dividing; with integer operands the quotient would be truncated
// (to 0 whenever the unreachable count does not exceed the member count), so this branch would almost never fire.
val probability: Double = localUnreachableSize.toDouble / (localMembersSize + 1)
|
||||
if (random.nextDouble() < probability) gossipToRandomNodeOf(localUnreachableAddresses.toList)
|
||||
if (random.nextDouble() < probability) gossipToRandomNodeOf(localUnreachableAddresses)
|
||||
}
|
||||
|
||||
// 3. gossip to a deputy node to facilitate partition healing
|
||||
|
|
@ -714,8 +676,8 @@ class Node(system: ActorSystemImpl) extends Extension {
|
|||
*
|
||||
* @return 'true' if it gossiped to a "deputy" member.
|
||||
*/
|
||||
private def gossipToRandomNodeOf(addresses: Seq[Address]): Boolean = {
|
||||
val peers = addresses filter (_ != remoteAddress) // filter out myself
|
||||
private def gossipToRandomNodeOf(addresses: Iterable[Address]): Boolean = {
|
||||
val peers = addresses filterNot (_ == remoteAddress) // filter out myself
|
||||
val peer = selectRandomNode(peers)
|
||||
gossipTo(peer)
|
||||
deputyNodes exists (peer == _)
|
||||
|
|
@ -744,8 +706,6 @@ class Node(system: ActorSystemImpl) extends Extension {
|
|||
val newMembers = localMembers diff newlyDetectedUnreachableMembers
|
||||
val newUnreachableAddresses: Set[Address] = localUnreachableAddresses ++ newlyDetectedUnreachableAddresses
|
||||
|
||||
log.info("Node [{}] - Marking node(s) an unreachable [{}]", remoteAddress, newlyDetectedUnreachableAddresses.mkString(", "))
|
||||
|
||||
val newSeen = newUnreachableAddresses.foldLeft(localSeen)((currentSeen, address) ⇒ currentSeen - address)
|
||||
|
||||
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableAddresses)
|
||||
|
|
@ -759,8 +719,10 @@ class Node(system: ActorSystemImpl) extends Extension {
|
|||
// if we won the race then update else try again
|
||||
if (!state.compareAndSet(localState, newState)) scrutinize() // recur
|
||||
else {
|
||||
log.info("Node [{}] - Marking node(s) an unreachable [{}]", remoteAddress, newlyDetectedUnreachableAddresses.mkString(", "))
|
||||
|
||||
if (convergence(newState.latestGossip).isDefined) {
|
||||
newState.memberMembershipChangeListeners map { _ notify newMembers }
|
||||
newState.memberMembershipChangeListeners foreach { _ notify newMembers }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -777,7 +739,7 @@ class Node(system: ActorSystemImpl) extends Extension {
|
|||
// if (overview.unreachable.isEmpty) { // if there are any unreachable nodes then we can't have a convergence -
|
||||
// waiting for user to act (issuing DOWN) or leader to act (issuing DOWN through auto-down)
|
||||
val seen = gossip.overview.seen
|
||||
val views = Set.empty[VectorClock] ++ seen.values
|
||||
val views = seen.values.toSet
|
||||
if (views.size == 1) {
|
||||
log.debug("Node [{}] - Cluster convergence reached", remoteAddress)
|
||||
Some(gossip)
|
||||
|
|
@ -785,6 +747,13 @@ class Node(system: ActorSystemImpl) extends Extension {
|
|||
// } else None
|
||||
}
|
||||
|
||||
private def systemActorOf(props: Props, name: String): ActorRef = {
|
||||
Await.result(system.systemGuardian ? CreateChild(props, name), system.settings.CreationTimeout.duration) match {
|
||||
case ref: ActorRef ⇒ ref
|
||||
case ex: Exception ⇒ throw ex
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up cluster command connection.
|
||||
*/
|
||||
|
|
@ -795,9 +764,9 @@ class Node(system: ActorSystemImpl) extends Extension {
|
|||
*/
|
||||
private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "clusterGossip")
|
||||
|
||||
private def deputyNodes: Seq[Address] = state.get.latestGossip.members.toSeq map (_.address) drop 1 take nrOfDeputyNodes filter (_ != remoteAddress)
|
||||
private def deputyNodes: Iterable[Address] = state.get.latestGossip.members.toIterable map (_.address) drop 1 take nrOfDeputyNodes filter (_ != remoteAddress)
|
||||
|
||||
private def selectRandomNode(addresses: Seq[Address]): Address = addresses(random nextInt addresses.size)
|
||||
/**
 * Selects one address at random from the given collection, using this node's `random` generator.
 *
 * The collection is materialized once into an indexed sequence so that its size and the
 * indexed lookup refer to the same snapshot and the input is traversed only once.
 *
 * @param addresses the candidate addresses; must not be empty
 * @return the randomly chosen address
 * @throws IllegalArgumentException if `addresses` is empty (previously raised by `nextInt(0)`,
 *         now raised up front with an explicit message)
 */
private def selectRandomNode(addresses: Iterable[Address]): Address = {
  val candidates = addresses.toIndexedSeq
  require(candidates.nonEmpty, "Cannot select a random node from an empty collection of addresses")
  candidates(random nextInt candidates.size)
}
|
||||
|
||||
private def isSingletonCluster(currentState: State): Boolean = currentState.latestGossip.members.size == 1
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue