Incorporate feedback from review, see #2214
This commit is contained in:
parent
e2551494c4
commit
d957c68639
1 changed files with 19 additions and 18 deletions
|
|
@ -14,18 +14,14 @@ import akka.pattern.ask
|
|||
import akka.util._
|
||||
import akka.util.duration._
|
||||
import akka.ConfigurationException
|
||||
|
||||
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
|
||||
import java.util.concurrent.TimeUnit._
|
||||
import java.util.concurrent.TimeoutException
|
||||
import akka.jsr166y.ThreadLocalRandom
|
||||
|
||||
import java.lang.management.ManagementFactory
|
||||
import javax.management._
|
||||
|
||||
import scala.collection.immutable.{ Map, SortedSet }
|
||||
import scala.annotation.tailrec
|
||||
|
||||
import com.google.protobuf.ByteString
|
||||
|
||||
/**
|
||||
|
|
@ -44,6 +40,8 @@ trait MetaDataChangeListener {
|
|||
|
||||
/**
|
||||
* Base trait for all cluster messages. All ClusterMessage's are serializable.
|
||||
*
|
||||
* FIXME Protobuf all ClusterMessages
|
||||
*/
|
||||
sealed trait ClusterMessage extends Serializable
|
||||
|
||||
|
|
@ -82,6 +80,7 @@ object ClusterAction {
|
|||
|
||||
/**
|
||||
* Represents the address and the current status of a cluster member node.
|
||||
*
|
||||
*/
|
||||
class Member(val address: Address, val status: MemberStatus) extends ClusterMessage {
|
||||
override def hashCode = address.##
|
||||
|
|
@ -175,6 +174,10 @@ case class GossipOverview(
|
|||
"])"
|
||||
}
|
||||
|
||||
object Gossip {
|
||||
val emptyMembers: SortedSet[Member] = SortedSet.empty
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents the state of the cluster; cluster ring membership, ring convergence, meta data - all versioned by a vector clock.
|
||||
*/
|
||||
|
|
@ -219,7 +222,7 @@ case class Gossip(
|
|||
|
||||
// 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups
|
||||
val mergedMembers =
|
||||
SortedSet.empty[Member] ++
|
||||
Gossip.emptyMembers ++
|
||||
membersGroupedByAddress.values.foldLeft(Vector.empty[Member]) { (acc, members) ⇒
|
||||
acc :+ members.reduceLeft(Member.highestPriorityOf(_, _))
|
||||
}
|
||||
|
|
@ -244,7 +247,10 @@ case class Gossip(
|
|||
")"
|
||||
}
|
||||
|
||||
case class Heartbeat(from: Address)
|
||||
/**
|
||||
* Sent at regular intervals for failure detection.
|
||||
*/
|
||||
case class Heartbeat(from: Address) extends ClusterMessage
|
||||
|
||||
/**
|
||||
* Manages routing of the different cluster commands.
|
||||
|
|
@ -372,6 +378,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
|||
import clusterSettings._
|
||||
|
||||
val selfAddress = remote.transport.address
|
||||
private val selfHeartbeat = Heartbeat(selfAddress)
|
||||
|
||||
val failureDetector = new AccrualFailureDetector(
|
||||
system, selfAddress, FailureDetectorThreshold, FailureDetectorMaxSampleSize)
|
||||
|
||||
|
|
@ -402,7 +410,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
|||
|
||||
private val state = {
|
||||
val member = Member(selfAddress, MemberStatus.Joining)
|
||||
val versionedGossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock
|
||||
val versionedGossip = Gossip(members = Gossip.emptyMembers + member) + vclockNode // add me as member and update my vector clock
|
||||
val seenVersionedGossip = versionedGossip seen selfAddress
|
||||
new AtomicReference[State](State(seenVersionedGossip))
|
||||
}
|
||||
|
|
@ -757,9 +765,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[cluster] def receiveHeartbeat(from: Address): Unit = {
|
||||
failureDetector heartbeat from
|
||||
}
|
||||
private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from
|
||||
|
||||
/**
|
||||
* Joins the pre-configured contact point.
|
||||
|
|
@ -785,10 +791,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
|||
val newSelf = localSelf copy (status = newStatus)
|
||||
|
||||
// change my state in 'gossip.members'
|
||||
val newMembers = localMembers map { member ⇒
|
||||
if (member.address == selfAddress) newSelf
|
||||
else member
|
||||
}
|
||||
val newMembers = localMembers map { member ⇒ if (member.address == selfAddress) newSelf else member }
|
||||
|
||||
val newGossip = localGossip copy (members = newMembers)
|
||||
|
||||
|
|
@ -893,13 +896,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
|||
|
||||
if (!isSingletonCluster(localState)) {
|
||||
val liveMembers = localState.latestGossip.members.toIndexedSeq
|
||||
val unreachableMembers = localState.latestGossip.overview.unreachable
|
||||
|
||||
// FIXME use unreachable?
|
||||
for (member ← (liveMembers ++ unreachableMembers); if member.address != selfAddress) {
|
||||
for (member ← liveMembers; if member.address != selfAddress) {
|
||||
val connection = clusterGossipConnectionFor(member.address)
|
||||
log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection)
|
||||
connection ! Heartbeat(selfAddress)
|
||||
connection ! selfHeartbeat
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue