2011-10-26 08:48:16 +02:00
|
|
|
/**
|
2012-01-19 18:21:06 +01:00
|
|
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
2011-10-26 08:48:16 +02:00
|
|
|
*/
|
|
|
|
|
|
2012-01-31 13:33:04 +01:00
|
|
|
package akka.cluster
|
2011-10-26 08:48:16 +02:00
|
|
|
|
|
|
|
|
import akka.actor._
|
|
|
|
|
import akka.actor.Status._
|
2012-01-31 13:33:04 +01:00
|
|
|
import akka.remote._
|
2011-10-27 12:46:10 +02:00
|
|
|
import akka.event.Logging
|
2012-01-24 12:09:32 +01:00
|
|
|
import akka.dispatch.Await
|
2012-01-31 13:33:04 +01:00
|
|
|
import akka.pattern.ask
|
|
|
|
|
import akka.util._
|
2011-11-25 12:02:25 +01:00
|
|
|
import akka.config.ConfigurationException
|
|
|
|
|
|
2012-01-30 11:41:41 +01:00
|
|
|
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
|
2012-01-24 12:09:32 +01:00
|
|
|
import java.util.concurrent.TimeUnit._
|
|
|
|
|
import java.util.concurrent.TimeoutException
|
2011-10-27 15:14:15 +02:00
|
|
|
import java.security.SecureRandom
|
2011-10-26 08:48:16 +02:00
|
|
|
import System.{ currentTimeMillis ⇒ newTimestamp }
|
2011-11-25 12:02:25 +01:00
|
|
|
|
2012-01-30 11:41:41 +01:00
|
|
|
import scala.collection.immutable.{ Map, SortedSet }
|
2011-10-26 08:48:16 +02:00
|
|
|
import scala.annotation.tailrec
|
2011-11-25 12:02:25 +01:00
|
|
|
|
2012-01-24 12:09:32 +01:00
|
|
|
import com.google.protobuf.ByteString
|
|
|
|
|
|
2011-10-26 08:48:16 +02:00
|
|
|
/**
 * Callback interface for parties that want to observe cluster membership changes.
 *
 * Implementations are handed to the gossiper through its listener registration methods.
 */
trait NodeMembershipChangeListener {

  /**
   * Invoked when a member has been connected.
   *
   * @param member the member that the notification is about
   */
  def memberConnected(member: Member): Unit

  /**
   * Invoked when a member has been marked as unavailable.
   *
   * @param member the member that the notification is about
   */
  def memberDisconnected(member: Member): Unit
}
|
|
|
|
|
|
|
|
|
|
/**
 * Root of the cluster message hierarchy.
 *
 * The trait is sealed, so every message type is declared in this file, and every
 * cluster message is `Serializable`.
 */
sealed trait ClusterMessage extends Serializable
|
|
|
|
|
|
2012-01-30 11:41:41 +01:00
|
|
|
/**
 * Command asking to join the cluster.
 *
 * The cluster daemon answers it with its gossiper's latest [[Gossip]].
 */
case object JoinCluster extends ClusterMessage
|
2011-10-26 08:48:16 +02:00
|
|
|
|
2012-01-30 11:41:41 +01:00
|
|
|
/**
 * Snapshot of the cluster state: ring membership, ring convergence and meta data,
 * all versioned by a vector clock.
 *
 * @param version            vector clock versioning this snapshot
 * @param member             address of the node this gossip originates from
 * @param members            members with their status; the default set orders members by
 *                           the string form of their address, in descending order
 * @param unavailableMembers members that have been marked as unavailable
 * @param seen               vector clock per member, intended for ring convergence
 * @param meta               optional binary meta data keyed by name
 */
case class Gossip(
  version: VectorClock = VectorClock(),
  member: Address,
  members: SortedSet[Member] = SortedSet.empty[Member](
    Ordering.fromLessThan[Member]((left, right) ⇒ left.address.toString > right.address.toString)),
  unavailableMembers: Set[Member] = Set.empty[Member],
  seen: Map[Member, VectorClock] = Map.empty[Member, VectorClock],
  // placeholder for handoff support, currently disabled:
  //pendingChanges: Option[Vector[PendingPartitioningChange]] = None,
  meta: Option[Map[String, Array[Byte]]] = None)
  extends ClusterMessage // a serializable cluster message
  with Versioned // carries a vector clock as its version
|
2011-10-26 08:48:16 +02:00
|
|
|
|
2012-01-30 11:41:41 +01:00
|
|
|
/**
 * A cluster member node: its address paired with its current status.
 *
 * @param address address of the member node
 * @param status  current status of the member node
 */
