Merge branch 'master' into wip-2263-gossip-unreachable-patriknw

Conflicts:
	akka-cluster/src/main/scala/akka/cluster/Cluster.scala
This commit is contained in:
Patrik Nordwall 2012-06-25 20:59:50 +02:00
commit 2cd38e2004
22 changed files with 304 additions and 138 deletions

View file

@ -11,7 +11,7 @@ import akka.dispatch.Await
import akka.dispatch.MonitorableThreadFactory
import akka.event.Logging
import akka.jsr166y.ThreadLocalRandom
import akka.pattern.ask
import akka.pattern._
import akka.remote._
import akka.routing._
import akka.util._
@ -55,11 +55,29 @@ sealed trait ClusterMessage extends Serializable
object ClusterUserAction {
/**
* Command to join the cluster. Sent when a node (reprsesented by 'address')
* Command to join the cluster. Sent when a node (represented by 'address')
* wants to join another node (the receiver).
*/
case class Join(address: Address) extends ClusterMessage
/**
* Start message of the process to join one of the seed nodes.
* The node sends `InitJoin` to all seed nodes, which replies
* with `InitJoinAck`. The first reply is used others are discarded.
* The node sends `Join` command to the seed node that replied first.
*/
case object JoinSeedNode extends ClusterMessage
/**
* @see JoinSeedNode
*/
case object InitJoin extends ClusterMessage
/**
* @see JoinSeedNode
*/
case class InitJoinAck(address: Address) extends ClusterMessage
/**
* Command to leave the cluster.
*/
@ -346,11 +364,27 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto
val log = Logging(context.system, this)
def receive = {
case Join(address) cluster.joining(address)
case Down(address) cluster.downing(address)
case Leave(address) cluster.leaving(address)
case Exit(address) cluster.exiting(address)
case Remove(address) cluster.removing(address)
case JoinSeedNode joinSeedNode()
case InitJoin sender ! InitJoinAck(cluster.selfAddress)
case InitJoinAck(address) cluster.join(address)
case Join(address) cluster.joining(address)
case Down(address) cluster.downing(address)
case Leave(address) cluster.leaving(address)
case Exit(address) cluster.exiting(address)
case Remove(address) cluster.removing(address)
}
def joinSeedNode(): Unit = {
val seedRoutees = for (address cluster.seedNodes; if address != cluster.selfAddress)
yield self.path.toStringWithAddress(address)
if (seedRoutees.nonEmpty) {
implicit val within = Timeout(cluster.clusterSettings.SeedNodeTimeout)
val seedRouter = context.actorOf(
Props.empty.withRouter(ScatterGatherFirstCompletedRouter(
routees = seedRoutees, within = within.duration)))
seedRouter ? InitJoin pipeTo self
seedRouter ! PoisonPill
}
}
override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown)
@ -480,8 +514,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
private val nodeToJoin: Option[Address] = NodeToJoin filter (_ != selfAddress)
private val serialization = remote.serialization
private val _isRunning = new AtomicBoolean(true)
@ -508,8 +540,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
new AtomicReference[State](State(seenVersionedGossip))
}
// try to join the node defined in the 'akka.cluster.node-to-join' option
autoJoin()
// try to join one of the nodes defined in the 'akka.cluster.seed-nodes'
if (AutoJoin) joinSeedNode()
// ========================================================
// ===================== WORK DAEMONS =====================
@ -647,6 +679,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
*/
def isAvailable: Boolean = !isUnavailable(state.get)
/**
* Make it possible to override/configure seedNodes from tests without
* specifying in config. Addresses are unknown before startup time.
*/
def seedNodes: IndexedSeq[Address] = SeedNodes
/**
* Registers a listener to subscribe to cluster membership changes.
*/
@ -950,9 +988,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from
/**
* Joins the pre-configured contact point.
* Joins the pre-configured contact points.
*/
private def autoJoin(): Unit = nodeToJoin foreach join
private def joinSeedNode(): Unit = clusterCommandDaemon ! ClusterUserAction.JoinSeedNode
/**
* INTERNAL API.
@ -1016,8 +1054,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// 2. gossip to a deputy nodes for facilitating partition healing
val deputies = deputyNodes(localMemberAddresses)
val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false)
if ((!alreadyGossipedToDeputy || localMembersSize < NrOfDeputyNodes) && deputies.nonEmpty) {
val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, NrOfDeputyNodes)
if ((!alreadyGossipedToDeputy || localMembersSize < seedNodes.size) && deputies.nonEmpty) {
val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, seedNodes.size)
if (ThreadLocalRandom.current.nextDouble() < probability)
gossipToRandomNodeOf(deputies)
}
@ -1360,7 +1398,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
* Gets the addresses of a all the 'deputy' nodes - excluding this node if part of the group.
*/
private def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] =
addresses drop 1 take NrOfDeputyNodes filterNot (_ == selfAddress)
addresses filterNot (_ == selfAddress) intersect seedNodes
/**
* INTERNAL API.