Start sending heartbeats immediately when joining, see #2249

* Keep track of joins that are in progress in State.joinInProgress,
  with Deadline
* Add test that fails without this feature
This commit is contained in:
Patrik Nordwall 2012-06-20 08:26:41 +02:00
parent 09e92b6bd9
commit 529c25f3dc
6 changed files with 131 additions and 32 deletions

View file

@ -194,8 +194,8 @@ object MemberStatus {
* Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes.
*/
case class GossipOverview(
seen: Map[Address, VectorClock] = Map.empty[Address, VectorClock],
unreachable: Set[Member] = Set.empty[Member]) {
seen: Map[Address, VectorClock] = Map.empty,
unreachable: Set[Member] = Set.empty) {
override def toString =
"GossipOverview(seen = [" + seen.mkString(", ") +
@ -241,7 +241,7 @@ object Gossip {
case class Gossip(
overview: GossipOverview = GossipOverview(),
members: SortedSet[Member], // sorted set of members with their status, sorted by address
meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]],
meta: Map[String, Array[Byte]] = Map.empty,
version: VectorClock = VectorClock()) // vector clock version
extends ClusterMessage // is a serializable cluster message
with Versioned[Gossip] {
@ -463,7 +463,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
*/
private case class State(
latestGossip: Gossip,
memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty[MembershipChangeListener])
joinInProgress: Map[Address, Deadline] = Map.empty,
memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty)
if (!system.provider.isInstanceOf[RemoteActorRefProvider])
throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration")
@ -674,11 +675,18 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
* Try to join this cluster node with the node specified by 'address'.
* A 'Join(thisNodeAddress)' command is sent to the node to join.
*/
def join(address: Address): Unit = {
val connection = clusterCommandConnectionFor(address)
val command = ClusterUserAction.Join(selfAddress)
log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection)
connection ! command
@tailrec
final def join(address: Address): Unit = {
val localState = state.get
val newState = localState copy (joinInProgress = localState.joinInProgress +
(address -> (Deadline.now + JoinTimeout)))
if (!state.compareAndSet(localState, newState)) join(address) // recur
else {
val connection = clusterCommandConnectionFor(address)
val command = ClusterUserAction.Join(selfAddress)
log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection)
connection ! command
}
}
/**
@ -913,7 +921,15 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
remoteGossip
}
val newState = localState copy (latestGossip = winningGossip seen selfAddress)
val newJoinInProgress =
if (localState.joinInProgress.isEmpty) localState.joinInProgress
else localState.joinInProgress --
winningGossip.members.map(_.address) --
winningGossip.overview.unreachable.map(_.address)
val newState = localState copy (
latestGossip = winningGossip seen selfAddress,
joinInProgress = newJoinInProgress)
// if we won the race then update else try again
if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update
@ -1023,16 +1039,15 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
* INTERNAL API.
*/
private[cluster] def heartbeat(): Unit = {
removeOverdueJoinInProgress()
val localState = state.get
if (!isSingletonCluster(localState)) {
val liveMembers = localState.latestGossip.members.toIndexedSeq
val beatTo = localState.latestGossip.members.toSeq.map(_.address) ++ localState.joinInProgress.keys
for (member liveMembers; if member.address != selfAddress) {
val connection = clusterGossipConnectionFor(member.address)
log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection)
connection ! selfHeartbeat
}
for (address beatTo; if address != selfAddress) {
val connection = clusterGossipConnectionFor(address)
log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection)
connection ! selfHeartbeat
}
}
@ -1080,6 +1095,23 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
}
/**
* INTERNAL API.
*
* Removes overdue joinInProgress from State.
*/
@tailrec
final private[cluster] def removeOverdueJoinInProgress(): Unit = {
val localState = state.get
val overdueJoins = localState.joinInProgress collect {
case (address, deadline) if deadline.isOverdue address
}
if (overdueJoins.nonEmpty) {
val newState = localState copy (joinInProgress = localState.joinInProgress -- overdueJoins)
if (!state.compareAndSet(localState, newState)) removeOverdueJoinInProgress() // recur
}
}
/**
* INTERNAL API.
*