From d957c686390b9799e568bbc5a0c5b17621a3d87b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 11 Jun 2012 21:12:57 +0200 Subject: [PATCH] Incorporate feedback from review, see #2214 --- .../src/main/scala/akka/cluster/Cluster.scala | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index df8f2ec89b..b463c3b0ea 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -14,18 +14,14 @@ import akka.pattern.ask import akka.util._ import akka.util.duration._ import akka.ConfigurationException - import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } import java.util.concurrent.TimeUnit._ import java.util.concurrent.TimeoutException import akka.jsr166y.ThreadLocalRandom - import java.lang.management.ManagementFactory import javax.management._ - import scala.collection.immutable.{ Map, SortedSet } import scala.annotation.tailrec - import com.google.protobuf.ByteString /** @@ -44,6 +40,8 @@ trait MetaDataChangeListener { /** * Base trait for all cluster messages. All ClusterMessage's are serializable. + * + * FIXME Protobuf all ClusterMessages */ sealed trait ClusterMessage extends Serializable @@ -82,6 +80,7 @@ object ClusterAction { /** * Represents the address and the current status of a cluster member node. + * */ class Member(val address: Address, val status: MemberStatus) extends ClusterMessage { override def hashCode = address.## @@ -175,6 +174,10 @@ case class GossipOverview( "])" } +object Gossip { + val emptyMembers: SortedSet[Member] = SortedSet.empty +} + /** * Represents the state of the cluster; cluster ring membership, ring convergence, meta data - all versioned by a vector clock. */ @@ -219,7 +222,7 @@ case class Gossip( // 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups val mergedMembers = - SortedSet.empty[Member] ++ + Gossip.emptyMembers ++ membersGroupedByAddress.values.foldLeft(Vector.empty[Member]) { (acc, members) ⇒ acc :+ members.reduceLeft(Member.highestPriorityOf(_, _)) } @@ -244,7 +247,10 @@ case class Gossip( ")" } -case class Heartbeat(from: Address) +/** + * Sent at regular intervals for failure detection. + */ +case class Heartbeat(from: Address) extends ClusterMessage /** * Manages routing of the different cluster commands. @@ -372,6 +378,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ import clusterSettings._ val selfAddress = remote.transport.address + private val selfHeartbeat = Heartbeat(selfAddress) + val failureDetector = new AccrualFailureDetector( system, selfAddress, FailureDetectorThreshold, FailureDetectorMaxSampleSize) @@ -402,7 +410,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ private val state = { val member = Member(selfAddress, MemberStatus.Joining) - val versionedGossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock + val versionedGossip = Gossip(members = Gossip.emptyMembers + member) + vclockNode // add me as member and update my vector clock val seenVersionedGossip = versionedGossip seen selfAddress new AtomicReference[State](State(seenVersionedGossip)) } @@ -757,9 +765,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * INTERNAL API */ - private[cluster] def receiveHeartbeat(from: Address): Unit = { - failureDetector heartbeat from - } + private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from /** * Joins the pre-configured contact point. @@ -785,10 +791,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val newSelf = localSelf copy (status = newStatus) // change my state in 'gossip.members' - val newMembers = localMembers map { member ⇒ - if (member.address == selfAddress) newSelf - else member - } + val newMembers = localMembers map { member ⇒ if (member.address == selfAddress) newSelf else member } val newGossip = localGossip copy (members = newMembers) @@ -893,13 +896,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (!isSingletonCluster(localState)) { val liveMembers = localState.latestGossip.members.toIndexedSeq - val unreachableMembers = localState.latestGossip.overview.unreachable - // FIXME use unreachable? - for (member ← (liveMembers ++ unreachableMembers); if member.address != selfAddress) { + for (member ← liveMembers; if member.address != selfAddress) { val connection = clusterGossipConnectionFor(member.address) log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, connection) - connection ! Heartbeat(selfAddress) + connection ! selfHeartbeat } } }