diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 4c2f39df73..da34727370 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -182,10 +182,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { * or subclass. * * A snapshot of [[akka.cluster.ClusterEvent.CurrentClusterState]] - * will be sent to the subscriber as the first event. When - * `to` Class is a [[akka.cluster.ClusterEvent.InstantMemberEvent]] - * (or subclass) the snapshot event will instead be a - * [[akka.cluster.ClusterEvent.InstantClusterState]]. + * will be sent to the subscriber as the first event. */ def subscribe(subscriber: ActorRef, to: Class[_]): Unit = clusterCore ! InternalClusterAction.Subscribe(subscriber, to) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index 2b756e75a1..b8e56550ad 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -291,8 +291,8 @@ private[cluster] case class EWMA(value: Double, alpha: Double) extends ClusterMe * Equality of Metric is based on its name. * * @param name the metric name - * @param value the metric value, which may or may not be defined, it must be a valid numerical value, - * see [[akka.cluster.MetricNumericConverter.defined()]] + * @param value the metric value, which must be a valid numerical value, + * a valid value is neither negative nor NaN/Infinite. * @param average the data stream of the metric value, for trending over time. Metrics that are already * averages (e.g. system load average) or finite (e.g. as number of processors), are not trended. */ @@ -302,8 +302,8 @@ case class Metric private (name: String, value: Number, private val average: Opt require(defined(value), s"Invalid Metric [$name] value [$value]") /** - * If defined ( [[akka.cluster.MetricNumericConverter.defined()]] ), updates the new - * data point, and if defined, updates the data stream. Returns the updated metric. + * Updates the data point, and if defined, updates the data stream (average). + * Returns the updated metric. */ def :+(latest: Metric): Metric = if (this sameAs latest) average match { diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 9539bcb025..46b250101d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -191,7 +191,11 @@ private[cluster] case class Gossip( def isLeader(address: Address): Boolean = leader == Some(address) - def leader: Option[Address] = members.find(_.status != Exiting).orElse(members.headOption).map(_.address) + def leader: Option[Address] = { + if (members.isEmpty) None + else members.find(m ⇒ m.status != Joining && m.status != Exiting && m.status != Down). + orElse(Some(members.min(Member.leaderStatusOrdering))).map(_.address) + } def isSingletonCluster: Boolean = members.size == 1 diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index c2868c9335..db3b126571 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -40,6 +40,24 @@ object Member { else false } + /** + * INTERNAL API + * Orders the members by their address except that members with status + * Joining, Exiting and Down are ordered last (in that order). + */ + private[cluster] val leaderStatusOrdering: Ordering[Member] = Ordering.fromLessThan[Member] { (a, b) ⇒ + (a.status, b.status) match { + case (as, bs) if as == bs ⇒ ordering.compare(a, b) <= 0 + case (Down, _) ⇒ false + case (_, Down) ⇒ true + case (Exiting, _) ⇒ false + case (_, Exiting) ⇒ true + case (Joining, _) ⇒ false + case (_, Joining) ⇒ true + case _ ⇒ ordering.compare(a, b) <= 0 + } + } + /** * `Member` ordering type class, sorts members by host and port. */ diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala index 6c3d7a00a0..8af61e7712 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala @@ -138,7 +138,7 @@ case class AdaptiveLoadBalancingRouter( * INTERNAL API. * * This strategy is a metrics-aware router which performs load balancing of messages to - * cluster nodes based on cluster metric data. It consumes [[akka.cluster.ClusterMetricsChanged]] + * cluster nodes based on cluster metric data. It consumes [[akka.cluster.ClusterEvent.ClusterMetricsChanged]] * events and the [[akka.cluster.routing.MetricsSelector]] creates an mix of * weighted routees based on the node metrics. Messages are routed randomly to the * weighted routees, i.e. nodes with lower load are more likely to be used than nodes with diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 18dfc0d7ac..144e7b3fa9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -183,6 +183,13 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) ⇒ members(i).address must be(a) } } + /** + * Note that this can only be used for a cluster with all members + * in Up status, i.e. use `awaitMembersUp` before using this method. + * The reason for that is that the cluster leader is preferably a + * member with status Up or Leaving and that information can't + * be determined from the `RoleName`. + */ def assertLeader(nodesInCluster: RoleName*): Unit = if (nodesInCluster.contains(myself)) assertLeaderIn(nodesInCluster.to[immutable.Seq]) @@ -190,6 +197,12 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS * Assert that the cluster has elected the correct leader * out of all nodes in the cluster. First * member in the cluster ring is expected leader. + * + * Note that this can only be used for a cluster with all members + * in Up status, i.e. use `awaitMembersUp` before using this method. + * The reason for that is that the cluster leader is preferably a + * member with status Up or Leaving and that information can't + * be determined from the `RoleName`. */ def assertLeaderIn(nodesInCluster: immutable.Seq[RoleName]): Unit = if (nodesInCluster.contains(myself)) { @@ -228,13 +241,21 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS def awaitSeenSameState(addresses: Address*): Unit = awaitCond((addresses.toSet -- clusterView.seenBy).isEmpty) + /** + * Leader according to the address ordering of the roles. + * Note that this can only be used for a cluster with all members + * in Up status, i.e. use `awaitMembersUp` before using this method. + * The reason for that is that the cluster leader is preferably a + * member with status Up or Leaving and that information can't + * be determined from the `RoleName`. + */ def roleOfLeader(nodesInCluster: immutable.Seq[RoleName] = roles): RoleName = { nodesInCluster.length must not be (0) nodesInCluster.sorted.head } /** - * Sort the roles in the order used by the cluster. + * Sort the roles in the address order used by the cluster node ring. */ implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] { import Member.addressOrdering diff --git a/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala index 2442dd0e46..3e19ab1e0c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala @@ -48,9 +48,10 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { } "have stable equals and hashCode" in { - val m1 = Member(Address("akka.tcp", "sys1", "host1", 9000), MemberStatus.Joining) - val m2 = Member(Address("akka.tcp", "sys1", "host1", 9000), MemberStatus.Up) - val m3 = Member(Address("akka.tcp", "sys1", "host1", 10000), MemberStatus.Up) + val address = Address("akka.tcp", "sys1", "host1", 9000) + val m1 = Member(address, MemberStatus.Joining) + val m2 = Member(address, MemberStatus.Up) + val m3 = Member(address.copy(port = Some(10000)), MemberStatus.Up) m1 must be(m2) m1.hashCode must be(m2.hashCode) @@ -61,7 +62,7 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { "have consistent ordering and equals" in { val address1 = Address("akka.tcp", "sys1", "host1", 9001) - val address2 = Address("akka.tcp", "sys1", "host1", 9002) + val address2 = address1.copy(port = Some(9002)) val x = Member(address1, Exiting) val y = Member(address1, Removed) @@ -72,8 +73,8 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { "work with SortedSet" in { val address1 = Address("akka.tcp", "sys1", "host1", 9001) - val address2 = Address("akka.tcp", "sys1", "host1", 9002) - val address3 = Address("akka.tcp", "sys1", "host1", 9003) + val address2 = address1.copy(port = Some(9002)) + val address3 = address1.copy(port = Some(9003)) (SortedSet(Member(address1, MemberStatus.Joining)) - Member(address1, MemberStatus.Up)) must be(SortedSet.empty[Member]) (SortedSet(Member(address1, MemberStatus.Exiting)) - Member(address1, MemberStatus.Removed)) must be(SortedSet.empty[Member]) @@ -130,4 +131,22 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { seq(3) must equal(AddressFromURIString("akka://sys@darkstar2:1111")) } } + + "Leader status ordering" must { + + "order members with status Joining, Exiting and Down last" in { + val address = Address("akka.tcp", "sys1", "host1", 5000) + val m1 = Member(address, MemberStatus.Joining) + val m2 = Member(address.copy(port = Some(7000)), MemberStatus.Joining) + val m3 = Member(address.copy(port = Some(3000)), MemberStatus.Exiting) + val m4 = Member(address.copy(port = Some(6000)), MemberStatus.Exiting) + val m5 = Member(address.copy(port = Some(2000)), MemberStatus.Down) + val m6 = Member(address.copy(port = Some(4000)), MemberStatus.Down) + val m7 = Member(address.copy(port = Some(8000)), MemberStatus.Up) + val m8 = Member(address.copy(port = Some(9000)), MemberStatus.Up) + val expected = IndexedSeq(m7, m8, m1, m2, m3, m4, m5, m6) + val shuffled = Random.shuffle(expected) + shuffled.sorted(Member.leaderStatusOrdering) must be(expected) + } + } } diff --git a/akka-docs/rst/cluster/cluster.rst b/akka-docs/rst/cluster/cluster.rst index dfcb4f0a42..fd9c5116f0 100644 --- a/akka-docs/rst/cluster/cluster.rst +++ b/akka-docs/rst/cluster/cluster.rst @@ -159,9 +159,9 @@ Leader After gossip convergence a ``leader`` for the cluster can be determined. There is no ``leader`` election process, the ``leader`` can always be recognised deterministically by any node whenever there is gossip convergence. The ``leader`` is simply the first -node in sorted order that is able to take the leadership role, where the only -allowed member states for a ``leader`` are ``up``, ``leaving`` or ``exiting`` (see -below for more information about member states). +node in sorted order that is able to take the leadership role, where the preferred +member states for a ``leader`` are ``up`` and ``leaving`` (see below for more +information about member states). The role of the ``leader`` is to shift members in and out of the cluster, changing ``joining`` members to the ``up`` state or ``exiting`` members to the