case class Member(
  address: Address,
  status: MemberStatus)
  extends ClusterMessage
|
2011-10-26 08:48:16 +02:00
|
|
|
|
2012-01-24 12:09:32 +01:00
|
|
|
/**
 * The current status of a cluster member node.
 *
 * A status is one of `Joining`, `Up`, `Leaving`, `Exiting` or `Down`, and each
 * status value carries its own vector clock as version.
 */
sealed trait MemberStatus extends ClusterMessage with Versioned

/**
 * The concrete member statuses. Each one defaults to a fresh `VectorClock`.
 */
object MemberStatus {

  /** Status `Joining`. */
  case class Joining(version: VectorClock = VectorClock()) extends MemberStatus

  /** Status `Up`. */
  case class Up(version: VectorClock = VectorClock()) extends MemberStatus

  /** Status `Leaving`. */
  case class Leaving(version: VectorClock = VectorClock()) extends MemberStatus

  /** Status `Exiting`. */
  case class Exiting(version: VectorClock = VectorClock()) extends MemberStatus

  /** Status `Down`. */
  case class Down(version: VectorClock = VectorClock()) extends MemberStatus
}
|
|
|
|
|
|
2012-01-30 11:41:41 +01:00
|
|
|
// sealed trait PendingPartitioningStatus
|
|
|
|
|
// object PendingPartitioningStatus {
|
|
|
|
|
// case object Complete extends PendingPartitioningStatus
|
|
|
|
|
// case object Awaiting extends PendingPartitioningStatus
|
|
|
|
|
// }
|
2012-01-24 12:09:32 +01:00
|
|
|
|
2012-01-30 11:41:41 +01:00
|
|
|
// case class PendingPartitioningChange(
|
|
|
|
|
// owner: Address,
|
|
|
|
|
// nextOwner: Address,
|
|
|
|
|
// changes: Vector[VNodeMod],
|
|
|
|
|
// status: PendingPartitioningStatus)
|
2012-01-24 12:09:32 +01:00
|
|
|
|
2012-01-30 11:41:41 +01:00
|
|
|
/**
 * Actor acting as the message endpoint in front of a [[Gossiper]].
 *
 * - `JoinCluster` is answered with the gossiper's latest gossip.
 * - A received `Gossip` is handed over to the gossiper.
 * - Anything else is logged as an error.
 *
 * @param system   actor system used to create the logger
 * @param gossiper the gossiper this daemon delegates to
 */
final class ClusterDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor {

  val log = Logging(system, "ClusterDaemon")

  def receive: PartialFunction[Any, Unit] = {
    case JoinCluster ⇒
      // reply to the joining node with what we currently know about the cluster
      sender ! gossiper.latestGossip

    case incoming: Gossip ⇒
      gossiper tell incoming

    case other ⇒
      log.error("Unknown message sent to cluster daemon [" + other + "]")
  }
}
|
|
|
|
|
|
2011-10-26 08:48:16 +02:00
|
|
|
/**
 * This module is responsible for gossiping cluster information. It keeps track of the live and the
 * unavailable members. Periodically (at the configured gossip frequency) it chooses a random member
 * and initiates a round of gossip with it. Whenever it is told some gossip it updates the failure
 * detector with a heartbeat for the gossiping node.
 * <p/>
 * During each of these runs the member initiates gossip exchange according to the following rules (as
 * defined in the Cassandra documentation [http://wiki.apache.org/cassandra/ArchitectureGossip]):
 * <pre>
 *   1) Gossip to random live member (if any)
 *   2) Gossip to random unreachable member with certain probability depending on number of unreachable and live members
 *   3) If the member gossiped to at (1) was not seed, or the number of live members is less than number of seeds,
 *       gossip to random seed with certain probability depending on number of unreachable, seed and live members.
 * </pre>
 *
 * Creating an instance has side effects: it starts the cluster daemon actor, joins the cluster through
 * the seed members (blocking, with retries) and schedules the periodic gossip and scrutinize tasks.
 *
 * @param remote the remote actor ref provider; supplies the transport address and serialization
 * @param system the actor system the gossiper runs in
 */
