Joining member should not be leader, see #3021

* Prefer members with status Up or Leaving, and as
  fallback use Joining, Exiting, Down
* Minor scaladoc fixes
This commit is contained in:
Patrik Nordwall 2013-03-06 16:39:22 +01:00
parent 5b844ec1e6
commit d72d48301a
8 changed files with 79 additions and 20 deletions

View file

@ -182,10 +182,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
* or subclass.
*
* A snapshot of [[akka.cluster.ClusterEvent.CurrentClusterState]]
* will be sent to the subscriber as the first event. When
* `to` Class is a [[akka.cluster.ClusterEvent.InstantMemberEvent]]
* (or subclass) the snapshot event will instead be a
* [[akka.cluster.ClusterEvent.InstantClusterState]].
* will be sent to the subscriber as the first event.
*/
def subscribe(subscriber: ActorRef, to: Class[_]): Unit =
clusterCore ! InternalClusterAction.Subscribe(subscriber, to)

View file

@ -291,8 +291,8 @@ private[cluster] case class EWMA(value: Double, alpha: Double) extends ClusterMe
* Equality of Metric is based on its name.
*
* @param name the metric name
* @param value the metric value, which may or may not be defined, it must be a valid numerical value,
* see [[akka.cluster.MetricNumericConverter.defined()]]
* @param value the metric value, which must be a valid numerical value,
* a valid value is neither negative nor NaN/Infinite.
* @param average the data stream of the metric value, for trending over time. Metrics that are already
* averages (e.g. system load average) or finite (e.g. as number of processors), are not trended.
*/
@ -302,8 +302,8 @@ case class Metric private (name: String, value: Number, private val average: Opt
require(defined(value), s"Invalid Metric [$name] value [$value]")
/**
* If defined ( [[akka.cluster.MetricNumericConverter.defined()]] ), updates the new
* data point, and if defined, updates the data stream. Returns the updated metric.
* Updates the data point, and if defined, updates the data stream (average).
* Returns the updated metric.
*/
def :+(latest: Metric): Metric =
if (this sameAs latest) average match {

View file

@ -191,7 +191,11 @@ private[cluster] case class Gossip(
def isLeader(address: Address): Boolean = leader == Some(address)
def leader: Option[Address] = members.find(_.status != Exiting).orElse(members.headOption).map(_.address)
def leader: Option[Address] = {
if (members.isEmpty) None
else members.find(m m.status != Joining && m.status != Exiting && m.status != Down).
orElse(Some(members.min(Member.leaderStatusOrdering))).map(_.address)
}
def isSingletonCluster: Boolean = members.size == 1

View file

@ -40,6 +40,24 @@ object Member {
else false
}
/**
* INTERNAL API
* Orders the members by their address except that members with status
* Joining, Exiting and Down are ordered last (in that order).
*/
private[cluster] val leaderStatusOrdering: Ordering[Member] = Ordering.fromLessThan[Member] { (a, b)
(a.status, b.status) match {
case (as, bs) if as == bs ordering.compare(a, b) <= 0
case (Down, _) false
case (_, Down) true
case (Exiting, _) false
case (_, Exiting) true
case (Joining, _) false
case (_, Joining) true
case _ ordering.compare(a, b) <= 0
}
}
/**
* `Member` ordering type class, sorts members by host and port.
*/

View file

@ -138,7 +138,7 @@ case class AdaptiveLoadBalancingRouter(
* INTERNAL API.
*
* This strategy is a metrics-aware router which performs load balancing of messages to
* cluster nodes based on cluster metric data. It consumes [[akka.cluster.ClusterMetricsChanged]]
* cluster nodes based on cluster metric data. It consumes [[akka.cluster.ClusterEvent.ClusterMetricsChanged]]
* events and the [[akka.cluster.routing.MetricsSelector]] creates an mix of
* weighted routees based on the node metrics. Messages are routed randomly to the
* weighted routees, i.e. nodes with lower load are more likely to be used than nodes with

View file

@ -183,6 +183,13 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) members(i).address must be(a) }
}
/**
* Note that this can only be used for a cluster with all members
* in Up status, i.e. use `awaitMembersUp` before using this method.
* The reason for that is that the cluster leader is preferably a
* member with status Up or Leaving and that information can't
* be determined from the `RoleName`.
*/
def assertLeader(nodesInCluster: RoleName*): Unit =
if (nodesInCluster.contains(myself)) assertLeaderIn(nodesInCluster.to[immutable.Seq])
@ -190,6 +197,12 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
* Assert that the cluster has elected the correct leader
* out of all nodes in the cluster. First
* member in the cluster ring is expected leader.
*
* Note that this can only be used for a cluster with all members
* in Up status, i.e. use `awaitMembersUp` before using this method.
* The reason for that is that the cluster leader is preferably a
* member with status Up or Leaving and that information can't
* be determined from the `RoleName`.
*/
def assertLeaderIn(nodesInCluster: immutable.Seq[RoleName]): Unit =
if (nodesInCluster.contains(myself)) {
@ -228,13 +241,21 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
def awaitSeenSameState(addresses: Address*): Unit =
awaitCond((addresses.toSet -- clusterView.seenBy).isEmpty)
/**
* Leader according to the address ordering of the roles.
* Note that this can only be used for a cluster with all members
* in Up status, i.e. use `awaitMembersUp` before using this method.
* The reason for that is that the cluster leader is preferably a
* member with status Up or Leaving and that information can't
* be determined from the `RoleName`.
*/
def roleOfLeader(nodesInCluster: immutable.Seq[RoleName] = roles): RoleName = {
nodesInCluster.length must not be (0)
nodesInCluster.sorted.head
}
/**
* Sort the roles in the order used by the cluster.
* Sort the roles in the address order used by the cluster node ring.
*/
implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] {
import Member.addressOrdering

View file

@ -48,9 +48,10 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
}
"have stable equals and hashCode" in {
val m1 = Member(Address("akka.tcp", "sys1", "host1", 9000), MemberStatus.Joining)
val m2 = Member(Address("akka.tcp", "sys1", "host1", 9000), MemberStatus.Up)
val m3 = Member(Address("akka.tcp", "sys1", "host1", 10000), MemberStatus.Up)
val address = Address("akka.tcp", "sys1", "host1", 9000)
val m1 = Member(address, MemberStatus.Joining)
val m2 = Member(address, MemberStatus.Up)
val m3 = Member(address.copy(port = Some(10000)), MemberStatus.Up)
m1 must be(m2)
m1.hashCode must be(m2.hashCode)
@ -61,7 +62,7 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
"have consistent ordering and equals" in {
val address1 = Address("akka.tcp", "sys1", "host1", 9001)
val address2 = Address("akka.tcp", "sys1", "host1", 9002)
val address2 = address1.copy(port = Some(9002))
val x = Member(address1, Exiting)
val y = Member(address1, Removed)
@ -72,8 +73,8 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
"work with SortedSet" in {
val address1 = Address("akka.tcp", "sys1", "host1", 9001)
val address2 = Address("akka.tcp", "sys1", "host1", 9002)
val address3 = Address("akka.tcp", "sys1", "host1", 9003)
val address2 = address1.copy(port = Some(9002))
val address3 = address1.copy(port = Some(9003))
(SortedSet(Member(address1, MemberStatus.Joining)) - Member(address1, MemberStatus.Up)) must be(SortedSet.empty[Member])
(SortedSet(Member(address1, MemberStatus.Exiting)) - Member(address1, MemberStatus.Removed)) must be(SortedSet.empty[Member])
@ -130,4 +131,22 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
seq(3) must equal(AddressFromURIString("akka://sys@darkstar2:1111"))
}
}
"Leader status ordering" must {
"order members with status Joining, Exiting and Down last" in {
val address = Address("akka.tcp", "sys1", "host1", 5000)
val m1 = Member(address, MemberStatus.Joining)
val m2 = Member(address.copy(port = Some(7000)), MemberStatus.Joining)
val m3 = Member(address.copy(port = Some(3000)), MemberStatus.Exiting)
val m4 = Member(address.copy(port = Some(6000)), MemberStatus.Exiting)
val m5 = Member(address.copy(port = Some(2000)), MemberStatus.Down)
val m6 = Member(address.copy(port = Some(4000)), MemberStatus.Down)
val m7 = Member(address.copy(port = Some(8000)), MemberStatus.Up)
val m8 = Member(address.copy(port = Some(9000)), MemberStatus.Up)
val expected = IndexedSeq(m7, m8, m1, m2, m3, m4, m5, m6)
val shuffled = Random.shuffle(expected)
shuffled.sorted(Member.leaderStatusOrdering) must be(expected)
}
}
}

View file

@ -159,9 +159,9 @@ Leader
After gossip convergence a ``leader`` for the cluster can be determined. There is no
``leader`` election process, the ``leader`` can always be recognised deterministically
by any node whenever there is gossip convergence. The ``leader`` is simply the first
node in sorted order that is able to take the leadership role, where the only
allowed member states for a ``leader`` are ``up``, ``leaving`` or ``exiting`` (see
below for more information about member states).
node in sorted order that is able to take the leadership role, where the preferred
member states for a ``leader`` are ``up`` and ``leaving`` (see below for more
information about member states).
The role of the ``leader`` is to shift members in and out of the cluster, changing
``joining`` members to the ``up`` state or ``exiting`` members to the