case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {

  /**
   * Represents the state for this Gossiper. Implemented using optimistic lockless concurrency,
   * all state is represented by this immutable case class and managed by an AtomicReference.
   */
  private case class State(
    currentGossip: Gossip,
    memberMembershipChangeListeners: Set[NodeMembershipChangeListener] = Set.empty[NodeMembershipChangeListener])

  val remoteSettings = new RemoteSettings(system.settings.config, system.name)
  val clusterSettings = new ClusterSettings(system.settings.config, system.name)

  val protocol = "akka" // TODO should this be hardcoded?
  val address = remote.transport.address

  val memberFingerprint = address.##
  val initialDelayForGossip = clusterSettings.InitialDelayForGossip
  val gossipFrequency = clusterSettings.GossipFrequency
  implicit val seedNodeConnectionTimeout = clusterSettings.SeedNodeConnectionTimeout
  implicit val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)

  // seed members
  private val seeds: Set[Member] = {
    if (clusterSettings.SeedNodes.isEmpty) throw new ConfigurationException(
      "At least one seed member must be defined in the configuration [akka.cluster.seed-members]")
    else clusterSettings.SeedNodes map (seedAddress ⇒ Member(seedAddress, MemberStatus.Up()))
  }

  private val serialization = remote.serialization
  private val failureDetector = new AccrualFailureDetector(system, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize)

  private val isRunning = new AtomicBoolean(true)
  private val log = Logging(system, "Gossiper")
  private val random = SecureRandom.getInstance("SHA1PRNG")

  // The state must be in place BEFORE the cluster daemon is started: the daemon is handed `this` and
  // may call `latestGossip` as soon as it receives a message, which would otherwise see a null `state`.
  private val state = new AtomicReference[State](State(currentGossip = newGossip()))

  // Is it right to put this guy under the /system path or should we have a top-level /cluster or something else...?
  private val clusterDaemon = system.systemActorOf(Props(new ClusterDaemon(system, this)), "cluster")

  // FIXME manage connections in some other way so we can delete the RemoteConnectionManager (SINCE IT SUCKS!!!)
  private val connectionManager = new RemoteConnectionManager(system, remote, failureDetector, Map.empty[Address, ActorRef])

  log.info("Starting cluster Gossiper...")

  // join the cluster by connecting to one of the seed members and retrieve current cluster state (Gossip)
  joinCluster(clusterSettings.MaxTimeToRetryJoiningCluster fromNow)

  // start periodic gossip and cluster scrutinization
  val initateGossipCanceller = system.scheduler.schedule(initialDelayForGossip, gossipFrequency)(initateGossip())
  val scrutinizeCanceller = system.scheduler.schedule(initialDelayForGossip, gossipFrequency)(scrutinize())

  /**
   * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
   *
   * Only the first call has an effect. The nested `finally` blocks make sure every step is attempted
   * even if an earlier one throws.
   */
  def shutdown(): Unit = {
    if (isRunning.compareAndSet(true, false)) {
      log.info("Shutting down Gossiper for [{}]...", address)
      try connectionManager.shutdown() finally {
        try system.stop(clusterDaemon) finally {
          try initateGossipCanceller.cancel() finally {
            try scrutinizeCanceller.cancel() finally {
              log.info("Gossiper for [{}] is shut down", address)
            }
          }
        }
      }
    }
  }

  /**
   * Returns the gossip currently held in this gossiper's state.
   */
  def latestGossip: Gossip = state.get.currentGossip

  /**
   * Tell the gossiper some gossip.
   *
   * Currently this only records a heartbeat for the gossiping node in the failure detector.
   *
   * @param newGossip the gossip received from another node
   */
  final def tell(newGossip: Gossip): Unit = {
    val gossipingNode = newGossip.member

    failureDetector heartbeat gossipingNode // update heartbeat in failure detector

    // FIXME redesign with cluster convergence in mind. Still missing here:
    //   - picking the latest version of the incoming gossip and the current gossip (vector clock)
    //   - adding a previously unknown (or formerly unavailable) gossiping node to the members
    //   - setting up connections to new members and notifying the membership change listeners
  }

  /**
   * Registers a listener to subscribe to cluster membership changes.
   *
   * Retries until the compare-and-set of the state succeeds.
   *
   * @param listener the listener to add
   */
  @tailrec
  final def registerListener(listener: NodeMembershipChangeListener): Unit = {
    val oldState = state.get
    val newListeners = oldState.memberMembershipChangeListeners + listener
    val newState = oldState copy (memberMembershipChangeListeners = newListeners)
    if (!state.compareAndSet(oldState, newState)) registerListener(listener) // recur
  }

  /**
   * Unsubscribes to cluster membership changes.
   *
   * Retries until the compare-and-set of the state succeeds.
   *
   * @param listener the listener to remove
   */
  @tailrec
  final def unregisterListener(listener: NodeMembershipChangeListener): Unit = {
    val oldState = state.get
    val newListeners = oldState.memberMembershipChangeListeners - listener
    val newState = oldState copy (memberMembershipChangeListeners = newListeners)
    if (!state.compareAndSet(oldState, newState)) unregisterListener(listener) // recur
  }

  /**
   * Sets up remote connections to all the members in the argument list and notifies the
   * registered listeners about each of them.
   */
  private def connectToNodes(members: Seq[Member]): Unit = {
    members foreach { member ⇒
      setUpConnectionToNode(member)
      state.get.memberMembershipChangeListeners foreach (_ memberConnected member) // notify listeners about the new members
    }
  }

  /**
   * Returns a connection to one of the given members, trying them in random order.
   *
   * The candidates are shuffled once so that not every node ends up connecting to the same member.
   *
   * @throws RemoteConnectionException if no connection could be set up to any of the members
   */
  private def connectToRandomNodeOf(members: Seq[Member]): ActorRef = {
    @tailrec
    def connectToFirstReachable(candidates: List[Member]): ActorRef = candidates match {
      case member :: rest ⇒
        setUpConnectionToNode(member) match {
          case Some(connection) ⇒ connection
          case None             ⇒ connectToFirstReachable(rest) // recur with the remaining candidates
        }
      case Nil ⇒
        throw new RemoteConnectionException(
          "Could not establish connection to any of the members in the argument list")
    }

    // convert to List explicitly: the `::` pattern would not match other Seq implementations
    connectToFirstReachable(new scala.util.Random(random).shuffle(members.toList))
  }

  /**
   * Joins the cluster by connecting to one of the seed members and retrieve current cluster state (Gossip).
   *
   * Does nothing if there are no seed members besides this node. A failed attempt is retried until it
   * succeeds, the gossiper is shut down, or the deadline has passed.
   *
   * @param deadline point in time after which no further attempts are made
   * @throws RemoteConnectionException if the cluster could not be joined before the deadline
   */
  @tailrec
  final private def joinCluster(deadline: Deadline): Unit = {
    val seedNodes = seedNodesWithoutMyself // filter out myself

    if (!seedNodes.isEmpty) { // if we have seed members to contact
      connectToNodes(seedNodes)

      val joined =
        try {
          log.info("Trying to join cluster through one of the seed members [{}]", seedNodes.mkString(", "))

          Await.result(connectToRandomNodeOf(seedNodes) ? JoinCluster, seedNodeConnectionTimeout) match {
            case initialGossip: Gossip ⇒
              // just sets/overwrites the state/gossip regardless of what it was before
              // since it should be treated as the initial state
              state.set(state.get copy (currentGossip = initialGossip))
              log.debug("Received initial gossip [{}] from seed member", initialGossip)
              true

            case unknown ⇒
              throw new IllegalStateException("Expected initial gossip from seed, received [" + unknown + "]")
          }
        } catch {
          case e: Exception ⇒
            log.error(
              e,
              "Could not join cluster through any of the seed members - retrying for another {} seconds",
              deadline.timeLeft.toSeconds)
            false
        }

      // retry joining the cluster unless
      //   1. Gossiper is shut down
      //   2. The connection time window has expired
      // The retry happens outside of the catch block so that it is a real tail call and does not
      // grow the stack when attempts fail quickly.
      if (!joined && isRunning.get) {
        if (deadline.timeLeft.toMillis > 0) joinCluster(deadline) // recur
        else throw new RemoteConnectionException(
          "Could not join cluster (any of the seed members) - giving up after trying for " +
            clusterSettings.MaxTimeToRetryJoiningCluster.toSeconds + " seconds")
      }
    }
  }

  /**
   * Initates a new round of gossip.
   *
   *   1. Gossips to a random live member, if there is any.
   *   2. Gossips to a random unavailable member with probability `unavailable / (live + 1)`.
   *   3. If step 1 did not reach a seed (or there are no live members) and there is a seed other than
   *      this node, gossips to a random seed: always when there are no live members, otherwise with
   *      probability `1 / (live + unavailable)`.
   */
  private def initateGossip(): Unit = {
    val oldGossip = state.get.currentGossip

    val oldMembers = oldGossip.members
    val oldMembersSize = oldMembers.size

    val oldUnavailableMembers = oldGossip.unavailableMembers
    val oldUnavailableMembersSize = oldUnavailableMembers.size

    // 1. gossip to alive members
    val gossipedToSeed =
      if (oldMembersSize > 0) gossipToRandomNodeOf(oldMembers)
      else false

    // 2. gossip to dead members
    if (oldUnavailableMembersSize > 0) {
      // floating point division, integer division would truncate the probability to a whole number
      val probability: Double = oldUnavailableMembersSize.toDouble / (oldMembersSize + 1)
      if (random.nextDouble() < probability) gossipToRandomNodeOf(oldUnavailableMembers)
    }

    // 3. gossip to a seed for facilitating partition healing
    val hasSeedOtherThanMyself = seeds exists (_.address != address)
    if ((!gossipedToSeed || oldMembersSize < 1) && hasSeedOtherThanMyself) {
      if (oldMembersSize == 0) gossipToRandomNodeOf(seeds)
      else {
        val probability = 1.0 / (oldMembersSize + oldUnavailableMembersSize)
        if (random.nextDouble() <= probability) gossipToRandomNodeOf(seeds)
      }
    }
  }

  /**
   * Gossips to a random member in the set of members passed in as argument.
   *
   * This node itself is never selected. If no other member is left nothing is sent.
   *
   * @return 'true' if it gossiped to a "seed" member.
   */
  private def gossipToRandomNodeOf(members: Set[Member]): Boolean = {
    val peers = members filter (_.address != address) // filter out myself

    if (peers.isEmpty) false // nobody but myself to gossip to
    else {
      val peer = selectRandomNode(peers)
      // if connection can't be established/found => ignore it since the failure detector will take care of the potential problem
      // NOTE(review): this sends a fresh gossip that only carries our address, not the current
      // cluster state - confirm whether the current gossip should be sent once `tell` merges gossip.
      setUpConnectionToNode(peer) foreach { _ ! newGossip() }
      // compare by address: the status of a member must not decide whether it counts as a seed
      seeds exists (_.address == peer.address)
    }
  }

  /**
   * Scrutinizes the cluster; marks members detected by the failure detector as unavailable, and notifies all listeners
   * of the change in the cluster membership.
   *
   * Listeners are only notified about the members that were newly detected as unavailable in this
   * run, and only after the state update has succeeded.
   */
  @tailrec
  final private def scrutinize(): Unit = {
    val oldState = state.get
    val oldGossip = oldState.currentGossip

    val oldMembers = oldGossip.members
    val oldUnavailableMembers = oldGossip.unavailableMembers
    val newlyDetectedUnavailableMembers = oldMembers filterNot (member ⇒ failureDetector.isAvailable(member.address))

    if (!newlyDetectedUnavailableMembers.isEmpty) { // we have newly detected members marked as unavailable
      val newMembers = oldMembers diff newlyDetectedUnavailableMembers
      val newUnavailableMembers = oldUnavailableMembers ++ newlyDetectedUnavailableMembers

      val updatedGossip = oldGossip copy (members = newMembers, unavailableMembers = newUnavailableMembers)
      val newState = oldState copy (currentGossip = incrementVersionForGossip(updatedGossip))

      // if we won the race then update else try again
      if (!state.compareAndSet(oldState, newState)) scrutinize() // recur
      else {
        // notify listeners on successful update of state
        for {
          deadNode ← newlyDetectedUnavailableMembers
          listener ← oldState.memberMembershipChangeListeners
        } listener memberDisconnected deadNode
      }
    }
  }

  /**
   * Looks up, or creates, the connection to the cluster daemon of the given member.
   *
   * @return the connection, or `None` if it could not be set up
   */
  private def setUpConnectionToNode(member: Member): Option[ActorRef] = {
    val memberAddress = member.address
    try {
      Some(
        connectionManager.putIfAbsent(
          memberAddress,
          // the path must be rooted at the member's address, otherwise it points at our own daemon
          () ⇒ system.actorFor(RootActorPath(memberAddress) / "system" / "cluster")))
    } catch {
      case e: Exception ⇒
        log.debug("Could not set up connection to member [{}] due to [{}]", memberAddress, e.getMessage)
        None
    }
  }

  /** Creates an initial gossip that only carries the address of this node. */
  private def newGossip(): Gossip = Gossip(member = address)

  /** Returns a copy of the gossip whose vector clock has been incremented for this node. */
  private def incrementVersionForGossip(from: Gossip): Gossip = {
    val newVersion = from.version.increment(memberFingerprint, newTimestamp)
    from copy (version = newVersion)
  }

  /** The seed members, excluding this node. */
  private def seedNodesWithoutMyself: List[Member] = seeds.filter(_.address != address).toList

  /**
   * Picks a random member out of the given set.
   *
   * @throws IllegalArgumentException if the set is empty
   */
  private def selectRandomNode(members: Set[Member]): Member = {
    require(members.nonEmpty, "Can not select a random member from an empty set")
    members.toList(random.nextInt(members.size))
  }
}
|